I'm trying to merge multiple Hive tables using Spark, but some columns with the same name have different data types in different tables, especially string and bigint.
My final table (hiveDF) should have the following schema:
+--------------------------+------------+----------+--+
| col_name | data_type | comment |
+--------------------------+------------+----------+--+
| announcementtype | bigint | |
| approvalstatus | string | |
| capitalrate | double | |
| cash | double | |
| cashinlieuprice | double | |
| costfactor | double | |
| createdby | string | |
| createddate | string | |
| currencycode | string | |
| declarationdate | string | |
| declarationtype | bigint | |
| divfeerate | double | |
| divonlyrate | double | |
| dividendtype | string | |
| dividendtypeid | bigint | |
| editedby | string | |
| editeddate | string | |
| exdate | string | |
| filerecordid | string | |
| frequency | string | |
| grossdivrate | double | |
| id | bigint | |
| indicatedannualdividend | string | |
| longtermrate | double | |
| netdivrate | double | |
| newname | string | |
| newsymbol | string | |
| note | string | |
| oldname | string | |
| oldsymbol | string | |
| paydate | string | |
| productid | bigint | |
| qualifiedratedollar | double | |
| qualifiedratepercent | double | |
| recorddate | string | |
| sharefactor | double | |
| shorttermrate | double | |
| specialdivrate | double | |
| splitfactor | double | |
| taxstatuscodeid | bigint | |
| lastmodifieddate | timestamp | |
| active_status | boolean | |
+--------------------------+------------+----------+--+
The schema of this final table (hiveDF) can be built from the following JSON:
{
"id": -2147483647,
"productId": 150816,
"dividendTypeId": 2,
"dividendType": "Dividend/Capital Gain",
"payDate": null,
"exDate": "2009-03-25",
"oldSymbol": "ILAAX",
"newSymbol": "ILAAX",
"oldName": "",
"newName": "",
"grossDivRate": 0.115,
"shortTermRate": 0,
"longTermRate": 0,
"splitFactor": 0,
"shareFactor": 0,
"costFactor": 0,
"cashInLieuPrice": 0,
"cash": 0,
"note": "0",
"createdBy": "Yahoo",
"createdDate": "2009-08-03T06:44:19.677-05:00",
"editedBy": "Yahoo",
"editedDate": "2009-08-03T06:44:19.677-05:00",
"netDivRate": null,
"divFeeRate": null,
"specialDivRate": null,
"approvalStatus": null,
"capitalRate": null,
"qualifiedRateDollar": null,
"qualifiedRatePercent": null,
"declarationDate": null,
"declarationType": null,
"currencyCode": null,
"taxStatusCodeId": null,
"announcementType": null,
"frequency": null,
"recordDate": null,
"divOnlyRate": 0.115,
"fileRecordID": null,
"indicatedAnnualDividend": null
}
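One caveat about deriving the schema from this sample record: Spark's JSON schema inference types a number without a decimal point, such as `"shortTermRate": 0`, as bigint, and a field that is only ever null, such as `"netDivRate"`, as string. So inference alone would not reproduce the double columns in the table above. A minimal sketch, assuming you would rather declare the target types by hand; only some of the columns are listed, and `FinalSchema` is a hypothetical name:

```scala
import org.apache.spark.sql.types._

object FinalSchema {
  // Partial, hand-written version of the final table's schema (hypothetical).
  // Declaring types explicitly avoids JSON inference turning 0 into bigint
  // and all-null fields into string.
  val schema: StructType = StructType(Seq(
    StructField("announcementtype", LongType),
    StructField("capitalrate", DoubleType),
    StructField("id", LongType),
    StructField("netdivrate", DoubleType),
    StructField("shorttermrate", DoubleType),
    StructField("taxstatuscodeid", LongType),
    StructField("lastmodifieddate", TimestampType),
    StructField("active_status", BooleanType)
  ))
}
```

Once the final table exists, `hiveDF.schema` gives you the same information without writing it out by hand.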
I load the tables like this:
val hiveDF = spark.sql("select * from final_destination_tableName")
val newDataDF = spark.sql("select * from incremental_table_1 where id > 866000")
My incremental table (newDataDF) has some columns with different data types. I have around 10 incremental tables, and the same column is bigint in some of them and string in others. Casting itself would be easy, but with so many tables I can't tell which type I should cast to. I am looking for an approach that works without explicit casting.
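One way to settle "which type" is to treat the final table's schema as the single source of truth and align every incremental table to it before the union. This does still cast, but the target type always comes from `hiveDF.schema`, so you never have to pick it per table. A minimal sketch under that assumption; `CombineTables`, `alignTo` and `combine` are hypothetical names. Under Spark's default (non-ANSI) cast rules, a string that is not a valid number becomes null instead of failing; with `spark.sql.ansi.enabled=true` the cast throws instead.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

object CombineTables {
  // Reorders and casts `df` to match `target`, matching names case-insensitively.
  // Columns missing from `df` become typed nulls; extra columns are dropped.
  def alignTo(df: DataFrame, target: StructType): DataFrame = {
    val byLower: Map[String, String] = df.columns.map(c => c.toLowerCase -> c).toMap
    val projected: Seq[Column] = target.fields.toSeq.map { f =>
      byLower.get(f.name.toLowerCase) match {
        case Some(src) => col(src).cast(f.dataType).as(f.name)
        case None      => lit(null).cast(f.dataType).as(f.name)
      }
    }
    df.select(projected: _*)
  }

  // Aligns every incremental table to the final table's schema, then unions them.
  // `union` matches columns by position, so the alignment (including column
  // order) has to happen first.
  def combine(spark: SparkSession, finalTable: String, incrementalTables: Seq[String]): DataFrame = {
    val base = spark.table(finalTable)
    incrementalTables
      .map(t => alignTo(spark.table(t), base.schema))
      .foldLeft(base)((acc, df) => acc.union(df))
  }
}
```

Usage would look like `CombineTables.combine(spark, "final_destination_tableName", Seq("incremental_table_1", ...))`, with the list of incremental table names filled in.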
For example, one incremental table has this schema:
+--------------------------+------------+----------+--+
| col_name | data_type | comment |
+--------------------------+------------+----------+--+
| announcementtype | string | |
| approvalstatus | string | |
| capitalrate | string | |
| cash | double | |
| cashinlieuprice | double | |
| costfactor | double | |
| createdby | string | |
| createddate | string | |
| currencycode | string | |
| declarationdate | string | |
| declarationtype | string | |
| divfeerate | string | |
| divonlyrate | double | |
| dividendtype | string | |
| dividendtypeid | bigint | |
| editedby | string | |
| editeddate | string | |
| exdate | string | |
| filerecordid | string | |
| frequency | string | |
| grossdivrate | double | |
| id | bigint | |
| indicatedannualdividend | string | |
| longtermrate | double | |
| netdivrate | string | |
| newname | string | |
| newsymbol | string | |
| note | string | |
| oldname | string | |
| oldsymbol | string | |
| paydate | string | |
| productid | bigint | |
| qualifiedratedollar | string | |
| qualifiedratepercent | string | |
| recorddate | string | |
| sharefactor | double | |
| shorttermrate | double | |
| specialdivrate | string | |
| splitfactor | double | |
| taxstatuscodeid | string | |
| lastmodifieddate | timestamp | |
| active_status | boolean | |
+--------------------------+------------+----------+--+
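To see exactly which columns conflict, the two schemas can be compared programmatically. A minimal sketch, assuming column names should match case-insensitively as in Hive; `SchemaDiff` and `mismatchedColumns` are hypothetical names. By my reading of the two listings above, it should report nine columns: announcementtype, declarationtype and taxstatuscodeid (bigint vs string), plus capitalrate, divfeerate, netdivrate, qualifiedratedollar, qualifiedratepercent and specialdivrate (double vs string).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object SchemaDiff {
  // Returns (column, targetType, sourceType) for every column present in both
  // DataFrames whose data type differs. Names are compared case-insensitively.
  def mismatchedColumns(target: DataFrame, source: DataFrame): Seq[(String, DataType, DataType)] = {
    val sourceTypes: Map[String, DataType] =
      source.schema.fields.map(f => f.name.toLowerCase -> f.dataType).toMap
    target.schema.fields.toSeq.flatMap { f =>
      sourceTypes.get(f.name.toLowerCase) match {
        case Some(t) if t != f.dataType => Some((f.name, f.dataType, t))
        case _                          => None
      }
    }
  }
}
```

Running `SchemaDiff.mismatchedColumns(hiveDF, newDataDF).foreach(println)` for each incremental table gives a quick overview of where the types drift.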
I'm doing the union like this:
val combinedDF = hiveDF.union(newDataDF)
but it doesn't work. I also tried applying the final schema to the incremental data, which didn't work either:
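import org.apache.spark.sql.SaveMode

val rows = newDataDF.rdd
// Re-labels the rows with hiveDF's schema but does not convert the values,
// so a String sitting in a bigint column fails once the data is evaluated
val newDataDF2 = spark.createDataFrame(rows, hiveDF.schema)
val combinedDF = hiveDF.union(newDataDF2)
combinedDF.coalesce(1).write.mode(SaveMode.Overwrite).option("orc.compress", "snappy").orc("/apps/hive/warehouse/" + database + "/" + tableLower + "_temp")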
Following a suggestion I found, I also tried round-tripping both DataFrames through JSON:
var combinedDF = sparkSession.read.json(hiveDF.toJSON.union(newDataDF.toJSON).rdd)
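The JSON round trip makes Spark re-infer the schema from the combined records. When a field is a JSON number in some records (from the bigint column in hiveDF) and a JSON string in others (from the string column in newDataDF), inference falls back to string, so columns like announcementtype come out as string rather than bigint. A minimal sketch demonstrating this, assuming Spark 2.2 or later for `read.json(Dataset[String])`; `JsonInferenceDemo` is a hypothetical name:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataType

object JsonInferenceDemo {
  // Returns the type Spark infers for a field that is a JSON number in one
  // record and a JSON string in another.
  def inferredType(spark: SparkSession): DataType = {
    import spark.implicits._
    val lines = Seq("""{"announcementType": 1}""", """{"announcementType": "1"}""").toDS()
    spark.read.json(lines).schema("announcementType").dataType
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("json-inference").getOrCreate()
    println(inferredType(spark)) // conflicting number/string types widen to StringType
    spark.stop()
  }
}
```

So this route still needs a cast back to `hiveDF.schema` afterwards if the final table must keep its bigint and double columns.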
Finally, I write the combined result out as shown above, but it still doesn't work. Please help.