分布式事务的21种武器 - 5

news/2024/10/30 13:35:04/

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (5)

Duncan Meyer @Unsplash
Duncan Meyer @Unsplash

在不同业务场景下,可以有不同的解决方案,常见方法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ事务(MQ Transaction)
  7. Saga模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍一致性算法、时间戳排序以及乐观并发控制三种模式。

13. 一致性算法(Consensus Algorithms)
图片来源: https://www.baeldung.com/cs/consensus-algorithms-distributed-systems
图片来源: https://www.baeldung.com/cs/consensus-algorithms-distributed-systems
  • 系统中的所有节点都同意最终结果或决策。
  • 涉及如下步骤:
    1. 提议(Proposal) —— 一个节点向系统中的其他节点提出一个值
    2. 广播(Broadcast) —— 将建议的值广播到系统中所有节点
    3. 确认(Acknowledgment) —— 每个节点确认提议并向提议者返回确认消息
    4. 决定(Decision) —— 一旦提议者收到大多数节点的确认,就可以对提议的值做出决定
    5. 承诺(Commitment) —— 系统中所有节点提交决定
  • Paxos算法还包括以下几个步骤:
    1. 准备阶段(Prepare phase) —— 提议者选定提案,并向系统中的大多数节点发送准备请求
    2. 承诺阶段(Promise phase) —— 如果某个节点收到的准备消息序列号高于之前看到的序列号,将以承诺响应,并不再接受任何低于该序列号的提案
    3. 接受阶段(Accept phase) —— 如果提议者收到大多数节点的承诺,j就向大多数节点发送带有提案序列号的"接受"请求
    4. 已接受阶段(Accepted phase) —— 如果某个节点收到的接受消息序列号高于以前见过的任何接受请求序列号,就接受该提案,并向提议者和所有其他节点发送接受消息
    5. 学习阶段(Learn phase) —— 一旦提议者从大多数节点接收到被接受的消息,就可以提交提议。
from typing import List, Tuple

class PaxosNode:
    def __init__(self, node_id: int, nodes: List[int]):
        self.node_id = node_id
        self.nodes = nodes
        self.state = "proposed"
        self.proposed_value = None
        self.accepted_value = None
        self.accepted_round = -1
    
    def run_paxos(self, value: int) -> int:
        while True:
            if self.state == "proposed":
                self.proposed_value = value
                self.state = "prepare"
            
            if self.state == "prepare":
                max_round, max_val = self.prepare()
                if max_val is None:
                    self.state = "accept"
                else:
                    self.state = "proposed"
            
            if self.state == "accept":
                self.accepted_value = self.proposed_value
                self.accepted_round = max_round
                self.send_accept()
                self.state = "decided"
            
            if self.state == "decided":
                return self.accepted_value
    
    def prepare(self) -> Tuple[int, int]:
        max_round = -1
        max_val = None
        for node in self.nodes:
            round, val = node.receive_prepare()
            if round > max_round:
                max_round = round
                max_val = val
        
        return max_round, max_val
    
    def send_prepare(self, round: int):
        for node in self.nodes:
            node.receive_prepare_request(round, self.node_id)
    
    def receive_prepare_request(self, round: int, sender_id: int):
        if round > self.accepted_round:
            self.accepted_round = round
            self.send_prepare(round)
    
    def receive_prepare(self) -> Tuple[int, int]:
        return self.accepted_round, self.accepted_value
    
    def send_accept(self):
        for node in self.nodes:
            node.receive_accept_request(self.accepted_round, self.accepted_value)
    
    def receive_accept_request(self, round: int, value: int):
        if round >= self.accepted_round:
            self.accepted_round = round
            self.accepted_value = value
            self.send_accepted()
    
    def send_accepted(self):
        for node in self.nodes:
            node.receive_accepted(self.accepted_round, self.accepted_value)
    
    def receive_accepted(self, round: int, value: int):
        if round == self.accepted_round:
            self.proposed_value = value

