Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

社員募集

アニョハセヨー(こんにちわ)、東方神起ジュンスです。

株式会社Abbyでは、中途社員を募集しております。

応募資格

プログラミング言語で、得意言語を最低1つお持ちの方(言語は問いません)
・弊社メンバーと協力して作業を行える方。
・やる気と技術力の向上心のある方。
・実務経験3年以上(3年未満の方でもやる気や実績を考慮します)

歓迎スキル

オープンソースソフトウエアへの貢献
C/C++JavaPythonFlex、.NET、PHPPerlRuby等での開発経験者
・プログラミングが大好きな方

詳細

詳細は、こちらの中途採用タブをご覧下さい。
http://www.abby.co.jp/recruit

応募に関しては、こちら
http://www.abby.co.jp/contact

注意事項

連絡の取れるメールアドレスを入力して下さい。
応募したのに連絡が無い場合には、再度連絡お願いします。
(過去に、入力したメールアドレスが間違っていた方がいました)

東方神起のメンバーは募集していませんのでご了承ください。

meinheld websocket

こんにちわ、しのまりファンことmopemopeです。
meinheld 0.3よりwebsocketをサポートする予定です。

websocketはmiddleware経由で取得できます。
WebSocketMiddlewareを使うとenvironからwebsocketを取得できます。

シンプルなチャットはこんな感じ。
meinheld/websocket_chat_simple.py at dev · mopemope/meinheld · GitHub

import os
from meinheld import server, middleware, websocket

participants = set()

"""
@websocket.WebSocketWSGI
def handle(ws):
    participants.add(ws)
    try:
        while True:
            print "ws.wait()..."
            m = ws.wait()
            print "recv msg %s" % m
            if m is None:
                break
            for p in participants:
                print "send message %s" % m
                a = p.send(m)
                print "%s" % a
    finally:
        participants.remove(ws)
"""

def websocket_handle(environ, start_response):
    ws = environ.get('wsgi.websocket')
    participants.add(ws)
    try:
        while True:
            print "ws.wait()..."
            m = ws.wait()
            print "recv msg %s" % m
            if m is None:
                break
            for p in participants:
                print "send message %s" % m
                p.send(m)
    finally:
        participants.remove(ws)
    return [""]

def dispatch(environ, start_response):
    """Resolves to the web page or the websocket depending on the path."""
    if environ['PATH_INFO'] == '/chat':
        return websocket_handle(environ, start_response)
    else:
        print "/"
        start_response('200 OK', [('Content-Type', 'text/html'), ('Content-Length', '814')])
        ret = [open(os.path.join(
                     os.path.dirname(__file__), 
                     'templates/websocket_chat.html')).read()]
        return ret
        
if __name__ == "__main__":
    server.listen(("0.0.0.0", 8000))
    server.run(middleware.WebSocketMiddleware(dispatch))

chromeでしかテストしてないですが動作します。
eventletではdecoratorで指定するのですが、environに埋めたほうが使い勝手がよいだろうということでこうしています。

flask版
meinheld/websocket_chat.py at dev · mopemope/meinheld · GitHub

from flask import Flask, render_template, request
from meinheld import server, middleware

SECRET_KEY = 'development key'
DEBUG=True

app = Flask(__name__)
app.config.from_object(__name__)

participants = set()

@app.route('/')
def index():
    return render_template('websocket_chat.html')

@app.route('/chat')
def chat():
    print request.environ
    ws = request.environ.get('wsgi.websocket')
    participants.add(ws)
    try:
        while True:
            print "ws.wait()..."
            m = ws.wait()
            print "recv msg %s" % m
            if m is None:
                break
            for p in participants:
                print "send message %s" % m
                p.send(m)
    finally:
        participants.remove(ws)
    return ""

        
if __name__ == "__main__":
    server.listen(("0.0.0.0", 8000))
    server.run(middleware.WebSocketMiddleware(app))

非常にシンプルに書けますね。
既存のフレームワークにも簡単に組み込めるので便利です。

meinheld Cotinuations

