Python 子进程与 SIGPIPE 信号

众所周知,head 命令用于过滤标准输入前面 10 行(或由用户指定数量)的内容。即便上游命令的输出很多,且执行费时,但是一旦用管道连接到 head 命令,只要输出指定的行数后,上游命令也结束。下面是一个例子:

grep xxx abc.txt | head

上面的例子中,假设 abc.txt 的内容较长,这样 grep 命令的执行时间比 head 命令长。那么背后发生了什么呢?当 head 命令结束时,管道的读端被关闭,这时 grep 命令再写入管道,内核立刻把 SIGPIPE 信号发给 grep 进程,而 SIGPIPE 默认的行为是终止进程。

Python 中,为了能自己处理管道错误,抛出 OSError 异常,Python 启动时把 SIGPIPE 的处置方式设为“忽略”,那么底层系统调用里 write(2) 就能返回 EPIPE 错误。但是这种做法存在一个问题,子进程会继承信号的处置方式,若信号的处置方式为“默认”或“忽略”,则 exec() 后仍保留。因此,在 Python 使用 subprocess 模块执行上面的命令,我们会见到 grep 命令输出管道错误的信息。

其中一种解决方法是:调用 subprocess 模块前,先把 SIGPIPE 的处置方式恢复为“默认”。但是,在这句代码与调用 subprocess 的 Python 代码之间,无法达到原来 Python 希望达到的效果,即写入管道错误时抛出异常。

有没有两全其美的方法?答案是“有”。方法是给 SIGPIPE 信号设置一个处理函数,且内容为空:

import signal

def sigpipe_handler(sig, frame):
    pass

signal.signal(signal.SIGPIPE, sigpipe_handler)

首先,当写入管道失败时,如果 SIGPIPE 处理函数能返回,则底层会返回 EPIPE 错误,而不是 EINTR 错误,因此 Python 层面仍然能抛出一样的 OSError 异常。其次,当子进程进行 exec(),那么有处理函数的信号的处置方式重置为“默认”,也达到我们想要的效果。

pkg_resources加载插件

当编写应用软件时,我们通常希望程序具有一定的扩展性,额外的功能——甚至所有非核心的功能,都能通过插件实现。特别是使用 Python 编写的程序,由于语言本身的动态特性,为我们的插件方案提供了很多种实现方式。例如,使用标准库 importlibimport_module()函数,它可以动态加载指定的 Python 模块。但这种实现方式有一些缺点:

  • 不容易区分“模块不存在”与“模块里 import 错误”产生的 ImportError 异常
  • 需要指定插件模块所属的 Package,对插件代码的组织限制较大
  • 不容易实现多个相同名字插件共存的情况,例如希望实现第三方插件覆盖内建插件的功能,因为通常通过约定的名字搜索

本文讨论的主角是与安装库 setuptools 一并安装的软件库 pkg_resources。它基本解决了上述的问题,并且事实上成为了流行的插件实现方式。

Distribution

要理解 pkg_resources 的运作机制,首先得搞清楚一些相关的概念。Distribution 主要指的是 egg 软件包,根据我了解,主要有两种方式得到 egg 软件包。第一种方式是安装,通过 pipsetup.py 安装,软件包就能安装到 Python 的搜索路径中 sys.path。安装的结果可能是一个 zip 压缩文件,也可能是一个目录树,根据软件包的内容以及 setup.py 中的 zip_safe参数而定。第二种方式是执行 setup.py的子命令 bdist_egg,直接得到一个 egg 软件包。

pkg_resources 操作的主要单位就是 Distribution。例如,Python 脚本启动时,pkg_resources识别出搜索路径中的所有 Distribution 的命名空间包,因此,我们会发现 sys.path 包含了很多 pip 安装的软件包的路径,并且可以正确执行 import 操作。

假设插件目录不属于默认的搜索路径列表,我们可以通过 WorkingSet 对象实例的 find_plugins() 方法找到指定目录的 Distribution。pkg_resources 自带一个全局的 WorkingSet 对象,代表默认的搜索路径的工作集,通常使用这个对象即可。下面是一个小例子:

import pkg_resources

env = Environment(["/path/to/plugin", "..."])
dists, _ = pkg_resources.find_plugins(env)
for dist in dists:
    pkg_resources.working_set.add(dist)

Entry Point

