make_server入手

首先来看看官方给的服务器demo

1
2
3
4
5
6
7
8
9
10
11
import thriftpy
pingpong_thrift = thriftpy.load("pingpong.thrift", module_name="pingpong_thrift")
from thriftpy.rpc import make_server
class Dispatcher(object):
def ping(self):
return "pong"
server = make_server(pingpong_thrift.PingPong, Dispatcher(), '127.0.0.1', 6000)
server.serve()

首先载入thrift文件没话说,下来是创建服务器,它是直接用里面的一个函数make_server ,这个函数会接收thrift导入协议中定义的服务和处理这个服务的处理对象,以及host和port作为参数,然后调用serve()方法,来创建一个服务器。

那么关键就在make_server函数上了,进入查看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def make_server(service, handler, host="localhost", port=9090, unix_socket=None, proto_factory=TBinaryProtocolFactory(), trans_factory=TBufferedTransportFactory(), client_timeout=3000, certfile=None):
truetrue# 为传入的服务和处理方法创建处理器
processor = TProcessor(service, handler)
truetrue# 如果是Unix_socket
if unix_socket:
server_socket = TServerSocket(unix_socket=unix_socket)
if certfile:
warnings.warn("SSL only works with host:port, not unix_socket.")
elif host and port:
if certfile:
server_socket = TSSLServerSocket(
host=host, port=port, client_timeout=client_timeout,
certfile=certfile)
else:
server_socket = TServerSocket(
host=host, port=port, client_timeout=client_timeout)
else:
raise ValueError("Either host/port or unix_socket must be provided.")
server = TThreadedServer(processor, server_socket,
iprot_factory=proto_factory,
itrans_factory=trans_factory)
return server

可以看到,创建一个服务器其实过程非常简单,首先创建一个处理器Processor,然后创建一个Socket,接下来创建服务器,这里使用的是TThreadedServer,顾名思义是多线程的服务器了。

其中有进行判断unix_socket和SSL的,SSL不多解释了。但是unix_socket是什么呢,UNIX 域套接字(UNIX domain socket),即通过文件系统(而非网络地址)进行寻址和访问的套接字。因此其不需要host和port。其实是进程间通信的一种方式吧,在一台电脑上进程间通信也可以使用TCP套接字或者UDP套接字,但它们的效率都没有域套接字高,另外域套接字使用的库函数和TCP套接字使用的库函数一样,这样操作比较简单。

那么不管这些unix_socket还有SSL这些东西,我们抽象一下就可以发现,其实只有5个东西:

  • 处理器 Processor 默认使用的是TProcessor
  • 套接字 socket 默认使用的是TServerSocket
  • 协议 默认使用的是TBinaryProtocolFactory()
  • 传输类型 默认使用的是TBufferedTransportFactory()
  • 服务器类型 默认使用的是TThreadedServer

那么要自定义一个自己的服务器,那么只要自定义这些就行了,比如我不想用多线程的服务器,我想用基于协程的服务器,自己写一个就好了= = !

下面一系列文章,将对这些内容进行分析讲解,使得大家可以轻松的自定义的使用thriftpy

处理器 Processor

目前在源码中看到的有2个Processor,一个是TProcessor,一个是TMultiplexedProcessor,另外还有一个Processor工厂TProcessorFactory。所以处理器总共就两个,一个是单服务处理器,一个是多服务处理器。

处理器中process方法是外部比如服务器调用的方法,而process_in是处理器内部调用的处理逻辑

TProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class TProcessor(object):
"""Base class for procsessor, which works on two streams."""
def __init__(self, service, handler):
self._service = service
self._handler = handler
def process_in(self, iprot):
api, type, seqid = iprot.read_message_begin()
if api not in self._service.thrift_services:
iprot.skip(TType.STRUCT)
iprot.read_message_end()
return api, seqid, TApplicationException(TApplicationException.UNKNOWN_METHOD), None # noqa
args = getattr(self._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
result = getattr(self._service, api + "_result")()
# convert kwargs to args
api_args = [args.thrift_spec[k][1] for k in sorted(args.thrift_spec)]
def call():
f = getattr(self._handler, api)
return f(*(args.__dict__[k] for k in api_args))
return api, seqid, result, call
def send_exception(self, oprot, api, exc, seqid):
oprot.write_message_begin(api, TMessageType.EXCEPTION, seqid)
exc.write(oprot)
oprot.write_message_end()
oprot.trans.flush()
def send_result(self, oprot, api, result, seqid):
oprot.write_message_begin(api, TMessageType.REPLY, seqid)
result.write(oprot)
oprot.write_message_end()
oprot.trans.flush()
def handle_exception(self, e, result):
for k in sorted(result.thrift_spec):
if result.thrift_spec[k][1] == "success":
continue
_, exc_name, exc_cls, _ = result.thrift_spec[k]
if isinstance(e, exc_cls):
setattr(result, exc_name, e)
break
else:
raise
def process(self, iprot, oprot):
api, seqid, result, call = self.process_in(iprot)
if isinstance(result, TApplicationException):
return self.send_exception(oprot, api, result, seqid)
try:
result.success = call()
except Exception as e:
# raise if api don't have throws
self.handle_exception(e, result)
if not result.oneway:
self.send_result(oprot, api, result, seqid)

具体的就不讲解了,主要的东西就是这个处理器可以将一个服务和一个处理方法对应起来,这个处理方法一般是类的实例,然后服务里定义的的函数一般就是这个类的方法,在process_in方法里面定义了call函数,它其实就是调用handler的具体方法,其中api就是thrift中定义的服务的api。

另外需要根据不同的协议来确定返回的值,比如下面的例子,返回的是字典,我就需要使用TJSONProtocolFactory,还有其他的一些协议,后面再讲。

handler的话比如这样,实例方法也ok,我这里函数都是独立的,所以使用静态方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class CalcAbility(object):
@staticmethod
def sync_ability(fundId, assets, flows, stocks, start, stop, calc_type):
try:
assert fundId and assets and start and stop, "fundId or assets or start or end not found - testing"
start_date = strpdate(start)
stop_date = strpdate(stop)
calc_type = calc_type or DEFAULT_CALC_TYPE
res = is_continuous_trade_day(start, stop, assets)
assert res[0], res[1]
except Exception as e:
sentry_logger.exception(e)
return gen_response(data=None, msg=str(e), errorcode=1)
return assets

thrift文件定义如下,thrift文件定义花样比较多,后面再说。

1
2
3
4
5
6
7
8
9
10
service Calc {
truestring sync_ability(
truetrue1: string fundId,
truetrue2: string assets,
truetrue3: string flows,
truetrue4: string stocks,
truetrue5: string start,
truetrue6: string stop,
truetrue7: string calc_type,)
}

TMultiplexedProcessor

这个处理器支持多个服务分别对应不同的处理方法,其实是一堆TProcessor和处理方法的集合,使用字典实现,其中service_name对应processor,所以定义TMultiplexedProcessor后,必须进行注册:register_processor

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class TMultiplexedProcessor(TProcessor):
SEPARATOR = ":"
def __init__(self):
self.processors = {}
def register_processor(self, service_name, processor):
if service_name in self.processors:
raise TApplicationException(
type=TApplicationException.INTERNAL_ERROR,
message='processor for `{0}` already registered'
.format(service_name))
self.processors[service_name] = processor
def process_in(self, iprot):
api, type, seqid = iprot.read_message_begin()
if type not in (TMessageType.CALL, TMessageType.ONEWAY):
raise TException("TMultiplex protocol only supports CALL & ONEWAY")
if TMultiplexedProcessor.SEPARATOR not in api:
raise TException("Service name not found in message. "
"You should use TMultiplexedProtocol in client.")
service_name, api = api.split(TMultiplexedProcessor.SEPARATOR)
if service_name not in self.processors:
iprot.skip(TType.STRUCT)
iprot.read_message_end()
e = TApplicationException(TApplicationException.UNKNOWN_METHOD)
return api, seqid, e, None
proc = self.processors[service_name]
args = getattr(proc._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
result = getattr(proc._service, api + "_result")()
# convert kwargs to args
api_args = [args.thrift_spec[k][1] for k in sorted(args.thrift_spec)]
def call():
f = getattr(proc._handler, api)
return f(*(args.__dict__[k] for k in api_args))
return api, seqid, result, call

使用TMultiplexedProcessor来注册几个服务和处理器:

1
2
3
4
5
6
7
thrift_app = TMultiplexedProcessor()
calc_processor = TProcessor(ability_thrift.Calc, CalcAbility())
read_processor = TProcessor(ability_thrift.Read, ReadAbility())
thrift_app.register_processor('ability_thrift.Calc', calc_processor)
thrift_app.register_processor('ability_thrift.Read', read_processor)

其中Read服务在thrift文件中定义,然后处理方法ReadAbility根据thrift协议来定,其实就是通过service_name找到对应的处理器,其他的就和TProcessor一样了

TProcessorFactory

处理器工厂,感觉其实没什么卵用,源码如下:

1
2
3
4
5
6
7
8
9
10
class TProcessorFactory(object):
def __init__(self, processor_class, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.processor_class = processor_class
def get_processor(self):
return self.processor_class(*self.args, **self.kwargs)

小结

这一节对常用的Processor做了一个介绍,当然深入的实现细节没有讲到,毕竟太复杂,现在的目标只是做个介绍罢了。

剧透

下一节我们将了解下TServerSocket