011、消息队列应用:RabbitMQ、Kafka与Celery

张开发
2026/4/12 1:05:01 15 分钟阅读

分享文章

011、消息队列应用:RabbitMQ、Kafka与Celery
011、消息队列应用RabbitMQ、Kafka与Celery从一次线上故障说起上周半夜被报警叫醒系统里堆积了十几万条未处理的消息。登录服务器一看RabbitMQ内存占用98%生产者还在拼命往里塞数据消费者却全躺平了。紧急扩容后开始排查发现某个消费者处理单条消息要30秒而生产速率是每秒50条——典型的“消费能力跟不上生产速度”导致消息堆积。这种场景在我们后端系统里太常见了今天就来聊聊消息队列这个“系统解耦神器”和“性能杀手”的双面角色。RabbitMQ老牌选手的生存之道RabbitMQ用Erlang实现稳定得像个老管家。它的AMQP协议设计得很严谨但刚上手容易懵。看看这个典型的生产者示例importpika# 这里踩过坑线上环境一定要用ConnectionParameters配置心跳connectionpika.BlockingConnection(pika.ConnectionParameters(hostlocalhost,heartbeat600))channelconnection.channel()# 声明队列时记得设置持久化不然重启就丢数据channel.queue_declare(queueorder_queue,durableTrue)# 发消息时delivery_mode2是持久化关键channel.basic_publish(exchange,routing_keyorder_queue,body订单数据JSON字符串,propertiespika.BasicProperties(delivery_mode2,# 持久化消息))消费者端更要注意很多人在这里栽跟头defcallback(ch,method,properties,body):try:# 业务处理逻辑process_order(body)# 手动确认别用auto_ack消息丢了都不知道ch.basic_ack(delivery_tagmethod.delivery_tag)exceptExceptionase:# 处理失败时记录日志并拒绝消息logger.error(f处理失败:{e})# 第三个参数requeueTrue会让消息重新入队ch.basic_nack(delivery_tagmethod.delivery_tag,requeueTrue)# 设置QoS控制消费者同时处理的消息数channel.basic_qos(prefetch_count1)# 一次只处理一条避免某个消费者负载过高channel.basic_consume(queueorder_queue,on_message_callbackcallback)RabbitMQ的exchange类型direct、topic、fanout选型要看场景。订单系统用direct日志广播用fanout灵活路由用topic。但记住功能越复杂性能代价越大。Kafka吞吐量怪兽的脾气Kafka是另一种思路。它不在乎消息的“生死”只保证消息按顺序持久化。第一次用Kafka的人常被它的术语搞晕——broker、partition、offset、consumer group。看段生产者代码fromkafkaimportKafkaProducerimportjson# 序列化器选对很重要json是最稳妥的选择producerKafkaProducer(bootstrap_servers[localhost:9092],value_serializerlambdav:json.dumps(v).encode(utf-8),# 这两个参数影响可靠性和吞吐的平衡acksall,# 所有副本确认才算成功retries3# 失败重试次数)# 发送时指定key相同key的消息会进入同一个partitionproducer.send(user_actions,keyuser_id.encode(),value{action:click,page:home})# 一定要flush不然可能丢最后一批消息producer.flush()消费者这边学问更大fromkafkaimportKafkaConsumer consumerKafkaConsumer(user_actions,bootstrap_servers[localhost:9092],group_idanalytics_group,# 同一个group的消费者共享offsetenable_auto_commitFalse,# 手动提交offset控制更精准value_deserializerlambdax:json.loads(x.decode(utf-8)))formessageinconsumer:try:process_action(message.value)# 处理成功才提交offsetconsumer.commit()exceptException:# 处理失败不提交下次还能读到这条logger.error(处理失败暂停消费)time.sleep(5)# 暂停一会儿避免死循环Kafka的partition数量要在创建topic时就规划好后期修改很麻烦。经验公式partition数 消费者数量 × 消费能力系数。别设太少成为瓶颈也别设太多增加管理开销。CeleryPython开发者的快速解决方案Celery是另一种思路——它本质是分布式任务队列但很多人当消息队列用。它的优势是跟Python生态无缝集成fromceleryimportCelery# broker用RabbitMQ或Redis都行RabbitMQ更稳定appCelery(tasks,brokerpyamqp://guestlocalhost//,backendredis://localhost:6379/0)app.task(bindTrue,max_retries3)defprocess_image(self,image_path):try:# 业务逻辑resultresize_image(image_path)returnresultexceptExceptionasexc:# 失败重试指数退避raiseself.retry(excexc,countdown2**self.request.retries)Celery的worker启动有讲究# 别用默认并发数celery-Atasks worker--loglevelinfo--concurrency4# 生产环境用gevent或eventlet提升IO密集型任务性能celery-Atasks worker--loglevelinfo--poolgevent--concurrency100监控Celery任务状态很重要我习惯用flower# 启动监控celery-A tasks flower--port5555选型实战什么时候用什么去年设计电商系统时我们这样分配RabbitMQ处理订单流程——需要严格的消息确认、死信队列、优先级队列。用户下单后订单消息进RabbitMQ库存服务、优惠券服务、日志服务各自订阅一个消息驱动多个动作。Kafka处理用户行为日志——每天几十GB的点击流数据允许少量丢失但要超高吞吐。数据进Kafka后实时分析服务、离线数仓、推荐系统各取所需。Celery处理后台任务——用户上传图片生成缩略图、发送营销邮件、数据报表生成。这些任务需要重试机制、进度查询、结果存储。调试血泪史RabbitMQ内存爆过三次。第一次是消息没设置过期时间积压了百万条第二次是消费者忘记ack消息重复消费第三次是connection没设心跳网络抖动后连接假死。Kafka的坑在partition。有次设了10个partition但只有2个消费者8个partition闲置。另一次是consumer group没规划好多个服务互相抢消息。Celery最坑的是版本兼容。从3.x升级到4.x时API大变半夜回滚代码。还有一次backend用Redis结果Redis内存满了任务结果全丢。个人经验包监控必须到位RabbitMQ的管理界面、Kafka的JMX指标、Celery的flower一个都不能少。设好告警阈值别等爆了才发现。消费者要设计成幂等的消息可能重复投递你的业务逻辑要能处理重复消息。加个唯一ID处理前查一下是否已处理过。别迷信吞吐量数字Kafka宣称百万TPS那是理想场景。实际业务中序列化、网络、业务逻辑都是瓶颈。先压测再上线。消息格式向前兼容JSON比Protobuf灵活但体积大。我们折中方案JSON主体加schema版本号解析时根据版本号处理字段变化。死信队列一定要设处理失败的消息移到死信队列方便排查和修复后重试。见过太多因为没死信队列错误消息在队列里循环的惨剧。本地开发用Docker Compose一套yml文件把RabbitMQ、Kafka、Redis全启起来省去安装配置的麻烦。消息队列用好了是系统骨架用不好就是事故源泉。每次设计新队列时多问几句消息丢了怎么办重复了怎么办积压了怎么办这三个问题想清楚能避开80%的坑。夜深了监控告警又亮了——这次是Kafka的disk usage超过85%。你看消息队列从来不会让你无聊。去扩容磁盘了下次聊。

更多文章