Python中级教程 第五课:异步编程与并发 1/2

浏览量:23 次 发布时间:2026-01-15 18:58 作者:明扬工控商城 下载docx

最近更新:Python中级教程 第三课:面向对象编程深入与装饰器

第一部分:异步编程基础

1.1 什么是异步编程?

异步编程允许程序在等待某些操作(如I/O)完成时继续执行其他任务,而不是阻塞等待。


python

import asyncio

import time


# 同步版本

def sync_task(name, delay):

   print(f"{name} 开始")

   time.sleep(delay)  # 阻塞

   print(f"{name} 结束")

   return f"{name} 完成"


# 异步版本

async def async_task(name, delay):

   print(f"{name} 开始")

   await asyncio.sleep(delay)  # 非阻塞

   print(f"{name} 结束")

   return f"{name} 完成"


# 同步执行

print("=== 同步执行 ===")

start = time.time()

sync_task("任务1", 2)

sync_task("任务2", 1)

print(f"总耗时: {time.time() - start:.2f}秒")


# 异步执行

print("\n=== 异步执行 ===")

async def main():

   start = time.time()

   

   # 创建任务

   task1 = asyncio.create_task(async_task("任务1", 2))

   task2 = asyncio.create_task(async_task("任务2", 1))

   

   # 等待所有任务完成

   results = await asyncio.gather(task1, task2)

   

   print(f"结果: {results}")

   print(f"总耗时: {time.time() - start:.2f}秒")


# 运行异步程序

asyncio.run(main())

1.2 async/await 基础语法

python

import asyncio


# 定义异步函数

async def fetch_data(url, delay=1):

   print(f"开始获取 {url}")

   await asyncio.sleep(delay)  # 模拟网络请求

   print(f"完成获取 {url}")

   return {"url": url, "data": f"来自{url}的数据"}


async def process_data(data):

   print(f"处理数据: {data}")

   await asyncio.sleep(0.5)  # 模拟数据处理

   return f"已处理: {data}"


async def main():

   # 顺序执行(不推荐)

   print("=== 顺序执行 ===")

   result1 = await fetch_data("https://api.example.com/1", 1)

   result2 = await fetch_data("https://api.example.com/2", 2)

   print(f"结果: {result1}, {result2}")

   

   # 并发执行(推荐)

   print("\n=== 并发执行 ===")

   task1 = asyncio.create_task(fetch_data("https://api.example.com/1", 1))

   task2 = asyncio.create_task(fetch_data("https://api.example.com/2", 2))

   

   # 等待任务完成

   result1 = await task1

   result2 = await task2

   

   # 处理结果

   processed1 = await process_data(result1)

   processed2 = await process_data(result2)

   

   print(f"最终结果: {processed1}, {processed2}")


asyncio.run(main())

1.3 异步上下文管理器

python

import asyncio

from contextlib import asynccontextmanager

import aiohttp  # 需要安装: pip install aiohttp


# 自定义异步上下文管理器

class AsyncDatabaseConnection:

   def __init__(self, db_name):

       self.db_name = db_name

       self.connection = None

   

   async def __aenter__(self):

       print(f"连接到数据库: {self.db_name}")

       # 模拟异步连接

       await asyncio.sleep(0.5)

       self.connection = f"AsyncConnection to {self.db_name}"

       return self

   

   async def __aexit__(self, exc_type, exc_val, exc_tb):

       print(f"关闭数据库连接: {self.db_name}")

       self.connection = None

       await asyncio.sleep(0.2)  # 模拟清理

       if exc_type:

           print(f"发生异常: {exc_type.__name__}")

       return False

   

   async def query(self, sql):

       print(f"执行查询: {sql}")

       await asyncio.sleep(0.3)

       return f"结果: {sql}"


# 使用@asynccontextmanager装饰器

@asynccontextmanager

async def async_timer(name):

   """异步计时器"""

   start = asyncio.get_event_loop().time()

   print(f"[{name}] 开始")

   try:

       yield

   finally:

       end = asyncio.get_event_loop().time()

       print(f"[{name}] 结束,耗时: {end - start:.2f}秒")


# 使用示例

async def main():

   # 使用类实现的异步上下文管理器

   async with AsyncDatabaseConnection("test_db") as db:

       result = await db.query("SELECT * FROM users")

       print(result)

   

   print()

   

   # 使用装饰器实现的异步上下文管理器

   async with async_timer("计算任务"):

       await asyncio.sleep(1)

       total = sum(range(1000000))

       print(f"计算结果: {total}")


