Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

Jegaをリリースした話

先日、やっとリリースしました。

tobikko時代から考えると数年がかりいじってた気がします。

Jegaについて

concurrent networking and cooperative multitasking library for Python3. わかりやすく言うとgeventの後継ライブラリになります。 Python3.x系しかサポートしていません。

URL:

https://github.com/mopemope/jega

特徴

Jegaはgevent、evergreen, PEP3156を参考に作られています。

機能的なところはほぼevergreenと同様です。 主な特徴は以下です。

  • picoevベースの高速なイベントループ
  • greenletベースの協調スレッド
  • c-aresよるDNS lookupの非同期化
  • 非同期処理はFutureにより管理(専用のconcurrent.futuresの実装)
  • 協調スレッド間でコミュニケーションを取るためのchannel
  • socketなどを非同期処理に置き換えるpatch
  • Cによる高速化

最近の流れとしては非同期処理から結果を取り出す際にFutureを使うのが主流になっています。

またevergreen, PEP3156などではイベントループを意識し、イベントループを明示的に走らせないといけ ないのですが、Jegaは限定的ですが裏側でイベントループが自動的に走るようになっています。 Futureからの結果の取得、patch部コードは裏で自動でイベントループが走り処理をしてくれます。 loop.until_run_completeとかいう長いメソッドを呼ばなくもよいので楽です。

またevergreenもそうなんですが、最近はgoを意識してるのかchannelをサポートするという流 れになっています。なので合わせてchannelも実装しています。

本来はPEP3156に合わせたかったのですが、あの仕様だと問題があって

  • 高速化が難しい
  • loopを意識するのが面倒 というのがあって採用していません。

loopのメソッドの一部はPEP3156に合わせてはいますが。

例: task実行

import jega
import functools
size = 1024

def _call(a):
    return a
futures = []
for i in range(size):
    f = jega.spawn(_call, i)
    futures.append(f)
r = functools.reduce(lambda x, y: x + y.result(), futures, 0)
r2 = functools.reduce(lambda x, y: x + y, range(size), 0)
assert(r == r2)
del futures
print("OK")
jega.get_event_loop().stop()

シンプルなtaskなどはお決まりのパターンでspawnで起こします。 返ってくるのがFutureなのでresultメソッドで値を取得します。

例: channel

import jega

loop = jega.get_event_loop()
c = loop.make_channel()

def _send(chan):
    print("** start send")
    a = 1
    chan.send(a)
    print("** sended:%s" % a)
    return a


def _recv(chan):
    print("** start receive")
    r = chan.recv()
    # r = 1
    print("** received:%s" % r)
    return r

f1 = jega.spawn(_send, c)
f2 = jega.spawn(_recv, c)

assert(f1.result() == f2.result())

channelもシンプルにsend, recvでやり取りします。

その他にサーバーなどを書く際のヘルパーがあります。

例: echo server

import jega
from jega import patch
patch.patch_socket()
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_DEFER_ACCEPT, 1)
s.bind(("", 5000))
s.listen(1024)
loop = jega.get_event_loop()

def echo(sock, addr):
    try:
        recv = sock.recv
        send = sock.send
        while 1:
            buf = recv(4096)
            if not buf:
                return
            send(buf)
    finally:
        sock.close()

loop.set_socket_acceptor(s, echo)
loop.run()

set_socket_acceptorにlistenしてるsocketとacceptした結果を受け取るハンドラーをセットします。 ハンドラーはaccept毎に独立したgreenlet上で実行されます。

高速化

基本的に頻繁に使われるであろう箇所はCで書かれています。

  • イベントループ
  • 協調スレッドのExecutor
  • 協調スレッド間で使用するlock、channelなどの制御モジュール

Pythonはチマチマバイトコードを実行するので関数内のコード量が多いとそれなりの時間がかかり ます。

