最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
浏览量:16 次 发布时间:2026-01-15 18:08 作者:明扬工控商城 下载docx
最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
2.2 HTTP客户端
python
import requests
import json
import time
class APIClient:
"""API客户端"""
def __init__(self, base_url='http://localhost:8000'):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'Python APIClient/1.0'
})
def make_request(self, method, endpoint, data=None, params=None):
"""发送HTTP请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
response = self.session.request(
method=method,
url=url,
json=data,
params=params,
timeout=10
)
response.raise_for_status() # 如果状态码不是200,抛出异常
if response.headers.get('Content-Type', '').startswith('application/json'):
return response.json()
else:
return response.text
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"响应状态码: {e.response.status_code}")
try:
print(f"响应内容: {e.response.json()}")
except:
print(f"响应内容: {e.response.text}")
return None
def get_api_info(self):
"""获取API信息"""
return self.make_request('GET', '/')
def get_users(self):
"""获取所有用户"""
return self.make_request('GET', '/users')
def get_user(self, user_id):
"""获取特定用户"""
return self.make_request('GET', f'/users/{user_id}')
def create_user(self, name, email):
"""创建新用户"""
data = {'name': name, 'email': email}
return self.make_request('POST', '/users', data=data)
def update_user(self, user_id, name=None, email=None):
"""更新用户"""
data = {}
if name:
data['name'] = name
if email:
data['email'] = email
return self.make_request('PUT', f'/users/{user_id}', data=data)
def delete_user(self, user_id):
"""删除用户"""
return self.make_request('DELETE', f'/users/{user_id}')
def get_posts(self):
"""获取所有文章"""
return self.make_request('GET', '/posts')
def get_post(self, post_id):
"""获取特定文章"""
return self.make_request('GET', f'/posts/{post_id}')
def create_post(self, title, content, author_id):
"""创建新文章"""
data = {'title': title, 'content': content, 'author_id': author_id}
return self.make_request('POST', '/posts', data=data)
# 客户端使用示例
def client_demo():
"""演示客户端使用"""
client = APIClient('http://localhost:8000')
print("=== API客户端演示 ===")
# 1. 获取API信息
print("\n1. 获取API信息:")
api_info = client.get_api_info()
if api_info:
print(json.dumps(api_info, indent=2, ensure_ascii=False))
# 2. 获取所有用户
print("\n2. 获取所有用户:")
users = client.get_users()
if users:
print(json.dumps(users, indent=2, ensure_ascii=False))
# 3. 创建新用户
print("\n3. 创建新用户:")
new_user = client.create_user("王五", "wangwu@example.com")
if new_user:
print(json.dumps(new_user, indent=2, ensure_ascii=False))
# 4. 更新用户
print("\n4. 更新用户:")
updated_user = client.update_user(3, name="王五(更新)", email="wangwu_updated@example.com")
if updated_user:
print(json.dumps(updated_user, indent=2, ensure_ascii=False))
# 5. 创建文章
print("\n5. 创建文章:")
new_post = client.create_post(
title="新文章",
content="这是新创建的文章内容",
author_id=3
)
if new_post:
print(json.dumps(new_post, indent=2, ensure_ascii=False))
# 6. 删除用户
print("\n6. 删除用户:")
deleted = client.delete_user(3)
if deleted:
print(json.dumps(deleted, indent=2, ensure_ascii=False))
# 7. 最终用户列表
print("\n7. 最终用户列表:")
final_users = client.get_users()
if final_users:
print(json.dumps(final_users, indent=2, ensure_ascii=False))
# 并发测试客户端
class ConcurrentAPITester:
"""并发API测试器"""
def __init__(self, base_url, num_threads=10):
self.base_url = base_url
self.num_threads = num_threads
self.results = []
self.errors = []
def worker(self, worker_id):
"""工作线程函数"""
client = APIClient(self.base_url)
for i in range(5): # 每个线程执行5次操作
try:
# 随机执行不同的操作
import random
operation = random.choice(['get_users', 'get_posts', 'create_user', 'create_post'])
start_time = time.time()
if operation == 'get_users':
result = client.get_users()
status = 'success' if result else 'failed'
elif operation == 'get_posts':
result = client.get_posts()
status = 'success' if result else 'failed'
elif operation == 'create_user':
name = f"并发用户_{worker_id}_{i}"
email = f"user_{worker_id}_{i}@example.com"
result = client.create_user(name, email)
status = 'success' if result else 'failed'
elif operation == 'create_post':
result = client.create_post(
title=f"并发文章_{worker_id}_{i}",
content=f"内容_{worker_id}_{i}",
author_id=random.randint(1, 2)
)
status = 'success' if result else 'failed'
end_time = time.time()
duration = end_time - start_time
self.results.append({
'worker_id': worker_id,
'operation': operation,
'status': status,
'duration': duration,
'timestamp': time.time()
})
time.sleep(random.uniform(0.1, 0.5)) # 随机延迟
except Exception as e:
self.errors.append({
'worker_id': worker_id,
'error': str(e),
'timestamp': time.time()
})
def run(self):
"""运行并发测试"""
import threading
print(f"开始并发测试,使用 {self.num_threads} 个线程...")
start_time = time.time()
threads = []
for i in range(self.num_threads):
thread = threading.Thread(target=self.worker, args=(i,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
total_duration = end_time - start_time
# 统计结果
successful = sum(1 for r in self.results if r['status'] == 'success')
failed = sum(1 for r in self.results if r['status'] == 'failed')
# 按操作类型统计
operations = {}
for r in self.results:
op = r['operation']
if op not in operations:
operations[op] = {'success': 0, 'failed': 0, 'total_duration': 0}
operations[op]['success' if r['status'] == 'success' else 'failed'] += 1
operations[op]['total_duration'] += r['duration']
print("\n=== 并发测试结果 ===")
print(f"总耗时: {total_duration:.2f}秒")
print(f"总请求数: {len(self.results)}")
print(f"成功: {successful}")
print(f"失败: {failed}")
print(f"错误数: {len(self.errors)}")
print("\n按操作类型统计:")
for op, stats in operations.items():
avg_duration = stats['total_duration'] / (stats['success'] + stats['failed'])
print(f" {op}:")
print(f" 成功: {stats['success']}")
print(f" 失败: {stats['failed']}")
print(f" 平均耗时: {avg_duration:.3f}秒")
if self.errors:
print("\n错误列表 (前5个):")
for error in self.errors[:5]:
print(f" 线程{error['worker_id']}: {error['error']}")
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'test':
# 运行并发测试
tester = ConcurrentAPITester('http://localhost:8000', num_threads=5)
tester.run()
else:
# 运行单客户端演示
client_demo()
第三部分:并发编程深入
将本文的Word文档下载到电脑
推荐度: