Laravel中使用beanstalkd队列系统

Beanstalkd是一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5百万用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。国内使用者例如小米公司。

高性能离不开异步,异步离不开队列,而其内部都是Producer-Comsumer模式(生产者消费者模式)的原理。

Beanstalkd设计里面的核心概念:
job(任务)
一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube(任务队列)中。
tube(管道)
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。
producer (发布者)
Job的生产者,通过put命令来将一个job放到一个tube中。
consumer (消费者)
Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

安装
http://kr.github.io/beanstalkd/
beanstalkd-1.9.tar.gz
$ tar -zxvf beanstalkd-1.9.tar.gz
$ cd beanstalkd-1.9
$ make
$ ./beanstalkd -b data_dir
后台启动:

beanstalkd -l 地址 -p 端口号 -z 最大的任务大小(byte) -b 日志目录 &

如果是外部客户端连接,ip地址要写外网地址,这样才能连接上

默认端口11300

框架配置
composer.json

“require”: {
//加入下面代码
“pda/pheanstalk”: “2.*”
}

./composer.phar update

如果报错: Fatal error: Allowed memory size of 536870912 bytes exhausted ,请修改php.ini中的php内存限制
memory_limit = 1024M

测试
routes.php中添加任务
Route::get(‘/test’, function()
{
Queue::push(‘Test’, array(‘message’ => ‘test1’));
Queue::push(‘Test’, array(‘message’ => ‘test2’));
Queue::push(‘Test’, array(‘message’ => ‘test3’));
Queue::push(‘Test’, array(‘message’ => ‘test4’));

return ‘OK’;
});

用浏览器访问一下,这里队列中将有4个job等待处理

到models中,新建一个Test.php
class Test {
public function fire($job, $data)
{
//Job对象
//var_dump($job);

//Queue::push的第二个参数
//var_dump($data);

echo $data[‘message’].”\n”;
//sleep(1);

//处理完成,删除
$job->delete();

//将一个任务放回队列
//$job->release();
//指定几秒后将任务放回队列
//$job->release(5);

//如果任务在处理时发生异常,它会被自动放回队列。你可以使用 attempts 方法获取尝试处理任务的次数:
//获取尝试处理任务的次数//if ($job->attempts() > 3){}

//获取任务ID $job->getJobId();

}
}

到框架目录执行命令
php artisan queue:listen

Laravel包含了一个用于运行已推送到队列的任务的Artisan服务。可以使用 queue:listen 命令来运行这个功能。
一旦任务启动,它会一直运行除非你手动停止它。可以使用进程监视工具(例如 Supervisor)来确保队列监听器处于运行状态。

你也可以设置单个任务可以执行的最长时间(单位秒) php artisan queue:listen –timeout=60 你还可以指定新任务轮询之前所需要等待的秒数

php artisan queue:listen –sleep=5

将会看到如下处理结果
test1
Processed: Test
test2
Processed: Test
test3
Processed: Test
test4
Processed: Test

测试成功完成。

发表评论