image-20221121105538734

image-20221121105600068

MQ的基本概念

MQ概述

image-20221121105832289

image-20221121105915985

image-20221121105942854

MQ的优势和劣势

image-20221121110024682

MQ的优势

image-20221121110110470

应用解耦

image-20221121110244343

异步提速

image-20221121110426268

image-20221121110547394

削峰填谷

image-20221121110654988

image-20221121110801838

image-20221121110822179

image-20221121110836607

MQ的劣势

image-20221121110954361

image-20221121111128954

常见的MQ产品

image-20221121111308058

image-20221121111500127

RabbitMQ简介

image-20221121111633880

image-20221121111913537

image-20221121112054942

image-20221121112142075

image-20221121112205134

JMS

image-20221121112336951

image-20221121112403408

RabbitMQ的安装和配置

image-20221121112533222

以下操作均基于centos7.9

1. 安装依赖环境

在线安装依赖环境:

1
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2. 安装Erlang

上传文件

erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm

image-20221121205254932

1
2
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

image-20221121205507059

3. 安装RabbitMQ

1
2
3
4
5
6
# 安装
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安装socat报如下错误

image-20221121205656732

执行

1
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps

继续安装rabbitmq

image-20221121210025465

4. 开启管理界面及配置

1
2
3
4
5
6
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

image-20221121212652079

image-20221121212939036

5. 启动

1
2
3
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

image-20221121213759365

  • 设置配置文件
1
2
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

image-20221121214928717

image-20221121215131342

重启使配置文件生效

1
service rabbitmq-server restart # 重启服务

image-20221121220435374

6. 配置虚拟主机及用户

6.1. 用户角色

RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:

image-20221121213949492

image-20221121214027340

1565098315375

角色说明

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

6.2. Virtual Hosts配置

像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

6.2.1. 创建Virtual Hosts

1565098496482

6.2.2. 设置Virtual Hosts权限

1565098585317

1565098719054

RabbitMQ快速入门

入门程序

image-20221121220821760

image-20221122140515633

装备两个maven模块,并导入依赖

image-20221122140631360

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

image-20221122141021201

生产者

image-20221204193633316

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.itheima.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1.queue:队列名称
2.durable:是否持久化,当mq重启之后,还在
3.exclusive:
*是否独占
*当connection关闭时,是否删除队列
*
4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5.argument:参数。
*/
//如果没有一个名字叫hello——world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
参数:
1.exchange:交换机名称。简单模式下交换机使用默认的""
2.routingKey:路由名称
3.props:配置信息
4.body:发送消息

*/
String body="hello world";
//6.发送消息
channel.basicPublish("","hello_world",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
}

观察执行前队列情况

image-20221204193724061

执行后

image-20221204194438282

image-20221204194510687

image-20221204194554245

取消关闭通道

1
2
//        channel.close();
// connection.close();

image-20221204194848060

image-20221204194909691

image-20221204194935117

image-20221204195047424

消费者

image-20221204202427217

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1.queue:队列名称
2.durable:是否持久化,当mq重启之后,还在
3.exclusive:
*是否独占
*当connection关闭时,是否删除队列
*
4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5.argument:参数。
*/
//如果没有一个名字叫hello——world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);

/*
basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
参数:
1.queue:队列名称
2.autoAck:是否自动确认
3.callback:回调对象
*/

//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hello_world",true,defaultConsumer);
// 不需要关闭资源
}
}

执行

image-20221204202526853

image-20221204202640354

小结

image-20221204202755561

RabbitMQ的工作模式

Work queues工作队列模式

模式说明

image-20221205141937759

代码编写

image-20221205145440203

生产者

image-20221205144418837

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.itheima.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1.queue:队列名称
2.durable:是否持久化,当mq重启之后,还在
3.exclusive:
*是否独占
*当connection关闭时,是否删除队列
*
4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5.argument:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
参数:
1.exchange:交换机名称。简单模式下交换机使用默认的""
2.routingKey:路由名称
3.props:配置信息
4.body:发送消息
*/
for (int i=1;i<=10;i++){
String body=i+"hello world";
//6.发送消息
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}

消费者

image-20221205144622631

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1.queue:队列名称
2.durable:是否持久化,当mq重启之后,还在
3.exclusive:
*是否独占
*当connection关闭时,是否删除队列
*
4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5.argument:参数。
*/
//如果没有一个名字叫hello——world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);

