Hadoop源码分析之二(RPC机制之Call处理)

Linux大全评论453 views阅读模式

上一篇介绍整个RPC Server端的处理机制。http://www.linuxidc.com/Linux/2013-01/77251.htm

下面介绍在整个处理机制中怎么把具体的Request Call转换并调用到整体的实现逻辑。

主要以NameNode Client PRC Server作为例子来说明,整个转换通过Google Protocol Buffer RPC来实现。

          final Call call = callQueue.take(); // pop the queue; maybe blocked here
        .....
          CurCall.set(call);
          try {
            // Make the call as the user via Subject.doAs, thus associating the call with the Subject
            if (call.connection.user == null) {
              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
                          call.timestamp);
            } else {
              value =
                call.connection.user.doAs
                  (new PrivilegedExceptionAction<Writable>() {
                    @Override
                    public Writable run() throws Exception {
                      // make the call
                      return call(call.rpcKind, call.connection.protocolName,
                                  call.rpcRequest, call.timestamp);

                    }
                  }
                  );
            }
          } catch (Throwable e) {
            //process exception
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            // setupResponse() needs to be sync'ed together with responder.doResponse() since setupResponse may use
            // SASL to encrypt response data and SASL enforces
            // its own message ordering.
            setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
                : RpcStatusProto.ERROR, value, errorClass, error);
           
            // Discard the large buf and reset it back to smaller size to free up heap
            if (buf.size() > maxRespSize) {
              LOG.warn("Large response size " + buf.size() + " for call "
                  + call.toString());
              buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
            }
            responder.doRespond(call);
          }

从上面可以看出从call queue中取出一个call,调用call方法来处理call的逻辑,处理之后把call的结果封装成response并通过responder的doRespond把call的response加到response queue中。那行call方法是怎么处理具体call的逻辑的呢,下面就来介绍,具体call方法的逻辑在RPC中实现具体如下:

 public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
 }

企鹅博客
  • 本文由 发表于 2019年9月21日 03:30:39
  • 转载请务必保留本文链接:https://www.qieseo.com/156882.html

发表评论