你正在阅读 Celery 3.1 的文档。开发版本的文档请见：此处。

celery.backends.cache 源代码

# -*- coding: utf-8 -*-
"""
    celery.backends.cache
    ~~~~~~~~~~~~~~~~~~~~~

    Memcache and in-memory cache result backend.

"""
from __future__ import absolute_import

from kombu.utils import cached_property

from celery.datastructures import LRUCache
from celery.exceptions import ImproperlyConfigured

from .base import KeyValueStoreBackend

# One-slot cache for the ``(is_pylibmc, module)`` tuple resolved by
# import_best_memcache(); it is read and assigned through ``_imp[0]``.
_imp = [None]

# Message of the ImproperlyConfigured error raised by import_best_memcache()
# when neither ``pylibmc`` nor ``memcache`` can be imported.
REQUIRES_BACKEND = """\
The memcached backend requires either pylibmc or python-memcached.\
"""

# Message template of the ImproperlyConfigured error raised by
# CacheBackend.__init__ when the backend name is not a key of ``backends``.
# It is formatted with the offending name and the comma-joined known names.
UNKNOWN_BACKEND = """\
The cache backend %r is unknown,
Please use one of the following backends instead: %s\
"""


def import_best_memcache():
    """Locate a memcache client library and remember the result.

    ``pylibmc`` is tried first; if it cannot be imported, ``memcache`` is
    tried instead.  The outcome is stored in ``_imp[0]`` so later calls
    return it without importing again.

    :returns: a ``(is_pylibmc, module)`` tuple, where ``is_pylibmc`` is
        ``True`` only when the module is ``pylibmc``.
    :raises ImproperlyConfigured: if neither library can be imported.

    """
    resolved = _imp[0]
    if resolved is not None:
        return resolved

    try:
        import pylibmc as memcache
    except ImportError:
        try:
            import memcache  # noqa
        except ImportError:
            raise ImproperlyConfigured(REQUIRES_BACKEND)
        using_pylibmc = False
    else:
        using_pylibmc = True

    _imp[0] = (using_pylibmc, memcache)
    return _imp[0]


def get_best_memcache(*args, **kwargs):
    """Instantiate ``Client`` from the module import_best_memcache() found.

    The ``behaviors`` keyword is always removed before the remaining
    arguments are forwarded to ``Client``.  It is assigned to the new
    client's ``behaviors`` attribute only when the module is pylibmc and a
    value other than ``None`` was supplied.

    :returns: the newly created client.

    """
    requested_behaviors = kwargs.pop('behaviors', None)
    using_pylibmc, module = import_best_memcache()
    new_client = module.Client(*args, **kwargs)
    if requested_behaviors is None or not using_pylibmc:
        return new_client
    new_client.behaviors = requested_behaviors
    return new_client


class DummyClient(object):
    """In-process client exposing ``get``/``get_multi``/``set``/``delete``/
    ``incr`` on top of an ``LRUCache`` held in ``self.cache``.

    Extra positional and keyword arguments accepted by the methods (and by
    the constructor) are ignored.

    """

    def __init__(self, *args, **kwargs):
        self.cache = LRUCache(limit=5000)

    def get(self, key, *args, **kwargs):
        """Return the result of ``self.cache.get(key)``."""
        store = self.cache
        return store.get(key)

    def get_multi(self, keys):
        """Return a dict of the requested keys that are present in the cache.

        Keys that are not in the cache are left out of the result.
        """
        store = self.cache
        found = {}
        for wanted in keys:
            if wanted in store:
                found[wanted] = store[wanted]
        return found

    def set(self, key, value, *args, **kwargs):
        """Store ``value`` under ``key``."""
        store = self.cache
        store[key] = value

    def delete(self, key, *args, **kwargs):
        """Remove ``key`` from the cache; a missing key is not an error."""
        store = self.cache
        store.pop(key, None)

    def incr(self, key, delta=1):
        """Delegate to ``self.cache.incr(key, delta)`` and return its result."""
        store = self.cache
        return store.incr(key, delta)


