首页 > 学院 > 开发设计 > 正文

Pyton实现SQLHelper

2019-11-14 17:16:40
字体:
来源:转载
供稿:网友

看了廖老师的教程实现了这个模块,按照自己的思路实现了一个,代码附下。

 

 

 

 

 

 

 

 

 

 

 

 

 

需要说名的几点:

1. dbcontext继承自threading.local,确保每个线程中都有独立的一个dbcontext对象,保证个用户数据独立。

2. connection对象是对dbcontext对象的一个封装,实现了getcursor方法,和关闭方法。

3. 现在的代码不够DRY,select方法和update方法调用了connection对象后都要手动关闭,可以实现一个装饰器,把要调用connection对象的代码块儿包裹起来,初始化和关闭connection对象的代码提取到装饰器中。下次补上。

代码附下:

 

  1 import MySQLdb,logging,threading,time,uuid  2 logging.basicConfig(level=logging.INFO,format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',datefmt='%a, %d %b %Y %H:%M:%S')  3   4 def next_id(t=None):  5     '''  6     Return next id as 50-char string.  7   8     Args:  9         t: unix timestamp, default to None and using time.time(). 10     ''' 11     if t is None: 12         t = time.time() 13     return '%015d%s000' % (int(t * 1000), uuid.uuid4().hex) 14  15 class Dict(dict): 16     def __init__(self,names=(),values=(),**args): 17         super(Dict,self).__init__(**args) 18         for k,v in zip(names,values): 19             self[k]=v 20     def __getattr__(self, item): 21         try: 22             return self[item] 23         except KeyError: 24             raise AttributeError('it has no attribute named %s'%(str(item),)) 25     def __setattr__(self, key, value): 26         self[key]=value 27  28 class dbcontext(threading.local): 29     def __init__(self): 30         self.db= MySQLdb.connect(host="localhost",port=3306,passwd='toor',user='root',db='xdyweb') 31     def getdb(self): 32         return self.db 33  34 class conntection(object): 35     def __init__(self): 36         self.dbctx=dbcontext() 37         self.db=self.dbctx.getdb() 38         self.cursor=self.db.cursor() 39     def getcursor(self): 40         return self.cursor 41     def close(self): 42         self.db.close() 43         self.cursor=None 44 def _select(sql,first,*args): 45     conn=conntection() 46     csr=conn.getcursor() 47     sql=sql.replace('?','%s') 48     values=csr.execute(sql,*args) 49     try: 50         if csr.description: 51             names=[x[0] for x in csr.description] 52         if first: 53             values=csr.fetchone() 54             if values is None: 55                 return None 56             return Dict(names,values) 57         return  [Dict(names,x) for x in csr.fetchall()] 58     finally: 59         conn.close() 60  61 def select(sql,first,*args): 62     return _select(sql,first,*args) 63 def select_one(sql,pk): 64     return _select(sql,True,pk) 65  66 def update(sql,*args): 67     r''' 68     Execute update SQL. 69  70     >>> u1 = dict(id=1000, name='Michael', email='michael@test.org', passwd='123456', last_modified=time.time()) 71     >>> insert('user', **u1) 72     1 73     >>> u2 = select_one('select * from user where id=?', 1000) 74     >>> u2.email 75     u'michael@test.org' 76     >>> u2.passwd 77     u'123456' 78     >>> update('update user set email=?, passwd=? where id=?', 'michael@example.org', '654321', 1000) 79     1 80     >>> u3 = select_one('select * from user where id=?', 1000) 81     >>> u3.email 82     u'michael@example.org' 83     >>> u3.passwd 84     u'654321' 85     >>> update('update user set passwd=? where id=?', '***', '123/' or id=/'456') 86     0 87     ''' 88     conn=conntection() 89     csr=conn.getcursor() 90     sql=sql.replace('?','%s') 91  92     try: 93         csr.execute(sql,args) 94         return csr.rowcount 95     finally: 96         conn.close() 97 def insert(table,**kw): 98     ''' 99     Execute insert SQL.100 101     >>> u1 = dict(id=2000, name='Bob', email='bob@test.org', passwd='bobobob', last_modified=time.time())102     >>> insert('user', **u1)103     1104     >>> u2 = select_one('select * from user where id=?', 2000)105     >>> u2.name106     u'Bob'107     >>> insert('user', **u2)108     Traceback (most recent call last):109       ...110     IntegrityError: 1062 (23000): Duplicate entry '2000' for key 'PRIMARY'111     '''112     cols, args = zip(*kw.iteritems())113     sql = 'insert into `%s` (%s) values (%s)' % (table, ','.join(['`%s`' % col for col in cols]), ','.join(['?' for i in range(len(cols))]))114     logging.info(sql)115     return update(sql,*args)116 117 def main():118     # db=MySQLdb.connect(host="localhost",port=3306,passwd='toor',user='root',db='xdyweb')119 120     # conn=conntection()121     # c=conn.getcursor()122     # r=c.execute("select * from users where email=%s",('sss@sss.sss',))123     # logging.warning(r)124 125     # f=_select('select * from users where email =?',False,('sss@sss.sss',))126     # logging.info(f)127     # pass128 129     u1 = dict(id=2000, name='Bob', email='bob@test.org', passwd='bobobob', last_modified=time.time())130     r=insert('user', **u1)131     logging.info(r)132 if __name__=="__main__":133     main()
View Code

 


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表