python多进程读取文件

系统 1784 0

文件夹中文件数较多,每份文件较大的情况下,可以采用多进程读取文件
最后附完整项目代码

            
              
                #单进程读取文件夹中的单份文件
              
              
                def
              
              
                read_data
              
              
                (
              
              path
              
                )
              
              
                :
              
              
    start 
              
                =
              
               time
              
                .
              
              time
              
                (
              
              
                )
              
              
                with
              
              
                open
              
              
                (
              
              path
              
                ,
              
              
                'rb'
              
              
                )
              
              
                as
              
               f
              
                :
              
              
        filename 
              
                =
              
               pickle
              
                .
              
              load
              
                (
              
              f
              
                )
              
              
    end 
              
                =
              
               time
              
                .
              
              time
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                'Task runs %0.2f seconds.'
              
              
                %
              
              
                (
              
              
                (
              
              end 
              
                -
              
               start
              
                )
              
              
                )
              
              
                )
              
              
                return
              
               filename


              
                #向数据库插入数据
              
              
                def
              
              
                insert_data
              
              
                (
              
              db_connect
              
                ,
              
               result
              
                ,
              
               table
              
                )
              
              
                :
              
              
    cursor 
              
                =
              
               db_connect
              
                .
              
              cursor
              
                (
              
              
                )
              
              
                #转换数据格式,插入数据库
              
              
    static_result_df1 
              
                =
              
               np
              
                .
              
              array
              
                (
              
              result
              
                )
              
              
                .
              
              tolist
              
                (
              
              
                )
              
              
    static_result_df2 
              
                =
              
              
                list
              
              
                (
              
              
                map
              
              
                (
              
              
                tuple
              
              
                ,
              
               static_result_df1
              
                )
              
              
                )
              
              

    sql_truncate 
              
                =
              
              
                "truncate {};"
              
              
                .
              
              
                format
              
              
                (
              
              table
              
                )
              
              
    sql_insert 
              
                =
              
              
                '''
    insert into {}
        (columns_name
    ) values 
    (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    '''
              
              
                .
              
              
                format
              
              
                (
              
              table
              
                )
              
              
                try
              
              
                :
              
              
                # 执行sql语句
              
              
        cursor
              
                .
              
              execute
              
                (
              
              sql_truncate
              
                )
              
              
        cursor
              
                .
              
              executemany
              
                (
              
              sql_insert
              
                ,
              
               static_result_df2
              
                )
              
              
                # 执行sql语句
              
              
        cursor
              
                .
              
              commit
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                "Done Task!"
              
              
                )
              
              
                except
              
              
                :
              
              
                # 发生错误时回滚
              
              
        cursor
              
                .
              
              rollback
              
                (
              
              
                )
              
              
    cursor
              
                .
              
              close
              
                (
              
              
                )
              
              
                if
              
               __name__
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                #开启进程,与逻辑核保持一致
              
              
    connect_db 
              
                =
              
               connect_db
              
                (
              
              
                )
              
              
    filepath 
              
                =
              
               r
              
                'D:\filename'
              
              
    table 
              
                =
              
              
                'table_name'
              
              

    t1 
              
                =
              
               time
              
                .
              
              time
              
                (
              
              
                )
              
              
    pro_num 
              
                =
              
              
                10
              
              
                #进程数
              
              
    pool 
              
                =
              
               Pool
              
                (
              
              processes 
              
                =
              
               pro_num
              
                )
              
              
    job_result 
              
                =
              
              
                [
              
              
                ]
              
              
                #遍历文件夹读取所有文件
              
              
                for
              
              
                file
              
              
                in
              
               os
              
                .
              
              listdir
              
                (
              
              filepath
              
                )
              
              
                :
              
              
        filename 
              
                =
              
               filepath 
              
                +
              
              
                '\\'
              
              
                +
              
              
                file
              
              
        res 
              
                =
              
               pool
              
                .
              
              apply_async
              
                (
              
              read_data
              
                ,
              
              
                (
              
              filename
              
                ,
              
              
                )
              
              
                )
              
              
        job_result
              
                .
              
              append
              
                (
              
              res
              
                )
              
              

    pool
              
                .
              
              close
              
                (
              
              
                )
              
              
                #关闭进程池
              
              
    pool
              
                .
              
              join
              
                (
              
              
                )
              
              
                #合并所有读取的文件
              
              
    get_result 
              
                =
              
               pd
              
                .
              
              DataFrame
              
                (
              
              
                )
              
              
                for
              
               tmp 
              
                in
              
               job_result
              
                :
              
              
        get_result 
              
                =
              
               get_result
              
                .
              
              append
              
                (
              
              tmp
              
                .
              
              get
              
                (
              
              
                )
              
              
                )
              
              
    t2 
              
                =
              
               time
              
                .
              
              time
              
                (
              
              
                )
              
              

    insert_data
              
                (
              
              connect_db
              
                ,
              
               get_result
              
                ,
              
               table
              
                )
              
              
                print
              
              
                (
              
              
                'It took a total of %0.2f seconds.'
              
              
                %
              
              
                (
              
              t2 
              
                -
              
               t1
              
                )
              
              
                )
              
            
          

完整项目代码链接:https://github.com/AlisaAlbert/TransferData/blob/master/InsertData.py


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论