RabbitMQ是AMQP(Advanced Message Queuing Protocol)的开源实现,由以高性能、健壮以及可伸缩出名的Erlang写成的开源消息代理。MQ消息队列,是消费-生产者模型的一个典型代表,一端往消息队列中写入消息,另一端读取或者订阅队列中的消息。

RabbitMQ安装

安装Erlang

安装RabbitMQ

RabbitMQ节点

节点用来指代RabbitMQ服务器实例,节点描述的是一个Erlang节点运行着一个Erlang应用程序。Erlang有虚拟机,而虚拟机的每个实例称之为节点,多个Erlang应用程序可以运行在一个节点上,节点之间可以进行本地通信。

  • 启动节点

    rabbitmq-server

    RabbitMQ启动.jpg

  • 查看状态

    rabbitmqctl status

    RabbitMQ状态.jpg

  • 关闭节点

    rabbitmqctl stop

    RabbitMQ停止.jpg

RabbitMQ可视化

  • 启动可视化插件

    rabbitmq-plugins enable rabbitmq_management

RabbitMQ可视化.jpg

RabbitMQ实践

HelloWorld

RabbitMQ是一个消息代理:接受和转发消息。可以比作一个邮局,将要发布的邮件放在邮箱中,并确定谁会发送邮件给谁,在这里,RabbitMQ是邮箱,邮局和邮递员。与邮局不同的是,RabbitMQ接受、存储和转发的是二进制Blob数据-消息。

  • Producer:消息发送者

  • Queue:消息接收、存储、转发的队列

  • Consumer:消息接受者

    RabbitMQ机制1.jpg

  1. 创建生产者Send类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    namespace Send
    {
    class Program
    {
    //消息发送者:Producer
    public static void Main(string[] args)
    {
    //在本地创建一个消息代理连接,如果要连接到不同机器上的消息代理,可以使用IP地址
    var factory = new ConnectionFactory { HostName = "localhost" };

    using (var connection = factory.CreateConnection())
    {
    //创建一个管道,里面包含所有操作的API
    using (var channel = connection.CreateModel())
    {
    //声明一个消息队列
    channel.QueueDeclare(
    queue: "hello",//队列名称
    durable: false,//是否持久
    exclusive: false,//是否排外
    autoDelete: false,//是否自动删除
    arguments: null//参数
    );
    string message = "Hello World";//定义发送的消息
    var body = Encoding.UTF8.GetBytes(message);//由于发送的消息是二进制,所以需要经过编码

    //发送消息
    channel.BasicPublish(
    exchange: "",
    routingKey: "hello",
    basicProperties: null,
    body: body
    );
    //输出发送的消息
    Console.WriteLine("[P] Sent {0}", message);
    }
    Console.WriteLine("Press [Enter] to exist.");
    Console.ReadLine();
    }
    }
    }
    }

    //QueueDeclare参数介绍:如果存在什么都不做,不存在,才会创建
    //queue:队列的名称
    //durable:是否持久化队列,队列默认是在内存中的,如果RabbitMQ重启会丢失,如果设置为true,则会保存到Erlang自带的数据库中,重启,会进行读取。
    //exclusive:是否排外,有两个作用:1.当连接关闭时,是否会自动删除队列2.是否私有当前队列,
    //如果为true,一般适用一个消费者情况
    //autoDelete:是否自动删除
    //arguments:其他参数
  2. 创建消费者Receive类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    namespace Receive
    {
    class Program
    {
    //消息接受者:Consumer,与发布者不同,消费者会持续运行以监听消息
    static void Main(string[] args)
    {
    //本地建立连接
    var faction = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = faction.CreateConnection())
    {
    //创建通道
    using (var channel = connection.CreateModel())
    {
    //声明一个队列,要与发布者的队列匹配
    //由于发布者可能没有启动,这里声明是确保队列存在
    channel.QueueDeclare(
    queue: "hello",
    exclusive: false,
    autoDelete: false,
    arguments: null
    );
    //由于服务器是异步的发送消息,因此需要提供回调
    //实例化一个消费者
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    //获取消息
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    //输出接收的消息
    Console.WriteLine("[C] Received {0}", message);
    };
    //启动消费者
    channel.BasicConsume(
    queue: "hello",
    autoAck: true,//自动确认
    consumer: consumer
    );
    Console.WriteLine("Press [Enter] to exist.");
    Console.ReadLine();
    }
    }
    }
    }
    }
  3. 运行生产者和消费者

    RabbitMQ-helloWrold.jpg

