子嘉的博客 子嘉的博客
首页
bic-bic
技术
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

高子嘉

没有比脚更长的路,没有比人更高的山
首页
bic-bic
技术
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • install
  • virtualenv
  • stackless
  • twisted
    • 1. reactor(反应堆)
    • 2. Deferreds
      • 2.1 使用示例
      • 2.2 异常处理
      • 2.3 未捕获异常
      • 2.4 处理同步或异步结果
      • 2.5 取消 deferred
      • 2.6 超时
      • 2.7 DeferredList
      • 2.8 deferred 类概览
    • 3. 测试驱动开发
      • 3.1 指导思想
      • 3.2 twisted 的具体测试
      • 3.2.1 使用客户端与服务端测试
      • 3.2.2 在测试客户端中应用 Deferred
      • 3.2.3 其他用法
  • pytesseract
  • django

  • python
子嘉
2022-06-04
目录

twisted

# twisted

twisted 是用 Python 实现的基于事件驱动的网络引擎框架,twisted 支持常见的传输及应用层协议,包括 TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC 以及 FTP。 安装

$ pip install twisted
1
  • github 地址 https://github.com/twisted/twisted

  • 文档支持 https://twistedmatrix.com/documents/current/

  • Twisted 源码分析之 reactor https://www.jianshu.com/p/26ae331b09b0

  • Twisted deferred 翻译讲解 https://blog.csdn.net/happyanger6/article/details/53073881

协议处理类通常是 twisted.internet.protocol.Protocol 的子类,但是协议处理的持久化通常是一个工厂类,这个工厂类是 twisted.internet.protocol.Factory 的子类。工厂的 buildProtocol 方法用于为每个新连接创建一个协议。

# 1. reactor(反应堆)

reactor 是 Twisted 中事件循环的核心,事件循环使用 Twisted 驱动应用程序。事件循环是一种程序设计结构,它等待并调度程序中的事件或消息。它的工作方式是调用一些内部或外部“事件提供程序”,该事件提供程序通常会阻塞直到事件到达,然后再调用相关的事件处理程序(调度事件)。reactor 为许多服务提供基本接口,包括网络通信,线程和事件调度。

反应器有多种实现方式,每种实现方式都经过修改,可以为默认功能提供更好的支持,以支持默认功能。有关这些内容以及如何使用特定实现的更多信息,可以通过 选择 Reactor 获得。

Twisted 应用程序可以使用 twisted.application.service 中的接口来配置和运行该应用程序,而不是使用样板反应堆代码。

twisted 实现了设计模式中的反应堆(reactor)模式,twisted 的核心就是 reactor 时间循环。reactor 可以感知网络、文件系统以及定时器事件。 reactor 通过 run 函数启动

from twisted.internet import reactor

reactor.run()
1
2
3

reactor 是 Singleton(单例模式),在一个程序中只有一个 reactor,只要引入就会相应的创建一个。twisted 还可以引入其他 reactor,例如,可以使用 twisted.internet.pollreactor 中的 pool 替代 select 的系统调用。若无指定 reactor 则使用系统安装的默认 reactor。

from twisted.internet import poolreactor
poolreactor.install()

from twisted.insternet import reactor
reactor.run()
1
2
3
4
5

反应器通常实现一组接口,但是根据所选的反应器和平台,某些接口可能未实现:

  • IReactorCore:核心(必需)功能。
  • IReactorFDSet:使用 FileDescriptor 对象。
  • IReactorProcess:流程管理。
  • IReactorSSL:SSL 网络支持。
  • IReactorTCP:TCP 网络支持。
  • IReactorThreads:线程的使用和管理。
  • IReactorTime:调度接口。
  • IReactorUDP:UDP 网络支持。
  • IReactorUNIX:UNIX 套接字支持。
  • IReactorSocket:第三方套接字支持。

一个简单回显客户端消息的例子

from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

    def connectionMade(self):
        # self.factory was set by the factory's default buildProtocol:
        self.transport.write(self.factory.quote + '\r\n')
        self.transport.loseConnection()


class QOTDFactory(Factory):

    # This will be used by the default buildProtocol to create new protocols:
    protocol = QOTD

    def __init__(self, quote=None):
        self.quote = quote or 'An apple a day keeps the doctor away'

endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory("configurable quote"))
reactor.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 2. Deferreds

deferreds 是一种异步延迟回调

# 2.1 使用示例

#-*- coding:utf-8 -*-

from twisted.internet import reactor, defer

class Getter:
    def gotResult(self, x):
        if self.d is None:
            print "Nowhere to put results"
            return

        d = self.d
        self.d = None # 防止重复调用
        if x % 2 == 0:
            d.callback(x * 3)  #成功回调
        else:
            d.errback(ValueError("You used an odd number!")) #异常回调

    def _toHTML(self, r):
        return "Result: %s" % r

    def getDummyData(self, x):
        self.d = defer.Deferred()
        reactor.callLater(2, self.gotResult, x)  # 2秒后调用gotResults
        self.d.addCallback(self._toHTML)
        return self.d

def cbPrintData(result):
    print(result)

def ebPrintError(failure):
    import sys
    sys.stderr.write(str(failure))

g = Getter()
d = g.getDummyData(3)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

g = Getter()
d = g.getDummyData(4)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

reactor.callLater(4, reactor.stop)
reactor.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

其中 getPage(url)返回的是一个 defer.Deferred()对象,addCallbacks 给 defer 对象增加回调(成功和失败回调),addBoth 给 defer 对象增加回调(成功失败添加同一回调),reactor.run() 启动 reactor

  • 重复对 d 调用 callback,会导致 AlreadyCalledError 异常
  • callback 调用异常会抛出 errback()异常,可以通过.errback(failure)返回调用失败,failure 通常是 wtisted.python.failure.Failure 的实例

# 2.2 异常处理

先看代码

# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1)       # A
d.addErrback(errback1)         # B
d.addCallback(callback2)
d.addErrback(errback2)

# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1)  # C
d.addCallbacks(callback2, errback2)
1
2
3
4
5
6
7
8
9
10
11

如果 callback1 发生了错误,对于 Case1,将会使用 Failure 对象调用 errback1;而对于 Case2,则会调用 errback2。

  • 对于 1,callback1 只会处理 deferred 执行成功的情形,而 errback1 将会处理 deferred 的错误和 callback1 中的错误
  • 对于 2,errorback1 只会处理 deferred 执行错误的情形,而不处理 callback1 中的错误

# 2.3 未捕获异常

如果 Deferred 被垃圾回收并带有未处理的错误(即如果有一个错误,它将调用下一个 errback),那么 Twisted 会将错误的回溯写入日志文件。这意味着您通常可以避免不添加错误回复,但仍然会记录错误。但是要小心;如果在周围保留对 Deferred 的引用,以防止被垃圾回收,则您可能永远看不到错误(而且您的回调似乎也从未被调用过)。如果不确定,应该在回调之后显式添加一个 errback,即:

# Make sure errors get logged
from twisted.python import log
d.addErrback(log.err)
1
2
3

# 2.4 处理同步或异步结果

使用 maybeDeferred 可以返回一个 Deferred 对象,即使调用不是延迟的

twisted.internet.defer.maybeDeferred(f, *args, **kw)
1

# 2.5 取消 deferred

一个 Deferred 对象可能很长时间才能返回结果然后执行 callback,也可能永远不返回结果。当选择忽略结果时,deferred 对象可能仍在后台进行一些操作,消耗一些资源,比如 CPU、内存网络带宽,甚至磁盘空间。所以有时候希望显式地指明停止对 deferred 对象的关注,这样 deferred 对象就可以进行清理工作,并释放系统资源。twisted.internet.defer.cancel 方法可以用来停止对 deferr 的等待。

当调用 cancel 后,将会进行如下操作

  1. 如果后台正在进行连接操作,将会被终止
  2. Deferred 将会以莫衷方式及时地完成
  3. 就好像引起 Deferred 使用 CancelledError 调用 errbacked 这一系列操作的顺序非常重要,调用取消意味着我们希望停止后台操作,但是后台操作不一定能够立即做出相应。有一些情况可能不会被打断,例如:平台相关的域名解析操作会阻塞,因此需要在一个线程中执行;当等待一个以这种线程方式实现的域名解析时连接操作不能被取消。所以要取消的 Deferred 对象,不会立即执行 callback 或者 errback。

