AWS Data Engineer over S3 optimisation technique

AWS with spark (Pyspark) – Data extraction and delta load optimisation

Spark-Hive Data extraction optimisation

There are two options for extracting data from Oracle into HDFS.
If you don't have the JDBC driver jar for the source database, you need to download it first.
a. Sqoop Data Extraction
Sqoop can extract the full data into Parquet, but an incremental (upsert) load into Parquet will not work, because the file format does not support it.
command:
sqoop import -m 5 --connect "jdbc:oracle:thin:@<<server_name>>:<Port_number>:<service_name>" --username <username> --password <pwd> --query "select * from <table_name> WHERE ROW_WID >= cond1 AND ROW_WID < cond2 AND \$CONDITIONS" --delete-target-dir --target-dir "hdfs_table_location" --compression-codec=snappy --as-parquetfile --split-by ROW_WID --boundary-query "select cond1,cond2 from dual" --verbose

b. Spark Data Extraction
Spark is well suited to extracting data from Oracle or other databases over a JDBC connection.
command:
query = "(select * from table_name)"  # the query format may change for other databases
Spark Command
df_target = spark.read.format("jdbc").option("url", "jdbc:oracle:thin:@<<server_name>>:<Port_number>:<service_name>").option("dbtable", query).option("user", "<user>").option("password", "<password>").option("driver", "oracle.jdbc.driver.OracleDriver").option("fetchsize", 10000).load()  # the driver changes for a different DB
#fetchsize: The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.
This command runs as a single task (one core), so it may take a long time to complete.
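To get a feel for why fetchsize matters, here is a small arithmetic sketch. The table size of 5 million rows is an assumption for illustration only, not a figure from my use case.

```python
import math


def round_trips(total_rows, fetchsize):
    """Number of JDBC round trips needed to fetch total_rows rows."""
    return math.ceil(total_rows / fetchsize)


total_rows = 5_000_000  # hypothetical table size, chosen only for illustration

print(round_trips(total_rows, 10))      # low driver default of 10 rows per trip
print(round_trips(total_rows, 10_000))  # fetchsize of 10000 rows per trip
```

With these assumed numbers the round trips drop from 500,000 to 500.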
Optimisation:
Divide the data into n buckets using the MOD function in Oracle (other databases have a similar function).
Then extract the buckets in parallel, one bucket per Spark partition.
command:
query = "(select MOD(A.row_id," + str(n) + ") PARTITION_KEY, A.* from table_name A)"
If row_id is not an integer, you can use a column of a different data type instead:
query = "(select MOD(ASCII(A.row_id)," + str(n) + ") PARTITION_KEY, A.* from table_name A)"
This takes the ASCII value (of the first character) and then its remainder modulo the factor n.
Spark Command
df_target = spark.read.format("jdbc").option("url", "jdbc:oracle:thin:@<<server_name>>:<Port_number>:<service_name>").option("dbtable", query).option("user", "<user>").option("password", "<password>").option("driver", "oracle.jdbc.driver.OracleDriver").option("partitionColumn", "PARTITION_KEY").option("lowerBound", 0).option("numPartitions", n).option("upperBound", n).option("fetchsize", 10000).load()
Once the data is in the DataFrame, drop the extra column:
df_target = df_target.drop("PARTITION_KEY")
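The string handling for the partitioned read can be sketched in plain Python as below. The helper names build_partitioned_query and jdbc_partition_options are hypothetical (they are not part of Spark); only the option keys are the ones documented for the Spark JDBC source.

```python
def build_partitioned_query(table_name, key_column, n, numeric_key=True):
    """Return a JDBC 'dbtable' subquery that adds PARTITION_KEY in 0..n-1."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    # Non-numeric keys go through ASCII() first, as described in the text.
    key_expr = f"A.{key_column}" if numeric_key else f"ASCII(A.{key_column})"
    return f"(select MOD({key_expr},{n}) PARTITION_KEY, A.* from {table_name} A)"


def jdbc_partition_options(query, n, fetchsize=10000):
    """Options for a parallel JDBC read; url, user, password and driver
    still have to be added by the caller."""
    return {
        "dbtable": query,
        "partitionColumn": "PARTITION_KEY",
        "lowerBound": "0",
        "upperBound": str(n),
        "numPartitions": str(n),
        "fetchsize": str(fetchsize),
    }


query = build_partitioned_query("table_name", "row_id", 8)
options = jdbc_partition_options(query, 8)
print(query)
```

Assuming a running Spark session, the dictionary could then be passed as spark.read.format("jdbc").options(**options) together with the connection options.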

reference: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Delta data merge into full data in upsert case

When merging data, create Hive partitions even when you use Spark, because without partitions every merge becomes a full load. Consider my scenario.
Use case 1: I had a 150 GB table without a fixed date column, so I did not create any partition initially. As a result, every run took almost 40 minutes to complete, even when only a single record had changed.
The solution is, of course, Hive partitioning. But how do you create partitions when the data has no natural partition column? I only had row_id, which increased for each new record and stayed the same when a record was updated, and most changes touched recent records.

Solution:
Choose a factor, say factor = 10 for the table.
Take the quotient of row_id divided by the factor, since I want partitions that grow incrementally.
Suppose the table has the data below:

Table_data

After performing logic as:
select A.*, FLOOR(A.row_id/10) partition_col from table_name A;

modified data
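As a worked example of the quotient logic, the sketch below computes partition_col for a few row ids. The sample values are made up for illustration and are not the rows of the original table.

```python
FACTOR = 10  # the factor chosen for the table


def partition_col(row_id, factor=FACTOR):
    """Integer quotient, equivalent to FLOOR(row_id / factor)."""
    return row_id // factor


sample_row_ids = [1, 9, 10, 11, 25, 104]  # hypothetical row ids

for row_id in sample_row_ids:
    print(row_id, partition_col(row_id))
```

Row ids 1 and 9 land in partition 0, 10 and 11 in partition 1, 25 in partition 2 and 104 in partition 10, so new records always go to the highest partitions.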

So I created partitions based on the primary key, which lets the data load into the table in incremental order. As an optimisation, always try to create Hive partitions for big tables, in both Spark and Hive.
When the incremental data arrives, apply the same query to it and do an insert overwrite into the table:
>> df_target.createOrReplaceTempView("delta_tbl")
>> df_delta = spark.sql("select A.*, floor(A.row_id/" + str(key) + ") partition_col from delta_tbl A")  # key holds the factor (10 above)
Since the delta data is small, this does not take much time. Next, take the distinct values of partition_col in the delta and read only those partitions from the base table (the full data table):
>> dis_rowid = df_delta.select("partition_col").distinct().collect()
>> list_rowid = [str(row_id["partition_col"]) for row_id in dis_rowid]
>> filter_val = ",".join(list_rowid)
>> df_main = spark.sql("select * from main_tbl where partition_col in (" + filter_val + ")")
This way only the required partitions of the main table are read, which is a small part of the data.
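The partition filter can be sketched without Spark as follows. The function names and the sample delta row ids are hypothetical; main_tbl and partition_col are the names used in this post.

```python
def touched_partitions(delta_row_ids, factor):
    """Distinct partition values that the delta rows fall into."""
    return sorted({row_id // factor for row_id in delta_row_ids})


def build_partition_filter_sql(table_name, partitions, partition_column="partition_col"):
    """SQL that reads only the partitions touched by the delta."""
    if not partitions:
        raise ValueError("delta is empty, nothing to reload")
    filter_val = ",".join(str(p) for p in partitions)
    return f"select * from {table_name} where {partition_column} in ({filter_val})"


# Hypothetical delta: three recent rows and one old row that was updated.
partitions = touched_partitions([1001, 1005, 1012, 37], factor=10)
sql = build_partition_filter_sql("main_tbl", partitions)
print(sql)
```

Only three partitions are read here, however large the rest of the table is.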
After that, do a left anti join against the delta, union the result with the delta, and insert overwrite into the main table:
>> from pyspark.sql.functions import broadcast
>> df_rest = df_main.join(broadcast(df_delta), on=join_column, how="leftanti")  # main rows that the delta does not replace
>> df_final = df_delta.unionByName(df_rest)  # finally we have the required DataFrame
>> df_final.createOrReplaceTempView("tbl_main")
>> spark.sql("insert overwrite table <table_name> select * from tbl_main")  # it should be optimised.
Result: this gives an optimised way to load data into the table.
With the above logic, I was able to complete the load in 5-10 minutes.
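To make the left anti join plus union logic concrete, here is a minimal sketch of the same upsert on plain Python lists of dictionaries. It only illustrates the semantics; it is not how Spark executes the join, and the sample rows are made up.

```python
def upsert(main_rows, delta_rows, key):
    """Return delta rows plus the main rows that the delta does not replace."""
    delta_keys = {row[key] for row in delta_rows}
    # "leftanti": keep main rows whose key does not appear in the delta
    rest = [row for row in main_rows if row[key] not in delta_keys]
    # "union": delta rows (new and updated) followed by the untouched rows
    return delta_rows + rest


main_rows = [
    {"row_id": 1, "val": "a"},
    {"row_id": 2, "val": "b"},
    {"row_id": 3, "val": "c"},
]
delta_rows = [
    {"row_id": 2, "val": "B"},  # update of an existing record
    {"row_id": 4, "val": "d"},  # new record
]

final_rows = upsert(main_rows, delta_rows, "row_id")
print(sorted(final_rows, key=lambda row: row["row_id"]))
```

Row 2 appears once with its updated value, row 4 is added, and rows 1 and 3 are carried over unchanged.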
Advantages:
It takes very little time on AWS EMR (EBS), Cloudera and other HDFS-based platforms.

Disadvantage for S3:
S3 is very slow at renaming files: because it is an object store, a rename creates a new object for the renamed file. When the job finishes, Spark starts renaming the files at the table location, so overall performance degrades.

How does Spark insert overwrite work on a table?

Spark creates a temporary folder at the table location, and after the process completes it moves the files from the Spark temporary location to the main table folder, as shown in fig 1.1 and fig 1.2.

fig 1.2

Because of this renaming step, S3 takes a long time at the end of the process, so the job as a whole takes too long (the master node does the renaming). The time depends on the data size and the number of files to rename.
Hence, if the table has to be updated frequently, S3 will be slower.
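The write-then-move pattern can be imitated on a local filesystem, as in the sketch below. This is only an analogy under simplifying assumptions: the staging folder name _temporary_sketch and the function are hypothetical, and the commit protocol Spark really uses has more steps. The point is that every file needs one move at the end, and on S3 each move costs a copy plus a delete instead of a cheap metadata rename.

```python
import os
import tempfile


def overwrite_via_staging(table_dir, files):
    """Write files to a staging folder inside table_dir, then move them into place.

    files maps a file name to its bytes. Returns the number of moves performed.
    """
    staging = os.path.join(table_dir, "_temporary_sketch")
    os.makedirs(staging, exist_ok=True)
    for name, data in files.items():
        with open(os.path.join(staging, name), "wb") as handle:
            handle.write(data)

    # Overwrite semantics: drop the old data files of the table.
    for name in os.listdir(table_dir):
        path = os.path.join(table_dir, name)
        if os.path.isfile(path):
            os.remove(path)

    # Final step: one rename per file (on S3 this would be copy + delete).
    moves = 0
    for name in os.listdir(staging):
        os.rename(os.path.join(staging, name), os.path.join(table_dir, name))
        moves += 1
    os.rmdir(staging)
    return moves


table_dir = tempfile.mkdtemp()
with open(os.path.join(table_dir, "old.parquet"), "wb") as handle:
    handle.write(b"old")

moves = overwrite_via_staging(
    table_dir, {"part-0.parquet": b"x", "part-1.parquet": b"y"}
)
print(moves, sorted(os.listdir(table_dir)))
```

The number of moves grows with the number of output files, which is why a job that writes many files spends so long in the final phase on S3.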

So what should we do to use S3 even for updates?

First, create a backup location with the same data, say a database named backup_db, and create the table in it. Once the process completes, copy the data over to the backup table (with s3-dist-cp the copy does not take much time).
There are 5 steps for loading the data.
Steps:
1. Read data from the backup (df_backup)
2. Read data from the delta table (df_delta)
3. Create df_rest using a left anti join (df_backup -> left table, df_delta -> right table)
>> df_rest = df_backup.join(df_delta, on=join_column, how="leftanti")
4. Do a union by name to create final_df
>> final_df = df_delta.unionByName(df_rest)
5. Load into the main table as below.

There are two scenarios.
1. Table without partitions
>> final_df.write.parquet(<<table_location>>, mode="overwrite")
2. Table with partitions
First work out which partitions appear in the new data (delta table).
Then delete those partitions from the main table:
1. df_delta.createOrReplaceTempView("delta_tbl")
df_delta = spark.sql("select A.*, <<partition_logic>> partition_col from delta_tbl A")
2. dis_rowid = df_delta.select("partition_col").distinct().collect()
list_rowid = [str(row_id["partition_col"]) for row_id in dis_rowid]
filter_val = ",".join(list_rowid)
3. import os
rm_list = []
for path in list_rowid:  # iterate over the list of values, not over the joined string
    rm_list.append('s3://<<bucket-name>>/<<db>>/<<table_name>>/<<partition_col_name>>=' + path)
rm_list_str = ' '.join(rm_list)
os.system('hadoop fs -rm -r ' + rm_list_str)
This can be done even faster with multiprocessing if you have many partitions (divide the list into sub-lists and process them in parallel, as in fig. 2).
4. Final write as below
>> final_df.write.partitionBy(<<partition-key>>).parquet(table_location, mode="append")
Sometimes a DataFrame write also causes problems at read time because of multiple files in one partition (hence create one file per partition as below):
>> final_df.repartition(<<partition-key>>).write.partitionBy(<<partition-key>>).parquet(table_location, mode="append")  # recommended
>> spark.sql('msck repair table ' + <<table_name>>)  # repair the partitions

fig. 2
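Step 3 of the partitioned scenario (building the partition paths and splitting them into sub-lists for parallel deletion) can be sketched as below. The function names, the bucket name example-bucket and the chunk size are assumptions for illustration; the sketch only builds the commands and does not run them.

```python
def partition_paths(table_location, partition_column, partition_values):
    """Full path of every partition folder that has to be removed."""
    base = table_location.rstrip("/")
    return [f"{base}/{partition_column}={value}" for value in partition_values]


def chunk(items, size):
    """Split a list into consecutive sub-lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be a positive integer")
    return [items[i:i + size] for i in range(0, len(items), size)]


def rm_commands(paths, chunk_size):
    """One 'hadoop fs -rm -r' command per sub-list of partition paths."""
    return ["hadoop fs -rm -r " + " ".join(group) for group in chunk(paths, chunk_size)]


list_rowid = ["3", "100", "101"]  # distinct partition values from the delta
paths = partition_paths("s3://example-bucket/db/tbl", "partition_col", list_rowid)

for command in rm_commands(paths, chunk_size=2):
    print(command)
```

Each command could then be handed to a separate worker, for example through multiprocessing.Pool or concurrent.futures.ThreadPoolExecutor, so that the sub-lists are removed in parallel.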

This is based on my own experience. In case of any issue, you can mail or contact me.
Thanks!


Abhishek Chaurasiya (Data Engineer in TTL)

If you have any issue, don't hesitate to contact me using the contact details below.
Contacts
email: abhishek_chaurasiya@yahoo.com
linkedIn: https://www.linkedin.com/in/abhishek-chaurasiya-98ba9b148