こんにちわ、韓流スターことmopemopeです。
meinheld 0.2よりContinuationをサポートしました。
これでlong poll(comet)も組めるようになります。

chatの例:http://github.com/mopemope/meinheld/tree/master/example/chat/

from flask import Flask, render_template, request, session, jsonify
import uuid
from meinheld import server, middleware


SECRET_KEY = 'development key'
DEBUG=True

app = Flask(__name__)
app.config.from_object(__name__)

def create_message(from_, body):
    data = {'id': str(uuid.uuid4()), 'from': from_, 'body': body}
    data['html'] = render_template('message.html', message=data)
    return data

cache = []
cache_size = 200
waiters = []

@app.route('/')
def index():
    return render_template('index.html', messages=cache)

@app.route('/a/message/updates', methods=['POST'])
def message_update():
    global cache, waiters
    c = request.environ.get(middleware.CONTINUATION_KEY, None)
    cursor = session.get('cursor')
    if not cache or cursor == cache[-1]['id']:
        waiters.append(c)
        c.suspend()

    assert cursor != cache[-1]['id'], cursor
    try:
        for index, m in enumerate(cache):
            if m['id'] == cursor:
                return jsonify({'messages': cache[index+1:]})
        return jsonify({'messages': cache})
    finally:
        if cache:
            session['cursor'] = cache[-1]['id']
        else:
            session.pop('cursor', None)
    
@app.route('/a/message/new', methods=['POST'])
def message_new():
    global cache, cache_size, waiters
    name = request.environ.get('REMOTE_ADDR') or 'Anonymous'
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for and name == '127.0.0.1':
        name = forwarded_for
    msg = create_message(name, request.form['body'])
    cache.append(msg)
    if len(cache) > cache_size:
        cache = cache[-cache_size:]

    for c in waiters:
        c.resume()
    waiters = []
    return jsonify(msg)


if __name__ == "__main__":
    server.listen(("0.0.0.0", 8000))
    server.run(middleware.SpawnMiddleware(app))

継続を使用するにはSpawnMiddlewareを使用します。
SpawnMiddlewareをかますとenvironからContinuation Objectが取り出せます。
suspendするとそこで処理が止まり抜けます。
resumeすると再開します。
これで1プロセス1スレッドでもchatアプリケーションが組めます。

継続

継続は協調スレッドで実現しています。APIはどうしようか迷いましたが、jettyを参考にしました。
geventなどはEventとしていますが、逆に混乱しそうなのでシンプルにしました。

協調スレッドの実体はgreenletですが、greenlet C/APIを使用しています。
(greenlet C/APIを使ってるライブラリは今のところないはず)

meinheldですが、次はGreenSocketの実装を考えています。
考えただけでボロクソにパフォーマンスが落ちそうですが。。。。

meinheldをリリースした

こんにちわ、闇金業者です。

Cの練習とPEP333の実装として作っていたpicowsですが、せっかくなのでpypiにあげることしました。
名前も変えてmeinheldにしています。
meinheld · PyPI

あとbitbucketもなんだか寂しい気がしてきたのでgithubに移動しました。

http_parserの変更など中身は大分変わっています。
明らかに過度のチューニングです。
hello_worldでabでベンチをとるといかに過度のチューニングであるかわかります。

インストール

linux onlyです。
pypiから

easy_install -ZU meinheld

使いかた

使い方は特に難しくありません。

from meinheld import server

def hello_world(environ, start_response):
    status = '200 OK'
    res = "Hello world!"
    response_headers = [('Content-type','text/plain'),('Content-Length',str(len(res)))]
    start_response(status, response_headers)
    return [res]

server.listen(("0.0.0.0", 8000))
server.run(hello_world)

シングルスレッドの非同期サーバなので並列処理をしたい場合はforkして下さい。
(その他、exampleを見てください。)
一応、プロセス名も変えれるクチも用意しています。
(multiprocessing のexampleがあるのでそれを参考にしてください)

運用(gunicorn)

