python访问数据库语句(python连接数据库的方法)

python访问数据库语句(python连接数据库的方法)

大家好,我是安果!

在数据生产应用部门,取数分析是一个很常见的需求,实际上业务人员需求时刻变化,最高效的方式是让业务部门自己来取,减少不必要的重复劳动,一般情况下,业务部门数据库表结构一般是固定的,根据实际业务将取数需求做成sql 脚本,快速完成数据获取—授人以渔的方式,提供平台或工具

那如何实现一个自助取数查询工具?

基于底层数据来开发不难,无非是将用户输入变量作为筛选条件,将参数映射到 sql 语句,并生成一个 sql 语句然后再去数据库执行

具体思路:

一、数据库连接类

此处利用 pandas 读写操作 oracle 数据库

二、主函数模块

1)输入参数模块,外部输入条件参数,建立数据库关键字段映射

–注:读取外部 txt 文件,将筛选字段可能需要进行键值对转换

2)sql 语句集合模块,将待执行的业务 sql 语句统一存放到这里

3)数据处理函数工厂

4)使用多线程提取数据

一、数据库连接类

cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新

Pandas 是基于 NumPy 开发,为了解决数据分析任务的模块,Pandas 引入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的方法类和函数

pandas调用数据库主要有read_sql_table,read_sql_query,read_sql三种方式

本文主要介绍一下 Pandas 中 read_sql_query 方法的使用

1:pd.read_sql_query()读取自定义数据,返还DataFrame格式,通过SQL查询脚本包括增删改查。pd.read_sql_query(sql,con,index_col=None,coerce_float=True,params=None,parse_dates=None,chunksize=None)sql:要执行的sql脚本,文本类型con:数据库连接index_col:选择返回结果集索引的列,文本/文本列表coerce_float:非常有用,将数字形式的字符串直接以float型读入parse_dates:将某一列日期型字符串转换为datetime型数据,与pd.to_datetime函数功能类似。params:向sql脚本中传入的参数,官方类型有列表,元组和字典。用于传递参数的语法是数据库驱动程序相关的。chunksize:如果提供了一个整数值,那么就会返回一个generator,每次输出的行数就是提供的值的大小read_sql_query()中可以接受SQL语句,DELETE,INSERT INTO、UPDATE操作没有返回值(但是会在数据库中执行),程序会抛出SourceCodeCloseError,并终止程序。SELECT会返回结果。如果想继续运行,可以try捕捉此异常。2:pd.read_sql_table()读取数据库中的表,返还DataFrame格式(通过表名)importpandasaspdpd.read_sql_table(table_name,con,schema=None,index_col=None,coerce_float=True,parse_dates=None,columns=None,chunksize=None)3:pd.read_sql()读数据库通过SQL脚本或者表名importpandasaspdpd.read_sql(sql,con,index_col=None,coerce_float=True,params=None,parse_dates=None,columns=None,chunksize=None)

以下创建连接 oracel 数据库的连接类 Oracle_DB

主要提供 2 种操作数据的函数方法。

importcx_Oracle#Pandas读写操作Oracle数据库importpandasaspd#避免编码问题带来的乱码importosos.environ[‘NLS_LANG’]=’SIMPLIFIEDCHINESE_CHINA.UTF8’classOracle_DB(object):def__init__(self):try:#连接oracle#方法1:sqlalchemy 提供的create_engine()#fromsqlalchemyimportcreate_engine#engine=create_engine(‘oracle cx_oracle://username:password@ip:1521/ORCL’)##方法2:cx_Oracle.connect()self.engine=cx_Oracle.connect(‘username’,’password’,’ip:1521/database’)exceptcx_Oracle.Errorase:print(“Error %d:%s”%(e.args[0],e.args[1]))exit()#查询部分信息defsearch_one(self,sql,sparm):try:##查询获取数据用sql语句#代传参数:sparm–查询指定字段参数df=pd.read_sql_query(sql,self.engine,params=sparm)self.engine.close()exceptExceptionase:return”Error” e.args[0]returndf#查询全部信息defsearch_all(self,sql):try:##查询获取数据用sql语句df=pd.read_sql_query(sql,self.engine)self.engine.close()exceptExceptionase:return”Error” e.args[0]returndf

