python多进程读取mysql表

系统 1346 0

表数据超过百万级别时使用pandas读取数据速度过慢,如果仍然想用pandas读取,可以通过多进程提高效率。同时可以将常用数据保存为pkl文件,以便后续使用。
@主要代码实现

            
              
                #按照表中的某字段将表划分为比较均匀的多个子集
              
              
                #本例中需要读取的表中包含了城市字段,
              
              
                #且涉及的城市包含了全国大部分城市,数据分布较为均匀,因此制作了一张省份城市配置表,将数据划分
              
              
                #读取省份-城市配置表,获取城市列表
              
              
                def
              
              
                get_division_list
              
              
                (
              
              db_connect
              
                ,
              
               division_table
              
                )
              
              
                :
              
              
    sql 
              
                =
              
              
                'select * from {};'
              
              
                .
              
              
                format
              
              
                (
              
              division_table
              
                )
              
              
    data 
              
                =
              
               pd
              
                .
              
              read_sql
              
                (
              
              sql
              
                ,
              
               con 
              
                =
              
               db_connect
              
                )
              
              
                #省份列表
              
              
    province_list 
              
                =
              
               data
              
                [
              
              
                'province'
              
              
                ]
              
              
                .
              
              unique
              
                (
              
              
                )
              
              
                .
              
              tolist
              
                (
              
              
                )
              
              
                #获取城市列表
              
              
    city_list 
              
                =
              
              
                [
              
              
                ]
              
              
                for
              
               code 
              
                in
              
               province_list
              
                :
              
              
        city 
              
                =
              
               data
              
                [
              
              data
              
                [
              
              
                'province'
              
              
                ]
              
              
                .
              
              
                str
              
              
                .
              
              contains
              
                (
              
              code
              
                )
              
              
                ]
              
              
                [
              
              
                'city_code'
              
              
                ]
              
              
                .
              
              unique
              
                (
              
              
                )
              
              
                .
              
              tolist
              
                (
              
              
                )
              
              
        city_list
              
                .
              
              append
              
                (
              
              city 
              
                )
              
              
                #此处返回的城市列表demo为[['南京','苏州','扬州'],['深圳','广州']]
              
              
                return
              
               city_list
 
 
              
                #单进程读取目标表   
              
              
                def
              
              
                read_data
              
              
                (
              
              db_connect
              
                ,
              
               target_table
              
                ,
              
               code
              
                )
              
              
                :
              
              
    start 
              
                =
              
               time
              
                .
              
              time
              
                (
              
              
                )
              
              
    sql 
              
                =
              
              
                'select * from {0} where city_code in ({1});'
              
              
                .
              
              
                format
              
              
                (
              
              target_table
              
                ,
              
              
                "'"
              
              
                +
              
              
                "','"
              
              
                .
              
              join
              
                (
              
              code
              
                )
              
              
                +
              
              
                "'"
              
              
                )
              
              
    data_df 
              
                =
              
               pd
              
                .
              
              read_sql
              
                (
              
              sql
              
                ,
              
              con 
              
                =
              
               db_connect
              
                )
              
              
                print
              
              
                (
              
              
                '数据读入成功!'
              
              
                )
              
              
    end 
              
                =
              
               time
              
                .
              
              time
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                'Task runs %0.2f seconds.'
              
              
                %
              
              
                (
              
              
                (
              
              end 
              
                -
              
               start
              
                )
              
              
                )
              
              
                )
              
              
                return
              
               data_df


              
                if
              
               __name__ 
              
                ==
              
              
                "__main__"
              
              
                :
              
              
                # 开启的进程数, 与逻辑核保持一致即可,普通台式机建议18,高性能工作站建议60
              
              
    target_table 
              
                =
              
              
                'table_name1'
              
              
    division_table 
              
                =
              
              
                'table_name2'
              
              
    db_connect
              
                =
              
               connect_db
              
                (
              
              
                )
              
              
                #数据库连接函数,详见最后所附完整项目代码
              
              
    city_list 
              
                =
              
               get_division_list
              
                (
              
              db_connect
              
                ,
              
               division_table
              
                )
              
              

    proc_num 
              
                =
              
              
                31
              
              
                #进程数
              
              
    pool 
              
                =
              
               Pool
              
                (
              
              processes 
              
                =
              
               proc_num
              
                )
              
              
    jop_result 
              
                =
              
              
                [
              
              
                ]
              
              
                for
              
               code 
              
                in
              
               city_list 
              
                :
              
              
                # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
              
              
        res 
              
                =
              
               pool
              
                .
              
              apply_async
              
                (
              
              read_data
              
                ,
              
              
                (
              
              db_connect
              
                ,
              
              target_table 
              
                ,
              
              code
              
                ,
              
              
                )
              
              
                )
              
              
        jop_result
              
                .
              
              append
              
                (
              
              res
              
                )
              
              

    pool
              
                .
              
              close
              
                (
              
              
                )
              
              
                #关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
              
              
    pool
              
                .
              
              join
              
                (
              
              
                )
              
              
                #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
              
              
                #获取结果,本例中将读取的数据保存到本地的pkl文件中,以便后续使用
              
              
                for
              
               index
              
                ,
              
              tmp 
              
                in
              
              
                enumerate
              
              
                (
              
              jop_result
              
                )
              
              
                :
              
              
        result_path 
              
                =
              
               r
              
                'path\result_'
              
              
                +
              
              
                str
              
              
                (
              
              index
              
                )
              
              
                +
              
              
                '.pkl'
              
              
        tmp_df 
              
                =
              
               tmp
              
                .
              
              get
              
                (
              
              
                )
              
              
                with
              
              
                open
              
              
                (
              
              result_path
              
                ,
              
              
                'wb'
              
              
                )
              
              
                as
              
               f
              
                :
              
              
            pickle
              
                .
              
              dump
              
                (
              
              tmp_df
              
                ,
              
               f
              
                )
              
              
                ;
              
            
          

完整代码链接: https://github.com/AlisaAlbert/TransferData/blob/master/ReadData.py


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论