你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
# -*- coding: utf-8 -*-
App utilities: Compat settings, bugreport tool, pickling apps.
from __future__ import absolute_import
import os
import platform as _platform
import types
from celery import platforms
from celery.datastructures import ConfigurationView, DictAttribute
from celery.utils.text import pretty
from celery.utils.imports import qualname
from .defaults import find
#: Format used to generate bugreport information.
software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
billiard:%(billiard_v)s %(driver_v)s
platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
loader -> %(loader)s
settings -> transport:%(transport)s results:%(results)s
[文档]class Settings(ConfigurationView):
"""Celery settings object."""
[文档] def BROKER_TRANSPORT(self):
return self.first('BROKER_TRANSPORT',
[文档] def BROKER_BACKEND(self):
"""Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
[文档] def BROKER_HOST(self):
return (os.environ.get('CELERY_BROKER_URL') or
self.first('BROKER_URL', 'BROKER_HOST'))
[文档] def CELERY_TIMEZONE(self):
# this way we also support django's time zone.
return self.first('CELERY_TIMEZONE', 'TIME_ZONE')
[文档] def without_defaults(self):
"""Returns the current configuration, but without defaults."""
# the last stash is the default settings, so just skip that
return Settings({}, self._order[:-1])
def _pickleable_changes(self):
# attempt to include keys from configuration modules,
# to work with multiprocessing execv/fork emulation.
# see note at celery.app.base:Celery.__reduce_args__.
R = {}
for d in reversed(self._order[:-1]):
if isinstance(d, DictAttribute):
d = object.__getattribute__(d, 'obj')
if isinstance(d, types.ModuleType):
d = dict((k, v) for k, v in vars(d).iteritems()
if not k.startswith('__') and k.isupper())
return R
[文档] def find_option(self, name, namespace='celery'):
"""Search for option by name.
Will return ``(namespace, option_name, Option)`` tuple, e.g.::
>>> celery.conf.find_option('disable_rate_limits')
<Option: type->bool default->False>))
:param name: Name of option, cannot be partial.
:keyword namespace: Preferred namespace (``CELERY`` by default).
return find(name, namespace)
[文档] def find_value_for_key(self, name, namespace='celery'):
"""Shortcut to ``get_by_parts(*find_option(name)[:-1])``"""
return self.get_by_parts(*self.find_option(name, namespace)[:-1])
[文档] def get_by_parts(self, *parts):
"""Returns the current value for setting specified as a path.
>>> celery.conf.get_by_parts('CELERY', 'DISABLE_RATE_LIMITS')
return self['_'.join(filter(None, parts))]
[文档] def humanize(self):
"""Returns a human readable string showing changes to the
return '\n'.join(
'%s %s' % (key + ':', pretty(value, width=50))
for key, value in self.without_defaults().iteritems())
[文档]class AppPickler(object):
"""Default application pickler/unpickler."""
def __call__(self, cls, *args):
kwargs = self.build_kwargs(*args)
app = self.construct(cls, **kwargs)
self.prepare(app, **kwargs)
return app
[文档] def build_standard_kwargs(self, main, changes, loader, backend, amqp,
events, log, control, accept_magic_kwargs):
return dict(main=main, loader=loader, backend=backend, amqp=amqp,
changes=changes, events=events, log=log, control=control,
def _unpickle_app(cls, pickler, *args):
return pickler()(cls, *args)
[文档]def bugreport(app):
"""Returns a string containing information useful in bug reports."""
import billiard
import celery
import kombu
conn = app.connection()
driver_v = '%s:%s' % (conn.transport.driver_name,
transport = conn.transport_cls
except Exception:
transport = driver_v = ''
'system': _platform.system(),
'arch': ', '.join(filter(None, _platform.architecture())),
'py_i': platforms.pyimplementation(),
'celery_v': celery.VERSION_BANNER,
'kombu_v': kombu.__version__,
'billiard_v': billiard.__version__,
'py_v': _platform.python_version(),
'driver_v': driver_v,
'transport': transport,
'results': app.conf.CELERY_RESULT_BACKEND or 'disabled',
'human_settings': app.conf.humanize(),
'loader': qualname(app.loader.__class__),