上一篇介绍整个RPC Server端的处理机制。http://www.linuxidc.com/Linux/2013-01/77251.htm
下面介绍在整个处理机制中怎么把具体的Request Call转换并调用到整体的实现逻辑。
主要以NameNode Client PRC Server作为例子来说明,整个转换通过Google Protocol Buffer RPC来实现。
final Call call = callQueue.take(); // pop the queue; maybe blocked here
.....
CurCall.set(call);
try {
// Make the call as the user via Subject.doAs, thus associating the call with the Subject
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
}
);
}
} catch (Throwable e) {
//process exception
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
// setupResponse() needs to be sync'ed together with responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces
// its own message ordering.
setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
: RpcStatusProto.ERROR, value, errorClass, error);
// Discard the large buf and reset it back to smaller size to free up heap
if (buf.size() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "
+ call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
responder.doRespond(call);
}
从上面可以看出从call queue中取出一个call,调用call方法来处理call的逻辑,处理之后把call的结果封装成response并通过responder的doRespond把call的response加到response queue中。那行call方法是怎么处理具体call的逻辑的呢,下面就来介绍,具体call方法的逻辑在RPC中实现具体如下:
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}