Entry Point 是 Distribution 元信息的一部分,它把软件包中的一些 Python 对象(如函数、类)记录下来,使得 pkg_resouces可以在运行时动态加载。python -m 等操作正是利用这个特性。每个 Entry Point 包含三部分:

  • 组,以点号分隔便于组织层次,但与 Package 没有关联,如 pass.login
  • 名字,如 auth
  • Distribution 中的位置,如 shadow:auth,前面是 Package 和 Module,后面是模块内的位置

自定义的 Entry Point 必须在 setup.py 中指定,它的值是一个 dict,Key 是组,Value 是一个 list,每个值是 name = location 形式的字符串。下面是一个小例子:

from setuptools import setup

setup(
    entry_points={
        "pass.login": [
            "auth = shadow:auth",
        ],
    },
)

解决方案

首先在插件规范中约定好组和名字,然后,我们编写程序时,根据配置文件、命令行参数,或者代码中的常量,获取需要使用的组和名字,然后使用 WorkingSet 对象的 iter_entry_points() 方法,枚举所有的 Entry Point,使用其 load() 方法,获取指向的 Python 对象。下面是一个小例子:

import pkg_resources

for entry_point in pkg_resources.iter_entry_points(group, name):
    func = entry_point.load()
    break
else:
    raise RuntimeError("Entry Point Not Found")

pkg_resources 方式实现的插件也有其缺点,插件提供者必须打一个 egg 软件包,再进行分发。这或多或少有些不便,但如果结合 git 的钩子,也许可以有一些自动化的方案,研究后再作分享和讨论。

闭包陷阱

闭包的常见特征是在函数中定义子函数,并且子函数访问父函数中的局部变量,有时子函数还可作为返回值返回。

Python 也支持闭包,同其他语言类似,闭包中外部变量的值是在运行时确定的。如果在闭包定义到执行期间,外部变量发生变化,会影响到闭包的执行结果。本人最近遇到这个问题,遍历序列的过程中,在每个循环体里定义一个子函数,它使用的外部变量实际上是用于接收序列中的值。下面是一个简化的例子:

def parent():
    funcs = []
    for d in xrange(4):
        def closure():
            print d
        funcs.append(closure)
    for func in funcs:
        func()

上面的例子里,本来期待 4 个闭包分别输出 0, 1, 2, 3,但实际输出 3, 3, 3, 3。原因就是闭包依赖的变量 d在执行时的值为 3

其中一种解决方法是,在子函数的定义里增加额外的输入参数,把依赖外部变量的代码改为依赖输入参数。利用标准库 functoolspartial方法,我们可以确定输入参数的值,并得到一个新的函数对象,从而解决上述问题。下面是一个小例子:

import functools

def parent():
    funcs = []
    def closure(j):
        print j
    for d in xrange(4):
        func = functools.partial(closure, d)
        funcs.append(func)
    for func in funcs:
        func()

实际上,第二个例子比起第一个例子还减少了函数对象的定义,整个过程中只定义了 1 次,可能在性能上有优势。对于其他动态语言,通常也有类似的机制,例如 JavaScript 的函数对象有 bind()方法。

线程返回值

在 POSIX 标准中,pthread_join(3) 可以接收另一线程 pthread_exit(3) 指定的返回值。然而在 Python 中,线程标准库 threadingThread 对象的 join() 方法,却不能接收线程函数的返回值。

本人猜测,部分原因由于线程本身没有像进程一样的父子关系,即任何线程都可以调用某个 Thread 对象的 join() 方法,且可以调用多次。相反,POSIX 只允许对线程调用 1 次 pthread_join(3) 方法。

问题是我们有时确实需要取得线程的返回值,例如使用线程在后台执行耗时较长的任务,怎么办?其中一种解决办法是使用全局变量,线程在执行的过程中更新全局变量的值,当调用 join() 方法确定线程结束后,我们可以认为全局变量是线程的执行结果。这种方法完全绕开了返回值的问题,但存在几个问题:

  • 有时函数是第三方实现的,不受管理
  • 强化了代码间的耦合,不利于代码的维护
  • 有多个线程时,需要给每个线程分配一个全局变量,通常使用全局的 listdict

本文讨论的主角是 Python 3.2 引入的 concurrent.futures 标准库,其中的 ThreadPoolExecutor 较好地解决了上述的问题。当我们对 Executor 提交函数任务后,将返回一个 Future 对象。我们可以先完成其他任务,等需要的时候再调用其 result() 方法异步获取结果。下面是一个小例子:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
future = executor.submit(func)
# Do some other work
print(future.result())

