Python: popenでnonblocking IOをしなくちゃいけなくなった件

subprocessでよくコマンドと対話することがある。Pythonのプログラムから外部プログラムの標準入力に書き込み、返答を標準出力から読み込む。いつものようにそういうことしていたら今日はデッドロックになった。


$ cat x.files | python cpio_wrapper.py naive | cpio -t
# 何時迄まって待てど終らず…
^CTraceback (most recent call last):
File "cpio_wrapper.py", line 100, in
f()
File "cpio_wrapper.py", line 56, in naive
pip.stdin.write(line)
KeyboardInterrupt

コマンドの標準入力に書き込むところで刺さっている。入力を減らしてやると上手くいく。


$ cat x.files | head | python cpio_wrapper.py naive | cpio -t
.....
2745 blocks

デッドロックするバージョンはこれ。問題を分かり易くするためにcpioのラッパーという単純なものに置き換えている。


def naive():
""" cpio -o をラップするパイプコマンド。 安易バージョン。 """

os.chdir('/')
cmd="/bin/cpio -o"

pip=Popen(cmd.split(' '), stdin=PIPE, stdout=PIPE)

# cpioの標準入力にパスを一気に送り込む
for line in sys.stdin.readlines():
# ここで永遠ブロック
pip.stdin.write(line)
pip.stdin.close()

# 標準出力からcpioの出力を読み出す
while True:
buf=pip.stdout.read(4096)
if buf=='':
break
sys.stdout.write(buf)
pip.stdout.close()

status=pip.wait()

問題は入力を全部終えてから出力を読むという流れ。データ量が少ないときはこれで大丈夫だが、大くなると問題が起きる。入力のパスのストリームが`find /`のように大量だと、出力はさらに大量になる。cpioがこの出力を一気に溜め込むとメモリを使い切ってしまう恐れがある。なので、あるサイズの出力バッファがいっぱいになったら、次のプログラムが読んでくれるまで入力を読まなくなる。
便秘の人が食事を取らなくなるのと同じ状況だ(本当かウソか幸い知らないが)。なので、出すものを出してあげなら入れれば詰らずスムースに流れる。でこれがとりあえず動くバージョン。


def works():
""" cpio -o をラップするパイプコマンド
出力側をnonblockingにして、とりあえず、動くバージョン。
"""

os.chdir('/')
cmd="/bin/cpio -o"

pip=Popen(cmd.split(' '), stdin=PIPE, stdout=PIPE)

# 出力をnonblockingに
set_nonblocking(pip.stdout)

for line in sys.stdin.readlines():

# cpioの出力バッファがいっぱいにならないように抜きながら書き込む。
pip.stdin.write(line)
nonblocking_pass(pip.stdout, sys.stdout)
pip.stdin.close()

# 残った出力を処理。こんどはblockingで全部読む。
nonblocking_pass(pip.stdout, sys.stdout, True)
pip.stdout.close()

status=pip.wait()

果してこれが正解なのかちょっとわからない。Perlのときはどうしていなかな… こんな面倒なことしないと、パイプを扱えなかったっけ? もっと簡単な方法あったら教えてください。

こんなことするんだったら、いっそうのこと、全部nonblockingにしてselectで読み書きやった方がいいような気がする。pythonのgeneratorを上手く使ったら使い易いapiができるかもしれない。
以下、テストプログラム。


#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os
from subprocess import Popen, PIPE

def set_nonblocking(fh):
""" ファイルハンドルをnonblockingにする """

import fcntl

fd = fh.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

def nonblocking_pass(infh, outfh, blocking=False):
""" nonblockingな入力ファイルハンドルinfhからデータを読み込んでそれをそのまま出力ファイルハンドルに渡す。
catのようなパイプ。blocking==Trueだと普通のblockingのような振舞に。
"""

while True:
try:
buf=infh.read(4096)
except IOError, e:
if e.args[0]==11: # Resource temporarily unavailable
if blocking:
# データが無いと再びトライ。selectでデータが来るのを待った方が無難かも。
continue
else:
# nonblockingモード。
# 今はデータが無いので、他のIOができるようにループから出るようにする。
buf=''
if buf=='':
# データが無いので、コントロールをcallerに返す。
break
outfh.write(buf)

def naive():
""" cpio -o をラップするパイプコマンド。 単純バージョン。
下のコマンドとの通信量が少ないと、こんな感じで動く場合が多い。
"""

# フルパスを読み込む
os.chdir('/')
cmd="/bin/cpio -o"

pip=Popen(cmd.split(' '), stdin=PIPE, stdout=PIPE)

for line in sys.stdin.readlines():
# 進行状況を表示する
sys.stderr.write('xx: '+line)
sys.stderr.flush()
# サブプロセス(cpio -o)にインプットを押し込む
# しかし、ここでブロックしてしまう…
# cpioの出力バッファがいっぱいになってしまって、出力するのを待っているのだろう。
# ところが、一気に入力をすませてから下でcpioの出力を読もうとしているので、デッドロックになってしまう。
pip.stdin.write(line)
pip.stdin.close()

# 標準出力からcpioの出力を読み出す
while True:
buf=pip.stdout.read(4096)
if buf=='':
break
sys.stdout.write(buf)
pip.stdout.close()

status=pip.wait()

def works():
""" cpio -o をラップするパイプコマンド
出力側をnonblockingにして、とりあえず、動くバージョン。
"""

os.chdir('/')
cmd="/bin/cpio -o"

pip=Popen(cmd.split(' '), stdin=PIPE, stdout=PIPE)

# 出力をnonblockingに
set_nonblocking(pip.stdout)

for line in sys.stdin.readlines():

# cpioの出力バッファがいっぱいにならないように抜きながら書き込む。
pip.stdin.write(line)
nonblocking_pass(pip.stdout, sys.stdout)
pip.stdin.close()

# 残った出力を処理
nonblocking_pass(pip.stdout, sys.stdout, True)
pip.stdout.close()

status=pip.wait()

if __name__=='__main__':

cmd=sys.argv[1]
assert cmd in ['naive', 'works']
f=eval(cmd)
f()