你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
# -*- coding: utf-8 -*-
"""
celery.backends.rpc
~~~~~~~~~~~~~~~~~~~
RPC-style result backend, using reply-to and one queue per client.
"""
from __future__ import absolute_import
from kombu import Consumer, Exchange
from kombu.common import maybe_declare
from kombu.utils import cached_property
from celery import current_task
from celery.backends import amqp
__all__ = ['RPCBackend']
[文档]class RPCBackend(amqp.AMQPBackend):
persistent = False
def _create_exchange(self, name, type='direct', delivery_mode=2):
# uses direct to queue routing (anon exchange).
return Exchange(None)
[文档] def on_task_call(self, producer, task_id):
maybe_declare(self.binding(producer.channel), retry=True)
def _create_binding(self, task_id):
return self.binding
def _many_bindings(self, ids):
return [self.binding]
[文档] def destination_for(self, task_id, request):
# Request is a new argument for backends, so must still support
# old code that rely on current_task
try:
request = request or current_task.request
except AttributeError:
raise RuntimeError(
'RPC backend missing task request for {0!r}'.format(task_id),
)
return request.reply_to, request.correlation_id or task_id
@property
[文档] def binding(self):
return self.Queue(self.oid, self.exchange, self.oid,
durable=False, auto_delete=False)
@cached_property