ThinkPHP队列服务think-queue实战:从安装到高并发场景应用

张开发
2026/4/11 20:31:34 15 分钟阅读

分享文章

ThinkPHP队列服务think-queue实战:从安装到高并发场景应用
1. 为什么需要消息队列在开发Web应用时经常会遇到一些耗时操作比如发送短信、邮件通知、处理大文件等。如果直接在用户请求中处理这些操作会导致用户等待时间过长体验非常差。我曾经在一个电商项目中遇到过这样的问题促销活动时大量用户同时下单系统需要给每个用户发送订单确认短信结果短信接口响应慢直接拖垮了整个下单流程。这时候消息队列就派上用场了。它就像是一个任务中转站把耗时的操作放到后台异步处理。用户请求只需要把任务放进队列就可以立即返回由专门的消费者进程慢慢处理这些任务。这种削峰填谷的方式能有效应对高并发场景。ThinkPHP官方提供的think-queue扩展包就是一个轻量级的消息队列解决方案。它支持Redis、数据库等多种驱动配置简单特别适合中小型项目使用。我在多个实际项目中使用过它处理过日均百万级的消息量稳定性相当不错。2. 安装与基础配置2.1 使用Composer安装安装think-queue非常简单只需要在项目根目录下执行composer require topthink/think-queue这里有个小技巧如果你用的是ThinkPHP5.1需要指定版本号composer require topthink/think-queue:2.0.4安装完成后可以通过以下命令检查是否安装成功php think queue:status如果看到队列状态信息说明安装成功了。2.2 配置队列驱动think-queue支持多种驱动我强烈推荐使用Redis驱动因为它的性能最好。配置文件通常位于config/queue.php下面是一个典型的Redis配置return [ default redis, connections [ redis [ type redis, queue default, host 127.0.0.1, port 6379, password , select 0, timeout 0, persistent false, ], ], ];在实际项目中我通常会做这些优化配置设置persistent为true使用持久连接提高性能为不同业务设置不同的select数据库方便管理配置适当的timeout值防止长时间阻塞3. 创建和处理队列任务3.1 定义任务类任务类可以放在app/job目录下。比如我们要处理发送短信的任务namespace app\job; use think\queue\Job; class SmsJob { public function fire(Job $job, $data) { // 处理发送短信逻辑 $result $this-sendSms($data[mobile], $data[content]); if ($result) { $job-delete(); return true; } if ($job-attempts() 3) { $job-delete(); $this-logFailedJob($data); } else { $job-release(60); // 1分钟后重试 } } protected function sendSms($mobile, $content) { // 实际调用短信接口的代码 // 返回true或false表示成功或失败 } protected function logFailedJob($data) { // 记录失败任务日志 } }这里有几个实践经验分享一定要处理任务失败的情况设置合理的重试次数对于重要任务建议记录失败日志方便后续处理重试间隔要根据业务特点设置比如短信可以设置1分钟邮件可以设置5分钟3.2 推送任务到队列在控制器中推送任务非常简单use think\facade\Queue; class OrderController { public function create() { // 处理订单创建逻辑... // 推送短信任务 $data [ mobile 13800138000, content 您的订单已创建 ]; Queue::push(\app\job\SmsJob::class, $data, sms); return success(订单创建成功); } }在实际项目中我通常会封装一个队列服务类统一管理各种队列任务。这样可以避免在控制器中直接写队列逻辑也方便统一处理异常情况。4. 高并发场景下的优化实践4.1 多队列管理当系统中有多种类型的任务时建议使用不同的队列分开处理。比如// 短信队列 Queue::push(SmsJob::class, $data, sms); // 邮件队列 Queue::push(EmailJob::class, $data, email); // 日志队列 Queue::push(LogJob::class, $data, log);然后可以针对不同类型的队列启动不同的消费者进程# 处理短信队列 php think queue:work --queue sms # 处理邮件队列 php think queue:work --queue email这样做的好处是不同类型的任务互不影响可以根据任务重要性设置不同的并发数某个队列出问题时不会影响其他队列4.2 使用Supervisor管理进程在生产环境中我们需要确保队列消费者进程一直运行。Supervisor是一个很好的进程管理工具。配置示例如下[program:queue_worker] commandphp think queue:work --queue sms,email,log --memory128 --sleep3 --tries3 directory/path/to/your/project autostarttrue autorestarttrue userwww numprocs4 redirect_stderrtrue stdout_logfile/var/log/queue_worker.log关键配置说明numprocs4 表示启动4个进程并行处理memory128 限制每个进程最大内存128Msleep3 队列空时休眠3秒tries3 失败任务最多重试3次在实际部署时我通常会根据服务器配置调整numprocs值一般设置为CPU核心数的1.5-2倍。4.3 性能优化技巧在处理高并发队列时我总结了一些优化经验批量处理对于可以批量处理的任务比如发送短信可以一次处理多条减少IO操作public function fire(Job $job, $data) { // 如果是批量数据 if (isset($data[0])) { $this-batchSendSms($data); } else { $this-sendSms($data); } }内存控制设置合理的内存限制防止内存泄漏php think queue:work --memory128超时设置对于可能长时间运行的任务设置超时时间php think queue:work --timeout60延迟队列对于不需要立即处理的任务可以使用延迟队列// 1小时后执行 Queue::later(3600, SmsJob::class, $data, sms);5. 常见问题与解决方案在实际使用think-queue的过程中我遇到过不少坑这里分享几个典型问题的解决方法。5.1 任务重复执行有时候会发现同一个任务被执行了多次。这通常是因为任务执行时间过长超过了visibility timeout消费者进程异常退出解决方案优化任务处理逻辑缩短执行时间设置合理的超时时间实现任务幂等性处理5.2 内存泄漏长时间运行的PHP进程容易出现内存泄漏。可以通过以下方式避免定期重启消费者进程使用--memory参数限制内存在代码中注意释放大变量# 每个进程处理100个任务后自动重启 php think queue:work --max-jobs1005.3 队列堆积当生产者速度远大于消费者时会导致队列堆积。解决方法增加消费者进程数量优化消费者处理逻辑使用多个队列服务器分流监控队列长度也很重要可以写个简单的脚本检查队列长度超过阈值时报警。$redis new Redis(); $redis-connect(127.0.0.1, 6379); $length $redis-llen(queues:sms); if ($length 1000) { // 发送报警 }6. 监控与维护一个健壮的队列系统需要完善的监控机制。我通常会在项目中实现以下监控点队列长度监控实时监控各队列的任务积压情况消费者进程监控确保Supervisor管理的进程正常运行失败任务监控记录并定期处理失败任务性能监控记录任务处理耗时等指标可以使用ThinkPHP的日志系统记录这些信息或者集成专门的监控系统如Prometheus。对于失败任务我通常会实现一个重试机制定期检查失败任务表对可重试的任务重新放入队列。同时也会设置最大重试次数避免无限重试。class RetryFailedJobs { public function run() { $jobs FailedJob::where(attempts, , 3) -where(last_attempt, , time() - 3600) -select(); foreach ($jobs as $job) { Queue::push($job-payload[job], $job-payload[data], $job-queue); $job-increment(attempts); $job-last_attempt time(); $job-save(); } } }这个重试任务可以放在计划任务中每小时执行一次。对于重要的业务任务还应该实现人工干预接口方便运营人员手动重试失败任务。

更多文章