便所の落書き2 非同期初心者によるデータベース処理
mata benjyo ni twisted no code ga kaiteatta yo!!!
daremo RowObject nante tukawanaiYO!
demo RowObjectwo jidoude seisei dekitara iiyone!!
お決まりのintropectionを使う。
mysqlのサンプル。DBごとにintrospectionを作れば対応可能
db.mysql.introspection.py
from MySQLdb.constants import FIELD_TYPE def quote_name( name ): if name.startswith( "`" ) and name.endswith( "`" ): return name return "`%s`" % name def get_table_list(cursor): cursor.execute("SHOW TABLES") return [row[0] for row in cursor.fetchall()] def get_table_description( cursor, table_name ): cursor.execute( "SELECT * FROM %s LIMIT 1" % quote_name( table_name ) ) return cursor.description def get_indexes(cursor, table_name): cursor.execute("SHOW INDEX FROM %s" % quote_name(table_name)) indexes = {} for row in cursor.fetchall(): indexes[row[4]] = {'primary_key': (row[2] == 'PRIMARY'), 'unique': not bool(row[1])} return indexes DATA_TYPES_REVERSE = { FIELD_TYPE.BLOB: 'text', FIELD_TYPE.CHAR: 'char', FIELD_TYPE.DECIMAL: 'float8', FIELD_TYPE.DATE: 'date', FIELD_TYPE.DATETIME: 'timestamp', FIELD_TYPE.DOUBLE: 'float8', FIELD_TYPE.FLOAT: 'float8', FIELD_TYPE.INT24: 'bigint', FIELD_TYPE.LONG: 'bigint', FIELD_TYPE.LONGLONG: 'bigint', FIELD_TYPE.SHORT: 'smallint', FIELD_TYPE.STRING: 'text', FIELD_TYPE.TIMESTAMP: 'timestamp', FIELD_TYPE.TINY: 'boolean', FIELD_TYPE.TINY_BLOB: 'text', FIELD_TYPE.MEDIUM_BLOB: 'text', FIELD_TYPE.LONG_BLOB: 'text', FIELD_TYPE.VAR_STRING: 'varchar', }
RowObjectクラスをDBより生成するFactory。
あくまでクラスを生成する。クラス宣言なしにいきなり使える。
db.__init__.py
from twisted.enterprise import adbapi, row import re def camelize(str): p = re.compile(r'_([a-z])') return p.sub((lambda match:match.groups(0)[0].upper()), str.capitalize()) class RawObjectFactory(object): def __init__(self, engine): self.introspection_module = (lambda: __import__('db.%s.introspection' % engine, '', '', ['']))() def get_table_list(self, cursor): return self.introspection_module.get_table_list(cursor) def get_table_description(self, cursor, table_name): return self.introspection_module.get_table_description(cursor, table_name) def get_class(self, cursor, table_name): rowColumns = [] rowKeyColumns = [] indexes = self.introspection_module.get_indexes(cursor, table_name) for i, rows in enumerate(self.get_table_description(cursor, table_name)): name = rows[0] column_type = self.introspection_module.DATA_TYPES_REVERSE[rows[1]] rowColumns.append((name, column_type)) isPk = False if name in indexes: if indexes[name]['primary_key']: rowKeyColumns.append((name, column_type)) classdict = dict(rowTableName = table_name, rowColumns = rowColumns, rowKeyColumns = rowKeyColumns) return type(camelize(table_name)+"Object", (object, row.RowObject,), classdict) #sample if __name__ == "__main__": from twisted.enterprise import reflector as ref from twisted.enterprise.sqlreflector import SQLReflector from twisted.internet import reactor table_name = "rss_feed" _dbmodule="MySQLdb" connect_info=dict(host="localhost",db="hatenaclone",user="root",passwd="root",charset="utf8", use_unicode=True) dbpool = adbapi.ConnectionPool(_dbmodule, **connect_info) con = None try: con = dbpool.connect() cursor = con.cursor() factory = RawObjectFactory("mysql") list = factory.get_table_list(cursor) clazz = factory.get_class(cursor, table_name) reflector = SQLReflector(dbpool, [clazz]) d = reflector.loadObjectsFrom(table_name) def gotEntry(entries): for e in entries: print e.id def err(err): print err d.addCallback(gotEntry).addErrback(err) reactor.run() finally: if con: dbpool.disconnect(con)