/*
basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
参数:
1.queue:队列名称
2.autoAck:是否自动确认
3.callback:回调对象
*/

//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,defaultConsumer);
// 不需要关闭资源
}
}

开启两个消费者

image-20221205145007003

image-20221205145039210

生产者生产消息

image-20221205145218170

image-20221205145228797

image-20221205145240239

小结

image-20221205145502724

Pub/Sub 订阅模式

模式说明

image-20221205145626622

代码编写

生产者

image-20221205152434977

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.itheima.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1.exchange:交换机名称
2.type:交换机类型
DIRECT("direct"),定向
FANOUT("fanout"),扇形(广播),发送到每一个队列
TOPIC("topic"),通配符的方式
H EADERS("headers");参数匹配
3.durable:是否持久化
4.autoDelete:自动删除
5.internal:内部使用。一般为false
6.arguments:参数
*/
String exchangeName="test_fanout";
//5.创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6.创建队列
String queue1Name="test_fanout_queue1";
String queue2Name="test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1.queue:队列名称
2.exchange:交换机名称
3.routingKey:路由键,绑定规则
如果交换机的类型为fanout,routingKey设置为""
*/

channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
//8.发送消息
String body="日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}

image-20221205152548465

image-20221205152618521

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();

String queue1Name="test_fanout_queue1";
String queue2Name="test_fanout_queue2";


//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印控制台");
}
};
channel.basicConsume(queue1Name,true,defaultConsumer);
// 不需要关闭资源
}
}

启动

image-20221206114307998

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();

String queue1Name="test_fanout_queue1";
String queue2Name="test_fanout_queue2";


//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息存至数据库");
}
};
channel.basicConsume(queue2Name,true,defaultConsumer);
// 不需要关闭资源
}
}

image-20221206114831633

Routing路由模式

模式说明

image-20221206114958949

代码编写

生产者

image-20221206203527592

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.itheima.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1.exchange:交换机名称
2.type:交换机类型
DIRECT("direct"),定向
FANOUT("fanout"),扇形(广播),发送到每一个队列
TOPIC("topic"),通配符的方式
H EADERS("headers");参数匹配
3.durable:是否持久化
4.autoDelete:自动删除
5.internal:内部使用。一般为false
6.arguments:参数
*/
String exchangeName="test_direct";
//5.创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6.创建队列
String queue1Name="test_direct_queue1";
String queue2Name="test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1.queue:队列名称
2.exchange:交换机名称
3.routingKey:路由键,绑定规则
如果交换机的类型为fanout,routingKey设置为""
*/

//队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info,error,warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//8.发送消息
String body="日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"info",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}

执行后观察队列

image-20221206203658772

image-20221206203934543

消费者1 控制台打印info级别信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();

String queue1Name="test_direct_queue1";
String queue2Name="test_direct_queue2";


//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印控制台");
}
};
channel.basicConsume(queue2Name,true,defaultConsumer);
// 不需要关闭资源
}
}

image-20221206205248833

消费者2 数据库保存error级别信息

队列无信息消费

image-20221206205346657

生产者 生产一个error级别信息

image-20221206205540267

观察消费者

image-20221206205601884

image-20221206205624051

Topics通配符模式

模式说明

image-20221208164240030

image-20221208165019397

代码编写

生产者

image-20221208170353450

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.itheima.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1.exchange:交换机名称
2.type:交换机类型
DIRECT("direct"),定向
FANOUT("fanout"),扇形(广播),发送到每一个队列
TOPIC("topic"),通配符的方式
H EADERS("headers");参数匹配
3.durable:是否持久化
4.autoDelete:自动删除
5.internal:内部使用。一般为false
6.arguments:参数
*/
String exchangeName="test_topic";
//5.创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6.创建队列
String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7.绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1.queue:队列名称
2.exchange:交换机名称
3.routingKey:路由键,绑定规则
如果交换机的类型为fanout,routingKey设置为""
*/
//routing key 系统的名称.日志的级别
//需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");

channel.queueBind(queue2Name,exchangeName,"*.*");
//8.发送消息
String body="日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}

执行

image-20221208170515998

queue1

image-20221208170622337

queue2

image-20221208170645324

消费者

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topics1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();

String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";


//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息保存数据库");
}
};
channel.basicConsume(queue1Name,true,defaultConsumer);
// 不需要关闭资源
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topics2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.60.45.18");//ip 默认值localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itheima");//虚拟机名 默认值/
factory.setUsername("heima");//用户名 默认值guest
factory.setPassword("heima");//密码 默认值guest
//3.创建连接connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();

