# -*- coding=utf-8 -*-
"""Date-list helpers for the day-by-day retention jobs.

Every day is exchanged as a 'YYYY-MM-DD' string.  `warnings` and `datetime`
are imported at module level on purpose: the sibling scripts do
`from DayProc import *` and use both names through that star import.
"""
import warnings
import datetime

warnings.filterwarnings("ignore")

_DAY_FORMAT = '%Y-%m-%d'
# Earliest day any generated list may start from.
_FIRST_DAY = '2015-06-29'
# Default number of days before the run day covered by getDayRunList().
_RUN_WINDOW_DAYS = 60


def _parse_day(day_str):
    """Turn a 'YYYY-MM-DD' string into a datetime (raises ValueError if malformed)."""
    return datetime.datetime.strptime(day_str, _DAY_FORMAT)


def _resolve_end_date(run_day):
    """Return run_day as a datetime, falling back to today when it is '' or None."""
    return _parse_day(run_day if run_day else getNowDay())


def _day_range(begin_date, end_date):
    """List every day from begin_date to end_date (both inclusive) as strings.

    Returns an empty list when begin_date is later than end_date.
    """
    days = []
    current = begin_date
    while current <= end_date:
        days.append(current.strftime(_DAY_FORMAT))
        current += datetime.timedelta(days=1)
    return days


def getNowDay():
    """Return today's local date as 'YYYY-MM-DD'."""
    return datetime.datetime.today().strftime(_DAY_FORMAT)


def getFristDay():
    """Return the first day covered by the jobs ('2015-06-29').

    The misspelled name is kept so existing callers keep working.
    """
    return _FIRST_DAY


def getDayAllList(run_day=''):
    """Return every day from the first day up to run_day, inclusive.

    run_day: 'YYYY-MM-DD' string; '' or None means today.
    """
    return _day_range(_parse_day(getFristDay()), _resolve_end_date(run_day))


def getDayRunList(run_day='', window_days=_RUN_WINDOW_DAYS):
    """Return the days from (run_day - window_days) up to run_day, inclusive.

    The list never starts before the first day returned by getFristDay().

    run_day: 'YYYY-MM-DD' string; '' or None means today.
    window_days: how many days before run_day to include (default 60).
    """
    end_date = _resolve_end_date(run_day)
    window_start = end_date - datetime.timedelta(days=window_days)
    begin_date = max(_parse_day(getFristDay()), window_start)
    return _day_range(begin_date, end_date)


def getDayRemainRunIntervalList(str_begin_date, str_end_date):
    """Return every day between two 'YYYY-MM-DD' strings, both inclusive."""
    return _day_range(_parse_day(str_begin_date), _parse_day(str_end_date))


# Batch Test
# run_day = '2016-07-10'
# for remain_day in getDayAllList(run_day):
#     for his_day in getDayRunList(run_day=remain_day):
#         print remain_day, his_day
# one day
# for his_day in getDayRunList(run_day=run_day):
#     print run_day, his_day
# run_day = '2017-01-01'
# for remain_day in getDayRunList(run_day):
#     print remain_day,run_day
#     # for his_day in
#     getDayRunList(run_day=remain_day):
#         print remain_day, his_day
# Compute the data for 2017
# str_begin_date = '2017-01-01'
# str_end_date = '2017-02-05'
# for remain_day in getDayRemainRunIntervalList(str_begin_date, str_end_date):
#     for his_day in getDayRunList(run_day=remain_day):
#         print remain_day,his_day


# ===========================================================================
# 2. Retention and related metric calculation
# /Users/nisj/PycharmProjects/EsDataProc/RemainProcByDay/Hive_remain_byDay_proc.py
# ===========================================================================
# -*- coding=utf-8 -*-
from DayProc import *
import os
import re  # no longer used here; kept because BatchThread star-imports this module
import time

warnings.filterwarnings("ignore")

# Shell command templates shared by every job.  MysqlHost / MysqlPort /
# MysqlUser / MysqlPass appear to be placeholders to fill in before running.
# NOTE(review): the password is passed on the command line and SQL text is
# spliced into a shell string; values read back from Hive are not escaped.
_MYSQL_COMMAND = ('/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass '
                  '-e "use funnyai_data; %s"')
_HIVE_COMMAND = 'source /etc/profile; /usr/lib/hive-current/bin/hive -e "%s"'

# Hive query templates.  Each is formatted with a dict holding 'his_day' and
# 'remain_day'.  The trailing backslashes join the lines into one
# single-line string, so the shell receives a one-line command.
_IDENTIFIER_REMAIN_QUERY = """\
with tab_new_identifier_byDay as ( \
select appsource,appkey,identifier from bi_all_new_identifier_log \
where pt_day = '%(his_day)s' \
), \
tab_access_log_byDay as ( \
select identifier from bi_all_access_log \
where pt_day = '%(remain_day)s' \
group by identifier) \
select '%(his_day)s' his_day,'%(remain_day)s' remain_day, \
a1.appsource,a1.appkey,count(a1.identifier) remain_identifier_cnt \
from tab_new_identifier_byDay a1 \
inner join tab_access_log_byDay a2 on a1.identifier=a2.identifier \
group by a1.appsource,a1.appkey;"""