Cで書かれている箇所はFrameオブジェクトを生成しないのでそれだけでも高速化につながります。 また、関数(メソッド)や属性値も展開済みの形で実装されているので高速に動作します。

特に協調スレッドのスケジューリング部はevergreenに比べてもかなり高速に動作します。

デバッグ

Jegaはビルド時に環境変数を参照し、DEBUGマクロを有効化します。

export JEGA_DEVELOP=1

とセットしてからビルド、インストールすると実行時にDEBUGログが出ます。

例:DEBUGログ

src/loop.c                   init_loop_module                    2284: switch_value:0x7f330ee42040
src/loop.c                   init_loop_module                    2285: loop_switch_value:0x7f330f2d75a0
src/loop.c                   LoopObject_init                     1113: self:0x7f330e0fcea0
src/loop.c                   init_main_loop                       193: self:0x7f330e0fcea0
src/loop.c                   LoopObject_init                     1136: self:0x7f330e0fcea0 pendings:0x7f3312468628
src/loop.c                   LoopObject_add_reader               1629: self:0x7f330e0fcea0
src/loop.c                   LoopObject_add_reader               1631: args size 2
src/loop.c                   make_callback_arg                   1208: alloc callback_arg_t:0x7f330f28ecc0
src/loop.c                   call_fork_loop                       235: self:0x7f330e0fcea0
src/loop.c                   register_fd_callback                1344: add event loop:0x16f9a00 fd:5 event:1 timeout:0
src/executor.c               ExecutorObject_init                  126: self:0x7f330e0f7308

DEBUGしない場合には0をセットして下さい。

おまけ greenletの問題

Jegaを作る前にtobikkoというgeventコンパチなPython3系で動作するライブラリを作成していました。

tobikkoはアグレッシブにsocket moduleの再実装を行なっていたのですが、そこで問題が発生しました。 greenletはC-APIも提供しておりCからも使用することができます。

greenletはCスタックのコピー/リストアを行うことによって協調スレッドを実現しています。 問題はCスタックで、通常の値ならば問題ないんですが、積まれてる値がポインタのポインタである場 合、スタックをコピーした時と、リストアした時で実際のアドレスに入っているものがズレてしまいます。 特にCPythonはメモリープールを持っているのでアドレスに別のオブジェクトが入っていたりすることが 多々あります。

例:

static ssize_t
internal_recvfrom(SocketObject *self, char *buf, int bufsize, int flags, PyObject** addrobj)
{
    struct sockaddr addr;
    ssize_t r = -1;
    socklen_t addrlen;

    *addrobj = NULL;

    if(getsockaddrlen(self->family, &addrlen) == -1){ 
        return -1;
    }    

    memset(&addr, 0, addrlen);
    DEBUG("addrsize fd %d: addrlen:%d", self->fd, addrlen);

    while(1){
        Py_BEGIN_ALLOW_THREADS
        r = recvfrom(self->fd, buf, bufsize, flags, &addr, &addrlen);
        Py_END_ALLOW_THREADS

        if(r < 0){
            if(errno == EAGAIN || errno == EWOULDBLOCK) {
                DEBUG("errno:%d", errno);
                //ここでgreenletにスイッチ
                if(io_trampoline(self->fd, PICOEV_READ, self->timeout, socket_timeout) < 0){
                    return -1;
                }
                //戻ってくると&addrがズレる
            }else{
                PyErr_SetFromErrno(socket_error);
                return -1;
            }    
        }else{
            DEBUG("read %d: addrlen:%d", (int)r, (int)addrlen);
            if(!(*addrobj = getaddrtuple(&addr, addrlen))){
                return -1;
            }
            //不正な処理になる

            return r;
        }    

Common LispにThreaded Macroを実装する

PythonからいいところをひとつもってこれたのでClojureからももってくる。
Clojureでは数珠つなぎで処理する場所はわかりやすくThreaded Macroを使って書くことが多い。
これはわかりやすいのでCommon Lispでも実装しておく。

(defmacro -> (x &optional form &rest more)
  (cond ((not (null more))
         `(-> (-> ,x ,form) ,@more))
        ((not (null form))
         (if (listp form)
             `(,(first form) ,x ,@(rest form))
           (list form x)))
        (t x)))

(defmacro ->> (x form &rest more)
  (cond ((not (null more))
         `(->> (->> ,x ,form) ,@more))
        (t (if (listp form)
               `(,(first form) ,@(rest form) ,x)
             (list form x)))))

