Python 子进程与 SIGPIPE 信号

众所周知,head 命令用于过滤标准输入前面 10 行(或由用户指定数量)的内容。即便上游命令的输出很多,且执行费时,但是一旦用管道连接到 head 命令,只要输出指定的行数后,上游命令也结束。下面是一个例子:

grep xxx abc.txt | head

上面的例子中,假设 abc.txt 的内容较长,这样 grep 命令的执行时间比 head 命令长。那么背后发生了什么呢?当 head 命令结束时,管道的读端被关闭,这时 grep 命令再写入管道,内核立刻把 SIGPIPE 信号发给 grep 进程,而 SIGPIPE 默认的行为是终止进程。

Python 中,为了能自己处理管道错误,抛出 OSError 异常,Python 启动时把 SIGPIPE 的处置方式设为“忽略”,那么底层系统调用里 write(2) 就能返回 EPIPE 错误。但是这种做法存在一个问题,子进程会继承信号的处置方式,若信号的处置方式为“默认”或“忽略”,则 exec() 后仍保留。因此,在 Python 使用 subprocess 模块执行上面的命令,我们会见到 grep 命令输出管道错误的信息。

其中一种解决方法是:调用 subprocess 模块前,先把 SIGPIPE 的处置方式恢复为“默认”。但是,在这句代码与调用 subprocess 的 Python 代码之间,无法达到原来 Python 希望达到的效果,即写入管道错误时抛出异常。

有没有两全其美的方法?答案是“有”。方法是给 SIGPIPE 信号设置一个处理函数,且内容为空:

import signal

def sigpipe_handler(sig, frame):
    pass

signal.signal(signal.SIGPIPE, sigpipe_handler)

首先,当写入管道失败时,如果 SIGPIPE 处理函数能返回,则底层会返回 EPIPE 错误,而不是 EINTR 错误,因此 Python 层面仍然能抛出一样的 OSError 异常。其次,当子进程进行 exec(),那么有处理函数的信号的处置方式重置为“默认”,也达到我们想要的效果。

pkg_resources加载插件

当编写应用软件时,我们通常希望程序具有一定的扩展性,额外的功能——甚至所有非核心的功能,都能通过插件实现。特别是使用 Python 编写的程序,由于语言本身的动态特性,为我们的插件方案提供了很多种实现方式。例如,使用标准库 importlibimport_module()函数,它可以动态加载指定的 Python 模块。但这种实现方式有一些缺点:

  • 不容易区分“模块不存在”与“模块里 import 错误”产生的 ImportError 异常
  • 需要指定插件模块所属的 Package,对插件代码的组织限制较大
  • 不容易实现多个相同名字插件共存的情况,例如希望实现第三方插件覆盖内建插件的功能,因为通常通过约定的名字搜索

本文讨论的主角是与安装库 setuptools 一并安装的软件库 pkg_resources。它基本解决了上述的问题,并且事实上成为了流行的插件实现方式。

Distribution

要理解 pkg_resources 的运作机制,首先得搞清楚一些相关的概念。Distribution 主要指的是 egg 软件包,根据我了解,主要有两种方式得到 egg 软件包。第一种方式是安装,通过 pipsetup.py 安装,软件包就能安装到 Python 的搜索路径中 sys.path。安装的结果可能是一个 zip 压缩文件,也可能是一个目录树,根据软件包的内容以及 setup.py 中的 zip_safe参数而定。第二种方式是执行 setup.py的子命令 bdist_egg,直接得到一个 egg 软件包。

pkg_resources 操作的主要单位就是 Distribution。例如,Python 脚本启动时,pkg_resources识别出搜索路径中的所有 Distribution 的命名空间包,因此,我们会发现 sys.path 包含了很多 pip 安装的软件包的路径,并且可以正确执行 import 操作。

假设插件目录不属于默认的搜索路径列表,我们可以通过 WorkingSet 对象实例的 find_plugins() 方法找到指定目录的 Distribution。pkg_resources 自带一个全局的 WorkingSet 对象,代表默认的搜索路径的工作集,通常使用这个对象即可。下面是一个小例子:

import pkg_resources

env = Environment(["/path/to/plugin", "..."])
dists, _ = pkg_resources.find_plugins(env)
for dist in dists:
    pkg_resources.working_set.add(dist)

Entry Point

Entry Point 是 Distribution 元信息的一部分,它把软件包中的一些 Python 对象(如函数、类)记录下来,使得 pkg_resouces可以在运行时动态加载。python -m 等操作正是利用这个特性。每个 Entry Point 包含三部分:

  • 组,以点号分隔便于组织层次,但与 Package 没有关联,如 pass.login
  • 名字,如 auth
  • Distribution 中的位置,如 shadow:auth,前面是 Package 和 Module,后面是模块内的位置

