Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

multiprocessing入門

multiprocessingといっても今まであったos.forkをうまくラップしてくれてる
というだけなのでプロセスをforkしたらどうなるの?っていうのを知らないと
扱えないと思われる。

forkすると子プロセスには今までの状態がコピーされるがその後はメモリ空間が
別々になる。
(当たり前の話)
だが開いているfdとかファイルエントリテーブルなどは共有される。
どのタイミングでforkするかが個人的には重要。

例ではPIPEはいっさい使ってない。

from multiprocessing import current_process, cpu_count, Process, Manager, Queue
import time

class Parent(object):

    def __init__(self, name):
        self.name = name
        self.queue = Queue()
        self.data = Manager().dict()
        self.data['start'] = True
        self.processe = []
        self.parent_pid = current_process().pid
    
    def is_parent(self):
        return self.parent_pid == current_process().pid

    def _run(self, data, queue):
        i = 0
        while self.data['start']:

            if not queue.empty():
                pid = current_process().pid
                try:
                    res = queue.get(timeout=1)
                except:
                    return 
                print("pid %d %d %s " % (pid,i,res))
                i += 1

    def start(self):
        for i in xrange(cpu_count() ):
            p = Process(target=self._run, args=(self.data, self.queue,))
            self.processe.append(p)
            p.start()
     
    def set(self, val):
        self.queue.put(val, timeout=1)
    
    def stop(self):
        while not self.queue.empty():
            time.sleep(0.5)
        self.data['start'] = False
        for p in self.processe:
            p.join()
        

p = Parent('bob')
p.start()
for i in xrange(1000000):
    p.set(i)
time.sleep(1)
p.stop()

シンプルなケース。実行すると実行されてるpidを出力する。
プロセス間通信にはFIFO
暇なプロセスが取り出しにいくことで分散化。

ただFIFOはサイズに制限がある+ロックがあるのでこのように細かくやり取り
するケースではあまり向かない。
上記だと自プロセスの分があるのでcpu_count -1で子供を作る方がよい。

from multiprocessing import current_process, cpu_count, Process, Manager, Queue
import time

class Parent(object):

    def __init__(self, name):
        self.name = name
        self.queue = Queue()
        self.data = Manager().dict()
        self.data['start'] = True
        self.processe = []
        self.parent_pid = current_process().pid

    def f(self, data, queue):
        i = 0
        while self.data['start']:
            
            if not queue.empty():
                pid = current_process().pid
                try:
                    res = queue.get(timeout=1)
                except:
                    self.stop()

                print("pid %d %d %s is parent %s" % (pid,i,res,\
                        self.parent_pid == pid))
                i += 1
            else:
                self.stop()

    def start(self):
        for i in xrange(cpu_count() -1):
            p = Process(target=self.f, args=(self.data, self.queue,))
            self.processe.append(p)
            p.start()
        self.f(self.data, self.queue)
     
    def set(self, val):
        self.queue.put(val)
    
    def stop(self):
        if self.parent_pid == current_process().pid:
            while not self.queue.empty():
                time.sleep(0.5)
            self.data['start'] = False
            for p in self.processe:
                p.join()
        
p = Parent('bob')
for i in xrange(2000):
    p.set("bob")
p.start()

親を自走させながらFIFOに書くのがめんどいの数を少なくした。
(ここを大きくすると止まります)
FIFOのロックに関して言えば一応O_NONBLOCKに対応するメソッド
put_nowait、get_nowaitを持っている。
こいつらを使うと書き込めない、取り出せない場合に即時、Full、Empty
といったエラーが返る。
Manager経由で共有なオブジェクトを生成し、終了条件に使っている。
多分、共有なオブジェクトに対するアクセスが遅いので別の方法がいい。
(whileのループで毎回評価してるので多分遅いはず)


とりあえずサーバ系のサンプル。

シンプル版
from multiprocessing import current_process, Process

class Server(object):

    def __init__(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(('localhost', 6000))
        sock.listen(50)
        self.sock = sock
        self.process = []

    def start(self):
        for i in xrange(1):
            p = Process(target=self.run)
            self.process.append(p)
            p.start()

        self.run()

    def run(self):
        while True:
            con, addr = self.sock.accept()
            data =  con.recv(8192)
            con.sendall(data)
            print('process %d' % current_process().pid)
            con.close()

Server().start()
マニア向け
from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor
from multiprocessing import current_process, Process

class Echo(Protocol):

    def dataReceived(self, data):
        print(current_process().pid)
        self.echo(data)

    def echo(self, data):
        self.transport.write(data)
        self.transport.loseConnection()

class Server(object):

    def __init__(self):
        f = Factory()
        f.protocol = Echo
        reactor.listenTCP(6000, f)
        self.process = []

    def start(self):
        for i in xrange(1):
            p = Process(target=self.run)
            self.process.append(p)
            #p.start()
        self.run()

    def run(self):
        reactor.run()

Server().start()
特殊属性向け
from multiprocessing import current_process, Process
from eventlet import api
 
def handle_socket(client):
    print(current_process().pid)

    data = client.recv(8192)
    client.sendall(data)
    client.close()
 
class Server(object):
    
    def __init__(self):
        server = api.tcp_listener(('0.0.0.0', 6000))
        self.server = server
        self.process = []

    def start(self):
        for i in xrange(1):
            p = Process(target=self.run)
            self.process.append(p)
            #p.start()

        self.run()

    def run(self):
        while True:
            new_sock, address = self.server.accept()
            api.spawn(handle_socket, new_sock)

Server().start()

acceptでプロセスがひとつだけ起きてくれる前提のコードなので注意。
acceptのロックの調整のせいか、multiprocessにしてもそこまで性能あがらないっぽい。
(性能はそれなりにあがる。処理がシンプルすぎるせい)
既存のモジュールなどと併用する際には、内部のどこで何をやっているか具体的に
わからないとどのタイミングでforkするといいのかわからないので注意。
(当たり前。上記のどの例もlistenとaccept間にforkし、ちょんぼをしている)