你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery.app.routes 源代码

# -*- coding: utf-8 -*-
"""
    celery.app.routes
    ~~~~~~~~~~~~~~~~~

    Contains utilities for working with task routers,
    (:setting:`CELERY_ROUTES`).

"""
from __future__ import absolute_import

from celery.exceptions import QueueNotFound
from celery.utils import lpmerge
from celery.utils.functional import firstmethod, mpromise
from celery.utils.imports import instantiate

_first_route = firstmethod('route_for_task')


class MapRoute(object):
    """Router backed by a plain mapping of task name to route options.

    The mapping is stored as given on ``self.map``; lookups hand out
    copies, so callers can modify the returned route without touching
    the configured mapping.
    """

    def __init__(self, map):
        self.map = map

    def route_for_task(self, task, *args, **kwargs):
        """Return a copy of the route configured for ``task``.

        Extra positional and keyword arguments are accepted and ignored.

        :param task: key to look up in the mapping.
        :returns: a new :class:`dict` built from the configured route, or
            :const:`None` when the task has no entry or the entry is
            empty/falsy.
        """
        configured = self.map.get(task)
        if not configured:
            return None
        # Copy so the caller never mutates the entry stored in the map.
        return dict(configured)
class Router(object):
    """Resolves the final routing options for a task.

    :keyword routes: sequence of router objects consulted by
        :meth:`lookup_route` (defaults to an empty list).
    :keyword queues: mapping of queue name to queue object (defaults to
        an empty dict).
    :keyword create_missing: when true, a queue name that is not found
        in ``queues`` is added through ``app.amqp.queues.add`` instead
        of raising :exc:`QueueNotFound`.
    :keyword app: application instance; ``app.conf.CELERY_DEFAULT_QUEUE``
        and ``app.amqp.queues`` are read from it.
    """

    def __init__(self, routes=None, queues=None,
                 create_missing=False, app=None):
        self.app = app
        self.queues = {} if queues is None else queues
        self.routes = [] if routes is None else routes
        self.create_missing = create_missing

    def route(self, options, task, args=(), kwargs=None):
        """Return the routing options to use for ``task``.

        :param options: routing options supplied by the caller; either a
            dict or a queue name.  A dict is modified in place by
            :meth:`expand_destination`.
        :param task: task name handed to the routers.
        :keyword args: positional task arguments handed to the routers.
        :keyword kwargs: keyword task arguments handed to the routers.
            A fresh empty dict is used when omitted.
        :returns: the options with ``'queue'`` expanded.  If a router
            produced a route, the result of ``lpmerge(route, options)``
            is returned; if no ``'queue'`` was chosen at all the
            ``CELERY_DEFAULT_QUEUE`` setting is expanded and merged in.
        """
        # A literal ``{}`` default would be one dict shared by every call
        # and exposed to every router, so build a new one per call.
        if kwargs is None:
            kwargs = {}
        options = self.expand_destination(options)  # expands 'queue'
        if self.routes:
            route = self.lookup_route(task, args, kwargs)
            if route:  # expands 'queue' in route.
                # NOTE(review): lpmerge is defined in celery.utils; its
                # precedence rules are not visible here -- confirm there.
                return lpmerge(self.expand_destination(route), options)
        if 'queue' not in options:
            options = lpmerge(self.expand_destination(
                self.app.conf.CELERY_DEFAULT_QUEUE), options)
        return options

    def expand_destination(self, route):
        """Replace the queue name in ``route`` with its queue object.

        :param route: a queue name, or a dict of route options that may
            contain a ``'queue'`` name.  A dict is modified in place and
            is also the object returned.
        :returns: a dict whose ``'queue'`` entry (if a queue was named)
            is the object found in ``self.queues`` or created through
            ``self.app.amqp.queues.add``.
        :raises QueueNotFound: if the queue is not in ``self.queues``
            and ``create_missing`` is false.
        """
        # Route can be a queue name: convenient for direct exchanges.
        if isinstance(route, basestring):
            queue, route = route, {}
        else:
            # can use defaults from configured queue, but override specific
            # things (like the routing_key): great for topic exchanges.
            queue = route.pop('queue', None)

        if queue:
            try:
                Q = self.queues[queue]  # noqa
            except KeyError:
                if not self.create_missing:
                    # Tuple-wrap so an unusual (e.g. tuple) queue value
                    # cannot break the %-formatting itself.
                    raise QueueNotFound(
                        'Queue %r is not defined in CELERY_QUEUES' % (
                            queue, ))
                # Unset exchange/routing_key default to the queue name.
                for key in 'exchange', 'routing_key':
                    if route.get(key) is None:
                        route[key] = queue
                Q = self.app.amqp.queues.add(queue, **route)
            # needs to be declared by publisher
            route['queue'] = Q
        return route

    def lookup_route(self, task, args=None, kwargs=None):
        """Ask the configured routers for a route for ``task``.

        Delegates to the module-level ``_first_route`` helper, built with
        ``firstmethod('route_for_task')``.
        NOTE(review): presumably this returns the first non-empty
        ``route_for_task`` result among ``self.routes`` -- confirm in
        celery.utils.functional.
        """
        return _first_route(self.routes, task, args, kwargs)
def prepare(routes):
    """Expands the :setting:`CELERY_ROUTES` setting.

    :param routes: :const:`None`, a single route entry, or a list/tuple
        of entries.  A single entry is treated as a one-element tuple.
    :returns: an empty tuple when ``routes`` is :const:`None`, otherwise
        a list with one router per entry:

        * a :class:`dict` entry is wrapped in :class:`MapRoute`;
        * a string entry becomes ``mpromise(instantiate, entry)``
          (NOTE(review): presumably a lazily instantiated router named
          by the string -- confirm in celery.utils.functional);
        * any other object is used as is.
    """

    def expand_route(route):
        if isinstance(route, dict):
            return MapRoute(route)
        if isinstance(route, basestring):
            return mpromise(instantiate, route)
        return route

    if routes is None:
        return ()
    if not isinstance(routes, (list, tuple)):
        routes = (routes, )
    # Build a real list rather than relying on map(): the result is
    # truth-tested and iterated repeatedly, which a one-shot iterator
    # (what map() yields outside Python 2) cannot support.
    return [expand_route(route) for route in routes]