使用RabbitMQ的RPC

Linux大全评论1.5K views阅读模式

笔者不才,根据小弟的经验觉得使用rabbitMQ进行RPC调研不太妥当,需要他能够实现跨语言,但是对于整体来说使用消息队列服务进行RPC调用,通过RabbitMQ的事务来确定消息已经成功处理完毕,然后通过消息队列服务的reply队列返回处理结果。总觉得差点什么,或者你跟我一样发现了一些问题。第一如何处理分布式事务,这个的确有点费解,这个后面在spring和JPA的时候再去说吧。第二个问题也是我还没有弄懂的一个问题,就是如何做到多线程并发处理。

为什么我会提出这个问题,因为根据rabbitMQ的消息队列处理机制当中,消息队列在消息没有ack之前他是不会继续分发消息的,所以同一时间内你只能处理一条消息,也代表你处理的消息是线性的。再看WEB程序,大家都清楚在Servlet 被调用的时候会新开一条线程,然后在独立的线程去调用serivce 或 dao 等等的业务,也证明了WEB服务是多线程去处理的,而非线性的,总不能一个人在调用这个链接之后,下一个用户也调用这个链接就卡住那里等着吧?所以对于消息队列这种线性的处理方式,我们应该怎么去做到并发呢?方法有能多种,设置成no_ack 然后每次接收到消息之后开启性的线程去处理。第二种方法也是比较简单的开多几个channel,这样就形成多个消费者了。不过以上两个想法,只是我想象而已,也没有实践过,不过今天在写这个东西的时候,我打算实践一下。而且我会在这个笔记当中记录RabbitMQ 的RPC 调用方式。

本人参考的文档为官网关于RPC的介绍:

http://www.rabbitmq.com/tutorials/tutorial-six-java.html

CentOS 5.6 安装RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm

RabbitMQ客户端C++安装详细记录 http://www.linuxidc.com/Linux/2012-02/53521.htm

用Python尝试RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm

RabbitMQ集群环境生产实例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm

CentOS7环境安装使用专业的消息队列产品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

在CentOS上安装RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm

RabbitMQ概念及环境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ入门教程  http://www.linuxidc.com/Linux/2015-02/113983.htm

RabbitMQ 的详细介绍:请点这里
RabbitMQ 的下载地址:请点这里

本次我开发所使用的语言为JAVA,在动手写代码之前先说明几个重点问题。

在RabbitMQ 实现RPC的原理

在rabbitMQ上实现RPC,与之前的订阅消息和生产消息非常相似,唯一不一样的是RPC需要消息处理的放回结果,当然有时你不需要结果,你只在乎是否执行成功而已,但是这里所涉及的问题是我们怎样能做到当消息发送到队列之后,开始等待直到消息处理完成后返回处理结果后再进行执行其他处理。

引用一下官方的图,真心画的不错。

使用RabbitMQ的RPC

在到这个图基本上都清晰了。
1. client发起消息,然后带上一个唯一的correlation_id,同时在reply_to中提供一个独立的队列。官方一般建议使用默认独立。后面会有代码~
2. 携带好correlation_id 和 reply_to 之后将消息发布。
3. 然后server 获得消息,并处理消息。
4. 得出的处理结果通过 reply_to 中的队列 返回消息 并携带上 消息中携带的的correlation_id。
5. client 监听 reply_to 队列,当获得消息后,判断消息中correlation_id 是否与请求时发出的一致,如果一致就证明该消息是这个业务的处理结果。如果不一致就证明消息不是你的啦~ 别碰人家的东西!

贴出官方的summary:

  1. When the Client starts up, it creates an anonymous exclusive callback queue.【当Client运行时,创建一个匿名的 独立的 回调队列】
  2. For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request. 【client 发送的消息 会携带两个参数是为了 RPC调用的,replyTo 是设置回调的队列,correlation 是 唯一的数值,不同的请求就不同的correlationID】
  3. The request is sent to an rpc_queue queue. [发送消息到队列当中]
  4. The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.【worker 即是服务器 等待队列中的消息,当请求出现时,开始干活同时通过replyTo字段中的队列发送处理结果】
  5. The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.【client 等待数据返回的队列,当有消息出现时,检查corrleationID 参数,如果和之前发出的correlationid吻合,就将结果返回到应用程序】

实现代码如下

Client 如下:

package com.maxfunner.rpc;

import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.UUID;

/** * Created by Tony on 2016/11/3. */
public class Client {


    public String sayHelloToServer(String username) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交换器名称
        String queueName = "rpc_queue";     //队列名称
        String routingKey = "rpc_key";  //路由键

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");     //创建链接

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();   

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定义交换器

        channel.queueDeclare(queueName, false, false, false, null); //定义队列

        channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列

        String callbackQueue = channel.queueDeclare().getQueue();   //获得匿名的 独立的 默认队列

        String correlationId = UUID.randomUUID().toString();    //产生一个 关联ID correlationID

        QueueingConsumer consumer = new QueueingConsumer(channel);  // 创建一个消费者对象