_USER_REGISTERT_TRANSF_QUERY = """\
select '%(his_day)s' his_day,'%(remain_day)s' remain_day, \
appsource,appkey,count(*) new_ide_reg_cnt \
from bi_all_register_info \
where (iden_day = '%(his_day)s') and (pt_day = '%(remain_day)s') \
group by appsource,appkey;"""

_USER_REMAIN_PAY_QUERY = """\
with tab_user_register_info as( \
select uid,appsource,appkey from bi_all_register_info \
where (iden_day = '%(his_day)s') \
), \
tab_user_pay_info as ( \
select uid,sum(amount) pay_amount,count(*) pay_cnt from data_chushou_pay_info \
where state=0 \
and (pt_day = '%(remain_day)s') \
group by uid \
) \
select '%(his_day)s' his_day,'%(remain_day)s' remain_day, \
a1.appsource,a1.appkey,sum(pay_amount) pay_amount,sum(pay_cnt) pay_cnt,count(a1.uid) pay_uid_cnt \
from tab_user_register_info a1 \
inner join tab_user_pay_info a2 on a1.uid=a2.uid \
group by a1.appsource,a1.appkey;"""

_REGISTERT_USER_REMAIN_QUERY = """\
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
with tab_user_register_info as( \
select uid,appsource,appkey from bi_all_register_info \
where (iden_day = '%(his_day)s') \
), \
tab_access_log_byDay as ( \
select RadixChange(lower(uid),16,10) uid from bi_all_access_log \
where pt_day = '%(remain_day)s' \
group by RadixChange(lower(uid),16,10)) \
select '%(his_day)s' his_day,'%(remain_day)s' remain_day, \
a1.appsource,a1.appkey,count(a2.uid) uid_remain_cnt \
from tab_user_register_info a1 \
inner join tab_access_log_byDay a2 on a1.uid=a2.uid \
group by a1.appsource,a1.appkey;"""


def _run_mysql(statement):
    """Run one SQL statement in funnyai_data through the mysql command line."""
    os.system(_MYSQL_COMMAND % statement)


def _run_hive(query):
    """Run a Hive query through the hive CLI and return its stdout lines."""
    pipe = os.popen(_HIVE_COMMAND % query)
    try:
        return pipe.readlines()
    finally:
        # The pipe was previously never closed.
        pipe.close()


def _reload_metric(table, metric_columns, query_template, remain_day, his_day):
    """Delete and recompute one (his_day, remain_day) slice of a MySQL table.

    table: target MySQL table.
    metric_columns: metric column names that follow his_day, remain_day,
        appsource and appkey in both the Hive output and the table.
    query_template: Hive query using %(his_day)s / %(remain_day)s.
    """
    # Remove the old slice first so a rerun does not insert duplicates.
    _run_mysql("delete from %s where his_day='%s' and remain_day='%s';"
               % (table, his_day, remain_day))

    columns = ['his_day', 'remain_day', 'appsource', 'appkey']
    columns += list(metric_columns)
    columns.append('etl_time')
    insert_sql = "insert into %s(%s) select %s;" % (
        table, ', '.join(columns), ','.join(["'%s'"] * len(columns)))

    hive_field_count = len(columns) - 1  # etl_time is generated locally
    days = {'his_day': his_day, 'remain_day': remain_day}
    for line in _run_hive(query_template % days):
        # Hive CLI output is tab separated, one row per line.
        fields = line.rstrip('\n').split('\t')
        if len(fields) < hive_field_count:
            # Blank or otherwise incomplete line: skip it instead of
            # raising IndexError and losing the remaining rows.
            continue
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        _run_mysql(insert_sql % tuple(fields[:hive_field_count] + [etl_time]))


def identifier_remain_byday(remain_day, his_day):
    """Reload bi_identifier_remain_byday for one (his_day, remain_day) pair.

    Counts, per appsource/appkey, the identifiers logged in
    bi_all_new_identifier_log on his_day that also appear in
    bi_all_access_log on remain_day.
    """
    _reload_metric('bi_identifier_remain_byday', ['remain_identifier_cnt'],
                   _IDENTIFIER_REMAIN_QUERY, remain_day, his_day)


def user_registert_transf_byday(remain_day, his_day):
    """Reload bi_user_registert_transf_byday for one (his_day, remain_day) pair.

    Counts, per appsource/appkey, the bi_all_register_info rows with
    iden_day = his_day and pt_day = remain_day.
    """
    _reload_metric('bi_user_registert_transf_byday', ['new_ide_reg_cnt'],
                   _USER_REGISTERT_TRANSF_QUERY, remain_day, his_day)


def user_remain_pay_byday(remain_day, his_day):
    """Reload bi_user_remain_pay_byday for one (his_day, remain_day) pair.

    Sums, per appsource/appkey, the state=0 payments in
    data_chushou_pay_info on remain_day made by uids whose iden_day in
    bi_all_register_info is his_day.
    """
    _reload_metric('bi_user_remain_pay_byday',
                   ['pay_amount', 'pay_cnt', 'pay_uid_cnt'],
                   _USER_REMAIN_PAY_QUERY, remain_day, his_day)


