Doge log

Abby CTO 雑賀 力王のオフィシャルサイトです

DBUnit 0.11

というわけでvimも少しづつ触りながらいじってみた。
名前決めてないんだけどさー。
Excelでデータ入れれるのってやっぱいいよね!
ニーズ的にびみょーなんだけどdoctestやpyunitxから使えるように関数化しておいた方が良いのではないか。

dbunit/__init__.py
import dataset
import unittest

DEBUG = False

def read_xls_write_db(engine, connection, file):
    reader = dataset.XlsReader(file)
    for table in reader.dataset :
        _insert(engine, connection, table)

def read_xls_all_replace_db(engine, connection, file):
    reader = dataset.XlsReader(file)
    for table in reader.dataset :
        _delete(engine, connection, table)
        _insert(engine, connection, table)

def read_xls_replace_db(engine, connection, file):
    reader = dataset.XlsReader(file)
    for table in reader.dataset :
        _update(engine, connection, table)

def _delete(engine, connection, table):
    sql = _create_delete_sql(table)
    connection.cursor().execute(sql, [])                
        
def _insert(engine, connection, table):
    sql = _create_insert_sql(engine, connection, table)
    for data in table.data:
        row_data = []
        for cnt, v in enumerate(data):
            type, pk = table.meta[table.header[cnt]]
            row_data.append(_get_object(type, v))
        if DEBUG :
            print row_data
        connection.cursor().execute(sql, row_data)                

def _update(engine, connection, table):
    sql = _create_update_sql(engine, connection, table)
    for data in table.data:
        row_data = []
        where = []
        for cnt, v in enumerate(data):
            type, pk = table.meta[table.header[cnt]]
            if pk:
                where.append(_get_object(type, v))
            else :
                row_data.append(_get_object(type, v))
        row_data = row_data+where 
        if DEBUG :
            print row_data
        connection.cursor().execute(sql, row_data)                

def _create_delete_sql(table):
    table_name = table.name
    sql = 'DELETE FROM %s ' % table_name
    if DEBUG :
        print str(sql)
    return str(sql)
                    
def _create_insert_sql(engine, connection, table):
    table_name = table.name
    if not table.__dict__.has_key('meta') :
        table.meta = _get_description(engine, connection, table_name)
    sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, ','.join(table.header), ','.join('%s' for x in range(len(table.header)))) 
    if DEBUG :
        print str(sql)
    return str(sql)

def _create_update_sql(engine, connection, table):
    table_name = table.name
    if not table.__dict__.has_key('meta') :
        table.meta = _get_description(engine, connection, table_name)
    update = [];
    where = [];
    for column in table.header:
        field, pk = table.meta[column]
        if pk:
            where.append(column)
        else :
            update.append(column)            
    sql = 'UPDATE %s SET %s WHERE %s' % (table_name, ','.join([ '%s=%%s ' % x for x in update]), 'AND '.join([ '%s=%%s ' % x for x in where])) 
    if DEBUG :
        print str(sql)
    return str(sql)

def _get_description(engine, connection, table_name):
    introspection_module = (lambda: __import__('dbunit.db.%s.introspection' % engine, '', '', ['']))()
    meta = {}
    cursor = connection.cursor()
    if isinstance(table_name, unicode):
        table_name = table_name.encode('ms932')
    indexes = introspection_module.get_indexes(cursor, table_name)
    for i, row in enumerate(introspection_module.get_table_description(cursor, table_name)):
        name = row[0]
        type = introspection_module.DATA_TYPES_REVERSE[row[1]]
        isPk = False
        if name in indexes:
            if indexes[name]['primary_key']:
                isPk = True                    
        meta[name] =  type, isPk
    return meta   

def _get_object(type, value):
    if type == 'DateTimeField' :
        return value
    elif type == 'DateField' :
        return value
    elif type == 'CharField' :
        return str(value)
    elif type == 'IntegerField' :
        return int(value)
    elif type == 'FloatField' :
        return float(value)
    else :
        return value