適当に例をあげる

(-> 25 sqrt (* 2) (- 3))

結果は

7.0

Clackなどのmiddlewareを書く際にもThreaded Macroの方がわかりやすいんじゃないかなと思う。
(ringのMiddlewareはThreaded Macroを使って書く事が多い)

これでまたひとつClojureにあるライブラリを移植しやすくなった。

Common LispでPythonのgeneratorを実装する

結局Lisp書かないとダメだということで本格的にPythonから移行しようと思ってます。
まず、今使ってるツールなどをLispに置き換えようと思ったらまあgeneratorが無くてめんどいことに。
というわけでgeneratorを実装しておく。
この手はみんな実装してるので珍しくもなんともない。
定番のcl-contを使う。

(require 'cl-cont)

(defun mkstr (&rest args)
  (with-output-to-string (s)
    (dolist (a args) (princ a s))))

(defun symb (&rest args)
  (values (intern (apply #'mkstr args))))

(defun flatten (x)
  (labels ((rec (x acc)
             (cond ((null x) acc)
                   ((atom x) (cons x acc))
                   (t (rec
                       (car x)
                       (rec (cdr x) acc))))))
    (rec x nil)))

(defun g!-symbol-p (s)
  (if (symbolp s)
      (let ((str (symbol-name s)))
        (string= str "#" :start1 (1- (length str))))))

(defun o!-symbol-p (s)
  (if (symbolp s)
      (let ((str (symbol-name s)))
        (string= str "%" :start1 (1- (length str))))))

(defun o!-symbol-to-g!-symbol (s)
  (let ((str (symbol-name s)))
    (symb (subseq str 0 (1- (length str)))
          "#")))

(defmacro defmacro/g! (name args &body body)
  (let ((symbs (remove-duplicates
                (remove-if-not #'g!-symbol-p
                               (flatten body)))))
    `(defmacro ,name ,args
       (let ,(mapcar
              (lambda (s)
                `(,s (gensym ,(subseq
                               (symbol-name s)
                               2))))
              symbs)
         ,@body))))

(defmacro defmacro* (name args &body body)
  (let* ((os (remove-if-not #'o!-symbol-p args))
         (gs (mapcar #'o!-symbol-to-g!-symbol os)))
    `(defmacro/g! ,name ,args
       `(let ,(mapcar #'list (list ,@gs) (list ,@os))
          ,(progn ,@body)))))

(defmacro* make-generator (&body body)
  `(let (,cont#)
     (cl-cont:with-call/cc
       (labels ((,(intern "YIELD") (&rest values)
                  (cl-cont:let/cc k
                    (setf ,cont# k)
                    (apply #'values values)
                    )))
         (,(intern "YIELD") (lambda () (cl-cont:call/cc ,cont#)))
         ,@body
         (loop (,(intern "YIELD") :done))))))

(defmacro* defgenerator (name args &body body)
  `(defun ,name ,args
     (make-generator ,@body)))

(declaim (inline next))
(defun next (g)
  (funcall g))

(defmacro* nif (expr pos zero neg)
  `(let ((,g# ,expr))
       (cond ((plusp ,g#) ,pos)
             ((zerop ,g#) ,zero)
             (t ,neg))))

(defmacro* alambda (args &body body)
  `(labels ((self ,args ,@body))
     #'self))

まあよく使うutilとかもまるっと書いてあるけど。 make-generatorだけみればOK。
generatorが作れたのでPythonのitertoolsも多分実装できます。
itertools.countはこんな感じ。

(defgenerator counter (n)
  (funcall (alambda (n)
             (yield n)
             (self (incf n))) n))


(setf foo (counter 10))
(loop repeat 50 collect (next foo))

結果

(10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
  34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
   58 59)

足りない機能を実装できるのでLispはやはり強力だなあと思う

PySpaアドベントカレンダー 22日目

はじめに

このエントリはPySpaアドベントカレンダーの22日目のエントリです。

PHPとかJavaがクソとか言うエントリではありませんのでご注意願います。

PySpaについて

まあ言うことはありません。ロビーでイリーガルな話やヒドいい話などをする合宿です。

コードとか二の次であってみんなの闇を共有する合宿です。

多分逮捕者がでてもおかしくないです。

最近作ってるもの

せっかくなので最近作ってるもの話をしておきます。

dismage

dismage - MySQL Protocol Server

要は MySQL Protocol を話す Server です。WSGI ライクなインターフェイスを持っています。

isucon 用に作りはじめたのがきっかけです。内部は libdrizzle で作られています。

全て非同期のAPIを使って実装されていますが、毎度のことながら greenlet でその辺を隠蔽してあるので使う人何も意識することはありません。

ほぼ C で書いてあるのでパフォーマンスはそれなりに出ます。

パフォーマンスをあげるためキャッシュなどをする場合、Web アプリケーション側に手が入ってしまったりするのですが(Middleware使えばいいのですが) それらをDB側の方へ追いやることができます。

サンプルコード

from dismage import *
from dismage import patch
patch.patch_all()

def app(cmd, cmd_data, start_result):

    print("cmd:%s data:%s start_response:%s" % (cmd, cmd_data, start_result))
    if cmd == COMMAND_INIT_DB or "COMMIT" in cmd_data:
        return
    columns = [(COLUMN_TYPE_VARCHAR, 10, 'name'), (COLUMN_TYPE_INT24, 8, 'age')]
    start_result(columns=columns)
    return [('john', 30), ('smith', 28)]

listen(port=3307)
run(app)
  • cmd には COMMAND の種別が入ってくる
  • cmd_data には SQL 本文が入ってくる
  • start_result には SELECT 文で返す際の行の定義を設定する

上記の例だと以下のSQLを実行して結果を返しているようなイメージになります。

SELECT name, age FROM x;

SELECT 場合のみ start_result に返す行の定義を設定しますが、その他の場合には設定しなくてよいです。

返り値もNoneで返せばOKです。

非同期対応

socket にパッチがあたってるので pure python な 通信は非同期通信が行われます。

例えば内部キャッシュに無いデータを本来の MySQL へ問い合わせる(中継するイメージ)場合 pymysql などを使えば非同期で MySQL へ問い合わされます。

応用

まだ開発中ですが応用の用途としては以下のようなものがあります。

  • テスト用の MySQL Sever として使う
  • 複数の DB に同時に書き込む
  • SQL を解析して自前で sharding する
  • SQL を解析して一部の SQL を Hive に流す
  • SQL を解析して一部の SQL を Impala に流す
  • INSERT 文を解析して KVS にデータを入れる
  • INSERT 文を解析して fluentd にデータを入れる

ドライバーによっては SQL かどうか判断していないので好きなデータを送信して処理することもできると思います。

(URL を送ってアクセスした結果を返すなどなど)

また WSGI Middleware のようなものも書けるので Middlwareで SQL の方言を直したり、フィルターをかけたりといったこともできます。

お手伝いしてくれる人など募集してます。

(特にlibdrizzleの認証とこに詳しい人)

素数を求める

特に難しいことはない。
とりあえずメモリが許すまで。
100万個ぐらいは数えれる。


fn prime(n: uint) {
    let mut prime: ~[uint] = do vec::build_sized(n) |push| {
        let mut i: uint = 0;
        while i < n {
            push(0);
            i += 1;
        }};
    let mut ptr: uint = 0;
    let mut j: uint = 5;

    prime[ptr] = 2;
    ptr += 1;
    prime[ptr] = 3;
    ptr += 1;
    
    loop {
        let mut i: uint = 1;
        let mut flg: bool = false;
        
        while prime[i] * prime[i] <= j  {

            if (j % prime[i] == 0) {
                flg = true;
                break;
            }
            i += 1;
        }
        if !flg {
            prime[ptr] = j;
            ptr += 1;
        }
        if prime[n - 1] > 0 {
            break;
        }
        j += 2;
    }

    for prime.each |e| {
        io::println(fmt!("%u", *e));
    }
}

fn main() {
    let args = os::args();
    let n = int::from_str(args[1]).get() as uint;
    prime(n);
}

KVMで無線LANブリッジ接続(DHCP)

最近ではVMが立ち上げれるぐらいのノートパソコンも安価に手に入るようになりました。
私のマシンもi7搭載のノートでKVMも入れてるのですが、ブリッジ接続の例がeth0のばっかで無線LANでブリッジをやる方法がよくわかりませんでした。
(NATで使ってた)
で最近マジメに設定してみようと思い調べたのでメモっておきたいと思います。
ちなみに環境はUbuntuです。RedHat系だと簡単にできるのかも知れません。

ちなみに適当にeth0と同じように書いてもwlan0ははじかれたり、それっぽい設定をしてもDHCPでうまくつながらなかったりします。
この方法だとゲスト、ホストともにDHCPでつながります。
あっちこっち持ち運んで作業するノートなどではDHCPの方が楽だと思います。

#!/bin/sh

/sbin/brctl addbr br0
/usr/sbin/tunctl -t tap0
/sbin/brctl addif br0 tap0
/sbin/ip addr add 192.168.2.200 dev br0
/sbin/ip link set br0 up

/bin/echo 1 > /proc/sys/net/ipv4/conf/wlan0/proxy_arp
/bin/echo 1 > /proc/sys/net/ipv4/conf/br0/proxy_arp
/bin/echo 1 > /proc/sys/net/ipv4/conf/tap0/proxy_arp

/usr/sbin/parprouted wlan0 br0
/usr/sbin/bcrelay -d -i br0 -o wlan0

ip addr addのとこはDHCPレンジ外を指定します。
このシェルをrootで実行後、virt-managerでゲストのネットワークデバイスで br0を指定すればつながります。
rc.localなどに書いてもいいかもしれませんね。

protocolを使って既存の関数の振る舞いを変える

こんにちわ、高校生です。
今回はprotocolを使った話です。

通常の場合

user=> (bit-and "生" "死")
IllegalArgumentException bit operation not supported for: class java.lang.String  clojure.lang.Numbers.bitOpsCast (Numbers.java:994)

bit-andはNumbersしか受け付けない関数なので当たり前のごとくうまくいきません。
異なる型でもいい感じに処理をして欲しい場合にはprotocolで既存関数も拡張ができます。

(defprotocol bit-protocol
  (bit-and [x y]))

(extend-protocol bit-protocol
  java.lang.String
  (bit-and [x y]
    (let [a (Character/codePointAt x 0)
          b (Character/codePointAt y 0)
          c (bit-and a b)]
      (apply str (Character/toChars c))))

  java.lang.Object
  (bit-and [x y]
    (clojure.core/bit-and x y)))

(bit-and "生" "死")

上記のように型ごとに処理が書けます。

Warning: protocol #'user/bit-protocol is overwriting function bit-and
WARNING: bit-and already refers to: #'clojure.core/bit-and in namespace: user, being replaced by: #'user/bit-and

Warningが出るものの既存の関数をoverwriteすることができます。
もちとんns上も置き換わるのでそちらの警告も出ますが。