String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";


//接收消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,自动执行该方法
1.consumerTag:标识
2.envelope:获取一些信息,交换机,路由key
3.properties:配置信息
4.body:数据
*/

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印控制台");
}
};
channel.basicConsume(queue2Name,true,defaultConsumer);
// 不需要关闭资源
}
}

分别执行

image-20221208171038109

image-20221208171048134

生产者修改routingkey,并发送消息

image-20221208171156831

观察消费者1无法收到消息,消费者2成功收到消息

image-20221208171302629

image-20221208171311532

生产者再次修改routingkey,并发送消息

image-20221208171354682

image-20221208171418993

image-20221208171443366

小结

image-20221208171650285

工作模式总结

image-20221208171719456

Spring整合RabbitMQ

image-20221209134437892

生产者

image-20221213145954907

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

编写配置文件

image-20221213153936650

rabbitmq.properties

1
2
3
4
5
rabbitmq.host=123.60.45.18
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itheima

spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:/rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->

<!--
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除,最后一个消费者和该队列断开后,自动删除队列
exclusive:是否独占
durable:是否持久化
-->

<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>

<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

测试方法

image-20221213154113505

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.itheima;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

@Autowired
private RabbitTemplate rabbitTemplate;


@Test
public void testHelloWorld(){
//2.发送消息
rabbitTemplate.convertAndSend("spring_queue","hello world spring");
}

/*
发送fanout消息
*/
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout...");
}

/*
发送topic消息
*/
@Test
public void testTopic(){
//发型消息
rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic ...");
}
}

观察消息情况

image-20221213154821609

消费者

编写配置文件

image-20221213170440035

rabbitmq.properties

1
2
3
4
5
rabbitmq.host=123.60.45.18
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itheima

spring-rabbitmq-consumer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 加载配置文件 -->
<context:property-placeholder location="classpath:/rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
<!-- <bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>-->
<!-- <bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>-->
<!-- <bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>-->
<!-- <bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>-->
<!-- <bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>-->
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>-->
<!-- <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>-->
<!-- <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>-->
<!-- <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>-->
<!-- <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
</rabbit:listener-container>
</beans>

编写监听器

image-20221213170610232

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.itheima.rabbitmq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class SpringQueueListener implements MessageListener {

@Override
public void onMessage(Message message) {

System.out.println(new String(message.getBody()));
}
}

编写测试方法启动服务监听

image-20221213170715933

image-20221213170742021

SpringBoot整合RabbitMQ

生产者

image-20221213171855416

创建springboot工程并引入依赖

image-20221213225437359

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itheima</groupId>
<artifactId>producer-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>producer-springboot</name>
<description>producer-springboot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
</pluginRepositories>

</project>

配置文件

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 123.60.45.18
username: guest
password: guest
virtual-host: /
port: 5672

编写配置类

image-20221213232056981

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.itheima.producerspringboot.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Exchanger;

@Configuration
public class RabbitMQConfig {

public static final String EXCHANGE_NAME="boot_topic_exchange";
public static final String QUEUE_NAME="boot_queue";


//交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

//2.Queue队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}

//3.队列和交换就绑定关系 Binding
/*
1.知道哪个队列
2.知道哪个交换机
3.routing key
*/

@Bean
public Binding bindingQueueExchange(@Qualifier("bootQueue")Queue queue,@Qualifier("bootExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}

编写测试方法

image-20221213232916257

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.itheima.producerspringboot;

import com.itheima.producerspringboot.rabbitmq.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@SpringBootTest
@RunWith(SpringRunner.class)
class ProducerSpringbootApplicationTests {

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~~");
}

}

image-20221213233304290

image-20221213233339003

消费者

image-20221214000326670

编写监听类

image-20221214000148768

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.itheima.consumerspringboot;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(new String(message.getBody()));
}
}

启动消费者springboot

image-20221214000248870

小结

image-20221214000349806

image-20221214223708162

image-20221214223727050

RabbitMQ高级特性

消息的可靠投递

image-20221214224010876

confirm模式

生产者

创建一个基于spring的工程

image-20221214230022054

编写配置文件

image-20221214230118161

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<!--消息可靠性投递(生产端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>

编写测试类及测试方法

image-20221214230241121

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.itheima.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

@Autowired
private RabbitTemplate rabbitTemplate;


/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {

//2. 定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");

if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});

