Python中级教程 第五课:异步编程与并发 2/2

浏览量:17 次 发布时间:2026-01-15 18:59 作者:明扬工控商城 下载docx

最近更新:Python中级教程 第三课:面向对象编程深入与装饰器

第五部分:综合实战

实战1:异步Web爬虫

python

import asyncio

import aiohttp

import async_timeout

from urllib.parse import urljoin

import re


class AsyncWebCrawler:

   def __init__(self, max_concurrent=5, timeout=10):

       self.max_concurrent = max_concurrent

       self.timeout = timeout

       self.semaphore = asyncio.Semaphore(max_concurrent)

       self.visited = set()

       self.results = []

   

   async def fetch_page(self, session, url):

       """异步获取页面内容"""

       async with self.semaphore:  # 限制并发数

           try:

               async with async_timeout.timeout(self.timeout):

                   async with session.get(url) as response:

                       if response.status == 200:

                           content = await response.text()

                           return content

                       else:

                           print(f"请求失败: {url} - 状态码: {response.status}")

                           return None

           except Exception as e:

               print(f"请求异常: {url} - {e}")

               return None

   

   def extract_links(self, html, base_url):

       """从HTML中提取链接"""

       if not html:

           return []

       

       # 简单的链接提取

       pattern = r'href="([^"]+)"'

       links = re.findall(pattern, html)

       

       # 转换为绝对URL

       absolute_links = []

       for link in links:

           absolute_link = urljoin(base_url, link)

           if absolute_link.startswith('http'):  # 只处理HTTP链接

               absolute_links.append(absolute_link)

       

       return absolute_links

   

   async def crawl(self, session, url, depth=2, max_pages=20):

       """递归爬取"""

       if (depth <= 0 or

           len(self.visited) >= max_pages or

           url in self.visited):

           return

       

       print(f"爬取: {url} (深度: {depth})")

       self.visited.add(url)

       

       # 获取页面

       html = await self.fetch_page(session, url)

       if not html:

           return

       

       # 提取标题

       title_match = re.search(r'<title>(.*?)</title>', html, re.IGNORECASE)

       title = title_match.group(1) if title_match else "无标题"

       

       # 保存结果

       self.results.append({

           'url': url,

           'title': title[:100],  # 截断长标题

           'length': len(html)

       })

       

       # 如果还有深度,提取并爬取链接

       if depth > 1:

           links = self.extract_links(html, url)

           

           # 为每个链接创建爬取任务

           tasks = [

               self.crawl(session, link, depth-1, max_pages)

               for link in links[:5]  # 限制每个页面只爬取5个链接

           ]

           

           # 并发爬取

           await asyncio.gather(*tasks)

   

   async def run(self, start_urls, max_depth=2, max_pages=20):

       """运行爬虫"""

       connector = aiohttp.TCPConnector(limit=self.max_concurrent)

       

       async with aiohttp.ClientSession(connector=connector) as session:

           tasks = [

               self.crawl(session, url, max_depth, max_pages)

               for url in start_urls

           ]

           

           await asyncio.gather(*tasks)

       

       return self.results


# 使用示例

async def main():

   crawler = AsyncWebCrawler(max_concurrent=3, timeout=5)

   

   start_urls = [

       'https://httpbin.org/html',

       'https://httpbin.org/links/10/0'

   ]

   

   print("开始异步爬取...")

   results = await crawler.run(start_urls, max_depth=2, max_pages=10)

   

   print(f"\n爬取完成! 共爬取 {len(results)} 个页面:")

   for i, result in enumerate(results, 1):

       print(f"{i}. {result['title']} - {result['url']} ({result['length']} 字节)")


# 注意:实际运行可能需要处理robots.txt和遵守网站规则

asyncio.run(main())

实战2:异步数据处理管道

python

import asyncio

import random

from dataclasses import dataclass

from typing import List, Optional

import json


@dataclass

class DataRecord:

   id: int

   value: float

   category: str

   timestamp: float