運用用にworkerプロセスなどを管理するツールも書こうと思ったのですが、めんどくさくなってやめました。
代わりにgunicornのworkerを提供しています。
「今時gunicornで動かせないとか情弱すぎるww」と言われそうだったので作りました。

gunicornはunicornpython版です。
Gunicorn - Python WSGI HTTP Server for UNIX

簡単な例

gunicorn --workers=2 --worker-class="meinheld.gmeinheld.MeinheldWorker" gunicorn_test:app

worker class にmeinheld.gmeinheld.MeinheldWorkerをすればmeinheldで動作します。
unix domain socketもいけるのでnginxと同マシンで動作させる場合は使用した方がいいです。
workerの監視は毎秒親プロセスへの通知するとなんだかもったいない気がしたので10秒毎ぐらいになっています。
gunicornのタイムアウト値が短すぎるとkillされて新しいプロセスが立ち上がるので少し長めにしてください。
(30〜ぐらい)

I/O戦略

readに関してはread_callbackで毎回readで読んでるだけで普通です。
writeの方は今まで自前でバッファ連結してたのですが、writevを使うようにしています。
writevで書けるだけ書いて残があればwrite_callbackを仕掛けて残りもwritevで送ります。
送信時には一応TCP_CORKも立ててます。

セキュリティ

Long Header DoSでやられないようにfield-valueの最大値は8kになっています。
(長ければ400を返す。ヘッダ数自身も制限)
またreadが遅いクライアントは積極的にバッサリクローズします。
なのでslowlorisをかましても返ってこなくなることはないです。
クソでかいデータが来ると困るのでContent-Lengthが16M(デフォルト)以上は413を返します。
(set_max_content_lengthで変更できる)
また512k以上の場合はメモリではなくtmpfileにデータを吐き出してメモリのドカ食いを防ぎます。


なんだかんだで割とまともなモノができたかなあと思います。

http_parserを乗せ変えた話

こんにちわ、情弱DQNです。

最近は寝かしていたpicowsを見直しています。
その時にとある事が気になったのでhttp_parserを乗せ変えました。
でその時の話。

以前まで

今まではmongrel, thinなどでも使われているZedの書いたragelベースのhttp parser を使用していました。
もちろんですが、python版に書き直しています。
(余談ですがeventletの作者も同じようなものを作ってますが、メモリリークしてます...)

ragelベースのparesrは記述が比較的楽だし、それなりに速いので便利です。
(ただコールバックスタイルが好きかどうかって話もあるかも知れません)

mongrelのパーサの処理の流れは大体こんな感じです。

  • start actionで開始位置のポインタをマーク
  • 読み進め、条件にマッチしてるか判断
  • 条件にマッチしなくなるとendと解釈
  • end actionでstart actionでマークしたポインタと長さをコールバックに渡して呼ぶ

parserに流し込む文字列はパースし終わるまで保持しておかないといけません。
(開始位置のポインタがわからなくなる)

なのでsocketから読み込んだ文字列を連結しながらparseしていく方式です。
このやり方の場合、endの解釈まで文字列を保持しているので長いデータが来ると問題がある可能性があります。

例えば1G超のhttp header valueを送った場合、何も考えてないとそれだけ文字列を確保してしまします。
(end が呼ばれるまで何も状態が変わらないし、コールバックも呼ばれない)

