教你使用mixphp打造多进程异步邮件发送

2025-07-08 03:30:02 3
  • 收藏
  • 管理
    注意:这个是 MixPHP V1 的范例 邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。 传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。 推荐:《PHP视频教程》 下面演示一个异步邮件发送系统的开发过程,涉及知识点: 异步消息队列多进程守护进程如何使用消息队列实现异步 PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有: redisrabbitmqkafka 本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令: 复制// 入列 $redis->lpush($key, $data);// 出列 $data = $redis->rpop($key);// 阻塞出列 $data = $redis->brpop($key,10); 架构设计 本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。 邮件发送库选型 以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。 由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装: 复制composer require swiftmailer/swiftmailer 生产者开发 在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。 在传统 MVC 框架的控制器中增加如下代码: 通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。 复制// 连接 $redis =new \\\\Redis();if(!$redis->connect(127.0.0.1,6379)){thrownew \\\\Exception(Redis Connect Failure);} $redis->auth(); $redis->select(0);// 投递任务 $data =[to=>[***@qq.com=>A name],body=>Here is the message itself,subject=>The title content,]; $redis->lpush(queue:email, serialize($data)); 通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。 消费者开发 本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。 TaskExecutor 的 MODE_PUSH 模式有二种进程: 左进程:负责从消息队列取出任务数据,投放给中进程。 中进程:负责执行邮件发送任务。 PushCommand.php 代码如下: 复制 */classPushCommandextendsBaseCommand{// 配置信息const HOST =smtpdm.aliyun.com;const PORT =465;const SECURITY =ssl;const USERNAME =****@email.***.com;const PASSWORD =****;// 初始化事件publicfunction onInitialize(){ parent::onInitialize();// TODO: Change the autogenerated stub// 获取程序名称 $this->programName =Input::getCommandName();// 设置pidfile $this->pidFile ="/var/run/{$this->programName}.pid";}/** * 获取服务 * @return TaskExecutor */publicfunction getTaskService(){return create_object([// 类路径class=>mix\\\\task\\\\TaskExecutor,// 服务名称name=>"mix-daemon: {$this->programName}",// 执行类型type=> \\\\mix\\\\task\\\\TaskExecutor::TYPE_DAEMON,// 执行模式mode=> \\\\mix\\\\task\\\\TaskExecutor::MODE_PUSH,// 左进程数leftProcess=>1,// 中进程数centerProcess=>5,// 任务超时时间 (秒)timeout=>5,]);}// 启动publicfunction actionStart(){// 预处理if(!parent::actionStart()){returnExitCode::UNSPECIFIED_ERROR;}// 启动服务 $service = $this->getTaskService(); $service->on(LeftStart,[$this,onLeftStart]); $service->on(CenterStart,[$this,onCenterStart]); $service->start();// 返回退出码returnExitCode::OK;}// 左进程启动事件回调函数publicfunction onLeftStart(LeftProcess $worker){try{// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线 $queueModel =Redis::getInstance();// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出for($j =0; $j <16000; $j ){// 从消息队列中间件阻塞获取一条消息 $data = $queueModel->brpop(queue:email,10);if(empty($data)){continue;} list(, $data)= $data;// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html) $worker->push($data,false);}}catch(\\\\Exception $e){// 休息一会,避免 CPU 出现 100% sleep(1);// 抛出错误throw $e;}}// 中进程启动事件回调函数publicfunction onCenterStart(CenterProcess $worker){// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出for($j =0; $j <16000; $j ){// 从进程消息队列中抢占一条消息 $data = $worker->pop();if(empty($data)){continue;}// 处理消息try{// 处理消息,比如:发送短信、发送邮件、微信推送 var_dump($data); $ret =self::sendEmail($data); var_dump($ret);}catch(\\\\Exception $e){// 回退数据到消息队列 $worker->rollback($data);// 休息一会,避免 CPU 出现 100% sleep(1);// 抛出错误throw $e;}}}// 发送邮件publicstaticfunction sendEmail($data){// Create the Transport $transport =(new \\\\Swift_SmtpTransport(self::HOST,self::PORT,self::SECURITY))->setUsername(self::USERNAME)->setPassword(self::PASSWORD);// Create the Mailer using your created Transport $mailer =new \\\\Swift_Mailer($transport);// Create a message $message =(new \\\\Swift_Message($data[subject]))->setFrom([self::USERNAME =>**网])->setTo($data[to])->setBody($data[body]);// Send the message $result = $mailer->send($message);return $result;}} 测试 1.在 shell 中启动 push 常驻程序。 复制[root@localhost bin]# ./mix-daemon push start mix-daemon push start successed. 1.调用接口往消息队列投放任务。 此时 shell 终端将打印: 成功收到测试邮件: MixPHP GitHub: https://github.com/mix-php/mix 官网:http://www.mixphp.cn/
    上一页:教你使用优化的概念进行网站推广 下一页:教你使用golang实现redi服务器
    全部评论(0)