表数据超过百万级别时使用pandas读取数据速度过慢,如果仍然想用pandas读取,可以通过多进程提高效率。同时可以将常用数据保存为pkl文件,以便后续使用。
@主要代码实现
#按照表中的某字段将表划分为比较均匀的多个子集
#本例中需要读取的表中包含了城市字段,
#且涉及的城市包含了全国大部分城市,数据分布较为均匀,因此制作了一张省份城市配置表,将数据划分
#读取省份-城市配置表,获取城市列表
def
get_division_list
(
db_connect
,
division_table
)
:
sql
=
'select * from {};'
.
format
(
division_table
)
data
=
pd
.
read_sql
(
sql
,
con
=
db_connect
)
#省份列表
province_list
=
data
[
'province'
]
.
unique
(
)
.
tolist
(
)
#获取城市列表
city_list
=
[
]
for
code
in
province_list
:
city
=
data
[
data
[
'province'
]
.
str
.
contains
(
code
)
]
[
'city_code'
]
.
unique
(
)
.
tolist
(
)
city_list
.
append
(
city
)
#此处返回的城市列表demo为[['南京','苏州','扬州'],['深圳','广州']]
return
city_list
#单进程读取目标表
def
read_data
(
db_connect
,
target_table
,
code
)
:
start
=
time
.
time
(
)
sql
=
'select * from {0} where city_code in ({1});'
.
format
(
target_table
,
"'"
+
"','"
.
join
(
code
)
+
"'"
)
data_df
=
pd
.
read_sql
(
sql
,
con
=
db_connect
)
print
(
'数据读入成功!'
)
end
=
time
.
time
(
)
print
(
'Task runs %0.2f seconds.'
%
(
(
end
-
start
)
)
)
return
data_df
if
__name__
==
"__main__"
:
# 开启的进程数, 与逻辑核保持一致即可,普通台式机建议18,高性能工作站建议60
target_table
=
'table_name1'
division_table
=
'table_name2'
db_connect
=
connect_db
(
)
#数据库连接函数,详见最后所附完整项目代码
city_list
=
get_division_list
(
db_connect
,
division_table
)
proc_num
=
31
#进程数
pool
=
Pool
(
processes
=
proc_num
)
jop_result
=
[
]
for
code
in
city_list
:
# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res
=
pool
.
apply_async
(
read_data
,
(
db_connect
,
target_table
,
code
,
)
)
jop_result
.
append
(
res
)
pool
.
close
(
)
#关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
pool
.
join
(
)
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
#获取结果,本例中将读取的数据保存到本地的pkl文件中,以便后续使用
for
index
,
tmp
in
enumerate
(
jop_result
)
:
result_path
=
r
'path\result_'
+
str
(
index
)
+
'.pkl'
tmp_df
=
tmp
.
get
(
)
with
open
(
result_path
,
'wb'
)
as
f
:
pickle
.
dump
(
tmp_df
,
f
)
;
完整代码链接: https://github.com/AlisaAlbert/TransferData/blob/master/ReadData.py