        channel.basicConsume(callbackQueue,true,consumer);      //消费消息

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()   //创建消息属性 
                .correlationId(correlationId)   //携带唯一的 correlationID
                .replyTo(callbackQueue).build();    //携带callback 回调的队列路由键

        channel.basicPublish(exchangeName,routingKey,basicProperties, SerializationUtils.serialize(username));  //发布消息


        String response = null;

        while(true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();   //循环获得消息

            if(correlationId.equals(delivery.getProperties().getCorrelationId())){  //匹配correlationID是否与发出去的correlation的ID一直

                response = (String) SerializationUtils.deserialize(delivery.getBody()); //获得处理结果

                break;
            }

        }


        channel.close();

        connection.close();


        //关闭链接

        return response;

    }


    public static void main(String[] args) throws IOException, InterruptedException {

        Client client = new Client();

        String response = client.sayHelloToServer("TONY");

        System.out.println("server response : " + response);

    }


}

Server 代码如下:

package com.maxfunner.rpc;

import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;

/** * Created by Tony on 2016/11/3. */
public class Server {

    public static void main(String[] args) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交换器名称
        String queueName = "rpc_queue";     //队列名称
        String routingKey = "rpc_key";  //路由键

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");

        Connection connection = factory.newConnection();    //创建链接

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定义交换器

        channel.queueDeclare(queueName, false, false, false, null); //定义队列

        channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列

        QueueingConsumer consumer = new QueueingConsumer(channel);     //创建一个消费者

        channel.basicConsume(queueName,true,consumer);  //消费消息

        while (true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  //获得一条消息

            String correlationID  = delivery.getProperties().getCorrelationId();    //获得额外携带的correlationID

            String replyTo = delivery.getProperties().getReplyTo(); //获得回调的队列路由键

            String body = (String) SerializationUtils.deserialize(delivery.getBody());  //获得请求的内容

            String responseMsg = "welcome " + body; //处理返回内容

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationID)   //返回消息时携带 请求时传过来的correlationID
                    .build();

            channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回处理结果

        }



    }



}

运行结果:

server response : welcome TONY

Process finished with exit code 0

最后看一个问题:如果这样去进行RPC调用,由于消息队列是线性去处理的,所以关键的问题是在几个点上【 auto_ack | channel&&exclusive 】:

  • auto_ack :上面的程序当中我将auto_ack设置成true,每次消息到达后我都会去确认消息已经收到,当确认后队列就会删除该消息,如果在等待消费返回的时候有其他channel发出的请求处理结果,然而这个correlationID和目前channel发出的correlation并不一致,但是这消息被你给确认了,所以正在需要获得返回结果的channel就与处理结果无缘了。
  • channel && exclusive:虽然exclusive 是独立的只供一个应用程序使用,但是多个channel之间是可以使用同一个独立队列的。所以如果多个线程去使用这个队列去消费消息的话,rabbitMQ会循环派发的。所以你要在不同的线程中获得相应请求的处理的结果,那就没有那么简单了。

事实上我是错的。看看下面贴出的代码大家都懂了。auto_ack 可以设置成true 不过重点还是在 channel 那个默认独立的 匿名队列,原来每次channel调用都是不一样的routingkey 吓死宝宝了~

看代码:

public class Client {

    private static Connection connection = null;
    private static ConnectionFactory factory = null;

    public Connection getConnection() throws IOException {

        if (connection == null) {
            factory = new ConnectionFactory();
            factory.setVirtualHost("test");
            factory.setHost("192.168.0.21");
            factory.setPort(5673);
            factory.setUsername("tony");
            factory.setPassword("tonypwd");     //创建链接
            connection = factory.newConnection();
        }
        return connection;
    }


    public String sayHelloToServer(String username) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交换器名称
        String queueName = "rpc_queue";     //队列名称
        String routingKey = "rpc_key";  //路由键


        Channel channel = getConnection().createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定义交换器

        channel.queueDeclare(queueName, false, false, false, null); //定义队列

        channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列

        String callbackQueue = channel.queueDeclare().getQueue();   //获得匿名的 独立的 默认队列

        String correlationId = UUID.randomUUID().toString();    //产生一个 关联ID correlationID

        QueueingConsumer consumer = new QueueingConsumer(channel);  // 创建一个消费者对象

        channel.basicConsume(callbackQueue, true, consumer);      //消费消息

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()   //创建消息属性
                .correlationId(correlationId)   //携带唯一的 correlationID
                .replyTo(callbackQueue).build();    //携带callback 回调的队列路由键

        channel.basicPublish(exchangeName, routingKey, basicProperties, SerializationUtils.serialize(username));  //发布消息


        String response = null;

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();   //循环获得消息

            System.out.println("delivery >>>>[user:"+ username +"] >> routingKey : " + callbackQueue);

