at_yasu's blog

ロード的なことを

pythonでsqlite

PythonでSQLを遣いたく思い、サードパティのモジュールをインストールしなくていい方法を探していたら、どうもPythonにはSQLiteがデフォルトで入っている気配。早速使ってみた。

以下、メモ書き+今回作ったサンプルにもならない読みにくいソース+参考サイト


他の高級?なSQLに比べてちょっと変わっている点。

  • DBにアクセスするのに必要なのはパスだけ。ユーザ、パスワードは無いっぽい。寧ろ、ローカルからのアクセスしか想定していない為、DBサーバとアプリサーバを切り分けるってことを想定していない*1っぽい。ってマニュアル読んだら、userとかpasswordオプションがある。どういう風に使うのかは全くわかりません。そもそもどうやってユーザ管理するんだろ・・・
  • 使い方は至って簡単で、sqlite3を読み込み、パスを指定してインスタンスを生成。後は、executeとかごにょごにょ
  • SQLの実行は、execute。実行したら結果を格納したインスタンスを返す。
  • SQLInjection攻撃を妨害する為に、executeのSQL分は"%s"を使うんじゃなくて、execute("?",...)と書くべし*2
  • ちなみにthread-safeだそうな。
  • SQLiteのtableのフィールドの型は、(INTEGER|REAL|TEXT|BLOB)だけなんだぜ。
  • sqlite3がエラーを起こすと、StandardErrorを基礎とした例外が発生する。
# -*- coding: utf-8 -*-
#
# $Id: mydb.py 200 2008-03-06 23:27:45Z yasui $
#
# ex, http://www.python.org/dev/peps/pep-0249/
#     http://docs.python.org/lib/module-sqlite3.html

import sqlite3

class mydb:
	_conf = {'dbPath':'db'}
	_conn = None
	_debug = False
	
	def __init__ (self,hashref) :
		if 'dbPath' in hashref:
			self._conf['dbPath'] = hashref['dbPath']
		if 'debug' in hashref:
			self._debug = hashref['debug']
		
		self._conn = sqlite3.connect(self._conf['dbPath'])

	def getColumn (self, table, field, opt='') :
		sql = "select " + ",".join("?"*len(field)) + " from " + table+" "+opt
		c = self._conn.cursor()
		
		if self._debug :
			print sql,",",",".join(field)
		
		c.execute(sql, field)
		buff = c.fetchall()
		c.close()
		return buff
	
	# field =>dict, key=>column data, val => column name
	def append (self, table, field) :
		colname = [field[v] for v in field]
		coldata = [v for v in field]
		sql = "insert into " + table + "(`"+"`,`".join(colname)+"`) values " + \
			"(" +",".join(coldata) + ")"
		
		if self._debug :
			print sql
		
		c = self._conn.cursor()
		c.execute(sql,field)
		c.close()
	
	def delete (self, table, opt) :
		sql = "delete from ? where " + opt
		
		if self._debug :
			print sql
		
		c = self._conn.cursor()
		c.execute(sql,table)
		c.close
	
	# fields => dict, key=>field name, val=>field type
	def createTable (self, table, fields) :
		arr = [k+" "+fields[k] for k in fields]
		c = self._conn.cursor()
		sql = "create table "+table+" (" + ",".join(arr) + ")"
		
		if self._debug :
			print sql
		
		try :
			c.execute(sql)
		except StandardError,e :
			self._conn.rollback()
			c.close()
			return False
		
		self._conn.commit()
		c.close()
		return True
	
	def commit (self) :
		self._conn.commit()
	
	def connection (self):
		return self._conn



def test(dbname):
	tableName = 'test_table'
	db = mydb({'dbPath':dbname, "debug":True})
	
	# create table
	b = db.createTable(tableName,{'id':'INTEGER PRIMARY KEY AUTOINCREMENT','test':'TEXT'})
	if b == False :
		print "Exception Error: db.craeteTable"
	
	# Insert the data
	testdata = {"'test'":'test',"'test1'":'test',"'test2'":'test',"'test3'":'test',
				"'test4'":'test',"'test5'":'test',"'test6'":'test'}
	for i in testdata :
		db.append(tableName,{i:testdata[i]})
	db.commit()
	
	col = lambda d,t,col,q : d.getColumn(t,col,q)
	def fors(a):
		print len(a) 
		for v in a: print v
	
	arr = col(db,tableName,['test','id'],''); fors(arr)
	
	arr = col(db,tableName,['test']," where test like '%1'"); fors(arr)
	
	arr = col(db,tableName,['test']," where test like '%st%'"); fors(arr)
	
	arr = col(db,tableName,['test']," where test like 'test.%'"); fors(arr)
	
	arr =  col(db,tableName,['test']," where test like 'aaa'"); fors(arr)
	


if __name__ == '__main__':
	import os
	try:
		test('testdb')
	except StandardError,e:
		print e
	try:
		os.remove('testdb')
	except : pass


参考サイト一覧

*1:nfsとかを使うのもあるけど、nfsがしんだら終わりなんだぜ?

*2:ODBCもこんな感じだよねぇ