asyncio.run(main())

第二部分:asyncio 核心组件

2.1 任务(Task)管理

python

import asyncio

import random


async def worker(name, queue):

   """工作协程"""

   while True:

       # 从队列获取任务

       task_id = await queue.get()

       

       # 模拟工作

       delay = random.uniform(0.5, 2.0)

       print(f"Worker {name} 开始处理任务 {task_id} (预计 {delay:.1f}秒)")

       await asyncio.sleep(delay)

       

       # 标记任务完成

       queue.task_done()

       print(f"Worker {name} 完成任务 {task_id}")


async def task_producer(queue, num_tasks=10):

   """任务生产者"""

   for i in range(num_tasks):

       await queue.put(i)

       print(f"已添加任务 {i} 到队列")

       await asyncio.sleep(0.2)  # 控制添加速度


async def monitor_tasks(tasks, interval=1):

   """任务监控器"""

   while True:

       running = sum(1 for task in tasks if not task.done())

       print(f"[监控] 运行中的任务: {running}/{len(tasks)}")

       

       if running == 0:

           break

           

       await asyncio.sleep(interval)


async def main():

   # 创建任务队列

   queue = asyncio.Queue(maxsize=5)

   

   # 创建多个工作协程

   workers = [

       asyncio.create_task(worker(f"W{i}", queue))

       for i in range(3)  # 3个工人

   ]

   

   # 创建生产者

   producer = asyncio.create_task(task_producer(queue, 15))

   

   # 创建监控器

   monitor = asyncio.create_task(monitor_tasks([producer] + workers))

   

   # 等待所有任务完成

   await queue.join()  # 等待队列清空

   

   # 取消工作协程

   for w in workers:

       w.cancel()

   

   # 等待所有任务结束

   await asyncio.gather(producer, monitor, *workers, return_exceptions=True)

   

   print("所有任务完成!")


asyncio.run(main())

2.2 Future 对象

python

import asyncio


# 创建 Future 对象

async def create_future_demo():

   loop = asyncio.get_running_loop()

   

   # 创建 Future

   future = loop.create_future()

   print(f"Future 状态: {future.done()}")

   

   # 设置结果(模拟异步操作完成后调用)

   async def set_result_later():

       await asyncio.sleep(1)

       if not future.done():

           future.set_result("Future 完成!")

   

   # 设置异常(模拟错误)

   async def set_exception_later():

       await asyncio.sleep(0.5)

       if not future.done():

           future.set_exception(ValueError("发生错误!"))

   

   # 启动设置结果的任务

   asyncio.create_task(set_result_later())

   # asyncio.create_task(set_exception_later())  # 可以取消注释测试异常情况

   

   try:

       # 等待 Future 完成

       result = await future

       print(f"收到结果: {result}")

   except Exception as e:

       print(f"捕获异常: {e}")

   

   print(f"Future 最终状态: {future.done()}")

   print(f"Future 结果: {future.result() if future.done() and not future.cancelled() else '无结果'}")


# 使用 Future 回调

async def future_with_callbacks():

   loop = asyncio.get_running_loop()

   future = loop.create_future()

   

   # 添加完成回调

   def on_done(fut):

       if fut.cancelled():

           print("回调: Future 被取消")

       elif fut.exception():

           print(f"回调: 发生异常 {fut.exception()}")

       else:

           print(f"回调: 收到结果 {fut.result()}")

   

   future.add_done_callback(on_done)

   

   # 设置结果

   asyncio.create_task(

       asyncio.sleep(0.5, result="Future 结果")

   ).add_done_callback(

       lambda task: future.set_result(task.result())

   )

   

   result = await future

   print(f"主程序收到: {result}")


# 运行示例

async def main():

   print("=== Future 基础示例 ===")

   await create_future_demo()

   

   print("\n=== Future 回调示例 ===")

   await future_with_callbacks()


asyncio.run(main())

2.3 事件循环(Event Loop)

python

import asyncio

import threading

import time


# 获取和操作事件循环

