Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

sqlalchemyで結果をキャッシュする

こんにちわ、sqlalchemyを日本で一番使い倒してるであろうmopemopeです。
sqlalchemyのcacheに関して具体的なコードを書いている人がいないのはみんなexampleのあれを見てるからなのか?
と思ったのですがあえて書いてみます。

SQLの内容を60秒間キャッシュする例:

import hashlib

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import create_session, scoped_session, mapper as sqla_mapper
from sqlalchemy.orm.query import Query
from sqlalchemy.sql import visitors

class CachingQuery(Query):
    
    _timeout = 60
    
    def use_cache(self):
        # Not Impl
        pass
    
    def get_cache_engine(self):
        # Not Impl
        pass

    def _get_cache_key(self):
        key = hashlib.md5(str(self)).hexdigest()
        args = _params_from_query(self)
        args_key = " ".join([str(x) for x in args])
        key = key + "_" + args_key
        key = key.replace(' ', '_')
        return key

    def __iter__(self):

        cache_key =  self._get_cache_key()
        cache_engine = self.get_cache_engine()
        
        if not self.use_cache():
            cache_engine.delete(cache_key)
                
        ret = cache_engine.get(cache_key)
        if ret == None:
            ret = list(Query.__iter__(self))
            cache_engine.set(cache_key, ret, timeout = self._timeout)
        return iter(self.session.merge(x, dont_load=True) for x in ret)
        #return Query.__iter__(self)

def _params_from_query(query):
    
    v = []
    def visit_bindparam(bind):
        value = query._params.get(bind.key, bind.value)
        
        if callable(value):
            value = value()
        
        k = bind.params()
        v.append("%s:%s" % (k, value))
    
    for obj in query._from_obj:
        _criterion = obj.onclause
        if _criterion is not None:
            visitors.traverse(_criterion, {}, {'bindparam':visit_bindparam})

    if query._criterion is not None:
        visitors.traverse(query._criterion, {}, {'bindparam':visit_bindparam})
    return v


engine = create_engine("xxxxxx", convert_unicode=True)
metadata = MetaData()
session = scoped_session(lambda: create_session(engine, 
    autocommit=False,
    query_cls=CachingQuery), )

ちょっと動かしてるものから必要な箇所だけ修正してるのでミスってるかも。

キャッシュから読む条件とキャシュエンジンは各自で実装してください。
キャッシュに関してはwerkzeugにキャッシュのライブラリがあるのでそれを使うと楽です。

主な流れ

  1. SQL、条件からキャッシュキー生成
  2. キャシュキーでキャッシュからデータを引く
  3. あればキャッシュデータを現在のsessionに乗せてから返す。
  4. なければデータを引いてlist化
  5. listをキャッシュ
  6. 現在のsessionに乗せてから返す。

そもそもQueryクラスを置き換え可能であることを知ってればまあ大したことないですね。

キャッシュのキーは

SQLMD5 + (SQLへバインド変数 + 値)の列挙

という形式。
これで同SQLでも条件にバインドする値が違えば別扱いにできます。

ここで問題ないのがバインドする変数とバインドする値の列挙の仕方。
_params_from_queryでやってることがキモなんだけど。
基本的にはonclause(where句に相当するところ)にバインド情報が入っている。
なのでそこをtraverseで走査して値をかっさらう。
このままでも良さそうだが、traverseしているのはQuery自体のonclause部だけ。
(一番外側のwhere句)
join句の中の条件(絞り込んで結合するとか)は拾えない。
そのため_from_objでfrom内のonclauseも調べる必要があります。
これで全部の条件のバインド変数、値が取れます。

DataMapper系はeagerしないと子objectを引くのにドバドバSQL吐く可能性が高いのでキャッシュは有効だと思います。
またeagerでjoin系SQLを吐いてもいいのですが、別の結果の子objectとしてとってきたデータがを再度取得しちゃうケースがあったり少し無駄になる可能性があります。
(joinも気をつけないと遅いSQLを吐く可能性があったりしますけどね)

Queryをいじれる事を知ると、一部のデータをkvsから補完したり(どのentityにmapされてるとかも取れるので)など色々できるようになります。
またmapperをいじるとsession経由でkvsにつっこんだりもできたりするのですがその辺の話はまた機会があれば。