你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.

celery.bin.multi 源代码

# -*- coding: utf-8 -*-
"""

.. program:: celery multi

Examples
========

.. code-block:: bash

    # Single worker with explicit name and events enabled.
    $ celery multi start Leslie -E

    # Pidfiles and logfiles are stored in the current directory
    # by default.  Use --pidfile and --logfile argument to change
    # this.  The abbreviation %N will be expanded to the current
    # node name.
    $ celery multi start Leslie -E --pidfile=/var/run/celery/%N.pid
                                    --logfile=/var/log/celery/%N.log


    # You need to add the same arguments when you restart,
    # as these are not persisted anywhere.
    $ celery multi restart Leslie -E --pidfile=/var/run/celery/%N.pid
                                     --logfile=/var/run/celery/%N.log

    # To stop the node, you need to specify the same pidfile.
    $ celery multi stop Leslie --pidfile=/var/run/celery/%N.pid

    # 3 workers, with 3 processes each
    $ celery multi start 3 -c 3
    celery worker -n celery1@myhost -c 3
    celery worker -n celery2@myhost -c 3
    celery worker -n celery3@myhost -c 3

    # start 3 named workers
    $ celery multi start image video data -c 3
    celery worker -n image@myhost -c 3
    celery worker -n video@myhost -c 3
    celery worker -n data@myhost -c 3

    # specify custom hostname
    $ celery multi start 2 --hostname=worker.example.com -c 3
    celery worker -n celery1@worker.example.com -c 3
    celery worker -n celery2@worker.example.com -c 3

    # specify fully qualified nodenames
    $ celery multi start foo@worker.example.com bar@worker.example.com -c 3

    # Advanced example starting 10 workers in the background:
    #   * Three of the workers processes the images and video queue
    #   * Two of the workers processes the data queue with loglevel DEBUG
    #   * the rest processes the default' queue.
    $ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
        -Q default -L:4,5 DEBUG

    # You can show the commands necessary to start the workers with
    # the 'show' command:
    $ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
        -Q default -L:4,5 DEBUG

    # Additional options are added to each celery worker' comamnd,
    # but you can also modify the options for ranges of, or specific workers

    # 3 workers: Two with 3 processes, and one with 10 processes.
    $ celery multi start 3 -c 3 -c:1 10
    celery worker -n celery1@myhost -c 10
    celery worker -n celery2@myhost -c 3
    celery worker -n celery3@myhost -c 3

    # can also specify options for named workers
    $ celery multi start image video data -c 3 -c:image 10
    celery worker -n image@myhost -c 10
    celery worker -n video@myhost -c 3
    celery worker -n data@myhost -c 3

    # ranges and lists of workers in options is also allowed:
    # (-c:1-3 can also be written as -c:1,2,3)
    $ celery multi start 5 -c 3  -c:1-3 10
    celery worker -n celery1@myhost -c 10
    celery worker -n celery2@myhost -c 10
    celery worker -n celery3@myhost -c 10
    celery worker -n celery4@myhost -c 3
    celery worker -n celery5@myhost -c 3

    # lists also works with named workers
    $ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
    celery worker -n foo@myhost -c 10
    celery worker -n bar@myhost -c 10
    celery worker -n baz@myhost -c 10
    celery worker -n xuzzy@myhost -c 3

"""
from __future__ import absolute_import, print_function, unicode_literals

import errno
import os
import shlex
import signal
import socket
import sys

from collections import defaultdict, namedtuple
from subprocess import Popen
from time import sleep

from kombu.utils import cached_property
from kombu.utils.compat import OrderedDict
from kombu.utils.encoding import from_utf8

from celery import VERSION_BANNER
from celery.five import items
from celery.platforms import Pidfile, IS_WINDOWS
from celery.utils import term, nodesplit
from celery.utils.text import pluralize

__all__ = ['MultiTool']

SIGNAMES = set(sig for sig in dir(signal)
               if sig.startswith('SIG') and '_' not in sig)
SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)

