Python中级教程 第七课:网络编程、并发与项目实践 2/7

浏览量:16 次 发布时间:2026-01-15 18:08 作者:明扬工控商城 下载docx

最近更新:Python中级教程 第三课:面向对象编程深入与装饰器

2.2 HTTP客户端

python

import requests

import json

import time


class APIClient:

   """API客户端"""

   

   def __init__(self, base_url='http://localhost:8000'):

       self.base_url = base_url

       self.session = requests.Session()

       self.session.headers.update({

           'Content-Type': 'application/json',

           'User-Agent': 'Python APIClient/1.0'

       })

   

   def make_request(self, method, endpoint, data=None, params=None):

       """发送HTTP请求"""

       url = f"{self.base_url}/{endpoint.lstrip('/')}"

       

       try:

           response = self.session.request(

               method=method,

               url=url,

               json=data,

               params=params,

               timeout=10

           )

           

           response.raise_for_status()  # 如果状态码不是200,抛出异常

           

           if response.headers.get('Content-Type', '').startswith('application/json'):

               return response.json()

           else:

               return response.text

       

       except requests.exceptions.RequestException as e:

           print(f"请求失败: {e}")

           if hasattr(e, 'response') and e.response is not None:

               print(f"响应状态码: {e.response.status_code}")

               try:

                   print(f"响应内容: {e.response.json()}")

               except:

                   print(f"响应内容: {e.response.text}")

           return None

   

   def get_api_info(self):

       """获取API信息"""

       return self.make_request('GET', '/')

   

   def get_users(self):

       """获取所有用户"""

       return self.make_request('GET', '/users')

   

   def get_user(self, user_id):

       """获取特定用户"""

       return self.make_request('GET', f'/users/{user_id}')

   

   def create_user(self, name, email):

       """创建新用户"""

       data = {'name': name, 'email': email}

       return self.make_request('POST', '/users', data=data)

   

   def update_user(self, user_id, name=None, email=None):

       """更新用户"""

       data = {}

       if name:

           data['name'] = name

       if email:

           data['email'] = email

       

       return self.make_request('PUT', f'/users/{user_id}', data=data)

   

   def delete_user(self, user_id):

       """删除用户"""

       return self.make_request('DELETE', f'/users/{user_id}')

   

   def get_posts(self):

       """获取所有文章"""

       return self.make_request('GET', '/posts')

   

   def get_post(self, post_id):

       """获取特定文章"""

       return self.make_request('GET', f'/posts/{post_id}')

   

   def create_post(self, title, content, author_id):

       """创建新文章"""

       data = {'title': title, 'content': content, 'author_id': author_id}

       return self.make_request('POST', '/posts', data=data)


# 客户端使用示例

def client_demo():

   """演示客户端使用"""

   client = APIClient('http://localhost:8000')

   

   print("=== API客户端演示 ===")

   

   # 1. 获取API信息

   print("\n1. 获取API信息:")

   api_info = client.get_api_info()

   if api_info:

       print(json.dumps(api_info, indent=2, ensure_ascii=False))

   

   # 2. 获取所有用户

   print("\n2. 获取所有用户:")

   users = client.get_users()

   if users:

       print(json.dumps(users, indent=2, ensure_ascii=False))

   

   # 3. 创建新用户

   print("\n3. 创建新用户:")

   new_user = client.create_user("王五", "wangwu@example.com")

   if new_user:

       print(json.dumps(new_user, indent=2, ensure_ascii=False))

   

   # 4. 更新用户

   print("\n4. 更新用户:")

   updated_user = client.update_user(3, name="王五(更新)", email="wangwu_updated@example.com")

   if updated_user:

       print(json.dumps(updated_user, indent=2, ensure_ascii=False))

   

   # 5. 创建文章

   print("\n5. 创建文章:")

   new_post = client.create_post(

       title="新文章",

       content="这是新创建的文章内容",

       author_id=3

   )

   if new_post:

       print(json.dumps(new_post, indent=2, ensure_ascii=False))

   

   # 6. 删除用户

   print("\n6. 删除用户:")

   deleted = client.delete_user(3)

   if deleted:

       print(json.dumps(deleted, indent=2, ensure_ascii=False))

   

   # 7. 最终用户列表

   print("\n7. 最终用户列表:")

   final_users = client.get_users()

   if final_users:

       print(json.dumps(final_users, indent=2, ensure_ascii=False))


# 并发测试客户端

