
Source code for celery.backends.base

# -*- coding: utf-8 -*-
"""
    celery.backends.base
    ~~~~~~~~~~~~~~~~~~~~

    Result backend base classes.

    - :class:`BaseBackend` defines the interface.

    - :class:`BaseDictBackend` assumes the fields are stored in a dict.

    - :class:`KeyValueStoreBackend` is a common base class
      using K/V semantics like _get and _put.

"""
from __future__ import absolute_import

import time
import sys

from datetime import timedelta

from billiard.einfo import ExceptionInfo
from kombu import serialization
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8

from celery import states
from celery.app import current_task
from celery.datastructures import LRUCache
from celery.exceptions import ChordError, TaskRevokedError, TimeoutError
from celery.result import from_serializable, GroupResult
from celery.utils import timeutils
from celery.utils.serialization import (
    get_pickled_exception,
    get_pickleable_exception,
    create_exception_cls,
)

EXCEPTION_ABLE_CODECS = frozenset(['pickle', 'yaml'])
is_py3k = sys.version_info >= (3, 0)


def unpickle_backend(cls, args, kwargs):
    """Returns an unpickled backend."""
    return cls(*args, **kwargs)


class BaseBackend(object):
    """Base backend class."""
    READY_STATES = states.READY_STATES
    UNREADY_STATES = states.UNREADY_STATES
    EXCEPTION_STATES = states.EXCEPTION_STATES

    TimeoutError = TimeoutError

    #: Time to sleep between polling each individual item
    #: in `ResultSet.iterate`, as opposed to the `interval`
    #: argument which is for each pass.
    subpolling_interval = None

    #: If true the backend must implement :meth:`get_many`.
    supports_native_join = False

    def __init__(self, *args, **kwargs):
        from celery.app import app_or_default
        self.app = app_or_default(kwargs.get('app'))
        self.serializer = kwargs.get(
            'serializer', self.app.conf.CELERY_RESULT_SERIALIZER)
        (self.content_type,
         self.content_encoding,
         self.encoder) = serialization.registry._encoders[self.serializer]

    def encode(self, data):
        _, _, payload = serialization.encode(data, serializer=self.serializer)
        return payload

    def decode(self, payload):
        payload = is_py3k and payload or str(payload)
        return serialization.decode(payload,
                                    content_type=self.content_type,
                                    content_encoding=self.content_encoding)

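    # Example (illustrative, not part of the original source): with the
    # default pickle serializer, encode() returns a pickled payload and
    # decode() reverses it, so that:
    #
    #     backend.decode(backend.encode({'status': 'SUCCESS'}))
    #
    # round-trips back to {'status': 'SUCCESS'}.
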
    def prepare_expires(self, value, type=None):
        if value is None:
            value = self.app.conf.CELERY_TASK_RESULT_EXPIRES
        if isinstance(value, timedelta):
            value = timeutils.timedelta_seconds(value)
        if value is not None and type:
            return type(value)
        return value

    def encode_result(self, result, status):
        if status in self.EXCEPTION_STATES and isinstance(result, Exception):
            return self.prepare_exception(result)
        else:
            return self.prepare_value(result)

    def store_result(self, task_id, result, status, traceback=None):
        """Store the result and status of a task."""
        raise NotImplementedError(
            'store_result is not supported by this backend.')

    def mark_as_started(self, task_id, **meta):
        """Mark a task as started."""
        return self.store_result(task_id, meta, status=states.STARTED)

    def mark_as_done(self, task_id, result):
        """Mark task as successfully executed."""
        return self.store_result(task_id, result, status=states.SUCCESS)

    def mark_as_failure(self, task_id, exc, traceback=None):
        """Mark task as executed with failure. Stores the exception."""
        return self.store_result(task_id, exc, status=states.FAILURE,
                                 traceback=traceback)

    def fail_from_current_stack(self, task_id, exc=None):
        type_, real_exc, tb = sys.exc_info()
        try:
            exc = real_exc if exc is None else exc
            ei = ExceptionInfo((type_, exc, tb))
            self.mark_as_failure(task_id, exc, ei.traceback)
            return ei
        finally:
            del(tb)

    def mark_as_retry(self, task_id, exc, traceback=None):
        """Mark task as being retried. Stores the current
        exception (if any)."""
        return self.store_result(task_id, exc, status=states.RETRY,
                                 traceback=traceback)

    def mark_as_revoked(self, task_id, reason=''):
        return self.store_result(task_id, TaskRevokedError(reason),
                                 status=states.REVOKED, traceback=None)

    def prepare_exception(self, exc):
        """Prepare exception for serialization."""
        if self.serializer in EXCEPTION_ABLE_CODECS:
            return get_pickleable_exception(exc)
        return {'exc_type': type(exc).__name__, 'exc_message': str(exc)}

    def exception_to_python(self, exc):
        """Convert serialized exception to Python exception."""
        if self.serializer in EXCEPTION_ABLE_CODECS:
            return get_pickled_exception(exc)
        return create_exception_cls(
            from_utf8(exc['exc_type']),
            sys.modules[__name__])(exc['exc_message'])

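    # Example (illustrative): with a non-pickle serializer such as json,
    # prepare_exception() stores a plain mapping like
    # {'exc_type': 'KeyError', 'exc_message': "'missing'"} and
    # exception_to_python() rebuilds a matching exception class from it.
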
    def prepare_value(self, result):
        """Prepare value for storage."""
        if isinstance(result, GroupResult):
            return result.serializable()
        return result

    def forget(self, task_id):
        raise NotImplementedError('%s does not implement forget.' % (
            self.__class__))

    def wait_for(self, task_id, timeout=None, propagate=True, interval=0.5):
        """Wait for task and return its result.

        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.

        If `timeout` is not :const:`None`, this raises the
        :class:`celery.exceptions.TimeoutError` exception if the operation
        takes longer than `timeout` seconds.

        """
        time_elapsed = 0.0

        while 1:
            status = self.get_status(task_id)
            if status == states.SUCCESS:
                return self.get_result(task_id)
            elif status in states.PROPAGATE_STATES:
                result = self.get_result(task_id)
                if propagate:
                    raise result
                return result
            # avoid hammering the CPU checking status.
            time.sleep(interval)
            time_elapsed += interval
            if timeout and time_elapsed >= timeout:
                raise TimeoutError('The operation timed out.')

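    # Usage sketch (hypothetical, assuming ``backend`` is a configured
    # subclass instance): block until the task finishes, polling twice a
    # second and giving up after ten seconds:
    #
    #     result = backend.wait_for(task_id, timeout=10.0, interval=0.5)
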
    def cleanup(self):
        """Backend cleanup. Is run by
        :class:`celery.task.DeleteExpiredTaskMetaTask`."""
        pass

    def process_cleanup(self):
        """Cleanup actions to do at the end of a task worker process."""
        pass

    def get_status(self, task_id):
        """Get the status of a task."""
        raise NotImplementedError(
            'get_status is not supported by this backend.')

    def get_result(self, task_id):
        """Get the result of a task."""
        raise NotImplementedError(
            'get_result is not supported by this backend.')

    def get_children(self, task_id):
        raise NotImplementedError(
            'get_children is not supported by this backend.')

    def get_traceback(self, task_id):
        """Get the traceback for a failed task."""
        raise NotImplementedError(
            'get_traceback is not supported by this backend.')

    def save_group(self, group_id, result):
        """Store the result of a group."""
        raise NotImplementedError(
            'save_group is not supported by this backend.')

    def restore_group(self, group_id, cache=True):
        """Get the result of a group."""
        raise NotImplementedError(
            'restore_group is not supported by this backend.')

    def delete_group(self, group_id):
        raise NotImplementedError(
            'delete_group is not supported by this backend.')

    def reload_task_result(self, task_id):
        """Reload task result, even if it has been previously fetched."""
        raise NotImplementedError(
            'reload_task_result is not supported by this backend.')

    def reload_group_result(self, group_id):
        """Reload group result, even if it has been previously fetched."""
        raise NotImplementedError(
            'reload_group_result is not supported by this backend.')

    def on_chord_part_return(self, task, propagate=True):
        pass

    def fallback_chord_unlock(self, group_id, body, result=None,
                              countdown=1, **kwargs):
        kwargs['result'] = [r.id for r in result]
        self.app.tasks['celery.chord_unlock'].apply_async(
            (group_id, body, ), kwargs, countdown=countdown,
        )
    on_chord_apply = fallback_chord_unlock

    def current_task_children(self):
        current = current_task()
        if current:
            return [r.serializable() for r in current.request.children]

    def __reduce__(self, args=(), kwargs={}):
        return (unpickle_backend, (self.__class__, args, kwargs))

    def is_cached(self, task_id):
        return False


class BaseDictBackend(BaseBackend):

    def __init__(self, *args, **kwargs):
        super(BaseDictBackend, self).__init__(*args, **kwargs)
        self._cache = LRUCache(limit=kwargs.get('max_cached_results') or
                               self.app.conf.CELERY_MAX_CACHED_RESULTS)

    def is_cached(self, task_id):
        return task_id in self._cache

    def store_result(self, task_id, result, status, traceback=None,
                     **kwargs):
        """Store task result and status."""
        result = self.encode_result(result, status)
        self._store_result(task_id, result, status, traceback, **kwargs)
        return result

    def forget(self, task_id):
        self._cache.pop(task_id, None)
        self._forget(task_id)

    def _forget(self, task_id):
        raise NotImplementedError('%s does not implement forget.' % (
            self.__class__))

    def get_status(self, task_id):
        """Get the status of a task."""
        return self.get_task_meta(task_id)['status']

    def get_traceback(self, task_id):
        """Get the traceback for a failed task."""
        return self.get_task_meta(task_id).get('traceback')

    def get_result(self, task_id):
        """Get the result of a task."""
        meta = self.get_task_meta(task_id)
        if meta['status'] in self.EXCEPTION_STATES:
            return self.exception_to_python(meta['result'])
        else:
            return meta['result']

    def get_children(self, task_id):
        """Get the list of subtasks sent by a task."""
        try:
            return self.get_task_meta(task_id)['children']
        except KeyError:
            pass

    def get_task_meta(self, task_id, cache=True):
        if cache:
            try:
                return self._cache[task_id]
            except KeyError:
                pass

        meta = self._get_task_meta_for(task_id)
        if cache and meta.get('status') == states.SUCCESS:
            self._cache[task_id] = meta
        return meta

    def reload_task_result(self, task_id):
        self._cache[task_id] = self.get_task_meta(task_id, cache=False)

    def reload_group_result(self, group_id):
        self._cache[group_id] = self.get_group_meta(group_id, cache=False)

    def get_group_meta(self, group_id, cache=True):
        if cache:
            try:
                return self._cache[group_id]
            except KeyError:
                pass

        meta = self._restore_group(group_id)
        if cache and meta is not None:
            self._cache[group_id] = meta
        return meta

    def restore_group(self, group_id, cache=True):
        """Get the result for a group."""
        meta = self.get_group_meta(group_id, cache=cache)
        if meta:
            return meta['result']

    def save_group(self, group_id, result):
        """Store the result of an executed group."""
        return self._save_group(group_id, result)

    def delete_group(self, group_id):
        self._cache.pop(group_id, None)
        return self._delete_group(group_id)


class KeyValueStoreBackend(BaseDictBackend):
    task_keyprefix = ensure_bytes('celery-task-meta-')
    group_keyprefix = ensure_bytes('celery-taskset-meta-')
    chord_keyprefix = ensure_bytes('chord-unlock-')
    implements_incr = False

    def get(self, key):
        raise NotImplementedError('Must implement the get method.')

    def mget(self, keys):
        raise NotImplementedError('Does not support get_many')

    def set(self, key, value):
        raise NotImplementedError('Must implement the set method.')

    def delete(self, key):
        raise NotImplementedError('Must implement the delete method')

    def incr(self, key):
        raise NotImplementedError('Does not implement incr')

    def expire(self, key, value):
        pass

    def get_key_for_task(self, task_id):
        """Get the cache key for a task by id."""
        return self.task_keyprefix + ensure_bytes(task_id)

    def get_key_for_group(self, group_id):
        """Get the cache key for a group by id."""
        return self.group_keyprefix + ensure_bytes(group_id)

    def get_key_for_chord(self, group_id):
        """Get the cache key for the chord waiting on group with given id."""
        return self.chord_keyprefix + ensure_bytes(group_id)

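    # Example (illustrative): for a task id of 'a1b2c3' the task key is
    # b'celery-task-meta-a1b2c3'; group and chord keys are built the same
    # way from their respective prefixes.
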
    def _strip_prefix(self, key):
        """Takes bytes, emits string."""
        key = ensure_bytes(key)
        for prefix in self.task_keyprefix, self.group_keyprefix:
            if key.startswith(prefix):
                return bytes_to_str(key[len(prefix):])
        return bytes_to_str(key)

    def _mget_to_results(self, values, keys):
        if hasattr(values, 'items'):
            # client returns dict so mapping preserved.
            return dict((self._strip_prefix(k), self.decode(v))
                        for k, v in values.iteritems()
                        if v is not None)
        else:
            # client returns list so need to recreate mapping.
            return dict((bytes_to_str(keys[i]), self.decode(value))
                        for i, value in enumerate(values)
                        if value is not None)

    def get_many(self, task_ids, timeout=None, interval=0.5):
        ids = set(task_ids)
        cached_ids = set()
        for task_id in ids:
            try:
                cached = self._cache[task_id]
            except KeyError:
                pass
            else:
                if cached['status'] in states.READY_STATES:
                    yield bytes_to_str(task_id), cached
                    cached_ids.add(task_id)

        ids ^= cached_ids
        iterations = 0
        while ids:
            keys = list(ids)
            r = self._mget_to_results(self.mget([self.get_key_for_task(k)
                                                 for k in keys]), keys)
            self._cache.update(r)
            ids ^= set(map(bytes_to_str, r))
            for key, value in r.iteritems():
                yield bytes_to_str(key), value
            if timeout and iterations * interval >= timeout:
                raise TimeoutError('Operation timed out (%s)' % (timeout, ))
            time.sleep(interval)  # don't busy loop.
            iterations += 1

    def _forget(self, task_id):
        self.delete(self.get_key_for_task(task_id))

    def _store_result(self, task_id, result, status, traceback=None):
        meta = {'status': status, 'result': result, 'traceback': traceback,
                'children': self.current_task_children()}
        self.set(self.get_key_for_task(task_id), self.encode(meta))
        return result

    def _save_group(self, group_id, result):
        self.set(self.get_key_for_group(group_id),
                 self.encode({'result': result.serializable()}))
        return result

    def _delete_group(self, group_id):
        self.delete(self.get_key_for_group(group_id))

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id."""
        meta = self.get(self.get_key_for_task(task_id))
        if not meta:
            return {'status': states.PENDING, 'result': None}
        return self.decode(meta)

    def _restore_group(self, group_id):
        """Get group metadata for a group by id."""
        meta = self.get(self.get_key_for_group(group_id))
        # previously this was always pickled, but later this
        # was extended to support other serializers, so the
        # structure is kind of weird.
        if meta:
            meta = self.decode(meta)
            result = meta['result']
            if isinstance(result, (list, tuple)):
                return {'result': from_serializable(result)}
            return meta

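    # Example (illustrative): after _store_result() for a successful task,
    # the store holds, under b'celery-task-meta-<task_id>', the encoded
    # form of:
    #
    #     {'status': 'SUCCESS', 'result': 42,
    #      'traceback': None, 'children': []}
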
    def on_chord_apply(self, group_id, body, result=None, **kwargs):
        if self.implements_incr:
            self.app.GroupResult(group_id, result).save()
        else:
            self.fallback_chord_unlock(group_id, body, result, **kwargs)

    def on_chord_part_return(self, task, propagate=None):
        if not self.implements_incr:
            return
        from celery import subtask
        from celery.result import GroupResult
        if propagate is None:
            propagate = self.app.conf.CELERY_CHORD_PROPAGATES
        gid = task.request.group
        if not gid:
            return
        key = self.get_key_for_chord(gid)
        deps = GroupResult.restore(gid, backend=task.backend)
        val = self.incr(key)
        if val >= len(deps):
            j = deps.join_native if deps.supports_native_join else deps.join
            callback = subtask(task.request.chord)
            try:
                ret = j(propagate=propagate)
            except Exception, exc:
                culprit = deps._failed_join_report().next()
                self.app._tasks[callback.task].backend.fail_from_current_stack(
                    callback.id,
                    exc=ChordError('Dependency %s raised %r' % (
                        culprit.id, exc)),
                )
            else:
                callback.delay(ret)
            finally:
                deps.delete()
                self.client.delete(key)
        else:
            self.expire(key, 86400)


class DisabledBackend(BaseBackend):
    _cache = {}   # need this attribute to reset cache in tests.

    def store_result(self, *args, **kwargs):
        pass

    def _is_disabled(self, *args, **kwargs):
        raise NotImplementedError(
            'No result backend configured. '
            'Please see the documentation for more information.')

    wait_for = get_status = get_result = get_traceback = _is_disabled
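

# ----------------------------------------------------------------------------
# Example: a minimal in-memory result backend (an illustrative sketch, not
# part of this module). It shows the contract a KeyValueStoreBackend
# subclass must satisfy: implement get/mget/set/delete over bytes keys and
# encoded payloads; meta encoding, caching, and group handling are all
# inherited. The class name `DictBackend` and its `_data` attribute are
# hypothetical.


class DictBackend(KeyValueStoreBackend):

    def __init__(self, *args, **kwargs):
        super(DictBackend, self).__init__(*args, **kwargs)
        self._data = {}  # maps bytes key -> encoded meta payload

    def get(self, key):
        # Return the raw encoded payload, or None when unknown
        # (callers treat a falsy value as states.PENDING).
        return self._data.get(key)

    def mget(self, keys):
        # Return values in the same order as `keys`; missing entries are
        # None and are filtered out by _mget_to_results().
        return [self._data.get(k) for k in keys]

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)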