笔者不才,根据小弟的经验觉得使用rabbitMQ进行RPC调研不太妥当,需要他能够实现跨语言,但是对于整体来说使用消息队列服务进行RPC调用,通过RabbitMQ的事务来确定消息已经成功处理完毕,然后通过消息队列服务的reply队列返回处理结果。总觉得差点什么,或者你跟我一样发现了一些问题。第一如何处理分布式事务,这个的确有点费解,这个后面在spring和JPA的时候再去说吧。第二个问题也是我还没有弄懂的一个问题,就是如何做到多线程并发处理。
为什么我会提出这个问题,因为根据rabbitMQ的消息队列处理机制当中,消息队列在消息没有ack之前他是不会继续分发消息的,所以同一时间内你只能处理一条消息,也代表你处理的消息是线性的。再看WEB程序,大家都清楚在Servlet 被调用的时候会新开一条线程,然后在独立的线程去调用serivce 或 dao 等等的业务,也证明了WEB服务是多线程去处理的,而非线性的,总不能一个人在调用这个链接之后,下一个用户也调用这个链接就卡住那里等着吧?所以对于消息队列这种线性的处理方式,我们应该怎么去做到并发呢?方法有能多种,设置成no_ack 然后每次接收到消息之后开启性的线程去处理。第二种方法也是比较简单的开多几个channel,这样就形成多个消费者了。不过以上两个想法,只是我想象而已,也没有实践过,不过今天在写这个东西的时候,我打算实践一下。而且我会在这个笔记当中记录RabbitMQ 的RPC 调用方式。
本人参考的文档为官网关于RPC的介绍:
http://www.rabbitmq.com/tutorials/tutorial-six-java.html
CentOS 5.6 安装RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ客户端C++安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python尝试RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm
在CentOS上安装RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入门教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里
本次我开发所使用的语言为JAVA,在动手写代码之前先说明几个重点问题。
在RabbitMQ 实现RPC的原理
在rabbitMQ上实现RPC,与之前的订阅消息和生产消息非常相似,唯一不一样的是RPC需要消息处理的放回结果,当然有时你不需要结果,你只在乎是否执行成功而已,但是这里所涉及的问题是我们怎样能做到当消息发送到队列之后,开始等待直到消息处理完成后返回处理结果后再进行执行其他处理。
引用一下官方的图,真心画的不错。
在到这个图基本上都清晰了。
1. client发起消息,然后带上一个唯一的correlation_id,同时在reply_to中提供一个独立的队列。官方一般建议使用默认独立。后面会有代码~
2. 携带好correlation_id 和 reply_to 之后将消息发布。
3. 然后server 获得消息,并处理消息。
4. 得出的处理结果通过 reply_to 中的队列 返回消息 并携带上 消息中携带的的correlation_id。
5. client 监听 reply_to 队列,当获得消息后,判断消息中correlation_id 是否与请求时发出的一致,如果一致就证明该消息是这个业务的处理结果。如果不一致就证明消息不是你的啦~ 别碰人家的东西!
贴出官方的summary:
- When the Client starts up, it creates an anonymous exclusive callback queue.【当Client运行时,创建一个匿名的 独立的 回调队列】
- For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request. 【client 发送的消息 会携带两个参数是为了 RPC调用的,replyTo 是设置回调的队列,correlation 是 唯一的数值,不同的请求就不同的correlationID】
- The request is sent to an rpc_queue queue. [发送消息到队列当中]
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.【worker 即是服务器 等待队列中的消息,当请求出现时,开始干活同时通过replyTo字段中的队列发送处理结果】
- The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.【client 等待数据返回的队列,当有消息出现时,检查corrleationID 参数,如果和之前发出的correlationid吻合,就将结果返回到应用程序】
实现代码如下
Client 如下:
package com.maxfunner.rpc;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.UUID;
/** * Created by Tony on 2016/11/3. */
public class Client {
public String sayHelloToServer(String username) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交换器名称
String queueName = "rpc_queue"; //队列名称
String routingKey = "rpc_key"; //路由键
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd"); //创建链接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定义交换器
channel.queueDeclare(queueName, false, false, false, null); //定义队列
channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列
String callbackQueue = channel.queueDeclare().getQueue(); //获得匿名的 独立的 默认队列
String correlationId = UUID.randomUUID().toString(); //产生一个 关联ID correlationID
QueueingConsumer consumer = new QueueingConsumer(channel); // 创建一个消费者对象
channel.basicConsume(callbackQueue,true,consumer); //消费消息
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() //创建消息属性
.correlationId(correlationId) //携带唯一的 correlationID
.replyTo(callbackQueue).build(); //携带callback 回调的队列路由键
channel.basicPublish(exchangeName,routingKey,basicProperties, SerializationUtils.serialize(username)); //发布消息
String response = null;
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //循环获得消息
if(correlationId.equals(delivery.getProperties().getCorrelationId())){ //匹配correlationID是否与发出去的correlation的ID一直
response = (String) SerializationUtils.deserialize(delivery.getBody()); //获得处理结果
break;
}
}
channel.close();
connection.close();
//关闭链接
return response;
}
public static void main(String[] args) throws IOException, InterruptedException {
Client client = new Client();
String response = client.sayHelloToServer("TONY");
System.out.println("server response : " + response);
}
}
Server 代码如下:
package com.maxfunner.rpc;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
/** * Created by Tony on 2016/11/3. */
public class Server {
public static void main(String[] args) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交换器名称
String queueName = "rpc_queue"; //队列名称
String routingKey = "rpc_key"; //路由键
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd");
Connection connection = factory.newConnection(); //创建链接
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定义交换器
channel.queueDeclare(queueName, false, false, false, null); //定义队列
channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列
QueueingConsumer consumer = new QueueingConsumer(channel); //创建一个消费者
channel.basicConsume(queueName,true,consumer); //消费消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //获得一条消息
String correlationID = delivery.getProperties().getCorrelationId(); //获得额外携带的correlationID
String replyTo = delivery.getProperties().getReplyTo(); //获得回调的队列路由键
String body = (String) SerializationUtils.deserialize(delivery.getBody()); //获得请求的内容
String responseMsg = "welcome " + body; //处理返回内容
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationID) //返回消息时携带 请求时传过来的correlationID
.build();
channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回处理结果
}
}
}
运行结果:
server response : welcome TONY
Process finished with exit code 0
最后看一个问题:如果这样去进行RPC调用,由于消息队列是线性去处理的,所以关键的问题是在几个点上【 auto_ack | channel&&exclusive 】:
- auto_ack :上面的程序当中我将auto_ack设置成true,每次消息到达后我都会去确认消息已经收到,当确认后队列就会删除该消息,如果在等待消费返回的时候有其他channel发出的请求处理结果,然而这个correlationID和目前channel发出的correlation并不一致,但是这消息被你给确认了,所以正在需要获得返回结果的channel就与处理结果无缘了。
- channel && exclusive:虽然exclusive 是独立的只供一个应用程序使用,但是多个channel之间是可以使用同一个独立队列的。所以如果多个线程去使用这个队列去消费消息的话,rabbitMQ会循环派发的。所以你要在不同的线程中获得相应请求的处理的结果,那就没有那么简单了。
事实上我是错的。看看下面贴出的代码大家都懂了。auto_ack 可以设置成true 不过重点还是在 channel 那个默认独立的 匿名队列,原来每次channel调用都是不一样的routingkey 吓死宝宝了~
看代码:
public class Client {
private static Connection connection = null;
private static ConnectionFactory factory = null;
public Connection getConnection() throws IOException {
if (connection == null) {
factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd"); //创建链接
connection = factory.newConnection();
}
return connection;
}
public String sayHelloToServer(String username) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交换器名称
String queueName = "rpc_queue"; //队列名称
String routingKey = "rpc_key"; //路由键
Channel channel = getConnection().createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定义交换器
channel.queueDeclare(queueName, false, false, false, null); //定义队列
channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列
String callbackQueue = channel.queueDeclare().getQueue(); //获得匿名的 独立的 默认队列
String correlationId = UUID.randomUUID().toString(); //产生一个 关联ID correlationID
QueueingConsumer consumer = new QueueingConsumer(channel); // 创建一个消费者对象
channel.basicConsume(callbackQueue, true, consumer); //消费消息
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() //创建消息属性
.correlationId(correlationId) //携带唯一的 correlationID
.replyTo(callbackQueue).build(); //携带callback 回调的队列路由键
channel.basicPublish(exchangeName, routingKey, basicProperties, SerializationUtils.serialize(username)); //发布消息
String response = null;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //循环获得消息
System.out.println("delivery >>>>[user:"+ username +"] >> routingKey : " + callbackQueue);
if (correlationId.equals(delivery.getProperties().getCorrelationId())) { //匹配correlationID是否与发出去的correlation的ID一直
response = (String) SerializationUtils.deserialize(delivery.getBody()); //获得处理结果
break;
}
}
channel.close();
//关闭链接
return response;
}
public static void main(String[] args) throws IOException, InterruptedException {
List<String> usernameList = new ArrayList<String>();
usernameList.add("TONY_A");
usernameList.add("TONY_B");
usernameList.add("TONY_C");
usernameList.add("TONY_D");
usernameList.add("TONY_E");
usernameList.add("TONY_F");
usernameList.add("TONY_G");
for (int i = 0; i < usernameList.size(); i++) {
final String username = usernameList.get(i);
new Thread(new Runnable() {
public void run() {
Client client = new Client();
String response = null;
try {
response = client.sayHelloToServer(username);
} catch (IOException e) {
e.printStackTrace();
response = "ERROR!!!";
} catch (InterruptedException e) {
e.printStackTrace();
response = "ERROR!!!";
}
System.out.println("server response : " + response);
}
}).start();
}
}
}
服务器代码:
public class Server {
public static void main(String[] args) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交换器名称
String queueName = "rpc_queue"; //队列名称
String routingKey = "rpc_key"; //路由键
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd");
Connection connection = factory.newConnection(); //创建链接
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定义交换器
channel.queueDeclare(queueName, false, false, false, null); //定义队列
channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列
QueueingConsumer consumer = new QueueingConsumer(channel); //创建一个消费者
channel.basicConsume(queueName,true,consumer); //消费消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //获得一条消息
String correlationID = delivery.getProperties().getCorrelationId(); //获得额外携带的correlationID
String replyTo = delivery.getProperties().getReplyTo(); //获得回调的队列路由键
String body = (String) SerializationUtils.deserialize(delivery.getBody()); //获得请求的内容
Thread.sleep(2000);
String responseMsg = "welcome " + body; //处理返回内容
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationID) //返回消息时携带 请求时传过来的correlationID
.build();
channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回处理结果
}
}
}
运行之后吓死你~
delivery >>>>[user:TONY_A] >> routingKey : amq.gen-8IIKmpbatOaxi4qpk2iq1g
server response : welcome TONY_A
delivery >>>>[user:TONY_F] >> routingKey : amq.gen-DoLdGmIjpyQ58rDM1S-0Bw
server response : welcome TONY_F
delivery >>>>[user:TONY_D] >> routingKey : amq.gen-Xa-fQW2uUXuwcp9PXI8gzg
server response : welcome TONY_D
delivery >>>>[user:TONY_E] >> routingKey : amq.gen-RZcnSdVx4SWCsZRQD6Umcw
server response : welcome TONY_E
delivery >>>>[user:TONY_G] >> routingKey : amq.gen-ab8Wy9fCks5TuyETmZ0jFQ
server response : welcome TONY_G
delivery >>>>[user:TONY_C] >> routingKey : amq.gen-rhKc2IVr3VHpL0vmNxraPA
server response : welcome TONY_C
delivery >>>>[user:TONY_B] >> routingKey : amq.gen-DWYsJva6D7MAFdiVZacr-g
server response : welcome TONY_B
可以看到每一个channel的默认匿名的队列routingKey都是不一样的。所以不存在我所想的白痴问题。
通过上面的实验线性执行的,但是可以将处理的业务开一个线程去处理。就好像这样。
服务器的代码改改:
public class Server {
public static void main(String[] args) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交换器名称
String queueName = "rpc_queue"; //队列名称
String routingKey = "rpc_key"; //路由键
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd");
final Connection connection = factory.newConnection(); //创建链接
final Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定义交换器
channel.queueDeclare(queueName, false, false, false, null); //定义队列
channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列
QueueingConsumer consumer = new QueueingConsumer(channel); //创建一个消费者
channel.basicConsume(queueName,true,consumer); //消费消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //获得一条消息
final String correlationID = delivery.getProperties().getCorrelationId(); //获得额外携带的correlationID
final String replyTo = delivery.getProperties().getReplyTo(); //获得回调的队列路由键
final String body = (String) SerializationUtils.deserialize(delivery.getBody()); //获得请求的内容
new Thread(new Runnable() {
public void run() {
Channel channel = null;
try {
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String responseMsg = "welcome " + body; //处理返回内容
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationID) //返回消息时携带 请求时传过来的correlationID
.build();
try {
channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回处理结果
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
当然我只是单纯的新增了一下线程,你可以做成线程池等等去处理性能上的问题。