理想情况下,发出 cancel 请求后正在进行的服务都会停止,但是没有办法保证在多个 deferred 对象间的操作被取消。取消 Deferred 对象会尽最大努力:

  1. Deferred 对象不知道如何取消底层操作
  2. 底层操作处于不可取消的状态,比如做过一些不可逆操作。
  3. Deferred 对象已经有一个结果,取消操作什么也不做
  4. 无论能否取消成功,cancel 调用总是会成功。对于 1、2 情况,Deferred 对象会用 twisted.internet.defer.CancelledError 调用 errback,尽管后台操作可能还在进行。 如果被取消的 Deferred 对象还在等待另一个 Deferred 对象,则取消操作直接发送到另一个 Deferred

# 2.6 超时

超时是取消的一种特殊情况。可以在 Deferred 对象还没开始时,通过调用 Deferred.addTimeout 产生一个 TimeoutError。

from twisted.internet import task, defer

def logTimeout(result, timeout):
    print("Got {0!r} but actually timed out after {1} seconds".format(
        result, timeout))
    return result + " (timed out)"

def main(reactor):
    # generate a deferred with a custom canceller function, and never
    # never callback or errback it to guarantee it gets timed out
    d = defer.Deferred(lambda c: c.callback("Everything's ok!"))
    d.addTimeout(2, reactor, onTimeoutCancel=logTimeout)  # 等待超时
    d.addBoth(print)  # 此调用不会被等待超时
    return d

task.react(main)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Deferred.addTimeout 的调用时机,决定了哪些 callbacks 会被超时控制。addTimeout 将监听在它调用之前的 callbacks 和 errbacks 超时,不包括在这之后的 callbacks 和 errbacks。超时计算在 addTimeout 调用之后就开始计算。

# 2.7 DeferredList

通过调用 deferredList,可以等待多个 Deferred 完成后再调用 callback。

# Creates a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3])
1
2

可以通过一个例子理解 DeferredList

# A callback that unpacks and prints the results of a DeferredList
def printResult(result):
    for (success, value) in result:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

# Create three deferreds.
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# Pack them into a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# Add our callback
dl.addCallback(printResult)

# Fire our three deferreds with various values.
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

# At this point, dl will fire its callback, printing:
#    Success: one
#    Failure: bang!
#    Success: three
# (note that defer.SUCCESS == True, and defer.FAILURE == False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

一个标准的 DeferredList 永远不会调用 errback,但是 DeferrerList 中的 Deferred 的失败将会调用它的 errback。 如果在 DeferredList 添加之后调用 callback,这个 callback 的调用结果不会向 DeferredList 的 callback 返回。为了避免造成困惑,建议不要在 deferred 对象已经添加到 DeferredList 之后再调用 callback。 DeferredList 的其他行为,先看一下源码实现会更直接,DeferredList 实际就是对 Deferred 的一层封装。

class DeferredList(Deferred):  # 继承Deferred类
    fireOnOneCallback = False  # 每次CallBack成功后调用
    fireOnOneErrback = False   # 每次ErrBack后调用

    def __init__(self, deferredList, fireOnOneCallback=False,
                 fireOnOneErrback=False, consumeErrors=False):

        self._deferredList = list(deferredList)
        self.resultList = [None] * len(self._deferredList)
        Deferred.__init__(self)
        if len(self._deferredList) == 0 and not fireOnOneCallback:
            self.callback(self.resultList) # DeferredList为空时,直接回调


        self.fireOnOneCallback = fireOnOneCallback # 赋值
        self.fireOnOneErrback = fireOnOneErrback  # 赋值
        self.consumeErrors = consumeErrors  # 赋值
        self.finishedCount = 0  # 完成CallBack调用的计数

        index = 0
        for deferred in self._deferredList: # 初始化deferred list的callback
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
                                  callbackArgs=(index,SUCCESS),
                                  errbackArgs=(index,FAILURE))
            index = index + 1
    def _cbDeferred(self, result, index, succeeded):# 内部callback函数
        self.resultList[index] = (succeeded, result)

        self.finishedCount += 1
        if not self.called:
            # 确保只调用一次callback,在callback或errback调用后called会被置为True
            # 这里要注意 fireOnOnce 的回调和全部完成后的回调参数是不一样的
            if succeeded == SUCCESS and self.fireOnOneCallback:
                self.callback((result, index))  # 成功调用
            elif succeeded == FAILURE and self.fireOnOneErrback:
                self.errback(failure.Failure(FirstError(result, index))) # 失败调用
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList) #

        if succeeded == FAILURE and self.consumeErrors:
            result = None

        return result


    def cancel(self):
        pass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