class DBUnitTestCase(unittest.TestCase):

    def engine(self):pass

    def connection(self):pass
    
    def setUp(self):
        self.connection = self.connection()
        self.engine = self.engine()
        if DEBUG:
            print "connect databse"
    
    def tearDown(self):
        self.connection.rollback()
        self.connection.close()
        if DEBUG:
            print "rollback databse"
    
    def read_xls_write_db(self, file):
        read_xls_write_db(self.engine, self.connection, file)

    def read_xls_all_replace_db(self, file):
        read_xls_all_replace_db(self.engine, self.connection, file)

    def read_xls_replace_db(self, file):
        read_xls_replace_db(self.engine, self.connection, file)
    

とメソッドを外だしして関数化。
中身は何も替えてない。vimリファクタリング・・・・はなかなか難しいかな・・・・。
ブロック単位でかっぱらってきてごにょるのが一般的なのかも。
で使うときは1行

read_xls_write_db('postgres', connection, 'C:/test.xls')

で使えるようにしておく。
これなら少しは融通利きそう。
初期データの投入などにも使えるかな。
バッチ処理してないので遅いけど)
で次に一応DataSet同士での比較をできるようにどうしようか悩み中。
でとりあえずdict同士のように扱えるようにしておこうかなと。dictを拡張。

dbunit/dataset.py
from pyExcelerator import *
import sys


class DataTable(dict):
    def __getitem__(self, key):
        if dict.has_key(self, key):
            return dict.__getitem__(self, key)
        return dict.__getitem__(self, realkey)

    def __setitem__(self, key, value):
        return dict.__setitem__(self, key, value)

    def get(self, key, default=None):
        if self.has_key(key):
            return self[key]
        else:
            return default

    def setdefault(self, key, value):
        if not self.has_key(key):
            self[key] = value
        return self[key]
        
    def has_key(self, key):
        try:
            return hasattr(self, key) or dict.has_key(self, key)
        except AttributeError:
            return False
        
    def __getattr__(self, key):
        try:
            return self.__dict__[key]
        except KeyError:
            pass
        try:
            assert not key.startswith('_')
            return self.__getitem__(key)
        except:
            raise AttributeError, "object has no attribute '%s'" % key

    def __setattr__(self, key, value):
        if key.startswith('_') or key == 'data':
            self.__dict__[key] = value
        else:
            return self.__setitem__(key, value)

    def __contains__(self, key):
        return self.has_key(key)
 
class XlsReader(object):
    def __init__(self, file):
        self.file = file;
        self.dataset = []
        for sheet_name, sheet_values in parse_xls(file): 
            self.create_table(sheet_name, sheet_values)
        
    def create_table(self , sheet_name, sheet_values):
        table = DataTable()
        table.name = sheet_name
        table.header = []
        table.data = []
        self.dataset.append(table)
        for row_idx, col_idx in sorted(sheet_values.keys()):
            v = sheet_values[(row_idx, col_idx)]
            if isinstance(v, unicode):
                v = v.encode('utf8')
            if row_idx == 0:
                table.header.append(v)
            else :
                if col_idx == 0:
                    data = []
                    data.append(v)
                    table.data.append(data)
                else:
                    table.data[-1].append(v)
                    

多分あってるはず・・・・。
このコードだと2.1以降で動かす事前提かな?
feedparserのdictの拡張を見てそのまんまパクリました。
属性も辞書内のアクセスもごった煮でアクセスできるようにしちゃう。
これだとjavascriptに近い記述になるよなー。
そう考えるとjavascriptpythonもしくはその逆に近い事って結構楽チンにできそうな気もする。
あと全然話変わるけどやっぱvimで:pyf時のsys.pathと普通pythonのpathはちゃう。
個人的には:!pythonで起動しちゃう方がいいと思う。

noremap <C-P> :!python %<CR>

私はこれで一発起動してる。
さーてvim&pythonに慣れるため素振りするかなあ。

うくく。