RabbitMQ是AMQP(Advanced Message Queuing Protocol)的开源实现,由以高性能、健壮以及可伸缩出名的Erlang写成的开源消息代理。MQ消息队列,是消费-生产者模型的一个典型代表,一端往消息队列中写入消息,另一端读取或者订阅队列中的消息。
RabbitMQ安装
安装Erlang
安装RabbitMQ
RabbitMQ节点
节点用来指代RabbitMQ服务器实例,节点描述的是一个Erlang节点运行着一个Erlang应用程序。Erlang有虚拟机,而虚拟机的每个实例称之为节点,多个Erlang应用程序可以运行在一个节点上,节点之间可以进行本地通信。
启动节点
rabbitmq-server
查看状态
rabbitmqctl status
关闭节点
rabbitmqctl stop
RabbitMQ可视化
启动可视化插件
rabbitmq-plugins enable rabbitmq_management
登陆网页查看
账号:guest;密码:guest
RabbitMQ实践
HelloWorld
RabbitMQ是一个消息代理:接受和转发消息。可以比作一个邮局,将要发布的邮件放在邮箱中,并确定谁会发送邮件给谁,在这里,RabbitMQ是邮箱,邮局和邮递员。与邮局不同的是,RabbitMQ接受、存储和转发的是二进制Blob数据-消息。
Producer:消息发送者
Queue:消息接收、存储、转发的队列
Consumer:消息接受者
创建生产者Send类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50namespace Send
{
class Program
{
//消息发送者:Producer
public static void Main(string[] args)
{
//在本地创建一个消息代理连接,如果要连接到不同机器上的消息代理,可以使用IP地址
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
//创建一个管道,里面包含所有操作的API
using (var channel = connection.CreateModel())
{
//声明一个消息队列
channel.QueueDeclare(
queue: "hello",//队列名称
durable: false,//是否持久
exclusive: false,//是否排外
autoDelete: false,//是否自动删除
arguments: null//参数
);
string message = "Hello World";//定义发送的消息
var body = Encoding.UTF8.GetBytes(message);//由于发送的消息是二进制,所以需要经过编码
//发送消息
channel.BasicPublish(
exchange: "",
routingKey: "hello",
basicProperties: null,
body: body
);
//输出发送的消息
Console.WriteLine("[P] Sent {0}", message);
}
Console.WriteLine("Press [Enter] to exist.");
Console.ReadLine();
}
}
}
}
//QueueDeclare参数介绍:如果存在什么都不做,不存在,才会创建
//queue:队列的名称
//durable:是否持久化队列,队列默认是在内存中的,如果RabbitMQ重启会丢失,如果设置为true,则会保存到Erlang自带的数据库中,重启,会进行读取。
//exclusive:是否排外,有两个作用:1.当连接关闭时,是否会自动删除队列2.是否私有当前队列,
//如果为true,一般适用一个消费者情况
//autoDelete:是否自动删除
//arguments:其他参数创建消费者Receive类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46namespace Receive
{
class Program
{
//消息接受者:Consumer,与发布者不同,消费者会持续运行以监听消息
static void Main(string[] args)
{
//本地建立连接
var faction = new ConnectionFactory() { HostName = "localhost" };
using (var connection = faction.CreateConnection())
{
//创建通道
using (var channel = connection.CreateModel())
{
//声明一个队列,要与发布者的队列匹配
//由于发布者可能没有启动,这里声明是确保队列存在
channel.QueueDeclare(
queue: "hello",
exclusive: false,
autoDelete: false,
arguments: null
);
//由于服务器是异步的发送消息,因此需要提供回调
//实例化一个消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
//获取消息
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
//输出接收的消息
Console.WriteLine("[C] Received {0}", message);
};
//启动消费者
channel.BasicConsume(
queue: "hello",
autoAck: true,//自动确认
consumer: consumer
);
Console.WriteLine("Press [Enter] to exist.");
Console.ReadLine();
}
}
}
}
}运行生产者和消费者
Work Queues
多个消费者之间分配耗时任务,工作队列(Work Queues)思想:避免立即执行资源密集型任务等待。多个工作进程之间共享任务。
消息生产者NewTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68namespace NewTask
{
class Program
{
public static void Main(string[] args)
{
//通过工厂模式创建连接
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
//通过连接创建通道
using (var channel = connection.CreateModel())
{
//通过通道声明队列
channel.QueueDeclare(
queue: "task_queue",
exclusive: false,
autoDelete: false,
arguments: null
);
//编码发送的消息
var message1 = "First message.";
var body1 = Encoding.UTF8.GetBytes(message1);
var message2 = "Second message..";
var body2 = Encoding.UTF8.GetBytes(message2);
var message3 = "Third message...";
var body3 = Encoding.UTF8.GetBytes(message3);
//设置持久连接
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//发布消息到消息队列
//发送消息1
channel.BasicPublish(
exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body1
);
Console.WriteLine("[P] Sent {0}", message1);
//发送消息2
channel.BasicPublish(
exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body2
);
Console.WriteLine("[P] Sent {0}", message2);
//发送消息3
channel.BasicPublish(
exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body3
);
Console.WriteLine("[P] Sent {0}", message3);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}消息消费者Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59namespace Worker
{
class Program
{
public static void Main(string[] args)
{
//通过工厂模式创建连接
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
//通过连接创建通道
using (var channel = connection.CreateModel())
{
//声明队列
channel.QueueDeclare(
queue: "task_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
//配置Qos参数:通知服务器,当前消息未确认前,不要在发新的消息,均衡各个 消费者
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine("[*] Waiting for message.");
//声明消费者
var consumer = new EventingBasicConsumer(channel);
//接收消息回调
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[C] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);//模拟耗时
Console.WriteLine("[C] Done");
//消息确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//启动消费者获取消息
channel.BasicConsume(
queue: "task_queue",
autoAck: false,//手动确认
consumer: consumer
);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}运行
默认,RabbitMQ会按顺序发送每个消息到下一个消费者,平均,每个消费者将获得同样数量的消息,这种分发方法称为循环法。
如果关闭一个Worker,就会丢失他处理的消息,及分发给它未处理的消息,为了确保消息永不丢失,RabbitMQ支持消息确认(message acknowledgment)。当消费者关闭,RabbMQ将重新发布消息。
Publish/Subscribe
- Exchange
RabbitMQ中消息传递模型的核心思想:生产者永远不会将消息直接发送给队列。实际上,生产者通常不知道消息是否被传递到任何队列。
生产者向Exchange 发送消息,由Exchange推送给队列。
Exchanges
rabbitmqctl list_exchanges
服务器上的echanges列表。
消息发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37namespace EmitLog
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//声明Exchange
channel.ExchangeDeclare(
exchange: "logs",//交换机名称
type: "fanout"//交换机类型:发布订阅
);
//消息
var message = "Hello Wrold";
var body = Encoding.UTF8.GetBytes(message);
//发送消息到Exchange
channel.BasicPublish(
exchange: "logs",
routingKey: "",
basicProperties: null,
body: body
);
Console.WriteLine(" [C] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}消息接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50namespace ReceiveLog
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//声明Exchange
channel.ExchangeDeclare(
exchange: "logs",
type: "fanout"
);
//随机队列名
var queueName = channel.QueueDeclare().QueueName;
//将Exchange与队列绑定
channel.QueueBind(
queue: queueName,//队列名称
exchange: "logs",//交换机名称
routingKey: ""
);
Console.WriteLine(" [*] Waiting for logs.");
//消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
//启动
channel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer
);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}运行
Routing
Exchange 类型设置为”fanout”,Exchange会将消息发送个每个队列,本次通过设置Exchange类型为”direct”,将消息发送到指定的队列。
消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34namespace EmitLogDirect
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//声明Exchange
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
var serverity = "info";
var message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
//发送消息到Exchange
channel.BasicPublish(
exchange: "direct_logs",
routingKey: serverity,//与队列绑定的key匹配
basicProperties: null,
body: body
);
Console.WriteLine(" [x] Sent '{0}':'{1}'", serverity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}消息接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53namespace ReceiveLogDirect
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
var queueName = channel.QueueDeclare().QueueName;
var serverity = "info";
//var serverity="error";
//将Exhange与队列绑定
channel.QueueBind(
queue: queueName,
exchange: "direct_logs",
routingKey: serverity //Exchange会将消息发送到匹配key的队列上
);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};
channel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer
);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}运行
只有routingKey匹配队列才会接收Exchange发送的消息。
Topics
Exchange类型是”direct”,只能匹配到固定的key的队列,为了更加灵活,类似正则匹配,使用Exchange类型”topic”。多个匹配的key用点号分隔,*(星号)表示匹配任意一个单词;#(井号)表示匹配零个或者多个单词。
消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33namespace EmitLogTopic
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
var routingKey = "chc.com";
channel.BasicPublish(
exchange: "topic_logs",
routingKey: routingKey,
basicProperties: null,
body: body
);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}消息接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48namespace ReceiveLogTopic
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = channel.QueueDeclare().QueueName;
var bindingKey = "*.com";
//var bindingKey="chc.*";
channel.QueueBind(
exchange: "topic_logs",
queue: queueName,
routingKey: bindingKey
);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer
);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}运行
RPC
RPC:远程过程调用,请求远程服务器上的数据,并等待返回的过程。
工作原理:
- 客户端启动,创建一个匿名的回调队列
- RPC请求,客户端发送消息:ReplyTo指示回调队列,CorrelationId指示每个请求的唯一值,与回调队列相匹配
- 请求发送到指定队列rpc_queue;
- 服务端接收请求,根据ReplyTo队列的属性将结果发送给客户端
- 客户端接收回调队列的数据,匹配CorrelationId,输出响应
RPC服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83namespace RPCServer
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "rpc_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
//多个服务进程平均分配负载
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(
queue: "rpc_queue",
autoAck: false,
consumer: consumer
);
Console.WriteLine(" [x] Awaiting RPC requests");
//接收请求消息
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine("[.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine("[.]" + e.Message);
response = "";
}
finally
{
//发送响应消息
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(
exchange: "",
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes
);
channel.BasicAck(
deliveryTag: ea.DeliveryTag,
multiple: false
);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
public static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
}
}PRC客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86namespace RPCClient
{
class Program
{
public static void Main(string[] args)
{
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
namespace RPCClient
{
public class RPCClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RPCClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;//RPC请求与响应关连--确定是哪个客户端发出的请求
props.ReplyTo = replyQueueName;//回调队列
//接收响应
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
//客户端向服务端发送请求
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes
);
//客户端接收服务端的响应
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true
);
return respQueue.Take();
}
public void Close()
{
connection.Close();
}
}
}运行
最后更新: 2019年02月27日 23:33
原始链接: https://www.github.com/ChangHub/BlogImages/raw/master/2018/09/20/RabbitMQ/