示例代码

  • 示例代码为Paxos算法的一种实现,即使存在网络故障或其他可能发生的故障,也能在一组节点之间就某个值达成共识。
  • PaxosNode类表示分布式系统中参与Paxos算法的节点。
  • 构造函数接受2个参数, node_id是节点的ID, nodes是系统中所有节点的ID列表。
  • run_paxos方法运行Paxos算法,接受一个值作为输入,并返回系统中所有节点都同意的值。该方法将无限循环执行,直到商定一个值为止。
  • prepare方法向系统中所有节点发送"prepare"消息,并等待响应。该方法返回系统中其他节点可以接受的最大整数值,如果不接受任何值,则返回 None
  • send_prepare方法,当节点想要向系统中的其他节点发送"prepare"消息时,调用该方法。
  • receive_prepare_request方法,当一个节点从另一个节点接收到"prepare"消息时,调用该方法。如果"prepare"消息携带的整数大于该节点接受的整数,则该节点更新其接受的整数,并向系统中其他节点发送"prepare"消息。
  • receive_prepare方法,当节点接收到对"prepare"消息的响应时,调用该方法,返回可接受的整数值。
  • send_accept方法,当节点想要向系统中的其他节点发送"accept"消息时,调用该方法。
  • receive_accept_request方法,当节点接收到来自另一个节点的"accept"消息时,调用该方法。如果"accept"消息携带的整数大于或等于该节点已接受的整数,则该节点更新其接受的整数值,并向系统中其他节点发送"accepted"消息。
  • send_accepted方法,当节点想要向系统中其他节点发送"accepted”消息时,调用该方法。
  • receive_accepted方法,当节点从另一个节点接收到"accepted"消息时,调用该方法。如果"accepted"消息的整数与节点已接受的整数相同,则节点使用"accepted"消息中的值更新其建议值。

优点

  • 所有节点都同意系统的状态
  • 可以容忍某些节点的故障,即使某些节点发生故障,系统也可以继续运行

缺点

  • 算法比较复杂,难以实现
  • 算法执行较慢,可能导致系统延迟增加
  • 算法需要在节点之间通信,增加了网络带宽和处理能力方面的开销

适用场景

  • 确保交易一致和准确的金融系统
  • 确保所有节点具有相同供应链视图的供应链管理系统

14. 时间戳排序(Timestamp Ordering)
图片来源: https://www.geeksforgeeks.org/multiversion-timestamp-ordering/
图片来源: https://www.geeksforgeeks.org/multiversion-timestamp-ordering/
  • 一种用于在分布式系统中对事务排序的共识算法。
  • 每个事务被分配一个时间戳,并且事务按照其时间戳的顺序执行。
  • 涉及如下步骤:
    1. 每个节点为接收到的事务生成唯一的时间戳,可以用全局时钟生成时间戳,也可以使用带有某种同步机制的本地时钟生成时间戳。
    2. 时间戳由(T, N)组成,其中T为时间戳值,N为生成时间戳的节点标识符。
    3. 当节点接收到新事务时,会根据之前接收到的所有事务的时间戳检查该事务的时间戳。如果新事务的时间戳比之前接收到的事务的时间戳都早,则立即执行。如果新事务的时间戳比以前接收到的事务的时间戳都要新,那么将被延迟,直到所有旧事务都执行完为止。
    4. 如果两个事务具有相同的时间戳,则使用tie-breaking机制来解决冲突。一种可能的tie-breaking机制是使用节点标识符作为判断标准,首先执行具有较低节点标识符的事务。
    5. 一旦事务被执行,结果就会传播到所有其他节点。
    6. 如果某个节点发生故障或断开网络连接,则一旦它重新加入网络,它的事务可以由另一个节点重新执行。事务可以按照时间戳顺序执行,以确保系统状态保持一致。
from typing import List, Tuple

class Timestamp:
    def __init__(self, node_id: int):
        self.node_id = node_id
        self.counter = 0

    def increment(self):
        self.counter += 1

    def __str__(self):
        return f"{self.node_id}:{self.counter}"

class Event:
    def __init__(self, node_id: int, timestamp: Timestamp, data: str):
        self.node_id = node_id
        self.timestamp = timestamp
        self.data = data

    def __str__(self):
        return f"{self.node_id} {self.timestamp} {self.data}"

class Network:
    def __init__(self, nodes: List[int]):
        self.nodes = nodes
        self.message_queues = {node: [] for node in nodes}

    def send(self, sender: int, receiver: int, message: str):
        self.message_queues[receiver].append((sender, message))

    def receive(self, node_id: int) -> Tuple[int, str]:
        if len(self.message_queues[node_id]) > 0:
            return self.message_queues[node_id].pop(0)
        else:
            return None

class Node:
    def __init__(self, node_id: int, network: Network, initial_data: List[str]):
        self.node_id = node_id
        self.network = network
        self.clock = Timestamp(node_id)
        self.queue = []
        for data in initial_data:
            self.queue.append(Event(node_id, self.clock, data))
            self.clock.increment()

    def run(self):
        while True:
            event = self.queue.pop(0)
            print(f"Node {self.node_id} executing event {event}")
            self.clock = max(self.clock, event.timestamp)  # Update local clock

示例代码

  • 实现分布式系统的时间戳排序算法
  • 定义了几个类和方法来实现时间戳排序算法的模拟
  • Timestamp类表示时间戳,由 node_idcounter组成
  • Event类表示分布式系统中的事件。事件由 node_id(生成事件的节点ID)、 timestamp(事件的时间戳)和 data(事件的有效负载)组成。
  • Network类表示连接分布式系统中节点的网络
  • Node类代表节点,由 node_id(节点ID)、 network(所连接的网络)、 clock(本地时间戳)和 queue(节点已经生成并等待执行的事件列表)组成。
  • increment()方法增加时间戳的计数器值
  • __str__()方法以" node_id:counter"的格式返回时间戳的字符串表示形式。
  • __str__()方法以" node_id:timestamp data"的格式返回事件的字符串表示形式。
  • send()方法将消息从一个节点发送到另一个节点
  • receive()方法接收给定节点队列中的下一条消息,如果队列中没有消息,则返回 None
  • run()方法是节点的主循环,实现为无限循环,按时间戳顺序执行队列中的事件。当某个事件被执行时,节点将其本地时钟更新为其当前时间戳和被执行事件时间戳的最大值。
  • nodes属性是系统中节点的id列表
  • message_queues属性将每个节点ID映射到字典

优点

  • 所有事件都是有序的,并且在系统中所有节点上以一致的顺序发生
  • 解决并发事件之间的冲突

缺点

  • 过程复杂且资源密集
  • 时钟同步、一致性等问题

适用场景

  • 金融系统: 交易按照正确顺序进行处理,而不管来自哪个节点
  • 供应链管理系统: 跟踪货物流动,确保所有事件都按照正确的顺序处理

15. 乐观并发控制(Optimistic Concurrency Control)
  • 一种在数据库管理系统(DBMS)中用于处理并发访问数据的技术
  • 主要没有冲突,可以允许多个事务并发修改相同的数据项
  • 涉及如下步骤:
    • 开始事务(T1) —— 当事务开始时,读取需要的数据库记录,并将读取的版本记录在其私有工作区中。
    • 修改数据 —— 当事务修改数据时,将修改记录在其私有工作区中,而不更新实际的数据库记录。
    • 事务结束(T1) —— 当事务准备提交时,检查是否有其他事务在读取数据后修改了相同的数据。另外,将其私有工作区中的数据版本与数据库中数据的当前版本进行比较。
    • 验证检查 —— 如果数据库中数据的当前版本与T1读取的版本相同,则T1可以将其更改提交到数据库。但是,如果数据的当前版本与T1读取的版本不同,则意味着T1的更改与另一个事务的更改发生了冲突。
    • 回滚 —— 如果T1的更改与另一个事务的更改发生了冲突,那么T1必须中止并回滚其更改。T1可以在延迟后重试事务或采取其他适当操作。
    • 提交 —— 如果T1的更改不与任何其他事务的更改冲突,那么T1可以将其更改提交到数据库,用其私有工作区中所做的修改来更新数据库记录。
from typing import List

class Account:
    def __init__(self, id: int, balance: float):
        self.id = id
        self.balance = balance
        self.version = 0

    def withdraw(self, amount: float):
        self.balance -= amount
        self.version += 1

    def deposit(self, amount: float):
        self.balance += amount
        self.version += 1

class OptimisticConcurrencyControl:
    def __init__(self, accounts: List[Account]):
        self.accounts = accounts

    def transfer(self, sender_id: int, receiver_id: int, amount: float):
        # Find sender and receiver accounts
        sender = next(acc for acc in self.accounts if acc.id == sender_id)
        receiver = next(acc for acc in self.accounts if acc.id == receiver_id)

        # Create copies of the accounts to modify
        sender_copy = Account(sender.id, sender.balance)
        receiver_copy = Account(receiver.id, receiver.balance)

        # Withdraw from sender and deposit to receiver
        sender_copy.withdraw(amount)
        receiver_copy.deposit(amount)

        # Update the global accounts list if there are no conflicts
        for i, acc in enumerate(self.accounts):
            if acc.id == sender_id:
                if acc.version != sender.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = sender_copy
            elif acc.id == receiver_id:
                if acc.version != receiver.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = receiver_copy

示例代码

  • Account类表示包含ID、余额和版本号的银行帐户,版本号用于跟踪帐户更新的次数。
  • OptimisticConcurrencyControl类将帐户列表作为输入。
  • transfer方法的输入为发送方和接收方账户id以及要转账的金额。
  • withdrawdeposit方法,修改帐户余额并增加版本号。
  • 将全局帐号列表中帐号的版本号与发送方、接收方帐号的版本号进行比较,检查更新帐号是否存在冲突,如果存在冲突就会引发异常。如果没有冲突,用修改后的帐户更新全局帐户列表。在发生冲突的情况下,可以重试事务,或者通知用户手动解决冲突。

