twisted
# twisted
twisted 是用 Python 实现的基于事件驱动的网络引擎框架,twisted 支持常见的传输及应用层协议,包括 TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC 以及 FTP。 安装
$ pip install twisted
github 地址 https://github.com/twisted/twisted
文档支持 https://twistedmatrix.com/documents/current/
Twisted 源码分析之 reactor https://www.jianshu.com/p/26ae331b09b0
Twisted deferred 翻译讲解 https://blog.csdn.net/happyanger6/article/details/53073881
协议处理类通常是 twisted.internet.protocol.Protocol 的子类,但是协议处理的持久化通常是一个工厂类,这个工厂类是 twisted.internet.protocol.Factory 的子类。工厂的 buildProtocol 方法用于为每个新连接创建一个协议。
# 1. reactor(反应堆)
reactor 是 Twisted 中事件循环的核心,事件循环使用 Twisted 驱动应用程序。事件循环是一种程序设计结构,它等待并调度程序中的事件或消息。它的工作方式是调用一些内部或外部“事件提供程序”,该事件提供程序通常会阻塞直到事件到达,然后再调用相关的事件处理程序(调度事件)。reactor 为许多服务提供基本接口,包括网络通信,线程和事件调度。
反应器有多种实现方式,每种实现方式都经过修改,可以为默认功能提供更好的支持,以支持默认功能。有关这些内容以及如何使用特定实现的更多信息,可以通过 选择 Reactor 获得。
Twisted 应用程序可以使用 twisted.application.service 中的接口来配置和运行该应用程序,而不是使用样板反应堆代码。
twisted 实现了设计模式中的反应堆(reactor)模式,twisted 的核心就是 reactor 时间循环。reactor 可以感知网络、文件系统以及定时器事件。 reactor 通过 run 函数启动
from twisted.internet import reactor
reactor.run()
2
3
reactor 是 Singleton(单例模式),在一个程序中只有一个 reactor,只要引入就会相应的创建一个。twisted 还可以引入其他 reactor,例如,可以使用 twisted.internet.pollreactor 中的 pool 替代 select 的系统调用。若无指定 reactor 则使用系统安装的默认 reactor。
from twisted.internet import poolreactor
poolreactor.install()
from twisted.insternet import reactor
reactor.run()
2
3
4
5
反应器通常实现一组接口,但是根据所选的反应器和平台,某些接口可能未实现:
- IReactorCore:核心(必需)功能。
- IReactorFDSet:使用 FileDescriptor 对象。
- IReactorProcess:流程管理。
- IReactorSSL:SSL 网络支持。
- IReactorTCP:TCP 网络支持。
- IReactorThreads:线程的使用和管理。
- IReactorTime:调度接口。
- IReactorUDP:UDP 网络支持。
- IReactorUNIX:UNIX 套接字支持。
- IReactorSocket:第三方套接字支持。
一个简单回显客户端消息的例子
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
class QOTD(Protocol):
def connectionMade(self):
# self.factory was set by the factory's default buildProtocol:
self.transport.write(self.factory.quote + '\r\n')
self.transport.loseConnection()
class QOTDFactory(Factory):
# This will be used by the default buildProtocol to create new protocols:
protocol = QOTD
def __init__(self, quote=None):
self.quote = quote or 'An apple a day keeps the doctor away'
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory("configurable quote"))
reactor.run()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 2. Deferreds
deferreds 是一种异步延迟回调
# 2.1 使用示例
#-*- coding:utf-8 -*-
from twisted.internet import reactor, defer
class Getter:
def gotResult(self, x):
if self.d is None:
print "Nowhere to put results"
return
d = self.d
self.d = None # 防止重复调用
if x % 2 == 0:
d.callback(x * 3) #成功回调
else:
d.errback(ValueError("You used an odd number!")) #异常回调
def _toHTML(self, r):
return "Result: %s" % r
def getDummyData(self, x):
self.d = defer.Deferred()
reactor.callLater(2, self.gotResult, x) # 2秒后调用gotResults
self.d.addCallback(self._toHTML)
return self.d
def cbPrintData(result):
print(result)
def ebPrintError(failure):
import sys
sys.stderr.write(str(failure))
g = Getter()
d = g.getDummyData(3)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)
g = Getter()
d = g.getDummyData(4)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)
reactor.callLater(4, reactor.stop)
reactor.run()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
其中 getPage(url)返回的是一个 defer.Deferred()对象,addCallbacks 给 defer 对象增加回调(成功和失败回调),addBoth 给 defer 对象增加回调(成功失败添加同一回调),reactor.run() 启动 reactor
- 重复对 d 调用 callback,会导致 AlreadyCalledError 异常
- callback 调用异常会抛出 errback()异常,可以通过.errback(failure)返回调用失败,failure 通常是 wtisted.python.failure.Failure 的实例
# 2.2 异常处理
先看代码
# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1) # A
d.addErrback(errback1) # B
d.addCallback(callback2)
d.addErrback(errback2)
# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1) # C
d.addCallbacks(callback2, errback2)
2
3
4
5
6
7
8
9
10
11
如果 callback1 发生了错误,对于 Case1,将会使用 Failure 对象调用 errback1;而对于 Case2,则会调用 errback2。
- 对于 1,callback1 只会处理 deferred 执行成功的情形,而 errback1 将会处理 deferred 的错误和 callback1 中的错误
- 对于 2,errorback1 只会处理 deferred 执行错误的情形,而不处理 callback1 中的错误
# 2.3 未捕获异常
如果 Deferred 被垃圾回收并带有未处理的错误(即如果有一个错误,它将调用下一个 errback),那么 Twisted 会将错误的回溯写入日志文件。这意味着您通常可以避免不添加错误回复,但仍然会记录错误。但是要小心;如果在周围保留对 Deferred 的引用,以防止被垃圾回收,则您可能永远看不到错误(而且您的回调似乎也从未被调用过)。如果不确定,应该在回调之后显式添加一个 errback,即:
# Make sure errors get logged
from twisted.python import log
d.addErrback(log.err)
2
3
# 2.4 处理同步或异步结果
使用 maybeDeferred 可以返回一个 Deferred 对象,即使调用不是延迟的
twisted.internet.defer.maybeDeferred(f, *args, **kw)
# 2.5 取消 deferred
一个 Deferred 对象可能很长时间才能返回结果然后执行 callback,也可能永远不返回结果。当选择忽略结果时,deferred 对象可能仍在后台进行一些操作,消耗一些资源,比如 CPU、内存网络带宽,甚至磁盘空间。所以有时候希望显式地指明停止对 deferred 对象的关注,这样 deferred 对象就可以进行清理工作,并释放系统资源。twisted.internet.defer.cancel 方法可以用来停止对 deferr 的等待。
当调用 cancel 后,将会进行如下操作
- 如果后台正在进行连接操作,将会被终止
- Deferred 将会以莫衷方式及时地完成
- 就好像引起 Deferred 使用 CancelledError 调用 errbacked 这一系列操作的顺序非常重要,调用取消意味着我们希望停止后台操作,但是后台操作不一定能够立即做出相应。有一些情况可能不会被打断,例如:平台相关的域名解析操作会阻塞,因此需要在一个线程中执行;当等待一个以这种线程方式实现的域名解析时连接操作不能被取消。所以要取消的 Deferred 对象,不会立即执行 callback 或者 errback。
理想情况下,发出 cancel 请求后正在进行的服务都会停止,但是没有办法保证在多个 deferred 对象间的操作被取消。取消 Deferred 对象会尽最大努力:
- Deferred 对象不知道如何取消底层操作
- 底层操作处于不可取消的状态,比如做过一些不可逆操作。
- Deferred 对象已经有一个结果,取消操作什么也不做
- 无论能否取消成功,cancel 调用总是会成功。对于 1、2 情况,Deferred 对象会用 twisted.internet.defer.CancelledError 调用 errback,尽管后台操作可能还在进行。 如果被取消的 Deferred 对象还在等待另一个 Deferred 对象,则取消操作直接发送到另一个 Deferred
# 2.6 超时
超时是取消的一种特殊情况。可以在 Deferred 对象还没开始时,通过调用 Deferred.addTimeout 产生一个 TimeoutError。
from twisted.internet import task, defer
def logTimeout(result, timeout):
print("Got {0!r} but actually timed out after {1} seconds".format(
result, timeout))
return result + " (timed out)"
def main(reactor):
# generate a deferred with a custom canceller function, and never
# never callback or errback it to guarantee it gets timed out
d = defer.Deferred(lambda c: c.callback("Everything's ok!"))
d.addTimeout(2, reactor, onTimeoutCancel=logTimeout) # 等待超时
d.addBoth(print) # 此调用不会被等待超时
return d
task.react(main)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Deferred.addTimeout 的调用时机,决定了哪些 callbacks 会被超时控制。addTimeout 将监听在它调用之前的 callbacks 和 errbacks 超时,不包括在这之后的 callbacks 和 errbacks。超时计算在 addTimeout 调用之后就开始计算。
# 2.7 DeferredList
通过调用 deferredList,可以等待多个 Deferred 完成后再调用 callback。
# Creates a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3])
2
可以通过一个例子理解 DeferredList
# A callback that unpacks and prints the results of a DeferredList
def printResult(result):
for (success, value) in result:
if success:
print('Success:', value)
else:
print('Failure:', value.getErrorMessage())
# Create three deferreds.
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()
# Pack them into a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)
# Add our callback
dl.addCallback(printResult)
# Fire our three deferreds with various values.
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')
# At this point, dl will fire its callback, printing:
# Success: one
# Failure: bang!
# Success: three
# (note that defer.SUCCESS == True, and defer.FAILURE == False)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
一个标准的 DeferredList 永远不会调用 errback,但是 DeferrerList 中的 Deferred 的失败将会调用它的 errback。 如果在 DeferredList 添加之后调用 callback,这个 callback 的调用结果不会向 DeferredList 的 callback 返回。为了避免造成困惑,建议不要在 deferred 对象已经添加到 DeferredList 之后再调用 callback。 DeferredList 的其他行为,先看一下源码实现会更直接,DeferredList 实际就是对 Deferred 的一层封装。
class DeferredList(Deferred): # 继承Deferred类
fireOnOneCallback = False # 每次CallBack成功后调用
fireOnOneErrback = False # 每次ErrBack后调用
def __init__(self, deferredList, fireOnOneCallback=False,
fireOnOneErrback=False, consumeErrors=False):
self._deferredList = list(deferredList)
self.resultList = [None] * len(self._deferredList)
Deferred.__init__(self)
if len(self._deferredList) == 0 and not fireOnOneCallback:
self.callback(self.resultList) # DeferredList为空时,直接回调
self.fireOnOneCallback = fireOnOneCallback # 赋值
self.fireOnOneErrback = fireOnOneErrback # 赋值
self.consumeErrors = consumeErrors # 赋值
self.finishedCount = 0 # 完成CallBack调用的计数
index = 0
for deferred in self._deferredList: # 初始化deferred list的callback
deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
callbackArgs=(index,SUCCESS),
errbackArgs=(index,FAILURE))
index = index + 1
def _cbDeferred(self, result, index, succeeded):# 内部callback函数
self.resultList[index] = (succeeded, result)
self.finishedCount += 1
if not self.called:
# 确保只调用一次callback,在callback或errback调用后called会被置为True
# 这里要注意 fireOnOnce 的回调和全部完成后的回调参数是不一样的
if succeeded == SUCCESS and self.fireOnOneCallback:
self.callback((result, index)) # 成功调用
elif succeeded == FAILURE and self.fireOnOneErrback:
self.errback(failure.Failure(FirstError(result, index))) # 失败调用
elif self.finishedCount == len(self.resultList):
self.callback(self.resultList) #
if succeeded == FAILURE and self.consumeErrors:
result = None
return result
def cancel(self):
pass
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
启用可选参数,可使用如下方式调用。
dl = defer.DeferredList([deferred1, deferred2, deferred3], fireOnOneCallback=True, consumeErrors=True)
dl.addCallback(printResult)
2
3
DeferredList 的一个常见用途是收集并行一步操作结果。如果所有的操作都成功则成功,如果有一个失败则失败,在这种情况下课时使用 twisted.internet.defer.gatherResults 看一下 DeferredList 的
# 2.8 deferred 类概览
- addCallbacks(self, callback[, errback, callbackArgs, callbackKeywords, errbackArgs, errbackKeywords])
- addCallback(callback,*callbackArgs,**callbackKeywords)
- addErrback(errback,*errbackArgs,**errbackKeywords)
- addBoth(callbackOrErrback,*callbackOrErrbackArgs,**callbackOrErrbackKeywords)
Deferred 链 可以使用*chainDeferred(otherDeferred)*调用,这个函数的作用和 *self.addCallbacks(otherDeferred.callback,otherDeferred.errback)*一样.
关于书写异步函数返回 Deferred 对象,可以参照 https://twistedmatrix.com/documents/current/core/howto/gendefer.html
# 3. 测试驱动开发
# 3.1 指导思想
在正式编写代码之前写编写测试代码,先保证所有测试用例都会失败,然后再编写代码。当代码有 bug 需要修改时,需要先编写让测试不通过的测试用例,再修复 bug。测试代码覆盖率是软件测试的重要指标,其中包括路径覆盖率、条件覆盖率、语句覆盖率等。
先写一个例子,目录结构如下
calculus/__init__.py
calculus/base_1.py
calculus/test/__init__.py
calculus/test/test_base_1.py
2
3
4
# -*- test-case-name: calculus.test.test_base_1 -*-
# filename: base_1.py
class Calculation(object):
def add(self, a, b):
pass
def subtract(self, a, b):
pass
def multiply(self, a, b):
pass
def divide(self, a, b):
pass
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# filename: test_base_1.py
from calculus.base_1 import Calculation
from twisted.trial import unittest
class CalculationTestCase(unittest.TestCase):
def test_add(self):
calc = Calculation()
result = calc.add(3, 8)
self.assertEqual(result, 11)
def test_subtract(self):
calc = Calculation()
result = calc.subtract(7, 3)
self.assertEqual(result, 4)
def test_multiply(self):
calc = Calculation()
result = calc.multiply(12, 5)
self.assertEqual(result, 60)
def test_divide(self):
calc = Calculation()
result = calc.divide(12, 5)
self.assertEqual(result, 2)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
使用下面命令启动测试
python -m twisted.trial calculus: 运行 calculus 包的所有测试
python -m twisted.trial calculus.test: run using Python’s import notation.
python -m twisted.trial calculus.test.test_base_1: 运行一个特定的测试用例 You can follow that logic by putting your class name and even a method name to only run those specific tests.
python -m twisted.trial --testmodule=calculus/base_1.py: use the test-case-name comment in the first line of calculus/base_1.py to find the tests.
python -m twisted.trial calculus/test: 运行目录下所有测试 (不推荐).
python -m twisted.trial calculus/test/test_base_1.py: 运行一个测试文件 (不推荐).
# 3.2 twisted 的具体测试
本节笔记参照 twisted
测试部分文档 (opens new window)书写。这里不在罗列代码,代码可在浏览器中查看。
测试类需要继承 twisted.trial.unittest.TestCase 类,测试类可以提供对应的 setUp 和 tearDown 方法。在每次启动测试时会调用 setUp 方法,测试后调用 tearDown 方法
# 3.2.1 使用客户端与服务端测试
测试类可以结合 reactor 的协议工厂,通过 twisted.test.proto_helpers.StringTransport 类伪造网络传输数据从而进行测试。这种方法用于模拟没有网络的网络连接,通过这种方法,可以忽略掉网络的不可靠性,从而可以编写 100%可靠的测试。还可以用确定性的方式,测试网络故障。 测试文件为:test_remote_1.py (opens new window) remote_1.py (opens new window)。在
可使用下面命令进行测试
$ python calculus/remote_1.py
$ python -m twisted.trial calculus.test.test_remote_1
2
3
在启动测试服务端后,也可使用 telnet 测试
$ telnet localhost 46194
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
add 4123 9423
13546
2
3
4
5
6
# 3.2.2 在测试客户端中应用 Deferred
在测试客户端中要和服务端通信(伪造协议,同步过程),服务器代码不可预知也许是异步过程,所以测试实际上是一个异步的过程,可以使用 Deferred 来加速异步测试
from calculus.client_1 import RemoteCalculationClient
from twisted.trial import unittest
from twisted.test import proto_helpers
class ClientCalculationTestCase(unittest.TestCase):
def setUp(self):
self.tr = proto_helpers.StringTransport()
self.proto = RemoteCalculationClient()
self.proto.makeConnection(self.tr)
def _test(self, operation, a, b, expected):
d = getattr(self.proto, operation)(a, b) #调用一个测试,这里返回一个Deferred对象
self.assertEqual(
self.tr.value(), # proto_helpers 消息,用于校验测试格式
u'{} {} {}\r\n'.format(operation, a, b).encode('utf-8')
)
self.tr.clear() # 清空proto_helpers 中的数据,准备下一次测试
d.addCallback(self.assertEqual, expected) # 为Deferred添加 asserEqual 测试函数
# 模拟接收数据,驱动self.proto对象中lineReceived函数的调用,这里模拟使用的字符串是经过校验的expected字符串。
self.proto.dataReceived(u"{}\r\n".format(expected,).encode('utf-8'))
return d
def test_add(self):
return self._test('add', 7, 6, 13)
def test_subtract(self):
return self._test('subtract', 82, 78, 4)
def test_multiply(self):
return self._test('multiply', 2, 8, 16)
def test_divide(self):
return self._test('divide', 14, 3, 4)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- test-case-name: calculus.test.test_client_1 -*-
from twisted.protocols import basic
from twisted.internet import defer
class RemoteCalculationClient(basic.LineReceiver): # 继承一个按行处理类
def __init__(self):
self.results = []
def lineReceived(self, line): #按行处理协议接收数据
d = self.results.pop(0)
d.callback(int(line)) # callback调用,这里实际去调用代码执行
# reactor.()
def _sendOperation(self, op, a, b):
d = defer.Deferred()
self.results.append(d)
line = u"{} {} {}".format(op, a, b).encode('utf-8')
self.sendLine(line) # 发送测试格式回测试调用,用于校验测试格式是非正确
return d # 返回一个Deferred对象
def add(self, a, b):
return self._sendOperation("add", a, b)
def subtract(self, a, b):
return self._sendOperation("subtract", a, b)
def multiply(self, a, b):
return self._sendOperation("multiply", a, b)
def divide(self, a, b):
return self._sendOperation("divide", a, b)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 3.2.3 其他用法
超时和异常的更多测试可参照 twisted 文档的其他测试用例。
主要关注点在于以前我们使用 twisted.test.proto_helpers.StringTransport 进行协议伪装测试,测试超时和失去连接可以使用 twisted.test.proto_helpers.StringTransportWithDisconnection 和 twisted.internet.task.Clock.advance 和协议的timeOut。
代码测试可以使用 coverage 测试并生成测试报告,具体使用后面再研究。