            if (correlationId.equals(delivery.getProperties().getCorrelationId())) {  //匹配correlationID是否与发出去的correlation的ID一直

                response = (String) SerializationUtils.deserialize(delivery.getBody()); //获得处理结果

                break;
            }

        }


        channel.close();

        //关闭链接

        return response;

    }


    public static void main(String[] args) throws IOException, InterruptedException {


        List<String> usernameList = new ArrayList<String>();

        usernameList.add("TONY_A");
        usernameList.add("TONY_B");
        usernameList.add("TONY_C");
        usernameList.add("TONY_D");
        usernameList.add("TONY_E");
        usernameList.add("TONY_F");
        usernameList.add("TONY_G");


        for (int i = 0; i < usernameList.size(); i++) {

            final String username = usernameList.get(i);

            new Thread(new Runnable() {
                public void run() {

                    Client client = new Client();

                    String response = null;

                    try {

                        response = client.sayHelloToServer(username);

                    } catch (IOException e) {

                        e.printStackTrace();
                        response = "ERROR!!!";

                    } catch (InterruptedException e) {

                        e.printStackTrace();
                        response = "ERROR!!!";
                    }

                    System.out.println("server response : " + response);
                }
            }).start();


        }

    }


}

服务器代码:

public class Server {

    public static void main(String[] args) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交换器名称
        String queueName = "rpc_queue";     //队列名称
        String routingKey = "rpc_key";  //路由键

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");

        Connection connection = factory.newConnection();    //创建链接

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定义交换器

        channel.queueDeclare(queueName, false, false, false, null); //定义队列

        channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列

        QueueingConsumer consumer = new QueueingConsumer(channel);     //创建一个消费者

        channel.basicConsume(queueName,true,consumer);  //消费消息

        while (true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  //获得一条消息

            String correlationID  = delivery.getProperties().getCorrelationId();    //获得额外携带的correlationID

            String replyTo = delivery.getProperties().getReplyTo(); //获得回调的队列路由键

            String body = (String) SerializationUtils.deserialize(delivery.getBody());  //获得请求的内容

            Thread.sleep(2000);

            String responseMsg = "welcome " + body; //处理返回内容

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationID)   //返回消息时携带 请求时传过来的correlationID
                    .build();

            channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回处理结果

        }



    }



}

运行之后吓死你~

delivery >>>>[user:TONY_A] >> routingKey : amq.gen-8IIKmpbatOaxi4qpk2iq1g
server response : welcome TONY_A
delivery >>>>[user:TONY_F] >> routingKey : amq.gen-DoLdGmIjpyQ58rDM1S-0Bw
server response : welcome TONY_F
delivery >>>>[user:TONY_D] >> routingKey : amq.gen-Xa-fQW2uUXuwcp9PXI8gzg
server response : welcome TONY_D
delivery >>>>[user:TONY_E] >> routingKey : amq.gen-RZcnSdVx4SWCsZRQD6Umcw
server response : welcome TONY_E
delivery >>>>[user:TONY_G] >> routingKey : amq.gen-ab8Wy9fCks5TuyETmZ0jFQ
server response : welcome TONY_G
delivery >>>>[user:TONY_C] >> routingKey : amq.gen-rhKc2IVr3VHpL0vmNxraPA
server response : welcome TONY_C
delivery >>>>[user:TONY_B] >> routingKey : amq.gen-DWYsJva6D7MAFdiVZacr-g
server response : welcome TONY_B

可以看到每一个channel的默认匿名的队列routingKey都是不一样的。所以不存在我所想的白痴问题。

我还白痴到画了一个图,这是我本以为是这样的:
使用RabbitMQ的RPC

这是事实:
使用RabbitMQ的RPC

通过上面的实验线性执行的,但是可以将处理的业务开一个线程去处理。就好像这样。

服务器的代码改改:

public class Server {

    public static void main(String[] args) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交换器名称
        String queueName = "rpc_queue";     //队列名称
        String routingKey = "rpc_key";  //路由键

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");

        final Connection connection = factory.newConnection();    //创建链接

        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定义交换器

        channel.queueDeclare(queueName, false, false, false, null); //定义队列

        channel.queueBind(queueName, exchangeName, routingKey, null); //绑定队列

        QueueingConsumer consumer = new QueueingConsumer(channel);     //创建一个消费者

        channel.basicConsume(queueName,true,consumer);  //消费消息

        while (true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  //获得一条消息

            final String correlationID  = delivery.getProperties().getCorrelationId();    //获得额外携带的correlationID

            final String replyTo = delivery.getProperties().getReplyTo(); //获得回调的队列路由键

            final String body = (String) SerializationUtils.deserialize(delivery.getBody());  //获得请求的内容

            new Thread(new Runnable() {

                public void run() {

                    Channel channel = null;
                    try {
                        channel = connection.createChannel();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    String responseMsg = "welcome " + body; //处理返回内容

                    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                            .correlationId(correlationID)   //返回消息时携带 请求时传过来的correlationID
                            .build();

                    try {
                        channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回处理结果
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

            }).start();




        }



    }

}

当然我只是单纯的新增了一下线程,你可以做成线程池等等去处理性能上的问题。

企鹅博客
  • 本文由 发表于 2019年8月25日 21:53:15
  • 转载请务必保留本文链接:https://www.qieseo.com/134498.html

发表评论