二、数据提取主函数模块

cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新。

1)外部输入参数模块

txt 文本中,就包含一列数据,第一行列名,读取的时候忽略第一行

#建立ID——编号字典defbuildid():sqlid=”””select*fromb_build_info”””db=Oracle_DB()#实例化一个对象b_build_info=db.search_all(sqlid)ID_bUILDCODE=b_build_info.set_index(“BUILDCODE”)[“ID”].to_dict()returnID_bUILDCODE#通过文本传入待导出数据清单defread_task_list():build_code=buildid()tasklist=[]is_first_line=Truewithopen(“./b_lst.txt”)aslst:forlineinlst:ifis_first_line:is_first_line=Falsecontinuetasklist.append(build_code.get(line.strip(‘\n’))) #键值对转换returntasklist

2)业务sql 语句集合

注意in后面{0}不要加引号,这里传入为元组,params 参数传入sparm

= {‘Start_time’:’2021-04-01′,’End_time’:’2021-05-01′},此处参数可根据需要改变

defsql_d(lst):#逐月数据sql_d_energy_item_month=”””select*fromd_energy_item_monthwhererecorddate>=to_date(:Start_time,’yyyy-MM-dd’)andrecorddate<to_date(:End_time,’yyyy-MM-dd’)andbuildidin{0}orderbyrecorddateasc”””.format(lst)#逐月数据sql_d_energy_month=”””selectd.*,t.namefromd_energy_monthdjoint_device_infotond.branchid=t.idwhered.recorddate>=to_date(:Start_time,’yyyy-MM-dd’)andd.recorddate<to_date(:End_time,’yyyy-MM-dd’)andd.buildid='{0}’orderbyd.recorddateasc”””.format(lst)#查询当日数据sql_energy_item_hour_cheak=”””select*fromd_energy_item_hourwheretrunc(sysdate)=trunc(recorddate)orderbyrecorddateasc”””.format(lst)sql_collection=[sql_d_energy_item_month,sql_d_energy_item_day,sql_d_energy_item_hour,sql_d_energy_month,sql_d_energy_day,sql_d_energy_hour,sql_energy_hour_cheak]#此处省略部分sql语句returnsql_collection

3)业务数据处理

业务数据处理流程,原始数据后处理,这里不作介绍:

defdb_extranction(lst,sparm,sql_type):”””sql_type–输入需要操作的sql业务序号”””sql_=sql_d(lst)[sql_type]#输出sql语句db=Oracle_DB()#实例化一个对象res=db.search_one(sql_,sparm)#数据处理加工RES=Data_item_factory(res)#此处省略#res=db.search_all(sql_d_energy_item_month)print(RES)returnRES

多线程提取数据部分,这里 tasklist 列表多线程提取数据

importthreading#Pandas读写操作Oracle数据库fromtools.Data_Update_oracleimportOracle_DBimportpandasaspdfromconcurrentimportfuturesif__name__==’__main__’:#外部传入tasklist=read_task_list()print(tasklist)#输入时间查找范围参数,可手动修改sparm={‘Start_time’:’2021-04-01′,’End_time’:’2021-05-01′}lst=tuple(list(tasklist))#业务类型序号,可手动修改sql_type=0#全部提取db_extranction(lst,sparm,sql_type)#多线程按字段分批提取方法一:使用threading模块的Thread类的构造器创建线程#threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type))forlstintasklist]#[threads[i].start()foriinrange(len(threads))]方法二:使用python的concurrent库,这是官方基于 threading 封装,先安装该库#withfutures.ThreadPoolExecutor(len(tasklist))asexecutor:#executor.map([db_extranction(lst,sparm,sql_type)forlstintasklist],tasklist)

到此整个数据库取数工具开发流程介绍完毕,就差最后一步分享给小伙伴使用了,做成 GUI 应用此处不做详细介绍,构建独立的 python 环境,快速发布你的应用

发表评论

登录后才能评论