注意:在pig中用run或者exec运行脚本时,除了cd和ls,其他命令尽量不要用。本代码以rm和mv命令为例,这类命令容易出错。
另外,pig只有在store或dump时才会真正加载数据;否则只是解析代码,并不实际操作数据。所以在执行rm操作时必须注意该文件是否已经生成。如果要rm的文件尚未生成,可以借助第三个(临时)文件,再用mv进行改名操作。
-- Job name; the job's status (running / failed / succeeded) can be checked on the
-- JobTracker web UI at http://172.XX.XX.XX:50030/jobtracker.jsp
SET job.name 'test_age_report_historical';
-- Scheduling priority for this job.
SET job.priority HIGH;
-- Register piggybank.jar, which provides the SequenceFileLoader aliased below.
REGISTER piggybank.jar;
-- Alias for piggybank's SequenceFile (binary file) loader.
-- NOTE(review): none of the LOADs below use it; they all read text via PigStorage(',').
DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader();

-- Input paths. $date is supplied from outside (-param); $year and $file_path are not
-- declared here either and are presumably passed in the same way.
-- Values are quoted so the "/*" glob in Cleaned_Log cannot be taken as the start of a
-- /* ... */ comment by the parameter preprocessor.
%default Cleaned_Log '/user/C/data/XXX/cleaned/$date/*/part*'
%default AD_Data '/user/XXX/data/xxx/metadata/ad/part*'
%default Campaign_Data '/user/xxx/data/xxx/metadata/campaign/part*'
%default Social_Data '/user/xxx/data/report/socialdata/part*'

-- Output paths. For each report: *_Path holds the accumulated (history) rows,
-- *_SUM the per-group user counts, *_TMP a staging directory used to replace *_Path.
%default Industry_Path '$file_path/report/historical/age/$year/industry'
%default Industry_SUM '$file_path/report/historical/age/$year/industry_sum'
%default Industry_TMP '$file_path/report/historical/age/$year/industry_tmp'
%default Industry_Brand_Path '$file_path/report/historical/age/$year/industry_brand'
%default Industry_Brand_SUM '$file_path/report/historical/age/$year/industry_brand_sum'
%default Industry_Brand_TMP '$file_path/report/historical/age/$year/industry_brand_tmp'
%default ALL_Path '$file_path/report/historical/age/$year/all'
%default ALL_SUM '$file_path/report/historical/age/$year/all_sum'
%default ALL_TMP '$file_path/report/historical/age/$year/all_tmp'
-- NOTE(review): output_path is not referenced by the statements in this script.
%default output_path '/user/xxx/tmp/result'

-- Cleaned, comma-delimited ad logs for $date.
origin_cleaned_data = LOAD '$Cleaned_Log' USING PigStorage(',') AS (
    ad_network_id:chararray, xxx_ad_id:chararray, guid:chararray, id:chararray,
    create_time:chararray, action_time:chararray, log_type:chararray, ad_id:chararray,
    positioning_method:chararray, location_accuracy:chararray, lat:chararray, lon:chararray,
    cell_id:chararray, lac:chararray, mcc:chararray, mnc:chararray, ip:chararray,
    connection_type:chararray, android_id:chararray, android_advertising_id:chararray,
    openudid:chararray, mac_address:chararray, uid:chararray, density:chararray,
    screen_height:chararray, screen_width:chararray, user_agent:chararray, app_id:chararray,
    app_category_id:chararray, device_model_id:chararray, carrier_id:chararray,
    os_id:chararray, device_type:chararray, os_version:chararray,
    country_region_id:chararray, province_region_id:chararray, city_region_id:chararray,
    ip_lat:chararray, ip_lon:chararray, quadkey:chararray);

-- metadata/ad data: (adId, campaignId)
metadata_ad = LOAD '$AD_Data' USING PigStorage(',') AS (adId:chararray, campaignId:chararray);
-- metadata/campaign data: (campaignId, industryId, brandId)
metadata_campaign = LOAD '$Campaign_Data' USING PigStorage(',') AS (campaignId:chararray, industryId:chararray, brandId:chararray);

-- Inner join ads to campaigns -> (adId, campaignId, campaignId, industryId, brandId)
joinAdCampaignByCampaignId = JOIN metadata_ad BY campaignId, metadata_campaign BY campaignId;
-- Drop the duplicated campaignId columns -> (adId, industryId, brandId)
joined_ad_campaign_data = FOREACH joinAdCampaignByCampaignId GENERATE
    metadata_ad::adId AS adId,
    metadata_campaign::industryId AS industryId,
    metadata_campaign::brandId AS brandId;

-- Keep only the log columns the reports need -> (xxx_ad_id, guid, log_type)
origin_historical_age = FOREACH origin_cleaned_data GENERATE xxx_ad_id, guid, log_type;
-- Repeated (ad, user, log_type) events collapse to a single row.
distinct_origin_historical_age = DISTINCT origin_historical_age;

-- Social profile data: (guid_social, sex, age, income, edu, hobby)
metadata_social = LOAD '$Social_Data' USING PigStorage(',') AS (guid_social:chararray, sex:chararray, age:chararray, income:chararray, edu:chararray, hobby:chararray);
social_age = FOREACH metadata_social GENERATE guid_social, age;

-- Attach age to each log row -> (guid_social, age, xxx_ad_id, guid, log_type).
-- Inner join: log rows whose guid has no social profile are dropped.
joinedByGUID = JOIN social_age BY guid_social, distinct_origin_historical_age BY guid;
joined_origin_age_data = FOREACH joinedByGUID GENERATE xxx_ad_id, guid, log_type, age;

