最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
浏览量:41 次 发布时间:2026-01-15 18:05 作者:明扬工控商城 下载docx
最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
第一部分:网络编程基础
1.1 Socket编程基础
python
import socket
import threading
import time
# 简单TCP服务器
def basic_tcp_server():
"""基本的TCP服务器"""
# 创建socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置地址重用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址和端口
server_address = ('127.0.0.1', 8888)
server_socket.bind(server_address)
# 监听连接
server_socket.listen(5)
print(f"服务器启动在 {server_address}")
try:
while True:
# 接受连接
client_socket, client_address = server_socket.accept()
print(f"接受来自 {client_address} 的连接")
# 处理客户端
handle_client(client_socket, client_address)
except KeyboardInterrupt:
print("\n服务器关闭")
finally:
server_socket.close()
def handle_client(client_socket, client_address):
"""处理客户端连接"""
try:
# 接收数据
data = client_socket.recv(1024)
if data:
message = data.decode('utf-8')
print(f"来自 {client_address} 的消息: {message}")
# 发送响应
response = f"服务器收到: {message}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"处理客户端 {client_address} 时出错: {e}")
finally:
client_socket.close()
print(f"关闭与 {client_address} 的连接")
# 简单TCP客户端
def basic_tcp_client():
"""基本的TCP客户端"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接服务器
server_address = ('127.0.0.1', 8888)
client_socket.connect(server_address)
print(f"连接到服务器 {server_address}")
# 发送消息
message = "Hello, Server!"
client_socket.send(message.encode('utf-8'))
# 接收响应
response = client_socket.recv(1024)
print(f"服务器响应: {response.decode('utf-8')}")
except ConnectionRefusedError:
print("连接被拒绝,请确保服务器正在运行")
except Exception as e:
print(f"客户端错误: {e}")
finally:
client_socket.close()
# 运行示例
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'server':
basic_tcp_server()
else:
basic_tcp_client()
1.2 多客户端服务器
python
import socket
import threading
import time
from queue import Queue
import json
class MultiClientServer:
"""支持多客户端的服务器"""
def __init__(self, host='127.0.0.1', port=9999, max_clients=10):
self.host = host
self.port = port
self.max_clients = max_clients
self.server_socket = None
self.clients = {} # 客户端字典:{client_id: (socket, address)}
self.client_counter = 0
self.running = False
self.message_queue = Queue()
self.lock = threading.Lock()
def start(self):
"""启动服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(self.max_clients)
print(f"多客户端服务器启动在 {self.host}:{self.port}")
print("输入 'quit' 停止服务器")
self.running = True
# 启动消息处理线程
message_thread = threading.Thread(target=self.process_messages, daemon=True)
message_thread.start()
# 启动控制台输入线程
console_thread = threading.Thread(target=self.console_input, daemon=True)
console_thread.start()
# 接受连接
self.accept_connections()
def accept_connections(self):
"""接受客户端连接"""
while self.running:
try:
client_socket, client_address = self.server_socket.accept()
with self.lock:
self.client_counter += 1
client_id = self.client_counter
self.clients[client_id] = (client_socket, client_address)
# 启动客户端处理线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_id, client_socket, client_address),
daemon=True
)
client_thread.start()
print(f"客户端 {client_id} 连接: {client_address}")
# 发送欢迎消息
welcome_msg = {
'type': 'welcome',
'client_id': client_id,
'message': f'欢迎! 你的ID是 {client_id}'
}
self.send_to_client(client_id, welcome_msg)
except OSError:
# 服务器socket关闭
break
except Exception as e:
print(f"接受连接时出错: {e}")
def handle_client(self, client_id, client_socket, client_address):
"""处理单个客户端"""
try:
while self.running:
try:
# 接收数据
data = client_socket.recv(4096)
if not data:
break
# 解析JSON消息
try:
message = json.loads(data.decode('utf-8'))
message['from_client'] = client_id
message['timestamp'] = time.time()
# 将消息放入队列
self.message_queue.put(message)
except json.JSONDecodeError:
print(f"客户端 {client_id} 发送了无效的JSON")
error_msg = {'type': 'error', 'message': '无效的JSON格式'}
client_socket.send(json.dumps(error_msg).encode('utf-8'))
except ConnectionResetError:
break
except Exception as e:
print(f"处理客户端 {client_id} 数据时出错: {e}")
break
finally:
# 客户端断开连接
with self.lock:
if client_id in self.clients:
del self.clients[client_id]
client_socket.close()
print(f"客户端 {client_id} 断开连接")
# 广播断开消息
disconnect_msg = {
'type': 'client_disconnected',
'client_id': client_id
}
self.broadcast(disconnect_msg, exclude=client_id)
def process_messages(self):
"""处理消息队列"""
while self.running:
try:
message = self.message_queue.get(timeout=1)
# 处理不同类型的消息
msg_type = message.get('type', 'unknown')
if msg_type == 'chat':
# 聊天消息,广播给所有客户端
self.broadcast(message)
elif msg_type == 'private':
# 私聊消息
to_client = message.get('to_client')
if to_client:
self.send_to_client(to_client, message)
elif msg_type == 'command':
# 命令消息
command = message.get('command')
if command == 'list_clients':
self.list_clients(message['from_client'])
else:
print(f"未知消息类型: {msg_type}")
except Queue.Empty:
continue
except Exception as e:
print(f"处理消息时出错: {e}")
def send_to_client(self, client_id, message):
"""发送消息给特定客户端"""
with self.lock:
if client_id in self.clients:
client_socket, _ = self.clients[client_id]
try:
client_socket.send(json.dumps(message).encode('utf-8'))
except Exception as e:
print(f"发送消息到客户端 {client_id} 时出错: {e}")
def broadcast(self, message, exclude=None):
"""广播消息给所有客户端"""
with self.lock:
for client_id in self.clients:
if client_id != exclude:
self.send_to_client(client_id, message)
def list_clients(self, requester_id):
"""列出所有客户端"""
with self.lock:
client_list = [
{'id': cid, 'address': addr[0]}
for cid, (_, addr) in self.clients.items()
]
response = {
'type': 'client_list',
'clients': client_list
}
self.send_to_client(requester_id, response)
def console_input(self):
"""处理控制台输入"""
while self.running:
try:
cmd = input("服务器> ").strip()
if cmd.lower() == 'quit':
self.stop()
break
elif cmd.lower() == 'list':
print("已连接客户端:")
with self.lock:
for cid, (_, addr) in self.clients.items():
print(f" {cid}: {addr}")
elif cmd.lower() == 'count':
with self.lock:
print(f"在线客户端数: {len(self.clients)}")
elif cmd.startswith('broadcast '):
msg = cmd[10:]
broadcast_msg = {
'type': 'server_message',
'message': msg,
'timestamp': time.time()
}
self.broadcast(broadcast_msg)
print(f"已广播: {msg}")
else:
print("可用命令: quit, list, count, broadcast <消息>")
except EOFError:
break
except Exception as e:
print(f"控制台输入错误: {e}")
def stop(self):
"""停止服务器"""
print("正在停止服务器...")
self.running = False
# 关闭所有客户端连接
with self.lock:
for client_id, (client_socket, _) in self.clients.items():
try:
disconnect_msg = {'type': 'server_shutdown', 'message': '服务器关闭'}
client_socket.send(json.dumps(disconnect_msg).encode('utf-8'))
client_socket.close()
except:
pass
# 关闭服务器socket
if self.server_socket:
self.server_socket.close()
print("服务器已停止")
# 客户端类
class ChatClient:
"""聊天客户端"""
def __init__(self, host='127.0.0.1', port=9999):
self.host = host
self.port = port
self.client_socket = None
self.client_id = None
self.running = False
def connect(self):
"""连接到服务器"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.client_socket.connect((self.host, self.port))
self.running = True
# 启动接收线程
receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
receive_thread.start()
print(f"已连接到服务器 {self.host}:{self.port}")
print("输入消息发送聊天,命令:")
print(" /list - 列出在线用户")
print(" /msg <id> <消息> - 私聊")
print(" /quit - 退出")
self.send_input()
except ConnectionRefusedError:
print("连接被拒绝,请确保服务器正在运行")
except Exception as e:
print(f"连接错误: {e}")
def receive_messages(self):
"""接收服务器消息"""
while self.running:
try:
data = self.client_socket.recv(4096)
if not data:
print("与服务器的连接断开")
self.running = False
break
message = json.loads(data.decode('utf-8'))
self.handle_message(message)
except json.JSONDecodeError:
print("收到无效的JSON数据")
except ConnectionResetError:
print("与服务器的连接被重置")
self.running = False
break
except Exception as e:
print(f"接收消息时出错: {e}")
self.running = False
break
def handle_message(self, message):
"""处理收到的消息"""
msg_type = message.get('type', 'unknown')
if msg_type == 'welcome':
self.client_id = message.get('client_id')
print(f"\n{message.get('message')}")
elif msg_type == 'chat':
from_client = message.get('from_client')
msg = message.get('message', '')
timestamp = message.get('timestamp')
time_str = time.strftime('%H:%M:%S', time.localtime(timestamp))
print(f"\n[{time_str}] 用户{from_client}: {msg}")
elif msg_type == 'private':
from_client = message.get('from_client')
msg = message.get('message', '')
timestamp = message.get('timestamp')
time_str = time.strftime('%H:%M:%S', time.localtime(timestamp))
print(f"\n[{time_str}] 私聊来自用户{from_client}: {msg}")
elif msg_type == 'server_message':
msg = message.get('message', '')
print(f"\n[服务器]: {msg}")
elif msg_type == 'client_list':
clients = message.get('clients', [])
print("\n在线用户:")
for client in clients:
print(f" 用户{client['id']}: {client['address']}")
elif msg_type == 'client_disconnected':
client_id = message.get('client_id')
print(f"\n用户{client_id} 已断开连接")
elif msg_type == 'server_shutdown':
print(f"\n服务器关闭: {message.get('message')}")
self.running = False
else:
print(f"\n未知消息类型: {message}")
def send_message(self, msg_type, **kwargs):
"""发送消息到服务器"""
if not self.running or not self.client_socket:
return
message = {'type': msg_type, **kwargs}
try:
self.client_socket.send(json.dumps(message).encode('utf-8'))
except Exception as e:
print(f"发送消息时出错: {e}")
self.running = False
def send_input(self):
"""处理用户输入"""
while self.running:
try:
user_input = input("> ").strip()
if not user_input:
continue
if user_input.lower() == '/quit':
print("断开连接...")
self.running = False
break
elif user_input.lower() == '/list':
self.send_message('command', command='list_clients')
elif user_input.startswith('/msg '):
parts = user_input[5:].split(' ', 1)
if len(parts) == 2:
to_client, msg = parts
try:
to_client = int(to_client)
self.send_message('private', to_client=to_client, message=msg)
except ValueError:
print("用户ID必须是数字")
else:
print("用法: /msg <用户ID> <消息>")
else:
# 普通聊天消息
self.send_message('chat', message=user_input)
except EOFError:
print("\n输入结束")
self.running = False
break
except KeyboardInterrupt:
print("\n中断连接")
self.running = False
break
# 关闭连接
if self.client_socket:
self.client_socket.close()
# 使用示例
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
if sys.argv[1] == 'server':
server = MultiClientServer()
server.start()
elif sys.argv[1] == 'client':
client = ChatClient()
client.connect()
else:
print("用法: python script.py [server|client]")
else:
print("请指定模式: server 或 client")
第二部分:HTTP客户端与服务器
2.1 使用http.server创建Web服务器
python
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse
import time
class SimpleAPIHandler(BaseHTTPRequestHandler):
"""简单的API请求处理器"""
# 内存数据库
database = {
'users': {
1: {'id': 1, 'name': '张三', 'email': 'zhangsan@example.com'},
2: {'id': 2, 'name': '李四', 'email': 'lisi@example.com'},
},
'posts': {
1: {'id': 1, 'title': '第一篇', 'content': '这是第一篇内容', 'author_id': 1},
2: {'id': 2, 'title': '第二篇', 'content': '这是第二篇内容', 'author_id': 2},
}
}
def do_GET(self):
"""处理GET请求"""
# 解析URL路径
parsed_path = urllib.parse.urlparse(self.path)
path_parts = parsed_path.path.strip('/').split('/')
# 设置响应头
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
response = {}
try:
if path_parts[0] == '':
# 根路径,返回API信息
response = {
'api': 'Simple API Server',
'version': '1.0',
'endpoints': {
'/users': '获取所有用户',
'/users/{id}': '获取特定用户',
'/posts': '获取所有文章',
'/posts/{id}': '获取特定文章'
}
}
elif path_parts[0] == 'users':
if len(path_parts) == 1:
# 获取所有用户
users = list(self.database['users'].values())
response = {'users': users}
elif len(path_parts) == 2:
# 获取特定用户
user_id = int(path_parts[1])
user = self.database['users'].get(user_id)
if user:
response = {'user': user}
else:
self.send_error(404, '用户不存在')
return
elif path_parts[0] == 'posts':
if len(path_parts) == 1:
# 获取所有文章
posts = list(self.database['posts'].values())
response = {'posts': posts}
elif len(path_parts) == 2:
# 获取特定文章
post_id = int(path_parts[1])
post = self.database['posts'].get(post_id)
if post:
response = {'post': post}
else:
self.send_error(404, '文章不存在')
return
else:
self.send_error(404, '路径不存在')
return
except Exception as e:
self.send_error(500, f'服务器错误: {e}')
return
# 发送响应
self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))
def do_POST(self):
"""处理POST请求"""
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length)
# 解析JSON数据
try:
data = json.loads(post_data.decode('utf-8'))
except json.JSONDecodeError:
self.send_error(400, '无效的JSON数据')
return
# 解析URL路径
parsed_path = urllib.parse.urlparse(self.path)
path_parts = parsed_path.path.strip('/').split('/')
response = {}
try:
if path_parts[0] == 'users' and len(path_parts) == 1:
# 创建新用户
new_id = max(self.database['users'].keys()) + 1
new_user = {
'id': new_id,
'name': data.get('name'),
'email': data.get('email')
}
# 验证数据
if not new_user['name'] or not new_user['email']:
self.send_error(400, '缺少必要字段')
return
self.database['users'][new_id] = new_user
response = {'user': new_user, 'message': '用户创建成功'}
elif path_parts[0] == 'posts' and len(path_parts) == 1:
# 创建新文章
new_id = max(self.database['posts'].keys()) + 1
new_post = {
'id': new_id,
'title': data.get('title'),
'content': data.get('content'),
'author_id': data.get('author_id'),
'created_at': time.time()
}
# 验证数据
if not new_post['title'] or not new_post['content'] or not new_post['author_id']:
self.send_error(400, '缺少必要字段')
return
# 验证作者是否存在
if new_post['author_id'] not in self.database['users']:
self.send_error(400, '作者不存在')
return
self.database['posts'][new_id] = new_post
response = {'post': new_post, 'message': '文章创建成功'}
else:
self.send_error(404, '路径不存在')
return
except Exception as e:
self.send_error(500, f'服务器错误: {e}')
return
# 发送响应
self.send_response(201) # 201 Created
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))
def do_PUT(self):
"""处理PUT请求"""
content_length = int(self.headers.get('Content-Length', 0))
put_data = self.rfile.read(content_length)
# 解析JSON数据
try:
data = json.loads(put_data.decode('utf-8'))
except json.JSONDecodeError:
self.send_error(400, '无效的JSON数据')
return
# 解析URL路径
parsed_path = urllib.parse.urlparse(self.path)
path_parts = parsed_path.path.strip('/').split('/')
response = {}
try:
if path_parts[0] == 'users' and len(path_parts) == 2:
# 更新用户
user_id = int(path_parts[1])
if user_id not in self.database['users']:
self.send_error(404, '用户不存在')
return
# 更新用户信息
user = self.database['users'][user_id]
user.update({
'name': data.get('name', user['name']),
'email': data.get('email', user['email'])
})
response = {'user': user, 'message': '用户更新成功'}
else:
self.send_error(404, '路径不存在')
return
except Exception as e:
self.send_error(500, f'服务器错误: {e}')
return
# 发送响应
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))
def do_DELETE(self):
"""处理DELETE请求"""
# 解析URL路径
parsed_path = urllib.parse.urlparse(self.path)
path_parts = parsed_path.path.strip('/').split('/')
response = {}
try:
if path_parts[0] == 'users' and len(path_parts) == 2:
# 删除用户
user_id = int(path_parts[1])
if user_id not in self.database['users']:
self.send_error(404, '用户不存在')
return
# 删除用户(同时删除相关文章)
deleted_user = self.database['users'].pop(user_id)
# 删除该用户的所有文章
post_ids_to_delete = [
pid for pid, post in self.database['posts'].items()
if post['author_id'] == user_id
]
for pid in post_ids_to_delete:
del self.database['posts'][pid]
response = {
'message': '用户删除成功',
'deleted_user': deleted_user,
'deleted_posts': len(post_ids_to_delete)
}
else:
self.send_error(404, '路径不存在')
return
except Exception as e:
self.send_error(500, f'服务器错误: {e}")
return
# 发送响应
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))
def do_OPTIONS(self):
"""处理OPTIONS请求(CORS预检)"""
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def log_message(self, format, *args):
"""自定义日志格式"""
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {format % args}")
# 启动服务器
def run_server(port=8000):
"""运行HTTP服务器"""
server_address = ('', port)
httpd = HTTPServer(server_address, SimpleAPIHandler)
print(f"API服务器启动在端口 {port}")
print("访问 http://localhost:{port}/ 查看API文档")
print("按 Ctrl+C 停止服务器")
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\n服务器停止")
finally:
httpd.server_close()
if __name__ == "__main__":
run_server(8000)
2.2 HTTP客户端
将本文的Word文档下载到电脑
推荐度: