Hyperf 对接企业微信:将消息发送功能改造为异步,使用 Hyperf AsyncQueue 投递消息任务,失败后自动重试 3 次,超出重试次数后记录到失败日志表

张开发
2026/4/15 3:33:17 15 分钟阅读

分享文章

Hyperf 对接企业微信:将消息发送功能改造为异步,使用 Hyperf AsyncQueue 投递消息任务,失败后自动重试 3 次,超出重试次数后记录到失败日志表
安装

```bash
composer require hyperf/async-queue
```

---

## 1. 队列配置

```php
<?php
// config/autoload/async_queue.php
return [
    'wechat' => [ // 队列名，可自定义
        'driver' => \Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => 'queue:wechat', // Redis key 前缀
        'timeout' => 30, // 单个任务最长执行秒数
        'retry_seconds' => [5, 10, 20], // 每次重试的延迟（秒），3 次对应 3 个值
        'handle_timeout' => 10,
        'processes' => 2, // 消费进程数
        'concurrent' => ['limit' => 5], // 并发限制
    ],
];
```

---

## 2. 失败日志表迁移

```php
<?php
// migrations/2024_01_01_000001_create_wechat_failed_jobs_table.php
use Hyperf\Database\Schema\Schema;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Migrations\Migration;

class CreateWechatFailedJobsTable extends Migration
{
    public function up(): void
    {
        Schema::create('wechat_failed_jobs', function (Blueprint $table) {
            $table->bigIncrements('id');
            $table->string('job_class');
            $table->string('to_user')->comment('目标 userId');
            $table->string('msg_type')->comment('消息类型: text/image/news');
            $table->json('payload')->comment('完整消息内容');
            $table->text('exception')->comment('最终失败原因');
            $table->unsignedTinyInteger('attempts')->default(0)->comment('已重试次数');
            $table->timestamps();
        });
    }

    public function down(): void
    {
        Schema::dropIfExists('wechat_failed_jobs');
    }
}
```

---

## 3. Job 基类（统一 maxAttempts）

```php
<?php
// app/Job/AbstractWechatJob.php
namespace App\Job;

use Hyperf\AsyncQueue\Job;

abstract class AbstractWechatJob extends Job
{
    /**
     * 最大重试次数（不含首次执行，即最多执行 maxAttempts + 1 次）；
     * 超出后进入 failed 队列并触发 FailedHandle 事件
     */
    public int $maxAttempts = 3;
}
```

---

## 4. 发送文本消息 Job

```php
<?php
// app/Job/SendWechatTextJob.php
namespace App\Job;

use App\Service\WechatMessageSender;
use Hyperf\Di\Annotation\Inject;

class SendWechatTextJob extends AbstractWechatJob
{
    public function __construct(
        public readonly string $userId,
        public readonly string $content,
    ) {}

    public function handle(): void
    {
        /** @var WechatMessageSender $sender */
        $sender = make(WechatMessageSender::class);
        $sender->sendText($this->userId, $this->content);
    }
}
```

---

## 5. 发送图片消息 Job

```php
<?php
// app/Job/SendWechatImageJob.php
namespace App\Job;

use App\Service\WechatMediaService;

class SendWechatImageJob extends AbstractWechatJob
{
    public function __construct(
        public readonly string $userId,
        public readonly string $localPath, // 本地图片绝对路径
    ) {}

    public function handle(): void
    {
        make(WechatMediaService::class)->sendImageToUser($this->userId, $this->localPath);
    }
}
```

---

## 6. 失败事件监听器（写入失败日志表）

