• content
    {:toc}

消息队列。

消息队列

what?

对比

ActiveMQ、RabbitMQ、Redis、ZeroMQ、Jafka/kafka

RabbitMQ

安装

官网

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

public class Send {
public static String QUEUE_NAME = "queue";

public static void main(String[] args) {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
try {
Connection conn = cf.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String mess = "helloworld across application !";
channel.basicPublish("", QUEUE_NAME,null, mess.getBytes());

System.out.println("mess:::"+mess);
channel.close();
conn.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

//----------------------------------------------
public class Recv {
public static void main(String[] args) {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("127.0.0.1");
try {
Connection conn = cf.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
System.out.println("wait for message,exit for Ctrl+c");

QueueingConsumer qc = new QueueingConsumer(channel);
channel.basicConsume(Send.QUEUE_NAME, true,qc);
while(true){
QueueingConsumer.Delivery deli = qc.nextDelivery();
String mess = new String(deli.getBody());
System.out.println("Received:::"+mess);
}

} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
} catch (ConsumerCancelledException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

组件

Broker

中间人,中间件

消息队列服务器的实体

Exchange

交换,交易,交换机

接收消息,转发消息到绑定的队列上,指定消息按照什么规则,路由到那个队列上,

一个exchange可以绑定多个队列,一个队列可以被多个Exchange绑定。

常用Exchange类型:

  • direct:转发消息到routingkey指定的队列,完全根据key进行投递的叫做Direct交换机。
  • topic:按照规则转发消息,对key进行模式匹配后,进行投递的叫做Topic交换机,符号#,匹配一个或多个,*匹配正好一个词。
  • fanout:转发消息到所有绑定的队列,采用广播模式,一个消息进来,投递到该机绑定的所有队列。

Queue

消息队列的载体,用来存储消息。每个消息都会被投入到一个或多个队列,

Binding

绑定

用于把Exchange和Queue按照路由规则绑定。

RoutingKey

路由关键字

Exchange根据这个关键字进行投递。

Producter

消息生产者

产生消息的程序

Consumer

消息消费者

接受消息的程序

Channel

消息通道,

客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

使用流程

  1. 客户端连接到消息队列服务器,打开一个channel
  2. 客户端声明一个Exchange,并设置相关属性
  3. 客户端声明一个queue,并设置相关属性
  4. 客户端使用routing key,在Exchange和Queue之间建立绑定关系
  5. 客户端投递消息到Exchange
  6. Exchange接受到消息后,根据key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

使用模式

单机模式、集群模式、镜像模式。