Skip to content
作者:daily5am创建:-更新:-
字数:预计阅读: 分钟访问量:--

数据一致性

AI生成声明: 本文档由AI辅助生成,旨在提供数据一致性的基础知识和实践指南。

🎯 学习目标

通过本章节的学习,你将能够:

  • 理解数据一致性的概念和重要性
  • 掌握CAP理论和BASE理论
  • 了解数据一致性保证机制
  • 学习分布式事务和最终一致性

📚 数据一致性概念

一致性类型

1. 强一致性

所有节点在同一时刻看到相同的数据。

特点:

  • 数据完全一致
  • 性能影响较大
  • 可用性可能受影响

2. 弱一致性

系统不保证立即一致,但保证最终一致。

特点:

  • 性能好
  • 可能出现短暂不一致
  • 最终会达到一致

3. 最终一致性

系统保证在没有新的更新操作后,最终会达到一致状态。

特点:

  • 性能好
  • 延迟一致性
  • 适合大多数场景

一致性场景

  • 主从复制: 主库写入,从库同步
  • 分布式缓存: 缓存与数据库一致
  • 分布式存储: 多副本数据一致
  • 分布式事务: 跨服务数据一致

🔍 CAP理论

CAP定理

在一个分布式系统中,以下三个特性最多只能同时满足两个:

  • C (Consistency): 一致性
  • A (Availability): 可用性
  • P (Partition tolerance): 分区容错性

CAP组合

CP系统

保证一致性和分区容错性,牺牲可用性。

示例: 分布式数据库(如MongoDB、HBase)

AP系统

保证可用性和分区容错性,牺牲一致性。

示例: DNS系统、CDN系统

CA系统

保证一致性和可用性,不适合分布式环境。

注意: 实际分布式系统必须满足P,所以CA组合在分布式系统中不存在。

🏗️ BASE理论

BASE含义

  • BA (Basically Available): 基本可用
  • S (Soft state): 软状态
  • E (Eventual consistency): 最终一致性

BASE vs ACID

特性ACIDBASE
一致性强一致性最终一致性
可用性低(强一致导致)
事务支持不支持
性能

💾 数据一致性保证

1. 主从复制一致性

同步复制

主库等待所有从库确认后才返回成功。

sql
-- MySQL半同步复制
SET GLOBAL rpl_semi_sync_master_enabled = 1;
SET GLOBAL rpl_semi_sync_slave_enabled = 1;

优点: 数据一致性好 缺点: 性能影响,可用性降低

异步复制

主库不等待从库确认直接返回。

优点: 性能好,可用性高 缺点: 可能丢失数据

2. 分布式锁

保证同一时刻只有一个节点能修改数据。

python
import redis
import time
import uuid

