自己动手开发简单消息队列(异步任务队列):Python实现
又是造轮子系列咯,Python上有很多成熟完善的异步任务队列框架可以用,比如Celery,或者RQ,不过这些都不自带消息队列服务,都需要使用Redis、RabbitMQ之类的消息队列才行,我用到小项目中又不需要附带这么多东西,于是自己动手来实现咯。
思路
- 将需要异步执行的任务添加到队列
- 自动从队列中取出任务,创建新线程执行
- 保存任务的执行结果和输出
- 任务完成,调用回调函数,处理返回的数据
- 使用输出重定向处理任务的输出
下面分析一下几个关键部分的代码实现
优先级队列
Python标准库已经有实现好的优先级队列了,但是当在相同优先级下传入同样的对象时,他会自动比较这些对象,不过我们的Task类没有实现相关的运算符重载,所以无法比较。
要解决这个问题,要不就在Task里面实现运算符重载,要不就是自己实现一个优先级队列,我选择后者。
class PriorityQueue:
"""
自己实现的优先级队列,使用了Python的堆
"""
def __init__(self):
self._index = 0
self.queue = []
def push(self, priority, val):
heapq.heappush(self.queue, (priority, self._index, val))
self._index += 1
def pop(self):
return heapq.heappop(self.queue)[-1]
@property
def empty(self):
return len(self.queue) == 0
输出重定向
其实这个挺坑的了,毕竟是异步任务队列,难免涉及到线程的竞争问题,又没办法单独控制每个人物的输出,不过我还是做了,小项目的话还是可以用的(其实就是懒得移植一下已有代码)
代码如下,同样是使用队列来实现输出缓冲区,大小可以自己调整,默认支持输出到控制台、文件或者返回列表。
一般重定向输出的话需要自己实现可以输出处理函数,直接给Redirection类的custom 属性赋值即可。
class Redirection:
def __init__(self, buffer_size=512):
self.buffer = Queue(maxsize=512)
self._console = sys.stdout
# 自定义的输出端
self.custom = None
def write(self, output_stream):
# 加入缓冲区队列
self.buffer.put(output_stream)
def to_console(self):
sys.stdout = self._console
# 出列
while not self.buffer.empty():
print(self.buffer.get())
def to_file(self, file_path):
with open(file_path, 'w+') as f:
sys.stdout = f
while not self.buffer.empty():
print(self.buffer.get())
f.close()
def to_custom(self):
while not self.buffer.empty():
self.custom(self.buffer.get())
def to_list(self):
data = []
while not self.buffer.empty():
data.append(self.buffer.get())
return data
def flush(self):
self.buffer.empty()
def reset(self):
sys.stdout = self._console
Task类
没什么好说的,定义任务类,Task,代码如下:
class Task:
def __init__(self, func, callback=None, priority=Priority.MIDDLE, args=(), kwargs={}):
"""
Args:
func: 需要执行的函数
callback: 执行完的回调函数
priority: 优先级
*args:
**kwargs:
"""
self._id = uuid.uuid4().hex
self.function = func
self.callback = callback
self.priority = priority
self.args = args
self.kwargs = kwargs
# 任务运行过程的输出,stdout的输出
self._outputs: Redirection = None
@property
def id(self):
return self._id
@property
def outputs(self) -> Redirection:
return self._outputs
@outputs.setter
def outputs(self, value: Redirection):
self._outputs = value
def run(self):
try:
if self.callback:
# 回调函数原型 callback(task_obj, result)
result = self.callback(self, self.function(*self.args, **self.kwargs))
else:
result = self.function(*self.args, **self.kwargs)
return result
except Exception as e:
if self.callback:
result = self.callback(self, e)
else:
result = e
return result
使用起来很简单,如下:
def fun1(num1, num2):
print(f'num1={num1}')
print(f'num2={num2}')
return num1 + num2
Task(
func=fun1,
callback=lambda task, result: print(f'task result: {result}'),
)
# 也可以写成这样
Task(
func=lambda num1, num2: num1 + num2,
callback=lambda task, result: print(f'task result: {result}'),
args=[2, 3]
)
任务队列
最后是实现任务队列,也很简单,根据任务的优先级进行调度即可。
class TaskQueue:
"""
基于线程的异步任务队列
todo 下次做一个基于进程的队列,充分利用多核CPU性能
"""
def __init__(self, output_redirect=False):
self.queue = PriorityQueue()
self.output_redirect = output_redirect
self._redirect_objs = {}
self._results = {}
def put(self, task: Task):
"""
将task加入任务列表
Args:
task:
Returns:返回task id
"""
self.queue.push(task.priority, task)
return task.id
def get(self):
return self.queue.pop()
def run(self):
while not self.queue.empty:
task = self.get()
# 开启新线程
t = threading.Thread(target=self._task_wrapper, name=task.id, args=[task])
self._log(f'Start thread {task.id}')
t.start()
def get_output(self, task_id: str) -> Redirection:
return self._redirect_objs.get(task_id, None)
def get_result(self, task_id: str):
return self._results.get(task_id, None)
@staticmethod
def _log(msg: str):
"""日志输出接口,可以替换为日志组件"""
print(f'[TaskQueue] {msg}')
def _task_wrapper(self, task: Task):
if self.output_redirect:
if task.id in self._redirect_objs:
redirect_obj = self._redirect_objs[task.id]
else:
redirect_obj = Redirection(2048)
self._redirect_objs[task.id] = redirect_obj
# 重定向输出
sys.stdout = redirect_obj
task.outputs = redirect_obj
result = task.run()
# 恢复默认输出
redirect_obj.reset()
self._log(f'Task finished. {task.id}')
else:
result = task.run()
# 保存结果
self._results[task.id] = result
整个项目代码就在这了,很简单,也有很多不足的地方,不过小项目用用勉强还可以吧~
过几天发布到pip,有需要的同学直接pip安装就可以使用,请关注博客更新。
Demo
附上使用任务队列的demo代码:
if __name__ == '__main__':
task_queue = TaskQueue(output_redirect=True)
def fun1():
time.sleep(2)
return 1
def fun2():
time.sleep(3)
return 2
task_queue.put(Task(
func=fun1,
callback=lambda task, result: print(f'task1 result: {result}'),
))
task_queue.put(Task(
func=fun2,
callback=lambda task, result: print(f'task2 result: {result}'),
))
def custom_output(msg):
print(f'[custom] {msg}')
def fun3(num1, num2):
print(f'num1={num1}')
print(f'num2={num2}')
return num1 + num2
def callback(task_obj, result):
print(f'task3 result={result}')
output = task_obj.outputs
output.custom = custom_output
output.reset()
output.to_custom()
task_queue.put(Task(func=fun3, callback=callback, args=[2, 3]))
print('task queue run')
task_queue.run()
print('do other things...')
for i in range(0, 100):
print(i * i)
参考资料
- 运算符重载 https://blog.csdn.net/goodlixueyong/article/details/52589979
- https://www.open-open.com/code/view/1433410658322
- Celery异步任务队列 https://www.cnblogs.com/StitchSun/p/8552488.html
- https://zhuanlan.zhihu.com/p/37637660
欢迎交流
交流问题请在微信公众号后台留言,每一条信息我都会回复哈~ - 微信公众号:画星星高手 - 打代码直播间:https://live.bilibili.com/11883038 - 知乎:https://www.zhihu.com/people/dealiaxy - 简书:https://www.jianshu.com/u/965b95853b9f