Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

Jegaをリリースした話

先日、やっとリリースしました。

tobikko時代から考えると数年がかりいじってた気がします。

Jegaについて

concurrent networking and cooperative multitasking library for Python3. わかりやすく言うとgeventの後継ライブラリになります。 Python3.x系しかサポートしていません。

URL:

https://github.com/mopemope/jega

特徴

Jegaはgevent、evergreen, PEP3156を参考に作られています。

機能的なところはほぼevergreenと同様です。 主な特徴は以下です。

  • picoevベースの高速なイベントループ
  • greenletベースの協調スレッド
  • c-aresよるDNS lookupの非同期化
  • 非同期処理はFutureにより管理(専用のconcurrent.futuresの実装)
  • 協調スレッド間でコミュニケーションを取るためのchannel
  • socketなどを非同期処理に置き換えるpatch
  • Cによる高速化

最近の流れとしては非同期処理から結果を取り出す際にFutureを使うのが主流になっています。

またevergreen, PEP3156などではイベントループを意識し、イベントループを明示的に走らせないといけ ないのですが、Jegaは限定的ですが裏側でイベントループが自動的に走るようになっています。 Futureからの結果の取得、patch部コードは裏で自動でイベントループが走り処理をしてくれます。 loop.until_run_completeとかいう長いメソッドを呼ばなくもよいので楽です。

またevergreenもそうなんですが、最近はgoを意識してるのかchannelをサポートするという流 れになっています。なので合わせてchannelも実装しています。

本来はPEP3156に合わせたかったのですが、あの仕様だと問題があって

  • 高速化が難しい
  • loopを意識するのが面倒 というのがあって採用していません。

loopのメソッドの一部はPEP3156に合わせてはいますが。

例: task実行

import jega
import functools
size = 1024

def _call(a):
    return a
futures = []
for i in range(size):
    f = jega.spawn(_call, i)
    futures.append(f)
r = functools.reduce(lambda x, y: x + y.result(), futures, 0)
r2 = functools.reduce(lambda x, y: x + y, range(size), 0)
assert(r == r2)
del futures
print("OK")
jega.get_event_loop().stop()

シンプルなtaskなどはお決まりのパターンでspawnで起こします。 返ってくるのがFutureなのでresultメソッドで値を取得します。

例: channel

import jega

loop = jega.get_event_loop()
c = loop.make_channel()

def _send(chan):
    print("** start send")
    a = 1
    chan.send(a)
    print("** sended:%s" % a)
    return a


def _recv(chan):
    print("** start receive")
    r = chan.recv()
    # r = 1
    print("** received:%s" % r)
    return r

f1 = jega.spawn(_send, c)
f2 = jega.spawn(_recv, c)

assert(f1.result() == f2.result())

channelもシンプルにsend, recvでやり取りします。

その他にサーバーなどを書く際のヘルパーがあります。

例: echo server

import jega
from jega import patch
patch.patch_socket()
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_DEFER_ACCEPT, 1)
s.bind(("", 5000))
s.listen(1024)
loop = jega.get_event_loop()

def echo(sock, addr):
    try:
        recv = sock.recv
        send = sock.send
        while 1:
            buf = recv(4096)
            if not buf:
                return
            send(buf)
    finally:
        sock.close()

loop.set_socket_acceptor(s, echo)
loop.run()

set_socket_acceptorにlistenしてるsocketとacceptした結果を受け取るハンドラーをセットします。 ハンドラーはaccept毎に独立したgreenlet上で実行されます。

高速化

基本的に頻繁に使われるであろう箇所はCで書かれています。

  • イベントループ
  • 協調スレッドのExecutor
  • 協調スレッド間で使用するlock、channelなどの制御モジュール

Pythonはチマチマバイトコードを実行するので関数内のコード量が多いとそれなりの時間がかかり ます。

Cで書かれている箇所はFrameオブジェクトを生成しないのでそれだけでも高速化につながります。 また、関数(メソッド)や属性値も展開済みの形で実装されているので高速に動作します。

特に協調スレッドのスケジューリング部はevergreenに比べてもかなり高速に動作します。

デバッグ

Jegaはビルド時に環境変数を参照し、DEBUGマクロを有効化します。

export JEGA_DEVELOP=1

とセットしてからビルド、インストールすると実行時にDEBUGログが出ます。

例:DEBUGログ

src/loop.c                   init_loop_module                    2284: switch_value:0x7f330ee42040
src/loop.c                   init_loop_module                    2285: loop_switch_value:0x7f330f2d75a0
src/loop.c                   LoopObject_init                     1113: self:0x7f330e0fcea0
src/loop.c                   init_main_loop                       193: self:0x7f330e0fcea0
src/loop.c                   LoopObject_init                     1136: self:0x7f330e0fcea0 pendings:0x7f3312468628
src/loop.c                   LoopObject_add_reader               1629: self:0x7f330e0fcea0
src/loop.c                   LoopObject_add_reader               1631: args size 2
src/loop.c                   make_callback_arg                   1208: alloc callback_arg_t:0x7f330f28ecc0
src/loop.c                   call_fork_loop                       235: self:0x7f330e0fcea0
src/loop.c                   register_fd_callback                1344: add event loop:0x16f9a00 fd:5 event:1 timeout:0
src/executor.c               ExecutorObject_init                  126: self:0x7f330e0f7308

DEBUGしない場合には0をセットして下さい。

おまけ greenletの問題

Jegaを作る前にtobikkoというgeventコンパチなPython3系で動作するライブラリを作成していました。

tobikkoはアグレッシブにsocket moduleの再実装を行なっていたのですが、そこで問題が発生しました。 greenletはC-APIも提供しておりCからも使用することができます。

greenletはCスタックのコピー/リストアを行うことによって協調スレッドを実現しています。 問題はCスタックで、通常の値ならば問題ないんですが、積まれてる値がポインタのポインタである場 合、スタックをコピーした時と、リストアした時で実際のアドレスに入っているものがズレてしまいます。 特にCPythonはメモリープールを持っているのでアドレスに別のオブジェクトが入っていたりすることが 多々あります。

例:

static ssize_t
internal_recvfrom(SocketObject *self, char *buf, int bufsize, int flags, PyObject** addrobj)
{
    struct sockaddr addr;
    ssize_t r = -1;
    socklen_t addrlen;

    *addrobj = NULL;

    if(getsockaddrlen(self->family, &addrlen) == -1){ 
        return -1;
    }    

    memset(&addr, 0, addrlen);
    DEBUG("addrsize fd %d: addrlen:%d", self->fd, addrlen);

    while(1){
        Py_BEGIN_ALLOW_THREADS
        r = recvfrom(self->fd, buf, bufsize, flags, &addr, &addrlen);
        Py_END_ALLOW_THREADS

        if(r < 0){
            if(errno == EAGAIN || errno == EWOULDBLOCK) {
                DEBUG("errno:%d", errno);
                //ここでgreenletにスイッチ
                if(io_trampoline(self->fd, PICOEV_READ, self->timeout, socket_timeout) < 0){
                    return -1;
                }
                //戻ってくると&addrがズレる
            }else{
                PyErr_SetFromErrno(socket_error);
                return -1;
            }    
        }else{
            DEBUG("read %d: addrlen:%d", (int)r, (int)addrlen);
            if(!(*addrobj = getaddrtuple(&addr, addrlen))){
                return -1;
            }
            //不正な処理になる

            return r;
        }