class AsyncDataPipeline:

   """异步数据处理管道"""

   

   def __init__(self, buffer_size=10):

       self.extract_queue = asyncio.Queue(maxsize=buffer_size)

       self.transform_queue = asyncio.Queue(maxsize=buffer_size)

       self.load_queue = asyncio.Queue(maxsize=buffer_size)

       self.processed_count = 0

       self.errors = []

   

   async def extract(self, num_records=100):

       """数据提取阶段"""

       print("开始数据提取...")

       

       for i in range(num_records):

           # 模拟数据生成

           record = DataRecord(

               id=i,

               value=random.uniform(0, 100),

               category=random.choice(['A', 'B', 'C', 'D']),

               timestamp=asyncio.get_event_loop().time()

           )

           

           await self.extract_queue.put(record)

           

           if i % 20 == 0:

               print(f"已生成 {i+1} 条记录")

           

           # 控制生成速度

           await asyncio.sleep(random.uniform(0.01, 0.05))

       

       # 发送结束信号

       await self.extract_queue.put(None)

       print("数据提取完成")

   

   async def transform(self, num_workers=3):

       """数据转换阶段"""

       print("开始数据转换...")

       

       async def worker(worker_id):

           while True:

               record = await self.extract_queue.get()

               

               if record is None:

                   # 将结束信号放回,让其他worker也能收到

                   await self.extract_queue.put(None)

                   await self.transform_queue.put(None)  # 传递结束信号

                   break

               

               try:

                   # 模拟数据转换

                   await asyncio.sleep(random.uniform(0.02, 0.1))

                   

                   # 转换逻辑

                   transformed = {

                       'id': record.id,

                       'value_squared': record.value ** 2,

                       'category': record.category,

                       'processed_at': asyncio.get_event_loop().time(),

                       'original_value': record.value

                   }

                   

                   await self.transform_queue.put(transformed)

                   

                   if record.id % 25 == 0:

                       print(f"Worker {worker_id} 转换记录 {record.id}")

               

               except Exception as e:

                   self.errors.append(f"转换错误 ID={record.id}: {e}")

                   print(f"Worker {worker_id} 转换失败: {e}")

               

               finally:

                   self.extract_queue.task_done()

       

       # 启动多个转换worker

       workers = [asyncio.create_task(worker(i)) for i in range(num_workers)]

       await asyncio.gather(*workers)

       print("数据转换完成")

   

   async def load(self, output_file="output.json"):

       """数据加载阶段"""

       print("开始数据加载...")

       

       all_data = []

       

       while True:

           data = await self.transform_queue.get()

           

           if data is None:

               # 将结束信号放回

               await self.transform_queue.put(None)

               break

           

           # 模拟数据加载

           await asyncio.sleep(random.uniform(0.01, 0.03))

           

           all_data.append(data)

           self.processed_count += 1

           

           if self.processed_count % 20 == 0:

               print(f"已加载 {self.processed_count} 条记录")

           

           self.transform_queue.task_done()

       

       # 写入文件

       with open(output_file, 'w') as f:

           json.dump(all_data, f, indent=2, default=str)

       

       print(f"数据加载完成,共处理 {self.processed_count} 条记录")

       print(f"错误数: {len(self.errors)}")

       

       if self.errors:

           print("前5个错误:")

           for error in self.errors[:5]:

               print(f"  - {error}")

   

   async def run_pipeline(self, num_records=100):

       """运行整个管道"""

       print("=== 启动异步数据处理管道 ===")

       

       # 创建并运行所有阶段

       extract_task = asyncio.create_task(self.extract(num_records))

       transform_task = asyncio.create_task(self.transform())

       load_task = asyncio.create_task(self.load())

       

       # 等待所有任务完成

       await asyncio.gather(extract_task, transform_task, load_task)

       

       print("=== 管道执行完成 ===")


# 使用示例

async def main():

   pipeline = AsyncDataPipeline(buffer_size=20)

   await pipeline.run_pipeline(num_records=50)


