Python中级教程 第七课:网络编程、并发与项目实践 3/7

浏览量:14 次 发布时间:2026-01-15 18:07 作者:明扬工控商城 下载docx

最近更新:Python中级教程 第三课:面向对象编程深入与装饰器

第三部分:并发编程深入

3.1 线程池与进程池

python

import concurrent.futures

import time

import math

import random

from functools import wraps


# 性能计时装饰器

def timer(func):

   @wraps(func)

   def wrapper(*args, **kwargs):

       start_time = time.time()

       result = func(*args, **kwargs)

       end_time = time.time()

       print(f"{func.__name__} 耗时: {end_time - start_time:.4f}秒")

       return result

   return wrapper


# CPU密集型任务

def cpu_intensive_task(n):

   """计算质数 - CPU密集型"""

   def is_prime(num):

       if num < 2:

           return False

       for i in range(2, int(math.sqrt(num)) + 1):

           if num % i == 0:

               return False

       return True

   

   primes = []

   for i in range(2, n + 1):

       if is_prime(i):

           primes.append(i)

   

   return primes[:10]  # 只返回前10个


# I/O密集型任务

def io_intensive_task(task_id, delay=1):

   """模拟I/O操作 - I/O密集型"""

   time.sleep(delay)

   return f"任务 {task_id} 完成,延迟 {delay}秒"


# 线程池示例

@timer

def thread_pool_demo(num_tasks=10):

   """使用线程池执行I/O密集型任务"""

   print(f"使用线程池执行 {num_tasks} 个I/O密集型任务")

   

   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

       # 提交任务

       futures = [

           executor.submit(io_intensive_task, i, random.uniform(0.5, 2))

           for i in range(num_tasks)

       ]

       

       # 收集结果

       results = []

       for future in concurrent.futures.as_completed(futures):

           try:

               result = future.result()

               results.append(result)

               print(f"完成: {result}")

           except Exception as e:

               print(f"任务失败: {e}")

       

       return results


# 进程池示例

@timer

def process_pool_demo(num_tasks=5):

   """使用进程池执行CPU密集型任务"""

   print(f"使用进程池执行 {num_tasks} 个CPU密集型任务")

   

   with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

       # 提交任务(计算不同范围的质数)

       futures = [

           executor.submit(cpu_intensive_task, n * 1000)

           for n in range(1, num_tasks + 1)

       ]

       

       # 收集结果

       results = []

       for future in concurrent.futures.as_completed(futures):

           try:

               result = future.result()

               results.append(result)

               print(f"找到质数: {len(result)} 个,示例: {result}")

           except Exception as e:

               print(f"任务失败: {e}")

       

       return results


# 混合使用线程池和进程池

@timer

def hybrid_pool_demo(num_cpu_tasks=3, num_io_tasks=6):

   """混合使用线程池和进程池"""

   print(f"混合执行 {num_cpu_tasks} 个CPU任务和 {num_io_tasks} 个I/O任务")

   

   results = {}

   

   # 使用进程池执行CPU密集型任务

   with concurrent.futures.ProcessPoolExecutor(max_workers=2) as process_executor:

       cpu_futures = [

           process_executor.submit(cpu_intensive_task, n * 2000)

           for n in range(1, num_cpu_tasks + 1)

       ]

       

       # 在线程池执行I/O密集型任务的同时等待CPU任务

       with concurrent.futures.ThreadPoolExecutor(max_workers=4) as thread_executor:

           io_futures = [

               thread_executor.submit(io_intensive_task, i, random.uniform(0.3, 1.5))

               for i in range(num_io_tasks)

           ]

           

           # 收集CPU任务结果

           results['cpu'] = []

           for future in concurrent.futures.as_completed(cpu_futures):

               try:

                   result = future.result()

                   results['cpu'].append(result)

                   print(f"CPU任务完成: 找到 {len(result)} 个质数")

               except Exception as e:

                   print(f"CPU任务失败: {e}")

           

           # 收集I/O任务结果

           results['io'] = []

           for future in concurrent.futures.as_completed(io_futures):

               try:

                   result = future.result()

                   results['io'].append(result)

                   print(f"I/O任务完成: {result}")

               except Exception as e:

                   print(f"I/O任务失败: {e}")

   

   return results


