Python中级教程 第七课:网络编程、并发与项目实践 1/7

浏览量:41 次 发布时间:2026-01-15 18:05 作者:明扬工控商城 下载docx

最近更新:Python中级教程 第三课:面向对象编程深入与装饰器

第一部分:网络编程基础

1.1 Socket编程基础

python

import socket

import threading

import time


# 简单TCP服务器

def basic_tcp_server():

   """基本的TCP服务器"""

   # 创建socket

   server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

   

   # 设置地址重用

   server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

   

   # 绑定地址和端口

   server_address = ('127.0.0.1', 8888)

   server_socket.bind(server_address)

   

   # 监听连接

   server_socket.listen(5)

   print(f"服务器启动在 {server_address}")

   

   try:

       while True:

           # 接受连接

           client_socket, client_address = server_socket.accept()

           print(f"接受来自 {client_address} 的连接")

           

           # 处理客户端

           handle_client(client_socket, client_address)

   except KeyboardInterrupt:

       print("\n服务器关闭")

   finally:

       server_socket.close()


def handle_client(client_socket, client_address):

   """处理客户端连接"""

   try:

       # 接收数据

       data = client_socket.recv(1024)

       if data:

           message = data.decode('utf-8')

           print(f"来自 {client_address} 的消息: {message}")

           

           # 发送响应

           response = f"服务器收到: {message}"

           client_socket.send(response.encode('utf-8'))

   except Exception as e:

       print(f"处理客户端 {client_address} 时出错: {e}")

   finally:

       client_socket.close()

       print(f"关闭与 {client_address} 的连接")


# 简单TCP客户端

def basic_tcp_client():

   """基本的TCP客户端"""

   client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

   

   try:

       # 连接服务器

       server_address = ('127.0.0.1', 8888)

       client_socket.connect(server_address)

       print(f"连接到服务器 {server_address}")

       

       # 发送消息

       message = "Hello, Server!"

       client_socket.send(message.encode('utf-8'))

       

       # 接收响应

       response = client_socket.recv(1024)

       print(f"服务器响应: {response.decode('utf-8')}")

   

   except ConnectionRefusedError:

       print("连接被拒绝,请确保服务器正在运行")

   except Exception as e:

       print(f"客户端错误: {e}")

   finally:

       client_socket.close()


# 运行示例

if __name__ == "__main__":

   import sys

   

   if len(sys.argv) > 1 and sys.argv[1] == 'server':

       basic_tcp_server()

   else:

       basic_tcp_client()

1.2 多客户端服务器

python

import socket

import threading

import time

from queue import Queue

import json