Work Queues

多个消费者之间分配耗时任务,工作队列(Work Queues)思想:避免立即执行资源密集型任务等待。多个工作进程之间共享任务。

RabbitMQ机制2.jpg

  1. 消息生产者NewTask

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    namespace NewTask
    {
    class Program
    {
    public static void Main(string[] args)
    {
    //通过工厂模式创建连接
    var factory = new ConnectionFactory() { HostName = "localhost" };

    using (var connection = factory.CreateConnection())
    {
    //通过连接创建通道
    using (var channel = connection.CreateModel())
    {
    //通过通道声明队列
    channel.QueueDeclare(
    queue: "task_queue",
    exclusive: false,
    autoDelete: false,
    arguments: null
    );

    //编码发送的消息
    var message1 = "First message.";
    var body1 = Encoding.UTF8.GetBytes(message1);

    var message2 = "Second message..";
    var body2 = Encoding.UTF8.GetBytes(message2);

    var message3 = "Third message...";
    var body3 = Encoding.UTF8.GetBytes(message3);

    //设置持久连接
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发布消息到消息队列
    //发送消息1
    channel.BasicPublish(
    exchange: "",
    routingKey: "task_queue",
    basicProperties: properties,
    body: body1
    );
    Console.WriteLine("[P] Sent {0}", message1);
    //发送消息2
    channel.BasicPublish(
    exchange: "",
    routingKey: "task_queue",
    basicProperties: properties,
    body: body2
    );
    Console.WriteLine("[P] Sent {0}", message2);
    //发送消息3
    channel.BasicPublish(
    exchange: "",
    routingKey: "task_queue",
    basicProperties: properties,
    body: body3
    );
    Console.WriteLine("[P] Sent {0}", message3);
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
  2. 消息消费者Worker

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    namespace Worker
    {
    class Program
    {
    public static void Main(string[] args)
    {
    //通过工厂模式创建连接
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    //通过连接创建通道
    using (var channel = connection.CreateModel())
    {
    //声明队列
    channel.QueueDeclare(
    queue: "task_queue",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
    );

    //配置Qos参数:通知服务器,当前消息未确认前,不要在发新的消息,均衡各个 消费者
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    Console.WriteLine("[*] Waiting for message.");

    //声明消费者
    var consumer = new EventingBasicConsumer(channel);

    //接收消息回调
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine("[C] Received {0}", message);

    int dots = message.Split('.').Length - 1;

    Thread.Sleep(dots * 1000);//模拟耗时

    Console.WriteLine("[C] Done");
    //消息确认
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    };

    //启动消费者获取消息
    channel.BasicConsume(
    queue: "task_queue",
    autoAck: false,//手动确认
    consumer: consumer
    );
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
    }
  3. 运行

    默认,RabbitMQ会按顺序发送每个消息到下一个消费者,平均,每个消费者将获得同样数量的消息,这种分发方法称为循环法。

    如果关闭一个Worker,就会丢失他处理的消息,及分发给它未处理的消息,为了确保消息永不丢失,RabbitMQ支持消息确认(message acknowledgment)。当消费者关闭,RabbMQ将重新发布消息。

    RabbitMQ-WorkQueue.jpg

    Publish/Subscribe

  • Exchange

RabbitMQ中消息传递模型的核心思想:生产者永远不会将消息直接发送给队列。实际上,生产者通常不知道消息是否被传递到任何队列。

生产者向Exchange 发送消息,由Exchange推送给队列。

RabbitMQ机制3.jpg

  • Exchanges

    rabbitmqctl list_exchanges服务器上的echanges列表。

    RabbitMQ-Exchanges.jpg

  1. 消息发送者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    namespace EmitLog
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    //声明Exchange
    channel.ExchangeDeclare(
    exchange: "logs",//交换机名称
    type: "fanout"//交换机类型:发布订阅
    );

    //消息
    var message = "Hello Wrold";
    var body = Encoding.UTF8.GetBytes(message);

    //发送消息到Exchange
    channel.BasicPublish(
    exchange: "logs",
    routingKey: "",
    basicProperties: null,
    body: body
    );

    Console.WriteLine(" [C] Sent {0}", message);
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
  2. 消息接收者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    namespace ReceiveLog
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    //声明Exchange
    channel.ExchangeDeclare(
    exchange: "logs",
    type: "fanout"
    );

    //随机队列名
    var queueName = channel.QueueDeclare().QueueName;
    //将Exchange与队列绑定
    channel.QueueBind(
    queue: queueName,//队列名称
    exchange: "logs",//交换机名称
    routingKey: ""
    );

    Console.WriteLine(" [*] Waiting for logs.");
    //消费者
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine(" [x] {0}", message);
    };
    //启动
    channel.BasicConsume(
    queue: queueName,
    autoAck: true,
    consumer: consumer
    );

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
    }
  3. 运行

    RabbitMQ-P_S.jpg

Routing

Exchange 类型设置为”fanout”,Exchange会将消息发送个每个队列,本次通过设置Exchange类型为”direct”,将消息发送到指定的队列。

RabbitMQ机制4.jpg

  1. 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    namespace EmitLogDirect
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    //声明Exchange
    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

    var serverity = "info";

    var message = "Hello World";
    var body = Encoding.UTF8.GetBytes(message);
    //发送消息到Exchange
    channel.BasicPublish(
    exchange: "direct_logs",
    routingKey: serverity,//与队列绑定的key匹配
    basicProperties: null,
    body: body
    );

    Console.WriteLine(" [x] Sent '{0}':'{1}'", serverity, message);
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
  2. 消息接收者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    namespace ReceiveLogDirect
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

    var queueName = channel.QueueDeclare().QueueName;

    var serverity = "info";
    //var serverity="error";

    //将Exhange与队列绑定
    channel.QueueBind(
    queue: queueName,
    exchange: "direct_logs",
    routingKey: serverity //Exchange会将消息发送到匹配key的队列上
    );

    Console.WriteLine(" [*] Waiting for messages.");

    var consumer = new EventingBasicConsumer(channel);

    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    var routingKey = ea.RoutingKey;

    Console.WriteLine(" [x] Received '{0}':'{1}'",
    routingKey, message);
    };

    channel.BasicConsume(
    queue: queueName,
    autoAck: true,
    consumer: consumer
    );

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
    }
  3. 运行

    只有routingKey匹配队列才会接收Exchange发送的消息。

    RabbitMQ-Routing.jpg

Topics

Exchange类型是”direct”,只能匹配到固定的key的队列,为了更加灵活,类似正则匹配,使用Exchange类型”topic”。多个匹配的key用点号分隔,*(星号)表示匹配任意一个单词;#(井号)表示匹配零个或者多个单词。

RabbitMQ机制5.jpg

  1. 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    namespace EmitLogTopic
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");

    var message = "Hello World";
    var body = Encoding.UTF8.GetBytes(message);

    var routingKey = "chc.com";

    channel.BasicPublish(
    exchange: "topic_logs",
    routingKey: routingKey,
    basicProperties: null,
    body: body
    );

    Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
  2. 消息接收者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    namespace ReceiveLogTopic
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");

    var queueName = channel.QueueDeclare().QueueName;
    var bindingKey = "*.com";
    //var bindingKey="chc.*";

    channel.QueueBind(
    exchange: "topic_logs",
    queue: queueName,
    routingKey: bindingKey
    );

    Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    var routingKey = ea.RoutingKey;
    Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
    };

    channel.BasicConsume(
    queue: queueName,
    autoAck: true,
    consumer: consumer
    );

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
    }
    }
  3. 运行

