Golang基于Redis的高性能发布订阅(PubSub)系统设计与实现

张开发
2026/4/11 18:55:56 15 分钟阅读

分享文章

Golang基于Redis的高性能发布订阅(PubSub)系统设计与实现
引言在分布式系统中发布订阅(Pub/Sub)是最常见的异步通信模型之一。本项目在core/pkg/pubsub中基于 Redis 实现了一套高性能、可配置、支持批量发送的 PubSub 组件用于在多节点之间高效传递事件消息。参考代码https://github.com/openskeye/go-vss/blob/main/core/pkg/pubsub一、背景与需求在实际业务中我们需要满足以下场景多节点消息广播IM 消息、通知、事件推送等需要跨节点分发。高吞吐写入单节点每秒可能产生大量消息如果直接一条一发会给 Redis 和网络带来较大压力。可控的延迟与批量需要在「实时性」和「吞吐」之间做折中通过配置控制。可观测与可靠出现异常能够发告警邮件避免静默失败。设计目标批量发送同一频道的多条消息聚合后一次性 Publish减少 Redis QPS 与网络开销。可配置节流策略支持按消息数量与时间窗口两种维度做发送节流。健壮的并发与关闭机制在高并发下不会 panic不会出现数据竞争。简单易用的 API对业务方暴露的只有Send和Subscribe两个核心入口。二、整体架构与核心组件2.1 关键结构体typeConfstruct{Email tps.YamlEmail// 消息列表最大容量MaxMessageCount,// 心跳检测清空数据周期HeartbeatInterval,// 没有消息进入是最后一次发送时间间隔SendIntervalint// 当前节点域名Hoststring}// 内部状态typepsstruct{Ctx context.Context conf*Conf closeOnce sync.Once MessagechanredisPublishMessageChanType// 生产者写入的消息Messages sync.Map// channel - []stringSendTimestamps sync.Map// channel - int64(lastSend)PublishMessageschan*redisMessageChanType// 待发布到 Redis 的批量消息ExitSignalchanerrorIsClosedboolclosedint32}// 对外暴露的 Redis 客户端包装typeRedisClientstruct{*ps isClusterboolclient*redis.Client clusterClient*redis.ClusterClient}2.2 系统时序图消息发布全流程RedisClient.SubscribeRedispublishProc(批量发布)heartbeatProc(心跳flush)queueProc(缓冲队列)RedisClient.Send业务方(Producer)RedisClient.SubscribeRedispublishProc(批量发布)heartbeatProc(心跳flush)queueProc(缓冲队列)RedisClient.Send业务方(Producer)消息发布流程alt[队列满 或 超过SendInterval]loop[批量发布]消息订阅流程Send(channel, message)写入 Message chan按channel累积消息\n(channel - []string)写入 PublishMessages定时遍历Messages\n将剩余消息写入 PublishMessages过滤空消息、组装[]stringPublish(channel, json(messages))Subscribe(channel)推送 json([]string)反序列化为 []string通过worker pool并发执行completion三、核心流程解析3.1 消息生产Send 与内部队列业务方只需要调用一个简单的接口// 推送消息func(r*RedisClient)Send(channelstring,message[]byte){ifr.isClosed(){return}r.Message-redisPublishMessageChanType{channel,string(message)}}特性非阻塞场景可控Message通道有缓冲容量5000足以应对普通峰值。统一入口所有发布请求都汇聚到queueProc集中做批量与节流控制。3.2 队列聚合与按 channel 维度的节流queueProc是整个组件的“心脏”负责将同一channel的消息聚合为一个[]string。根据配置决定何时触发一次批量发送。维护每个 channel 的最近发送时间。核心逻辑简化如下伪代码for{select{case-r.Ctx.Done():r.close()r.sendEmail(redis publish 消息队列异常结束,...)returncaseval:-r.Message:ifr.isClosed()||val.channel{continue}now:nowMilli()// 取出当前 channel 的消息列表msgs:loadOrInitMessages(val.channel)// 读取上次发送时间lastSend:loadOrInitLastSend(val.channel,now)// 满足任一条件则触发批量发送iflen(msgs)conf.MaxMessageCount||now-lastSendint64(conf.SendInterval){PublishMessages-{channel:val.channel,messages:msgs}updateLastSend(val.channel,now)clearMessages(val.channel)}// 追加当前消息appendMessage(val.channel,val.message)caseerr:-r.ExitSignal:r.sendEmail(redis publish 消息队列异常退出,...,err.Error())r.close()return}}关键点按 channel 维度独立节流Messages与SendTimestamps都是按 channel 分片管理不同业务频道互不干扰。双条件触发数量阈值MaxMessageCount时间阈值SendInterval毫秒配置驱动所有阈值都由Conf控制支持按场景调参。3.3 心跳 flush避免残留仅依靠SendInterval可能会出现一种情况某个 channel 短时间内只收到少量消息数量未达阈值但长时间也没有新的消息进入。为了解决这个数据残留问题引入了heartbeatProcfunc(r*RedisClient)heartbeatProc(){ticker:time.NewTicker(time.Millisecond*time.Duration(r.conf.HeartbeatInterval))deferticker.Stop()for{select{case-r.Ctx.Done():returncase-ticker.C:ifr.isClosed(){return}now:nowMilli()r.Messages.Range(func(key,value any)bool{ifr.isClosed(){returnfalse}channel,ok:key.(string)msgs,ok2:value.(redisMessages)if!ok||!ok2||len(msgs)0{returntrue}r.PublishMessages-redisMessageChanType{channel:channel,messages:msgs}r.SendTimestamps.Store(channel,now)r.Messages.Store(channel,redisMessages(nil))returntrue})}}}作用定期扫描所有 channel主动 flush 剩余消息。避免“低频” channel 的消息长时间滞留在内存中。3.4 批量发布publishProcpublishProc将PublishMessages中的批量消息真正写入 Redisfor{select{case-r.Ctx.Done():returncasedata:-r.PublishMessages:ifr.isClosed()||datanil||len(data.messages)0{continue}// 过滤空消息组装最终批量消息msgs:filterEmpty(data.messages)iflen(msgs)0{continue}payload,err:JSONMarshal(msgs)iferr!nil{LogError(redis publish[data.channel] 消息序列化失败)continue}if_,err:r.publish(data.channel,payload).Result();err!nil{ifr.isClosed(){return}r.ExitSignal-errreturn}}}特点统一 JSON 批量格式订阅端一次性拿到[]string减少流量与解析开销。错误上报发布失败会通过ExitSignal通知queueProc并最终触发邮件告警。四、订阅端设计高并发安全消费4.1 基础订阅流程订阅端接口func(r*RedisClient)Subscribe(channelstring,completionfunc(messages RedisPublishMessageType)){ps:r.subscribe(channel)deferfunc(){_ps.Close()}()const(workerCount10bufferSize100)msgCh:make(chanRedisPublishMessageType,bufferSize)varwg sync.WaitGroup// 固定 worker 数并发消费fori:0;iworkerCount;i{wg.Add(1)gofunc(){deferwg.Done()foritem:rangemsgCh{completion(item)}}()}deferfunc(){close(msgCh)wg.Wait()}()foritem:rangeps.Channel(){ifitem.Payload{continue}varlist RedisPublishMessageTypeiferr:functions.JSONUnmarshal([]byte(item.Payload),list);err!nil{functions.LogError(消息解析失败, err: %s,err)continue}// 统一走 worker pool避免每条消息起一个 goroutinemsgCh-list}}设计要点固定 worker 数workerCount控制并发度防止高 QPS 时疯狂起 goroutine。buffered channel 缓冲bufferSize提供背压缓冲区在短暂突发时不上来就阻塞。统一退出机制defer close(msgCh)wg.Wait()确保所有消息处理完毕再返回避免 goroutine 泄漏。4.2 订阅时序图completion回调Worker PoolRedisClient.SubscribeRediscompletion回调Worker PoolRedisClient.SubscribeRedisloop[并发消费]推送 payload(json: []string)JSONUnmarshal(payload)msgCh - []stringcompletion([]string)五、并发安全与优雅关闭5.1 防止重复 close 与数据竞争ps内部使用closeOnce sync.Once保证close()至多执行一次。closed int32atomic提供isClosed()/markClosed()两个方法并发安全判断状态。func(r*RedisClient)isClosed()bool{returnatomic.LoadInt32(r.closed)1}func(r*RedisClient)markClosed(){r.IsClosedtrueatomic.StoreInt32(r.closed,1)}func(r*RedisClient)close(){r.closeOnce.Do(func(){r.markClosed()close(r.Message)close(r.PublishMessages)close(r.ExitSignal)})}效果即使多个 goroutine 同时触发关闭逻辑也不会出现close of closed channel的 panic。所有发送和消费逻辑都会优先调用isClosed()判断是否需要提前退出。5.2 异常告警机制当Ctx.Done()导致队列异常结束或publishProc发布失败触发ExitSignal都会调用sendEmail发送邮件告警包含节点信息conf.Host邮件配置conf.Email简要错误描述这保证了发布订阅链路出问题时不会静默失效。六、配置参数与调优建议6.1 关键配置配置项含义典型建议值影响维度MaxMessageCount单个 channel 批量最大条数100 ~ 5000吞吐量 / 延迟 / 内存SendInterval未达数量阈值时的最大发送间隔(ms)50 ~ 1000实时性 / 批量程度HeartbeatInterval心跳强制 flush 周期(ms)500 ~ 5000尾部消息滞留时间Email告警邮件配置视环境而定故障可观测性Host当前节点标识用于日志/路由节点域名/IP运维排查6.2 调优建议实时性优先将SendInterval调小如 50~100msMaxMessageCount适度降低HeartbeatInterval可以略大如 1000ms。吞吐优先提高MaxMessageCount适当放大SendInterval结合 Redis 集群能力合理评估单 channel 流量。内存敏感场景限制MaxMessageCount避免单个 channel 堆积过多消息HeartbeatInterval不宜过大避免残留过久。七、总结这个基于 Redis 的 PubSub 组件通过按 channel 维度的批量聚合与节流策略心跳驱动的尾部 flush 机制固定 worker pool 的订阅消费模型并发安全的关闭与异常告警机制在保证高吞吐的同时也兼顾了实时性、可靠性与可维护性。在需要跨节点消息广播、事件推送、高频通知的场景下这套设计可以作为一个通用的基础通信组件进一步和业务协议封装后即可在多项目间复用。

更多文章