一、生产者代码
1、封装工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.atguigu.rabbitmq.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static final String HOST_ADDRESS = "192.168.200.100"; public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDRESS); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection con = ConnectionUtil.getConnection(); System.out.println(con); con.close(); } }
|
2、编写代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.atguigu.rabbitmq.work; import com.atguigu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }
|
3、发送消息效果

二、消费者代码
1、编写代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.atguigu.rabbitmq.work; import com.atguigu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer1 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。
如果已经运行过生产者程序,则手动把work_queue队列删掉。
2、运行效果
最终两个消费端程序竞争结果如下:

