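# NOTE: This listing comes from the Celery 3.1 documentation; a separate development-version documentation exists (the original link target was lost).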
# -*- coding: utf-8 -*-
"""
celery._state
~~~~~~~~~~~~~~~
This is an internal module containing thread state
like the ``current_app``, and ``current_task``.
This module shouldn't be used directly.
"""
from __future__ import absolute_import
import os
import threading
import weakref
from celery.local import Proxy
from celery.utils.threads import LocalStack
#: Global default app used when no current app.
default_app = None
#: Set of weak references to all registered app instances (filled by
#: ``_register_app``); must not be used directly.
_apps = set()
class _TLS(threading.local):
    """Thread-local holder for the app that is current in each thread."""

    #: Set by apps that have the
    #: :attr:`~celery.app.base.BaseApp.set_as_current` attribute, so it
    #: always holds the most recently instantiated such app; this is the
    #: default app returned by :func:`app_or_default`.
    current_app = None
#: Module-wide instance of the thread-local state class; its
#: ``current_app`` attribute is read by ``get_current_app``.
_tls = _TLS()
#: Stack of tasks; ``get_current_worker_task`` walks its ``stack``
#: attribute in reverse order.
_task_stack = LocalStack()
def get_current_app():
    """Return the app that is current for this thread.

    When no global default app exists yet, a fallback app named
    ``'default'`` is created and installed through ``set_default_app``.
    The loader name comes from the ``CELERY_LOADER`` environment variable
    and falls back to ``'default'`` when that is unset or empty.

    Returns the thread-local current app if it is truthy, otherwise the
    global default app.
    """
    if default_app is None:
        # Imported at call time rather than at module import time
        # (presumably to avoid an import cycle -- confirm).
        from celery.app import Celery
        loader_name = os.environ.get('CELERY_LOADER') or 'default'
        fallback = Celery(
            'default',
            loader=loader_name,
            set_as_current=False,
            accept_magic_kwargs=True,
        )
        set_default_app(fallback)
    active = _tls.current_app
    return active or default_app
def get_current_worker_task():
    """Currently executing task, that was applied by the worker.

    This is used to differentiate between the actual task
    executed by the worker and any task that was called within
    a task (using ``task.__call__`` or ``task.apply``).

    The task stack is searched in reverse order and the first task whose
    ``request.called_directly`` is false is returned; ``None`` is returned
    when there is no such task.
    """
    worker_tasks = (
        candidate for candidate in reversed(_task_stack.stack)
        if not candidate.request.called_directly
    )
    return next(worker_tasks, None)
#: Proxy to current app, built around :func:`get_current_app`.
current_app = Proxy(get_current_app)
#: Proxy to current task, built around ``get_current_task``.
#: NOTE(review): ``get_current_task`` is not defined in this excerpt --
#: confirm it is defined elsewhere in the module before this line runs.
current_task = Proxy(get_current_task)
def _register_app(app):
    """Add a weak reference to *app* to the module-level ``_apps`` set."""
    app_ref = weakref.ref(app)
    _apps.add(app_ref)
def _get_active_apps():
    """Yield every registered app whose weak reference is still alive.

    References whose app has been garbage collected are collected while
    iterating and removed from ``_apps`` once the generator finishes or is
    closed.
    """
    dead = []
    try:
        # Iterate over a snapshot: the consumer may register a new app
        # between two yields, and adding to a set while it is being
        # iterated raises RuntimeError.
        for appref in list(_apps):
            app = appref()
            if app is None:
                dead.append(appref)
            else:
                yield app
    finally:
        # ``finally`` also runs when the generator is closed before
        # exhaustion, so stale references are always pruned.
        _apps.difference_update(dead)