自己动手开发简单消息队列(异步任务队列):Python实现

又是造轮子系列咯,Python上有很多成熟完善的异步任务队列框架可以用,比如Celery,或者RQ,不过这些都不自带消息队列服务,都需要使用Redis、RabbitMQ之类的消息队列才行,我用到小项目中又不需要附带这么多东西,于是自己动手来实现咯。

思路

  1. 将需要异步执行的任务添加到队列
  2. 自动从队列中取出任务,创建新线程执行
  3. 保存任务的执行结果和输出
  4. 任务完成,调用回调函数,处理返回的数据
  5. 使用输出重定向处理任务的输出

下面分析一下几个关键部分的代码实现

优先级队列

Python标准库已经有实现好的优先级队列了,但是当在相同优先级下传入同样的对象时,他会自动比较这些对象,不过我们的Task类没有实现相关的运算符重载,所以无法比较。

要解决这个问题,要不就在Task里面实现运算符重载,要不就是自己实现一个优先级队列,我选择后者。

class PriorityQueue:
    """
    自己实现的优先级队列,使用了Python的堆
    """
    def __init__(self):
        self._index = 0
        self.queue = []

    def push(self, priority, val):
        heapq.heappush(self.queue, (priority, self._index, val))
        self._index += 1

    def pop(self):
        return heapq.heappop(self.queue)[-1]

    @property
    def empty(self):
        return len(self.queue) == 0

输出重定向

其实这个挺坑的了,毕竟是异步任务队列,难免涉及到线程的竞争问题,又没办法单独控制每个人物的输出,不过我还是做了,小项目的话还是可以用的(其实就是懒得移植一下已有代码)

代码如下,同样是使用队列来实现输出缓冲区,大小可以自己调整,默认支持输出到控制台、文件或者返回列表。

一般重定向输出的话需要自己实现可以输出处理函数,直接给Redirection类的custom 属性赋值即可。

class Redirection:
    def __init__(self, buffer_size=512):
        self.buffer = Queue(maxsize=512)
        self._console = sys.stdout
        # 自定义的输出端
        self.custom = None

    def write(self, output_stream):
        # 加入缓冲区队列
        self.buffer.put(output_stream)

    def to_console(self):
        sys.stdout = self._console
        # 出列
        while not self.buffer.empty():
            print(self.buffer.get())

    def to_file(self, file_path):
        with open(file_path, 'w+') as f:
            sys.stdout = f
            while not self.buffer.empty():
                print(self.buffer.get())
            f.close()

    def to_custom(self):
        while not self.buffer.empty():
            self.custom(self.buffer.get())

    def to_list(self):
        data = []
        while not self.buffer.empty():
            data.append(self.buffer.get())
        return data

    def flush(self):
        self.buffer.empty()

    def reset(self):
        sys.stdout = self._console

Task类

没什么好说的,定义任务类,Task,代码如下:

class Task:
    def __init__(self, func, callback=None, priority=Priority.MIDDLE, args=(), kwargs={}):
        """
        Args:
            func: 需要执行的函数
            callback:  执行完的回调函数
            priority: 优先级
            *args:
            **kwargs:
        """
        self._id = uuid.uuid4().hex
        self.function = func
        self.callback = callback
        self.priority = priority
        self.args = args
        self.kwargs = kwargs
        # 任务运行过程的输出,stdout的输出
        self._outputs: Redirection = None

    @property
    def id(self):
        return self._id

    @property
    def outputs(self) -> Redirection:
        return self._outputs

    @outputs.setter
    def outputs(self, value: Redirection):
        self._outputs = value

    def run(self):
        try:
            if self.callback:
                # 回调函数原型 callback(task_obj, result)
                result = self.callback(self, self.function(*self.args, **self.kwargs))
            else:
                result = self.function(*self.args, **self.kwargs)
            return result
        except Exception as e:
            if self.callback:
                result = self.callback(self, e)
            else:
                result = e
            return result

使用起来很简单,如下:

def fun1(num1, num2):
    print(f'num1={num1}')
    print(f'num2={num2}')
    return num1 + num2

Task(
    func=fun1,
    callback=lambda task, result: print(f'task result: {result}'),
)

# 也可以写成这样
Task(
    func=lambda num1, num2: num1 + num2,
    callback=lambda task, result: print(f'task result: {result}'),
    args=[2, 3]
)

任务队列

最后是实现任务队列,也很简单,根据任务的优先级进行调度即可。

class TaskQueue:
    """
    基于线程的异步任务队列
    todo 下次做一个基于进程的队列,充分利用多核CPU性能
    """

    def __init__(self, output_redirect=False):
        self.queue = PriorityQueue()
        self.output_redirect = output_redirect
        self._redirect_objs = {}
        self._results = {}

    def put(self, task: Task):
        """
        将task加入任务列表
        Args:
            task:
        Returns:返回task id
        """
        self.queue.push(task.priority, task)
        return task.id

    def get(self):
        return self.queue.pop()

    def run(self):
        while not self.queue.empty:
            task = self.get()
            # 开启新线程
            t = threading.Thread(target=self._task_wrapper, name=task.id, args=[task])
            self._log(f'Start thread {task.id}')
            t.start()

    def get_output(self, task_id: str) -> Redirection:
        return self._redirect_objs.get(task_id, None)

    def get_result(self, task_id: str):
        return self._results.get(task_id, None)

    @staticmethod
    def _log(msg: str):
        """日志输出接口,可以替换为日志组件"""
        print(f'[TaskQueue] {msg}')

    def _task_wrapper(self, task: Task):
        if self.output_redirect:
            if task.id in self._redirect_objs:
                redirect_obj = self._redirect_objs[task.id]
            else:
                redirect_obj = Redirection(2048)
                self._redirect_objs[task.id] = redirect_obj
            # 重定向输出
            sys.stdout = redirect_obj
            task.outputs = redirect_obj
            result = task.run()
            # 恢复默认输出
            redirect_obj.reset()
            self._log(f'Task finished. {task.id}')
        else:
            result = task.run()

        # 保存结果
        self._results[task.id] = result

整个项目代码就在这了,很简单,也有很多不足的地方,不过小项目用用勉强还可以吧~

过几天发布到pip,有需要的同学直接pip安装就可以使用,请关注博客更新。

Demo

附上使用任务队列的demo代码:

if __name__ == '__main__':
    task_queue = TaskQueue(output_redirect=True)


    def fun1():
        time.sleep(2)
        return 1


    def fun2():
        time.sleep(3)
        return 2


    task_queue.put(Task(
        func=fun1,
        callback=lambda task, result: print(f'task1 result: {result}'),
    ))
    task_queue.put(Task(
        func=fun2,
        callback=lambda task, result: print(f'task2 result: {result}'),
    ))


    def custom_output(msg):
        print(f'[custom] {msg}')


    def fun3(num1, num2):
        print(f'num1={num1}')
        print(f'num2={num2}')
        return num1 + num2


    def callback(task_obj, result):
        print(f'task3 result={result}')
        output = task_obj.outputs
        output.custom = custom_output
        output.reset()
        output.to_custom()


    task_queue.put(Task(func=fun3, callback=callback, args=[2, 3]))

    print('task queue run')
    task_queue.run()
    print('do other things...')

    for i in range(0, 100):
        print(i * i)

参考资料

欢迎交流

交流问题请在微信公众号后台留言,每一条信息我都会回复哈~ - 微信公众号:画星星高手 - 打代码直播间:https://live.bilibili.com/11883038 - 知乎:https://www.zhihu.com/people/dealiaxy - 简书:https://www.jianshu.com/u/965b95853b9f