离上次更新thrift源码解析已经快有2个月了,自己也真是效率低。不管了,这次介绍的是Thriftpy中的套接字。

上次提到默认使用的套接字是TServerSocket,是在make_server中创建的,可以使用unix_socket或者TCP的套接字,另外如果基于https,就可以使用TSSLServerSocket。其实套接字也算传输transport协议中的一个部分,所以可以看到源码有关socket的都在Thriftpy/transport里面,其中有socket.pysslsocket.py,这里我们只介绍以下socket.py

socket.py中只有两个类,一个是TSocket,一个是TServerSocket,前者是在客户端方面使用的,TServerSocket是在服务器端使用的

客户端和服务器端对应,它使用make_client来创建一个客户端,其内部使用了TSocketTSSLSocket

TServerSocket

我们先来看服务器端的Socket实现。Socket编程在服务器部分大致流程就是:

  • 创建
  • 绑定,监听
  • 接受
  • 读取和回应
  • 关闭

基本上TServerSocket类中就是这么些方法:

创建和初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class TServerSocket(object):
"""Socket implementation for server side."""
def __init__(self, host=None, port=None, unix_socket=None,
socket_family=socket.AF_INET, client_timeout=3000,
backlog=128):
if unix_socket:
self.unix_socket = unix_socket
self.host = None
self.port = None
else:
self.unix_socket = None
self.host = host
self.port = port
self.socket_family = socket_family
self.client_timeout = client_timeout / 1000 if client_timeout else None
self.backlog = backlog
def _init_sock(self):
if self.unix_socket:
# try remove the sock file it already exists
_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
_sock.connect(self.unix_socket)
except (socket.error, OSError) as err:
if err.args[0] == errno.ECONNREFUSED:
os.unlink(self.unix_socket)
else:
_sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
# 这里设置了端口重用,在套接字级别上设置选项
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
_sock.settimeout(None)
self.sock = _sock

监听

1
2
3
4
5
6
def listen(self):
self._init_sock()
addr = self.unix_socket or (self.host, self.port)
self.sock.bind(addr)
self.sock.listen(self.backlog)

接受

1
2
3
4
5
def accept(self):
client, _ = self.sock.accept()
if self.client_timeout:
client.settimeout(self.client_timeout)
return TSocket(sock=client)

读取和回应

TServerSocket没有定义这几个方法,因为是Thriftpy支持不同的协议,因此实现由不同的协议来实现不同的读取和解析方法。

关闭

1
2
3
4
5
6
7
8
9
def close(self):
if not self.sock:
return
try:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
except (socket.error, OSError):
pass

TSocket

TSocket即客户端的套接字,遵循以下流程:

  • 创建
  • 连接
  • 发送
  • 读取
  • 关闭

代码依旧很简单,这里就全部贴上来了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class TSocket(object):
"""Socket implementation for client side."""
def __init__(self, host=None, port=None, unix_socket=None,
sock=None, socket_family=socket.AF_INET,
socket_timeout=3000, connect_timeout=None):
"""
@param sock(socket) 使用一个打开的套接字直接初始化
@param socket_family(str) socket.AF_INET or socket.AF_INET6.
"""
if sock:
self.sock = sock
elif unix_socket:
self.unix_socket = unix_socket
self.host = None
self.port = None
self.sock = None
else:
self.unix_socket = None
self.host = host
self.port = port
self.sock = None
self.socket_family = socket_family
self.socket_timeout = socket_timeout / 1000 if socket_timeout else None
self.connect_timeout = connect_timeout / 1000 if connect_timeout \
else self.socket_timeout
def _init_sock(self):
if self.unix_socket:
_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
_sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# socket options
linger = struct.pack('ii', 0, 0)
# close或 shutdown将等到所有套接字里排队的消息成功发送或到达延迟时间后才会返回。
# 该选项的参数(option_value)是一个linger结构
# struct linger {
# int l_onoff;
# int l_linger;
# };
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger)
_sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock = _sock
def set_handle(self, sock):
self.sock = sock
def set_timeout(self, ms):
"""Backward compat api, will bind the timeout to both connect_timeout
and socket_timeout.
"""
self.socket_timeout = ms / 1000 if (ms and ms > 0) else None
self.connect_timeout = self.socket_timeout
if self.sock is not None:
self.sock.settimeout(self.socket_timeout)
def is_open(self):
return bool(self.sock)
def open(self):
self._init_sock()
addr = self.unix_socket or (self.host, self.port)
try:
if self.connect_timeout:
self.sock.settimeout(self.connect_timeout)
self.sock.connect(addr)
if self.socket_timeout:
self.sock.settimeout(self.socket_timeout)
except (socket.error, OSError):
raise TTransportException(
type=TTransportException.NOT_OPEN,
message="Could not connect to %s" % str(addr))
def read(self, sz):
try:
buff = self.sock.recv(sz)
except socket.error as e:
if (e.args[0] == errno.ECONNRESET and
(sys.platform == 'darwin' or
sys.platform.startswith('freebsd'))):
# freebsd and Mach don't follow POSIX semantic of recv
# and fail with ECONNRESET if peer performed shutdown.
# See corresponding comment and code in TSocket::read()
# in lib/cpp/src/transport/TSocket.cpp.
self.close()
# Trigger the check to raise the END_OF_FILE exception below.
buff = ''
else:
raise
if len(buff) == 0:
raise TTransportException(type=TTransportException.END_OF_FILE,
message='TSocket read 0 bytes')
return buff
def write(self, buff):
self.sock.sendall(buff)
def flush(self):
pass
def close(self):
if not self.sock:
return
try:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
self.sock = None
except (socket.error, OSError):
pass

struct

这里补充一个知识点,就是Python操作字节流的库struct。

Python没有专门处理字节的数据类型。但由于b’str’可以表示字节,所以,字节数组=二进制str。但是对于其他的二进制数据比如整数值,浮点数等就比较麻烦了,因此Python提供了一个struct模块来解决bytes和其他二进制数据类型的转换。

struct的pack函数把任意数据类型变成bytes

1
2
3
>>> import struct
>>> struct.pack('>I', 10240099)
b'\x00\x9c@c'

pack的第一个参数是处理指令,'>I'的意思是:>表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数。后面的参数个数要和处理指令一致。

unpack把bytes变成相应的数据类型:

1
2
>>> struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80')
(4042322160, 32896)

根据>IH的说明,后面的bytes依次变为I:4字节无符号整数和H:2字节无符号整数。

上面程序里用到了linger = struct.pack('ii', 0, 0),也就是将两个0分别转换成int(有符号)的字节序列(bytes)

这里有一些转换的格式

小结

关于Thriftpy中使用的套接字就介绍到这里了,下一节将是Thriftpy中使用的协议,比如常用的TJSONProtocol