一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:
首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器
http://www.rabbitmq.com/download.html
默认的端口是55672 访问地址http://127.0.0.1:55672/
默认帐号密码 guest guest
你可以看到rabbitmq 的管理界面
mq的任务是一个不浪费资源,的一个队列系统!
php使用需要下载一个amqp扩展,或者直接点击下面的地址找到适合自己的版本,下载
http://pecl.php.net/package/amqp/1.4.0/windows
根据当前使用的php版本选择相应的扩展dll,下载后是一个压缩包,里面有两个dll扩展(php_amqp.dll和rabbitmq.1.dll)。
队列开启的过程:
rabbitmq.1.dll 放在C盘windows下
php_amqp.dll 放入php扩展中
开启php_amqp.dll的引用
详细步骤如下:
1.将rabbitmq.1.dll文件放在php的根目录里(也就是ext目录的父级目录),然后修改apache的httpd.con文件,文件尾部添加一行,这里的路径根据情况修改,我这里使用的wampserver软件。
1
|
LoadFile "d:/wamp/bin/php/php5.5.12/rabbitmq.1.dll"
|
2.将php_amqp.dll放在php的ext目录里,然后修改php.ini文件,在文件的最后面添加两行
1
2
|
[amqp] extension=php_amqp.dll |
3.重启apache,并查看phpinfo信息。只要看到amqp 字样即可。
首先是rabbitmq的生产者:
生产者的逻辑:创建连接-->创建channel-->创建交换机对象-->发送消息
创建第一个index文件:然后去mq中查看,如果添加一个test001的队列名信息,就说明已经添加进去了,xx22的信息已经在mq中存储!
接下来就需要跑数据了。
createQueue(array('xxx','2222'),'test001');
echo "ok";
function createQueue($message,$queueName,$exchangeName = '', $queueKey = '')
{
$queueName = self::getQueueName($queueName);
$conn_args = array('host' =>'localhost', 'port'=> '5672',
'login' =>'guest', //mq帐号
'password'=> '', //mq密码
'vhost' => '/');
$conn = new AMQPConnection($conn_args);
$conn->connect();
$channel = new AMQPChannel($conn);
if (!$exchangeName) {
$exchangeName = $queueName;
}
$queueName = $queueName;
if (!$queueKey) {
$queueKey = $queueName;
}
$ex = new AMQPExchange($channel);
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_TOPIC);
$ex->setFlags(AMQP_DURABLE); //exchange持久化
$ex->declareExchange();
$q = new AMQPQueue($channel);
$q->setName($queueName);
$q->setFlags(AMQP_DURABLE); //queue持久化
$q->declareQueue();
$q->bind($exchangeName, $queueKey);
$channel->startTransaction();
/**
* 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。
* 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php
*/
$result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));
$channel->commitTransaction();
$conn->disconnect();
}
有了生产者,那就有消费者。
脚本如果没有其他的修改或问题,基本上都是常年启动的:
消费者逻辑:创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息
消费者基类:
class WorkerCommand{
function qInit($q_name,$e_name='',$k_route=''){
$q_name = Utils::getQueueName($q_name);
$conn_args = array(
'host' => '127.0.0.1', //mq的配置
'port' => '5672',
'login' => 'guest',
'password' => 'huoxingxing',
'vhost' => '/'
);
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
if (!$e_name) {
$e_name = $q_name;
}
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
// echo "Exchange Status:" . $ex->declareExchange() . "\n";
//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
// echo "Message Total:" . $q->declareExchange() . "\n";
if (!$k_route) {
$k_route = $q_name;
}
//绑定交换机与队列,并指定路由键
// echo 'Queue Bind: ' . $q->declareQueue($e_name, $k_route) . "\n";
//阻塞模式接收消息
echo "Message:\n";
while (True) {
$q->consume(array($this,'processMessage'));
//$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();
}
}
消费者:
class WorkerWareSyncBackUpCommand extends WorkerCommand {
function actionIndex()
{
$this->qInit('SyncWareBackup');
}
function processMessage($envelope, $queue)
{
$msg = json_decode($envelope->getBody());
Utils::doBackUp('back',$msg,'');
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
}
相关推荐
rabbitmq3.8.5 和下面版本都有一切区别,首先就是erlang语言版本的区别,但还好用的是openssl1.0,...一般centos7都是装的openssl1.0版本,该压缩包,给了详细的安装文档,稍微区别于3.7和3.6,已经增加了延迟队列扩展
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...
前言 为什么使用RabbitMq而不是ActiveMq或者RocketMq? 首先,从业务上来讲,我并不要求消息的100%接受率,...php扩展地址: http://pecl.php.net/package/amqp 具体以官网为准 http://www.rabbitmq.com/getstarted.h
主要介绍了使用PHP访问RabbitMQ消息队列的方法,结合实例形式分析了RabbitMQ消息队列的相关扩展安装、队列建立、队列绑定、消息发送、消息接收等相关操作技巧,需要的朋友可以参考下
本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下: 注意事项: 1、accept.php消费者代码需要在命令行执行 2、'username'=>'asdf','password'=>'123456' 改成自己的帐号和...
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...
在RabbitMQ中,一些核心概念包括生产者(发送消息的应用)、消费者(接收消息的应用)、队列(存储消息的缓存)、消息(由生产者通过RabbitMQ发送给消费者的信息)、连接(连接RabbitMQ和应用服务器的TCP连接)以及...
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...
php扩展 Yes Yes(弱 bug) 性能对比图 Item RabbitMQ Beanstalkd QPS(多连SET) 106 930 QPS(单连多SET) 6210 3240 注1:从目前测试结果来看,在建立连接+队列操作+销毁连接这个过程beanstalkd效率还是比较高的。但是...
如果由于服务器宕机等严重问题,那么我们就需要手工进行ACK,保障消费端消费成功,比如我们在消费端调用第三方接口,但这个第三方接口出现了问题导致接口调用失败,这时我们就要使消息重回队列 消费端的重回队列 ...
Yii2 Queue Extension是用于通过队列异步运行任务的扩展。 它支持基于DB,Redis,RabbitMQ,Beanstalk和Gearman的队列。 文档位于docs / guide / README.md。 安装p Yii2 Queue Extension是用于通过队列异步运行任务...
Yii2队列扩展, 支持DB,Redis,RabbitMQ,Beanstalk和Gearman
您应该在要将错误消息记录到rabbitmq的所有项目上使用此扩展名。 ConsumerExtension用于从队列中获取消息,并可以选择使用原则2将其保存到数据库中。它还提供了网格以显示已保存的消息。 您应该将此扩展程序与内部...
Yii2队列扩展通过队列异步运行任务的扩展。 它支持基于DB , Redis , RabbitMQ , AMQP , Beanstalk , ActiveMQ和Gearman的队列。 文档位于 。安装安装此扩展的首选方法是通过 : php composer.phar require --...
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...
PhalApi 2.x 扩展类库 - Redis在PHP开发中运用场景已经无处不在,小到简单缓存大到数据库或消息队列都可以使用Redis来进行实现,基于PhalApi2的出世,PhalApi2-Redis也紧接着进行了本次适配来提供更好的开发体验,...
基于 `Swoole 4.5+` 实现的高性能、高灵活性的 PHP 协程框架,内置协程服务器及大量常用的组件,性能较传统基于 `PHP-FPM` 的框架有质的提升,提供超高性能的同时,也保持着极其灵活的可扩展性,标准组件均基于 [PSR...