Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

Twistedのbloking対策

ぎゃー!
POEだとpreforkとか出来るのかあスゲーとか思いつつ寝ぼけてた!
Twistedな人には当たり前な話なんだけども。
Twistedは内部でThreadPoolを持ってるのでいくらでも並列処理を行う事ができる。
例えばTwistedはRDBMSをサポートしてるんだけど内部はこんな感じだったりする。

dbapi.py
    def runInteraction(self, interaction, *args, **kw):
        """Interact with the database and return the result.

        The 'interaction' is a callable object which will be executed
        in a thread using a pooled connection. It will be passed an
        L{Transaction} object as an argument (whose interface is
        identical to that of the database cursor for your DB-API
        module of choice), and its results will be returned as a
        Deferred. If running the method raises an exception, the
        transaction will be rolled back. If the method returns a
        value, the transaction will be committed.

        NOTE that the function you pass is *not* run in the main
        thread: you may have to worry about thread-safety in the
        function you pass to this if it tries to use non-local
        objects.

        @param interaction: a callable object whose first argument is
            L{adbapi.Transaction}. *args,**kw will be passed as
            additional arguments.

        @return: a Deferred which will fire the return value of
            'interaction(Transaction(...))', or a Failure.
        """

        return self._deferToThread(self._runInteraction,
                                   interaction, *args, **kw)

Databaseにアクセスしてクエリを投げて返すメソッド。
要は返ってくるまで待ちになる。でもならない(なんじゃそりゃ)
_deferToThreadって名前の時点で怪しい。
で_runInteractionメソッドは

    def _runInteraction(self, interaction, *args, **kw):
        conn = Connection(self)
        trans = Transaction(self, conn)
        try:
            result = interaction(trans, *args, **kw)
            trans.close()
            conn.commit()
            return result
        except:
            conn.rollback()
            raise

引き数でわたって来たメソッドに対し、トランザクションを渡して結果を返すだけ。
ブロックしそう。。。。
実際に処理を行ってる_deferToThreadは

    def _deferToThread(self, f, *args, **kwargs):
        """Internal function.

        Call f in one of the connection pool's threads.
        """

        d = defer.Deferred()
        self.threadpool.callInThread(threads._putResultInDeferred,
                                     d, f, args, kwargs)
        return d

わ!callInThread呼んでんじゃん!
threadpoolのthreadにお任せしてるわけです。
もっと簡単でいいならばreactorのcallInThreadとか使うといいです。

    reactor.callInThread(aSillyBlockingMethod, "2 seconds have passed")

なのでPOEとはちょっと違うけどTwistedも対策はあるんですよという話。
うくく。