async def event_loop_demo():

   # 获取当前事件循环

   loop = asyncio.get_running_loop()

   print(f"当前事件循环: {loop}")

   print(f"循环是否运行: {loop.is_running()}")

   print(f"循环是否关闭: {loop.is_closed()}")

   

   # 安排回调

   def regular_callback():

       print(f"常规回调在 {threading.current_thread().name}")

   

   loop.call_soon(regular_callback)

   

   # 安排延迟回调

   def delayed_callback():

       print("延迟回调 (1秒后)")

   

   loop.call_later(1, delayed_callback)

   

   # 在指定时间执行回调

   current_time = loop.time()

   def timed_callback():

       print(f"定时回调 (在循环时间 {loop.time():.1f})")

   

   loop.call_at(current_time + 2, timed_callback)

   

   # 运行一段时间

   await asyncio.sleep(2.5)


# 在不同线程中运行事件循环

def run_in_thread():

   """在新线程中运行事件循环"""

   async def thread_task():

       print(f"在线程 {threading.current_thread().name} 中运行")

       await asyncio.sleep(1)

       return "线程任务完成"

   

   # 创建新的事件循环

   loop = asyncio.new_event_loop()

   

   try:

       # 在新线程中运行事件循环

       result = loop.run_until_complete(thread_task())

       print(f"结果: {result}")

   finally:

       loop.close()


# 主程序

async def main():

   print("=== 事件循环基础 ===")

   await event_loop_demo()

   

   print("\n=== 多线程事件循环 ===")

   # 在主线程运行主任务

   main_task = asyncio.create_task(asyncio.sleep(0.5, result="主任务"))

   

   # 在新线程中运行事件循环

   import concurrent.futures

   with concurrent.futures.ThreadPoolExecutor() as executor:

       executor.submit(run_in_thread)

   

   # 等待主任务完成

   result = await main_task

   print(f"主程序: {result}")


asyncio.run(main())

第三部分:异步生成器与迭代

3.1 异步生成器

python

import asyncio

import random


# 异步生成器

async def async_generator(limit=5):

   """生成一系列数字"""

   for i in range(limit):

       await asyncio.sleep(random.uniform(0.1, 0.5))  # 模拟异步延迟

       yield i


# 使用 async for 迭代异步生成器

async def process_async_gen():

   print("开始处理异步生成器...")

   

   # 使用 async for

   async for value in async_generator(5):

       print(f"收到值: {value}")

       # 模拟处理

       await asyncio.sleep(0.1)

   

   print("处理完成!")


# 异步生成器表达式

async def async_gen_expression():

   # 异步生成器表达式

   gen = (i async for i in async_generator(3))

   

   async for value in gen:

       print(f"表达式生成: {value}")


# 异步生成器与普通生成器的混合

async def mixed_generators():

   # 普通生成器

   def sync_gen():

       for i in range(3):

           yield f"sync:{i}"

   

   # 异步生成器

   async def async_gen():

       for i in range(3):

           await asyncio.sleep(0.1)

           yield f"async:{i}"

   

   print("混合迭代:")

   

   # 迭代普通生成器(同步)

   for item in sync_gen():

       print(f"同步: {item}")

   

   # 迭代异步生成器

   async for item in async_gen():

       print(f"异步: {item}")


# 运行示例

async def main():

   print("=== 异步生成器基础 ===")

   await process_async_gen()

   

   print("\n=== 异步生成器表达式 ===")

   await async_gen_expression()

   

   print("\n=== 混合生成器 ===")

   await mixed_generators()


asyncio.run(main())

3.2 异步迭代器

python

import asyncio

from collections import deque


# 自定义异步迭代器

class AsyncDataStream:

   """异步数据流迭代器"""

   

   def __init__(self, data_source):

       self.data_source = data_source

       self.index = 0

   

   def __aiter__(self):

       return self

   

   async def __anext__(self):

       if self.index >= len(self.data_source):

           raise StopAsyncIteration

       

       # 模拟异步获取数据

       await asyncio.sleep(0.2)

       item = self.data_source[self.index]

       self.index += 1

       

       return item


# 异步队列迭代器

class AsyncQueueIterator:

   """异步队列迭代器"""

   

   def __init__(self, max_items=10):

       self.queue = asyncio.Queue()

       self.max_items = max_items

       self.produced = 0

   

   async def producer(self):

       """生产者协程"""

       while self.produced < self.max_items:

           item = f"item_{self.produced}"

           await self.queue.put(item)

           self.produced += 1

           print(f"生产: {item}")

           await asyncio.sleep(0.1)

       

       # 发送结束信号

       await self.queue.put(None)

   

   def __aiter__(self):

       return self

   

   async def __anext__(self):

       item = await self.queue.get()

       

       if item is None:

           # 将结束信号放回队列,以便其他消费者也能收到

           await self.queue.put(None)

           raise StopAsyncIteration

       

       return item