-- Attach industry/brand via the ad id
-- -> (adId, industryId, brandId, xxx_ad_id, guid, log_type, age)
joinedByAdId = JOIN joined_ad_campaign_data BY adId, joined_origin_age_data BY xxx_ad_id;
-- Current-run rows consumed by the report sections later in the script
-- -> (guid, log_type, industryId, brandId, age)
all_current_data = FOREACH joinedByAdId GENERATE guid, log_type, industryId, brandId, age;
-- ===== Industry report: distinct users per (age, industryId, log_type) =====
-- Current-run rows -> (industryId, guid, age, log_type)
industry_current_data = FOREACH all_current_data GENERATE industryId, guid, age, log_type;
-- Accumulated rows from earlier runs, same layout.
-- NOTE(review): the LOAD presumably fails at execution time if $Industry_Path does not exist yet (first run).
industry_existed_Data = LOAD '$Industry_Path' USING PigStorage(',') AS (industryId:chararray, guid:chararray, age:chararray, log_type:chararray);
-- Merge with history; DISTINCT keeps one row per (industryId, guid, age, log_type).
union_Industry = UNION industry_existed_Data, industry_current_data;
distinct_union_industry = DISTINCT union_Industry;
-- Group by ($2, $0, $3) = (age, industryId, log_type). Rows are distinct, so counting
-- guid ($1 of each bagged row) counts distinct users in the group.
group_industry = GROUP distinct_union_industry BY ($2, $0, $3);
count_guid_for_industry = FOREACH group_industry GENERATE FLATTEN(group), COUNT($1.$1);
-- STORE will not write into an existing directory, so clear the previous summary first.
-- rmf, unlike rm, does not fail when the path is absent (e.g. on the first run).
-- NOTE(review): in batch mode Pig is expected to run all pending STOREs before an fs
-- command (rmf/mv), which is what orders the STOREs ahead of the swap below — confirm
-- for the Pig version in use.
rmf $Industry_SUM;
STORE count_guid_for_industry INTO '$Industry_SUM' USING PigStorage(',');
-- $Industry_Path is also this run's input, so stage the merged rows in $Industry_TMP and
-- swap them in afterwards. $Industry_TMP is deliberately not auto-cleared: if a previous run
-- died between rmf and mv it holds the only copy of the history, and STORE failing on it is
-- the safe outcome.
STORE distinct_union_industry INTO '$Industry_TMP' USING PigStorage(',');
rmf $Industry_Path;
mv $Industry_TMP $Industry_Path;

-- ===== Industry/brand report: distinct users per (age, industryId, brandId, log_type) =====
-- Current-run rows -> (age, industryId, brandId, log_type, guid)
industry_brand_current = FOREACH all_current_data GENERATE age, industryId, brandId, log_type, guid;
-- Accumulated rows from earlier runs, same layout.
industry_brand_history = LOAD '$Industry_Brand_Path' USING PigStorage(',') AS (age:chararray, industryId:chararray, brandId:chararray, log_type:chararray, guid:chararray);
-- Merge with history; DISTINCT keeps one row per (age, industryId, brandId, log_type, guid).
union_industry_brand = UNION industry_brand_current, industry_brand_history;
unique_industry_brand = DISTINCT union_industry_brand;
-- Group by ($0..$3) = (age, industryId, brandId, log_type); COUNT of guid ($4) = distinct users.
group_industry_brand = GROUP unique_industry_brand BY ($0, $1, $2, $3);
count_guid_for_industry_brand = FOREACH group_industry_brand GENERATE FLATTEN(group), COUNT($1.$4);
-- Replace the previous summary; rmf tolerates a missing path.
rmf $Industry_Brand_SUM;
STORE count_guid_for_industry_brand INTO '$Industry_Brand_SUM' USING PigStorage(',');
-- Stage merged rows, then swap them into $Industry_Brand_Path (not atomic: if the job dies
-- between rmf and mv, the history survives only in $Industry_Brand_TMP).
STORE unique_industry_brand INTO '$Industry_Brand_TMP' USING PigStorage(',');
rmf $Industry_Brand_Path;
mv $Industry_Brand_TMP $Industry_Brand_Path;

-- counting user number for age and log_type
-- Current-run rows -> (age, log_type, guid)
current_data = FOREACH all_current_data GENERATE age, log_type, guid;
-- Accumulated rows from earlier runs, same layout.
-- NOTE(review): the LOAD presumably fails at execution time if $ALL_Path does not exist yet (first run).
history_data = LOAD '$ALL_Path' USING PigStorage(',') AS (age:chararray, log_type:chararray, guid:chararray);
-- Merge with history; DISTINCT keeps one row per (age, log_type, guid).
union_all_data = UNION history_data, current_data;
unique_all_data = DISTINCT union_all_data;
-- Group by ($0, $1) = (age, log_type). Rows are distinct, so COUNT of guid ($2) = distinct users.
group_all_data = GROUP unique_all_data BY ($0, $1);
count_guid_for_age_logtype = FOREACH group_all_data GENERATE FLATTEN(group), COUNT($1.$2);
-- STORE will not write into an existing directory, so clear the previous summary first;
-- rmf, unlike rm, does not fail when the path is absent (e.g. on the first run).
rmf $ALL_SUM;
STORE count_guid_for_age_logtype INTO '$ALL_SUM' USING PigStorage(',');
-- $ALL_Path is also this run's input, so stage the merged rows in $ALL_TMP and swap them in.
-- Not atomic: if the job dies between rmf and mv, the history survives only in $ALL_TMP,
-- which is why $ALL_TMP is not auto-cleared before the STORE.
STORE unique_all_data INTO '$ALL_TMP' USING PigStorage(',');
rmf $ALL_Path;
mv $ALL_TMP $ALL_Path;