首页 > 编程 > PHP > 正文

使用 mixphp 打造多进程异步邮件发送

2020-03-22 18:01:39
字体:
来源:转载
供稿:网友
这篇文章主要介绍了关于使用 mixphp 打造多进程异步邮件发送,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

异步

消息队列

多进程

守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

redis

rabbitmq

kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列$redis- lpush($key, $data);// 出列$data = $redis- rpop($key);// 阻塞出列$data = $redis- brpop($key, 10);
架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,http://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer
生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接$redis = new /Redis();if (!$redis- connect( 127.0.0.1 , 6379)) { throw new /Exception( Redis Connect Failure $redis- auth( $redis- select(0);// 投递任务$data = [ to = [ ***@qq.com = A name ], body = Here is the message itself , subject = The title content ,$redis- lpush( queue:email , serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

左进程:负责从消息队列取出任务数据,投放给中进程。

中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

 ?phpnamespace apps/daemon/commands;use mix/console/ExitCode;use mix/facades/Input;use mix/facades/Redis;use mix/task/CenterProcess;use mix/task/LeftProcess;use mix/task/TaskExecutor; * 推送模式范例 * @author 刘健 coder.liu@qq.com html' target='_blank'>class PushCommand extends BaseCommand // 配置信息 const HOST = smtpdm.aliyun.com  const PORT = 465; const SECURITY = ssl  const USERNAME = ****@email.***.com  const PASSWORD = ****  // 初始化事件 public function onInitialize() parent::onInitialize(); // TODO: Change the autogenerated stub // 获取程序名称 $this- programName = Input::getCommandName(); // 设置pidfile $this- pidFile = /var/run/{$this- programName}.pid  * 获取服务 * @return TaskExecutor public function getTaskService() return create_object( // 类路径 class = mix/task/TaskExecutor , // 服务名称 name = mix-daemon: {$this- programName} , // 执行类型 type = /mix/task/TaskExecutor::TYPE_DAEMON, // 执行模式 mode = /mix/task/TaskExecutor::MODE_PUSH, // 左进程数 leftProcess = 1, // 中进程数 centerProcess = 5, // 任务超时时间 (秒) timeout = 5, // 启动 public function actionStart() // 预处理 if (!parent::actionStart()) { return ExitCode::UNSPECIFIED_ERROR; // 启动服务 $service = $this- getTaskService(); $service- on( LeftStart , [$this, onLeftStart  $service- on( CenterStart , [$this, onCenterStart  $service- start(); // 返回退出码 return ExitCode::OK; // 左进程启动事件回调函数 public function onLeftStart(LeftProcess $worker) try { // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线 $queueModel = Redis::getInstance(); // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出 for ($j = 0; $j 16000; $j++) { // 从消息队列中间件阻塞获取一条消息 $data = $queueModel- brpop( queue:email , 10); if (empty($data)) { continue; list(, $data) = $data; // 将消息推送给中进程去处理,push有长度限制 (http://wiki.swoole.com/wiki/page/290.html) $worker- push($data, false); } catch (/Exception $e) { // 休息一会,避免 CPU 出现 100% sleep(1); // 抛出错误 throw $e; // 中进程启动事件回调函数 public function onCenterStart(CenterProcess $worker) // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出 for ($j = 0; $j 16000; $j++) { // 从进程消息队列中抢占一条消息 $data = $worker- pop(); if (empty($data)) { continue; // 处理消息 try { // 处理消息,比如:发送短信、发送邮件、微信推送 var_dump($data); $ret = self::sendEmail($data); var_dump($ret); } catch (/Exception $e) { // 回退数据到消息队列 $worker- rollback($data); // 休息一会,避免 CPU 出现 100% sleep(1); // 抛出错误 throw $e; // 发送邮件 public static function sendEmail($data) // Create the Transport $transport = (new /Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) - setUsername(self::USERNAME) - setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new /Swift_Mailer($transport); // Create a message $message = (new /Swift_Message($data[ subject ])) - setFrom([self::USERNAME = **网 ]) - setTo($data[ to ]) - setBody($data[ body  // Send the message $result = $mailer- send($message); return $result;}
测试

在 shell 中启动 push 常驻程序。

[root@localhost bin]# ./mix-daemon push startmix-daemon push start successed.

调用接口往消息队列投放任务。

此时 shell 终端将打印:

5710806-053c97487e5344ac[1].png

成功收到测试邮件:

5710806-6f6ce15dc9d77933[1].png

以上就是本文的全部内容,希望对大家的学习有所帮助,更多相关内容请关注PHP !

相关推荐:

给PHP开启shmop扩展实现共享内存

php实现共享内存进程通信函数(_shm)

以上就是使用 mixphp 打造多进程异步邮件发送的详细内容,PHP教程

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表