Producer-consumer pattern
In real-world software development we often meet the following scenario: one module generates data and another module processes it. "Module" is used broadly here; it can be a class, a function, a thread, a process, and so on. The module that produces the data is figuratively called the producer, and the module that processes the data is called the consumer.
Abstracting producers and consumers alone is not enough to make a producer-consumer model. The model also needs a buffer between producers and consumers to act as an intermediary: the producer puts data into the buffer, and the consumer takes data out of it, as shown in the following figure:
The producer-consumer pattern uses a container to solve the problem of strong coupling between producers and consumers. Producers and consumers do not communicate with each other directly but through a message queue (the buffer). After producing data, the producer does not wait for a consumer to process it; it simply puts the data into the message queue. The consumer does not ask the producer for data either; it takes data directly from the message queue. The message queue acts as a buffer that balances the processing capacity of producers and consumers, and it is what decouples them. This raises another question: what is decoupling?
Decoupling: suppose the producer and the consumer are two classes. If the producer calls a consumer method directly, the producer depends on the consumer (that is, they are coupled), and future changes to the consumer's code may affect the producer. If both depend only on a buffer, there is no direct dependency between them, and coupling is reduced accordingly. Calling the consumer directly has another drawback: because function calls are synchronous (blocking), the producer must wait until the consumer's method returns, so if the consumer processes data slowly, the producer's time is wasted. The buffer has one more advantage, which shows when the rate of data production fluctuates. When data is produced quickly and the consumer cannot keep up, the unprocessed data is held in the buffer temporarily. When production slows down, the consumer gradually works through the backlog.
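To make the decoupling and buffering concrete, here is a minimal sketch using Python's standard queue and threading modules (Python 3; the function names and the None sentinel are my own choices, not from any library). The producer never calls the consumer; it only puts items into a queue.Queue, and the consumer, which is deliberately slow, takes them out at its own pace.

```python
import queue
import threading
import time


def producer(buf, items):
    """Put each item into the shared buffer; never calls the consumer directly."""
    for item in items:
        buf.put(item)      # returns immediately, the buffer here is unbounded
    buf.put(None)          # sentinel: tells the consumer there is no more data


def consumer(buf, results):
    """Take items out of the buffer and process them at its own (slower) pace."""
    while True:
        item = buf.get()
        if item is None:   # sentinel reached: stop
            break
        time.sleep(0.01)   # simulate slow processing
        results.append(item * 2)


buf = queue.Queue()        # the buffer between the two modules
results = []
t_prod = threading.Thread(target=producer, args=(buf, range(5)))
t_cons = threading.Thread(target=consumer, args=(buf, results))
t_prod.start()
t_cons.start()
t_prod.join()              # the producer finishes without waiting for processing
t_cons.join()
print(results)             # [0, 2, 4, 6, 8]
```

Both sides depend only on the queue, so either function could be replaced without touching the other.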
Because this is rather abstract, after reading explanations online I came up with an example of my own: eating steamed buns.
Suppose you love steamed buns, and today your mother is steaming some. There is a table in the kitchen. Your mother puts the steamed buns on a plate and puts the plate on the table. You are watching the Olympic Games in Brazil; when you see a plate of buns on the kitchen table, you take it away and eat the buns while watching. In this process, you and your mother use the same table to put down and pick up plates, so the table is the shared object: the producer adds food and the consumer takes it. The advantage of the table is that your mother doesn't have to hand the plate to you directly. She just puts the buns on a plate and the plate on the table, and if the table is full, she stops and waits. The producer also has other things to do. If you eat slowly, your mother shouldn't have to wait until you finish and bring the plate back before steaming more. There may also be several people eating: if a good friend drops by and eats buns with you, the producer doesn't care which consumer picks up a plate, and each consumer only cares whether there is a plate on the table. If there is one, they take it and eat; if not, they wait. The correspondence is as follows:
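The bun analogy adds two details to the plain model: the table has limited space, and there can be several eaters. The sketch below assumes a table that holds at most 3 plates (a hypothetical number) and uses a bounded queue.Queue, so put() blocks while the table is full and get() blocks while it is empty. The producer puts one None "done" signal per eater so that each consumer thread can stop.

```python
import queue
import threading

TABLE_SIZE = 3                          # hypothetical: the table holds 3 plates
table = queue.Queue(maxsize=TABLE_SIZE)
eaten = {'you': [], 'friend': []}       # each eater records the plates it took


def mother(n_plates, n_eaters):
    for plate in range(n_plates):
        table.put(f'plate-{plate}')     # blocks while the table is full
    for _ in range(n_eaters):
        table.put(None)                 # one "no more buns" signal per eater


def eater(name):
    while True:
        plate = table.get()             # blocks while the table is empty
        if plate is None:
            break
        eaten[name].append(plate)       # which eater gets which plate varies


threads = [
    threading.Thread(target=mother, args=(10, 2)),
    threading.Thread(target=eater, args=('you',)),
    threading.Thread(target=eater, args=('friend',)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(eaten['you']) + len(eaten['friend']))   # 10
```

The mother never knows which eater took a given plate, and the eaters only look at the table, which is exactly the correspondence described above.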
This pattern was designed mainly to deal with concurrency, and Celery is a parallel, distributed framework written in Python.
Then I went on to learn Celery.
Celery definition
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools needed to maintain such a system.
What I like most is that Celery uses task queues to schedule tasks across distributed machines, processes, and threads. So next I looked into what a task queue is.
Task queue
A task queue is a mechanism for distributing work across threads or machines.
Message queue
The input to a task queue is a unit of work called a task; dedicated worker processes constantly monitor the queue for new tasks to process.
Celery communicates via messages, usually with a broker mediating between clients and workers. To start a task, the client adds a message to the queue, and the broker then delivers the message to a worker, as shown in the following figure:
A Celery system can include multiple workers and brokers, which gives it high availability and horizontal scalability.
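To see the client → broker → worker flow in one place, here is an in-process simulation. It is only an analogy and not how Celery is implemented. A queue.Queue stands in for the broker, a dict stands in for the result store, and each message carries a unique task id, a task name, and arguments. The message layout and the registry are my own assumptions; Celery's real message format is different.

```python
import queue
import threading
import uuid

broker = queue.Queue()                          # stands in for RabbitMQ/Redis
registry = {'tasks.add': lambda x, y: x + y}    # task name -> callable (worker side)
results = {}                                    # stands in for the result store


def send_task(name, *args):
    """Client side: wrap the call in a message and hand it to the broker."""
    task_id = str(uuid.uuid4())
    broker.put({'id': task_id, 'task': name, 'args': args})
    return task_id


def worker():
    """Worker side: keep taking messages from the broker and executing them."""
    while True:
        msg = broker.get()
        if msg is None:                         # shutdown signal for this demo
            break
        results[msg['id']] = registry[msg['task']](*msg['args'])


w = threading.Thread(target=worker)
w.start()
task_id = send_task('tasks.add', 4, 8)          # the client does not wait
broker.put(None)
w.join()
print(results[task_id])                         # 12
```

Adding more worker threads that read from the same broker queue is the in-process counterpart of running several Celery workers.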
Celery architecture
Celery's architecture consists of three parts: the message broker, the task execution unit (worker), and the task result store.
Message middleware
Celery does not provide a messaging service itself, but it integrates easily with third-party message middleware, including RabbitMQ, Redis, MongoDB, and others. Here I'll look at RabbitMQ and Redis.
Task execution unit
The worker is the task execution unit provided by Celery; workers run concurrently on the nodes of the distributed system.
Task result storage
The task result store holds the results of the tasks executed by workers. Celery supports several ways of storing task results, including Redis, MongoDB, Django ORM, AMQP, and others.
Next I installed Celery. Before that, I had installed Python 2.7 on my virtual machine so that it could run Celery 3.0 or later.
Celery needs message middleware, which the Celery documentation calls a broker. I chose brokers I expected to use at work, and to follow the examples in the documentation more easily I installed two of them: RabbitMQ and Redis.
Here I install and configure RabbitMQ following the Celery 3.1 documentation. To use Celery, we need to create a RabbitMQ user and a virtual host, and allow the user to access the virtual host. These are my settings on an Ubuntu 14.04 virtual machine:
$ sudo rabbitmqctl add_user forward password
# Create a RabbitMQ user named forward with the password password.
$ sudo rabbitmqctl add_vhost ubuntu
# Create a virtual host named ubuntu.
$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"
# Give user forward configure, write, and read permissions (the three ".*" patterns) on everything in the virtual host ubuntu.
$ sudo rabbitmq-server
Then I started the RabbitMQ server. The output is as follows:

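With that user, virtual host, and permissions in place, Celery reaches RabbitMQ through an amqp:// broker URL of the form amqp://user:password@host:port/vhost. The fragment below is a sketch of how the values created above could be put into a Celery 3.1 configuration module. It assumes RabbitMQ runs on the same machine on its default port 5672; the post itself does not show this setting.

```python
# Hypothetical config fragment (Celery 3.1 setting name), assuming a local
# RabbitMQ on the default port 5672.
# Format: amqp://<user>:<password>@<host>:<port>/<vhost>
BROKER_URL = 'amqp://forward:password@localhost:5672/ubuntu'
```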
Next I set up Redis. Installing the Python client library is simple (the Redis server itself must already be installed and running):
$ sudo pip install redis
The configuration is simple: you only need to set the location of the Redis database.
BROKER_URL = 'redis://localhost:6379/0'
The URL format is:
redis://:password@hostname:port/db_number
All fields after the URL scheme are optional; by default Celery connects to localhost on port 6379 and uses database 0. My configuration is:
redis://:password@ubuntu:6379/5
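To check my reading of the URL format and its defaults, I sketched a small parser with Python 3's urllib.parse. parse_redis_url is a hypothetical helper, not part of Celery or redis-py. It returns host, port, database number, and password, and it falls back to localhost, 6379, and database 0 when a field is omitted.

```python
from urllib.parse import urlsplit


def parse_redis_url(url):
    """Hypothetical helper: split a redis:// URL, applying the defaults above."""
    parts = urlsplit(url)
    if parts.scheme != 'redis':
        raise ValueError('expected a redis:// URL')
    host = parts.hostname or 'localhost'   # default host
    port = parts.port or 6379              # default port
    db_path = parts.path.lstrip('/')
    db = int(db_path) if db_path else 0    # default database number
    password = parts.password or None      # ":password@" part, if any
    return host, port, db, password


print(parse_redis_url('redis://:password@ubuntu:6379/5'))  # ('ubuntu', 6379, 5, 'password')
print(parse_redis_url('redis://'))                          # ('localhost', 6379, 0, None)
```

The empty user name before the colon in redis://:password@... is why the password field has to be written with a leading colon.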
Then I installed Celery with pip, the standard Python package installer:
$ sudo pip install celery
To check that Celery works, I wrote the simplest possible task in tasks.py, as shown in the following figure:
After saving and exiting the editor, I run the following command in the current directory:
$ celery -A tasks worker --loglevel=info
# According to the documentation, -A gives the name of the Celery app; here tasks refers to the module tasks.py. worker starts the task-executing worker role, and --loglevel=info sets the log level to info. The command starts a worker that executes the add task defined in the program.
The worker then displays the following:
We can see that Celery 3.1.23 is running properly on the virtual machine whose host name is ubuntu. Under [config] we can see the current app name, tasks, and the transport we set in the program, redis://127.0.0.1:6379/5. We have not configured results, so they are shown as disabled. The worker uses prefork for concurrency by default, and the concurrency here is 1. [queues] lists the queues; at the moment there is only the default queue, celery. Finally, [tasks] lists one task, tasks.add.
Following the documentation, I then opened another terminal, started the Python interpreter, and called the task with the delay() method:
The task was executed asynchronously by the worker started earlier. Switching to that worker's console, I checked the output:
The first line of the green section shows that the worker received a task, tasks.add. Comparing it with the AsyncResult returned earlier, we see that each task has a unique ID. The second line shows that the task succeeded with the result 12.
According to the documentation, calling a task returns an AsyncResult instance. You can use it to check the task's state, wait for the task to finish, or get its return value (or the exception and traceback if the task failed). This is not enabled by default: you need to configure a result backend for Celery, which I learned about in the next example.
This example gave me a preliminary understanding of Celery, and I then built on it to study further.
To give the code more structure, like a real application, I organized the Celery service as a Python package named pj. The directory layout is as follows:
celery.py
from __future__ import absolute_import
# Enable absolute imports (Python 2). This must come first in the module; without it,
# "from celery import Celery" inside pj/celery.py would import this file itself.
from celery import Celery
# Import the Celery application class from the celery library.
app = Celery('pj',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['pj.tasks'])
# Create the Celery instance app. The first argument is the name of the main module,
# here the package pj; broker is the broker URL, backend the result backend, and
# include lists the modules the worker imports at startup so that it can find the tasks.
app.config_from_object('pj.config')
# Load further settings from the configuration module pj/config.py.
if __name__ == '__main__':
    app.start()
# Running this file directly starts celery.
Settings such as broker and backend can be passed directly to Celery() as above, or written in a configuration module that the instance loads with config_from_object(). In my example the configuration file is config.py, shown below:
The configuration file also lets us control how tasks are executed. For example, with many tasks we may want some higher-priority tasks to run first instead of in strict first-in, first-out order. That requires several queues: the broker holds multiple queues side by side, and a worker only takes tasks from the queues assigned to it. Here we want add in tasks.py to be executed first. In tasks.py I defined two tasks:
So I imported group with from celery import group, which creates a set of tasks that are executed in parallel. Next I needed to understand @app.task. In Python the @ symbol applies a decorator to a function, so I first went back to learn how Python decorators work: a decorator wraps a function to add behavior at run time without changing the function's code. Here the task() decorator, provided by the Celery app instance, turns any callable into a Celery task.
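To see the decorator mechanics on their own, here is a toy sketch. TinyApp is a hypothetical stand-in for a Celery app, not Celery's implementation. Its task method registers the function under a dotted name and adds a call counter as the "extra behavior". @app.task above a function is just shorthand for add = app.task(add).

```python
import functools


class TinyApp:
    """Hypothetical stand-in for a Celery app; only mimics task registration."""

    def __init__(self, main):
        self.main = main
        self.tasks = {}                     # task name -> decorated function

    def task(self, func):
        """Decorator: register func under a dotted name and count its calls."""
        name = f'{self.main}.{func.__name__}'

        @functools.wraps(func)              # keep the original name and docstring
        def wrapper(*args, **kwargs):
            wrapper.calls += 1              # behavior added by the decorator
            return func(*args, **kwargs)

        wrapper.calls = 0
        wrapper.name = name
        self.tasks[name] = wrapper
        return wrapper


app = TinyApp('pj')


@app.task                                   # same as: add = app.task(add)
def add(x, y):
    return x + y


print(add(2, 3), add.name, add.calls)       # 5 pj.add 1
print(list(app.tasks))                      # ['pj.add']
```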
With decorators understood, I went back to the configuration mentioned earlier. In this example, to make the add task take precedence over the subtract task, we can put the two tasks in different queues and decide in the configuration file which queue gets processed first.
First, I looked up the meaning of several commonly used terms:
Exchange: receives messages and decides how they are routed to queues;
Queue: the message queue;
Channel: the channel through which messages are read and written;
Bind: a binding between a queue and an exchange, which (through the routing key) determines which queue a message sent to that exchange is placed into.
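A config.py for the two-queue routing described next might declare the queues with kombu's Queue and Exchange and map each task to its queue. This is a sketch using the Celery 3.1 setting names CELERY_QUEUES and CELERY_ROUTES (Celery 4 renamed them task_queues and task_routes). The task names assume the tasks live in pj/tasks.py.

```python
# config.py fragment: sketch, task names assume pj/tasks.py.
from kombu import Exchange, Queue

# Each queue is bound to its own direct exchange with a matching routing key.
CELERY_QUEUES = (
    Queue('for_add', Exchange('for_add'), routing_key='for_add'),
    Queue('for_subtract', Exchange('for_subtract'), routing_key='for_subtract'),
)

# Route each task to its queue.
CELERY_ROUTES = {
    'pj.tasks.add': {'queue': 'for_add', 'routing_key': 'for_add'},
    'pj.tasks.subtract': {'queue': 'for_subtract', 'routing_key': 'for_subtract'},
}
```

A worker restricted to one queue can be started with celery -A pj worker -Q for_add --loglevel=info. The exact command from the post was in a screenshot, so this is an assumption.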
I put the add task in a queue named for_add and the subtract task in a queue named for_subtract, then ran the following command in the application directory:
This worker only handles tasks from the for_add queue. I then executed the task:
After the task ran, I checked the result in the worker console.
You can see that the worker received the task and executed it.
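The effect of the routing can be imitated with plain data structures. This is a stdlib-only analogy, not Celery code. ROUTES maps task names to queue names, send() puts each message in its routed queue, and run_worker() consumes only from the queues it is given, much like a worker started with -Q. Anything routed elsewhere stays queued for another worker.

```python
from collections import deque

# Hypothetical routing table mirroring the configuration: task name -> queue.
ROUTES = {'pj.tasks.add': 'for_add', 'pj.tasks.subtract': 'for_subtract'}
QUEUES = {'for_add': deque(), 'for_subtract': deque()}
FUNCS = {'pj.tasks.add': lambda x, y: x + y,
         'pj.tasks.subtract': lambda x, y: x - y}


def send(task_name, *args):
    """Producer side: append the message to the queue named in ROUTES."""
    QUEUES[ROUTES[task_name]].append((task_name, args))


def run_worker(queue_names):
    """Consume only from the given queues, like a worker bound to them."""
    done = []
    for qname in queue_names:
        q = QUEUES[qname]
        while q:
            task_name, args = q.popleft()
            done.append((task_name, FUNCS[task_name](*args)))
    return done


send('pj.tasks.subtract', 10, 3)
send('pj.tasks.add', 2, 2)
print(run_worker(['for_add']))        # [('pj.tasks.add', 4)]
print(len(QUEUES['for_subtract']))    # 1: waits for a worker bound to for_subtract
```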
So far we have still been sending tasks manually from the interactive shell. To generate and execute tasks on a schedule, like crontab, we can use celery beat. For example, to generate a task every 10 seconds and execute it, I can configure it as follows:
This uses the beat scheduler. Set the time zone with CELERY_TIMEZONE = 'Asia/Shanghai', and start celery with the -B option:
Also add from datetime import timedelta to config.py.
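The configuration itself was a screenshot. Here is a sketch of an every-10-seconds schedule using the Celery 3.1 setting names. The entry name 'add-every-10-seconds' and the arguments (16, 16) are hypothetical. -B runs the beat scheduler inside the worker process, for example celery -A pj worker -B --loglevel=info.

```python
# config.py additions: sketch with hypothetical entry name and arguments.
from datetime import timedelta

CELERY_TIMEZONE = 'Asia/Shanghai'

CELERYBEAT_SCHEDULE = {
    'add-every-10-seconds': {
        'task': 'pj.tasks.add',              # registered task name
        'schedule': timedelta(seconds=10),   # run every 10 seconds
        'args': (16, 16),                    # positional arguments for add()
    },
}
```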
Going further, if I want to generate a task at 19:30 every Thursday, dispatch it, and let a worker pick it up, I can configure it like this:
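For a calendar-style schedule, Celery provides celery.schedules.crontab. Here is a sketch of a Thursday 19:30 entry; the entry name and arguments are again hypothetical. In crontab's day_of_week, 0 is Sunday, so Thursday is 4. The time is interpreted in the configured CELERY_TIMEZONE.

```python
# config.py fragment: sketch with hypothetical entry name and arguments.
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add-every-thursday-19-30': {
        'task': 'pj.tasks.add',
        # 19:30 every Thursday (day_of_week: 0 = Sunday ... 6 = Saturday)
        'schedule': crontab(minute=30, hour=19, day_of_week=4),
        'args': (16, 16),
    },
}
```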
Having covered the basics, I looked back at Celery as a whole and sketched its architecture in a diagram, as follows: