使用Python的multiprocessing.connections实现远程方法调用(RPC)


0. 背景

实现远程方法调用(RPC)的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。 为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。 下面我们将具体细化stub结构的实现。

RPC 结构拆解如下图所示:

这里写图片描述
图1. RPC结构拆解

RPC 服务方通过 RpcServer 去导出(export)远程接口方法,而客户方通过 RpcClient 去引入(import)远程接口方法。 客户方像调用本地方法一样去调用远程接口方法,RPC 框架提供接口的代理实现,实际的调用将委托给代理 RpcProxy 。 代理封装调用信息并将调用转交给 RpcInvoker 去实际执行。 在客户端的 RpcInvoker 通过连接器 RpcConnector 去维持与服务端的通道 RpcChannel, 并使用 RpcProtocol 执行协议编码(encode)并将编码后的请求消息通过通道发送给服务方。
RPC 服务端接收器 RpcAcceptor 接收客户端的调用请求,同样使用 RpcProtocol 执行协议解码(decode)。 解码后的调用信息传递给 RpcProcessor 去控制处理调用过程,最后再委托调用给 RpcInvoker 去实际执行并返回调用结果。

基于消息传输层如socketsZeroMQ的基础之上,本文使用Python的multiprocessing.connections实现远程方法调用(RPC)。

1. 代码

远程方法调用的代码框架十分简单,具体可分为三部分:rpchandler, rpcserver, rpcclient。其中rpchandler为远程调用的服务类,在rpcserver中新建一个rpchandler类的实例,注册(导出)供远程调用的方法,并且运行server,而rpcclient中新建一个proxy实例代理客户端。

a) rpchandler.py

rpchandler.py中,将函数请求、参数和返回值使用pickle编码后,在不同的解释器直接传送pickle字节字符串,可以很容易的实现RPC。下面是一个简单的PRC处理器,可以被整合到一个处理器(handler)中去:

# -*- coding: utf-8 -*-
# rpchandler.py

import pickle

class RPCHandler(object):
def __init__(self):
# rpc functions map
self._functions = {}

def register_function(self, func):
self._functions[func.__name__] = func

def handle_connection(self, connection):
try:
while True:
# 接收到一条消息, 使用pickle协议编码
func_name, args, kwargs = pickle.loads(connection.recv())
# rpc调用函数,并返回结果
try:
r = self._functions[func_name](*args, **kwargs)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except EOFError:
pass

这个处理器包含了register_function注册函数,换句话说,就是暴露的接口,而handler_connection接口,是被rpcserver调用的,并需要将connection传入进去,参数的loads,接口调用,以及结果的dumps和send都是在这个函数里面。

b) rpcserver.py

要使用这个RPCHandler处理器,需要将它加入到一个消息服务器中。我们有很多种选择,但是使用multiprocessing.connection库是最简单的。下面是一个rpcserver例子:

# -*- coding: utf-8 -*-
# rpcserver.py

from multiprocessing.connection import Listener
from threading import Thread

def rpc_server(handler, address, authkey):

sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args = (client,))
t.daemon = True
t.start()


if __name__ == '__main__':

# 写几个测试方法
def add(x, y):
return x+y

def printdict(**kwargs):
cnt = 0
for k, v in kwargs.iteritems():
print(''.join(['"', str(k), '":"', str(v), '"']))
cnt += 1
return cnt


# 新建一个handler类实例, 并将add, printdict方法注册到handler里面
from rpchandler import RPCHandler
rpc_handler = RPCHandler()
rpc_handler.register_function(add)
rpc_handler.register_function(printdict)

# 运行server
rpc_server(rpc_handler, ('localhost', 17000), authkey='tab_space')

这个rpcserver中写了两个接口addprintdict,并将这两个接口注册到新建的RPCHandler的实例,最后运行rpc_server,把新建RPCHnalder实例,IPAddress以及authkey传入进去。

这是一个简单rpcserver就写好了,这时可以将其运行。

c) rpcclient.py

为了从一个远程客户端访问服务器,你需要创建一个对应的用来传送请求的RPC代理类。例如

# -*- coding: utf-8 -*-

import pickle

class RPCProxy(object):

def __init__(self, connection):
self._connection = connection

def __getattr__(self, name):
# 通过name,得到一个函数
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
if isinstance(result, Exception):
raise result
return result

return do_rpc

# 远程连接并且调用
if __name__ == '__main__':

from multiprocessing.connection import Client
rpc_client = Client(('localhost', 17000), authkey='tab_space')

proxy = RPCProxy(rpc_client)
print proxy.add(2,3)

print proxy.printdict(**{"tab_space":"rpc", "github":"https://github.com/csdz"})

PRCProxy代理类中,重写了__getattr__方法,这个方法是通过一个name获得一个方法。最后在下面使用multiprocessing.connection新建一个rpc_clientconnection,放入到proxy中,然后就可以直接调用。这是一个同步调用操作,如果接口执行时间较长,建议使用异步调用。运行rpcclient.py后,在console得到运行结果:

> python rpcclient.py

5 # proxy.add(2,3)
2 # proxy.printdict, return cnt

而在rpcserver.py中在console得到运行结果:

> python rpcserver.py

"tab_space":"rpc"
"github":"https://github.com/csdz"

要注意的是很多消息层(比如multiprocessing )已经使用pickle 或是自定义协议序列化了数据。如果是这样的话,对pickle.dumps() 和pickle.loads() 的调用要去掉。

2. 简单探讨

RPCHandlerRPCProxy的基本思路是很比较简单的。如果一个客户端想要调用一个远程函数,比如foo(1,2,z=3),代理类创建一个包含了函数名和参数的元组('foo', (1,2), {'z':3})。这个元组被pickle序列化后通过网络连接发生出去。
这一步在RPCProxygetattr()方法返回的do_rpc() 闭包中完成。

服务器接收后通过pickle反序列化消息,查找函数名看看是否已经注册过,然后执行相应的函数。执行结果(或异常)被pickle序列化后返回发送给客户端。我们的实例需要依赖multiprocessing进行通信。不过,这种方式可以适用于其他任何消息系统。例如,
如果你想在ZeroMQ 之上使用RPC,仅仅只需要将连接对象换成合适的ZeroMQsocket 对象即可。

由于底层需要依赖pickle,那么安全问题就需要考虑了(因为一个聪明的黑客可以创建特定的消息,能够让任意函数通过pickle反序列化后被执行)。

作为pickle的替代,你也许可以考虑使用JSON、XML或一些其他的编码格式来序列化消息。例如,本机实例可以很容易的改写成JSON编码方案。还需要将pickle.loads()pickle.dumps()替换成json.loads()json.dumps()即可。

实现RPC的一个比较复杂的问题是如何去处理异常。至少,当方法产生异常时服务器不应该奔溃(上述简单的rpc当产生异常时服务器是会raise Exception的)。因此,返回给客户端的异常所代表的含义就要好好设计了。如果要使用pickle,异常对象实例在客户端能被反序列化并抛出。如果要使用其他的协议,那得想想另外的方法了。不过至少,应该在响应中返回异常字符串。

3. 总结

本文主要讲述了如何使用Pythonmultiprocessing.connections简单实现远程方法调用。这是一个简单demo,对于消息协议异常处理多线程执行异步调用等等问题,还有待考虑。github(https://github.com/csdz/)上有上述的代码demo

智能推荐

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告