class MultiClientServer:

   """支持多客户端的服务器"""

   

   def __init__(self, host='127.0.0.1', port=9999, max_clients=10):

       self.host = host

       self.port = port

       self.max_clients = max_clients

       self.server_socket = None

       self.clients = {}  # 客户端字典:{client_id: (socket, address)}

       self.client_counter = 0

       self.running = False

       self.message_queue = Queue()

       self.lock = threading.Lock()

   

   def start(self):

       """启动服务器"""

       self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

       self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

       self.server_socket.bind((self.host, self.port))

       self.server_socket.listen(self.max_clients)

       

       print(f"多客户端服务器启动在 {self.host}:{self.port}")

       print("输入 'quit' 停止服务器")

       

       self.running = True

       

       # 启动消息处理线程

       message_thread = threading.Thread(target=self.process_messages, daemon=True)

       message_thread.start()

       

       # 启动控制台输入线程

       console_thread = threading.Thread(target=self.console_input, daemon=True)

       console_thread.start()

       

       # 接受连接

       self.accept_connections()

   

   def accept_connections(self):

       """接受客户端连接"""

       while self.running:

           try:

               client_socket, client_address = self.server_socket.accept()

               

               with self.lock:

                   self.client_counter += 1

                   client_id = self.client_counter

                   self.clients[client_id] = (client_socket, client_address)

               

               # 启动客户端处理线程

               client_thread = threading.Thread(

                   target=self.handle_client,

                   args=(client_id, client_socket, client_address),

                   daemon=True

               )

               client_thread.start()

               

               print(f"客户端 {client_id} 连接: {client_address}")

               

               # 发送欢迎消息

               welcome_msg = {

                   'type': 'welcome',

                   'client_id': client_id,

                   'message': f'欢迎! 你的ID是 {client_id}'

               }

               self.send_to_client(client_id, welcome_msg)

               

           except OSError:

               # 服务器socket关闭

               break

           except Exception as e:

               print(f"接受连接时出错: {e}")

   

   def handle_client(self, client_id, client_socket, client_address):

       """处理单个客户端"""

       try:

           while self.running:

               try:

                   # 接收数据

                   data = client_socket.recv(4096)

                   if not data:

                       break

                   

                   # 解析JSON消息

                   try:

                       message = json.loads(data.decode('utf-8'))

                       message['from_client'] = client_id

                       message['timestamp'] = time.time()

                       

                       # 将消息放入队列

                       self.message_queue.put(message)

                       

                   except json.JSONDecodeError:

                       print(f"客户端 {client_id} 发送了无效的JSON")

                       error_msg = {'type': 'error', 'message': '无效的JSON格式'}

                       client_socket.send(json.dumps(error_msg).encode('utf-8'))

               

               except ConnectionResetError:

                   break

               except Exception as e:

                   print(f"处理客户端 {client_id} 数据时出错: {e}")

                   break

       

       finally:

           # 客户端断开连接

           with self.lock:

               if client_id in self.clients:

                   del self.clients[client_id]

           

           client_socket.close()

           print(f"客户端 {client_id} 断开连接")

           

           # 广播断开消息

           disconnect_msg = {

               'type': 'client_disconnected',

               'client_id': client_id

           }

           self.broadcast(disconnect_msg, exclude=client_id)

   

   def process_messages(self):

       """处理消息队列"""

       while self.running:

           try:

               message = self.message_queue.get(timeout=1)

               

               # 处理不同类型的消息

               msg_type = message.get('type', 'unknown')

               

               if msg_type == 'chat':

                   # 聊天消息,广播给所有客户端

                   self.broadcast(message)

               

               elif msg_type == 'private':

                   # 私聊消息

                   to_client = message.get('to_client')

                   if to_client:

                       self.send_to_client(to_client, message)

               

               elif msg_type == 'command':

                   # 命令消息

                   command = message.get('command')

                   if command == 'list_clients':

                       self.list_clients(message['from_client'])

               

               else:

                   print(f"未知消息类型: {msg_type}")

               

           except Queue.Empty:

               continue

           except Exception as e:

               print(f"处理消息时出错: {e}")

   

   def send_to_client(self, client_id, message):

       """发送消息给特定客户端"""

       with self.lock:

           if client_id in self.clients:

               client_socket, _ = self.clients[client_id]

               try:

                   client_socket.send(json.dumps(message).encode('utf-8'))

               except Exception as e:

                   print(f"发送消息到客户端 {client_id} 时出错: {e}")

   

   def broadcast(self, message, exclude=None):

       """广播消息给所有客户端"""

       with self.lock:

           for client_id in self.clients:

               if client_id != exclude:

                   self.send_to_client(client_id, message)

   

   def list_clients(self, requester_id):

       """列出所有客户端"""

       with self.lock:

           client_list = [

               {'id': cid, 'address': addr[0]}

               for cid, (_, addr) in self.clients.items()

           ]

       

       response = {

           'type': 'client_list',

           'clients': client_list

       }

       

       self.send_to_client(requester_id, response)

   

   def console_input(self):

       """处理控制台输入"""

       while self.running:

           try:

               cmd = input("服务器> ").strip()

               

               if cmd.lower() == 'quit':

                   self.stop()

                   break

               elif cmd.lower() == 'list':

                   print("已连接客户端:")

                   with self.lock:

                       for cid, (_, addr) in self.clients.items():

                           print(f"  {cid}: {addr}")

               elif cmd.lower() == 'count':

                   with self.lock:

                       print(f"在线客户端数: {len(self.clients)}")

               elif cmd.startswith('broadcast '):

                   msg = cmd[10:]

                   broadcast_msg = {

                       'type': 'server_message',

                       'message': msg,

                       'timestamp': time.time()

                   }

                   self.broadcast(broadcast_msg)

                   print(f"已广播: {msg}")

               else:

                   print("可用命令: quit, list, count, broadcast <消息>")

           

           except EOFError:

               break

           except Exception as e:

               print(f"控制台输入错误: {e}")

   

   def stop(self):

       """停止服务器"""

       print("正在停止服务器...")

       self.running = False

       

       # 关闭所有客户端连接

       with self.lock:

           for client_id, (client_socket, _) in self.clients.items():

               try:

                   disconnect_msg = {'type': 'server_shutdown', 'message': '服务器关闭'}

                   client_socket.send(json.dumps(disconnect_msg).encode('utf-8'))

                   client_socket.close()

               except:

                   pass

       

       # 关闭服务器socket

       if self.server_socket:

           self.server_socket.close()

       

       print("服务器已停止")