启用可选参数,可使用如下方式调用。

dl = defer.DeferredList([deferred1, deferred2, deferred3], fireOnOneCallback=True, consumeErrors=True)

dl.addCallback(printResult)
1
2
3

DeferredList 的一个常见用途是收集并行一步操作结果。如果所有的操作都成功则成功,如果有一个失败则失败,在这种情况下课时使用 twisted.internet.defer.gatherResults 看一下 DeferredList 的

# 2.8 deferred 类概览

  • addCallbacks(self, callback[, errback, callbackArgs, callbackKeywords, errbackArgs, errbackKeywords])
  • addCallback(callback,*callbackArgs,**callbackKeywords)
  • addErrback(errback,*errbackArgs,**errbackKeywords)
  • addBoth(callbackOrErrback,*callbackOrErrbackArgs,**callbackOrErrbackKeywords)

Deferred 链 可以使用*chainDeferred(otherDeferred)*调用,这个函数的作用和 *self.addCallbacks(otherDeferred.callback,otherDeferred.errback)*一样.

关于书写异步函数返回 Deferred 对象,可以参照 https://twistedmatrix.com/documents/current/core/howto/gendefer.html

# 3. 测试驱动开发

# 3.1 指导思想

在正式编写代码之前写编写测试代码,先保证所有测试用例都会失败,然后再编写代码。当代码有 bug 需要修改时,需要先编写让测试不通过的测试用例,再修复 bug。测试代码覆盖率是软件测试的重要指标,其中包括路径覆盖率、条件覆盖率、语句覆盖率等。

先写一个例子,目录结构如下

calculus/__init__.py
calculus/base_1.py
calculus/test/__init__.py
calculus/test/test_base_1.py
1
2
3
4
# -*- test-case-name: calculus.test.test_base_1 -*-
# filename: base_1.py

class Calculation(object):
    def add(self, a, b):
        pass

    def subtract(self, a, b):
        pass

    def multiply(self, a, b):
        pass

    def divide(self, a, b):
        pass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# filename: test_base_1.py

from calculus.base_1 import Calculation
from twisted.trial import unittest