RabbitMQ-topics.jpg

RPC

RPC:远程过程调用,请求远程服务器上的数据,并等待返回的过程。

RabbitMQ机制6.jpg

工作原理:

  • 客户端启动,创建一个匿名的回调队列
  • RPC请求,客户端发送消息:ReplyTo指示回调队列,CorrelationId指示每个请求的唯一值,与回调队列相匹配
  • 请求发送到指定队列rpc_queue;
  • 服务端接收请求,根据ReplyTo队列的属性将结果发送给客户端
  • 客户端接收回调队列的数据,匹配CorrelationId,输出响应
  1. RPC服务端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    namespace RPCServer
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
    using (var channel = connection.CreateModel())
    {
    channel.QueueDeclare(
    queue: "rpc_queue",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
    );

    //多个服务进程平均分配负载
    channel.BasicQos(0, 1, false);

    var consumer = new EventingBasicConsumer(channel);

    channel.BasicConsume(
    queue: "rpc_queue",
    autoAck: false,
    consumer: consumer
    );
    Console.WriteLine(" [x] Awaiting RPC requests");

    //接收请求消息
    consumer.Received += (model, ea) =>
    {
    string response = null;

    var body = ea.Body;
    var props = ea.BasicProperties;

    var replyProps = channel.CreateBasicProperties();
    replyProps.CorrelationId = props.CorrelationId;
    try
    {
    var message = Encoding.UTF8.GetString(body);
    int n = int.Parse(message);

    Console.WriteLine("[.] fib({0})", message);
    response = fib(n).ToString();
    }
    catch (Exception e)
    {
    Console.WriteLine("[.]" + e.Message);
    response = "";
    }
    finally
    {
    //发送响应消息
    var responseBytes = Encoding.UTF8.GetBytes(response);
    channel.BasicPublish(
    exchange: "",
    routingKey: props.ReplyTo,
    basicProperties: replyProps,
    body: responseBytes
    );
    channel.BasicAck(
    deliveryTag: ea.DeliveryTag,
    multiple: false
    );
    }
    };
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }

    public static int fib(int n)
    {
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
    }
    }
    }
  2. PRC客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    namespace RPCClient
    {
    class Program
    {
    public static void Main(string[] args)
    {
    var rpcClient = new RPCClient();
    Console.WriteLine(" [x] Requesting fib(30)");

    var response = rpcClient.Call("30");
    Console.WriteLine(" [.] Got '{0}'", response);

    rpcClient.Close();

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }


    namespace RPCClient
    {
    public class RPCClient
    {
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    public RPCClient()
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };

    connection = factory.CreateConnection();
    channel = connection.CreateModel();

    replyQueueName = channel.QueueDeclare().QueueName;

    consumer = new EventingBasicConsumer(channel);

    props = channel.CreateBasicProperties();
    var correlationId = Guid.NewGuid().ToString();
    props.CorrelationId = correlationId;//RPC请求与响应关连--确定是哪个客户端发出的请求
    props.ReplyTo = replyQueueName;//回调队列

    //接收响应
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var response = Encoding.UTF8.GetString(body);
    if (ea.BasicProperties.CorrelationId == correlationId)
    {
    respQueue.Add(response);
    }
    };
    }
    public string Call(string message)
    {
    var messageBytes = Encoding.UTF8.GetBytes(message);
    //客户端向服务端发送请求
    channel.BasicPublish(
    exchange: "",
    routingKey: "rpc_queue",
    basicProperties: props,
    body: messageBytes
    );
    //客户端接收服务端的响应
    channel.BasicConsume(
    consumer: consumer,
    queue: replyQueueName,
    autoAck: true
    );

    return respQueue.Take();
    }

    public void Close()
    {
    connection.Close();
    }

    }
    }
  3. 运行

RabbitMQ-RPC.jpg

× 请我吃糖~
打赏二维码