asyncio.run(main())

实战3:异步Web服务器

python

from aiohttp import web

import asyncio

import json

from datetime import datetime


class AsyncWebServer:

   """异步Web服务器"""

   

   def __init__(self):

       self.app = web.Application()

       self.setup_routes()

       self.request_count = 0

       self.cache = {}

   

   def setup_routes(self):

       """设置路由"""

       self.app.router.add_get('/', self.handle_root)

       self.app.router.add_get('/api/data', self.handle_get_data)

       self.app.router.add_post('/api/data', self.handle_post_data)

       self.app.router.add_get('/api/stats', self.handle_stats)

       self.app.router.add_get('/api/stream', self.handle_stream)

   

   async def handle_root(self, request):

       """根路径处理器"""

       return web.Response(

           text="欢迎使用异步Web服务器!",

           content_type='text/html'

       )

   

   async def handle_get_data(self, request):

       """获取数据"""

       # 模拟异步数据库查询

       await asyncio.sleep(0.1)

       

       data_id = request.query.get('id', 'default')

       

       if data_id in self.cache:

           data = self.cache[data_id]

           cached = True

       else:

           # 模拟生成数据

           data = {

               'id': data_id,

               'value': 42,

               'timestamp': datetime.now().isoformat(),

               'server': 'async-server'

           }

           self.cache[data_id] = data

           cached = False

       

       return web.json_response({

           'data': data,

           'cached': cached,

           'request_id': self.request_count

       })

   

   async def handle_post_data(self, request):

       """提交数据"""

       try:

           data = await request.json()

           

           # 模拟数据处理

           await asyncio.sleep(0.2)

           

           # 添加处理信息

           data['processed'] = True

           data['processed_at'] = datetime.now().isoformat()

           data['request_id'] = self.request_count

           

           return web.json_response({

               'status': 'success',

               'message': '数据已处理',

               'result': data

           })

       

       except json.JSONDecodeError:

           return web.json_response(

               {'error': '无效的JSON数据'},

               status=400

           )

   

   async def handle_stats(self, request):

       """获取统计信息"""

       stats = {

           'request_count': self.request_count,

           'cache_size': len(self.cache),

           'uptime': '运行中',

           'timestamp': datetime.now().isoformat()

       }

       

       return web.json_response(stats)

   

   async def handle_stream(self, request):

       """流式响应"""

       response = web.StreamResponse()

       response.headers['Content-Type'] = 'text/plain'

       await response.prepare(request)

       

       # 发送流式数据

       for i in range(10):

           data = f"数据块 {i+1} - 时间: {datetime.now().isoformat()}\n"

           await response.write(data.encode('utf-8'))

           await asyncio.sleep(0.5)  # 模拟延迟

       

       await response.write_eof()

       return response

   

   async def middleware(self, app, handler):

       """中间件:统计请求次数"""

       async def middleware_handler(request):

           self.request_count += 1

           print(f"请求 #{self.request_count}: {request.method} {request.path}")

           

           try:

               response = await handler(request)

               return response

           except web.HTTPException as ex:

               print(f"HTTP异常: {ex.status} {ex.reason}")

               raise

           except Exception as e:

               print(f"服务器错误: {e}")

               return web.json_response(

                   {'error': '内部服务器错误'},

                   status=500

               )

       

       return middleware_handler

   

   def run(self, host='127.0.0.1', port=8080):

       """运行服务器"""

       # 添加中间件

       self.app.middlewares.append(self.middleware)

       

       print(f"启动服务器在 http://{host}:{port}")

       print("可用端点:")

       print("  GET  /              - 欢迎页面")

       print("  GET  /api/data      - 获取数据 (?id=参数)")

       print("  POST /api/data      - 提交数据")

       print("  GET  /api/stats     - 服务器统计")

       print("  GET  /api/stream    - 流式响应")

       

       web.run_app(self.app, host=host, port=port)


# 客户端测试代码