class CalculationTestCase(unittest.TestCase):
    def test_add(self):
        calc = Calculation()
        result = calc.add(3, 8)
        self.assertEqual(result, 11)

    def test_subtract(self):
        calc = Calculation()
        result = calc.subtract(7, 3)
        self.assertEqual(result, 4)

    def test_multiply(self):
        calc = Calculation()
        result = calc.multiply(12, 5)
        self.assertEqual(result, 60)

    def test_divide(self):
        calc = Calculation()
        result = calc.divide(12, 5)
        self.assertEqual(result, 2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

使用下面命令启动测试

  • python -m twisted.trial calculus: 运行 calculus 包的所有测试

  • python -m twisted.trial calculus.test: run using Python’s import notation.

  • python -m twisted.trial calculus.test.test_base_1: 运行一个特定的测试用例 You can follow that logic by putting your class name and even a method name to only run those specific tests.

  • python -m twisted.trial --testmodule=calculus/base_1.py: use the test-case-name comment in the first line of calculus/base_1.py to find the tests.

  • python -m twisted.trial calculus/test: 运行目录下所有测试 (不推荐).

  • python -m twisted.trial calculus/test/test_base_1.py: 运行一个测试文件 (不推荐).

# 3.2 twisted 的具体测试

本节笔记参照 twisted

测试部分文档 (opens new window)书写。这里不在罗列代码,代码可在浏览器中查看。

测试类需要继承 twisted.trial.unittest.TestCase 类,测试类可以提供对应的 setUp 和 tearDown 方法。在每次启动测试时会调用 setUp 方法,测试后调用 tearDown 方法

# 3.2.1 使用客户端与服务端测试

测试类可以结合 reactor 的协议工厂,通过 twisted.test.proto_helpers.StringTransport 类伪造网络传输数据从而进行测试。这种方法用于模拟没有网络的网络连接,通过这种方法,可以忽略掉网络的不可靠性,从而可以编写 100%可靠的测试。还可以用确定性的方式,测试网络故障。 测试文件为:test_remote_1.py (opens new window) remote_1.py (opens new window)。在

可使用下面命令进行测试

$ python calculus/remote_1.py

$ python -m twisted.trial calculus.test.test_remote_1
1
2
3

在启动测试服务端后,也可使用 telnet 测试

$ telnet localhost 46194
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
add 4123 9423
13546
1
2
3
4
5
6

# 3.2.2 在测试客户端中应用 Deferred

在测试客户端中要和服务端通信(伪造协议,同步过程),服务器代码不可预知也许是异步过程,所以测试实际上是一个异步的过程,可以使用 Deferred 来加速异步测试

from calculus.client_1 import RemoteCalculationClient
from twisted.trial import unittest
from twisted.test import proto_helpers


class ClientCalculationTestCase(unittest.TestCase):
    def setUp(self):
        self.tr = proto_helpers.StringTransport()
        self.proto = RemoteCalculationClient()
        self.proto.makeConnection(self.tr)

    def _test(self, operation, a, b, expected):
        d = getattr(self.proto, operation)(a, b) #调用一个测试,这里返回一个Deferred对象
        self.assertEqual(
            self.tr.value(), # proto_helpers 消息,用于校验测试格式
            u'{} {} {}\r\n'.format(operation, a, b).encode('utf-8')
        )
        self.tr.clear() # 清空proto_helpers 中的数据,准备下一次测试
        d.addCallback(self.assertEqual, expected) # 为Deferred添加 asserEqual 测试函数
        # 模拟接收数据,驱动self.proto对象中lineReceived函数的调用,这里模拟使用的字符串是经过校验的expected字符串。
        self.proto.dataReceived(u"{}\r\n".format(expected,).encode('utf-8'))
        return d

    def test_add(self):
        return self._test('add', 7, 6, 13)

    def test_subtract(self):
        return self._test('subtract', 82, 78, 4)

    def test_multiply(self):
        return self._test('multiply', 2, 8, 16)

    def test_divide(self):
        return self._test('divide', 14, 3, 4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- test-case-name: calculus.test.test_client_1 -*-
from twisted.protocols import basic
from twisted.internet import defer

class RemoteCalculationClient(basic.LineReceiver): # 继承一个按行处理类
    def __init__(self):
        self.results = []

    def lineReceived(self, line): #按行处理协议接收数据
        d = self.results.pop(0)
        d.callback(int(line)) # callback调用,这里实际去调用代码执行
        # reactor.()

    def _sendOperation(self, op, a, b):
        d = defer.Deferred()
        self.results.append(d)
        line = u"{} {} {}".format(op, a, b).encode('utf-8')
        self.sendLine(line) # 发送测试格式回测试调用,用于校验测试格式是非正确
        return d # 返回一个Deferred对象

    def add(self, a, b):
        return self._sendOperation("add", a, b)

    def subtract(self, a, b):
        return self._sendOperation("subtract", a, b)

    def multiply(self, a, b):
        return self._sendOperation("multiply", a, b)

    def divide(self, a, b):
        return self._sendOperation("divide", a, b)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 3.2.3 其他用法

超时和异常的更多测试可参照 twisted 文档的其他测试用例。

主要关注点在于以前我们使用 twisted.test.proto_helpers.StringTransport 进行协议伪装测试,测试超时和失去连接可以使用 twisted.test.proto_helpers.StringTransportWithDisconnection 和 twisted.internet.task.Clock.advance 和协议的timeOut。

代码测试可以使用 coverage 测试并生成测试报告,具体使用后面再研究。

编辑 (opens new window)
上次更新: 2023/02/24, 10:34:03
stackless
pytesseract

← stackless pytesseract→

最近更新
01
mongodb restore
03-06
02
pytesseract
02-28
03
consul
02-24
更多文章>
Theme by Vdoing | Copyright © 2022-2025 子嘉 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式