# 自定义线程池实现

class CustomThreadPool:

   """自定义线程池实现"""

   

   def __init__(self, max_workers=5):

       self.max_workers = max_workers

       self.workers = []

       self.task_queue = []

       self.results = {}

       self.lock = threading.Lock()

       self.running = False

   

   def submit(self, func, *args, **kwargs):

       """提交任务"""

       task_id = len(self.task_queue)

       task = {

           'id': task_id,

           'func': func,

           'args': args,

           'kwargs': kwargs

       }

       

       self.task_queue.append(task)

       return task_id

   

   def worker(self, worker_id):

       """工作线程函数"""

       while self.running:

           task = None

           

           # 从队列获取任务

           with self.lock:

               if self.task_queue:

                   task = self.task_queue.pop(0)

           

           if task:

               try:

                   # 执行任务

                   result = task['func'](*task['args'], **task['kwargs'])

                   

                   with self.lock:

                       self.results[task['id']] = {

                           'status': 'success',

                           'result': result,

                           'worker_id': worker_id

                       }

                   

                   print(f"工作线程 {worker_id} 完成任务 {task['id']}")

               

               except Exception as e:

                   with self.lock:

                       self.results[task['id']] = {

                           'status': 'error',

                           'error': str(e),

                           'worker_id': worker_id

                       }

                   

                   print(f"工作线程 {worker_id} 任务 {task['id']} 失败: {e}")

           

           else:

               # 队列为空,短暂等待

               time.sleep(0.1)

   

   def start(self):

       """启动线程池"""

       self.running = True

       self.workers = []

       

       for i in range(self.max_workers):

           worker = threading.Thread(target=self.worker, args=(i,), daemon=True)

           worker.start()

           self.workers.append(worker)

       

       print(f"自定义线程池启动,工作线程数: {self.max_workers}")

   

   def shutdown(self, wait=True):

       """关闭线程池"""

       self.running = False

       

       if wait:

           # 等待所有线程完成

           for worker in self.workers:

               worker.join()

       

       print("自定义线程池关闭")

   

   def get_result(self, task_id, timeout=None):

       """获取任务结果"""

       start_time = time.time()

       

       while True:

           with self.lock:

               if task_id in self.results:

                   result_info = self.results[task_id]

                   if result_info['status'] == 'success':

                       return result_info['result']

                   else:

                       raise Exception(f"任务失败: {result_info['error']}")

           

           if timeout and (time.time() - start_time) > timeout:

               raise TimeoutError(f"获取结果超时 (任务ID: {task_id})")

           

           time.sleep(0.1)

   

   def map(self, func, iterables):

       """类似于map的功能"""

       task_ids = []

       

       # 提交所有任务

       for item in iterables:

           if isinstance(item, tuple):

               task_id = self.submit(func, *item)

           else:

               task_id = self.submit(func, item)

           task_ids.append(task_id)

       

       # 收集结果

       results = []

       for task_id in task_ids:

           try:

               result = self.get_result(task_id, timeout=30)

               results.append(result)

           except Exception as e:

               results.append(e)

       

       return results


# 使用自定义线程池

def custom_thread_pool_demo():

   """演示自定义线程池"""

   print("\n=== 自定义线程池演示 ===")

   

   pool = CustomThreadPool(max_workers=3)

   pool.start()

   

   # 提交任务

   task_ids = []

   for i in range(10):

       delay = random.uniform(0.5, 1.5)

       task_id = pool.submit(io_intensive_task, i, delay)

       task_ids.append(task_id)

       print(f"提交任务 {task_id}")

   

   # 获取结果

   print("\n获取任务结果:")

   for task_id in task_ids:

       try:

           result = pool.get_result(task_id, timeout=5)

           print(f"任务 {task_id}: {result}")

       except Exception as e:

           print(f"任务 {task_id} 错误: {e}")

   

   # 使用map功能

   print("\n使用map功能:")

   tasks = [(i, random.uniform(0.2, 1)) for i in range(5)]

   results = pool.map(io_intensive_task, tasks)

   

   for i, result in enumerate(results):

       print(f"任务 {i}: {result}")

   

   # 关闭线程池

   pool.shutdown()