```php
<?php
// app/Listener/WechatJobFailedListener.php
namespace App\Listener;

use App\Job\AbstractWechatJob;
use App\Job\SendWechatImageJob;
use App\Job\SendWechatTextJob;
use Hyperf\AsyncQueue\Event\FailedHandle;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\DbConnection\Db;
use Psr\Log\LoggerInterface;
use Hyperf\Logger\LoggerFactory;

#[Listener]
class WechatJobFailedListener implements ListenerInterface
{
    private LoggerInterface $logger;

    public function __construct(LoggerFactory $loggerFactory)
    {
        $this->logger = $loggerFactory->get('wechat');
    }

    public function listen(): array
    {
        return [
            FailedHandle::class, // 超出 maxAttempts 后触发
        ];
    }

    public function process(object $event): void
    {
        /** @var FailedHandle $event */
        $job = $event->getMessage()->job();
        $throwable = $event->getThrowable();

        // 只处理企业微信相关 Job
        if (!$job instanceof AbstractWechatJob) {
            return;
        }

        [$toUser, $msgType, $payload] = $this->extractJobInfo($job);

        $this->logger->error('企业微信消息最终发送失败', [
            'job' => get_class($job),
            'to_user' => $toUser,
            'msg_type' => $msgType,
            'exception' => $throwable->getMessage(),
        ]);

        // 写入失败日志表
        Db::table('wechat_failed_jobs')->insert([
            'job_class' => get_class($job),
            'to_user' => $toUser,
            'msg_type' => $msgType,
            'payload' => json_encode($payload, JSON_UNESCAPED_UNICODE),
            'exception' => $throwable->getMessage() . "\n" . $throwable->getTraceAsString(),
            'attempts' => $job->maxAttempts,
            'created_at' => date('Y-m-d H:i:s'),
            'updated_at' => date('Y-m-d H:i:s'),
        ]);
    }

    private function extractJobInfo(AbstractWechatJob $job): array
    {
        return match (true) {
            $job instanceof SendWechatTextJob => [
                $job->userId,
                'text',
                ['content' => $job->content],
            ],
            $job instanceof SendWechatImageJob => [
                $job->userId,
                'image',
                ['local_path' => $job->localPath],
            ],
            default => ['unknown', 'unknown', []],
        };
    }
}
```

---

## 7. 投递队列的服务封装

```php
<?php
// app/Service/WechatQueueService.php
namespace App\Service;

use App\Job\SendWechatImageJob;
use App\Job\SendWechatTextJob;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;

class WechatQueueService
{
    private DriverInterface $driver;

    public function __construct(DriverFactory $factory)
    {
        $this->driver = $factory->get('wechat'); // 对应 async_queue.php 中的 key
    }

    /**
     * 异步发送文本消息
     */
    public function pushText(string $userId, string $content, int $delay = 0): bool
    {
        return $this->driver->push(new SendWechatTextJob($userId, $content), $delay);
    }

    /**
     * 异步发送图片消息
     */
    public function pushImage(string $userId, string $localPath, int $delay = 0): bool
    {
        return $this->driver->push(new SendWechatImageJob($userId, $localPath), $delay);
    }
}
```

---

## 8. 控制器调用

```php
<?php
// app/Controller/MessageController.php
namespace App\Controller;

use App\Service\WechatQueueService;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\PostMapping;
use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\HttpServer\Contract\ResponseInterface;

#[Controller(prefix: '/api')]
class MessageController
{
    public function __construct(
        private readonly WechatQueueService $queue,
        private readonly RequestInterface $request,
        private readonly ResponseInterface $response,
    ) {}

    #[PostMapping(path: '/message/send')]
    public function send(): \Psr\Http\Message\ResponseInterface
    {
        $userId = $this->request->input('user_id', '');
        $content = $this->request->input('content', '');

        if (empty($userId) || empty($content)) {
            return $this->response->json(['code' => 400, 'message' => 'user_id 和 content 不能为空'])->withStatus(400);
        }

        // 投递到异步队列，立即返回，不阻塞请求
        $this->queue->pushText($userId, $content);

        return $this->response->json(['code' => 0, 'message' => '已加入发送队列']);
    }
}
```

---

## 9. 启动消费进程

在 config/autoload/processes.php 注册消费进程：

```php
<?php
// config/autoload/processes.php
use Hyperf\AsyncQueue\Process\ConsumerProcess;

return [
    [
        'class' => ConsumerProcess::class,
        'constructor' => ['queue' => 'wechat'],
    ],
];
```

---

## 完整流程

```text
POST /api/message/send
  │
  └─ WechatQueueService::pushText()
       │
       └─ Redis queue:wechat:waiting
            │
       ConsumerProcess 消费
            │
       SendWechatTextJob::handle()
            │
       ┌────┴────────────────────┐
       │ 成功                    │ 失败（抛出异常）
       │                         │
       └─ 完成                   └─ 重试（最多 3 次）
                                      │
                                 第 3 次重试仍失败
                                      │
                                 FailedHandle 事件触发
                                      │
                                 WechatJobFailedListener::process()
                                      │
                                 写入 wechat_failed_jobs 表
```

---

## 关键点

| 项目 | 说明 |
| --- | --- |
| maxAttempts = 3 | 失败后最多重试 3 次（不含首次执行，最多共执行 4 次） |
| retry_seconds | 配置 [5, 10, 20] 对应 3 次重试的间隔，指数退避 |
| FailedHandle 事件 | 超出次数后由框架自动触发，无需手动捕获 |
| Job 属性序列化 | Job 会被序列化存入 Redis，属性只能是可序列化的基础类型 |
| make() 获取依赖 | Job 的 handle() 内用 make() 从容器取服务，不能构造函数注入 |

更多文章