自定义的 Entry Point 必须在 setup.py 中指定,它的值是一个 dict,Key 是组,Value 是一个 list,每个值是 name = location 形式的字符串。下面是一个小例子:

from setuptools import setup

setup(
    entry_points={
        "pass.login": [
            "auth = shadow:auth",
        ],
    },
)

解决方案

首先在插件规范中约定好组和名字,然后,我们编写程序时,根据配置文件、命令行参数,或者代码中的常量,获取需要使用的组和名字,然后使用 WorkingSet 对象的 iter_entry_points() 方法,枚举所有的 Entry Point,使用其 load() 方法,获取指向的 Python 对象。下面是一个小例子:

import pkg_resources

for entry_point in pkg_resources.iter_entry_points(group, name):
    func = entry_point.load()
    break
else:
    raise RuntimeError("Entry Point Not Found")

pkg_resources 方式实现的插件也有其缺点,插件提供者必须打一个 egg 软件包,再进行分发。这或多或少有些不便,但如果结合 git 的钩子,也许可以有一些自动化的方案,研究后再作分享和讨论。

闭包陷阱

闭包的常见特征是在函数中定义子函数,并且子函数访问父函数中的局部变量,有时子函数还可作为返回值返回。

Python 也支持闭包,同其他语言类似,闭包中外部变量的值是在运行时确定的。如果在闭包定义到执行期间,外部变量发生变化,会影响到闭包的执行结果。本人最近遇到这个问题,遍历序列的过程中,在每个循环体里定义一个子函数,它使用的外部变量实际上是用于接收序列中的值。下面是一个简化的例子:

def parent():
    funcs = []
    for d in xrange(4):
        def closure():
            print d
        funcs.append(closure)
    for func in funcs:
        func()

上面的例子里,本来期待 4 个闭包分别输出 0, 1, 2, 3,但实际输出 3, 3, 3, 3。原因就是闭包依赖的变量 d在执行时的值为 3

其中一种解决方法是,在子函数的定义里增加额外的输入参数,把依赖外部变量的代码改为依赖输入参数。利用标准库 functoolspartial方法,我们可以确定输入参数的值,并得到一个新的函数对象,从而解决上述问题。下面是一个小例子:

import functools

def parent():
    funcs = []
    def closure(j):
        print j
    for d in xrange(4):
        func = functools.partial(closure, d)
        funcs.append(func)
    for func in funcs:
        func()

实际上,第二个例子比起第一个例子还减少了函数对象的定义,整个过程中只定义了 1 次,可能在性能上有优势。对于其他动态语言,通常也有类似的机制,例如 JavaScript 的函数对象有 bind()方法。

线程返回值

在 POSIX 标准中,pthread_join(3) 可以接收另一线程 pthread_exit(3) 指定的返回值。然而在 Python 中,线程标准库 threadingThread 对象的 join() 方法,却不能接收线程函数的返回值。

本人猜测,部分原因由于线程本身没有像进程一样的父子关系,即任何线程都可以调用某个 Thread 对象的 join() 方法,且可以调用多次。相反,POSIX 只允许对线程调用 1 次 pthread_join(3) 方法。

问题是我们有时确实需要取得线程的返回值,例如使用线程在后台执行耗时较长的任务,怎么办?其中一种解决办法是使用全局变量,线程在执行的过程中更新全局变量的值,当调用 join() 方法确定线程结束后,我们可以认为全局变量是线程的执行结果。这种方法完全绕开了返回值的问题,但存在几个问题:

  • 有时函数是第三方实现的,不受管理
  • 强化了代码间的耦合,不利于代码的维护
  • 有多个线程时,需要给每个线程分配一个全局变量,通常使用全局的 listdict

本文讨论的主角是 Python 3.2 引入的 concurrent.futures 标准库,其中的 ThreadPoolExecutor 较好地解决了上述的问题。当我们对 Executor 提交函数任务后,将返回一个 Future 对象。我们可以先完成其他任务,等需要的时候再调用其 result() 方法异步获取结果。下面是一个小例子:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
future = executor.submit(func)
# Do some other work
print(future.result())

除了线程池模型的 ThreadPoolExecutor 外,模块中还有按照进程池模型设计的 ProcessPoolExecutor,使用方式类似。模块中还有 wait() 函数,它可以同时等待多个 Future 对象的结果。