//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}

}

执行测试方法

image-20221214230627267

模拟交换机名称错误,并发送消息

1
2
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");

image-20221214230956049

return模式

开启return模式

image-20221214231411695

编写return模式测试方法并开启处理失败消息的模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

/**
* 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/

@Test
public void testReturn() {

//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);

//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");

System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);

//处理
}
});


//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm111", "message confirm....");
}

routingkey错误并执行测试方法

image-20221214232406002

小结

image-20221214232554219

Consumer Ack

image-20221214234847034

创建一个spring工程

image-20221214235256581

定义监听器

image-20221214235414010

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*
*
*/

@Component
public class AckListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1.接收转换消息
System.out.println(new String(message.getBody()));

//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();

//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
//channel.basicReject(deliveryTag,true);
}
}
}

修改spring配置文件,并开启手动签收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>


<context:component-scan base-package="com.itheima.listener" />

<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>

</beans>

编写测试方法并启动

image-20221215000549806

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.itheima.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

@Test
public void test(){
while (true){

}
}
}

生产者发送消息

image-20221215000623857

观察消费者监听情况

image-20221215000645219

模拟出错的情况

image-20221215000952980

消费者监听情况–不断的重发

image-20221215001103990

image-20221215001205631

小结

image-20221215001237460

消息可靠性总结

image-20221215001404566

消费端限流

image-20221221184722213

编写监听器

image-20221221185752225

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
* Consumer 限流机制
* 1. 确保ack机制为手动确认。
* 2. listener-container配置属性
* perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
*/

@Component
public class QosListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {

Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));

//2. 处理业务逻辑

//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

}
}

改写spring配置

spring-rabbitmq-consumer.xml

1
2
3
4
5
6
7
8
9
    <!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>-->
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
<!-- &lt;!&ndash;定义监听器,监听正常队列&ndash;&gt;-->
<!-- &lt;!&ndash;<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>&ndash;&gt;-->
<!-- &lt;!&ndash;延迟队列效果实现: 一定要监听的是 死信队列!!!&ndash;&gt;-->
<!-- <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>-->
</rabbit:listener-container>

实验1:模拟监听器不加prefetch 以及消费端不签收

image-20221221190113607

image-20221221190142660

生产者发送十条数据,并观察消费者,可见一次性获取了所有的十条消息

1
2
3
4
5
6
7
@Test
public void testSend() {
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}

image-20221221190420413

实验2:模拟监听器加上prefetch 以及消费端不签收

image-20221221190621623

生产者重新发送10条数据,可见消费者仅接收到一条消息

image-20221221190729366

实验3:打开消费端签收,可以观察到消费端依次消费每次打印一行。

image-20221221213842724

TTL

image-20221227211250411

设置整个队列的过期时间

编写spring配置

spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>

</rabbit:queue>

<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

编写测试方法

ProducerTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

/**
* TTL:过期时间
* 1. 队列统一过期
*
* 2. 消息单独过期
*
*
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
*
*/
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}

测试运行观察

image-20221227212253072

10s后消失

image-20221227212314583

设置单个消息的过期时间

编写测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
      /**
* TTL:过期时间
* 1. 队列统一过期
*
* 2. 消息单独过期
*
*
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
*
*/
@Test
public void testTtl() {

// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
}
};


//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}

测试运行并观察

image-20221227212550785

5s后消失

image-20221227212606453

测试单个消息非队列顶部设置超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* TTL:过期时间
* 1. 队列统一过期
*
* 2. 消息单独过期
*
*
* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
* 队列过期后,会将队列所有消息全部移除。
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
*
*/
@Test
public void testTtl() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = message -> {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
};

for (int i = 0; i < 10; i++) {
if(i == 5){
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}else{
//不过期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");

}
}
}

观察设置单个超时时间的消息不会消失

image-20221227212936701

死信队列

image-20221227213304807

image-20221227213532687

image-20221227213608862

编写spring配置

spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

<!--
死信队列:
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3. 正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->

<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->

<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />

<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />

<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>


<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->

<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

测试过期时间

编写测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13

/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

}

执行并观察

image-20221227214849761

10s后消息进入死信队列

image-20221227214917514

测试长度限制

编写测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
//2. 测试长度限制后,消息死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
}
}

执行并观察,10条消息进入了普通队列 超过10条限制的消息直接进入了死信队列

image-20221227215247232

