最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
浏览量:17 次 发布时间:2026-01-15 18:59 作者:明扬工控商城 下载docx
最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
第五部分:综合实战
实战1:异步Web爬虫
python
import asyncio
import aiohttp
import async_timeout
from urllib.parse import urljoin
import re
class AsyncWebCrawler:
def __init__(self, max_concurrent=5, timeout=10):
self.max_concurrent = max_concurrent
self.timeout = timeout
self.semaphore = asyncio.Semaphore(max_concurrent)
self.visited = set()
self.results = []
async def fetch_page(self, session, url):
"""异步获取页面内容"""
async with self.semaphore: # 限制并发数
try:
async with async_timeout.timeout(self.timeout):
async with session.get(url) as response:
if response.status == 200:
content = await response.text()
return content
else:
print(f"请求失败: {url} - 状态码: {response.status}")
return None
except Exception as e:
print(f"请求异常: {url} - {e}")
return None
def extract_links(self, html, base_url):
"""从HTML中提取链接"""
if not html:
return []
# 简单的链接提取
pattern = r'href="([^"]+)"'
links = re.findall(pattern, html)
# 转换为绝对URL
absolute_links = []
for link in links:
absolute_link = urljoin(base_url, link)
if absolute_link.startswith('http'): # 只处理HTTP链接
absolute_links.append(absolute_link)
return absolute_links
async def crawl(self, session, url, depth=2, max_pages=20):
"""递归爬取"""
if (depth <= 0 or
len(self.visited) >= max_pages or
url in self.visited):
return
print(f"爬取: {url} (深度: {depth})")
self.visited.add(url)
# 获取页面
html = await self.fetch_page(session, url)
if not html:
return
# 提取标题
title_match = re.search(r'<title>(.*?)</title>', html, re.IGNORECASE)
title = title_match.group(1) if title_match else "无标题"
# 保存结果
self.results.append({
'url': url,
'title': title[:100], # 截断长标题
'length': len(html)
})
# 如果还有深度,提取并爬取链接
if depth > 1:
links = self.extract_links(html, url)
# 为每个链接创建爬取任务
tasks = [
self.crawl(session, link, depth-1, max_pages)
for link in links[:5] # 限制每个页面只爬取5个链接
]
# 并发爬取
await asyncio.gather(*tasks)
async def run(self, start_urls, max_depth=2, max_pages=20):
"""运行爬虫"""
connector = aiohttp.TCPConnector(limit=self.max_concurrent)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [
self.crawl(session, url, max_depth, max_pages)
for url in start_urls
]
await asyncio.gather(*tasks)
return self.results
# 使用示例
async def main():
crawler = AsyncWebCrawler(max_concurrent=3, timeout=5)
start_urls = [
'https://httpbin.org/html',
'https://httpbin.org/links/10/0'
]
print("开始异步爬取...")
results = await crawler.run(start_urls, max_depth=2, max_pages=10)
print(f"\n爬取完成! 共爬取 {len(results)} 个页面:")
for i, result in enumerate(results, 1):
print(f"{i}. {result['title']} - {result['url']} ({result['length']} 字节)")
# 注意:实际运行可能需要处理robots.txt和遵守网站规则
asyncio.run(main())
实战2:异步数据处理管道
python
import asyncio
import random
from dataclasses import dataclass
from typing import List, Optional
import json
@dataclass
class DataRecord:
id: int
value: float
category: str
timestamp: float
class AsyncDataPipeline:
"""异步数据处理管道"""
def __init__(self, buffer_size=10):
self.extract_queue = asyncio.Queue(maxsize=buffer_size)
self.transform_queue = asyncio.Queue(maxsize=buffer_size)
self.load_queue = asyncio.Queue(maxsize=buffer_size)
self.processed_count = 0
self.errors = []
async def extract(self, num_records=100):
"""数据提取阶段"""
print("开始数据提取...")
for i in range(num_records):
# 模拟数据生成
record = DataRecord(
id=i,
value=random.uniform(0, 100),
category=random.choice(['A', 'B', 'C', 'D']),
timestamp=asyncio.get_event_loop().time()
)
await self.extract_queue.put(record)
if i % 20 == 0:
print(f"已生成 {i+1} 条记录")
# 控制生成速度
await asyncio.sleep(random.uniform(0.01, 0.05))
# 发送结束信号
await self.extract_queue.put(None)
print("数据提取完成")
async def transform(self, num_workers=3):
"""数据转换阶段"""
print("开始数据转换...")
async def worker(worker_id):
while True:
record = await self.extract_queue.get()
if record is None:
# 将结束信号放回,让其他worker也能收到
await self.extract_queue.put(None)
await self.transform_queue.put(None) # 传递结束信号
break
try:
# 模拟数据转换
await asyncio.sleep(random.uniform(0.02, 0.1))
# 转换逻辑
transformed = {
'id': record.id,
'value_squared': record.value ** 2,
'category': record.category,
'processed_at': asyncio.get_event_loop().time(),
'original_value': record.value
}
await self.transform_queue.put(transformed)
if record.id % 25 == 0:
print(f"Worker {worker_id} 转换记录 {record.id}")
except Exception as e:
self.errors.append(f"转换错误 ID={record.id}: {e}")
print(f"Worker {worker_id} 转换失败: {e}")
finally:
self.extract_queue.task_done()
# 启动多个转换worker
workers = [asyncio.create_task(worker(i)) for i in range(num_workers)]
await asyncio.gather(*workers)
print("数据转换完成")
async def load(self, output_file="output.json"):
"""数据加载阶段"""
print("开始数据加载...")
all_data = []
while True:
data = await self.transform_queue.get()
if data is None:
# 将结束信号放回
await self.transform_queue.put(None)
break
# 模拟数据加载
await asyncio.sleep(random.uniform(0.01, 0.03))
all_data.append(data)
self.processed_count += 1
if self.processed_count % 20 == 0:
print(f"已加载 {self.processed_count} 条记录")
self.transform_queue.task_done()
# 写入文件
with open(output_file, 'w') as f:
json.dump(all_data, f, indent=2, default=str)
print(f"数据加载完成,共处理 {self.processed_count} 条记录")
print(f"错误数: {len(self.errors)}")
if self.errors:
print("前5个错误:")
for error in self.errors[:5]:
print(f" - {error}")
async def run_pipeline(self, num_records=100):
"""运行整个管道"""
print("=== 启动异步数据处理管道 ===")
# 创建并运行所有阶段
extract_task = asyncio.create_task(self.extract(num_records))
transform_task = asyncio.create_task(self.transform())
load_task = asyncio.create_task(self.load())
# 等待所有任务完成
await asyncio.gather(extract_task, transform_task, load_task)
print("=== 管道执行完成 ===")
# 使用示例
async def main():
pipeline = AsyncDataPipeline(buffer_size=20)
await pipeline.run_pipeline(num_records=50)
asyncio.run(main())
实战3:异步Web服务器
python
from aiohttp import web
import asyncio
import json
from datetime import datetime
class AsyncWebServer:
"""异步Web服务器"""
def __init__(self):
self.app = web.Application()
self.setup_routes()
self.request_count = 0
self.cache = {}
def setup_routes(self):
"""设置路由"""
self.app.router.add_get('/', self.handle_root)
self.app.router.add_get('/api/data', self.handle_get_data)
self.app.router.add_post('/api/data', self.handle_post_data)
self.app.router.add_get('/api/stats', self.handle_stats)
self.app.router.add_get('/api/stream', self.handle_stream)
async def handle_root(self, request):
"""根路径处理器"""
return web.Response(
text="欢迎使用异步Web服务器!",
content_type='text/html'
)
async def handle_get_data(self, request):
"""获取数据"""
# 模拟异步数据库查询
await asyncio.sleep(0.1)
data_id = request.query.get('id', 'default')
if data_id in self.cache:
data = self.cache[data_id]
cached = True
else:
# 模拟生成数据
data = {
'id': data_id,
'value': 42,
'timestamp': datetime.now().isoformat(),
'server': 'async-server'
}
self.cache[data_id] = data
cached = False
return web.json_response({
'data': data,
'cached': cached,
'request_id': self.request_count
})
async def handle_post_data(self, request):
"""提交数据"""
try:
data = await request.json()
# 模拟数据处理
await asyncio.sleep(0.2)
# 添加处理信息
data['processed'] = True
data['processed_at'] = datetime.now().isoformat()
data['request_id'] = self.request_count
return web.json_response({
'status': 'success',
'message': '数据已处理',
'result': data
})
except json.JSONDecodeError:
return web.json_response(
{'error': '无效的JSON数据'},
status=400
)
async def handle_stats(self, request):
"""获取统计信息"""
stats = {
'request_count': self.request_count,
'cache_size': len(self.cache),
'uptime': '运行中',
'timestamp': datetime.now().isoformat()
}
return web.json_response(stats)
async def handle_stream(self, request):
"""流式响应"""
response = web.StreamResponse()
response.headers['Content-Type'] = 'text/plain'
await response.prepare(request)
# 发送流式数据
for i in range(10):
data = f"数据块 {i+1} - 时间: {datetime.now().isoformat()}\n"
await response.write(data.encode('utf-8'))
await asyncio.sleep(0.5) # 模拟延迟
await response.write_eof()
return response
async def middleware(self, app, handler):
"""中间件:统计请求次数"""
async def middleware_handler(request):
self.request_count += 1
print(f"请求 #{self.request_count}: {request.method} {request.path}")
try:
response = await handler(request)
return response
except web.HTTPException as ex:
print(f"HTTP异常: {ex.status} {ex.reason}")
raise
except Exception as e:
print(f"服务器错误: {e}")
return web.json_response(
{'error': '内部服务器错误'},
status=500
)
return middleware_handler
def run(self, host='127.0.0.1', port=8080):
"""运行服务器"""
# 添加中间件
self.app.middlewares.append(self.middleware)
print(f"启动服务器在 http://{host}:{port}")
print("可用端点:")
print(" GET / - 欢迎页面")
print(" GET /api/data - 获取数据 (?id=参数)")
print(" POST /api/data - 提交数据")
print(" GET /api/stats - 服务器统计")
print(" GET /api/stream - 流式响应")
web.run_app(self.app, host=host, port=port)
# 客户端测试代码
async def test_client():
"""测试客户端"""
import aiohttp
async with aiohttp.ClientSession() as session:
# 测试根路径
async with session.get('http://127.0.0.1:8080/') as resp:
print(f"根路径: {await resp.text()}")
# 测试获取数据
async with session.get('http://127.0.0.1:8080/api/data?id=test1') as resp:
data = await resp.json()
print(f"获取数据: {data}")
# 测试提交数据
async with session.post('http://127.0.0.1:8080/api/data',
json={'name': '测试', 'value': 123}) as resp:
result = await resp.json()
print(f"提交数据结果: {result}")
# 测试统计
async with session.get('http://127.0.0.1:8080/api/stats') as resp:
stats = await resp.json()
print(f"服务器统计: {stats}")
# 主程序
if __name__ == '__main__':
server = AsyncWebServer()
# 可以在单独的线程中运行客户端测试
import threading
import time
def run_client():
time.sleep(2) # 等待服务器启动
asyncio.run(test_client())
# 启动客户端线程
client_thread = threading.Thread(target=run_client, daemon=True)
client_thread.start()
# 运行服务器
server.run()
练习作业
作业1:实现异步缓存系统
要求:
支持设置缓存过期时间
支持异步获取和设置缓存
实现LRU缓存淘汰策略
添加缓存统计功能
作业2:实现异步消息队列
要求:
支持发布/订阅模式
支持消息持久化
实现消息确认机制
支持消息优先级
作业3:实现异步任务调度器
要求:
支持定时任务(cron表达式)
支持一次性任务和循环任务
实现任务依赖关系
添加任务监控和错误处理
作业4:综合项目 - 异步监控系统
要求:
监控多个服务的状态
实现异步健康检查
支持告警规则配置
实现实时仪表盘
数据持久化和历史查询
学习要点总结
异步编程核心概念:
async/await语法
事件循环机制
协程与任务
Future对象
异步编程模式:
并发执行多个任务
异步上下文管理器
异步生成器和迭代器
异步锁和同步原语
asyncio核心组件:
事件循环操作
任务创建和管理
异步队列
信号量和锁
实际应用场景:
高并发网络应用
异步数据处理管道
实时监控系统
高性能爬虫
最佳实践:
正确使用async/await
合理控制并发数量
正确处理异常
避免阻塞操作
性能优化:
选择合适的并发模型
使用连接池
合理设置超时时间
监控和调试异步代码
常见问题解答
Q: 什么时候应该使用异步编程?
A: 当应用是I/O密集型(网络请求、文件操作、数据库查询)且需要高并发时。
Q: async/await和线程有什么区别?
A: async/await在单线程中通过协程切换实现并发,开销小;线程是多线程并行,适合CPU密集型任务。
Q: 如何在异步函数中调用同步函数?
A: 使用asyncio.to_thread()或loop.run_in_executor()在线程池中运行。
Q: 如何调试异步代码?
A: 使用asyncio.debug = True,或使用专门的异步调试工具。
Q: 异步编程有什么缺点?
A: 学习曲线较陡,代码结构复杂,调试困难,某些库可能不支持异步。
下一课预告
第六课将学习:
Python高级特性:元类、描述符
装饰器高级用法
性能优化与 profiling
内存管理与垃圾回收
Python设计模式深入
将本文的Word文档下载到电脑
推荐度: