1. Managed Tables in Hive updates:
Create the incremental CSV files below and upload them to the GCS bucket:
-----------------------------------------------------------------
create batch_id_101.csv file:-
1,ram
2,john
3,kevin
4,david
5,paul
6,will
> gsutil cp batch_id_101.csv gs://bucketkt/raw/file/batch_id/
[upload file into GCP bucket]
create batch_id_102.csv file:-
3,kumar
4,dravid
7,jordan
8,trump
9,ivanka
10,obama
> gsutil cp batch_id_102.csv gs://bucketkt/raw/file/batch_id/
[upload file into GCP bucket]
TYPE-2 TABLE UPDATE:-
--------------------
-- Session settings required for Hive ACID (transactional) UPDATE/DELETE support.
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.support.concurrency=true;
-- FIX: the original "SET org.apache.hadoop.hive.ql.exec.tez=true;" is not a
-- valid Hive configuration property; the execution engine is chosen with:
SET hive.execution.engine=tez;
-- Show the current database in the CLI prompt (convenience only).
SET hive.cli.print.current.db=true;
-- Staging database: raw incremental feed, one partition per batch file.
create database if not exists stage_db;
use stage_db;
-- i = record key (treated as the surrogate key downstream), j = payload value.
create table if not exists hive_incremental_transactions (i int, j string) partitioned by (batch_id string) row format delimited fields terminated by ',';
-- In production, pass batch_id in from a shell wrapper:
--   hive -f hive_query.hql -hiveconf batch_id=101
-- It is hard coded to 101 below for testing only.
-- FIX: the original path had a doubled slash (gs://bucketkt//raw/...), which
-- names a different GCS object than the one uploaded to gs://bucketkt/raw/file/batch_id/.
load data inpath 'gs://bucketkt/raw/file/batch_id/batch_id_101.csv' into table hive_incremental_transactions partition(batch_id='101');
-- Parameterized form for the shell wrapper. FIX: no LOCAL keyword for a GCS
-- URI, and hiveconf variables are referenced as ${hiveconf:batch_id}, not $(hiveconf:batch_id).
-- load data inpath 'gs://bucketkt/raw/file/batch_id/batch_id_${hiveconf:batch_id}.csv' into table hive_incremental_transactions partition(batch_id='${hiveconf:batch_id}');
-- Sanity check: inspect the loaded batch.
select * from stage_db.hive_incremental_transactions order by i;
-- Gold (history) database holding the Type-2 SCD table.
create database if not exists gold_db;
use gold_db;
-- i is assumed to be the surrogate key.
-- Bucketed + ORC + transactional=true are all required for ACID UPDATE in Hive.
create table if not exists hive_hist_transactions (i int, j string, eff_date date, exp_date date) clustered by (i) into 4 buckets stored as orc tblproperties ('transactional' = 'true');
-- TYPE-2 step 1: close out the currently-active version (exp_date = 9999-12-31)
-- of every key that arrives in this batch; only the metadata columns change.
-- FIX: the original used a LEFT OUTER JOIN with no "b.i IS NOT NULL" / batch
-- filter, which expired EVERY active row instead of only the keys present in
-- the incremental batch. An INNER JOIN restricted to the batch is the intent.
UPDATE gold_db.hive_hist_transactions set exp_date=CURRENT_DATE where i IN (SELECT a.i FROM gold_db.hive_hist_transactions a INNER JOIN stage_db.hive_incremental_transactions b ON (a.i = b.i) where b.batch_id='101' AND a.exp_date = TO_DATE(from_unixtime(unix_timestamp('31-12-9999','dd-MM-yyyy'))));
-- TYPE-2 step 2: insert the incoming records as the new active versions
-- (eff_date = today, exp_date = high-date sentinel 9999-12-31).
INSERT INTO gold_db.hive_hist_transactions SELECT i, j,CURRENT_DATE as eff_date, TO_DATE(from_unixtime(unix_timestamp('31-12-9999','dd-MM-yyyy'))) as exp_date from stage_db.hive_incremental_transactions where batch_id='101';
2. External table updates in Hive:
-----------------------------------------------------------
-- Insert STAGE data into GOLD as the active version:
-- eff_dte = load time, expir_dte = high-date sentinel 9999-12-31.
-- FIX: "//" is not a HiveQL comment delimiter; use "--".
-- NOTE(review): UNIX_TIMESTAMP('9999-12-31 12:59:59UTC') relies on lenient
-- parsing (the trailing "UTC" is ignored by the default pattern) -- confirm
-- this behaves as expected on the target Hive version.
INSERT INTO db_gold.gold_hct_terr SELECT ho007_terr_nbr as terr_nbr, ho007_terr_nm as terr_nm, ho007_terr_mgr_nm as terr_mgr_nm, FROM_UNIXTIME(UNIX_TIMESTAMP(current_timestamp),'yyyy-MM-dd\' \'HH:mm:ssz') as eff_dte, FROM_UNIXTIME(UNIX_TIMESTAMP('9999-12-31 12:59:59UTC'),'yyyy-MM-dd\' \'HH:mm:ssz') as expir_dte, file_load_timestamp as fil_lod_tmst, '${batch_id}' as btch_id from db_stage.raw_ho_rms_territory where batch_id='${batch_id}';
-- Incremental (Type-2) rewrite of the GOLD table: rank versions per terr_nbr
-- (newest eff_dte gets row_num 1) and close any OLDER version that still
-- carries the open-ended expir_dte sentinel. The newest version stays open.
-- FIX: the outer select referenced "file_load_timestamp", which does not exist
-- on db_gold.gold_hct_terr (the column was inserted as fil_lod_tmst).
-- NOTE(review): '${batch_id}' here overwrites btch_id on ALL rows, including
-- untouched history rows -- confirm provenance should be replaced every run.
INSERT OVERWRITE TABLE db_gold.gold_hct_terr select terr_nbr, terr_nm, terr_mgr_nm, eff_dte, (CASE WHEN (row_num != 1 AND expir_dte=FROM_UNIXTIME(UNIX_TIMESTAMP('9999-12-31 12:59:59UTC'))) THEN current_timestamp() ELSE expir_dte END) AS expir_dte,
fil_lod_tmst, '${batch_id}' as btch_id from ( select *, row_number() over (partition by terr_nbr order by eff_dte desc) as row_num from db_gold.gold_hct_terr) t;