class TestController extends Controller { //其他方法 //发送消息 public function SendMessage(Request $request){ ... $this->dispatch((new SendMessage($sendParams))->onQueue('snail:SendMessage')); } }
内部实现:
创建消费任务
命令行运行如下命令:
/home/niuyufu/php/bin/php /home/niuyufu/webroot/assistant_api/artisan queue:work --queue=snail:SendMessage --tries=3 --memory=512 --daemon
内部实现:
队列消息分析:
监控redis对应队列消息,具体产生的消息操作,如下:
tail -f | redis-cli -h 10.94.120.13 -p 6380 monitor | grep "queues:snail"
1492446053.406282 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:delayed"
1492446053.406452 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:delayed" "-inf" "1492446053"
1492446053.406754 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:reserved"
1492446053.406842 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:reserved" "-inf" "1492446053"
1492446053.407029 [0 10.95.117.155:57132] "LPOP" "queues:snail:SendMessage"
1492446053.407700 [0 10.95.117.155:57132] "ZADD" "queues:snail:SendMessage:reserved" "1492446113" "{job}"
1492446053.463953 [0 10.95.117.155:57132] "ZREM" "queues:snail:SendMessage:reserved" "{job}"
PS:如果你的redis是codis的话,注意了,因为codis禁用方法列表
KEYS, MOVE, OBJECT, RENAME, RENAMENX, SORT, SCAN, BITOP,MSETNX, BLPOP, BRPOP, BRPOPLPUSH, PSUBSCRIBE,PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, DISCARD, EXEC, MULTI, UNWATCH, WATCH, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, AUTH, ECHO, SELECT, BGREWRITEAOF, BGSAVE, CLIENT KILL, CLIENT LIST, CONFIG GET, CONFIG SET, CONFIG RESETSTAT, DBSIZE, DEBUG OBJECT, DEBUG SEGFAULT, FLUSHALL, FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF, SLOWLOG, SYNC, TIME
所以执行消费任务会有以下错误:
[Predis\Connection\ConnectionException]
Error while reading line from the server. [tcp://100.90.154.39:3000]
解决方法为修改config/queue.php
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'expire' => null, //禁用即可 ],
优化日志处理:
如果你的系统有切割文件日志操作。会发现虽然日志被切分了,但程序却没有往新文件里写入。如,2017-04-18 13:50启动的项目,日志会一直打到 snail.log.2017041813上。
改进方案:
app/Jobs/Job.php文件中添加如下方法:
public function releaseLoggerFile(){ $handles=\Log::getMonolog()->getHandlers(); if(!is_array($handles) || empty($handles)){ return; } foreach($handles as $handle){ if(method_exists($handle, "close")){ $handle->close(); } } return; }在具体job实现类中的handle方法结尾添加:
public function handle() { ... $this->releaseLoggerFile(); //释放log文件 }
线上部署
创建任务shell
#!/bin/sh day=$(date +'%y%m%d') now=$(date +"%F %T") php_command="/home/niuyufu/php/bin/php" command="/home/niuyufu/webroot/snail_api/artisan" log="/home/niuyufu/webroot/log/wave" queue_name_arr=("snail:SendMessage" "snail:SendMessageToApp") case $1 in "run") for queue_name in ${queue_name_arr[*]} do count=$(ps aux | grep artisan |grep queue=${queue_name} | grep -v grep | wc -l) if [ ${count} -lt 1 ];then echo "${now}:${queue_name} process has exit ${count}\n"; nohup $php_command $command queue:work --queue=${queue_name} --tries=3 --memory=512 --daemon >> $log/queueMonitor.log 2>&1 & else echo "${now}:${queue_name} process is runing"; fi done ;; "stop") kill -9 $(ps -ef | grep "queue:work" | grep -v grep | awk '{print $2}' | tr -s '\n' ' ') echo ${now}."Queue process all stop"; ;; "list") ps -ef | grep "queue:work" | grep -v grep ;; *) echo " Usage: QueueMonitorCommandShell.sh [run|stop|list] " ;; esac
总结:
laravel这边的延迟队列使用了三个队列。
queue:default:delayed // 存储延迟任务
queue:default // 存储"生"任务,就是未处理任务
queue:default:reserved // 存储待处理任务
任务在三个队列中进行轮转,最后一定进入到queue:default:reserved,并且成功后把任务从这个队列中删除。
laravel5.1 使用了watch来控制队列的原子操作,但由于codis本身不支持 watch 方法。所以使用codis不能完全体验队列功能:延迟队列不支持、不支持数据重跑,对线上数据比较严格操作谨慎使用。
laravel5.3 之后redis队列 开始使用lua脚本支持的队列原子操作,它没有使用 watch multi等操作,所以如果线上codis 支持lua的话,可以完整体验到队列功能。
扫码二维码 获取免费视频学习资料
- 本文固定链接: http://phpxs.com/post/6711/
- 转载请注明:转载必须在正文中标注并保留原文链接
- 扫码: 扫上方二维码获取免费视频资料