除了线程池模型的 ThreadPoolExecutor 外,模块中还有按照进程池模型设计的 ProcessPoolExecutor,使用方式类似。模块中还有 wait() 函数,它可以同时等待多个 Future 对象的结果。

用线上数据库做开发

我在开发Web网站的时候,一般在本地调试好没问题后,再上传到服务器。本地调试时,数据从哪里来呢?通常我会在刚开始的时候生成一些伪数据作为测试用途,如果是已经上线的系统,则我可能会dump一部分数据(通常一个月)下来,然后测试。然而,每次都这么做则要耗费不少时间,而且还要占据本地的存储空间。另外,如果换台机器开发,这个过程可能还要重复一次。

直到最近一次阅读中,发现ssh有-L这个参数,使得我可以把远程数据库当作本地数据库使用。具体来说,-L参数可以把发往本地端口的数据透明地传送到指定目标的指定端口。-L port:host:hostport,其中port是本地监听的端口,host是目标主机(从ssh服务器发起连接),hostport是目标主机的端口。因此,-L 3306:localhost:3306,就能把发往本地3306端口的数据转发到ssh服务器的3306端口,从而在本地可以直接访问MySQL服务器。

MySQL connection

值得注意的是,如果这时直接连接服务器,例如mysql -u root,那么会提示错误,如上图所示。这是因为,mysql如果发现服务器是localhost时,会自动使用socket file,而不是建立TCP连接。解决方法是让mysql客户端强制使用TCP连接,即加--protocol=TCP;或者指定host为127.0.0.1,即-h 127.0.0.1。

除了文中的例子外,端口转发还能做很多事情,大家不妨发散思维。值得注意的是,直接利用线上服务器做开发有一定风险,只涉及查询数据的开发我才敢这么做,如果涉及到修改数据,那还是用测试数据库吧。

窗口过高的问题

一般情况下,如果某个应用程序的窗口过大,我们可以把鼠标放到窗口的四周,然后调整大小;或者右键单击标题栏,选择“改变大小”;或者直接把窗口最大化。但是,有某些应用程序会有最小高度(或最小长度),无论怎么改变窗口大小都不能比最小高度小,比如Android的模拟器。

Move Window

Gnome下可以通过“移动”窗口解决这个问题。首先,第一步可以通过右键单击标题栏,选择“移动”启用移动窗口,或者直接用快捷键Alt+F7也可以启用;第二步则用鼠标或键盘的方向键移动窗口的位置;最后单击或回车确定。上图中,是我把Android模拟器向上移动后的样子。

其他桌面环境应该也有类似的功能。遗憾的是,Windows XP的窗口移动后,如果超出屏幕范围,它又自动调整窗口的位置和大小,使之前的移动没有效果。

初学netcat

前几天,从网络中得知netcat这一被称为“网络瑞士军刀”的神器后,学习了一些基本的用法,现在总结一下。

使用netcat,我们可以把计算机A上的文件传到计算机B上。根据建立连接方式的不同,我们有两种方法。方法一,计算机B负责监听,A向B主动连接。这时在B上先输入“nc -l 8000 > file”,其中8000是监听的端口,然后在A上输入“nc IP 8000 < file”。方法二,计算机A负责监听,B向A主动连接。这时先在A上输入“nc -l 8080 < file”,然后在B上输入“nc IP 8080 > file”。

netcat从名字上是net+cat,但是它和cat有着本质上的不同。cat把标准输入的内容拷贝到标准输出,是单向的。然而,netcat是双向的,两台计算机的netcat建立连接后(不妨把两台计算机称为A和B),A从标准输入读到的东西,会经过网络,然后从B的标准输出出来,反之亦然。

netcat

我觉得这种I/O很像立交桥,它能同时处理两路I/O,然而,把netcat用于传文件显然只能用到其中一路I/O。如果能同时用上两路I/O,会发生什么有趣的事情呢?首先,如果两端的netcat的输入和输出都是接Terminal,那么两端的人可以用nc聊天,如上图所示。

如果把监听的一端(不妨称为计算机B)的输入输出重定向,那就更有趣。比如把B上nc的输出接到bash,然后把bash的输出又接回nc,那么B就是一个bash服务器,相当于开了个后门。具体实现可以使用命名管道和重定向,

mkfifo named_pipe; bash named_pipe | nc -l 8000 > named_pipe

同理,如果把程序换为cat,那么B就是一个echo服务器;如果把程序换为date,那么B就是一个date服务器。但和真正的服务器不一样,B只能接受一个连接,并且接受一次连接后就马上退出。