性能优化原则
AI生成声明: 本文档由AI辅助生成,旨在提供性能优化原则的基础知识和实践指南。
🎯 学习目标
通过本章节的学习,你将能够:
- 理解性能优化的基本原则
- 掌握常见的性能优化方法
- 了解性能优化的陷阱和注意事项
- 学习性能优化的实践技巧
📚 性能优化原则
1. 测量优先原则
在优化之前,必须先测量和定位问题。
python
import cProfile
import pstats
from io import StringIO
def profile_function(func, *args, **kwargs):
"""性能分析装饰器"""
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
s = StringIO()
ps = pstats.Stats(profiler, stream=s)
ps.sort_stats('cumulative')
ps.print_stats(10) # 打印前10个最耗时的函数
print(s.getvalue())
return result
return wrapper
# 使用示例
@profile_function
def slow_function():
total = 0
for i in range(1000000):
total += i * i
return total
slow_function()2. 瓶颈识别原则
找到真正的性能瓶颈,避免优化非关键路径。
python
from line_profiler import LineProfiler
def identify_bottleneck(func, *args, **kwargs):
"""识别性能瓶颈"""
profiler = LineProfiler()
profiler.add_function(func)
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
profiler.print_stats()
return result3. 80/20原则
80%的性能问题来自20%的代码,重点优化热点代码。
4. 分层优化原则
应用层优化(业务逻辑)
↓
算法层优化(算法和数据结构)
↓
代码层优化(代码实现)
↓
系统层优化(操作系统配置)
↓
硬件层优化(硬件升级)🔍 常见优化方法
1. 算法优化
时间复杂度优化
python
# 优化前: O(n²)
def find_duplicates_slow(arr):
duplicates = []
for i in range(len(arr)):
for j in range(i + 1, len(arr)):
if arr[i] == arr[j]:
duplicates.append(arr[i])
return duplicates
# 优化后: O(n)
def find_duplicates_fast(arr):
seen = set()
duplicates = []
for item in arr:
if item in seen:
duplicates.append(item)
else:
seen.add(item)
return duplicates空间复杂度优化
python
# 优化前: 使用额外数组
def reverse_array_slow(arr):
result = []
for i in range(len(arr) - 1, -1, -1):
result.append(arr[i])
return result
# 优化后: 原地反转
def reverse_array_fast(arr):
left, right = 0, len(arr) - 1
while left < right:
arr[left], arr[right] = arr[right], arr[left]
left += 1
right -= 1
return arr2. 数据结构优化
选择合适的数据结构
python
# 频繁查找: 使用集合而不是列表
# 优化前
def check_membership_slow(item, items_list):
return item in items_list # O(n)
# 优化后
def check_membership_fast(item, items_set):
return item in items_set # O(1)
# 频繁插入删除: 使用双端队列
from collections import deque
# 优化前
items = [] # 列表插入/删除首元素是O(n)
items.insert(0, item) # 慢
# 优化后
items = deque() # 双端队列插入/删除首元素是O(1)
items.appendleft(item) # 快3. 缓存优化
python
from functools import lru_cache
import time
# 使用装饰器缓存
@lru_cache(maxsize=128)
def expensive_function(n):
"""昂贵的计算函数"""
time.sleep(0.1) # 模拟耗时操作
return n * n
# 使用字典缓存
class Cache:
def __init__(self):
self.cache = {}
self.access_count = {}
def get(self, key):
if key in self.cache:
self.access_count[key] = self.access_count.get(key, 0) + 1
return self.cache[key]
return None
def set(self, key, value):
self.cache[key] = value
self.access_count[key] = 0
def evict_lru(self):
"""LRU淘汰策略"""
if not self.cache:
return
# 找到最少使用的key
lru_key = min(self.access_count.items(), key=lambda x: x[1])[0]
del self.cache[lru_key]
del self.access_count[lru_key]4. 并发优化
python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
# 异步I/O优化
async def fetch_urls_async(urls):
"""异步获取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [await r.text() for r in responses]
# 多线程优化
def process_items_parallel(items, func, max_workers=4):
"""并行处理多个项目"""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(func, items))
return results5. 内存优化
python
import sys
from array import array
# 使用更紧凑的数据类型
# 优化前
numbers = [1, 2, 3, 4, 5] # Python列表,每个元素是对象
print(sys.getsizeof(numbers)) # 较大
# 优化后
numbers = array('i', [1, 2, 3, 4, 5]) # C数组,更紧凑
print(sys.getsizeof(numbers)) # 较小
# 生成器优化内存
# 优化前
def get_numbers_list(n):
return [i * i for i in range(n)] # 生成完整列表
# 优化后
def get_numbers_generator(n):
for i in range(n):
yield i * i # 按需生成,节省内存6. I/O优化
python
# 批量I/O操作
# 优化前
def write_items_slow(items, filename):
with open(filename, 'w') as f:
for item in items:
f.write(str(item) + '\n') # 多次I/O
# 优化后
def write_items_fast(items, filename):
with open(filename, 'w') as f:
content = '\n'.join(str(item) for item in items)
f.write(content) # 一次I/O
# 缓冲优化
import io
def process_file_buffered(filename):
"""使用缓冲区处理文件"""
buffer_size = 8192 # 8KB缓冲区
with open(filename, 'rb', buffering=buffer_size) as f:
while True:
chunk = f.read(buffer_size)
if not chunk:
break
process_chunk(chunk)⚠️ 优化陷阱
1. 过早优化
python
# ❌ 不好的做法: 过早优化
def calculate_total(numbers):
# 使用复杂的数据结构优化,但实际上数据量很小
from collections import Counter
counter = Counter(numbers)
return sum(counter.values())
# ✅ 好的做法: 先写简单版本,需要时再优化
def calculate_total(numbers):
return sum(numbers)2. 优化非关键路径
python
# ❌ 不好的做法: 优化非关键代码
def process_data(data):
# 优化了日志输出,但这不是性能瓶颈
logger.debug(f"Processing {len(data)} items")
# 真正的瓶颈在这里,但没有优化
result = []
for item in data:
result.append(expensive_operation(item))
return result
# ✅ 好的做法: 优化关键路径
def process_data(data):
logger.debug(f"Processing {len(data)} items")
# 优化关键路径: 使用列表推导式
return [expensive_operation(item) for item in data]3. 忽略可读性
python
# ❌ 不好的做法: 过度优化导致代码难读
def process(items):
return [x*2 if x%2==0 else x*3 for x in items if x>0]
# ✅ 好的做法: 平衡性能和可读性
def process(items):
result = []
for item in items:
if item > 0:
if item % 2 == 0:
result.append(item * 2)
else:
result.append(item * 3)
return result📖 推荐资源
💡 下一步
最后更新时间: 2025-01-20