USAGE = """\
usage: {prog_name} start <node1 node2 nodeN|range> [worker options]
       {prog_name} stop <n1 n2 nN|range> [-SIG (default: -TERM)]
       {prog_name} restart <n1 n2 nN|range> [-SIG] [worker options]
       {prog_name} kill <n1 n2 nN|range>

       {prog_name} show <n1 n2 nN|range> [worker options]
       {prog_name} get hostname <n1 n2 nN|range> [-qv] [worker options]
       {prog_name} names <n1 n2 nN|range>
       {prog_name} expand template <n1 n2 nN|range>
       {prog_name} help

additional options (must appear after command name):

    * --nosplash:   Don't display program info.
    * --quiet:      Don't show as much output.
    * --verbose:    Show more output.
    * --no-color:   Don't display colors.
"""

multi_args_t = namedtuple(
    'multi_args_t', ('name', 'argv', 'expander', 'namespace'),
)


def main():
    sys.exit(MultiTool().execute_from_commandline(sys.argv))


CELERY_EXE = 'celery'
if sys.version_info < (2, 7):
    # pkg.__main__ first supported in Py2.7
    CELERY_EXE = 'celery.__main__'


def celery_exe(*args):
    return ' '.join((CELERY_EXE, ) + args)


[文档]class MultiTool(object): retcode = 0 # Final exit code. def __init__(self, env=None, fh=None, quiet=False, verbose=False, no_color=False, nosplash=False): self.fh = fh or sys.stderr self.env = env self.nosplash = nosplash self.quiet = quiet self.verbose = verbose self.no_color = no_color self.prog_name = 'celery multi' self.commands = {'start': self.start, 'show': self.show, 'stop': self.stop, 'stopwait': self.stopwait, 'stop_verify': self.stopwait, # compat alias 'restart': self.restart, 'kill': self.kill, 'names': self.names, 'expand': self.expand, 'get': self.get, 'help': self.help}
[文档] def execute_from_commandline(self, argv, cmd='celery worker'): argv = list(argv) # don't modify callers argv. # Reserve the --nosplash|--quiet|-q/--verbose options. if '--nosplash' in argv: self.nosplash = argv.pop(argv.index('--nosplash')) if '--quiet' in argv: self.quiet = argv.pop(argv.index('--quiet')) if '-q' in argv: self.quiet = argv.pop(argv.index('-q')) if '--verbose' in argv: self.verbose = argv.pop(argv.index('--verbose')) if '--no-color' in argv: self.no_color = argv.pop(argv.index('--no-color')) self.prog_name = os.path.basename(argv.pop(0)) if not argv or argv[0][0] == '-': return self.error() try: self.commands[argv[0]](argv[1:], cmd) except KeyError: self.error('Invalid command: {0}'.format(argv[0])) return self.retcode
[文档] def say(self, m, newline=True): print(m, file=self.fh, end='\n' if newline else '')
[文档] def names(self, argv, cmd): p = NamespacedOptionParser(argv) self.say('\n'.join( n.name for n in multi_args(p, cmd)), )
[文档] def get(self, argv, cmd): wanted = argv[0] p = NamespacedOptionParser(argv[1:]) for node in multi_args(p, cmd): if node.name == wanted: self.say(' '.join(node.argv)) return
[文档] def show(self, argv, cmd): p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) self.say('\n'.join( ' '.join([sys.executable] + n.argv) for n in multi_args(p, cmd)), )
[文档] def start(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) retcodes = [] self.note('> Starting nodes...') for node in multi_args(p, cmd): self.note('\t> {0}: '.format(node.name), newline=False) retcode = self.waitexec(node.argv) self.note(retcode and self.FAILED or self.OK) retcodes.append(retcode) self.retcode = int(any(retcodes))
[文档] def with_detacher_default_options(self, p): _setdefaultopt(p.options, ['--pidfile', '-p'], '%N.pid') _setdefaultopt(p.options, ['--logfile', '-f'], '%N.log') p.options.setdefault( '--cmd', '-m {0}'.format(celery_exe('worker', '--detach')), )
[文档] def signal_node(self, nodename, pid, sig): try: os.kill(pid, sig) except OSError as exc: if exc.errno != errno.ESRCH: raise self.note('Could not signal {0} ({1}): No such process'.format( nodename, pid)) return False return True
[文档] def node_alive(self, pid): try: os.kill(pid, 0) except OSError as exc: if exc.errno == errno.ESRCH: return False raise return True
[文档] def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None, callback=None): if not nodes: return P = set(nodes) def on_down(node): P.discard(node) if callback: callback(*node) self.note(self.colored.blue('> Stopping nodes...')) for node in list(P): if node in P: nodename, _, pid = node self.note('\t> {0}: {1} -> {2}'.format( nodename, SIGMAP[sig][3:], pid)) if not self.signal_node(nodename, pid, sig): on_down(node) def note_waiting(): left = len(P) if left: pids = ', '.join(str(pid) for _, _, pid in P) self.note(self.colored.blue( '> Waiting for {0} {1} -> {2}...'.format( left, pluralize(left, 'node'), pids)), newline=False) if retry: note_waiting() its = 0 while P: for node in P: its += 1 self.note('.', newline=False) nodename, _, pid = node if not self.node_alive(pid): self.note('\n\t> {0}: {1}'.format(nodename, self.OK)) on_down(node) note_waiting() break if P and not its % len(P): sleep(float(retry)) self.note('')
[文档] def getpids(self, p, cmd, callback=None): _setdefaultopt(p.options, ['--pidfile', '-p'], '%N.pid') nodes = [] for node in multi_args(p, cmd): try: pidfile_template = _getopt( p.namespaces[node.namespace], ['--pidfile', '-p'], ) except KeyError: pidfile_template = _getopt(p.options, ['--pidfile', '-p']) pid = None pidfile = node.expander(pidfile_template) try: pid = Pidfile(pidfile).read_pid() except ValueError: pass if pid: nodes.append((node.name, tuple(node.argv), pid)) else: self.note('> {0.name}: {1}'.format(node, self.DOWN)) if callback: callback(node.name, node.argv, pid) return nodes
[文档] def kill(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) for nodename, _, pid in self.getpids(p, cmd): self.note('Killing node {0} ({1})'.format(nodename, pid)) self.signal_node(nodename, pid, signal.SIGKILL)
[文档] def stop(self, argv, cmd, retry=None, callback=None): self.splash() p = NamespacedOptionParser(argv) return self._stop_nodes(p, cmd, retry=retry, callback=callback)
def _stop_nodes(self, p, cmd, retry=None, callback=None): restargs = p.args[len(p.values):] self.shutdown_nodes(self.getpids(p, cmd, callback=callback), sig=findsig(restargs), retry=retry, callback=callback)
[文档] def restart(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) retvals = [] def on_node_shutdown(nodename, argv, pid): self.note(self.colored.blue( '> Restarting node {0}: '.format(nodename)), newline=False) retval = self.waitexec(argv) self.note(retval and self.FAILED or self.OK) retvals.append(retval) self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown) self.retval = int(any(retvals))
[文档] def stopwait(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) return self._stop_nodes(p, cmd, retry=2)
stop_verify = stopwait # compat
[文档] def expand(self, argv, cmd=None): template = argv[0] p = NamespacedOptionParser(argv[1:]) for node in multi_args(p, cmd): self.say(node.expander(template))
[文档] def help(self, argv, cmd=None): self.say(__doc__)
[文档] def usage(self): self.splash() self.say(USAGE.format(prog_name=self.prog_name))
[文档] def splash(self): if not self.nosplash: c = self.colored self.note(c.cyan('celery multi v{0}'.format(VERSION_BANNER)))
[文档] def waitexec(self, argv, path=sys.executable): args = ' '.join([path] + list(argv)) argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS) pipe = Popen(argstr, env=self.env) self.info(' {0}'.format(' '.join(argstr))) retcode = pipe.wait() if retcode < 0: self.note('* Child was terminated by signal {0}'.format(-retcode)) return -retcode elif retcode > 0: self.note('* Child terminated with errorcode {0}'.format(retcode)) return retcode
[文档] def error(self, msg=None): if msg: self.say(msg) self.usage() self.retcode = 1 return 1
[文档] def info(self, msg, newline=True): if self.verbose: self.note(msg, newline=newline)
[文档] def note(self, msg, newline=True): if not self.quiet: self.say(str(msg), newline=newline)
@cached_property
[文档] def colored(self): return term.colored(enabled=not self.no_color)
@cached_property
[文档] def OK(self): return str(self.colored.green('OK'))
@cached_property
[文档] def FAILED(self): return str(self.colored.red('FAILED'))
@cached_property
[文档] def DOWN(self): return str(self.colored.magenta('DOWN'))
def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''): names = p.values options = dict(p.options) passthrough = p.passthrough ranges = len(names) == 1 if ranges: try: noderange = int(names[0]) except ValueError: pass else: names = [str(n) for n in range(1, noderange + 1)] prefix = 'celery' cmd = options.pop('--cmd', cmd) append = options.pop('--append', append) hostname = options.pop('--hostname', options.pop('-n', socket.gethostname())) prefix = options.pop('--prefix', prefix) or '' suffix = options.pop('--suffix', suffix) or hostname if suffix in ('""', "''"): suffix = '' for ns_name, ns_opts in list(items(p.namespaces)): if ',' in ns_name or (ranges and '-' in ns_name): for subns in parse_ns_range(ns_name, ranges): p.namespaces[subns].update(ns_opts) p.namespaces.pop(ns_name) for name in names: this_suffix = suffix if '@' in name: this_name = options['-n'] = name nodename, this_suffix = nodesplit(name) name = nodename else: nodename = '%s%s' % (prefix, name) this_name = options['-n'] = '%s@%s' % (nodename, this_suffix) expand = abbreviations({'%h': this_name, '%n': name, '%N': nodename, '%d': this_suffix}) argv = ([expand(cmd)] + [format_opt(opt, expand(value)) for opt, value in items(p.optmerge(name, options))] + [passthrough]) if append: argv.append(expand(append)) yield multi_args_t(this_name, argv, expand, name) class NamespacedOptionParser(object): def __init__(self, args): self.args = args self.options = OrderedDict() self.values = [] self.passthrough = '' self.namespaces = defaultdict(lambda: OrderedDict()) self.parse() def parse(self): rargs = list(self.args) pos = 0 while pos < len(rargs): arg = rargs[pos] if arg == '--': self.passthrough = ' '.join(rargs[pos:]) break elif arg[0] == '-': if arg[1] == '-': self.process_long_opt(arg[2:]) else: value = None if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-': value = rargs[pos + 1] pos += 1 self.process_short_opt(arg[1:], value) else: self.values.append(arg) pos += 1 def process_long_opt(self, arg, value=None): if '=' in arg: arg, value = arg.split('=', 1) self.add_option(arg, value, short=False) def process_short_opt(self, arg, value=None): self.add_option(arg, value, short=True) def optmerge(self, ns, defaults=None): if defaults is None: defaults = self.options return OrderedDict(defaults, **self.namespaces[ns]) def add_option(self, name, value, short=False, ns=None): prefix = short and '-' or '--' dest = self.options if ':' in name: name, ns = name.split(':') dest = self.namespaces[ns] dest[prefix + name] = value def quote(v): return "\\'".join("'" + p + "'" for p in v.split("'")) def format_opt(opt, value): if not value: return opt if opt.startswith('--'): return '{0}={1}'.format(opt, value) return '{0} {1}'.format(opt, value) def parse_ns_range(ns, ranges=False): ret = [] for space in ',' in ns and ns.split(',') or [ns]: if ranges and '-' in space: start, stop = space.split('-') ret.extend( str(n) for n in range(int(start), int(stop) + 1) ) else: ret.append(space) return ret def abbreviations(mapping): def expand(S): ret = S if S is not None: for short_opt, long_opt in items(mapping): ret = ret.replace(short_opt, long_opt) return ret return expand def findsig(args, default=signal.SIGTERM): for arg in reversed(args): if len(arg) == 2 and arg[0] == '-': try: return int(arg[1]) except ValueError: pass if arg[0] == '-': maybe_sig = 'SIG' + arg[1:] if maybe_sig in SIGNAMES: return getattr(signal, maybe_sig) return default def _getopt(d, alt): for opt in alt: try: return d[opt] except KeyError: pass raise KeyError(alt[0]) def _setdefaultopt(d, alt, value): for opt in alt[1:]: try: return d[opt] except KeyError: pass return d.setdefault(alt[0], value) if __name__ == '__main__': # pragma: no cover main()