本篇文章给大家分享的是有关 RpcClient发送消息和同步接收消息原理是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
成都创新互联网站建设公司一直秉承“诚信做人,踏实做事”的原则,不欺瞒客户,是我们最起码的底线! 以服务为基础,以质量求生存,以技术求发展,成交一个客户多一个朋友!专注中小微企业官网定制,做网站、网站设计,塑造企业网络形象打造互联网企业效应。
本身使用RpcClient发送消息与同步接收消息的代码是很简单的,如下:
RpcClient client = new RpcClient(channel, exchange, routingKey);
String msg = "hello world!";
byte[] result = client.primitiveCall(msg.getBytes());
这里的primitiveCall调用后,当前线程会进行同步等待,等待消息接收端给自己的回复消息
一个完整的发送消息与接收回复消息的图例:

整个流程详解:
RpcClient client = new RpcClient(channel, exchange, routingKey);
创建RpcClient时会做两件事:
A:创建一个回复queue,接收当前RpcClient发送的消息的消息接收人会将回复消息发到这个replyQueue上供当前RpcClient去接收回复消息
_replyQueue = setupReplyQueue();
protected String setupReplyQueue() throws IOException {
return _channel.queueDeclare("", false, false, true, true, null).getQueue();
//这里实际上是由rabbitmq server去定义一个唯一的queue(因为queueName是空的,所以是由server去生成queueName),最后返回这个queueName,queueName是由server生成的,使用的是以下这个方法:
Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,
Map arguments)
}
B:创建一个接收回复消息的consumer
_consumer = setupConsumer();
protected DefaultConsumer setupConsumer() throws IOException {
//创建一个接收消息的DefaultConsumer实例
DefaultConsumer consumer = new DefaultConsumer(_channel) {
@Override //发生shutdown的时候回调
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException signal) {
synchronized (_continuationMap) {
for (Entry> entry : _continuationMap.entrySet()) {
entry.getValue().set(signal);
}
_consumer = null;
}
}
@Override //处理消息交付
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//这部分就是和下面的代码一起协作来实现将异步接收强制变成同步接收
synchronized (_continuationMap) {
String replyId = properties.getCorrelationId();
BlockingCell