文档中心 V3.0
Rabbitmq

MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ。

MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

使用示例

接下来,我们将通过本教程介绍如何使用RabbitMQ服务。步骤包括:
1.在您的应用管理界面中创建绑定RabbitMQ服务。
2.代码示例。
在应用列表页面,点击某个应用,进入应用详情页。点击“服务详情”可以查看并添加当前支持的所有服务。创建RabbitMQ步骤如下:
1、选择RabbitMQ,输入服务名称,点击创建按钮

rabbit-1.png

2、创建成功后,将在您的服务列表中显示出数据库RabbitMQ。

rabbit-2.png

3、点击绑定按钮,使服务RabbitMQ与当前应用绑定。

rabbit-3.png

4、点击管理按钮进入数据库详情页,显示了数据库的各项详细信息。

rabbit-4.png

代码示例

Java: 写一个简单的生产者与消费者代码 (1)定义全局配置常量

package com.hascode.tutorial.config;
public class Configuration {
public static final String USERNAME = "guest";
public static final String PASSWORD = "guest";
public static final String HOSTNAME = "localhost";
public static final int PORT = 5672;
}

(2)开始写生产者。将消息产生出来抛到消息队列里面

package com.hascode.tutorial.client;
import java.io.IOException;
import com.hascode.tutorial.config.Configuration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SampleProducer implements Runnable {
private final String message;
private static final String EXCHANGE_NAME = "test";
private static final String ROUTING_KEY = "test";
public SampleProducer(final String message) {
// TODO Auto-generated constructor stub
this.message = message;
}
@Override
public void run() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(Configuration.USERNAME);
factory.setPassword(Configuration.PASSWORD);
factory.setHost(Configuration.HOSTNAME);
factory.setPort(Configuration.PORT);
Connection conn;
try{
conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);//EXCHANGE 定义交换机
String queueName = channel.queueDeclare().getQueue();//message-queue得到消息队列
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//route-bind 定义类似路由器的东西路由交换机VS队列
System.out.println("生产者:" + message + " in thread:" + Thread.currentThread().getName());
//publish / sub 生产者的作用就是将消息推送到消息队列里面去 实现类似于publish的功能
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());//将消息丢到队列里面。只是内容。这个消息的格式是否可以自行定义?比如定义一个JSON包或XML的包?比如一条发布的内容加一个代号?后台消费者会依据这个代号调用相应的号码进行处理掉?
channel.close();
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

(3)有了生产者之后写消费者

package com.hascode.tutorial.client;
import java.io.IOException;
import com.hascode.tutorial.config.Configuration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
//开始消费
public class SampleConsumer implements Runnable {
private static final String EXCHANGE_NAME = "test";
private static final String ROUTING_KEY = "test";
private static final boolean NO_ACK = false;
@Override
public void run() {
// TODO Auto-generated method stub
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(Configuration.USERNAME);
factory.setPassword(Configuration.PASSWORD);
factory.setHost(Configuration.HOSTNAME);
factory.setPort(Configuration.PORT);
Connection conn;
try{
conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);//EXCHANGE
String queueName = channel.queueDeclare().getQueue();//message-queue
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, NO_ACK, consumer);
while (true) {
//开始消费。可以理解为一个快递员将信件从队列里面取出来给消费者用
QueueingConsumer.Delivery delivery;
try {
delivery = consumer.nextDelivery();
System.out.println("消费者: " + new String(delivery.getBody()) + " in thread: " + Thread.currentThread().getName());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException ie) {
continue;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

(4)调用主方法

package com.hascode.tutorial.client;
public class Main {
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
final String messageString = "This is message numero " + i;
SampleProducer producer = new SampleProducer(messageString);//不断地生产新的数据元素放到队列里面去
new Thread(producer).start();
}
Thread consumerThread = new Thread(new SampleConsumer());
consumerThread.start();
}
}

PHP: 写一个简单的生产者与消费者代码 (1)生产者:

<?php
$params = array('host' =>'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/');
$cnn = new AMQPConnect($params);
// declare Exchange
$exchange = new AMQPExchange($cnn);
$exchange->declare('ex1', 'topic', AMQP_DURABLE );
// declare Queue
$queue = new AMQPQueue($cnn);
$queue->declare('queue1', AMQP_DURABLE);
// bind Queue
$queue->bind('ex1','wei.#');
// publishing
$msg = "msg";
for ($i=0; $i < 100; $i++) {
$res = $exchange->publish($i . 'msg', 'wei.' . $i);
if ($res) {
echo $i . 'msg' . " Yes\n";
} else {
echo $i . 'msg' . " No\n";
}
}
?>

(2)消费者:

<?php
$params = array('host' =>'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/');
$cnn = new AMQPConnect($params);
// create the Queue
$queue = new AMQPQueue($cnn, 'queue1');
$queueMessages = $queue->consume(100);
foreach($queueMessages as $item) {
echo "$i.$item\n";
}
?>