def registert_user_remain_byday(remain_day, his_day):
    """Reload bi_registert_user_remain_byday for one (his_day, remain_day) pair.

    Counts, per appsource/appkey, the uids with iden_day = his_day in
    bi_all_register_info that also appear in bi_all_access_log on
    remain_day (access-log uids go through the RadixChange UDF first).
    """
    _reload_metric('bi_registert_user_remain_byday', ['uid_remain_cnt'],
                   _REGISTERT_USER_REMAIN_QUERY, remain_day, his_day)


# Batch Test
# run_day = '2016-07-10'
# for remain_day in getDayAllList(run_day):
#     for his_day in getDayRunList(run_day=remain_day):
#         print remain_day, his_day
# for his_day in getDayRunList(run_day=run_day):
#     # print run_day, his_day
#     identifier_remain_byday(remain_day=run_day, his_day=his_day)


# ===========================================================================
# Appendix: print the metric SQL for testing
# /Users/nisj/PycharmProjects/EsDataProc/RemainProcByDay/Hive_remain_byDay_proc_printSql.py
# ===========================================================================
# -*- coding=utf-8 -*-
from DayProc import *
# Reuse the exact templates the real jobs run so the printed SQL cannot
# drift from what is executed.
from Hive_remain_byDay_proc import (
    _HIVE_COMMAND,
    _IDENTIFIER_REMAIN_QUERY,
    _USER_REGISTERT_TRANSF_QUERY,
    _USER_REMAIN_PAY_QUERY,
    _REGISTERT_USER_REMAIN_QUERY,
)

warnings.filterwarnings("ignore")


def _print_hive_command(query_template, remain_day, his_day):
    """Print the full hive shell command for one (his_day, remain_day) pair."""
    days = {'his_day': his_day, 'remain_day': remain_day}
    print(_HIVE_COMMAND % (query_template % days))


def identifier_remain_byday(remain_day, his_day):
    """Print (without running) the Hive command of the identifier-retention job."""
    _print_hive_command(_IDENTIFIER_REMAIN_QUERY, remain_day, his_day)


def user_registert_transf_byday(remain_day, his_day):
    """Print (without running) the Hive command of the registration job."""
    _print_hive_command(_USER_REGISTERT_TRANSF_QUERY, remain_day, his_day)


def user_remain_pay_byday(remain_day, his_day):
    """Print (without running) the Hive command of the payment job."""
    _print_hive_command(_USER_REMAIN_PAY_QUERY, remain_day, his_day)


def registert_user_remain_byday(remain_day, his_day):
    """Print (without running) the Hive command of the registered-user retention job."""
    _print_hive_command(_REGISTERT_USER_REMAIN_QUERY, remain_day, his_day)


# Batch Test
# run_day = '2016-07-10'
# for remain_day in getDayAllList(run_day):
#     for his_day in getDayRunList(run_day=remain_day):
#         print remain_day, his_day
# for his_day in getDayRunList(run_day=run_day):
#     # print run_day, his_day
#     identifier_remain_byday(remain_day=run_day, his_day=his_day)


# ===========================================================================
# 3. Multi-threaded scheduling
# /Users/nisj/PycharmProjects/EsDataProc/RemainProcByDay/BatchThread.py
# ===========================================================================
# -*- coding=utf-8 -*-
import threadpool
from Hive_remain_byDay_proc import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# Same output as the old `print "...", now_time` statement, but valid on
# both Python 2 and Python 3.
print("当前时间是: %s" % now_time)

# run_day = '2017-02-03'
# for remain_day in getDayAllList(run_day):
#     for his_day in getDayRunList(run_day=remain_day):
#         print remain_day, his_day
# one day
# for his_day in getDayRunList(run_day=run_day):
#     print run_day, his_day
# batch_day_list = []
# for his_day in getDayRunList(run_day=run_day):
#     batch_day_list.append(([run_day, his_day], None))

# Compute the data for 2017
str_begin_date = '2017-01-01'
str_end_date = '2017-02-05'

# One ([remain_day, his_day], None) entry per job call: threadpool passes the
# list as positional arguments, matching the jobs' (remain_day, his_day).
batch_day_list = []
for remain_day in getDayRemainRunIntervalList(str_begin_date, str_end_date):
    for his_day in getDayRunList(run_day=remain_day):
        batch_day_list.append(([remain_day, his_day], None))

# Queue every job for every day pair, job by job (same order as before).
requests = []
for job in (identifier_remain_byday,
            user_registert_transf_byday,
            user_remain_pay_byday,
            registert_user_remain_byday):
    requests.extend(threadpool.makeRequests(job, batch_day_list))

main_pool = threadpool.ThreadPool(16)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    while True:
        try:
            time.sleep(100)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            # Every queued request has delivered its result.
            break
    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("当前时间是: %s" % now_time)

# NOTE: threadpool.py may be needed along the way.  If the threadpool module
# is not installed, copy that source file to
# /Users/nisj/PycharmProjects/EsDataProc/RemainProcByDay/threadpool.py
新闻热点
疑难解答