RabbitMQ使用


RabbitMQ RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License。主页地址:https://www.rabbitmq.com/

本章内容


    • RabbitMQ 配置
    • RabbitMQ Java dome

RabbitMQ 配置


由于我的开发环境是mac环境,这里会详细介绍mac环境的RabbitMQ配置,linux和window配置简单描述

mac环境安装:

  • 系统已经自带了Erlang,所以只需要安装rabbitmq-server,可以使用工具brew或者macports安装,我是使用了macports
  • 安装命令:prot install rabbitmq-server

linux

  • RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境。

  • 从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,我下载的版本是 otp_src_R14B03.tar.gz 。

  • 然后:

    $ tar xvzf otp_src_R14B03.tar.gz
    $ cd otp_src_R14B03
    $ ./configure

window安装

参考:[http://blog.csdn.net/a__java__a/article/details/17614797](http://blog.csdn.net/a__java__a/article/details/17614797)

管理工具

管理工具安装很简单,执行命令: rabbitmq-plugins enable rabbitmq_management

参考官方文档:http://www.rabbitmq.com/management.html

管理工具截图

java dome实现


下载 rabbitmq客户端包

  • 手动下载 地址:http://www.rabbitmq.com/download.html 里面有各种版本的rabbitmq clinet,根据需要选择,我这里选择了java。

  • maven下载

     <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.5.3</version>
    </dependency>
    

几个概念说明

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

生产者实现


    package com.company;


    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;


    /**
     * Created by zteliuyw on 15/7/6.
     */
    public class Producer {

        //队列名称
        private final static String QUEUE_NAME = "hello";

        //生成数据到mq
        public void sendMsg(String msg) {

            /**
             * 创建连接连接到MabbitMQ
             */

            try {
                ConnectionFactory factory = new ConnectionFactory();
                //设置MabbitMQ所在主机ip或者主机名
                factory.setHost("localhost");
                //创建一个连接
                Connection  connection = factory.newConnection();
                //创建一个频道
                Channel  channel = connection.createChannel();
                //指定一个队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                //往队列中发出一条消息
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println(" [x] Sent '" + msg + "'");
                //关闭频道和连接
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }


        }
    }



消费者实现


    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Created by zteliuyw on 15/7/6.
     */
    public class Consumer extends Thread {

        //队列名称
        private final static String QUEUE_NAME = "hello";

        private void recv()
        {
            //打开连接和创建频道,与发送端一样
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            try {
                connection = factory.newConnection();
                Channel channel = connection.createChannel();
                //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

                //创建队列消费者
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //指定消费队列
                channel.basicConsume(QUEUE_NAME, true, consumer);
                while (true)
                {
                    //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    System.out.println(" [x] Received '" + message + "'");
                }

            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        @Override
        public void run() {
            recv();
        }

        public  void recvMsg(){
            start();
        }
    }




这里使用继承了Thread类实现多线程。

测试Dome ,使用生产者发送数据,消费者接受数据

2个java工程,一个是生产者工程,main声明一个生产者,然后丢一条hello world消息,实现如下:

    public static void main(String[] args) {
        Producer producer = new Producer();
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//设置日期格式
        producer.sendMsg("hello world "+ date);
    }

消费者的java工程,main方法启动一个消费者起一个线程接受消息,实现方法如下:

    public static void main(String[] args) {

        Consumer consumer = new Consumer();
        consumer.recvMsg();

    }

效果如图:左侧是生产者,右侧是消费者 效果如图:

Loading Disqus comments...
Table of Contents