RabbitMQ使用
RabbitMQ RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License。主页地址:https://www.rabbitmq.com/
本章内容
-
- RabbitMQ 配置
-
- RabbitMQ Java dome
RabbitMQ 配置
由于我的开发环境是mac环境,这里会详细介绍mac环境的RabbitMQ配置,linux和window配置简单描述
mac环境安装:
- 系统已经自带了Erlang,所以只需要安装rabbitmq-server,可以使用工具brew或者macports安装,我是使用了macports
- 安装命令:prot install rabbitmq-server
linux
-
RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境。
-
从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,我下载的版本是 otp_src_R14B03.tar.gz 。
-
然后:
$ tar xvzf otp_src_R14B03.tar.gz
$ cd otp_src_R14B03
$ ./configure
window安装
参考:[http://blog.csdn.net/a__java__a/article/details/17614797](http://blog.csdn.net/a__java__a/article/details/17614797)
管理工具
管理工具安装很简单,执行命令: rabbitmq-plugins enable rabbitmq_management
参考官方文档:http://www.rabbitmq.com/management.html
java dome实现
下载 rabbitmq客户端包
-
手动下载 地址:http://www.rabbitmq.com/download.html 里面有各种版本的rabbitmq clinet,根据需要选择,我这里选择了java。
-
maven下载
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.3</version> </dependency>
几个概念说明
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
生产者实现
package com.company;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by zteliuyw on 15/7/6.
*/
public class Producer {
//队列名称
private final static String QUEUE_NAME = "hello";
//生成数据到mq
public void sendMsg(String msg) {
/**
* 创建连接连接到MabbitMQ
*/
try {
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println(" [x] Sent '" + msg + "'");
//关闭频道和连接
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by zteliuyw on 15/7/6.
*/
public class Consumer extends Thread {
//队列名称
private final static String QUEUE_NAME = "hello";
private void recv()
{
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void run() {
recv();
}
public void recvMsg(){
start();
}
}
这里使用继承了Thread类实现多线程。
测试Dome ,使用生产者发送数据,消费者接受数据
2个java工程,一个是生产者工程,main声明一个生产者,然后丢一条hello world消息,实现如下:
public static void main(String[] args) {
Producer producer = new Producer();
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//设置日期格式
producer.sendMsg("hello world "+ date);
}
消费者的java工程,main方法启动一个消费者起一个线程接受消息,实现方法如下:
public static void main(String[] args) {
Consumer consumer = new Consumer();
consumer.recvMsg();
}
效果如图:左侧是生产者,右侧是消费者