python连接hive批量补充时间分区特征

系统 1521 0

1. python连接数据库

            
              from pyhive import hive

conn = hive.Connection(
        host= 'xxx xxx', 
        port= xxx, 
        auth='CUSTOM',
        username='your user name', 
        password='your password',
        database='default')
cursor = conn.cursor()

            
          

2. SQL代码封装

设计为可传参方式

            
              def get_sql(d):
    d = "'"+ d + "'"
    add_sql_horizon = """
    CREATE table tmp.horizon_feature as
    with 
    tmp_dlr_cstm as(
        select dealer_id,
        sum(
            case when DateDiff("""+d+""",created_time)<=30 and date_from not in (0,3) then 1
            else 0
            end
        ) as pcust_acc_1m_dlr_off
        from dcs.nt_tp_p_customers
        group by dealer_id
    )

    select   
        nvl(cast(d1.dealer_id as string),'') as dealer_id,
        nvl(cast(d1.dealer_code as string),'') as dealer_code,
        nvl(cast(d1.company_id as string),'') as company_id,
        nvl(cast(tmp_dlr_cstm.pcust_acc_1m_dlr_off as string),'') as pcust_acc_1m_dlr_off
    from dcs.nt_BB_DEALERS d1
    left join tmp_dlr_cstm on d1.dealer_id = tmp_dlr_cstm.dealer_id
    where d1.biz_status=10310005"""         

    add_sql_verticle ="""
    insert into table tmp.dwb_dealer_feature_dd
    select dealer_id,dealer_code,company_id,t.feat_name,t.feat_value,"""+d+""" as prod_time
    from tmp.horizon_feature
    lateral view explode(
        map(
        'pcust_acc_1m_dlr_off', pcust_acc_1m_dlr_off
    )
    )t as feat_name, feat_value
    """
    return add_sql_horizon,add_sql_verticle

            
          

以上操作完成了新建一个横表,然后转化为纵表的操作。

3. 获取日期参数

            
              import datetime

def getEveryDay(begin_date,end_date):
    date_list = []
    begin_date = datetime.datetime.strptime(begin_date, "%Y-%m-%d")
    end_date = datetime.datetime.strptime(end_date,"%Y-%m-%d")
    while begin_date <= end_date:
        date_str = begin_date.strftime("%Y-%m-%d")
        date_list.append(date_str)
        begin_date += datetime.timedelta(days=1)
    return date_list

            
          

4. 循环执行SQL脚本

            
              for d in getEveryDay('2017-01-01','2019-08-07'):
    cursor.execute("""drop table if exists tmp.horizon_feature""")
    conn.commit()
    horizon_sql,verticle_sql = get_sql(d)
    cursor.execute(horizon_sql)
    conn.commit()

    cursor = conn.cursor()
    cursor.execute(verticle_sql)
    conn.commit()
    cursor = conn.cursor()
    
cursor.close()

            
          

成功插入数据!
棒!

欢迎关注微信公众号“数据分析师手记”,一起进步!


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论