Celery and asyncio: A Guide to Bridging the Synchronous World
How to use asyncio with Celery workers
Disclosure: This was inspired by real bugs I faced at inventive.ai. LLMs were used to debug the issue and create this blog post (along with the silly analogies). But I have personally proofread this blog post. Perhaps I can call myself the author?
Here is the premise: You have a robust Celery setup, diligently processing background tasks. You decide to incorporate a modern Python library. Perhaps a new database driver, a third-party API client, or an agentic framework. You find it has a convenient do_stuff_sync() function, which you call from your Celery task.
And then your worker explodes:
RuntimeError: There is no current event loop in thread 'MainThread'.
If you've encountered this, you've stumbled into the subtle but critical gap between the synchronous world of a standard Celery worker and the asynchronous world of asyncio. This article will demystify this error, explore the underlying concepts, and provide a layered, production-ready pattern for making these two worlds coexist peacefully.
The Clash of Two Worlds
The root of the problem lies in two fundamentally different models of concurrency.
The Celery prefork Worker: The Factory Worker
Think of a Celery worker process (using the default prefork model) as a factory worker. It's a simple, reliable, and isolated process. It's given a task, it executes the instructions from top to bottom, and when it's done, it waits for the next task. Crucially, this worker doesn't come with a "personal assistant" to juggle tasks; it does one thing at a time. By default, it has no concept of an asyncio event loop.
The asyncio Event Loop: The Juggling Manager
An asyncio event loop is like a highly efficient manager assigned to a single worker. It allows that worker to handle many I/O-bound tasks (like waiting for network requests or database queries) concurrently. When the worker starts a task that involves waiting, it hands it to the manager and immediately starts on other work. The manager keeps track of all the waiting tasks and notifies the worker the moment one is ready to proceed.
This "manager" is not a separate thread; it's a system that runs within a single thread. The problem is, it must be explicitly hired and assigned to the thread.
The error ("There is no current event loop") happens because the async-native library you called needs its "manager" to function, but the Celery factory worker it's running in never hired one. The library's attempt to find its manager fails.
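You can reproduce the failure with a few lines of plain Python, no Celery required. Calling asyncio.get_event_loop() from any thread that has never been given a loop (exactly the situation inside a sync worker thread) raises the error; the names here (call_library_sync, worker) are illustrative stand-ins, not real library APIs:

```python
import asyncio
import threading

def call_library_sync():
    # An async-native library's sync wrapper typically looks up
    # the current thread's event loop before running its coroutine.
    return asyncio.get_event_loop()

errors = []

def worker():
    # A fresh thread has no event loop set, so the lookup fails.
    try:
        call_library_sync()
    except RuntimeError as e:
        errors.append(str(e))

t = threading.Thread(target=worker)
t.start()
t.join()
print(errors[0])  # message names the thread that lacks a loop
```

The exception message includes the thread's name, which is the same clue we'll use again later when a library's internal thread pool hits this error.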
The First Step - Taking Control of the Loop
The logical first step is to hire the manager ourselves. The most robust way to do this from a synchronous function is to anticipate that the manager might not be there and create one if needed.
Let's create a generic helper function for this.
```python
# in a utils.py or helpers.py file
import asyncio

def run_async_in_sync(coroutine):
    try:
        # Try to get the existing event loop for this thread.
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        # If it fails, it's likely because no loop exists.
        if "There is no current event loop" in str(e):
            # So, we create a new one...
            loop = asyncio.new_event_loop()
            # ...and set it as the official loop for this thread.
            asyncio.set_event_loop(loop)
        else:
            raise

    # Now we can safely run our coroutine on the loop.
    return loop.run_until_complete(coroutine)
```
```python
# --- In your tasks.py ---
from .utils import run_async_in_sync

@app.task
def my_celery_task(api_endpoint: str):
    # Some async library function
    async def fetch_data(url: str):
        # ... logic using httpx, aiohttp, etc. ...
        return "some data"

    result = run_async_in_sync(fetch_data(api_endpoint))
    return result
```
This try-new-set pattern is the core of the solution. It authoritatively ensures that an event loop is available, making it safe to call from any synchronous context.
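To see the pattern work end to end, here is a self-contained sketch: the helper repeated verbatim, plus a fake_io coroutine (an illustrative stand-in for real network I/O) called from a fresh thread that starts with no event loop at all:

```python
import asyncio
import threading

def run_async_in_sync(coroutine):
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if "There is no current event loop" in str(e):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
    return loop.run_until_complete(coroutine)

async def fake_io(value):
    await asyncio.sleep(0)  # stand-in for a real network call
    return value * 2

results = []

def thread_body():
    # This thread has no loop; the helper creates and sets one,
    # then runs the coroutine to completion.
    results.append(run_async_in_sync(fake_io(21)))

t = threading.Thread(target=thread_body)
t.start()
t.join()
print(results)  # [42]
```

The same call would also succeed on a thread that already had a loop set, which is precisely what makes the helper safe to sprinkle anywhere.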
The Architect's Approach - Centralizing with Celery Signals
While the helper function works, repeating it or its logic everywhere isn't ideal. We can do better by preparing our environment the moment it's created. Celery Signals are the perfect tool for this.
The @worker_process_init signal fires once every time a prefork worker process is initialized. By connecting to this signal, we can ensure an event loop is ready and waiting before the worker even receives its first task.
```python
# in your Celery setup module
import asyncio

from celery.signals import worker_process_init

@worker_process_init.connect
def setup_async_loop_for_worker(**kwargs):
    """
    This function is called when a Celery worker process starts.
    It ensures an asyncio event loop is available in that process.
    """
    try:
        asyncio.get_event_loop()
    except RuntimeError as e:
        if "There is no current event loop" in str(e):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        else:
            raise
```
With this in place, our task code becomes beautifully simple. We can now safely assume the loop exists.
```python
# --- In your tasks.py ---
@app.task
def my_celery_task(api_endpoint: str):
    async def fetch_data(url: str):
        # ... async logic ...
        return "some data"

    # We can now get the loop with confidence.
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(fetch_data(api_endpoint))
    return result
```
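A fair question at this point: why keep a long-lived loop instead of just calling asyncio.run() inside the task? The reason is that asyncio.run() creates a brand-new loop on every call and closes it when the coroutine finishes, so anything loop-bound (a shared httpx.AsyncClient, a connection pool) is torn down between calls. A quick check makes this concrete:

```python
import asyncio

async def get_running():
    # Report which loop is actually executing us.
    return asyncio.get_running_loop()

loop_a = asyncio.run(get_running())
loop_b = asyncio.run(get_running())

print(loop_a is loop_b)    # False: a fresh loop per call
print(loop_a.is_closed())  # True: each loop is torn down afterwards
```

The persistent loop created by the signal handler, by contrast, survives across tasks for the lifetime of the worker process.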
The Final Boss - The Library with Its Own Threads
You have the perfect setup. Your tasks run flawlessly. Then, you integrate a complex third-party library—like a graph execution framework—and a new, but familiar, error appears:
RuntimeError: There is no current event loop in thread ThreadPoolExecutor
The thread name is the clue. The library you're using has created its own pool of worker threads to manage its internal work. Our Celery signal only configured the worker's main thread. These new threads created by the library are clean slates; they don't have an event loop.
The solution is to make the code that needs the event loop responsible for creating it. The dependency should be self-sufficient. If you have an AsyncAPIClient that makes async calls, it should not assume an event loop exists. It should ensure it.
```python
# In your async_client.py
import asyncio
import httpx

class AsyncAPIClient:
    def __init__(self, base_url: str):
        self._client = httpx.AsyncClient(base_url=base_url)
        self.event_loop = self._get_or_create_event_loop()

    def _get_or_create_event_loop(self):
        """Make this client thread-safe and environment-agnostic."""
        try:
            return asyncio.get_event_loop()
        except RuntimeError as e:
            if "There is no current event loop" in str(e):
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                return loop
            else:
                raise

    async def _fetch_data(self, endpoint: str):
        response = await self._client.get(endpoint)
        return response.json()

    def fetch_data_sync(self, endpoint: str):
        """A sync-callable method for convenience."""
        return self.event_loop.run_until_complete(self._fetch_data(endpoint))

    # ... async close method, etc.
```
By placing the try-new-set logic inside the client's __init__, you make it robust. It no longer matters if it's instantiated in Celery's main thread or a library's temporary thread pool; it will always work.
Conclusion
Running asyncio code inside a synchronous Celery worker is a common requirement in modern Python development. While seemingly complex, the solution is built on a clear principle: explicitly manage the event loop lifecycle.
- Bridge the Gap: Use a robust try-new-set pattern to safely run an async function from a sync context.
- Centralize Setup: For cleaner architecture, use Celery's @worker_process_init signal to prepare each worker process with an event loop.
- Encapsulate Dependencies: When dealing with multi-threaded libraries, make your async-dependent classes self-sufficient by having them manage their own event loop access.
By following these patterns, you can confidently integrate the power and efficiency of the asyncio ecosystem into the robust, distributed world of Celery.