最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
浏览量:14 次 发布时间:2026-01-15 18:07 作者:明扬工控商城 下载docx
最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
第三部分:并发编程深入
3.1 线程池与进程池
python
import concurrent.futures
import time
import math
import random
from functools import wraps
# 性能计时装饰器
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} 耗时: {end_time - start_time:.4f}秒")
return result
return wrapper
# CPU密集型任务
def cpu_intensive_task(n):
"""计算质数 - CPU密集型"""
def is_prime(num):
if num < 2:
return False
for i in range(2, int(math.sqrt(num)) + 1):
if num % i == 0:
return False
return True
primes = []
for i in range(2, n + 1):
if is_prime(i):
primes.append(i)
return primes[:10] # 只返回前10个
# I/O密集型任务
def io_intensive_task(task_id, delay=1):
"""模拟I/O操作 - I/O密集型"""
time.sleep(delay)
return f"任务 {task_id} 完成,延迟 {delay}秒"
# 线程池示例
@timer
def thread_pool_demo(num_tasks=10):
"""使用线程池执行I/O密集型任务"""
print(f"使用线程池执行 {num_tasks} 个I/O密集型任务")
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务
futures = [
executor.submit(io_intensive_task, i, random.uniform(0.5, 2))
for i in range(num_tasks)
]
# 收集结果
results = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
print(f"完成: {result}")
except Exception as e:
print(f"任务失败: {e}")
return results
# 进程池示例
@timer
def process_pool_demo(num_tasks=5):
"""使用进程池执行CPU密集型任务"""
print(f"使用进程池执行 {num_tasks} 个CPU密集型任务")
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
# 提交任务(计算不同范围的质数)
futures = [
executor.submit(cpu_intensive_task, n * 1000)
for n in range(1, num_tasks + 1)
]
# 收集结果
results = []
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
print(f"找到质数: {len(result)} 个,示例: {result}")
except Exception as e:
print(f"任务失败: {e}")
return results
# 混合使用线程池和进程池
@timer
def hybrid_pool_demo(num_cpu_tasks=3, num_io_tasks=6):
"""混合使用线程池和进程池"""
print(f"混合执行 {num_cpu_tasks} 个CPU任务和 {num_io_tasks} 个I/O任务")
results = {}
# 使用进程池执行CPU密集型任务
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as process_executor:
cpu_futures = [
process_executor.submit(cpu_intensive_task, n * 2000)
for n in range(1, num_cpu_tasks + 1)
]
# 在线程池执行I/O密集型任务的同时等待CPU任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as thread_executor:
io_futures = [
thread_executor.submit(io_intensive_task, i, random.uniform(0.3, 1.5))
for i in range(num_io_tasks)
]
# 收集CPU任务结果
results['cpu'] = []
for future in concurrent.futures.as_completed(cpu_futures):
try:
result = future.result()
results['cpu'].append(result)
print(f"CPU任务完成: 找到 {len(result)} 个质数")
except Exception as e:
print(f"CPU任务失败: {e}")
# 收集I/O任务结果
results['io'] = []
for future in concurrent.futures.as_completed(io_futures):
try:
result = future.result()
results['io'].append(result)
print(f"I/O任务完成: {result}")
except Exception as e:
print(f"I/O任务失败: {e}")
return results
# 自定义线程池实现
class CustomThreadPool:
"""自定义线程池实现"""
def __init__(self, max_workers=5):
self.max_workers = max_workers
self.workers = []
self.task_queue = []
self.results = {}
self.lock = threading.Lock()
self.running = False
def submit(self, func, *args, **kwargs):
"""提交任务"""
task_id = len(self.task_queue)
task = {
'id': task_id,
'func': func,
'args': args,
'kwargs': kwargs
}
self.task_queue.append(task)
return task_id
def worker(self, worker_id):
"""工作线程函数"""
while self.running:
task = None
# 从队列获取任务
with self.lock:
if self.task_queue:
task = self.task_queue.pop(0)
if task:
try:
# 执行任务
result = task['func'](*task['args'], **task['kwargs'])
with self.lock:
self.results[task['id']] = {
'status': 'success',
'result': result,
'worker_id': worker_id
}
print(f"工作线程 {worker_id} 完成任务 {task['id']}")
except Exception as e:
with self.lock:
self.results[task['id']] = {
'status': 'error',
'error': str(e),
'worker_id': worker_id
}
print(f"工作线程 {worker_id} 任务 {task['id']} 失败: {e}")
else:
# 队列为空,短暂等待
time.sleep(0.1)
def start(self):
"""启动线程池"""
self.running = True
self.workers = []
for i in range(self.max_workers):
worker = threading.Thread(target=self.worker, args=(i,), daemon=True)
worker.start()
self.workers.append(worker)
print(f"自定义线程池启动,工作线程数: {self.max_workers}")
def shutdown(self, wait=True):
"""关闭线程池"""
self.running = False
if wait:
# 等待所有线程完成
for worker in self.workers:
worker.join()
print("自定义线程池关闭")
def get_result(self, task_id, timeout=None):
"""获取任务结果"""
start_time = time.time()
while True:
with self.lock:
if task_id in self.results:
result_info = self.results[task_id]
if result_info['status'] == 'success':
return result_info['result']
else:
raise Exception(f"任务失败: {result_info['error']}")
if timeout and (time.time() - start_time) > timeout:
raise TimeoutError(f"获取结果超时 (任务ID: {task_id})")
time.sleep(0.1)
def map(self, func, iterables):
"""类似于map的功能"""
task_ids = []
# 提交所有任务
for item in iterables:
if isinstance(item, tuple):
task_id = self.submit(func, *item)
else:
task_id = self.submit(func, item)
task_ids.append(task_id)
# 收集结果
results = []
for task_id in task_ids:
try:
result = self.get_result(task_id, timeout=30)
results.append(result)
except Exception as e:
results.append(e)
return results
# 使用自定义线程池
def custom_thread_pool_demo():
"""演示自定义线程池"""
print("\n=== 自定义线程池演示 ===")
pool = CustomThreadPool(max_workers=3)
pool.start()
# 提交任务
task_ids = []
for i in range(10):
delay = random.uniform(0.5, 1.5)
task_id = pool.submit(io_intensive_task, i, delay)
task_ids.append(task_id)
print(f"提交任务 {task_id}")
# 获取结果
print("\n获取任务结果:")
for task_id in task_ids:
try:
result = pool.get_result(task_id, timeout=5)
print(f"任务 {task_id}: {result}")
except Exception as e:
print(f"任务 {task_id} 错误: {e}")
# 使用map功能
print("\n使用map功能:")
tasks = [(i, random.uniform(0.2, 1)) for i in range(5)]
results = pool.map(io_intensive_task, tasks)
for i, result in enumerate(results):
print(f"任务 {i}: {result}")
# 关闭线程池
pool.shutdown()
# 协程池
import asyncio
async def async_io_task(task_id, delay=1):
"""异步I/O任务"""
await asyncio.sleep(delay)
return f"异步任务 {task_id} 完成"
async def async_cpu_task(task_id, n=1000):
"""异步CPU任务(需要在线程池中执行)"""
loop = asyncio.get_event_loop()
# 在线程池中执行CPU密集型任务
result = await loop.run_in_executor(
None, # 使用默认执行器(线程池)
cpu_intensive_task,
n
)
return f"异步CPU任务 {task_id} 完成,找到 {len(result)} 个质数"
class AsyncTaskPool:
"""异步任务池"""
def __init__(self, max_concurrent=5):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.tasks = []
async def run_task(self, coro_func, *args, **kwargs):
"""运行任务,控制并发数"""
async with self.semaphore:
return await coro_func(*args, **kwargs)
async def run_all(self, tasks):
"""运行所有任务"""
self.tasks = [
self.run_task(func, *args, **kwargs)
for func, args, kwargs in tasks
]
results = await asyncio.gather(*self.tasks, return_exceptions=True)
return results
async def async_pool_demo():
"""演示异步任务池"""
print("\n=== 异步任务池演示 ===")
pool = AsyncTaskPool(max_concurrent=3)
# 准备任务
tasks = []
# 添加I/O任务
for i in range(5):
delay = random.uniform(0.5, 1.5)
tasks.append((async_io_task, (i, delay), {}))
# 添加CPU任务
for i in range(3):
n = random.randint(1000, 5000)
tasks.append((async_cpu_task, (i, n), {}))
print(f"准备执行 {len(tasks)} 个任务,最大并发数: 3")
start_time = time.time()
results = await pool.run_all(tasks)
end_time = time.time()
print(f"\n所有任务完成,总耗时: {end_time - start_time:.2f}秒")
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
else:
print(f"任务 {i}: {result}")
# 主程序
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
mode = sys.argv[1]
if mode == 'thread':
thread_pool_demo(8)
elif mode == 'process':
process_pool_demo(4)
elif mode == 'hybrid':
hybrid_pool_demo(2, 6)
elif mode == 'custom':
custom_thread_pool_demo()
elif mode == 'async':
asyncio.run(async_pool_demo())
else:
print("可用模式: thread, process, hybrid, custom, async")
else:
# 运行所有演示
print("=== 并发编程演示 ===")
print("\n1. 线程池演示:")
thread_pool_demo(8)
print("\n2. 进程池演示:")
process_pool_demo(4)
print("\n3. 混合池演示:")
hybrid_pool_demo(2, 6)
print("\n4. 自定义线程池演示:")
custom_thread_pool_demo()
print("\n5. 异步任务池演示:")
asyncio.run(async_pool_demo())
第四部分:数据结构与算法优化
将本文的Word文档下载到电脑
推荐度: