C++ 与 Raft 共识协议:在分布式存储系统内核中利用 C++ 实现高可靠日志复制状态机

张开发
2026/4/11 3:44:19 15 分钟阅读
C++ 与 Raft 共识协议:在分布式存储系统内核中利用 C++ 实现高可靠日志复制状态机
在当今高度依赖数据的世界中分布式存储系统已成为支撑现代应用不可或缺的基础设施。这些系统面临的核心挑战之一是如何在面对节点故障、网络分区等不确定性因素时依然能提供高可用性和数据一致性。Raft 共识协议正是为解决这一问题而生它提供了一种易于理解且能保证强一致性的日志复制状态机方案。本讲座将深入探讨如何在分布式存储系统的内核中利用 C 的强大能力高效、可靠地实现 Raft 共识协议构建一个高可靠的日志复制状态机。分布式系统中的共识问题与 Raft 协议的基石分布式系统中的共识Consensus问题旨在让一个集群中的所有节点就某个提议的值达成一致。在分布式存储场景下这意味着所有节点需要就操作序列即日志的顺序和内容达成一致。一旦达成共识每个节点都能按照相同的顺序执行这些操作从而维护一个复制的状态机。早期的 Paxos 协议虽然理论上完美但其复杂性使得工程实现极具挑战性。Raft 协议的出现正是为了在保证与 Paxos 相同安全性和活性的前提下极大地提升了协议的可理解性和实现难度。Raft 协议的核心思想是“首先理解然后实现”它通过精心设计的三种角色、两种 RPC 类型和严格的日志复制规则使得协议逻辑清晰明了。Raft 的核心概念与角色Raft 协议定义了三种节点角色Follower (跟随者)被动接收来自 Leader 的日志和心跳。如果长时间未收到 Leader 的消息Follower 会成为 Candidate。Candidate (候选者)在选举超时后Follower 会转换为 Candidate发起选举以争取成为新的 Leader。Leader (领导者)负责处理所有客户端请求管理日志复制并周期性地向 Follower 发送心跳以维持领导地位。一个 Raft 集群在任何时刻只有一个 Leader。Raft 协议通过一个不断递增的整数Term (任期)来逻辑地划分时间。每个 Term 都从一次选举开始成功当选的 Leader 在该 Term 内领导集群。如果选举失败或者 Leader 发现有更高 Term 的节点它将退位。Term 在 Raft 的安全性和活性中扮演着至关重要的角色。Raft 的核心 RPCsRaft 协议主要通过两种 RPC (远程过程调用) 来实现其功能RequestVote RPC (请求投票)由 Candidate 发送给其他节点用于请求投票。AppendEntries RPC (附加日志条目)由 Leader 发送给 Follower用于日志复制和心跳。这些 RPC 不仅承载数据还包含 Term 信息用于确保节点始终遵守最新 Term 的领导。日志复制状态机Raft 的核心是复制状态机Replicated State Machine。客户端请求被封装成命令Command这些命令作为日志条目Log Entry附加到 Leader 的日志中。Leader 负责将这些日志条目复制到大多数 Follower 节点上。一旦某个日志条目被大多数节点复制并持久化它就被认为是已提交 (Committed)。Leader 会通知 Follower 提交这些条目然后每个节点按序将已提交的日志条目应用到其本地状态机中从而保证所有节点的状态最终一致。为何选择 C 实现 Raft在分布式存储系统的内核中实现 Raft对性能、资源控制和稳定性有着极高的要求。C 在此场景下展现出无与伦比的优势极致性能与低延迟C 允许直接内存管理和底层硬件访问避免了垃圾回收等高级语言的运行时开销使得 Raft 协议的关键路径如日志持久化、网络通信、状态机应用能够以纳秒级的延迟执行。这对于存储系统吞吐量和响应时间至关重要。精细的资源控制在内核或接近内核的环境中对内存、CPU 和 I/O 资源的精确控制是必须的。C 提供了手动内存管理new/delete以及通过std::unique_ptr/std::shared_ptr实现智能指针既能保证性能又能兼顾安全性。跨平台兼容性与系统级编程能力C 拥有强大的系统编程能力能够直接调用操作系统 API处理文件 I/O、网络套接字、多线程与并发原语这使得它非常适合构建底层基础设施。丰富的生态系统与成熟工具链尽管 C 语言本身是低级的但其生态系统提供了大量高性能库如 Boost.Asio 用于异步网络编程Protobuf 用于高效序列化以及各种并发工具。确定性行为对于分布式共识协议确定性是关键。C 程序的行为在很大程度上是可预测的有助于调试和理解复杂的并发逻辑。Raft 协议的 C 实现架构设计一个 Raft 节点的 C 实现需要精心设计其内部结构以高效管理状态、日志、网络通信和并发。核心数据结构1. LogEntry日志条目是 Raft 协议中复制的基本单元。它包含任期、索引和客户端命令。// raft_node.h struct LogEntry { uint64_t term; // 创建该日志条目时的任期 uint64_t index; // 日志条目在日志中的索引 (从1开始) std::vectorchar command; // 客户端命令的序列化字节 // 其他元数据例如CRC校验和用于数据完整性 // 默认构造函数 LogEntry() : term(0), index(0) {} // 参数化构造函数 LogEntry(uint64_t t, uint64_t idx, const std::vectorchar cmd) : term(t), index(idx), command(cmd) {} // 序列化与反序列化方法 (可能通过 Protobuf 或自定义格式实现) std::vectorchar serialize() const { // ... 实现序列化逻辑例如将term, index, command长度和command数据打包 // 示例简单的拼接 std::vectorchar buffer; buffer.resize(sizeof(term) sizeof(index) sizeof(uint32_t) command.size()); size_t offset 0; memcpy(buffer.data() offset, term, sizeof(term)); offset sizeof(term); memcpy(buffer.data() offset, index, sizeof(index)); offset sizeof(index); uint32_t cmd_size command.size(); memcpy(buffer.data() offset, cmd_size, sizeof(cmd_size)); offset sizeof(cmd_size); if (!command.empty()) { memcpy(buffer.data() offset, command.data(), command.size()); } return buffer; } static LogEntry deserialize(const std::vectorchar buffer) { LogEntry entry; size_t offset 0; if (buffer.size() sizeof(entry.term) sizeof(entry.index) sizeof(uint32_t)) { // 错误处理缓冲区过小 return LogEntry(); } memcpy(entry.term, buffer.data() offset, sizeof(entry.term)); offset sizeof(entry.term); memcpy(entry.index, buffer.data() offset, sizeof(entry.index)); offset sizeof(entry.index); uint32_t cmd_size; memcpy(cmd_size, buffer.data() offset, sizeof(cmd_size)); offset sizeof(cmd_size); if (offset cmd_size buffer.size()) { // 错误处理命令数据大小不匹配 return LogEntry(); } entry.command.assign(buffer.data() offset, buffer.data() offset cmd_size); return entry; } };2. RaftNode 类RaftNode是核心封装了节点的所有状态、逻辑和行为。// raft_node.h #include vector #include string #include atomic #include mutex #include condition_variable #include map #include chrono #include memory #include functional // 前向声明RPC相关结构 struct RequestVoteArgs; struct RequestVoteReply; struct AppendEntriesArgs; struct AppendEntriesReply; // 节点状态 enum class NodeState { Follower, Candidate, Leader }; class RaftNode { public: RaftNode(uint64_t id, const std::vectorstd::string peer_addrs, std::functionvoid(const LogEntry) apply_log_callback); ~RaftNode(); void start(); void stop(); // 客户端接口 bool propose(const std::vectorchar command); // 提议一个新命令 // RPC 处理函数 RequestVoteReply handleRequestVote(const RequestVoteArgs args); AppendEntriesReply handleAppendEntries(const AppendEntriesArgs args); private: uint64_t node_id_; std::vectorstd::string peer_addrs_; // 存储集群中所有节点的地址 (包括自身) std::mapuint64_t, std::string peer_id_to_addr_map_; // 节点ID到地址的映射 // 持久化状态 (需要定期写入磁盘) std::atomicuint64_t current_term_; // 节点看到的最新任期 std::atomicuint64_t voted_for_; // 在当前任期投票给的候选者ID (0表示未投票) std::vectorLogEntry log_; // 日志条目集合 // 易失性状态 (所有服务器) std::atomicuint64_t commit_index_; // 已知最高的已提交日志条目索引 std::atomicuint64_t last_applied_; // 已应用到状态机的最高日志条目索引 // 易失性状态 (Leader 特有) std::mapuint64_t, uint64_t next_index_; // 对于每个 FollowerLeader 接下来要发送给它的日志条目索引 std::mapuint64_t, uint64_t match_index_; // 对于每个 FollowerLeader 已知它已复制的最高日志条目索引 // 状态机 NodeState state_; std::mutex mtx_; // 保护RaftNode内部状态的互斥锁 std::condition_variable cv_; // 用于线程间通信 // 定时器相关 std::thread timer_thread_; std::atomicbool running_; std::chrono::steady_clock::time_point last_heartbeat_time_; // 上次收到Leader心跳或有效AppendEntries的时间 std::chrono::steady_clock::time_point election_start_time_; // 当前选举的开始时间 std::chrono::milliseconds election_timeout_min_; std::chrono::milliseconds election_timeout_max_; std::chrono::milliseconds heartbeat_interval_; // 网络层接口 (抽象) std::unique_ptrRpcServer rpc_server_; std::unique_ptrRpcClientPool rpc_client_pool_; // 每个peer一个client或连接池 // 状态机应用回调 std::functionvoid(const LogEntry) apply_log_callback_; // 内部方法 void run_loop(); // Raft主循环 void become_follower(uint64_t term); void become_candidate(); void become_leader(); void reset_election_timer(); std::chrono::milliseconds get_random_election_timeout(); void send_append_entries_to_peers(bool is_heartbeat false); void send_request_vote_to_peers(); uint64_t get_last_log_index() const; uint64_t get_last_log_term() const; void persist_state(); // 持久化 current_term, voted_for, log void load_state(); // 从磁盘加载状态 // 辅助函数用于检查日志匹配 bool check_log_match(uint64_t prev_log_index, uint64_t prev_log_term) const; void apply_committed_entries(); // 将已提交的日志应用到状态机 };RPC 接口定义Raft 协议的 RPCs 需要清晰的参数和返回结构以便于序列化和网络传输。// raft_rpc_messages.h // RequestVote RPC struct RequestVoteArgs { uint64_t term; // 候选者的任期 uint64_t candidate_id; // 候选者ID uint64_t last_log_index; // 候选者最新日志条目的索引 uint64_t last_log_term; // 候选者最新日志条目的任期 // 序列化/反序列化方法 std::vectorchar serialize() const { /* ... */ return {}; } static RequestVoteArgs deserialize(const std::vectorchar buffer) { /* ... */ return {}; } }; struct RequestVoteReply { uint64_t term; // 接收者当前的任期用于候选者更新自身任期 bool vote_granted; // 如果候选者获得了投票则为 true // 序列化/反序列化方法 std::vectorchar serialize() const { /* ... */ return {}; } static RequestVoteReply deserialize(const std::vectorchar buffer) { /* ... */ return {}; } }; // AppendEntries RPC struct AppendEntriesArgs { uint64_t term; // Leader 的任期 uint64_t leader_id; // Leader 的 ID uint64_t prev_log_index; // 紧随新日志条目之前的日志条目索引 uint64_t prev_log_term; // 紧随新日志条目之前的日志条目任期 std::vectorLogEntry entries; // 待复制的日志条目 (可能为空用于心跳) uint64_t leader_commit; // Leader 已提交的最高日志条目索引 // 序列化/反序列化方法 std::vectorchar serialize() const { /* ... */ return {}; } static AppendEntriesArgs deserialize(const std::vectorchar buffer) { /* ... */ return {}; } }; struct AppendEntriesReply { uint64_t term; // 接收者当前的任期用于 Leader 更新自身任期 bool success; // 如果 Follower 包含了匹配 prev_log_index 和 prev_log_term 的条目则为 true uint64_t conflict_term; // 用于快速回溯如果 success 为 false 且日志冲突此为冲突日志的任期 uint64_t conflict_index; // 用于快速回溯如果 success 为 false 且日志冲突此为冲突日志在该任期内的第一个索引 // 序列化/反序列化方法 std::vectorchar serialize() const { /* ... */ return {}; } static AppendEntriesReply deserialize(const std::vectorchar buffer) { /* ... */ return {}; } };序列化/反序列化是网络通信的关键。在实际系统中通常会使用高性能的序列化库如 Google Protobuf、FlatBuffers 或 MessagePack。这些库能够将 C 对象高效地转换为字节流进行网络传输并反之。持久化层设计Raft 协议要求current_term、voted_for和log必须是持久化的以便在节点崩溃重启后能够恢复状态。// raft_node.cpp (部分) void RaftNode::persist_state() { // 实际实现中这里应该将 current_term_, voted_for_, log_ 写入到持久存储。 // 可以使用文件系统例如一个专门的日志文件一个元数据文件。 // 为了性能和原子性通常会采用 WAL (Write-Ahead Log) 机制。 // 例如 // 1. 创建一个临时文件 // 2. 将 current_term, voted_for 写入元数据文件 // 3. 将 log_ entries 逐条或批量写入日志文件 // 4. fsync() 确保数据写入物理磁盘 // 5. 原子性地替换旧文件 (rename) // 简化示例将状态序列化到内存实际应写入磁盘 std::vectorchar term_data(sizeof(uint64_t)); uint64_t current_term_val current_term_.load(); memcpy(term_data.data(), current_term_val, sizeof(uint64_t)); std::vectorchar voted_for_data(sizeof(uint64_t)); uint64_t voted_for_val voted_for_.load(); memcpy(voted_for_data.data(), voted_for_val, sizeof(uint64_t)); // 假设有一个持久化存储接口 // persistence_manager_-write_metadata(current_term, term_data); // persistence_manager_-write_metadata(voted_for, voted_for_data); // persistence_manager_-write_log_entries(log_); // LOG(INFO) Raft state persisted.; } void RaftNode::load_state() { // 从磁盘加载持久化状态 // 示例 // std::vectorchar term_data persistence_manager_-read_metadata(current_term); // if (!term_data.empty()) { // uint64_t current_term_val; // memcpy(current_term_val, term_data.data(), sizeof(uint64_t)); // current_term_.store(current_term_val); // } // ... 类似地加载 voted_for 和 log // 简化假定初始状态 current_term_.store(0); voted_for_.store(0); log_.clear(); // 确保日志至少包含一个虚拟的空条目索引为0任期为0 // 这有助于简化 prevLogIndex 和 prevLogTerm 的处理 log_.emplace_back(0, 0, std::vectorchar()); // LOG(INFO) Raft state loaded or initialized.; }日志的持久化通常采用分段文件和索引的方式以支持高效的随机读写和修剪例如当生成快照时。网络通信与并发模型高性能的 Raft 实现离不开高效的网络通信和并发处理。1. 网络层抽象为了解耦 Raft 核心逻辑与具体的网络库我们通常会定义一个抽象的网络接口。// rpc_interface.h class RpcServer { public: virtual ~RpcServer() default; virtual void start() 0; virtual void stop() 0; // 注册RPC处理函数例如 // void register_handler(const std::string method_name, std::functionstd::vectorchar(const std::vectorchar) handler); }; class RpcClient { public: virtual ~RpcClient() default; virtual std::vectorchar send_request(const std::string method_name, const std::vectorchar request_data, std::chrono::milliseconds timeout) 0; }; // 实际实现可以使用 Boost.Asio 或裸露的 socket API // 例如 Boost.Asio 实现的 RpcServer 和 RpcClient在RaftNode中rpc_server_负责监听传入的 RPC 请求并将其分派给handleRequestVote或handleAppendEntries方法。rpc_client_pool_负责向其他节点发送 RPC 请求。2. 并发模型Raft 协议的事件驱动性质定时器、RPC 请求决定了其并发模型。主循环线程一个专用的线程或asio::io_context运行 Raft 协议的主循环处理定时器事件选举超时、心跳并驱动状态转换。RPC 服务线程池一个线程池处理所有传入的 RPC 请求。每个请求在一个单独的线程中执行其处理逻辑。RPC 客户端线程池/异步发送对于 Leader 向 Follower 发送的AppendEntries请求可以采用异步非阻塞的方式或者使用一个小的线程池来并行发送。互斥锁与条件变量RaftNode的所有共享状态current_term_,voted_for_,log_,state_,next_index_,match_index_等都必须受到互斥锁std::mutex mtx_的保护以防止数据竞争。条件变量std::condition_variable cv_用于在不同线程之间同步例如当日志被提交时通知应用状态机线程。// raft_node.cpp (部分) void RaftNode::run_loop() { // 这是一个简化版的主循环实际可能与 Boost.Asio 的 io_context 结合 running_ true; reset_election_timer(); // 初始时作为Follower启动选举计时器 while (running_) { std::unique_lockstd::mutex lock(mtx_); // 锁定RaftNode状态 // 等待选举超时或收到外部信号 // 使用条件变量等待直到超时或被其他事件唤醒 bool timeout cv_.wait_for(lock, get_random_election_timeout(), [this]() { // Predicate: 如果收到心跳或有效AppendEntries则计时器重置 // 实际上这个逻辑会在 handleAppendEntries 中更新 last_heartbeat_time_ // 并且通过 notify_one/all 来唤醒 return !running_ || (state_ NodeState::Follower std::chrono::steady_clock::now() last_heartbeat_time_ get_random_election_timeout()); // 上述Predicate需要更精细的控制例如如果handleAppendEntries重置了计时器 // 那么run_loop应该被唤醒并重新计算等待时间。 // 简化处理wait_for直接返回超时或被通知 return !running_; // 如果running_变为false则退出循环 }); if (!running_) { break; // 停止 } switch (state_) { case NodeState::Follower: { // 如果选举超时且没有收到Leader的心跳 if (std::chrono::steady_clock::now() last_heartbeat_time_ get_random_election_timeout()) { become_candidate(); } break; } case NodeState::Candidate: { // 选举超时再次发起选举 if (std::chrono::steady_clock::now() election_start_time_ get_random_election_timeout()) { become_candidate(); // 再次转为Candidate开始新的任期和选举 } break; } case NodeState::Leader: { // Leader 定期发送心跳 if (std::chrono::steady_clock::now() last_heartbeat_time_ heartbeat_interval_) { send_append_entries_to_peers(true); // 发送心跳 (空的AppendEntries) last_heartbeat_time_ std::chrono::steady_clock::now(); } break; } } // 在实际应用中这里可能会使用更复杂的定时器管理例如 Boost.Asio 的 steady_timer // 确保每次循环都能处理RPC或定时器事件 lock.unlock(); // 解锁以便其他线程可以处理RPC请求 std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 避免CPU空转 } }实现 Raft 状态机逻辑1. Follower 状态初始化所有节点启动时都作为 Follower。选举超时如果 Follower 在选举超时时间内没有收到来自 Leader 或 Candidate 的有效AppendEntries或RequestVoteRPC它将转换为 Candidate 状态并开始一次新的选举。RPC 处理RequestVote RPC如果请求的term小于当前term拒绝投票。如果请求的term等于当前term且已经投票给其他 Candidate拒绝投票。如果请求的term大于当前term或者term相等且尚未投票或已投票给自己则检查 Candidate 的日志是否至少和自己的日志一样新。日志“一样新”的定义last_log_term更大或者last_log_term相等但last_log_index更大。如果日志足够新投票给 Candidate更新current_term和voted_for并重置选举计时器。AppendEntries RPC如果请求的term小于当前term拒绝。如果请求的term大于等于当前term则接受 Leader更新current_term并重置选举计时器。检查prev_log_index和prev_log_term是否与本地日志匹配。如果不匹配返回successfalse并提供冲突信息以帮助 Leader 快速回溯。如果匹配则删除从prev_log_index 1开始的所有冲突日志条目然后追加 Leader 发来的新日志条目。更新commit_index到leader_commit和本地last_log_index的最小值。// raft_node.cpp (handleAppendEntries 简化版) AppendEntriesReply RaftNode::handleAppendEntries(const AppendEntriesArgs args) { std::lock_guardstd::mutex lock(mtx_); AppendEntriesReply reply; reply.term current_term_.load(); reply.success false; reply.conflict_term 0; reply.conflict_index 0; // 1. 如果 Leader 的 Term 小于当前 Term拒绝 if (args.term current_term_.load()) { return reply; } // 2. 发现更高 Term 的 Leader或当前 Leader有效转换为 Follower if (args.term current_term_.load() || state_ ! NodeState::Follower) { become_follower(args.term); // 如果是新的Leader需要重置投票信息 if (args.term current_term_.load()) { voted_for_.store(0); } current_term_.store(args.term); // 更新到Leader的term reply.term current_term_.load(); } reset_election_timer(); // 收到有效心跳或日志重置选举计时器 // 3. 检查 prev_log_index 和 prev_log_term 是否匹配 // 注意log_的索引从1开始log_[0]是虚拟条目 if (args.prev_log_index log_.size() || log_[args.prev_log_index].term ! args.prev_log_term) { // 日志不匹配需要告诉 Leader 回溯 if (args.prev_log_index log_.size()) { reply.conflict_index log_.size(); // Follower 日志太短 } else { reply.conflict_term log_[args.prev_log_index].term; // 找到该冲突任期的第一个日志条目索引 uint64_t conflict_idx args.prev_log_index; while (conflict_idx 0 log_[conflict_idx - 1].term reply.conflict_term) { conflict_idx--; } reply.conflict_index conflict_idx; } return reply; } // 4. 如果有新日志条目与现有日志冲突删除冲突部分 size_t new_entries_start_idx args.prev_log_index 1; size_t num_conflicting_entries 0; for (size_t i 0; i args.entries.size(); i) { if (new_entries_start_idx i log_.size() log_[new_entries_start_idx i].term ! args.entries[i].term) { num_conflicting_entries log_.size() - (new_entries_start_idx i); log_.erase(log_.begin() new_entries_start_idx i, log_.end()); break; } } // 5. 追加 Leader 发来的新日志条目 for (const auto entry : args.entries) { // 避免重复添加已存在的日志条目 if (entry.index log_.size() || log_[entry.index].term ! entry.term) { // 这里的entry.index可能比log_.size()大需要填充中间的空洞但Raft协议保证是连续的 // 实际实现中如果 prev_log_index 1 ! log_.size()则存在不一致需要处理 // 简单处理直接push_back假定prev_log_index已正确匹配且冲突已解决 log_.push_back(entry); } } persist_state(); // 日志有变动持久化状态 // 6. 更新 commit_index if (args.leader_commit commit_index_.load()) { commit_index_.store(std::min(args.leader_commit, get_last_log_index())); cv_.notify_one(); // 通知应用状态机线程有新的日志可以应用 } reply.success true; return reply; }2. Candidate 状态转换从 Follower 状态因选举超时而转换。选举开始递增current_term。投票给自己 (voted_for node_id_)。重置选举计时器。向所有其他节点发送RequestVoteRPC。RPC 处理AppendEntries RPC如果收到来自 Leader 的AppendEntriesRPC (Term 大于等于当前 Term)立即转换回 Follower 状态。RequestVote RPC如果请求的term大于当前term则转换回 Follower 状态并处理该RequestVote。如果请求的term小于或等于当前term拒绝投票。投票统计如果收到大多数节点的投票转换为 Leader 状态。如果在选举超时时间内没有赢得选举则再次递增current_term开始新的选举。// raft_node.cpp (部分) void RaftNode::become_candidate() { state_ NodeState::Candidate; current_term_; voted_for_.store(node_id_); persist_state(); // 任期和投票信息改变持久化 election_start_time_ std::chrono::steady_clock::now(); // LOG(INFO) Node node_id_ became Candidate for Term current_term_.load(); // 启动一个新线程或使用线程池发送 RequestVote RPCs std::thread([this]() { send_request_vote_to_peers(); }).detach(); } void RaftNode::send_request_vote_to_peers() { RequestVoteArgs args; args.term current_term_.load(); args.candidate_id node_id_; args.last_log_index get_last_log_index(); args.last_log_term get_last_log_term(); std::atomicint votes_received(1); // 包含自己的投票 std::atomicbool became_leader_or_follower(false); for (const auto entry : peer_id_to_addr_map_) { uint64_t peer_id entry.first; if (peer_id node_id_) continue; // 异步发送 RequestVote RPC std::thread([this, peer_id, args, votes_received, became_leader_or_follower]() { // RpcClient::send_request 应该处理网络错误和超时 std::vectorchar reply_data rpc_client_pool_-get_client(peer_id)-send_request( RequestVote, args.serialize(), std::chrono::milliseconds(500)); if (reply_data.empty()) { // LOG(WARNING) RequestVote RPC to peer_id timed out or failed.; return; } RequestVoteReply reply RequestVoteReply::deserialize(reply_data); std::lock_guardstd::mutex lock(mtx_); if (state_ ! NodeState::Candidate) { // 可能已经变为Leader或Follower return; } if (reply.term current_term_.load()) { // 发现更高任期的节点退回Follower // LOG(INFO) Node node_id_ found higher term reply.term // from peer_id , stepping down to Follower.; become_follower(reply.term); became_leader_or_follower.store(true); cv_.notify_one(); // 唤醒主循环重新评估状态 return; } if (reply.vote_granted reply.term current_term_.load()) { votes_received; if (votes_received.load() peer_id_to_addr_map_.size() / 2 !became_leader_or_follower.load()) { // 赢得多数投票 // LOG(INFO) Node node_id_ won election for Term current_term_.load(); become_leader(); became_leader_or_follower.store(true); cv_.notify_one(); // 唤醒主循环重新评估状态 } } }).detach(); // 使用detach允许线程独立运行 } }3. Leader 状态转换从 Candidate 状态因赢得选举而转换。初始化对于每个 Follower初始化next_index为last_log_index 1( Leader 认为 Follower 下一个应该接收的日志索引)。初始化match_index为 0。立即发送一次心跳 (空的AppendEntriesRPC) 给所有 Follower。客户端请求接收客户端命令将其封装为LogEntry附加到自己的日志中。持久化日志。并行向所有 Follower 发送AppendEntriesRPC。日志复制Leader 周期性地发送AppendEntriesRPC (心跳或带有新日志条目)。根据 Follower 返回的AppendEntriesReply更新next_index和match_index。如果successtrue更新match_index和next_index。如果successfalse(通常是日志不匹配)减少next_index并重试AppendEntries直到日志匹配。提交日志Leader 追踪每个日志条目在大多数 Follower 上被复制的情况。如果存在一个日志条目N它在当前term中被 Leader 创建并且已经被大多数 Follower 复制即match_index[i] N对于大多数i成立Leader 将commit_index更新为N。然后 Leader 通知 Follower 提交该条目。退位如果 Leader 发现有更高term的 RPC (例如来自新 Leader 的AppendEntries)它将立即转换回 Follower 状态。// raft_node.cpp (部分) void RaftNode::become_leader() { state_ NodeState::Leader; // LOG(INFO) Node node_id_ became Leader for Term current_term_.load(); // 初始化 Leader 状态 uint64_t last_log_idx get_last_log_index(); for (const auto entry : peer_id_to_addr_map_) { uint64_t peer_id entry.first; if (peer_id node_id_) continue; next_index_[peer_id] last_log_idx 1; match_index_[peer_id] 0; } // 立即发送一次心跳 send_append_entries_to_peers(true); last_heartbeat_time_ std::chrono::steady_clock::now(); } // 客户端调用此函数向 Leader 提议一个命令 bool RaftNode::propose(const std::vectorchar command) { std::lock_guardstd::mutex lock(mtx_); if (state_ ! NodeState::Leader) { // 客户端需要知道当前的 Leader或 Leader 转发请求 // LOG(WARNING) Node node_id_ is not Leader, cannot propose command.; return false; } uint64_t new_log_index get_last_log_index() 1; log_.emplace_back(current_term_.load(), new_log_index, command); persist_state(); // 新日志条目需要持久化 // LOG(INFO) Leader node_id_ proposed command at index new_log_index; // 立即向所有 Follower 复制新日志 send_append_entries_to_peers(false); // 不是心跳带有新日志 // Leader 还需要等待日志被多数复制并提交这里可能需要一个异步等待机制 // 例如返回一个Future客户端可以等待Future完成 // 简化假设这里只是发起复制不等待结果立即返回 return true; } void RaftNode::send_append_entries_to_peers(bool is_heartbeat) { // 异步发送 AppendEntries RPC 到所有 Follower for (const auto entry : peer_id_to_addr_map_) { uint64_t peer_id entry.first; if (peer_id node_id_) continue; std::thread([this, peer_id, is_heartbeat]() { std::lock_guardstd::mutex lock(mtx_); // 锁定以访问 next_index_, match_index_, log_ AppendEntriesArgs args; args.term current_term_.load(); args.leader_id node_id_; args.leader_commit commit_index_.load(); uint64_t next_idx_for_peer next_index_[peer_id]; // 如果 next_idx_for_peer 是0说明是初始状态或者需要从头开始发送 // 正常情况下next_idx_for_peer至少为1对应虚拟条目之后的第一个实际条目 if (next_idx_for_peer 0) { // 应该是 last_log_index 1 next_idx_for_peer 1; // 假定从第一个实际日志条目开始 } // prev_log_index 是 next_index_for_peer - 1 args.prev_log_index next_idx_for_peer - 1; args.prev_log_term (args.prev_log_index 0 args.prev_log_index log_.size()) ? log_[args.prev_log_index].term : 0; // 如果是心跳entries为空否则发送从 next_idx_for_peer 开始的所有日志条目 if (!is_heartbeat) { for (size_t i next_idx_for_peer; i log_.size(); i) { args.entries.push_back(log_[i]); } } // 发送 RPC std::vectorchar reply_data rpc_client_pool_-get_client(peer_id)-send_request( AppendEntries, args.serialize(), std::chrono::milliseconds(500)); if (reply_data.empty()) { // LOG(WARNING) AppendEntries RPC to peer_id timed out or failed.; return; } AppendEntriesReply reply AppendEntriesReply::deserialize(reply_data); // 处理回复 std::lock_guardstd::mutex lock_reply(mtx_); // 再次锁定防止与主循环冲突 if (state_ ! NodeState::Leader) { // 可能在发送期间已经退位 return; } if (reply.term current_term_.load()) { // 发现更高任期的节点退回Follower become_follower(reply.term); cv_.notify_one(); return; } if (reply.success) { // 成功复制更新 next_index 和 match_index // 成功复制的日志条目数量是 args.entries.size() // match_index 应该是 prev_log_index entries.size() match_index_[peer_id] args.prev_log_index args.entries.size(); next_index_[peer_id] match_index_[peer_id] 1; } else { // 日志不匹配需要回溯 next_index // 利用 reply.conflict_term 和 reply.conflict_index 快速回溯 if (reply.conflict_term ! 0) { // 尝试在 Leader 日志中找到冲突任期的最后一个条目 uint64_t idx get_last_log_index(); while (idx 0 log_[idx].term reply.conflict_term) { idx--; } if (idx 0 || log_[idx].term ! reply.conflict_term) { // Leader 日志中没有该任期或者该任期早于冲突任期直接回溯到冲突索引 next_index_[peer_id] reply.conflict_index; } else { // 找到冲突任期从该任期最后一个条目之后开始发送 next_index_[peer_id] idx 1; } } else { // 默认回溯一个条目 next_index_[peer_id] std::max(1ULL, next_index_[peer_id] - 1); } // LOG(INFO) Leader node_id_ failed to append to peer_id // , trying to backtrack to next_index next_index_[peer_id]; } // 检查是否可以提交新的日志条目 // 从当前的 commit_index 1 开始尝试提交更高索引的日志 uint64_t current_commit_idx commit_index_.load(); for (uint64_t n get_last_log_index(); n current_commit_idx; --n) { // 只有当前任期的日志条目才能通过计数方式提交 if (log_[n].term ! current_term_.load()) { continue; } int replicas 1; // Leader 自己也有一份 for (const auto kv : match_index_) { if (kv.first ! node_id_ kv.second n) { replicas; } } if (replicas peer_id_to_addr_map_.size() / 2) { commit_index_.store(n); cv_.notify_one(); // 通知应用状态机 // LOG(INFO) Leader node_id_ committed log n; break; // 每次只提交一个最高索引 } } }).detach(); } }状态机应用一个独立的线程负责将commit_index到last_applied之间的日志条目应用到实际的状态机中。// raft_node.cpp (部分) void RaftNode::apply_committed_entries() { while (running_) { std::unique_lockstd::mutex lock(mtx_); // 等待直到有新的日志可以应用或者停止 cv_.wait(lock, [this]() { return !running_ || commit_index_.load() last_applied_.load(); }); if (!running_) { break; } while (last_applied_.load() commit_index_.load()) { last_applied_; LogEntry entry_to_apply log_[last_applied_.load()]; lock.unlock(); // 解锁避免在执行用户回调时阻塞 Raft 核心逻辑 apply_log_callback_(entry_to_apply); // 调用用户提供的回调应用到存储状态机 lock.lock(); // 重新锁定 } } }挑战与高级主题1. 日志压缩 (Snapshotting)随着时间推移Raft 日志会不断增长消耗大量存储空间并延长启动时间。日志压缩通过创建快照来解决此问题。快照包含当前状态机的完整状态以及最新的last_included_index和last_included_term。创建快照后快照之前的所有日志条目都可以被丢弃。Leader 需要将快照发送给落后的 Follower。2. 成员变更动态地添加或移除集群成员是一个复杂的问题。Raft 协议通过两阶段提交的方式安全地处理集群成员变更。Leader 首先将一个包含新旧配置的联合配置条目复制到集群中待其提交后再提交一个只包含新配置的条目。3. 客户端交互客户端需要知道当前的 Leader 才能发送请求。常见的做法是客户端随机向一个节点发送请求。如果该节点不是 Leader它会返回 Leader 的地址或者一个错误让客户端重试客户端缓存 Leader 信息。4. 性能优化批量提交将多个客户端请求打包成一个日志条目减少 RPC 次数和磁盘 I/O。并行复制Leader 可以并行地向多个 Follower 发送AppendEntriesRPC。零拷贝在日志持久化和网络传输中尽量使用零拷贝技术减少数据在内存中的复制。5. 容错与测试故障注入模拟网络分区、节点崩溃、消息丢失/乱序等故障验证 Raft 实现的健壮性。线性一致性验证确保所有客户端读写操作都满足线性一致性语义。结语在分布式存储系统内核中利用 C 实现 Raft 共识协议是一项充满挑战但极具价值的工作。它要求开发者不仅精通 Raft 协议的每一个细节还需要深入理解 C 的性能特性、并发模型和系统级编程。通过本文的探讨我们希望能够为读者勾勒出一条清晰的实现路径并强调在追求极致性能和高可靠性时C 依然是构建此类核心基础设施的卓越选择。一个健壮的 Raft 实现是分布式存储系统实现高可用、强一致性和数据持久化的基石。

更多文章