# 客户端类

class ChatClient:

   """聊天客户端"""

   

   def __init__(self, host='127.0.0.1', port=9999):

       self.host = host

       self.port = port

       self.client_socket = None

       self.client_id = None

       self.running = False

   

   def connect(self):

       """连接到服务器"""

       self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

       

       try:

           self.client_socket.connect((self.host, self.port))

           self.running = True

           

           # 启动接收线程

           receive_thread = threading.Thread(target=self.receive_messages, daemon=True)

           receive_thread.start()

           

           print(f"已连接到服务器 {self.host}:{self.port}")

           print("输入消息发送聊天,命令:")

           print("  /list - 列出在线用户")

           print("  /msg <id> <消息> - 私聊")

           print("  /quit - 退出")

           

           self.send_input()

           

       except ConnectionRefusedError:

           print("连接被拒绝,请确保服务器正在运行")

       except Exception as e:

           print(f"连接错误: {e}")

   

   def receive_messages(self):

       """接收服务器消息"""

       while self.running:

           try:

               data = self.client_socket.recv(4096)

               if not data:

                   print("与服务器的连接断开")

                   self.running = False

                   break

               

               message = json.loads(data.decode('utf-8'))

               self.handle_message(message)

           

           except json.JSONDecodeError:

               print("收到无效的JSON数据")

           except ConnectionResetError:

               print("与服务器的连接被重置")

               self.running = False

               break

           except Exception as e:

               print(f"接收消息时出错: {e}")

               self.running = False

               break

   

   def handle_message(self, message):

       """处理收到的消息"""

       msg_type = message.get('type', 'unknown')

       

       if msg_type == 'welcome':

           self.client_id = message.get('client_id')

           print(f"\n{message.get('message')}")

       

       elif msg_type == 'chat':

           from_client = message.get('from_client')

           msg = message.get('message', '')

           timestamp = message.get('timestamp')

           

           time_str = time.strftime('%H:%M:%S', time.localtime(timestamp))

           print(f"\n[{time_str}] 用户{from_client}: {msg}")

       

       elif msg_type == 'private':

           from_client = message.get('from_client')

           msg = message.get('message', '')

           timestamp = message.get('timestamp')

           

           time_str = time.strftime('%H:%M:%S', time.localtime(timestamp))

           print(f"\n[{time_str}] 私聊来自用户{from_client}: {msg}")

       

       elif msg_type == 'server_message':

           msg = message.get('message', '')

           print(f"\n[服务器]: {msg}")

       

       elif msg_type == 'client_list':

           clients = message.get('clients', [])

           print("\n在线用户:")

           for client in clients:

               print(f"  用户{client['id']}: {client['address']}")

       

       elif msg_type == 'client_disconnected':

           client_id = message.get('client_id')

           print(f"\n用户{client_id} 已断开连接")

       

       elif msg_type == 'server_shutdown':

           print(f"\n服务器关闭: {message.get('message')}")

           self.running = False

       

       else:

           print(f"\n未知消息类型: {message}")

   

   def send_message(self, msg_type, **kwargs):

       """发送消息到服务器"""

       if not self.running or not self.client_socket:

           return

       

       message = {'type': msg_type, **kwargs}

       

       try:

           self.client_socket.send(json.dumps(message).encode('utf-8'))

       except Exception as e:

           print(f"发送消息时出错: {e}")

           self.running = False

   

   def send_input(self):

       """处理用户输入"""

       while self.running:

           try:

               user_input = input("> ").strip()

               

               if not user_input:

                   continue

               

               if user_input.lower() == '/quit':

                   print("断开连接...")

                   self.running = False

                   break

               

               elif user_input.lower() == '/list':

                   self.send_message('command', command='list_clients')

               

               elif user_input.startswith('/msg '):

                   parts = user_input[5:].split(' ', 1)

                   if len(parts) == 2:

                       to_client, msg = parts

                       try:

                           to_client = int(to_client)

                           self.send_message('private', to_client=to_client, message=msg)

                       except ValueError:

                           print("用户ID必须是数字")

                   else:

                       print("用法: /msg <用户ID> <消息>")

               

               else:

                   # 普通聊天消息

                   self.send_message('chat', message=user_input)

           

           except EOFError:

               print("\n输入结束")

               self.running = False

               break

           except KeyboardInterrupt:

               print("\n中断连接")

               self.running = False

               break

       

       # 关闭连接

       if self.client_socket:

           self.client_socket.close()