# 使用示例

async def main():

   print("=== 自定义异步迭代器 ===")

   

   data = ["数据1", "数据2", "数据3", "数据4", "数据5"]

   stream = AsyncDataStream(data)

   

   async for item in stream:

       print(f"从流中获取: {item}")

   

   print("\n=== 异步队列迭代器 ===")

   

   queue_iter = AsyncQueueIterator(5)

   

   # 启动生产者

   producer_task = asyncio.create_task(queue_iter.producer())

   

   # 消费者1

   async def consumer1():

       async for item in queue_iter:

           print(f"消费者1 收到: {item}")

           await asyncio.sleep(0.15)  # 模拟处理时间

   

   # 消费者2

   async def consumer2():

       async for item in queue_iter:

           print(f"消费者2 收到: {item}")

           await asyncio.sleep(0.25)  # 模拟处理时间

   

   # 运行消费者

   await asyncio.gather(

       consumer1(),

       consumer2(),

       producer_task

   )

   

   print("所有消费完成!")


asyncio.run(main())

第四部分:并发与并行

4.1 asyncio 并发模式

python

import asyncio

import time


# 并发执行多个任务

async def concurrent_demo():

   # 定义多个异步任务

   async def task(name, delay):

       print(f"{name} 开始")

       await asyncio.sleep(delay)

       print(f"{name} 完成")

       return f"{name} 结果"

   

   print("=== asyncio.gather() ===")

   

   # 使用 gather 并发执行

   results = await asyncio.gather(

       task("任务A", 2),

       task("任务B", 1),

       task("任务C", 3),

       return_exceptions=True  # 防止一个任务失败影响其他

   )

   

   print(f"所有结果: {results}")

   

   print("\n=== asyncio.wait() ===")

   

   # 使用 wait 并发执行

   tasks = [

       asyncio.create_task(task("任务X", 1)),

       asyncio.create_task(task("任务Y", 2)),

       asyncio.create_task(task("任务Z", 0.5)),

   ]

   

   # 等待所有任务完成

   done, pending = await asyncio.wait(tasks, timeout=2.5)

   

   print(f"已完成: {len(done)}")

   print(f"未完成: {len(pending)}")

   

   # 获取结果

   for task in done:

       print(f"任务结果: {task.result()}")

   

   # 取消未完成的任务

   for task in pending:

       task.cancel()


# 限制并发数量

async def limited_concurrency():

   semaphore = asyncio.Semaphore(2)  # 限制同时运行2个

   

   async def limited_task(name, delay):

       async with semaphore:

           print(f"{name} 获得信号量,开始执行")

           await asyncio.sleep(delay)

           print(f"{name} 释放信号量")

           return f"{name} 完成"

   

   # 创建多个任务

   tasks = [

       limited_task(f"任务{i}", i % 3 + 1)

       for i in range(6)

   ]

   

   # 并发执行

   results = await asyncio.gather(*tasks)

   print(f"所有任务完成: {results}")


# 运行示例

async def main():

   print("=== 并发模式演示 ===")

   await concurrent_demo()

   

   print("\n=== 限制并发数量 ===")

   await limited_concurrency()


asyncio.run(main())

4.2 多线程与异步的混合使用

python

import asyncio

import concurrent.futures

import threading

import time


# CPU密集型任务(不适合asyncio,适合线程池)

def cpu_intensive_task(n):

   """计算密集型任务"""

   print(f"CPU任务开始 (线程: {threading.current_thread().name})")

   result = sum(i * i for i in range(n))

   time.sleep(1)  # 模拟CPU计算

   print(f"CPU任务完成: {result}")

   return result


# IO密集型任务(适合asyncio)

async def io_intensive_task(name, delay):

   """IO密集型任务"""

   print(f"IO任务 {name} 开始")

   await asyncio.sleep(delay)

   print(f"IO任务 {name} 完成")

   return f"{name} 结果"


# 混合使用:在线程池中运行CPU密集型任务