class DistributedLock:
    def __init__(self, redis_client, key, timeout=10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
    
    def acquire(self, block=True, timeout=None):
        """获取锁"""
        end_time = time.time() + (timeout or self.timeout)
        
        while True:
            # 尝试获取锁
            if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
                return True
            
            if not block or time.time() > end_time:
                return False
            
            time.sleep(0.001)  # 短暂等待后重试
    
    def release(self):
        """释放锁"""
        # 使用Lua脚本保证原子性
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(lua_script, 1, self.key, self.identifier)

# 使用示例
redis_client = redis.Redis()
lock = DistributedLock(redis_client, 'user:123')

if lock.acquire():
    try:
        # 修改数据
        update_user_data('user:123')
    finally:
        lock.release()

3. 版本控制

使用版本号或时间戳保证数据一致性。

python
class VersionedData:
    def __init__(self):
        self.data = {}
        self.versions = {}
    
    def update(self, key, value, expected_version=None):
        """更新数据(带版本检查)"""
        current_version = self.versions.get(key, 0)
        
        # 乐观锁检查
        if expected_version is not None and current_version != expected_version:
            raise ValueError("版本不匹配,更新失败")
        
        # 更新数据和版本
        self.data[key] = value
        self.versions[key] = current_version + 1
        
        return self.versions[key]
    
    def get(self, key):
        """获取数据和版本"""
        return {
            'value': self.data.get(key),
            'version': self.versions.get(key, 0)
        }

# 使用示例
store = VersionedData()

# 第一次更新
version = store.update('user:123', {'name': 'Alice'})

# 读取数据
data = store.get('user:123')
current_version = data['version']

# 带版本检查的更新
try:
    store.update('user:123', {'name': 'Bob'}, expected_version=current_version)
except ValueError as e:
    print(f"更新失败: {e}")

4. 两阶段提交(2PC)

分布式事务协议,保证所有节点同时提交或回滚。

python
class TwoPhaseCommit:
    def __init__(self, participants):
        self.participants = participants
        self.transaction_id = None
    
    def prepare(self, transaction_id):
        """阶段1: 准备阶段"""
        self.transaction_id = transaction_id
        prepare_results = []
        
        for participant in self.participants:
            try:
                result = participant.prepare(transaction_id)
                prepare_results.append(result)
            except Exception as e:
                # 任一节点失败,回滚所有
                self.rollback(transaction_id)
                raise
        
        return all(prepare_results)
    
    def commit(self, transaction_id):
        """阶段2: 提交阶段"""
        if not self.prepare(transaction_id):
            return False
        
        commit_results = []
        for participant in self.participants:
            try:
                result = participant.commit(transaction_id)
                commit_results.append(result)
            except Exception as e:
                # 提交失败,尝试回滚
                self.rollback(transaction_id)
                raise
        
        return all(commit_results)
    
    def rollback(self, transaction_id):
        """回滚"""
        for participant in self.participants:
            try:
                participant.rollback(transaction_id)
            except Exception as e:
                print(f"回滚失败: {e}")

5. 最终一致性实现

消息队列保证

python
import json
from kafka import KafkaProducer, KafkaConsumer

class EventualConsistency:
    def __init__(self, kafka_producer, kafka_consumer):
        self.producer = kafka_producer
        self.consumer = kafka_consumer
    
    def update_primary(self, key, value):
        """更新主数据"""
        # 1. 更新主库
        update_database(key, value)
        
        # 2. 发送同步消息
        event = {
            'key': key,
            'value': value,
            'timestamp': time.time()
        }
        self.producer.send('data_sync', json.dumps(event).encode())
    
    def sync_to_cache(self):
        """同步到缓存"""
        for message in self.consumer:
            event = json.loads(message.value.decode())
            
            # 更新缓存
            update_cache(event['key'], event['value'])
            
            # 确认消费
            self.consumer.commit()

版本向量

python
class VersionVector:
    def __init__(self, node_id):
        self.node_id = node_id
        self.vector = {node_id: 0}
    
    def increment(self):
        """递增版本"""
        self.vector[self.node_id] = self.vector.get(self.node_id, 0) + 1
    
    def update(self, other_vector):
        """合并版本向量"""
        for node, version in other_vector.items():
            current_version = self.vector.get(node, 0)
            self.vector[node] = max(current_version, version)
    
    def compare(self, other_vector):
        """比较版本向量"""
        for node in set(self.vector.keys()) | set(other_vector.keys()):
            v1 = self.vector.get(node, 0)
            v2 = other_vector.get(node, 0)
            
            if v1 < v2:
                return -1  # 小于
            elif v1 > v2:
                return 1  # 大于
        
        return 0  # 相等

🚀 实践应用

分布式事务解决方案

Saga模式

python
class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []
    
    def add_step(self, action, compensation):
        """添加步骤"""
        self.steps.append(action)
        self.compensations.append(compensation)
    
    def execute(self):
        """执行Saga"""
        completed_steps = []
        
        try:
            for i, step in enumerate(self.steps):
                step()  # 执行步骤
                completed_steps.append(i)
            
            return True
        except Exception as e:
            # 补偿已完成的步骤
            for i in reversed(completed_steps):
                self.compensations[i]()
            raise e

分布式锁应用

python
# 使用分布式锁保证库存扣减一致性
def deduct_inventory(product_id, quantity):
    lock_key = f"inventory_lock:{product_id}"
    lock = DistributedLock(redis_client, lock_key, timeout=10)
    
    if lock.acquire():
        try:
            # 检查库存
            current_stock = get_inventory(product_id)
            if current_stock < quantity:
                raise ValueError("库存不足")
            
            # 扣减库存
            update_inventory(product_id, current_stock - quantity)
            return True
        finally:
            lock.release()
    else:
        raise Exception("获取锁失败")

⚠️ 注意事项

1. 性能权衡

  • 强一致性: 性能影响大
  • 最终一致性: 性能好,但需要处理不一致

2. 故障处理

  • 数据冲突: 设计冲突解决策略
  • 补偿机制: 失败后补偿处理
  • 重试机制: 失败后重试

3. 业务场景

  • 金融系统: 强一致性
  • 电商系统: 最终一致性
  • 社交系统: 弱一致性

📖 推荐资源

💡 下一步


最后更新时间: 2025-01-20