你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery.bin.amqp 源代码

# -*- coding: utf-8 -*-
"""
The :program:`celery amqp` command.

.. program:: celery amqp

"""
from __future__ import absolute_import, print_function, unicode_literals

import cmd
import sys
import shlex
import pprint

from functools import partial
from itertools import count

from amqp import Message
from kombu.utils.encoding import safe_str

from celery.utils.functional import padlist

from celery.bin.base import Command
from celery.five import string_t
from celery.utils import strtobool

__all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']

# Map to coerce strings to other types.
COERCE = {bool: strtobool}

HELP_HEADER = """
Commands
--------
""".rstrip()

EXAMPLE_TEXT = """
Example:
    -> queue.delete myqueue yes no
"""

say = partial(print, file=sys.stderr)


[文档]class Spec(object): """AMQP Command specification. Used to convert arguments to Python values and display various help and tooltips. :param args: see :attr:`args`. :keyword returns: see :attr:`returns`. .. attribute args:: List of arguments this command takes. Should contain `(argument_name, argument_type)` tuples. .. attribute returns: Helpful human string representation of what this command returns. May be :const:`None`, to signify the return type is unknown. """ def __init__(self, *args, **kwargs): self.args = args self.returns = kwargs.get('returns')
[文档] def coerce(self, index, value): """Coerce value for argument at index.""" arg_info = self.args[index] arg_type = arg_info[1] # Might be a custom way to coerce the string value, # so look in the coercion map. return COERCE.get(arg_type, arg_type)(value)
[文档] def str_args_to_python(self, arglist): """Process list of string arguments to values according to spec. e.g: >>> spec = Spec([('queue', str), ('if_unused', bool)]) >>> spec.str_args_to_python('pobox', 'true') ('pobox', True) """ return tuple( self.coerce(index, value) for index, value in enumerate(arglist))
[文档] def format_response(self, response): """Format the return value of this command in a human-friendly way.""" if not self.returns: return 'ok.' if response is None else response if callable(self.returns): return self.returns(response) return self.returns.format(response)
[文档] def format_arg(self, name, type, default_value=None): if default_value is not None: return '{0}:{1}'.format(name, default_value) return name
[文档] def format_signature(self): return ' '.join(self.format_arg(*padlist(list(arg), 3)) for arg in self.args)
def dump_message(message): if message is None: return 'No messages in queue. basic.publish something.' return {'body': message.body, 'properties': message.properties, 'delivery_info': message.delivery_info} def format_declare_queue(ret): return 'ok. queue:{0} messages:{1} consumers:{2}.'.format(*ret)
[文档]class AMQShell(cmd.Cmd): """AMQP API Shell. :keyword connect: Function used to connect to the server, must return connection object. :keyword silent: If :const:`True`, the commands won't have annoying output not relevant when running in non-shell mode. .. attribute: builtins Mapping of built-in command names -> method names .. attribute:: amqp Mapping of AMQP API commands and their :class:`Spec`. """ conn = None chan = None prompt_fmt = '{self.counter}> ' identchars = cmd.IDENTCHARS = '.' needs_reconnect = False counter = 1 inc_counter = count(2) builtins = {'EOF': 'do_exit', 'exit': 'do_exit', 'help': 'do_help'} amqp = { 'exchange.declare': Spec(('exchange', str), ('type', str), ('passive', bool, 'no'), ('durable', bool, 'no'), ('auto_delete', bool, 'no'), ('internal', bool, 'no')), 'exchange.delete': Spec(('exchange', str), ('if_unused', bool)), 'queue.bind': Spec(('queue', str), ('exchange', str), ('routing_key', str)), 'queue.declare': Spec(('queue', str), ('passive', bool, 'no'), ('durable', bool, 'no'), ('exclusive', bool, 'no'), ('auto_delete', bool, 'no'), returns=format_declare_queue), 'queue.delete': Spec(('queue', str), ('if_unused', bool, 'no'), ('if_empty', bool, 'no'), returns='ok. {0} messages deleted.'), 'queue.purge': Spec(('queue', str), returns='ok. {0} messages deleted.'), 'basic.get': Spec(('queue', str), ('no_ack', bool, 'off'), returns=dump_message), 'basic.publish': Spec(('msg', Message), ('exchange', str), ('routing_key', str), ('mandatory', bool, 'no'), ('immediate', bool, 'no')), 'basic.ack': Spec(('delivery_tag', int)), } def __init__(self, *args, **kwargs): self.connect = kwargs.pop('connect') self.silent = kwargs.pop('silent', False) self.out = kwargs.pop('out', sys.stderr) cmd.Cmd.__init__(self, *args, **kwargs) self._reconnect()
[文档] def note(self, m): """Say something to the user. Disabled if :attr:`silent`.""" if not self.silent: say(m, file=self.out)
[文档] def say(self, m): say(m, file=self.out)
[文档] def get_amqp_api_command(self, cmd, arglist): """With a command name and a list of arguments, convert the arguments to Python values and find the corresponding method on the AMQP channel object. :returns: tuple of `(method, processed_args)`. """ spec = self.amqp[cmd] args = spec.str_args_to_python(arglist) attr_name = cmd.replace('.', '_') if self.needs_reconnect: self._reconnect() return getattr(self.chan, attr_name), args, spec.format_response
[文档] def do_exit(self, *args): """The `'exit'` command.""" self.note("\n-> please, don't leave!") sys.exit(0)
[文档] def display_command_help(self, cmd, short=False): spec = self.amqp[cmd] self.say('{0} {1}'.format(cmd, spec.format_signature()))
[文档] def do_help(self, *args): if not args: self.say(HELP_HEADER) for cmd_name in self.amqp: self.display_command_help(cmd_name, short=True) self.say(EXAMPLE_TEXT) else: self.display_command_help(args[0])
[文档] def default(self, line): self.say("unknown syntax: {0!r}. how about some 'help'?".format(line))
[文档] def get_names(self): return set(self.builtins) | set(self.amqp)
[文档] def completenames(self, text, *ignored): """Return all commands starting with `text`, for tab-completion.""" names = self.get_names() first = [cmd for cmd in names if cmd.startswith(text.replace('_', '.'))] if first: return first return [cmd for cmd in names if cmd.partition('.')[2].startswith(text)]
[文档] def dispatch(self, cmd, argline): """Dispatch and execute the command. Lookup order is: :attr:`builtins` -> :attr:`amqp`. """ arglist = shlex.split(safe_str(argline)) if cmd in self.builtins: return getattr(self, self.builtins[cmd])(*arglist) fun, args, formatter = self.get_amqp_api_command(cmd, arglist) return formatter(fun(*args))
[文档] def parseline(self, line): """Parse input line. :returns: tuple of three items: `(command_name, arglist, original_line)` """ parts = line.split() if parts: return parts[0], ' '.join(parts[1:]), line return '', '', line
[文档] def onecmd(self, line): """Parse line and execute command.""" cmd, arg, line = self.parseline(line) if not line: return self.emptyline() self.lastcmd = line self.counter = next(self.inc_counter) try: self.respond(self.dispatch(cmd, arg)) except (AttributeError, KeyError) as exc: self.default(line) except Exception as exc: self.say(exc) self.needs_reconnect = True
[文档] def respond(self, retval): """What to do with the return value of a command.""" if retval is not None: if isinstance(retval, string_t): self.say(retval) else: self.say(pprint.pformat(retval))
def _reconnect(self): """Re-establish connection to the AMQP server.""" self.conn = self.connect(self.conn) self.chan = self.conn.default_channel self.needs_reconnect = False @property
[文档] def prompt(self): return self.prompt_fmt.format(self=self)
[文档]class AMQPAdmin(object): """The celery :program:`celery amqp` utility.""" Shell = AMQShell def __init__(self, *args, **kwargs): self.app = kwargs['app'] self.out = kwargs.setdefault('out', sys.stderr) self.silent = kwargs.get('silent') self.args = args
[文档] def connect(self, conn=None): if conn: conn.close() conn = self.app.connection() self.note('-> connecting to {0}.'.format(conn.as_uri())) conn.connect() self.note('-> connected.') return conn
[文档] def run(self): shell = self.Shell(connect=self.connect, out=self.out) if self.args: return shell.onecmd(' '.join(self.args)) try: return shell.cmdloop() except KeyboardInterrupt: self.note('(bibi)') pass
[文档] def note(self, m): if not self.silent: say(m, file=self.out)
[文档]class amqp(Command): """AMQP Administration Shell. Also works for non-amqp transports (but not ones that store declarations in memory). Examples:: celery amqp start shell mode celery amqp help show list of commands celery amqp exchange.delete name celery amqp queue.delete queue celery amqp queue.delete queue yes yes """
[文档] def run(self, *args, **options): options['app'] = self.app return AMQPAdmin(*args, **options).run()
def main(): amqp().execute_from_commandline() if __name__ == '__main__': # pragma: no cover main()