Flask添加请求队列–使用消费者生产者模式

717 views

Flask作为一个轻量级的基于Python的web框架,使用很简单,但是没有内置的缓存及队列。在一些应用场景下,为了增加并发量,缓解用户的大量的异步请求,需要加入队列的功能。

需要注意的是,使用队列的情况下,会将任务请求变为异步的请求。比如当我们发送请求一个请求A给服务器的时候,服务器可能会立马返回OK我已经收到了,但是实际上请求A只是在任务队列中,还没有被请求,这样请求A就变成了一个异步的请求。

对于队列的加入有很多种,可以使用redies自带的队列,可以使用kafa这种消息队列中间件,这些工具可以解决为集群访问建立队列的功能,但是如果要解决给每个服务器建立一个队列(每个服务器拥有一个自己的请求队列,然后进行管理)成本很高。

最简单的办法就是使用Python的Queue这个队列数据结构,需要说明的是这个数据结构是线程安全的(在多线程访问的时候,阻塞式的)。为什么要使用生产者消费者模式呢?通过将请求放置在生产者队列,然后服务器执行的时候从生产者者队列中获得请求,然后放置在消费者队列中,我们可以控制消费者队列的大小,这样可以控制服务器最多同时处理请求的数量,从而使服务器更加稳定。

Talk is cheap, show me the code!

首先建立队列

from queue import Queue
from flask import request, Flask


# 请求队列
request_queue = Queue()

# 执行队列
MAX_EXECUTING_SIZE = 5
executing_queue = Queue(MAX_EXECUTING_SIZE)

# 新建flask应用
app = Flask(__name__)

将所有请求放入请求队列中

@app.route("/dojob", methods=['POST'])
def dojob():
if request.method == 'POST':
request_queue.put(request.get_data())

真正的执行接口从请求队列中取出请求

@app.route("/do", methods=['POST'])
def do():
if request.method == 'POST':
# 判断执行队列是否已经满了
if executing_queue.qsize() >= MAX_EXECUTING_SIZE:
return "Queue has been full fitted"
if request_queue.qsize() == 0:
return "The request queue is empty"
data = request_queue.get()
executing_queue.put(data)
# do something
pass
executing_queue.get()
return 'OK'

真正的执行接口判断当前的执行队列是否已经超过最大执行的数量,如果超过了,就不会进行请求,第二步,会判断当前请求队列中是否为空,如果不为空,才能从请求队列中获取数据进行处理。

消费者进程定时进行消费

from thread import Thread
import request
Class comsumer(Thread):
def run(self):
while True:
if executing_queue.qsize() < MAX_QUQUE_SIZE and request_queue.qsize() > 0:
request.post("http:127.0.0.1:8000/do")

当执行队列没有满的时候且请求队列不为空的时候,消费者会请求真正的执行接口进行消费。

到此,一个完整的基于消费者生产者的服务器请求队列已经完成,最终效果加上前端如下所示:

Rating: 5.0/5. From 1 vote.
Please wait...