# Registry of known backend names.  Each value is a zero-argument callable
# that returns the client factory; the lambdas look the factory name up only
# when called, not when this table is built.
backends = dict(
    memcache=lambda: get_best_memcache,
    memcached=lambda: get_best_memcache,
    pylibmc=lambda: get_best_memcache,
    memory=lambda: DummyClient,
)


class CacheBackend(KeyValueStoreBackend):
    """Key/value result backend that stores data through a cache client.

    The client factory is chosen from the module-level ``backends`` table
    using the scheme part of the backend URL (``<scheme>://<servers>/``),
    with multiple servers separated by ``;``.

    """

    # Stays ``None`` unless ``__init__`` parses a server list from the URL.
    servers = None
    supports_native_join = True
    implements_incr = True

    def __init__(self, expires=None, backend=None, options=None, **kwargs):
        """Configure the backend from arguments and application settings.

        :param expires: expiry value, normalised by ``prepare_expires``
            with ``type=int``.
        :param backend: backend URL; defaults to the app's
            ``CELERY_CACHE_BACKEND`` setting when falsy.
        :param options: extra client options, merged over the app's
            ``CELERY_CACHE_BACKEND_OPTIONS`` setting.  ``None`` means no
            extra options.
        :raises ImproperlyConfigured: if the backend name is not a key of
            ``backends``.

        """
        # NOTE(review): ``self`` is also forwarded as an explicit positional
        # argument.  Kept unchanged because the base-class signature is not
        # visible from this module -- confirm whether this is intentional.
        super(CacheBackend, self).__init__(self, **kwargs)

        # ``options`` defaults to None rather than a shared mutable dict.
        self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
                            **(options or {}))

        self.backend = backend or self.app.conf.CELERY_CACHE_BACKEND
        if self.backend:
            # Split "<name>://<srv1>;<srv2>/" into the name and server list.
            self.backend, _, servers = self.backend.partition('://')
            self.servers = servers.rstrip('/').split(';')
        self.expires = self.prepare_expires(expires, type=int)
        try:
            self.Client = backends[self.backend]()
        except KeyError:
            raise ImproperlyConfigured(UNKNOWN_BACKEND % (
                self.backend, ', '.join(backends)))

    def get(self, key):
        """Return the client's value for ``key``."""
        return self.client.get(key)

    def mget(self, keys):
        """Fetch several keys at once via the client's ``get_multi``."""
        return self.client.get_multi(keys)

    def set(self, key, value):
        """Store ``value`` under ``key``, passing ``self.expires`` along."""
        return self.client.set(key, value, self.expires)

    def delete(self, key):
        """Delete ``key`` through the client."""
        return self.client.delete(key)

    def on_chord_apply(self, group_id, body, result=None, **kwargs):
        """Initialise the chord key for ``group_id`` to ``'0'``.

        The value is set with ``time=86400`` (the number of seconds in one
        day).  ``body``, ``result`` and extra keyword arguments are unused.
        """
        self.client.set(self.get_key_for_chord(group_id), '0', time=86400)

    def incr(self, key):
        """Increment ``key`` through the client and return its result."""
        return self.client.incr(key)

    @cached_property
    def client(self):
        """Client built from ``self.servers`` and ``self.options``.

        Exposed through kombu's ``cached_property`` decorator.
        """
        return self.Client(self.servers, **self.options)

    def __reduce__(self, args=(), kwargs=None):
        """Return reduction data carrying this backend's configuration.

        The backend URL is rebuilt from ``self.backend`` and
        ``self.servers`` and passed on, together with ``expires`` and
        ``options``, to the parent ``__reduce__``.

        :param args: positional arguments forwarded to the parent.
        :param kwargs: optional mapping of extra keyword arguments.  It is
            copied, never modified.

        """
        servers = ';'.join(self.servers)
        backend = '%s://%s/' % (self.backend, servers)
        # Build a fresh dict so that neither a caller-supplied mapping nor a
        # shared default is mutated; otherwise reduce results of different
        # instances could end up sharing (and overwriting) one dict.
        kwargs = dict(kwargs or {},
                      backend=backend,
                      expires=self.expires,
                      options=self.options)
        return super(CacheBackend, self).__reduce__(args, kwargs)