10s,普通队列的消息超时也进入了死信队列

image-20221227215325227

测试消息拒收

编写监听器

image-20221227215842256

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;



@Component
public class DlxListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1.接收转换消息
System.out.println(new String(message.getBody()));

//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}

1
2
3
4
5
<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--定义监听器,监听正常队列-->
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

编写生产者测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
    /**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){

// //3. 测试消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

}

执行并观察

image-20221227220206106

消息直接进入了死信队列

image-20221227220224963

延迟队列

image-20221228145947307

image-20221228150319866

代码编写

编写生产者spring配置,定义普通队列和死信队列并绑定

spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为30分钟
-->
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3. 绑定,设置正常队列过期时间为30分钟-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />

</rabbit:queue-arguments>

</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

编写消费者监听器

image-20221228151601690

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class OrderListener implements ChannelAwareMessageListener {






@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1.接收转换消息
System.out.println(new String(message.getBody()));

//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存....");
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}

编写消费者spring配置绑定监听死信队列

spring-rabbitmq-consumer.xml

1
2
3
4
5
    <!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- &lt;!&ndash;延迟队列效果实现: 一定要监听的是 死信队列!!!&ndash;&gt;-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

运行消费者

定义生产者测试方法

1
2
3
4
5
6
7
8
9
10
11
12
    @Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");

// 2.打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}

}

执行测试方法,并观察延迟队列效果

image-20221228152155313

image-20221228152136393

10s后,消息进入死信队列并被消费者消费

image-20221228152244940

image-20221228152304713

image-20221228152358765

日志监控

image-20221228152738323

image-20221228152755022

image-20221228152842875

image-20221228152918217

消息追踪

image-20221228190823227

image-20221228153322289

添加队列

image-20221228191726685

image-20221228191741077

在默认的交换机amq.rabbitmq.trace中绑定创建好的队列

image-20221228191855367

不开启追踪往队列test_trace中发送消息

image-20221228192053698

观察队列中的消息情况

image-20221228192131023

开启追踪功能

1
2
3
[root@hecs-33111 ~]# rabbitmqctl trace_on
Starting tracing for vhost "/" ...
[root@hecs-33111 ~]#

观察队列消息情况,amq.rabbitmq.trace交换机也发送了消息并带有详细日志信息

image-20221228192530912

image-20221228190716899

image-20221228192732118

启用该插件

1
2
3
4
5
6
[root@hecs-33111 ~]# rabbitmq-plugins enable rabbitmq_tracing
The following plugins have been enabled:
rabbitmq_tracing

Applying plugin configuration to rabbit@hecs-33111... started 1 plugin.
[root@hecs-33111 ~]#

观察控制台多了一个选项

image-20221228192929754

定义一个trace

image-20221228193051332

image-20221228193121486

往队列里发送一个消息

image-20221228193205099

image-20221228193217634

RabbitMQ应用问题

image-20221228193435235

消息可靠性保障

image-20221228193508128

image-20221228193539855

消息幂等性保障

image-20221228194239318

image-20221228194314520

RabbitMQ集群搭建

摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理

一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。

集群方案的原理

RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

1565245219265

单机多实例部署

由于某些因素的限制,有时候你不得不在一台机器上去搭建一个rabbitmq集群,这个有点类似zookeeper的单机版。真实生成环境还是要配成多机集群的。有关怎么配置多机集群的可以参考其他的资料,这里主要论述如何在单机中配置多个rabbitmq实例。

主要参考官方文档:https://www.rabbitmq.com/clustering.html

首先确保RabbitMQ运行没有问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[root@super ~]# rabbitmqctl status
Status of node rabbit@super ...
[{pid,10232},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.5"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.5"},
{webmachine,"webmachine","1.10.3"},
{mochiweb,"MochiMedia Web Server","2.13.1"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.5"},
{rabbit,"RabbitMQ","3.6.5"},
{os_mon,"CPO CXC 138 46","2.4"},
{syntax_tools,"Syntax tools","1.7"},
{inets,"INETS CXC 138 49","6.2"},
{amqp_client,"RabbitMQ AMQP Client","3.6.5"},
{rabbit_common,[],"3.6.5"},
{ssl,"Erlang/OTP SSL application","7.3"},
{public_key,"Public key infrastructure","1.1.1"},
{asn1,"The Erlang ASN1 compiler version 4.0.2","4.0.2"},
{ranch,"Socket acceptor pool for TCP protocols.","1.2.1"},
{mnesia,"MNESIA CXC 138 12","4.13.3"},
{compiler,"ERTS CXC 138 10","6.0.3"},
{crypto,"CRYPTO","3.6.3"},
{xmerl,"XML parser","1.3.10"},
{sasl,"SASL CXC 138 11","2.7"},
{stdlib,"ERTS CXC 138 10","2.8"},
{kernel,"ERTS CXC 138 10","4.2"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
[{total,56066752},
{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,2680},
{queue_procs,268248},
{queue_slave_procs,0},
{plugins,1131936},
{other_proc,18144280},
{mnesia,125304},
{mgmt_db,921312},
{msg_index,69440},
{other_ets,1413664},
{binary,755736},
{code,27824046},
{atom,1000601},
{other_system,4409505}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,411294105},
{disk_free_limit,50000000},
{disk_free,13270233088},
{file_descriptors,
[{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]},
{processes,[{limit,1048576},{used,262}]},
{run_queue,0},
{uptime,43651},
{kernel,{net_ticktime,60}}]

停止rabbitmq服务

1
2
3
[root@super sbin]# service rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.

启动第一个节点:

1
2
3
4
5
6
7
8
9
10
[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start

RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit1.log
###### ## /var/log/rabbitmq/rabbit1-sasl.log
##########
Starting broker...
completed with 6 plugins.

启动第二个节点:

web管理插件端口占用,所以还要指定其web插件占用的端口号。

1
2
3
4
5
6
7
8
9
10
11
[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start

RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit2.log
###### ## /var/log/rabbitmq/rabbit2-sasl.log
##########
Starting broker...
completed with 6 plugins.

结束命令:

1
2
rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop

rabbit1操作作为主节点:

1
2
3
4
5
6
7
[root@super ~]# rabbitmqctl -n rabbit1 stop_app  
Stopping node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 reset
Resetting node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 start_app
Starting node rabbit1@super ...
[root@super ~]#

rabbit2操作为从节点:

1
2
3
4
5
6
7
8
9
[root@super ~]# rabbitmqctl -n rabbit2 stop_app
Stopping node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 reset
Resetting node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''内是主机名换成自己的
Clustering node rabbit2@super with rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit2 start_app
Starting node rabbit2@super ...

查看集群状态:

1
2
3
4
5
6
7
[root@super ~]# rabbitmqctl cluster_status -n rabbit1
Cluster status of node rabbit1@super ...
[{nodes,[{disc,[rabbit1@super,rabbit2@super]}]},
{running_nodes,[rabbit2@super,rabbit1@super]},
{cluster_name,<<"rabbit1@super">>},
{partitions,[]},
{alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]

web监控:

1566065096459

集群管理

rabbitmqctl join_cluster {cluster_node} [–ram]
将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。

rabbitmqctl cluster_status
显示集群的状态。

rabbitmqctl change_cluster_node_type {disc|ram}
修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。

rabbitmqctl forget_cluster_node [–offline]
将节点从集群中删除,允许离线执行。

rabbitmqctl update_cluster_nodes {clusternode}

在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。

rabbitmqctl cancel_sync_queue [-p vhost] {queue}
取消队列queue同步镜像的操作。

rabbitmqctl set_cluster_name {name}
设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。

RabbitMQ镜像集群配置

上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:”all”}’

1566072300852

  • Name:策略名称
  • Pattern:匹配的规则,如果是匹配所有的队列,是^.
  • Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。

负载均衡-HAProxy

HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。

安装HAProxy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//下载依赖包
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg
配置HAProxy

配置文件路径:/etc/haproxy/haproxy.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#logging options
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid

defaults
log global

mode tcp

option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s

clitimeout 60s

srvtimeout 15s
#front-end IP for consumers and producters

listen rabbitmq_cluster
bind 0.0.0.0:5672

mode tcp
#balance url_param userid
#balance url_param session_id check_post 64
#balance hdr(User-Agent)
#balance hdr(host)
#balance hdr(Host) use_domain_only
#balance rdp-cookie
#balance leastconn
#balance source //ip

balance roundrobin

server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2

listen stats
bind 172.16.98.133:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s

启动HAproxy负载

1
2
3
4
5
6
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态
ps -ef | grep haproxy

访问如下地址对mq节点进行监控
http://172.16.98.133:8100/rabbitmq-stats

代码中访问mq集群地址,则变为访问haproxy地址:5672