# 使用示例

if __name__ == "__main__":

   import sys

   

   if len(sys.argv) > 1:

       if sys.argv[1] == 'server':

           server = MultiClientServer()

           server.start()

       elif sys.argv[1] == 'client':

           client = ChatClient()

           client.connect()

       else:

           print("用法: python script.py [server|client]")

   else:

       print("请指定模式: server 或 client")

第二部分:HTTP客户端与服务器

2.1 使用http.server创建Web服务器

python

from http.server import HTTPServer, BaseHTTPRequestHandler

import json

import urllib.parse

import time


class SimpleAPIHandler(BaseHTTPRequestHandler):

   """简单的API请求处理器"""

   

   # 内存数据库

   database = {

       'users': {

           1: {'id': 1, 'name': '张三', 'email': 'zhangsan@example.com'},

           2: {'id': 2, 'name': '李四', 'email': 'lisi@example.com'},

       },

       'posts': {

           1: {'id': 1, 'title': '第一篇', 'content': '这是第一篇内容', 'author_id': 1},

           2: {'id': 2, 'title': '第二篇', 'content': '这是第二篇内容', 'author_id': 2},

       }

   }

   

   def do_GET(self):

       """处理GET请求"""

       # 解析URL路径

       parsed_path = urllib.parse.urlparse(self.path)

       path_parts = parsed_path.path.strip('/').split('/')

       

       # 设置响应头

       self.send_response(200)

       self.send_header('Content-type', 'application/json')

       self.send_header('Access-Control-Allow-Origin', '*')

       self.end_headers()

       

       response = {}

       

       try:

           if path_parts[0] == '':

               # 根路径,返回API信息

               response = {

                   'api': 'Simple API Server',

                   'version': '1.0',

                   'endpoints': {

                       '/users': '获取所有用户',

                       '/users/{id}': '获取特定用户',

                       '/posts': '获取所有文章',

                       '/posts/{id}': '获取特定文章'

                   }

               }

           

           elif path_parts[0] == 'users':

               if len(path_parts) == 1:

                   # 获取所有用户

                   users = list(self.database['users'].values())

                   response = {'users': users}

               elif len(path_parts) == 2:

                   # 获取特定用户

                   user_id = int(path_parts[1])

                   user = self.database['users'].get(user_id)

                   if user:

                       response = {'user': user}

                   else:

                       self.send_error(404, '用户不存在')

                       return

           

           elif path_parts[0] == 'posts':

               if len(path_parts) == 1:

                   # 获取所有文章

                   posts = list(self.database['posts'].values())

                   response = {'posts': posts}

               elif len(path_parts) == 2:

                   # 获取特定文章

                   post_id = int(path_parts[1])

                   post = self.database['posts'].get(post_id)

                   if post:

                       response = {'post': post}

                   else:

                       self.send_error(404, '文章不存在')

                       return

           

           else:

               self.send_error(404, '路径不存在')

               return

       

       except Exception as e:

           self.send_error(500, f'服务器错误: {e}')

           return

       

       # 发送响应

       self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))

   

   def do_POST(self):

       """处理POST请求"""

       content_length = int(self.headers.get('Content-Length', 0))

       post_data = self.rfile.read(content_length)

       

       # 解析JSON数据

       try:

           data = json.loads(post_data.decode('utf-8'))

       except json.JSONDecodeError:

           self.send_error(400, '无效的JSON数据')

           return

       

       # 解析URL路径

       parsed_path = urllib.parse.urlparse(self.path)

       path_parts = parsed_path.path.strip('/').split('/')

       

       response = {}

       

       try:

           if path_parts[0] == 'users' and len(path_parts) == 1:

               # 创建新用户

               new_id = max(self.database['users'].keys()) + 1

               new_user = {

                   'id': new_id,

                   'name': data.get('name'),

                   'email': data.get('email')

               }

               

               # 验证数据

               if not new_user['name'] or not new_user['email']:

                   self.send_error(400, '缺少必要字段')

                   return

               

               self.database['users'][new_id] = new_user

               response = {'user': new_user, 'message': '用户创建成功'}

           

           elif path_parts[0] == 'posts' and len(path_parts) == 1:

               # 创建新文章

               new_id = max(self.database['posts'].keys()) + 1

               new_post = {

                   'id': new_id,

                   'title': data.get('title'),

                   'content': data.get('content'),

                   'author_id': data.get('author_id'),

                   'created_at': time.time()

               }

               

               # 验证数据

               if not new_post['title'] or not new_post['content'] or not new_post['author_id']:

                   self.send_error(400, '缺少必要字段')

                   return

               

               # 验证作者是否存在

               if new_post['author_id'] not in self.database['users']:

                   self.send_error(400, '作者不存在')

                   return

               

               self.database['posts'][new_id] = new_post

               response = {'post': new_post, 'message': '文章创建成功'}

           

           else:

               self.send_error(404, '路径不存在')

               return

       

       except Exception as e:

           self.send_error(500, f'服务器错误: {e}')

           return

       

       # 发送响应

       self.send_response(201)  # 201 Created

       self.send_header('Content-type', 'application/json')

       self.send_header('Access-Control-Allow-Origin', '*')

       self.end_headers()

       

       self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))

   

   def do_PUT(self):

       """处理PUT请求"""

       content_length = int(self.headers.get('Content-Length', 0))

       put_data = self.rfile.read(content_length)

       

       # 解析JSON数据

       try:

           data = json.loads(put_data.decode('utf-8'))

       except json.JSONDecodeError:

           self.send_error(400, '无效的JSON数据')

           return

       

       # 解析URL路径

       parsed_path = urllib.parse.urlparse(self.path)

       path_parts = parsed_path.path.strip('/').split('/')

       

       response = {}

       

       try:

           if path_parts[0] == 'users' and len(path_parts) == 2:

               # 更新用户

               user_id = int(path_parts[1])

               

               if user_id not in self.database['users']:

                   self.send_error(404, '用户不存在')

                   return

               

               # 更新用户信息

               user = self.database['users'][user_id]

               user.update({

                   'name': data.get('name', user['name']),

                   'email': data.get('email', user['email'])

               })

               

               response = {'user': user, 'message': '用户更新成功'}

           

           else:

               self.send_error(404, '路径不存在')

               return

       

       except Exception as e:

           self.send_error(500, f'服务器错误: {e}')

           return

       

       # 发送响应

       self.send_response(200)

       self.send_header('Content-type', 'application/json')

       self.send_header('Access-Control-Allow-Origin', '*')

       self.end_headers()

       

       self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))

   

   def do_DELETE(self):

       """处理DELETE请求"""

       # 解析URL路径

       parsed_path = urllib.parse.urlparse(self.path)

       path_parts = parsed_path.path.strip('/').split('/')

       

       response = {}

       

       try:

           if path_parts[0] == 'users' and len(path_parts) == 2:

               # 删除用户

               user_id = int(path_parts[1])

               

               if user_id not in self.database['users']:

                   self.send_error(404, '用户不存在')

                   return

               

               # 删除用户(同时删除相关文章)

               deleted_user = self.database['users'].pop(user_id)

               

               # 删除该用户的所有文章

               post_ids_to_delete = [

                   pid for pid, post in self.database['posts'].items()

                   if post['author_id'] == user_id

               ]

               for pid in post_ids_to_delete:

                   del self.database['posts'][pid]

               

               response = {

                   'message': '用户删除成功',

                   'deleted_user': deleted_user,

                   'deleted_posts': len(post_ids_to_delete)

               }

           

           else:

               self.send_error(404, '路径不存在')

               return

       

       except Exception as e:

           self.send_error(500, f'服务器错误: {e}")

           return

       

       # 发送响应

       self.send_response(200)

       self.send_header('Content-type', 'application/json')

       self.send_header('Access-Control-Allow-Origin', '*')

       self.end_headers()

       

       self.wfile.write(json.dumps(response, ensure_ascii=False).encode('utf-8'))

   

   def do_OPTIONS(self):

       """处理OPTIONS请求(CORS预检)"""

       self.send_response(200)

       self.send_header('Access-Control-Allow-Origin', '*')

       self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')

       self.send_header('Access-Control-Allow-Headers', 'Content-Type')

       self.end_headers()

   

   def log_message(self, format, *args):

       """自定义日志格式"""

       print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {format % args}")


# 启动服务器

def run_server(port=8000):

   """运行HTTP服务器"""

   server_address = ('', port)

   httpd = HTTPServer(server_address, SimpleAPIHandler)

   

   print(f"API服务器启动在端口 {port}")

   print("访问 http://localhost:{port}/ 查看API文档")

   print("按 Ctrl+C 停止服务器")

   

   try:

       httpd.serve_forever()

   except KeyboardInterrupt:

       print("\n服务器停止")

   finally:

       httpd.server_close()


if __name__ == "__main__":

   run_server(8000)

2.2 HTTP客户端


明扬工控商城

推荐阅读:

Python中级教程 第六课:高级特性与性能优化 2/2

Python中级教程 第六课:高级特性与性能优化 1/2

Python中级教程 第五课:异步编程与并发 2/2

Python中级教程 第五课:异步编程与并发 1/2

Python中级教程 第四课:上下文管理器、生成器与迭代器

Python中级教程 第三课:面向对象编程深入与装饰器

热门标签:
Python中级教程 第七课:网络编程、并发与项目实践 1/7.docx

将本文的Word文档下载到电脑

推荐度:

下载

全部评论

请登录
产业新闻-明扬资讯网
科技资讯-明扬资讯网