最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
浏览量:27 次 发布时间:2026-01-15 18:50 作者:明扬工控商城 下载docx
最近更新:Python中级教程 第六课:高级特性与性能优化 1/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 2/2
2026-01-15
最近更新:Python中级教程 第五课:异步编程与并发 1/2
2026-01-15
最近更新:Python中级教程 第四课:上下文管理器、生成器与迭代器
2026-01-15
最近更新:Python中级教程 第三课:面向对象编程深入与装饰器
2026-01-15
第一部分:上下文管理器(with语句)
1. 什么是上下文管理器
上下文管理器是Python中用于管理资源的对象,确保资源在使用后被正确清理。
python
# 传统方式处理文件
file = open('example.txt', 'r')
try:
content = file.read()
finally:
file.close()
# 使用上下文管理器
with open('example.txt', 'r') as file:
content = file.read()
# 文件会自动关闭
2. 实现自定义上下文管理器
2.1 基于类的实现
python
class DatabaseConnection:
def __init__(self, db_name):
self.db_name = db_name
self.connection = None
def __enter__(self):
print(f"连接到数据库: {self.db_name}")
# 模拟数据库连接
self.connection = f"Connection to {self.db_name}"
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"关闭数据库连接: {self.db_name}")
self.connection = None
# 如果发生异常,exc_type不为None
if exc_type:
print(f"发生异常: {exc_type.__name__}: {exc_val}")
# 返回True表示异常已处理,False或None会让异常继续传播
return False
def execute_query(self, query):
print(f"执行查询: {query}")
return f"Result of: {query}"
# 使用示例
with DatabaseConnection("my_database") as db:
result = db.execute_query("SELECT * FROM users")
print(result)
2.2 基于生成器的实现(使用contextlib)
python
from contextlib import contextmanager
import time
@contextmanager
def timer_context(name):
"""计时上下文管理器"""
start_time = time.time()
print(f"[{name}] 开始执行")
try:
yield # 在此处执行代码块
finally:
end_time = time.time()
print(f"[{name}] 执行结束,耗时: {end_time - start_time:.4f}秒")
@contextmanager
def change_directory(path):
"""临时切换工作目录"""
import os
original_path = os.getcwd()
print(f"当前目录: {original_path}")
try:
os.chdir(path)
print(f"切换到: {path}")
yield
finally:
os.chdir(original_path)
print(f"切换回: {original_path}")
# 使用示例
with timer_context("计算任务"):
total = 0
for i in range(1000000):
total += i
print(f"计算结果: {total}")
3. 实际应用示例
python
import sqlite3
from contextlib import contextmanager
@contextmanager
def get_db_connection(db_path):
"""数据库连接上下文管理器"""
conn = None
try:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row # 返回字典格式
yield conn
except sqlite3.Error as e:
print(f"数据库错误: {e}")
raise
finally:
if conn:
conn.close()
# 使用示例
with get_db_connection("test.db") as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
)
""")
conn.commit()
# 嵌套上下文管理器
@contextmanager
def transaction(conn):
"""事务处理上下文管理器"""
try:
yield
conn.commit()
print("事务提交成功")
except Exception as e:
conn.rollback()
print(f"事务回滚,原因: {e}")
raise
# 综合使用
with get_db_connection("test.db") as conn:
with transaction(conn):
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("张三", "zhangsan@example.com"))
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)",
("李四", "lisi@example.com"))
4. 高级上下文管理器
python
from contextlib import ContextDecorator
import threading
class LockContext(ContextDecorator):
"""线程锁上下文管理器,也可用作装饰器"""
def __init__(self, lock_name="default"):
self.lock_name = lock_name
self.lock = threading.Lock()
def __enter__(self):
print(f"获取锁: {self.lock_name}")
self.lock.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.release()
print(f"释放锁: {self.lock_name}")
return False
# 作为上下文管理器使用
lock = LockContext("resource_lock")
def process_data(data):
with lock:
print(f"处理数据: {data}")
time.sleep(0.1)
# 作为装饰器使用
@LockContext("decorator_lock")
def critical_function():
print("执行关键操作")
time.sleep(0.2)
# 测试
threads = []
for i in range(5):
t = threading.Thread(target=process_data, args=(f"数据{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()
critical_function()
第二部分:生成器与迭代器
1. 迭代器基础
迭代器是实现了迭代器协议(__iter__和__next__方法)的对象。
python
class CountDown:
"""倒计时迭代器"""
def __init__(self, start):
self.current = start
def __iter__(self):
return self
def __next__(self):
if self.current <= 0:
raise StopIteration
value = self.current
self.current -= 1
return value
# 使用迭代器
countdown = CountDown(5)
for num in countdown:
print(f"倒计时: {num}")
# 手动使用迭代器
countdown2 = CountDown(3)
while True:
try:
print(f"手动迭代: {next(countdown2)}")
except StopIteration:
print("迭代结束")
break
2. 生成器基础
生成器是一种特殊的迭代器,使用yield关键字创建。
python
def simple_generator():
"""简单生成器"""
print("开始")
yield 1
print("继续")
yield 2
print("结束")
yield 3
# 使用生成器
gen = simple_generator()
print(next(gen)) # 输出: 开始, 然后 1
print(next(gen)) # 输出: 继续, 然后 2
print(next(gen)) # 输出: 结束, 然后 3
# 使用for循环
for value in simple_generator():
print(f"值: {value}")
3. 生成器表达式
python
# 生成器表达式(惰性求值)
squares = (x**2 for x in range(10))
print(list(squares)) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 与列表推导式的区别
list_comp = [x**2 for x in range(10)] # 立即计算,占用内存
gen_exp = (x**2 for x in range(10)) # 惰性计算,节省内存
# 大文件读取示例
def read_large_file(file_path):
"""逐行读取大文件"""
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
yield line.strip()
# 使用示例
# for line in read_large_file("huge_file.txt"):
# process(line)
4. 生成器的高级用法
python
def fibonacci_generator(n):
"""生成斐波那契数列"""
a, b = 0, 1
count = 0
while count < n:
yield a
a, b = b, a + b
count += 1
def generator_with_send():
"""支持send方法的生成器"""
print("开始")
x = yield "第一步"
print(f"收到: {x}")
y = yield "第二步"
print(f"收到: {y}")
yield "结束"
# 使用send方法
gen = generator_with_send()
print(next(gen)) # 输出: 开始, 然后 "第一步"
print(gen.send(100)) # 输出: 收到: 100, 然后 "第二步"
print(gen.send(200)) # 输出: 收到: 200, 然后 "结束"
# 无限生成器
def infinite_counter():
"""无限计数器"""
count = 0
while True:
yield count
count += 1
# 使用islice限制无限生成器
from itertools import islice
counter = infinite_counter()
first_10 = list(islice(counter, 10))
print(f"前10个: {first_10}")
5. yield from 语法
python
def chain_generators():
"""连接多个生成器"""
yield from range(3)
yield from ['a', 'b', 'c']
yield from (x**2 for x in range(3))
print(list(chain_generators())) # [0, 1, 2, 'a', 'b', 'c', 0, 1, 4]
def flatten(nested_list):
"""展平嵌套列表"""
for item in nested_list:
if isinstance(item, list):
yield from flatten(item)
else:
yield item
nested = [1, [2, [3, 4], 5], 6, [7, 8]]
print(list(flatten(nested))) # [1, 2, 3, 4, 5, 6, 7, 8]
6. 协程与生成器
python
def coroutine_example():
"""简单的协程示例"""
while True:
received = yield
print(f"收到: {received}")
# 创建协程
coro = coroutine_example()
next(coro) # 启动协程
coro.send("Hello")
coro.send("World")
def averager():
"""计算移动平均值的协程"""
total = 0
count = 0
average = None
while True:
value = yield average
if value is None:
break
total += value
count += 1
average = total / count
return (count, average) if count > 0 else (0, 0)
# 使用协程
avg = averager()
next(avg) # 启动
print(avg.send(10)) # 10.0
print(avg.send(20)) # 15.0
print(avg.send(30)) # 20.0
try:
avg.send(None) # 停止协程
except StopIteration as e:
print(f"最终结果: {e.value}") # (3, 20.0)
第三部分:综合实战
实战1:数据处理管道
python
def read_file(filepath):
"""读取文件生成器"""
with open(filepath, 'r') as file:
for line in file:
yield line.strip()
def filter_comments(lines):
"""过滤注释行"""
for line in lines:
if not line.startswith('#'):
yield line
def parse_numbers(lines):
"""解析数字"""
for line in lines:
try:
yield int(line)
except ValueError:
continue
def running_average(numbers):
"""计算移动平均值"""
total = 0
count = 0
for num in numbers:
total += num
count += 1
yield total / count
# 构建数据处理管道
def process_data_pipeline(filepath):
pipeline = running_average(
parse_numbers(
filter_comments(
read_file(filepath)
)
)
)
return pipeline
# 使用示例
# 假设data.txt内容:
# 10
# # 注释行
# 20
# 30
# 40
pipeline = process_data_pipeline("data.txt")
for avg in pipeline:
print(f"移动平均值: {avg:.2f}")
实战2:分页数据获取器
python
import time
class PaginatedDataFetcher:
"""分页数据获取器"""
def __init__(self, page_size=10):
self.page_size = page_size
self.current_page = 0
self.total_items = 100 # 模拟总数据量
def fetch_page(self, page):
"""获取单页数据(模拟)"""
time.sleep(0.1) # 模拟网络延迟
start = page * self.page_size
end = min(start + self.page_size, self.total_items)
if start >= self.total_items:
return []
return list(range(start, end))
def __iter__(self):
return self.generator()
def generator(self):
"""生成器实现"""
while True:
data = self.fetch_page(self.current_page)
if not data:
break
yield from data
self.current_page += 1
# 使用示例
fetcher = PaginatedDataFetcher(page_size=5)
for i, item in enumerate(fetcher, 1):
print(f"项目 {i}: {item}")
if i >= 20: # 只取前20个
break
实战3:事件流处理器
python
import random
import time
from collections import deque
class EventStream:
"""事件流生成器"""
def __init__(self):
self.event_types = ['click', 'view', 'purchase', 'login', 'logout']
def generate_events(self, count=100):
"""生成事件"""
for i in range(count):
event = {
'id': i,
'type': random.choice(self.event_types),
'timestamp': time.time(),
'data': {'value': random.randint(1, 100)}
}
yield event
time.sleep(0.01) # 模拟事件间隔
def filter_events(events, event_type):
"""过滤特定类型的事件"""
for event in events:
if event['type'] == event_type:
yield event
def batch_events(events, batch_size=10):
"""批量处理事件"""
batch = []
for event in events:
batch.append(event)
if len(batch) >= batch_size:
yield batch
batch = []
if batch:
yield batch
def process_batches(batches):
"""处理批次数据"""
for batch in batches:
# 模拟处理逻辑
types = [e['type'] for e in batch]
total_value = sum(e['data']['value'] for e in batch)
yield {
'batch_size': len(batch),
'event_types': set(types),
'total_value': total_value,
'avg_value': total_value / len(batch)
}
# 构建事件处理管道
def create_event_pipeline(event_type=None, batch_size=5):
stream = EventStream()
events = stream.generate_events(50)
if event_type:
events = filter_events(events, event_type)
batches = batch_events(events, batch_size)
results = process_batches(batches)
return results
# 使用示例
print("处理所有事件:")
for result in create_event_pipeline():
print(f"批次结果: {result}")
print("\n只处理点击事件:")
for result in create_event_pipeline(event_type='click'):
print(f"点击事件批次: {result}")
实战4:上下文管理器与生成器的结合
python
from contextlib import contextmanager
import threading
import queue
@contextmanager
def threaded_generator(generator_func, *args, maxsize=10, **kwargs):
"""
在独立线程中运行生成器的上下文管理器
用于计算密集型生成器
"""
q = queue.Queue(maxsize=maxsize)
exception = None
def worker():
"""工作线程函数"""
try:
for item in generator_func(*args, **kwargs):
q.put(item)
q.put(None) # 结束信号
except Exception as e:
q.put(e) # 传递异常
# 启动工作线程
thread = threading.Thread(target=worker)
thread.daemon = True
thread.start()
try:
while True:
item = q.get()
if isinstance(item, Exception):
exception = item
break
elif item is None:
break
else:
yield item
finally:
# 等待线程结束
thread.join(timeout=1.0)
if exception:
raise exception
# 计算密集型的生成器
def heavy_computation_generator(n):
"""模拟计算密集型生成器"""
import math
for i in range(n):
# 模拟复杂计算
result = sum(math.sin(x) * math.cos(x) for x in range(10000))
yield (i, result)
# 使用示例
print("开始处理(在主线程中消费,在后台线程中计算):")
with threaded_generator(heavy_computation_generator, 10) as gen:
for i, value in gen:
print(f"结果 {i}: {value:.6f}")
练习作业
作业1:实现一个支持重试机制的上下文管理器
要求:
可以设置最大重试次数
可以设置重试延迟
可以指定需要重试的异常类型
记录重试次数和最终结果
作业2:实现一个目录树遍历生成器
要求:
支持深度优先和广度优先遍历
可以过滤特定文件类型
可以限制遍历深度
返回文件路径和文件信息
作业3:实现一个数据流窗口生成器
要求:
从无限数据流中生成固定大小的窗口
支持滑动窗口和跳跃窗口
可以设置窗口大小和滑动步长
处理数据流结束的情况
作业4:综合项目 - 简易ETL管道
要求:
从CSV文件读取数据
使用生成器进行数据清洗和转换
使用上下文管理器处理数据库连接
将处理后的数据写入数据库
支持错误处理和日志记录
学习要点总结
上下文管理器核心概念:
__enter__和__exit__方法
使用contextlib简化实现
异常处理和资源清理
生成器核心概念:
yield关键字和生成器函数
生成器表达式
yield from语法
协程与生成器的关系
迭代器协议:
__iter__和__next__方法
StopIteration异常
可迭代对象与迭代器的区别
实际应用场景:
大文件处理
数据流处理
异步任务处理
资源管理
最佳实践:
使用生成器处理大数据集
使用上下文管理器确保资源清理
构建可组合的数据处理管道
合理使用惰性求值
常见问题解答
Q: 生成器和列表有什么区别?
A: 生成器是惰性求值的,一次只产生一个值,节省内存;列表是立即求值的,占用内存存储所有值。
Q: 什么时候应该使用上下文管理器?
A: 当需要确保资源被正确清理时,如文件操作、数据库连接、锁管理等。
Q: yield和return有什么区别?
A: return结束函数执行并返回值;yield暂停函数执行并返回值,下次从暂停处继续执行。
Q: 如何调试生成器?
A: 可以使用inspect.getgeneratorstate()查看生成器状态,或添加日志输出。
下一课预告
第五课将学习:
异步编程基础(asyncio)
async/await语法
异步上下文管理器
异步生成器
并发与并行编程
将本文的Word文档下载到电脑
推荐度: