首页 > 学院 > 开发设计 > 正文

sqlAlchemy的sql语句查询

2019-11-06 08:22:54
字体:
来源:转载
供稿:网友

1、条件查询

//这里得到的结果是一个list类型PRogram_ids = session.query(Program).filter(Program.contentId==values["contentId"]).all()//通过获取第一个数据对象,然后直接.属性的方式就可以获取program_id = program_ids[0].id# -*- coding: utf-8 -*-from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column,Integer,Stringfrom sqlalchemy.orm import sessionmaker# 创建对象的基类:Base = declarative_base()global engine#建立数据库表def create_all_tables(DB_type,DB_host,DB_port,DB_name,username,passWord,charset="utf8"): global engine if DB_type.upper() == "MySQL": DB_URI = "mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s" % (username,password,DB_host,DB_port,DB_name,charset) engine = create_engine(DB_URI,echo=True) # 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息 Base.metadata.create_all(engine)#返回数据库会话def loadSession(): Session = sessionmaker(bind=engine) session = Session() return sessionclass User(Base): __tablename__="User" # 表的结构: id = Column(Integer,primary_key=True) userName = Column(String(50),nullable=False,default="Noob") password = Column(String(50),nullable=False,default="123456") gender = Column(String(1),nullable=True,default=None) def __init__(self,id,userName,password,gender = None): self.id = id self.userName = userName self.password = password self.gender = genderif __name__=="__main__": #建表 create_all_tables("mysql","localhost",3306,"test","root","123456") #获取数据库会话 session = loadSession() #增加 u1 = User(id=1,userName="Rose",password="aaaa",gender="F") u2 = User(id=2, userName="Joe", password="bbbb",gender="M") u3 = User(id=3, userName="jack", password="bbbb", gender="M") u4 = User(id=4, userName="Billy", password="cccc") session.add(u1) session.add(u2) session.add(u3) session.add(u4) session.commit() # 封装好的新增的方法 def my_insert(self, data): result = {'errorcode': -1} try : if data.has_key("createTime") is False and hasattr(self, "createTime"): setattr(self, "createTime", str(datetime.datetime.utcnow())[:19]) setattr(self, "updateTime", str(datetime.datetime.utcnow())[:19]) for key, value in data.items(): if hasattr(self, key): setattr(self, key, value) self.session.add(self) self.session.flush() result = self.to_dict(self.__dict__) # self.session.commit() result["errorcode"] = 0 except Exception : return result finally: return result #删除 pro_movie_res = session.query(ProgramMovie).filter(ProgramMovie.programId == program_id).delete(synchronize_session=False) #批量删除 poster_res = session.query(Poster).filter(Poster.id.in_(post_ids)).delete(synchronize_session=False) #更新(如果参数是一个dict,会对应更新相关数据) session.query(User).filter(User.userName == "jack").update({User.password:"xxxx"}) session.commit() #查询 #查第一行 session.query(User.id,User.userName,User.password).first() #查所有行 session.query(User.id, User.userName, User.password).all() #根据id倒序并取前两行 session.query(User).order_by(User.id.desc()).limit(2)
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表