
In a DataFrame, I'm generating a column based on column A, which is of DateType with format "yyyy-MM-dd". Column A itself is generated by a UDF (the UDF generates a random date from the last 24 months).

From that generated date I try to calculate column B, which is column A minus 6 months; e.g. 2017-06-01 in A should give 2016-12-01 in B. To do this I use the function add_months(columnName, -6).
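To sanity-check the expected month arithmetic outside Spark, here is a minimal sketch using plain JDK 8+ `java.time` (this is not Spark's `add_months`; the object name `MonthMath` is hypothetical). For dates that are not the last day of a month, `LocalDate.minusMonths` gives the same result as `add_months(col, -6)`. Spark 2.4 and earlier additionally snap a month-end input to the end of the resulting month, which `minusMonths` does not do.

```scala
import java.time.LocalDate

object MonthMath {
  // Subtract `months` calendar months from an ISO "yyyy-MM-dd" date string.
  // If the target month is shorter, java.time clamps the day
  // (e.g. 2017-08-31 minus 6 months -> 2017-02-28).
  def minusMonths(isoDate: String, months: Long): String =
    LocalDate.parse(isoDate).minusMonths(months).toString
}

// Usage:
// MonthMath.minusMonths("2017-06-01", 6)  // "2016-12-01"
```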

When I do this on another column (not generated by the UDF) I get the right result, but when I do it on the generated column I get random, totally wrong values.

I checked the schema: the column is of DateType.

This is my code:

import org.apache.spark.sql.functions.{add_months, callUDF, col, to_date}
val test = df.withColumn("A", to_date(callUDF("randomUDF")))
val test2 = test.select(col("*"), add_months(col("A"), -6).as("B"))

Code of my UDF:

import java.text.SimpleDateFormat
import java.util.Calendar

sqlContext.udf.register("randomUDF", () => {
  // prepare date format
  val formatter = new SimpleDateFormat("yyyy-MM-dd")

  // get today's date as reference
  val today = Calendar.getInstance()
  val now = today.getTime()

  // set "from" to 2 years before now
  val from = Calendar.getInstance()
  from.setTime(now)
  from.add(Calendar.MONTH, -24)

  // convert both dates to epoch milliseconds
  val valuefrom = from.getTimeInMillis()
  val valueto = today.getTimeInMillis()

  // generate a random timestamp (as a Double) between from and to
  val value3 = valuefrom + Math.random() * (valueto - valuefrom)

  // set the generated value on a Calendar and format it as a date string
  val calendar3 = Calendar.getInstance()
  calendar3.setTimeInMillis(value3.toLong)
  formatter.format(calendar3.getTime())
})
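It can help to pull the UDF's logic into a plain function so it can be tested outside Spark. A minimal sketch, assuming JDK 8+ `java.time`; the names `RandomDates` and `randomDateWithin24Months` are hypothetical, and the reference date and random generator are passed in (rather than read inside the function) so the result is reproducible in a test.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit
import scala.util.Random

object RandomDates {
  // Pick a uniformly random day in [today - 24 months, today],
  // the same window randomUDF uses.
  def randomDateWithin24Months(today: LocalDate, rng: Random): LocalDate = {
    val from = today.minusMonths(24)
    // number of whole days between the start of the window and today
    val spanDays = ChronoUnit.DAYS.between(from, today).toInt
    // nextInt(n) is in [0, n), so the offset is in [0, spanDays]
    from.plusDays(rng.nextInt(spanDays + 1).toLong)
  }
}
```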

The UDF works as expected, but I think something is going wrong here. I tried the add_months function on another (non-generated) column and it worked fine.

Example of results I get with this code:

A            |      B
2017-10-20   |   2016-02-27
2016-05-06   |   2015-05-25
2016-01-09   |   2016-03-14
2016-01-04   |   2017-04-26

Using Spark 1.5.1 with Scala 2.10.4.

zero323

1 Answer


The creation of the test2 DataFrame in your code

val test2 = test.select(col("*"), add_months(col("A"), -6).as("B"))

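is effectively evaluated by Spark as shown below. Spark assumes UDFs are deterministic, so the optimizer is free to collapse the two projections and substitute the expression for A into the expression for B: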

val test2 = df.withColumn("A", to_date(callUDF("randomUDF"))).select(col("*"), add_months(to_date(callUDF("randomUDF")), -6).as("B"))

So the UDF is called twice. df.withColumn("A", to_date(callUDF("randomUDF"))) generates the date that appears in column A, while add_months(to_date(callUDF("randomUDF")), -6).as("B") calls the UDF again, generates a new date, subtracts 6 months from that new date and puts the result in column B.

That's why the dates in B look random relative to A.
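The effect can be reproduced without Spark. In this minimal sketch (plain Scala; `randomDayOffset` is a hypothetical stand-in for the UDF, and subtracting a fixed 182 days stands in for six months), the "inlined" row recomputes the random expression for B, as the collapsed plan does, while the "materialized" row draws A once and derives B from the stored value.

```scala
import scala.util.Random

object InliningDemo {
  // Hypothetical stand-in for randomUDF: a random day offset within ~24 months.
  def randomDayOffset(rng: Random): Int = rng.nextInt(730)

  // Like the collapsed plan: B evaluates the random expression a second time,
  // so A and B come from two independent draws.
  def inlinedRow(rng: Random): (Int, Int) =
    (randomDayOffset(rng), randomDayOffset(rng) - 182)

  // Like reading A from a materialized result: one draw, reused for B.
  def materializedRow(rng: Random): (Int, Int) = {
    val a = randomDayOffset(rng)
    (a, a - 182)
  }
}
```

In the materialized rows B is always A minus 182; in the inlined rows it almost never is.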

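One solution is to cache (or persist) the test DataFrame, so that column A is computed once and B is derived from the cached values. Note that cached partitions can be evicted and recomputed, which would draw new random values for them: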

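import org.apache.spark.sql.functions._
import sqlContext.implicits._ // enables the $"..." column syntax
val test = df.withColumn("A", to_date(callUDF("randomUDF"))).cache()
val test2 = test.as("table").withColumn("B", add_months($"table.A", -6))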
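An alternative that does not depend on the cache staying in memory (not part of the original answer) is to make the generator a pure function of an input column, such as a row id, so evaluating it twice yields the same date. A minimal sketch in plain Scala with the hypothetical name `dateFromSeed`; registering it as a UDF that takes the id column is left as an assumption. On Spark 2.3+ you can instead mark a UDF with `asNondeterministic()`, but that is not available in 1.5.1.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit
import scala.util.Random

object SeededDates {
  // Derive a date in [today - 24 months, today] from a seed (e.g. a row id).
  // The same seed always yields the same date, so recomputation is harmless.
  def dateFromSeed(seed: Long, today: LocalDate): LocalDate = {
    val from = today.minusMonths(24)
    val spanDays = ChronoUnit.DAYS.between(from, today).toInt
    // a fresh Random per call, seeded from the input, keeps the function pure
    from.plusDays(new Random(seed).nextInt(spanDays + 1).toLong)
  }
}
```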
Ramesh Maharjan