你正在阅读 Celery 3.1 的文档。开发版本文档见: 此处.
Answer: Queue everything and delight everyone is a good article describing why you would use a queue in a web context.
These are some common use cases:
And to some degree:
Answer: No, this and similarly large numbers have been reported at various locations.
The numbers as of this writing are:
- core: 7,141 lines of code.
- tests: 14,209 lines.
- backends, contrib, compat utilities: 9,032 lines.
Lines of code is not a useful metric, so even if Celery did consist of 50k lines of code you would not be able to draw any conclusions from such a number.
A common criticism is that Celery uses too many dependencies. The rationale behind such a fear is hard to imagine, especially considering code reuse as the established way to combat complexity in modern software development, and that the cost of adding dependencies is very low now that package managers like pip and PyPI makes the hassle of installing and maintaining dependencies a thing of the past.
Celery has replaced several dependencies along the way, and the current list of dependencies are:
Kombu is part of the Celery ecosystem and is the library used to send and receive messages. It is also the library that enables us to support many different message brokers. It is also used by the OpenStack project, and many others, validating the choice to separate it from the Celery codebase.
Billiard is a fork of the Python multiprocessing module containing many performance and stability improvements. It is an eventual goal that these improvements will be merged back into Python one day.
It is also used for compatibility with older Python versions that doesn’t come with the multiprocessing module.
The pytz module provides timezone definitions and related tools.
If you use django-celery then you don’t have to install celery separately, as it will make sure that the required version is installed.
django-celery does not have any other dependencies.
Kombu depends on the following packages:
The underlying pure-Python amqp client implementation. AMQP being the default broker this is a natural dependency.
anyjson is an utility library to select the best possible JSON implementation.
注解
For compatibility reasons additional packages may be installed if you are running on older Python versions, for example Python 2.6 depends on the importlib, and ordereddict libraries.
Also, to handle the dependencies for popular configuration choices Celery defines a number of “bundle” packages, see 捆绑.
Celery poses very little overhead both in memory footprint and performance.
But please note that the default configuration is not optimized for time nor space, see the Optimizing guide for more information.
Answer: No.
Celery can support any serialization scheme and has built-in support for JSON, YAML, Pickle and msgpack. Also, as every task is associated with a content type, you can even send one task using pickle, and another using JSON.
The default serialization format is pickle simply because it is convenient (it supports sending complex Python objects as task arguments).
If you need to communicate with other languages you should change to a serialization format that is suitable for that.
You can set a global default serializer, the default serializer for a particular Task, or even what serializer to use when sending a single task instance.
Answer: No.
Although using RabbitMQ is recommended you can also use Redis. There are also experimental transports available such as MongoDB, Beanstalk, CouchDB, or using SQL databases. See Brokers for more information.
The experimental transports may have reliability problems and limited broadcast and event functionality. For example remote control commands only works with AMQP and Redis.
Redis or a database won’t perform as well as an AMQP broker. If you have strict reliability requirements you are encouraged to use RabbitMQ or another AMQP broker. Some transports also uses polling, so they are likely to consume more resources. However, if you for some reason are not able to use AMQP, feel free to use these alternatives. They will probably work fine for most use cases, and note that the above points are not specific to Celery; If using Redis/database as a queue worked fine for you before, it probably will now. You can always upgrade later if you need to.
Answer: Yes.
worker is an implementation of Celery in Python. If the language has an AMQP client, there shouldn’t be much work to create a worker in your language. A Celery worker is just a program connecting to the broker to process messages.
Also, there’s another way to be language independent, and that is to use REST tasks, instead of your tasks being functions, they’re URLs. With this information you can even create simple web servers that enable preloading of code. See: User Guide: Remote Tasks.
Answer: MySQL has default isolation level set to REPEATABLE-READ, if you don’t really need that, set it to READ-COMMITTED. You can do that by adding the following to your my.cnf:
[mysqld]
transaction-isolation = READ-COMMITTED
For more information about InnoDB`s transaction model see MySQL - The InnoDB Transaction Model and Locking in the MySQL user manual.
(Thanks to Honza Kral and Anton Tsigularov for this solution)
Answer: If you’re using the database backend for results, and in particular using MySQL, see MySQL is throwing deadlock errors, what can I do?.
Answer: There is a bug in some AMQP clients that will make it hang if it’s not able to authenticate the current user, the password doesn’t match or the user does not have access to the virtual host specified. Be sure to check your broker logs (for RabbitMQ that is /var/log/rabbitmq/rabbit.log on most systems), it usually contains a message describing the reason.
Answer: The prefork pool requires a working POSIX semaphore implementation which isn’t enabled in FreeBSD by default. You have to enable POSIX semaphores in the kernel and manually recompile multiprocessing.
Luckily, Viktor Petersson has written a tutorial to get you started with Celery on FreeBSD here: http://www.playingwithwire.com/2009/10/how-to-get-celeryd-to-work-on-freebsd/
Answer: See MySQL is throwing deadlock errors, what can I do?. Thanks to howsthedotcom.
Answer: With RabbitMQ you can see how many consumers are currently receiving tasks by running the following command:
$ rabbitmqctl list_queues -p <myvhost> name messages consumers
Listing queues ...
celery 2891 2
This shows that there’s 2891 messages waiting to be processed in the task queue, and there are two consumers processing them.
One reason that the queue is never emptied could be that you have a stale worker process taking the messages hostage. This could happen if the worker wasn’t properly shut down.
When a message is received by a worker the broker waits for it to be acknowledged before marking the message as processed. The broker will not re-send that message to another consumer until the consumer is shut down properly.
If you hit this problem you have to kill all workers manually and restart them:
ps auxww | grep celeryd | awk '{print $2}' | xargs kill
You might have to wait a while until all workers have finished the work they’re doing. If it’s still hanging after a long time you can kill them by force with:
ps auxww | grep celeryd | awk '{print $2}' | xargs kill -9
Answer: There might be syntax errors preventing the tasks module being imported.
You can find out if Celery is able to run the task by executing the task manually:
>>> from myapp.tasks import MyPeriodicTask
>>> MyPeriodicTask.delay()
Watch the workers log file to see if it’s able to find the task, or if some other error is happening.
Answer: See Why won’t my Task run?.
Answer: You can use the celery purge command to purge all configured task queues:
$ celery purge
or programatically:
>>> from celery import current_app as celery
>>> celery.control.purge()
1753
If you only want to purge messages from a specific queue you have to use the AMQP API or the celery amqp utility:
$ celery amqp queue.purge <queue name>
The number 1753 is the number of messages deleted.
You can also start worker with the --purge argument, to purge messages when the worker starts.
Answer: Tasks are acknowledged (removed from the queue) as soon as they are actually executed. After the worker has received a task, it will take some time until it is actually executed, especially if there are a lot of tasks already waiting for execution. Messages that are not acknowledged are held on to by the worker until it closes the connection to the broker (AMQP server). When that connection is closed (e.g. because the worker was stopped) the tasks will be re-sent by the broker to the next available worker (or the same worker when it has been restarted), so to properly purge the queue of waiting tasks you have to stop all the workers, and then purge the tasks using celery.control.purge().
Answer: Use task.AsyncResult:
>>> result = my_task.AsyncResult(task_id)
>>> result.get()
This will give you a AsyncResult instance using the tasks current result backend.
If you need to specify a custom result backend, or you want to use the current application’s default backend you can use Celery.AsyncResult:
>>> result = app.AsyncResult(task_id)
>>> result.get()
Answer: Yes, indeed it is.
You are right to have a security concern, as this can indeed be a real issue. It is essential that you protect against unauthorized access to your broker, databases and other services transmitting pickled data.
For the task messages you can set the CELERY_TASK_SERIALIZER setting to “json” or “yaml” instead of pickle. There is currently no alternative solution for task results (but writing a custom result backend using JSON is a simple task)
Note that this is not just something you should be aware of with Celery, for example also Django uses pickle for its cache client.
Answer: Some AMQP brokers supports using SSL (including RabbitMQ). You can enable this using the BROKER_USE_SSL setting.
It is also possible to add additional encryption and security to messages, if you have a need for this then you should contact the Mailing list.
Answer: No!
We’re not currently aware of any security issues, but it would be incredibly naive to assume that they don’t exist, so running the Celery services (celery worker, celery beat, celeryev, etc) as an unprivileged user is recommended.
Answer: RabbitMQ will crash if it runs out of memory. This will be fixed in a future release of RabbitMQ. please refer to the RabbitMQ FAQ: http://www.rabbitmq.com/faq.html#node-runs-out-of-memory
注解
This is no longer the case, RabbitMQ versions 2.0 and above includes a new persister, that is tolerant to out of memory errors. RabbitMQ 2.1 or higher is recommended for Celery.
If you’re still running an older version of RabbitMQ and experience crashes, then please upgrade!
Misconfiguration of Celery can eventually lead to a crash on older version of RabbitMQ. Even if it doesn’t crash, this can still consume a lot of resources, so it is very important that you are aware of the common pitfalls.
Running worker with the -E/--events option will send messages for events happening inside of the worker.
Events should only be enabled if you have an active monitor consuming them, or if you purge the event queue periodically.
When running with the AMQP result backend, every task result will be sent as a message. If you don’t collect these results, they will build up and RabbitMQ will eventually run out of memory.
Results expire after 1 day by default. It may be a good idea to lower this value by configuring the CELERY_TASK_RESULT_EXPIRES setting.
If you don’t use the results for a task, make sure you set the ignore_result option:
Answer: No. It used to be supported by Carrot, but is not currently supported in Kombu.
This is an incomplete list of features not available when using the virtual transports:
Remote control commands (supported only by Redis).
Monitoring with events may not work in all virtual transports.
- The header and fanout exchange types
(fanout is supported by Redis).
Answer: See the BROKER_POOL_LIMIT setting. The connection pool is enabled by default since version 2.5.
There is a sudo configuration option that makes it illegal for process without a tty to run sudo:
Defaults requiretty
If you have this configuration in your /etc/sudoers file then tasks will not be able to call sudo when the worker is running as a daemon. If you want to enable that, then you need to remove the line from sudoers.
Answer:
The worker rejects unknown tasks, messages with encoding errors and messages that doesn’t contain the proper fields (as per the task message protocol).
If it did not reject them they could be redelivered again and again, causing a loop.
Recent versions of RabbitMQ has the ability to configure a dead-letter queue for exchange, so that rejected messages is moved there.
Answer: Yes. Use celery.execute.send_task(). You can also call a task by name from any language that has an AMQP client.
>>> from celery.execute import send_task
>>> send_task("tasks.add", args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
Answer: The current id and more is available in the task request:
@app.task(bind=True)
def mytask(self):
cache.set(self.request.id, "Running")
For more information see Context.
Answer: Yes. Use the task_id argument to Task.apply_async():
>>> task.apply_async(args, kwargs, task_id='…')
Answer: Yes. But please see note in the sidebar at Basics.
Answer: Yes, but make sure it is unique, as the behavior for two tasks existing with the same id is undefined.
The world will probably not explode, but at the worst they can overwrite each others results.
Answer: You can safely launch a task inside a task. Also, a common pattern is to add callbacks to tasks:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
return x + y
@app.task(ignore_result=True)
def log_result(result):
logger.info("log_result got: %r", result)
Invocation:
>>> (add.s(2, 2) | log_result.s()).delay()
See Canvas: Designing Workflows for more information.
Answer: Yes. Use result.revoke:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
or if you only have the task id:
>>> from celery import current_app as celery
>>> celery.control.revoke(task_id)
Answer: To receive broadcast remote control commands, every worker node uses its host name to create a unique queue name to listen to, so if you have more than one worker with the same host name, the control commands will be received in round-robin between them.
To work around this you can explicitly set the nodename for every worker using the -n argument to worker:
$ celery worker -n worker1@%h
$ celery worker -n worker2@%h
where %h is automatically expanded into the current hostname.
Answer: Yes. You can route tasks to an arbitrary server using AMQP, and a worker can bind to as many queues as it wants.
See Routing Tasks for more information.
Answer: Yes. You can use the Django database scheduler, or you can create a new schedule subclass and override is_due():
from celery.schedules import schedule
class my_schedule(schedule):
def is_due(self, last_run_at):
return …
Answer: No. In theory, yes, as AMQP supports priorities. However RabbitMQ doesn’t implement them yet.
The usual way to prioritize work in Celery, is to route high priority tasks to different servers. In the real world this may actually work better than per message priorities. You can use this in combination with rate limiting to achieve a highly responsive system.
Answer: Depends. It’s not necessarily one or the other, you may want to use both.
Task.retry is used to retry tasks, notably for expected errors that is catchable with the try: block. The AMQP transaction is not used for these errors: if the task raises an exception it is still acknowledged!.
The acks_late setting would be used when you need the task to be executed again if the worker (for some reason) crashes mid-execution. It’s important to note that the worker is not known to crash, and if it does it is usually an unrecoverable error that requires human intervention (bug in the worker, or task code).
In an ideal world you could safely retry any task that has failed, but this is rarely the case. Imagine the following task:
@app.task
def process_upload(filename, tmpfile):
# Increment a file count stored in a database
increment_file_counter()
add_file_metadata_to_db(filename, tmpfile)
copy_file_to_destination(filename, tmpfile)
If this crashed in the middle of copying the file to its destination the world would contain incomplete state. This is not a critical scenario of course, but you can probably imagine something far more sinister. So for ease of programming we have less reliability; It’s a good default, users who require it and know what they are doing can still enable acks_late (and in the future hopefully use manual acknowledgement)
In addition Task.retry has features not available in AMQP transactions: delay between retries, max retries, etc.
So use retry for Python errors, and if your task is idempotent combine that with acks_late if that level of reliability is required.
Answer: Yes. You can use the eta argument of Task.apply_async().
Or to schedule a periodic task at a specific time, use the celery.schedules.crontab schedule behavior:
from celery.schedules import crontab
from celery.task import periodic_task
@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week="mon"))
def every_monday_morning():
print("This is run every Monday morning at 7:30")
Answer: Use the TERM signal, and the worker will finish all currently executing jobs and shut down as soon as possible. No tasks should be lost.
You should never stop worker with the KILL signal (-9), unless you’ve tried TERM a few times and waited a few minutes to let it get a chance to shut down. As if you do tasks may be terminated mid-execution, and they will not be re-run unless you have the acks_late option set (Task.acks_late / CELERY_ACKS_LATE).
Several database tables are created by default, these relate to
Monitoring
When you use the django-admin monitor, the cluster state is written to the TaskState and WorkerState models.
Periodic tasks
When the database-backed schedule is used the periodic task schedule is taken from the PeriodicTask model, there are also several other helper tables (IntervalSchedule, CrontabSchedule, PeriodicTasks).
Task results
The database result backend is enabled by default when using django-celery (this is for historical reasons, and thus for backward compatibility).
The results are stored in the TaskMeta and TaskSetMeta models. these tables are not created if another result backend is configured.
Answer: That’s right. Run celery beat and celery worker as separate services instead.