最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
浏览量:23 次 发布时间:2026-01-15 18:58 作者:明扬工控商城 下载docx
最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
第一部分:异步编程基础
1.1 什么是异步编程?
异步编程允许程序在等待某些操作(如I/O)完成时继续执行其他任务,而不是阻塞等待。
python
import asyncio
import time
# 同步版本
def sync_task(name, delay):
print(f"{name} 开始")
time.sleep(delay) # 阻塞
print(f"{name} 结束")
return f"{name} 完成"
# 异步版本
async def async_task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay) # 非阻塞
print(f"{name} 结束")
return f"{name} 完成"
# 同步执行
print("=== 同步执行 ===")
start = time.time()
sync_task("任务1", 2)
sync_task("任务2", 1)
print(f"总耗时: {time.time() - start:.2f}秒")
# 异步执行
print("\n=== 异步执行 ===")
async def main():
start = time.time()
# 创建任务
task1 = asyncio.create_task(async_task("任务1", 2))
task2 = asyncio.create_task(async_task("任务2", 1))
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(f"结果: {results}")
print(f"总耗时: {time.time() - start:.2f}秒")
# 运行异步程序
asyncio.run(main())
1.2 async/await 基础语法
python
import asyncio
# 定义异步函数
async def fetch_data(url, delay=1):
print(f"开始获取 {url}")
await asyncio.sleep(delay) # 模拟网络请求
print(f"完成获取 {url}")
return {"url": url, "data": f"来自{url}的数据"}
async def process_data(data):
print(f"处理数据: {data}")
await asyncio.sleep(0.5) # 模拟数据处理
return f"已处理: {data}"
async def main():
# 顺序执行(不推荐)
print("=== 顺序执行 ===")
result1 = await fetch_data("https://api.example.com/1", 1)
result2 = await fetch_data("https://api.example.com/2", 2)
print(f"结果: {result1}, {result2}")
# 并发执行(推荐)
print("\n=== 并发执行 ===")
task1 = asyncio.create_task(fetch_data("https://api.example.com/1", 1))
task2 = asyncio.create_task(fetch_data("https://api.example.com/2", 2))
# 等待任务完成
result1 = await task1
result2 = await task2
# 处理结果
processed1 = await process_data(result1)
processed2 = await process_data(result2)
print(f"最终结果: {processed1}, {processed2}")
asyncio.run(main())
1.3 异步上下文管理器
python
import asyncio
from contextlib import asynccontextmanager
import aiohttp # 需要安装: pip install aiohttp
# 自定义异步上下文管理器
class AsyncDatabaseConnection:
def __init__(self, db_name):
self.db_name = db_name
self.connection = None
async def __aenter__(self):
print(f"连接到数据库: {self.db_name}")
# 模拟异步连接
await asyncio.sleep(0.5)
self.connection = f"AsyncConnection to {self.db_name}"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"关闭数据库连接: {self.db_name}")
self.connection = None
await asyncio.sleep(0.2) # 模拟清理
if exc_type:
print(f"发生异常: {exc_type.__name__}")
return False
async def query(self, sql):
print(f"执行查询: {sql}")
await asyncio.sleep(0.3)
return f"结果: {sql}"
# 使用@asynccontextmanager装饰器
@asynccontextmanager
async def async_timer(name):
"""异步计时器"""
start = asyncio.get_event_loop().time()
print(f"[{name}] 开始")
try:
yield
finally:
end = asyncio.get_event_loop().time()
print(f"[{name}] 结束,耗时: {end - start:.2f}秒")
# 使用示例
async def main():
# 使用类实现的异步上下文管理器
async with AsyncDatabaseConnection("test_db") as db:
result = await db.query("SELECT * FROM users")
print(result)
print()
# 使用装饰器实现的异步上下文管理器
async with async_timer("计算任务"):
await asyncio.sleep(1)
total = sum(range(1000000))
print(f"计算结果: {total}")
asyncio.run(main())
第二部分:asyncio 核心组件
2.1 任务(Task)管理
python
import asyncio
import random
async def worker(name, queue):
"""工作协程"""
while True:
# 从队列获取任务
task_id = await queue.get()
# 模拟工作
delay = random.uniform(0.5, 2.0)
print(f"Worker {name} 开始处理任务 {task_id} (预计 {delay:.1f}秒)")
await asyncio.sleep(delay)
# 标记任务完成
queue.task_done()
print(f"Worker {name} 完成任务 {task_id}")
async def task_producer(queue, num_tasks=10):
"""任务生产者"""
for i in range(num_tasks):
await queue.put(i)
print(f"已添加任务 {i} 到队列")
await asyncio.sleep(0.2) # 控制添加速度
async def monitor_tasks(tasks, interval=1):
"""任务监控器"""
while True:
running = sum(1 for task in tasks if not task.done())
print(f"[监控] 运行中的任务: {running}/{len(tasks)}")
if running == 0:
break
await asyncio.sleep(interval)
async def main():
# 创建任务队列
queue = asyncio.Queue(maxsize=5)
# 创建多个工作协程
workers = [
asyncio.create_task(worker(f"W{i}", queue))
for i in range(3) # 3个工人
]
# 创建生产者
producer = asyncio.create_task(task_producer(queue, 15))
# 创建监控器
monitor = asyncio.create_task(monitor_tasks([producer] + workers))
# 等待所有任务完成
await queue.join() # 等待队列清空
# 取消工作协程
for w in workers:
w.cancel()
# 等待所有任务结束
await asyncio.gather(producer, monitor, *workers, return_exceptions=True)
print("所有任务完成!")
asyncio.run(main())
2.2 Future 对象
python
import asyncio
# 创建 Future 对象
async def create_future_demo():
loop = asyncio.get_running_loop()
# 创建 Future
future = loop.create_future()
print(f"Future 状态: {future.done()}")
# 设置结果(模拟异步操作完成后调用)
async def set_result_later():
await asyncio.sleep(1)
if not future.done():
future.set_result("Future 完成!")
# 设置异常(模拟错误)
async def set_exception_later():
await asyncio.sleep(0.5)
if not future.done():
future.set_exception(ValueError("发生错误!"))
# 启动设置结果的任务
asyncio.create_task(set_result_later())
# asyncio.create_task(set_exception_later()) # 可以取消注释测试异常情况
try:
# 等待 Future 完成
result = await future
print(f"收到结果: {result}")
except Exception as e:
print(f"捕获异常: {e}")
print(f"Future 最终状态: {future.done()}")
print(f"Future 结果: {future.result() if future.done() and not future.cancelled() else '无结果'}")
# 使用 Future 回调
async def future_with_callbacks():
loop = asyncio.get_running_loop()
future = loop.create_future()
# 添加完成回调
def on_done(fut):
if fut.cancelled():
print("回调: Future 被取消")
elif fut.exception():
print(f"回调: 发生异常 {fut.exception()}")
else:
print(f"回调: 收到结果 {fut.result()}")
future.add_done_callback(on_done)
# 设置结果
asyncio.create_task(
asyncio.sleep(0.5, result="Future 结果")
).add_done_callback(
lambda task: future.set_result(task.result())
)
result = await future
print(f"主程序收到: {result}")
# 运行示例
async def main():
print("=== Future 基础示例 ===")
await create_future_demo()
print("\n=== Future 回调示例 ===")
await future_with_callbacks()
asyncio.run(main())
2.3 事件循环(Event Loop)
python
import asyncio
import threading
import time
# 获取和操作事件循环
async def event_loop_demo():
# 获取当前事件循环
loop = asyncio.get_running_loop()
print(f"当前事件循环: {loop}")
print(f"循环是否运行: {loop.is_running()}")
print(f"循环是否关闭: {loop.is_closed()}")
# 安排回调
def regular_callback():
print(f"常规回调在 {threading.current_thread().name}")
loop.call_soon(regular_callback)
# 安排延迟回调
def delayed_callback():
print("延迟回调 (1秒后)")
loop.call_later(1, delayed_callback)
# 在指定时间执行回调
current_time = loop.time()
def timed_callback():
print(f"定时回调 (在循环时间 {loop.time():.1f})")
loop.call_at(current_time + 2, timed_callback)
# 运行一段时间
await asyncio.sleep(2.5)
# 在不同线程中运行事件循环
def run_in_thread():
"""在新线程中运行事件循环"""
async def thread_task():
print(f"在线程 {threading.current_thread().name} 中运行")
await asyncio.sleep(1)
return "线程任务完成"
# 创建新的事件循环
loop = asyncio.new_event_loop()
try:
# 在新线程中运行事件循环
result = loop.run_until_complete(thread_task())
print(f"结果: {result}")
finally:
loop.close()
# 主程序
async def main():
print("=== 事件循环基础 ===")
await event_loop_demo()
print("\n=== 多线程事件循环 ===")
# 在主线程运行主任务
main_task = asyncio.create_task(asyncio.sleep(0.5, result="主任务"))
# 在新线程中运行事件循环
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.submit(run_in_thread)
# 等待主任务完成
result = await main_task
print(f"主程序: {result}")
asyncio.run(main())
第三部分:异步生成器与迭代
3.1 异步生成器
python
import asyncio
import random
# 异步生成器
async def async_generator(limit=5):
"""生成一系列数字"""
for i in range(limit):
await asyncio.sleep(random.uniform(0.1, 0.5)) # 模拟异步延迟
yield i
# 使用 async for 迭代异步生成器
async def process_async_gen():
print("开始处理异步生成器...")
# 使用 async for
async for value in async_generator(5):
print(f"收到值: {value}")
# 模拟处理
await asyncio.sleep(0.1)
print("处理完成!")
# 异步生成器表达式
async def async_gen_expression():
# 异步生成器表达式
gen = (i async for i in async_generator(3))
async for value in gen:
print(f"表达式生成: {value}")
# 异步生成器与普通生成器的混合
async def mixed_generators():
# 普通生成器
def sync_gen():
for i in range(3):
yield f"sync:{i}"
# 异步生成器
async def async_gen():
for i in range(3):
await asyncio.sleep(0.1)
yield f"async:{i}"
print("混合迭代:")
# 迭代普通生成器(同步)
for item in sync_gen():
print(f"同步: {item}")
# 迭代异步生成器
async for item in async_gen():
print(f"异步: {item}")
# 运行示例
async def main():
print("=== 异步生成器基础 ===")
await process_async_gen()
print("\n=== 异步生成器表达式 ===")
await async_gen_expression()
print("\n=== 混合生成器 ===")
await mixed_generators()
asyncio.run(main())
3.2 异步迭代器
python
import asyncio
from collections import deque
# 自定义异步迭代器
class AsyncDataStream:
"""异步数据流迭代器"""
def __init__(self, data_source):
self.data_source = data_source
self.index = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.data_source):
raise StopAsyncIteration
# 模拟异步获取数据
await asyncio.sleep(0.2)
item = self.data_source[self.index]
self.index += 1
return item
# 异步队列迭代器
class AsyncQueueIterator:
"""异步队列迭代器"""
def __init__(self, max_items=10):
self.queue = asyncio.Queue()
self.max_items = max_items
self.produced = 0
async def producer(self):
"""生产者协程"""
while self.produced < self.max_items:
item = f"item_{self.produced}"
await self.queue.put(item)
self.produced += 1
print(f"生产: {item}")
await asyncio.sleep(0.1)
# 发送结束信号
await self.queue.put(None)
def __aiter__(self):
return self
async def __anext__(self):
item = await self.queue.get()
if item is None:
# 将结束信号放回队列,以便其他消费者也能收到
await self.queue.put(None)
raise StopAsyncIteration
return item
# 使用示例
async def main():
print("=== 自定义异步迭代器 ===")
data = ["数据1", "数据2", "数据3", "数据4", "数据5"]
stream = AsyncDataStream(data)
async for item in stream:
print(f"从流中获取: {item}")
print("\n=== 异步队列迭代器 ===")
queue_iter = AsyncQueueIterator(5)
# 启动生产者
producer_task = asyncio.create_task(queue_iter.producer())
# 消费者1
async def consumer1():
async for item in queue_iter:
print(f"消费者1 收到: {item}")
await asyncio.sleep(0.15) # 模拟处理时间
# 消费者2
async def consumer2():
async for item in queue_iter:
print(f"消费者2 收到: {item}")
await asyncio.sleep(0.25) # 模拟处理时间
# 运行消费者
await asyncio.gather(
consumer1(),
consumer2(),
producer_task
)
print("所有消费完成!")
asyncio.run(main())
第四部分:并发与并行
4.1 asyncio 并发模式
python
import asyncio
import time
# 并发执行多个任务
async def concurrent_demo():
# 定义多个异步任务
async def task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return f"{name} 结果"
print("=== asyncio.gather() ===")
# 使用 gather 并发执行
results = await asyncio.gather(
task("任务A", 2),
task("任务B", 1),
task("任务C", 3),
return_exceptions=True # 防止一个任务失败影响其他
)
print(f"所有结果: {results}")
print("\n=== asyncio.wait() ===")
# 使用 wait 并发执行
tasks = [
asyncio.create_task(task("任务X", 1)),
asyncio.create_task(task("任务Y", 2)),
asyncio.create_task(task("任务Z", 0.5)),
]
# 等待所有任务完成
done, pending = await asyncio.wait(tasks, timeout=2.5)
print(f"已完成: {len(done)}")
print(f"未完成: {len(pending)}")
# 获取结果
for task in done:
print(f"任务结果: {task.result()}")
# 取消未完成的任务
for task in pending:
task.cancel()
# 限制并发数量
async def limited_concurrency():
semaphore = asyncio.Semaphore(2) # 限制同时运行2个
async def limited_task(name, delay):
async with semaphore:
print(f"{name} 获得信号量,开始执行")
await asyncio.sleep(delay)
print(f"{name} 释放信号量")
return f"{name} 完成"
# 创建多个任务
tasks = [
limited_task(f"任务{i}", i % 3 + 1)
for i in range(6)
]
# 并发执行
results = await asyncio.gather(*tasks)
print(f"所有任务完成: {results}")
# 运行示例
async def main():
print("=== 并发模式演示 ===")
await concurrent_demo()
print("\n=== 限制并发数量 ===")
await limited_concurrency()
asyncio.run(main())
4.2 多线程与异步的混合使用
python
import asyncio
import concurrent.futures
import threading
import time
# CPU密集型任务(不适合asyncio,适合线程池)
def cpu_intensive_task(n):
"""计算密集型任务"""
print(f"CPU任务开始 (线程: {threading.current_thread().name})")
result = sum(i * i for i in range(n))
time.sleep(1) # 模拟CPU计算
print(f"CPU任务完成: {result}")
return result
# IO密集型任务(适合asyncio)
async def io_intensive_task(name, delay):
"""IO密集型任务"""
print(f"IO任务 {name} 开始")
await asyncio.sleep(delay)
print(f"IO任务 {name} 完成")
return f"{name} 结果"
# 混合使用:在线程池中运行CPU密集型任务
async def mixed_execution():
print("=== 混合执行模式 ===")
# 创建线程池
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
# 在线程池中执行CPU密集型任务
cpu_future1 = loop.run_in_executor(pool, cpu_intensive_task, 1000000)
cpu_future2 = loop.run_in_executor(pool, cpu_intensive_task, 2000000)
# 同时执行IO密集型任务
io_task1 = asyncio.create_task(io_intensive_task("IO1", 1))
io_task2 = asyncio.create_task(io_intensive_task("IO2", 2))
# 等待所有任务完成
cpu_result1 = await cpu_future1
cpu_result2 = await cpu_future2
io_result1 = await io_task1
io_result2 = await io_task2
print(f"\n所有任务完成:")
print(f"CPU结果: {cpu_result1}, {cpu_result2}")
print(f"IO结果: {io_result1}, {io_result2}")
# 异步任务与同步函数的桥接
async def sync_async_bridge():
"""同步函数与异步代码的桥接"""
# 同步函数
def sync_function(x):
time.sleep(0.5)
return x * 2
# 将同步函数包装为异步
async def async_wrapper(func, *args):
loop = asyncio.get_running_loop()
# 在线程池中执行同步函数
return await loop.run_in_executor(None, func, *args)
print("\n=== 同步异步桥接 ===")
# 并发执行同步函数的异步版本
tasks = [
async_wrapper(sync_function, i)
for i in range(1, 6)
]
results = await asyncio.gather(*tasks)
print(f"桥接执行结果: {results}")
# 运行示例
async def main():
await mixed_execution()
await sync_async_bridge()
asyncio.run(main())
4.3 异步锁与同步原语
python
import asyncio
# 异步锁
async def async_lock_demo():
lock = asyncio.Lock()
shared_data = []
async def modify_data(name):
async with lock: # 获取锁
print(f"{name} 获取锁")
await asyncio.sleep(0.2) # 模拟操作
shared_data.append(f"数据来自 {name}")
print(f"{name} 修改数据: {shared_data}")
print(f"{name} 释放锁")
# 并发修改
tasks = [modify_data(f"任务{i}") for i in range(5)]
await asyncio.gather(*tasks)
print(f"最终数据: {shared_data}")
# 信号量
async def semaphore_demo():
semaphore = asyncio.Semaphore(2) # 最多2个同时访问
async def limited_access(name, delay):
async with semaphore:
print(f"{name} 获得访问权限")
await asyncio.sleep(delay)
print(f"{name} 释放访问权限")
return f"{name} 完成"
# 多个任务竞争访问
tasks = [
limited_access(f"访问者{i}", i % 2 + 1)
for i in range(6)
]
await asyncio.gather(*tasks)
# 事件
async def event_demo():
event = asyncio.Event()
async def waiter(name):
print(f"{name} 等待事件")
await event.wait()
print(f"{name} 收到事件")
async def setter():
await asyncio.sleep(1)
print("设置事件")
event.set()
# 创建等待者
waiters = [asyncio.create_task(waiter(f"等待者{i}")) for i in range(3)]
# 设置事件
await setter()
# 等待所有等待者完成
await asyncio.gather(*waiters)
# 运行示例
async def main():
print("=== 异步锁 ===")
await async_lock_demo()
print("\n=== 信号量 ===")
await semaphore_demo()
print("\n=== 事件 ===")
await event_demo()
asyncio.run(main())
第五部分:综合实战
将本文的Word文档下载到电脑
推荐度: