程序员波特的个人博客

一个小而美的程序员编程资料站

0%

实操6、RabbitMQ的主题模式

一、生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.atguigu.rabbitmq.topic;  

import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Demo publisher for the RabbitMQ topic-exchange pattern.
 *
 * <p>Declares the durable topic exchange {@code test_topic} and two durable queues,
 * binds the queues with wildcard routing patterns, and publishes three sample log
 * messages whose routing keys follow the {@code <system>.<level>} convention.
 *
 * <p>Resulting routing for the three keys published here:
 * <ul>
 *   <li>{@code test_topic_queue1} is bound with {@code #.error} and {@code order.*},
 *       so it receives {@code order.info} and {@code goods.error};</li>
 *   <li>{@code test_topic_queue2} is bound with {@code *.*}, so it receives all three.</li>
 * </ul>
 */
public class Producer {

    private static final String EXCHANGE_NAME = "test_topic";
    private static final String QUEUE1_NAME = "test_topic_queue1";
    private static final String QUEUE2_NAME = "test_topic_queue2";

    /**
     * Sets up the exchange, queues and bindings, then publishes the sample messages.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or any channel operation fails
     */
    public static void main(String[] args) throws Exception {

        // try-with-resources closes the channel first and then the connection,
        // even when a declare/bind/publish call throws.
        try (Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel()) {

            // Arguments: name, type, durable, autoDelete, internal, extra arguments.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, false, null);

            // Arguments: name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(QUEUE1_NAME, true, false, false, null);
            channel.queueDeclare(QUEUE2_NAME, true, false, false, null);

            // Bind queues to the exchange.
            //   arg 1. queue:      queue name
            //   arg 2. exchange:   exchange name
            //   arg 3. routingKey: binding pattern
            //          (for a fanout exchange the routing key would simply be "")
            // Routing-key convention: <system name>.<log level>.
            // Requirement: every error-level log and every log from the order
            // system must be stored in the database (queue 1).
            channel.queueBind(QUEUE1_NAME, EXCHANGE_NAME, "#.error");
            channel.queueBind(QUEUE1_NAME, EXCHANGE_NAME, "order.*");
            // Queue 2 takes every two-word routing key.
            channel.queueBind(QUEUE2_NAME, EXCHANGE_NAME, "*.*");

            // Publish to the exchange with routing keys order.info, goods.info and
            // goods.error; the exchange decides which queues receive each message.
            publish(channel, "order.info", "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]");
            publish(channel, "goods.info", "[所在系统:goods][日志级别:info][日志内容:商品发布成功]");
            publish(channel, "goods.error", "[所在系统:goods][日志级别:error][日志内容:商品发布失败]");
        }

    }

    /**
     * Publishes one text message to {@link #EXCHANGE_NAME} with default properties.
     *
     * @param channel    open channel to publish on
     * @param routingKey routing key the exchange matches against the queue bindings
     * @param body       message text; encoded as UTF-8 so the bytes do not depend on
     *                   the platform default charset of the publishing JVM
     * @throws java.io.IOException if the publish call fails
     */
    private static void publish(Channel channel, String routingKey, String body) throws java.io.IOException {
        channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

}

二、消费者代码

1、消费者1号

消费者1监听队列1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.atguigu.rabbitmq.topic;  

import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * Demo consumer that listens on {@code test_topic_queue1} and prints every
 * message body it receives to standard output.
 */
public class Consumer1 {

    private static final String QUEUE_NAME = "test_topic_queue1";

    /**
     * Declares the queue and registers an auto-acknowledging consumer on it.
     *
     * <p>The connection and channel are not closed here; presumably this is
     * intentional so the process keeps receiving deliveries until it is killed.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or any channel operation fails
     */
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // Arguments: name, durable, exclusive, autoDelete, extra arguments.
        // Declared here as well so the consumer can start before the producer.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset;
                // assumes the publisher encoded the payload as UTF-8.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("body:" + text);
            }

        };

        // Second argument true = autoAck: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }

}

2、消费者2号

消费者2监听队列2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.atguigu.rabbitmq.topic;  

import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * Demo consumer that listens on {@code test_topic_queue2} and prints every
 * message body it receives to standard output.
 */
public class Consumer2 {

    private static final String QUEUE_NAME = "test_topic_queue2";

    /**
     * Declares the queue and registers an auto-acknowledging consumer on it.
     *
     * <p>The connection and channel are not closed here; presumably this is
     * intentional so the process keeps receiving deliveries until it is killed.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or any channel operation fails
     */
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // Arguments: name, durable, exclusive, autoDelete, extra arguments.
        // Declared here as well so the consumer can start before the producer.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset;
                // assumes the publisher encoded the payload as UTF-8.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("body:" + text);
            }

        };

        // Second argument true = autoAck: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }

}

三、运行效果

队列1(绑定规则为 #.error 和 order.*,因此收到路由键为 order.info 和 goods.error 的两条消息):

img

队列2(绑定规则为 *.*,因此收到 order.info、goods.info、goods.error 全部三条消息):

images