I am trying to move data from Greenplum (GP) to HDFS using Scala & Spark.
val execQuery = "select * from schema.tablename"
val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "header_id")
  .option("lowerBound", 19919927)
  .option("upperBound", 28684058)
  .option("numPartitions", 30)
  .load()
val yearDFSchema = yearDF.schema
The schema for yearDF is:
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)
The schema of the same table in Hive, as provided by our project, is:
val hiveColumns = "source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String"
So I took hiveColumns and created a new StructType as given below:
import org.apache.spark.sql.types._

def convertDatatype(datatype: String): DataType = {
  // hiveColumns uses capitalized names ("String", "Bigint"), so normalize first;
  // matching on the raw token would throw a MatchError.
  datatype.toLowerCase.trim match {
    case "string"    => StringType
    case "bigint"    => LongType
    case "int"       => IntegerType
    case "double"    => DoubleType
    case "date"      => TimestampType
    case "boolean"   => BooleanType
    case "timestamp" => TimestampType
    case other       => throw new IllegalArgumentException(s"Unsupported type: $other")
  }
}
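Since the type names in hiveColumns are capitalized ("String", "Bigint", "Timestamp"), the .toLowerCase normalization above is what makes the match succeed; a quick sanity check:

assert(convertDatatype("Bigint") == LongType)         // would be a MatchError without the lowercasing
assert(convertDatatype("Timestamp") == TimestampType)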
val schemaList = hiveColumns.split("\\|")
val newSchema = StructType(
  schemaList
    .map(_.split(":"))
    .map(e => StructField(e(0), convertDatatype(e(1)), nullable = true))
)
newSchema.printTreeString()
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)
When I try to apply my new schema newSchema to yearDF, I get an exception.
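The application step is essentially the following (a minimal sketch, assuming the usual createDataFrame-over-the-RDD pattern, which is the step that performs this external-type check; my actual call may differ slightly):

val castedDF = spark.createDataFrame(yearDF.rdd, newSchema) // reuse yearDF's rows under the hand-built schema
castedDF.show(5)                                            // any action triggers the row-to-schema validation

Running this fails with: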
 Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
The exception occurs because the decimal columns arrive as java.math.BigDecimal values, which Spark refuses to place into DoubleType fields.
What I don't understand is how to change the datatypes of the columns table_refresh_delay_min, release_number, change_number, and fdm_application_id in newSchema (currently DoubleType/LongType) to the corresponding datatypes in yearDF's schema. That is:
Whenever a column in yearDFSchema has a decimal datatype, decimal(38,30) in this case, I need the same column's datatype in newSchema to become that DecimalType(38,30).
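Conceptually, I picture something like the sketch below (a rough idea of mine, not working code): walk newSchema and, wherever yearDF exposes a column as a decimal, keep yearDF's decimal type instead of the type derived from hiveColumns.

import org.apache.spark.sql.types._

// Rough sketch: prefer the JDBC source's DecimalType over the hive-derived
// DoubleType/LongType for the columns that arrive as java.math.BigDecimal.
val sourceTypes: Map[String, DataType] =
  yearDFSchema.fields.map(f => f.name -> f.dataType).toMap

val mergedSchema = StructType(newSchema.fields.map { field =>
  sourceTypes.get(field.name) match {
    case Some(dec: DecimalType) => field.copy(dataType = dec) // e.g. DecimalType(38,30)
    case _                      => field                      // keep the hive-derived type
  }
})

But I am not sure this is the right approach.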
Could anyone let me know how I can achieve this?