程序员波特的个人博客

一个小而美的程序员编程资料站

0%

实操3、RabbitMQ的工作队列模式

一、生产者代码

1、封装工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.atguigu.rabbitmq.util;  

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

public static final String HOST_ADDRESS = "192.168.200.100";

public static Connection getConnection() throws Exception {

// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();

// 设置服务地址
factory.setHost(HOST_ADDRESS);

// 端口
factory.setPort(5672);

//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");

// 通过工程获取连接
Connection connection = factory.newConnection();

return connection;
}



public static void main(String[] args) throws Exception {

Connection con = ConnectionUtil.getConnection();

// amqp://guest@192.168.200.100:5672/
System.out.println(con);

con.close();

}

}

2、编写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.atguigu.rabbitmq.work;  

import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

public static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception {

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

for (int i = 1; i <= 10; i++) {

String body = i+"hello rabbitmq~~~";

channel.basicPublish("",QUEUE_NAME,null,body.getBytes());

}

channel.close();

connection.close();

}

}

3、发送消息效果

img

二、消费者代码

1、编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.atguigu.rabbitmq.work;  

import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception {

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("Consumer1 body:"+new String(body));

}

};

channel.basicConsume(QUEUE_NAME,true,consumer);

}

}

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。

如果已经运行过生产者程序,则手动把work_queue队列删掉。

2、运行效果

最终两个消费端程序竞争结果如下:

image-20231103103841644


image-20231103103955165