优点

  • 不需要锁或复杂的数据结构
  • 事务可以同时修改相同的数据,因此可以支持高并发性
  • 事务不需要等待锁被释放

缺点

  • 如果多个事务频繁修改相同的数据项,将造成大量修改失败并回滚
  • 需要额外的处理开销来比较数据版本并检查冲突
  • 不提供任何关于事务提交顺序的保证

适用场景

  • 电子商务应用 —— 大量用户同时访问同一个数据库
  • 银行和金融应用 —— 大量交易同时发生

挑战

  • 不正确的实现将导致数据不一致
  • 需要处理冲突和回滚的机制,会增加应用程序的复杂性

参考文献

What is a consensus algorithm?

Consensus Algorithms in Blockchain

How Many Consensus Algorithms Are There? An Overview

Analysis of the Blockchain Consensus Algorithms

Consensus Algorithms Distributed Systems

Multiversion Timestamp Ordering

DBMS Timestamp Ordering Protocol

Timestamp-based Concurrency Control

Timestamp Ordering Protocol in DBMS

Timestamp-based Ordering Protocol in DBMS

What is an optimistic concurrency control in DBMS

Optimistic vs Pessimistic Concurrency: What Every Developer Should Know

Dealing with Optimistic Concurrency Control Collisions


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind

- END -

本文由 mdnice 多平台发布


http://www.ppmy.cn/news/88951.html

相关文章

【CloudCompare教程】008:基于点云的三维模型重建(泊松重建)

本文讲述基于点云的三维模型重建方法,PoissonRecon是“Poisson Surface Reconstruction”的缩写,它是由约翰霍普金斯大学的Misha Kazhdan47提出的三角形网格生成算法的简单接口。 文章目录 一、加载点云二、计算法向量三、泊松建模四、利用输出密度一、加载点云 加载兔子点云…

使用亚马逊云科技Amazon VPC Lattice简化服务间的连接、安全和监控

在亚马逊云科技re:Invent 2022中,亚马逊云科技介绍了Amazon VPC Lattice预览版,这是Amazon Virtual Private Cloud(Amazon VPC)的一项新功能,可通过一致的方式连接、保护和监控服务之间的通信。借助Amazon VPC Lattice…

白银实时价格应该在最适合的地方下注

小时候我们看战争片,总是发现主角们带兵打仗,战无不胜,偶尔有一场大的失利,但是总是能耐化险为夷,逢凶化吉,甚至最后成功反扑、反败为胜。后来小编一琢磨,发现,其实这些将才们打仗&a…

ESD防静电监控系统后台实时掌控现场静电防护情况

当静电积累到一定程度时,它可能会产生电击,从而对工人造成伤害。因此,工厂应该采取必要的预防措施,如提供防静电鞋和衣服,以保护工人免受静电伤害。 ESD防静电监控系统实现工业4.0技术要求,ESD物联技术稳定…

Talk预告 | ICML‘23 Oral 字节跳动 AI Lab 研究员郑在翔:人工智能如何助力蛋白质设计?

本期为TechBeat人工智能社区第500期线上Talk! 北京时间5月25日(周四)20:00,字节跳动 AI Lab 研究员 — 郑在翔的Talk将准时在TechBeat人工智能社区开播! 他与大家分享的主题是: “人工智能如何助力蛋白质设计 ”,届时将介绍基于…

发力电商培训 淘宝天下小二助力品牌商家成长

过去一年,平台各种玩法层出不穷,使得不少商家大呼有心无力,感觉什么都要学,但又不知从哪里入手为好……为帮助更多商家在2023年找对方向,突破店铺运营瓶颈,淘宝天下基于多年行业深耕经验,用可复…

人工智能学习07--pytorch18--目标检测:Faster RCNN源码解析(pytorch)

参考博客: https://blog.csdn.net/weixin_46676835/article/details/130175898 VOC2012 1、代码的使用 查看pytorch中的faster-rcnn源码: 在pytorch中导入: import torchvision.models.detection.faster_rcnn即可找到faster rcnn所实现的源…

【leetcode】!longest substring without repeating chars

参考资料:《剑指offer》,《程序员代码面试指南》 思路: 对每一个位置str[i]来说,找它的以str[i]为end、最长、无重复字符的子串 的过程 相当于 尽可能以str[i]为end, 向左扩, 直至扩到 以str[i-1]为end、最…