これは昔にApacheもやられた手法です。
(Long Header DoS

もちろん多くの場合はフロントに別のhttp serverを置いたりするのでそこでガードする事はできます。
(nginxもapacheもヘッダ1行あたりのバッファサイズは8192byte(デフォルト)のはず)

現状

とはいえフロントのproxyがどこまでヘッダを検証してくれるのか、ヘッダをそのまま送ってくるのか等、イマイチ
信用できななかったので長い文字列をなるべく保持しないようにnode.jsでも使ってるhttp_parserを使用しました。

ry's http_parserの話

使用しているのはこれです。

http://github.com/ry/http-parser

このparserの元ネタはlibebbで、それを元にチューニングと機能追加しています。

特徴は以下

  • 文字列を溜め込まないでパースする

まあ細かいxxxxに対応とかもありますが、これが一番大きいかと思います。
そして大きな制限としてはfield-valueがmultilineに対応していないという点です。
(これはid:malaも書いていますが。)
これに関しては僕がforkして対応したものを作ってます。

http://github.com/mopemope/http-parser

で使い方。
コールバックの仕込みなどはtest、ドキュメントなどを見てください。

    size_t len = 8*1024, nparsed;
    char buf[len];
    ssize_t recved;

    recved = recv(fd, buf, len, 0);

    if (recved < 0) {
      /* Handle error. */
    }

    /* Start up / continue the parser.
     * Note we pass recved==0 to signal that EOF has been recieved.
     */
    nparsed = http_parser_execute(parser, &settings, buf, recved);

    if (parser->upgrade) {
      /* handle new protocol */
    } else if (nparsed != recved) {
      /* Handle error. Usually just close the connection. */
    }

readしたものをそのままparseしています。
データがまたがったらどうするんじゃい!ってツッコミがあると思いますが、コールバック
の呼ばれ方に少し癖があります。

このhttp_parserは読み込むバッファがなくなった際もその状態のコールバックを呼びます。
状態はparesrが保持しているので、再度読み込むバッファを突っ込んでパースを継続します。

コールバックの例;

struct line {
  char *field;
  size_t field_len;
  char *value;
  size_t value_len;
};

#define CURRENT_LINE (&header[nlines-1])
#define MAX_HEADER_LINES 2000

static struct line header[MAX_HEADER_LINES];
static int nlines = 0;
static int last_was_value = 0;

void
on_header_field (http_parser *_, const char *at, size_t len)
{
  if (last_was_value) {
    nlines++;

    if (nlines == MAX_HEADER_LINES) ;// error!
    
    CURRENT_LINE->value = NULL;
    CURRENT_LINE->value_len = 0;

    CURRENT_LINE->field_len = len;
    CURRENT_LINE->field = malloc(len+1);
    strncpy(CURRENT_LINE->field, at, len);

  } else {
    assert(CURRENT_LINE->value == NULL);
    assert(CURRENT_LINE->value_len == 0);

    CURRENT_LINE->field_len += len;
    CURRENT_LINE->field = realloc(CURRENT_LINE->field,
        CURRENT_LINE->field_len+1);
    strncat(CURRENT_LINE->field, at, len);
  }

  CURRENT_LINE->field[CURRENT_LINE->field_len] = '\0';
  last_was_value = 0;
}

void
on_header_value (http_parser *_, const char *at, size_t len)
{
  if (!last_was_value) {
    CURRENT_LINE->value_len = len;
    CURRENT_LINE->value = malloc(len+1);
    strncpy(CURRENT_LINE->value, at, len);

  } else {
    CURRENT_LINE->value_len += len;
    CURRENT_LINE->value = realloc(CURRENT_LINE->value,
        CURRENT_LINE->value_len+1);
    strncat(CURRENT_LINE->value, at, len);
  }

  CURRENT_LINE->value[CURRENT_LINE->value_len] = '\0';
  last_was_value = 1;
}

途中で途切れる可能性があるのでstrncatで連結します。
また新しいヘッダかどうかの判断をフラグで管理します。
(last_was_value)

少し脱線しますが、この方式だとパフォーマンス的にどうなの?って話になると思います。
読み込みバッファは毎回捨て同然なので必要な値は、自前でコピーし、保持しておく必要があります。
最初memcpyが大量に発生するのが予想されたので少し気になりました。
実際ゴリゴリ組んでみるとそこまで問題ないようです。
(ココを気にするよりもっと気にすることがあるのが大半です)

で注目すべきは読み込むバッファがなくなった際もその状態のコールバックを呼ぶ点です。
ragelではendがくるまでールバックが呼ばれませんが、こいつはバッファが切れた時点でも呼ばれるので
そのタイミングでいろいろとチェックすることができます。

コールバックでは文字列の長さも渡されるので長すぎればその時点で弾くことできますし、
(最高でreadしたバッファサイズの文字列がコールバックで渡される)
既に保持している文字列と連結した結果長いと予想されるケースも検出できます。
(ragelでは何も考えないとずーとコールバックが呼ばれず下手するとクソでかいバッファを保持しかねない)

気になったこと

長くなりましたが、結局気になったことはWSGI(Rack PSGI)サーバなどはどこまでセキュリティに気をつけて作ればいいのか?って点です。
HTTPをしゃべれる速いサーバがあるのはいいんですが、チェックがざるでいいのか。
Long Header DoS、slowlorisでもくじけないようにすべきなのかどうか。
(まあフロントありきで表にでなきゃまあいいんでしょうけどねえ,ちなみにpicowsは両方共にくじけません!)

参考にいろいろな実装見てると気になることがどんどん増えてしまうなあ。。。。

プロセス名を書き換える

こんにちわ、情弱です。
村崎百郎が殺されて少し動揺しています。

話は変わって。。。
perlなどは$0でプロセス名を変えたりできて便利ですよね。
pythonではできるの?って話です。

ダメかと思ったが一応できるっぽい。
誰も書いてなさそうなので書いておきます。

PyObject *
set_process_name(PyObject *self, PyObject *args)
{
    int i = 0,argc,len;
    char **argv;
    char *name;

    if (!PyArg_ParseTuple(args, "s:process name", &name)){
        return NULL;
    }

    Py_GetArgcArgv(&argc, &argv);

    for(i = 0;i < argc; i++){
        len = strlen(argv[i]);
        memset(argv[i], 0, len);
    }

    strcpy(*argv, name);
    Py_RETURN_NONE;
}

長さ的にどーなってんのかちゃんと調べてないので気をつけないといけないかも知れません。
(16byteってなんかなかったかな?)
これでpreforkでプロセス名をxxx-master, xxx-workerとかに変えれますね。
とりあえずflaskとか立ち上げてプロセス名を"starman master app.psgi", "starman worker app.psgi"とかにしてperl気分を味わいましょう。

consistent hashing

30越えて書いたことないなんて恥ずかしいので書いてみた。

#include <inttypes.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#define POINTS_PER_SERVER 100
#define MAX_HOST_LENGTH 128

typedef struct{
    uint32_t index;
    uint32_t value;
} point_item_t;

typedef struct {
    char *name;
    int port;
} server_t;

typedef struct {
    point_item_t *points;
    server_t *server_list;
    uint32_t server_count;
    uint32_t points_count;
} consistent_t;


static consistent_t * 
init()
{
    consistent_t *consistent;
    consistent = malloc(sizeof(consistent_t));
    
    /* MAX 10 server */
    consistent->server_list = malloc(sizeof(server_t) * 10);
    consistent->server_count = 0;
    return consistent;
}

static void
add_server(consistent_t *consistent, char *name, int port)
{

    uint32_t index = 0;
    index = consistent->server_count;
    if(index > 9){
        return;
    }
    consistent->server_list[index].name = name;
    consistent->server_list[index].port = port;

    consistent->server_count++;
    consistent->points_count = POINTS_PER_SERVER * consistent->server_count;
}


uint32_t 
hash_one_at_a_time(const char *key, size_t key_length);

static int 
point_item_cmp(const void *t1, const void *t2)
{
    point_item_t *ct1= (point_item_t *)t1;
    point_item_t *ct2= (point_item_t *)t2;

    if (ct1->value == ct2->value){
        return 0;
    }else if (ct1->value > ct2->value){
        return 1;
    }else{
        return -1;
    }
}

static void
update(consistent_t *consistent)
{
    uint32_t server_count;

    uint32_t point_index = 0;
    uint32_t server_index = 0;
    uint32_t i = 0;
    uint32_t counter= 0;
    uint32_t per_server= POINTS_PER_SERVER;
    uint32_t value;

    server_t *list = consistent->server_list;
    server_count = consistent->server_count;
    if(!consistent->points){ 
        consistent->points = malloc(sizeof(point_item_t) * POINTS_PER_SERVER * server_count);
    }

    
    for (server_index= 0; server_index < server_count; ++server_index) {

        for (i = 1; i <= per_server; i++){

            char host[MAX_HOST_LENGTH]= "";
            size_t host_length;
            
            host_length = (size_t) snprintf(host, MAX_HOST_LENGTH,
                                                  "%s:%u-%u",
                                                  list[server_index].name,
                                                  list[server_index].port,
                                                  i -1);
            value = hash_one_at_a_time(host, host_length);
            
            //printf("%s point %u\n", host, value);
            consistent->points[point_index].index = server_index;
            consistent->points[point_index].value = value;
            point_index++;
        }

    }
    
    //sort
    qsort(consistent->points, point_index, sizeof(point_item_t), point_item_cmp);
}

static server_t 
get_server(consistent_t *consistent, char *key, size_t key_length)
{
    uint32_t hash, index;
    hash = hash_one_at_a_time(key, key_length);
    uint32_t length = consistent->points_count;

    hash = hash;
    point_item_t *begin, *end, *left, *right, *middle;
    begin = left= consistent->points;
    end = right= consistent->points + length;

    while (left < right){
        middle = left + (right - left) / 2;
        if (middle->value < hash){
            left= middle + 1;
        }else{
            right= middle;
        }
    }
    if (right == end){
        right = begin;
    }
    return consistent->server_list[right->index];
}


uint32_t 
hash_one_at_a_time(const char *key, size_t key_length)
{
    const char *ptr= key;
    uint32_t value= 0;

    while (key_length--)
    {
        uint32_t val= (uint32_t) *ptr++;
        value += val;
        value += (value << 10);
        value ^= (value >> 6);
    }
    value += (value << 3);
    value ^= (value >> 11);
    value += (value << 15);

    return value;
}


int 
main()
{
    uint32_t i;
    consistent_t *consistent;
    consistent = init();
    add_server(consistent, "server01", 10001);
    add_server(consistent, "server02", 10002);
    add_server(consistent, "server03", 10003);
    add_server(consistent, "server04", 10004);
    update(consistent);
    
    /*
    for(i = 0; i < consistent->points_count; i++) {
        point_item_t c = consistent->points[i];
        printf("name %s point %u\n", consistent->server_list[c.index].name, c.value);
    }*/
    
    char key[128];
    server_t server;
    for(i = 0; i < 50; i++){
        sprintf(key, "data_key_%d", i);
        server = get_server(consistent, key, (int)strlen(key));
        printf("%s %s\n", key, server.name);
    }
}

最低限のシンプルな構成。
hash値はOne-at-a-Time Hash(jenkins)を使用。
サーバごとに100個のポイントを割り振る。重みづけはなし。

50個のキーで試したけどまあまあふれてると思う。

サーバ数を変えてみて別のキーに影響がないか見てみる。
3番目のサーバを消して走らせてみる。

    add_server(consistent, "server01", 10001);
    add_server(consistent, "server02", 10002);
    //add_server(consistent, "server03", 10003);
    add_server(consistent, "server04", 10004);

4つのものとの差はdiffでみてみた。

1c1< data_key_0 server03

    • -

> data_key_0 server01
4,7c4,7< data_key_3 server03< data_key_4 server03< data_key_5 server03< data_key_6 server03

    • -

> data_key_3 server02
> data_key_4 server02
> data_key_5 server04
> data_key_6 server04
9,10c9,10< data_key_8 server03< data_key_9 server03

    • -

> data_key_8 server02
> data_key_9 server02
16c16< data_key_15 server03

    • -

> data_key_15 server04
19c19< data_key_18 server03

    • -

> data_key_18 server01
28c28< data_key_27 server03

    • -

> data_key_27 server02
40c40< data_key_39 server03

    • -

> data_key_39 server02
44c44< data_key_43 server03

    • -

> data_key_43 server02
48c48< data_key_47 server03

    • -

> data_key_47 server02
50c50< data_key_49 server03

    • -

> data_key_49 server02

ちゃんと削除されてるserver03のキーのみ別のserverに割り当てられてる。
別のキーには影響がない。
まあ当たり前なんだろうけど。