class ConcurrentAPITester:

   """并发API测试器"""

   

   def __init__(self, base_url, num_threads=10):

       self.base_url = base_url

       self.num_threads = num_threads

       self.results = []

       self.errors = []

   

   def worker(self, worker_id):

       """工作线程函数"""

       client = APIClient(self.base_url)

       

       for i in range(5):  # 每个线程执行5次操作

           try:

               # 随机执行不同的操作

               import random

               operation = random.choice(['get_users', 'get_posts', 'create_user', 'create_post'])

               

               start_time = time.time()

               

               if operation == 'get_users':

                   result = client.get_users()

                   status = 'success' if result else 'failed'

               

               elif operation == 'get_posts':

                   result = client.get_posts()

                   status = 'success' if result else 'failed'

               

               elif operation == 'create_user':

                   name = f"并发用户_{worker_id}_{i}"

                   email = f"user_{worker_id}_{i}@example.com"

                   result = client.create_user(name, email)

                   status = 'success' if result else 'failed'

               

               elif operation == 'create_post':

                   result = client.create_post(

                       title=f"并发文章_{worker_id}_{i}",

                       content=f"内容_{worker_id}_{i}",

                       author_id=random.randint(1, 2)

                   )

                   status = 'success' if result else 'failed'

               

               end_time = time.time()

               duration = end_time - start_time

               

               self.results.append({

                   'worker_id': worker_id,

                   'operation': operation,

                   'status': status,

                   'duration': duration,

                   'timestamp': time.time()

               })

               

               time.sleep(random.uniform(0.1, 0.5))  # 随机延迟

           

           except Exception as e:

               self.errors.append({

                   'worker_id': worker_id,

                   'error': str(e),

                   'timestamp': time.time()

               })

   

   def run(self):

       """运行并发测试"""

       import threading

       

       print(f"开始并发测试,使用 {self.num_threads} 个线程...")

       start_time = time.time()

       

       threads = []

       for i in range(self.num_threads):

           thread = threading.Thread(target=self.worker, args=(i,))

           threads.append(thread)

           thread.start()

       

       # 等待所有线程完成

       for thread in threads:

           thread.join()

       

       end_time = time.time()

       total_duration = end_time - start_time

       

       # 统计结果

       successful = sum(1 for r in self.results if r['status'] == 'success')

       failed = sum(1 for r in self.results if r['status'] == 'failed')

       

       # 按操作类型统计

       operations = {}

       for r in self.results:

           op = r['operation']

           if op not in operations:

               operations[op] = {'success': 0, 'failed': 0, 'total_duration': 0}

           

           operations[op]['success' if r['status'] == 'success' else 'failed'] += 1

           operations[op]['total_duration'] += r['duration']

       

       print("\n=== 并发测试结果 ===")

       print(f"总耗时: {total_duration:.2f}秒")

       print(f"总请求数: {len(self.results)}")

       print(f"成功: {successful}")

       print(f"失败: {failed}")

       print(f"错误数: {len(self.errors)}")

       

       print("\n按操作类型统计:")

       for op, stats in operations.items():

           avg_duration = stats['total_duration'] / (stats['success'] + stats['failed'])

           print(f"  {op}:")

           print(f"    成功: {stats['success']}")

           print(f"    失败: {stats['failed']}")

           print(f"    平均耗时: {avg_duration:.3f}秒")

       

       if self.errors:

           print("\n错误列表 (前5个):")

           for error in self.errors[:5]:

               print(f"  线程{error['worker_id']}: {error['error']}")


if __name__ == "__main__":

   import sys

   

   if len(sys.argv) > 1 and sys.argv[1] == 'test':

       # 运行并发测试

       tester = ConcurrentAPITester('http://localhost:8000', num_threads=5)

       tester.run()

   else:

       # 运行单客户端演示

       client_demo()

第三部分:并发编程深入



明扬工控商城

推荐阅读:

Python中级教程 第六课:高级特性与性能优化 2/2

Python中级教程 第六课:高级特性与性能优化 1/2

Python中级教程 第五课:异步编程与并发 2/2

Python中级教程 第五课:异步编程与并发 1/2

Python中级教程 第四课:上下文管理器、生成器与迭代器

Python中级教程 第三课:面向对象编程深入与装饰器

热门标签:
Python中级教程 第七课:网络编程、并发与项目实践 2/7.docx

将本文的Word文档下载到电脑

推荐度:

下载

全部评论

请登录
产业新闻-明扬资讯网
科技资讯-明扬资讯网