你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery._state 源代码

# -*- coding: utf-8 -*-

    This is an internal module containing thread state
    like the ``current_app``, and ``current_task``.

    This module shouldn't be used directly.

from __future__ import absolute_import

import os
import threading
import weakref

from celery.local import Proxy
from celery.utils.threads import LocalStack

#: Global default app used when no current app.
default_app = None

#: List of all app instances (weakrefs), must not be used directly.
_apps = set()

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None
_tls = _TLS()

_task_stack = LocalStack()

[文档]def set_default_app(app): global default_app default_app = app
[文档]def get_current_app(): if default_app is None: #: creates the global fallback app instance. from celery.app import Celery set_default_app(Celery( 'default', loader=os.environ.get('CELERY_LOADER') or 'default', set_as_current=False, accept_magic_kwargs=True, )) return _tls.current_app or default_app
[文档]def get_current_task(): """Currently executing task.""" return _task_stack.top
[文档]def get_current_worker_task(): """Currently executing task, that was applied by the worker. This is used to differentiate between the actual task executed by the worker and any task that was called within a task (using ``task.__call__`` or ``task.apply``) """ for task in reversed(_task_stack.stack): if not task.request.called_directly: return task #: Proxy to current app.
current_app = Proxy(get_current_app) #: Proxy to current task. current_task = Proxy(get_current_task) def _register_app(app): _apps.add(weakref.ref(app)) def _get_active_apps(): dirty = [] try: for appref in _apps: app = appref() if app is None: dirty.append(appref) else: yield app finally: while dirty: _apps.discard(dirty.pop())