协程是一种轻量级的线程,可以在单线程中模拟多个线程的并发操作。协程是 Python 3.4 引入的新特性,它是一种比线程更轻量级的并发模型。与线程不同,协程不需要操作系统介入,可以通过程序员的控制来实现并发操作。

asyncio 是 Python 中用于编写异步代码的标准库。它提供了一组用于编写协程(coroutine)的工具和 API,可以在单线程中实现高效的并发任务处理。

asyncio 模块提供了用于定义协程和异步函数的语法和 API,包括 asyncawait 关键字。使用 async 关键字来定义协程函数,await 关键字用于等待协程或异步函数的执行结果。

asyncio 模块提供了一组事件循环(event loop)和任务(task)的管理工具,可以在单线程中高效地完成异步任务的处理。事件循环是一个无限循环,它会等待所有注册的事件发生,并在事件发生时调用相应的回调函数。任务是协程对象的高层抽象,它可以被添加到事件循环中,以便在事件循环中异步执行。

通过 asyncio,Python 程序员可以方便地编写异步代码,并利用现代计算机的多核 CPU 和高速网络连接,处理大量的并发任务。

协程在 Python 中的实现是通过 asyncio 模块来完成的,下面是一个简单的示例程序来说明协程的使用方式:

import asyncio

async def say_hello(delay, name):
    while True:
        await asyncio.sleep(delay)
        print(f"Hello, {name}!")

async def main():
    tasks = [
        asyncio.create_task(say_hello(1, "Alice")),
        asyncio.create_task(say_hello(2, "Bob")),
        asyncio.create_task(say_hello(3, "Charlie"))
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

当我们运行这个程序时,我们会看到如下输出:

Hello, Alice!
Hello, Bob!
Hello, Alice!
Hello, Charlie!
Hello, Alice!
Hello, Bob!
Hello, Alice!
Hello, Alice!
Hello, Charlie!
。。。。。。

这个示例程序演示了如何使用协程函数和 asyncio 模块来实现异步任务处理。通过协程,我们可以高效地处理大量的并发任务,而不会导致线程或进程的开销增加。


除了使用create_task之外,还可以使用ensure_future

import asyncio

async def socketmain():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

async def periodic_task():
    while True:
        print("执行周期性任务...")
        await asyncio.sleep(1)  # 暂停1秒

async def main():
    # 创建WebSocket服务任务
    server_task = asyncio.ensure_future(socketmain())

    # 创建周期性任务
    periodic_task_task = asyncio.ensure_future(periodic_task())

    # 等待两个任务全部完成(这将永远不会发生,因为两个任务都是永久的)
    # await asyncio.wait([server_task, periodic_task_task], return_when=asyncio.ALL_COMPLETED)

if __name__ == "__main__":
    asyncio.run(main())

以上展示了在一个loop中管理协程,但是在实战中往往会遇到非常非常复杂的情况,下面的代码展示了如何在一个子线程中维护一个loop

from flask import Flask
import threading
import asyncio
from websockets.server import serve

app = Flask(__name__)

@app.route('/')
def hello():
    return 'Hello, World!'

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

def run_flask():
    app.run(host="0.0.0.0", port=5000)

def run_websocket():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = serve(echo, "localhost", 8769)
    loop.run_until_complete(server)
    loop.run_forever()

if __name__ == '__main__':
    flask_thread = threading.Thread(target=run_flask)
    flask_thread.start()

    websocket_thread = threading.Thread(target=run_websocket)
    websocket_thread.start()

    flask_thread.join()
    websocket_thread.join()

同步机制

Lock锁保护

import asyncio

key = False

async def request_by_many():
    lock = asyncio.Lock()
    async with lock:
        if key is False:
            await only_run_once()

async def only_run_once():
    global key
    print(f"Only run once function called")
    key = True
    await asyncio.sleep(1)

async def with_lock():
    tasks = []
    for i in range(10):
        tasks.append(asyncio.create_task(request_by_many()))
    await asyncio.gather(*tasks)

asyncio.run(with_lock())