async def mixed_execution():

   print("=== 混合执行模式 ===")

   

   # 创建线程池

   loop = asyncio.get_running_loop()

   

   with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:

       # 在线程池中执行CPU密集型任务

       cpu_future1 = loop.run_in_executor(pool, cpu_intensive_task, 1000000)

       cpu_future2 = loop.run_in_executor(pool, cpu_intensive_task, 2000000)

       

       # 同时执行IO密集型任务

       io_task1 = asyncio.create_task(io_intensive_task("IO1", 1))

       io_task2 = asyncio.create_task(io_intensive_task("IO2", 2))

       

       # 等待所有任务完成

       cpu_result1 = await cpu_future1

       cpu_result2 = await cpu_future2

       io_result1 = await io_task1

       io_result2 = await io_task2

       

       print(f"\n所有任务完成:")

       print(f"CPU结果: {cpu_result1}, {cpu_result2}")

       print(f"IO结果: {io_result1}, {io_result2}")


# 异步任务与同步函数的桥接

async def sync_async_bridge():

   """同步函数与异步代码的桥接"""

   

   # 同步函数

   def sync_function(x):

       time.sleep(0.5)

       return x * 2

   

   # 将同步函数包装为异步

   async def async_wrapper(func, *args):

       loop = asyncio.get_running_loop()

       

       # 在线程池中执行同步函数

       return await loop.run_in_executor(None, func, *args)

   

   print("\n=== 同步异步桥接 ===")

   

   # 并发执行同步函数的异步版本

   tasks = [

       async_wrapper(sync_function, i)

       for i in range(1, 6)

   ]

   

   results = await asyncio.gather(*tasks)

   print(f"桥接执行结果: {results}")


# 运行示例

async def main():

   await mixed_execution()

   await sync_async_bridge()


asyncio.run(main())

4.3 异步锁与同步原语

python

import asyncio


# 异步锁

async def async_lock_demo():

   lock = asyncio.Lock()

   shared_data = []

   

   async def modify_data(name):

       async with lock:  # 获取锁

           print(f"{name} 获取锁")

           await asyncio.sleep(0.2)  # 模拟操作

           

           shared_data.append(f"数据来自 {name}")

           print(f"{name} 修改数据: {shared_data}")

           print(f"{name} 释放锁")

   

   # 并发修改

   tasks = [modify_data(f"任务{i}") for i in range(5)]

   await asyncio.gather(*tasks)

   

   print(f"最终数据: {shared_data}")


# 信号量

async def semaphore_demo():

   semaphore = asyncio.Semaphore(2)  # 最多2个同时访问

   

   async def limited_access(name, delay):

       async with semaphore:

           print(f"{name} 获得访问权限")

           await asyncio.sleep(delay)

           print(f"{name} 释放访问权限")

       return f"{name} 完成"

   

   # 多个任务竞争访问

   tasks = [

       limited_access(f"访问者{i}", i % 2 + 1)

       for i in range(6)

   ]

   

   await asyncio.gather(*tasks)


# 事件

async def event_demo():

   event = asyncio.Event()

   

   async def waiter(name):

       print(f"{name} 等待事件")

       await event.wait()

       print(f"{name} 收到事件")

   

   async def setter():

       await asyncio.sleep(1)

       print("设置事件")

       event.set()

   

   # 创建等待者

   waiters = [asyncio.create_task(waiter(f"等待者{i}")) for i in range(3)]

   

   # 设置事件

   await setter()

   

   # 等待所有等待者完成

   await asyncio.gather(*waiters)


# 运行示例

async def main():

   print("=== 异步锁 ===")

   await async_lock_demo()

   

   print("\n=== 信号量 ===")

   await semaphore_demo()

   

   print("\n=== 事件 ===")

   await event_demo()


asyncio.run(main())

第五部分:综合实战


明扬工控商城

推荐阅读:

Python中级教程 第六课:高级特性与性能优化 2/2

Python中级教程 第六课:高级特性与性能优化 1/2

Python中级教程 第五课:异步编程与并发 2/2

Python中级教程 第五课:异步编程与并发 1/2

Python中级教程 第四课:上下文管理器、生成器与迭代器

Python中级教程 第三课:面向对象编程深入与装饰器

热门标签:
Python中级教程 第五课:异步编程与并发 1/2.docx

将本文的Word文档下载到电脑

推荐度:

下载

全部评论

请登录
产业新闻-明扬资讯网
科技资讯-明扬资讯网