My use case:
Read data from a MongoDB collection of the form:
{
    "_id" : ObjectId("582cab1b21650fc72055246d"),
    "label" : 167.517838916715,
    "features" : [ 
        10.0964787450654, 
        218.621137772497, 
        18.8833848806122, 
        11.8010251302327, 
        1.67037687829152, 
        22.0766170950477, 
        11.7122322171201, 
        12.8014773524475, 
        8.30441804118235, 
        29.4821268054137
    ]
}
And pass it to the org.apache.spark.ml.regression.LinearRegression class to create a model for predictions.
My problem:
The Spark connector reads in "features" as Array[Double].
LinearRegression.fit(...) expects a Dataset with a label column and a features column.
The Features column must be of type VectorUDT (so DenseVector or SparseVector will work).
I cannot .map features from Array[Double] to DenseVector because there is no relevant Encoder:
Error:(23, 11) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
  .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}
As far as I can tell, custom Encoders cannot be defined.
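To make the required conversion concrete, here is a minimal sketch of one direction that might sidestep the missing Encoder: converting the column with a UDF instead of mapping over Rows. It assumes the connector exposes "features" as an array<double> column, which Spark passes to a Scala UDF as a Seq[Double]. The names ArrayToVector, toVector and withVectorFeatures are hypothetical and exist only for this illustration. I have not confirmed this against the data above.

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper, not part of Spark or the MongoDB connector.
object ArrayToVector {
  // Plain conversion from a sequence of doubles to an ML dense vector.
  def toVector(values: Seq[Double]): Vector = Vectors.dense(values.toArray)

  // Column-level UDF. The input type is Seq[Double] because Spark SQL
  // passes array columns to Scala UDFs as a Seq, not as Array[Double].
  private val toVectorUdf = udf((values: Seq[Double]) => toVector(values))

  // Replaces the array column with a vector column of the same name.
  // Assumption: featuresCol holds array<double> values with no nulls.
  def withVectorFeatures(df: DataFrame, featuresCol: String = "features"): DataFrame =
    df.withColumn(featuresCol, toVectorUdf(col(featuresCol)))
}
```

With this, the load step would become something like ArrayToVector.withVectorFeatures(MongoSpark.load(spark)).select("label", "features"), and no Encoder is needed because the result is still a DataFrame.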
My question:
- Is there a way I can set the configuration of the Spark connector to read in the "features" array as a Dense/SparseVector?
- Is there any other way I can achieve this (without, for example, using an intermediary .csv file and loading that using libsvm)?
My code:
import com.mongodb.spark.MongoSpark
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{Row, SparkSession}
case class DataPoint(label: Double, features: Array[Double])
object LinearRegressionWithMongo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("LinearRegressionWithMongo")
      .master("local[4]")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/LinearRegressionTest.DataPoints")
      .getOrCreate()
    import spark.implicits._
    val dataPoints = MongoSpark.load(spark)
      .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}
    val splitData = dataPoints.randomSplit(Array(0.7, 0.3), 42)
    val training = splitData(0)
    val test = splitData(1)
    val linearRegression = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setRegParam(0.0)
      .setElasticNetParam(0.0)
      .setMaxIter(100)
      .setTol(1e-6)
    // Train the model
    val startTime = System.nanoTime()
    val linearRegressionModel = linearRegression.fit(training)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")
    // Print the weights and intercept for linear regression.
    println(s"Weights: ${linearRegressionModel.coefficients} Intercept: ${linearRegressionModel.intercept}")
    // ModelEvaluator is my own helper class (definition not shown here)
    val modelEvaluator = new ModelEvaluator()
    println("Training data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, training, "label")
    println("Test data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, test, "label")
    spark.stop()
  }
}
Any help would be ridiculously appreciated!