async def test_client():

   """测试客户端"""

   import aiohttp

   

   async with aiohttp.ClientSession() as session:

       # 测试根路径

       async with session.get('http://127.0.0.1:8080/') as resp:

           print(f"根路径: {await resp.text()}")

       

       # 测试获取数据

       async with session.get('http://127.0.0.1:8080/api/data?id=test1') as resp:

           data = await resp.json()

           print(f"获取数据: {data}")

       

       # 测试提交数据

       async with session.post('http://127.0.0.1:8080/api/data',

                              json={'name': '测试', 'value': 123}) as resp:

           result = await resp.json()

           print(f"提交数据结果: {result}")

       

       # 测试统计

       async with session.get('http://127.0.0.1:8080/api/stats') as resp:

           stats = await resp.json()

           print(f"服务器统计: {stats}")


# 主程序

if __name__ == '__main__':

   server = AsyncWebServer()

   

   # 可以在单独的线程中运行客户端测试

   import threading

   import time

   

   def run_client():

       time.sleep(2)  # 等待服务器启动

       asyncio.run(test_client())

   

   # 启动客户端线程

   client_thread = threading.Thread(target=run_client, daemon=True)

   client_thread.start()

   

   # 运行服务器

   server.run()

练习作业

作业1:实现异步缓存系统

要求:


支持设置缓存过期时间


支持异步获取和设置缓存


实现LRU缓存淘汰策略


添加缓存统计功能


作业2:实现异步消息队列

要求:


支持发布/订阅模式


支持消息持久化


实现消息确认机制


支持消息优先级


作业3:实现异步任务调度器

要求:


支持定时任务(cron表达式)


支持一次性任务和循环任务


实现任务依赖关系


添加任务监控和错误处理


作业4:综合项目 - 异步监控系统

要求:


监控多个服务的状态


实现异步健康检查


支持告警规则配置


实现实时仪表盘


数据持久化和历史查询


学习要点总结

异步编程核心概念:


async/await语法


事件循环机制


协程与任务


Future对象


异步编程模式:


并发执行多个任务


异步上下文管理器


异步生成器和迭代器


异步锁和同步原语


asyncio核心组件:


事件循环操作


任务创建和管理


异步队列


信号量和锁


实际应用场景:


高并发网络应用


异步数据处理管道


实时监控系统


高性能爬虫


最佳实践:


正确使用async/await


合理控制并发数量


正确处理异常


避免阻塞操作


性能优化:


选择合适的并发模型


使用连接池


合理设置超时时间


监控和调试异步代码


常见问题解答

Q: 什么时候应该使用异步编程?

A: 当应用是I/O密集型(网络请求、文件操作、数据库查询)且需要高并发时。


Q: async/await和线程有什么区别?

A: async/await在单线程中通过协程切换实现并发,开销小;线程是多线程并行,适合CPU密集型任务。


Q: 如何在异步函数中调用同步函数?

A: 使用asyncio.to_thread()或loop.run_in_executor()在线程池中运行。


Q: 如何调试异步代码?

A: 使用asyncio.debug = True,或使用专门的异步调试工具。


Q: 异步编程有什么缺点?

A: 学习曲线较陡,代码结构复杂,调试困难,某些库可能不支持异步。


下一课预告

第六课将学习:


Python高级特性:元类、描述符


装饰器高级用法


性能优化与 profiling


内存管理与垃圾回收


Python设计模式深入


明扬工控商城

推荐阅读:

Python中级教程 第六课:高级特性与性能优化 2/2

Python中级教程 第六课:高级特性与性能优化 1/2

Python中级教程 第五课:异步编程与并发 2/2

Python中级教程 第五课:异步编程与并发 1/2

Python中级教程 第四课:上下文管理器、生成器与迭代器

Python中级教程 第三课:面向对象编程深入与装饰器

热门标签:
Python中级教程 第五课:异步编程与并发 2/2.docx

将本文的Word文档下载到电脑

推荐度:

下载

全部评论

请登录
产业新闻-明扬资讯网
科技资讯-明扬资讯网