Jegaをリリースした話
先日、やっとリリースしました。
tobikko時代から考えると数年がかりいじってた気がします。
Jegaについて
concurrent networking and cooperative multitasking library for Python3. わかりやすく言うとgeventの後継ライブラリになります。 Python3.x系しかサポートしていません。
URL:
https://github.com/mopemope/jega
特徴
Jegaはgevent、evergreen, PEP3156を参考に作られています。
機能的なところはほぼevergreenと同様です。 主な特徴は以下です。
- picoevベースの高速なイベントループ
- greenletベースの協調スレッド
- c-aresよるDNS lookupの非同期化
- 非同期処理はFutureにより管理(専用のconcurrent.futuresの実装)
- 協調スレッド間でコミュニケーションを取るためのchannel
- socketなどを非同期処理に置き換えるpatch
- Cによる高速化
最近の流れとしては非同期処理から結果を取り出す際にFutureを使うのが主流になっています。
またevergreen, PEP3156などではイベントループを意識し、イベントループを明示的に走らせないといけ ないのですが、Jegaは限定的ですが裏側でイベントループが自動的に走るようになっています。 Futureからの結果の取得、patch部コードは裏で自動でイベントループが走り処理をしてくれます。 loop.until_run_completeとかいう長いメソッドを呼ばなくもよいので楽です。
またevergreenもそうなんですが、最近はgoを意識してるのかchannelをサポートするという流 れになっています。なので合わせてchannelも実装しています。
本来はPEP3156に合わせたかったのですが、あの仕様だと問題があって
- 高速化が難しい
- loopを意識するのが面倒 というのがあって採用していません。
loopのメソッドの一部はPEP3156に合わせてはいますが。
例: task実行
import jega import functools size = 1024 def _call(a): return a futures = [] for i in range(size): f = jega.spawn(_call, i) futures.append(f) r = functools.reduce(lambda x, y: x + y.result(), futures, 0) r2 = functools.reduce(lambda x, y: x + y, range(size), 0) assert(r == r2) del futures print("OK") jega.get_event_loop().stop()
シンプルなtaskなどはお決まりのパターンでspawnで起こします。 返ってくるのがFutureなのでresultメソッドで値を取得します。
例: channel
import jega loop = jega.get_event_loop() c = loop.make_channel() def _send(chan): print("** start send") a = 1 chan.send(a) print("** sended:%s" % a) return a def _recv(chan): print("** start receive") r = chan.recv() # r = 1 print("** received:%s" % r) return r f1 = jega.spawn(_send, c) f2 = jega.spawn(_recv, c) assert(f1.result() == f2.result())
channelもシンプルにsend, recvでやり取りします。
その他にサーバーなどを書く際のヘルパーがあります。
例: echo server
import jega from jega import patch patch.patch_socket() import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_DEFER_ACCEPT, 1) s.bind(("", 5000)) s.listen(1024) loop = jega.get_event_loop() def echo(sock, addr): try: recv = sock.recv send = sock.send while 1: buf = recv(4096) if not buf: return send(buf) finally: sock.close() loop.set_socket_acceptor(s, echo) loop.run()
set_socket_acceptorにlistenしてるsocketとacceptした結果を受け取るハンドラーをセットします。 ハンドラーはaccept毎に独立したgreenlet上で実行されます。
高速化
基本的に頻繁に使われるであろう箇所はCで書かれています。
- イベントループ
- 協調スレッドのExecutor
- 協調スレッド間で使用するlock、channelなどの制御モジュール
Pythonはチマチマバイトコードを実行するので関数内のコード量が多いとそれなりの時間がかかり ます。
Cで書かれている箇所はFrameオブジェクトを生成しないのでそれだけでも高速化につながります。 また、関数(メソッド)や属性値も展開済みの形で実装されているので高速に動作します。
特に協調スレッドのスケジューリング部はevergreenに比べてもかなり高速に動作します。
デバッグ
Jegaはビルド時に環境変数を参照し、DEBUGマクロを有効化します。
export JEGA_DEVELOP=1
とセットしてからビルド、インストールすると実行時にDEBUGログが出ます。
例:DEBUGログ
src/loop.c init_loop_module 2284: switch_value:0x7f330ee42040 src/loop.c init_loop_module 2285: loop_switch_value:0x7f330f2d75a0 src/loop.c LoopObject_init 1113: self:0x7f330e0fcea0 src/loop.c init_main_loop 193: self:0x7f330e0fcea0 src/loop.c LoopObject_init 1136: self:0x7f330e0fcea0 pendings:0x7f3312468628 src/loop.c LoopObject_add_reader 1629: self:0x7f330e0fcea0 src/loop.c LoopObject_add_reader 1631: args size 2 src/loop.c make_callback_arg 1208: alloc callback_arg_t:0x7f330f28ecc0 src/loop.c call_fork_loop 235: self:0x7f330e0fcea0 src/loop.c register_fd_callback 1344: add event loop:0x16f9a00 fd:5 event:1 timeout:0 src/executor.c ExecutorObject_init 126: self:0x7f330e0f7308
DEBUGしない場合には0をセットして下さい。
おまけ greenletの問題
Jegaを作る前にtobikkoというgeventコンパチなPython3系で動作するライブラリを作成していました。
tobikkoはアグレッシブにsocket moduleの再実装を行なっていたのですが、そこで問題が発生しました。 greenletはC-APIも提供しておりCからも使用することができます。
greenletはCスタックのコピー/リストアを行うことによって協調スレッドを実現しています。 問題はCスタックで、通常の値ならば問題ないんですが、積まれてる値がポインタのポインタである場 合、スタックをコピーした時と、リストアした時で実際のアドレスに入っているものがズレてしまいます。 特にCPythonはメモリープールを持っているのでアドレスに別のオブジェクトが入っていたりすることが 多々あります。
例:
static ssize_t
internal_recvfrom(SocketObject *self, char *buf, int bufsize, int flags, PyObject** addrobj)
{
struct sockaddr addr;
ssize_t r = -1;
socklen_t addrlen;
*addrobj = NULL;
if(getsockaddrlen(self->family, &addrlen) == -1){
return -1;
}
memset(&addr, 0, addrlen);
DEBUG("addrsize fd %d: addrlen:%d", self->fd, addrlen);
while(1){
Py_BEGIN_ALLOW_THREADS
r = recvfrom(self->fd, buf, bufsize, flags, &addr, &addrlen);
Py_END_ALLOW_THREADS
if(r < 0){
if(errno == EAGAIN || errno == EWOULDBLOCK) {
DEBUG("errno:%d", errno);
//ここでgreenletにスイッチ
if(io_trampoline(self->fd, PICOEV_READ, self->timeout, socket_timeout) < 0){
return -1;
}
//戻ってくると&addrがズレる
}else{
PyErr_SetFromErrno(socket_error);
return -1;
}
}else{
DEBUG("read %d: addrlen:%d", (int)r, (int)addrlen);
if(!(*addrobj = getaddrtuple(&addr, addrlen))){
return -1;
}
//不正な処理になる
return r;
}