# 协程池

import asyncio


async def async_io_task(task_id, delay=1):

   """异步I/O任务"""

   await asyncio.sleep(delay)

   return f"异步任务 {task_id} 完成"


async def async_cpu_task(task_id, n=1000):

   """异步CPU任务(需要在线程池中执行)"""

   loop = asyncio.get_event_loop()

   

   # 在线程池中执行CPU密集型任务

   result = await loop.run_in_executor(

       None,  # 使用默认执行器(线程池)

       cpu_intensive_task,

       n

   )

   

   return f"异步CPU任务 {task_id} 完成,找到 {len(result)} 个质数"


class AsyncTaskPool:

   """异步任务池"""

   

   def __init__(self, max_concurrent=5):

       self.max_concurrent = max_concurrent

       self.semaphore = asyncio.Semaphore(max_concurrent)

       self.tasks = []

   

   async def run_task(self, coro_func, *args, **kwargs):

       """运行任务,控制并发数"""

       async with self.semaphore:

           return await coro_func(*args, **kwargs)

   

   async def run_all(self, tasks):

       """运行所有任务"""

       self.tasks = [

           self.run_task(func, *args, **kwargs)

           for func, args, kwargs in tasks

       ]

       

       results = await asyncio.gather(*self.tasks, return_exceptions=True)

       return results


async def async_pool_demo():

   """演示异步任务池"""

   print("\n=== 异步任务池演示 ===")

   

   pool = AsyncTaskPool(max_concurrent=3)

   

   # 准备任务

   tasks = []

   

   # 添加I/O任务

   for i in range(5):

       delay = random.uniform(0.5, 1.5)

       tasks.append((async_io_task, (i, delay), {}))

   

   # 添加CPU任务

   for i in range(3):

       n = random.randint(1000, 5000)

       tasks.append((async_cpu_task, (i, n), {}))

   

   print(f"准备执行 {len(tasks)} 个任务,最大并发数: 3")

   

   start_time = time.time()

   results = await pool.run_all(tasks)

   end_time = time.time()

   

   print(f"\n所有任务完成,总耗时: {end_time - start_time:.2f}秒")

   

   for i, result in enumerate(results):

       if isinstance(result, Exception):

           print(f"任务 {i} 失败: {result}")

       else:

           print(f"任务 {i}: {result}")


# 主程序

if __name__ == "__main__":

   import sys

   

   if len(sys.argv) > 1:

       mode = sys.argv[1]

       

       if mode == 'thread':

           thread_pool_demo(8)

       

       elif mode == 'process':

           process_pool_demo(4)

       

       elif mode == 'hybrid':

           hybrid_pool_demo(2, 6)

       

       elif mode == 'custom':

           custom_thread_pool_demo()

       

       elif mode == 'async':

           asyncio.run(async_pool_demo())

       

       else:

           print("可用模式: thread, process, hybrid, custom, async")

   else:

       # 运行所有演示

       print("=== 并发编程演示 ===")

       

       print("\n1. 线程池演示:")

       thread_pool_demo(8)

       

       print("\n2. 进程池演示:")

       process_pool_demo(4)

       

       print("\n3. 混合池演示:")

       hybrid_pool_demo(2, 6)

       

       print("\n4. 自定义线程池演示:")

       custom_thread_pool_demo()

       

       print("\n5. 异步任务池演示:")

       asyncio.run(async_pool_demo())

第四部分:数据结构与算法优化


明扬工控商城

推荐阅读:

Python中级教程 第六课:高级特性与性能优化 2/2

Python中级教程 第六课:高级特性与性能优化 1/2

Python中级教程 第五课:异步编程与并发 2/2

Python中级教程 第五课:异步编程与并发 1/2

Python中级教程 第四课:上下文管理器、生成器与迭代器

Python中级教程 第三课:面向对象编程深入与装饰器

热门标签:
Python中级教程 第七课:网络编程、并发与项目实践 3/7.docx

将本文的Word文档下载到电脑

推荐度:

下载

全部评论

请登录
产业新闻-明扬资讯网
科技资讯-明扬资讯网