I wrote a small Spark application that is supposed to measure the time Spark needs to run an operation on a partitioned RDD (a combineByKey call that sums a value per key).
My problem is that the first iteration seems to work correctly (calculated duration ~25 ms), but the following ones take much less time (~5 ms). It looks as if Spark persists the data without being asked to. Can I avoid that programmatically?
I need to know the time Spark needs to compute a fresh RDD (without any caching/persisting from earlier iterations), so I would expect the duration to always be around 20-25 ms.
To force recomputation I moved the SparkContext creation into the for-loop, but that didn't change anything...
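For completeness, this is the kind of explicit check I would add (just a sketch; partitionedRDD is the RDD from the loop below, and since I never call cache() or persist() anywhere, I would expect the storage level to always be NONE):

import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.storage.StorageLevel;

// Sketch: verify that no storage level is set between iterations and drop
// any cached blocks defensively, so that the next run has to recompute.
StorageLevel level = partitionedRDD.rdd().getStorageLevel();
System.out.println("storage level: " + level);
if (!StorageLevels.NONE.equals(level)) {
    partitionedRDD.unpersist();
}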
Thanks for your advice!
Here is my code, which seems to persist data somehow:
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public static void main(String[] args) {
    switchOffLogging();
    try {
        // Setup: Read out parameters & initialize SparkContext
        String path = args[0];
        SparkConf conf = new SparkConf(true);
        JavaSparkContext sc;
        // Print the header of the result table
        System.out.println("\npart.\tCount\tinput.p\tcons.p\tTime");
        // The RDDs used for the benchmark
        JavaRDD<String> input = null;
        JavaPairRDD<Integer, String> pairRDD = null;
        JavaPairRDD<Integer, String> partitionedRDD = null;
        JavaPairRDD<Integer, Float> consumptionRDD = null;
        // Run the same benchmark 10 times (iteratively, for testing)
        for (int i = 0; i < 10; i++) {
            boolean partitioning = true;
            int partitionsCount = 8;
            sc = new JavaSparkContext(conf);
            setS3credentials(sc, path);
            input = sc.textFile(path);
            pairRDD = mapToPair(input);
            partitionedRDD = partition(pairRDD, partitioning, partitionsCount);
            // Measure the duration
            long duration = System.currentTimeMillis();
            // Define the relevant transformation
            consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
            duration = System.currentTimeMillis() - duration;
            // Do some action to trigger the calculation
            System.out.println(consumptionRDD.collect().size());
            // Print the results
            System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t" + input.partitions().size() + "\t" + consumptionRDD.partitions().size() + "\t" + duration + " ms");
            input = null;
            pairRDD = null;
            partitionedRDD = null;
            consumptionRDD = null;
            sc.stop(); // close() only delegates to stop(), so one call suffices
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(e.getMessage());
    }
}
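One thing I am not sure about: combineByKey is only a transformation, so the code between the two currentTimeMillis() calls may not execute the job at all; the real work seems to happen in collect(). A variant of the loop body that times the action instead would look like this (a sketch):

import java.util.List;

// Sketch: time the action itself, since combineByKey is lazy and the
// shuffle/aggregation only runs once collect() is called.
long start = System.currentTimeMillis();
List<Tuple2<Integer, Float>> result = partitionedRDD
        .combineByKey(createCombiner, mergeValue, mergeCombiners)
        .collect();
long duration = System.currentTimeMillis() - start;
System.out.println(result.size() + " keys in " + duration + " ms");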
Some helper functions (which should not be the problem):
private static void switchOffLogging() {
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
}
private static void setS3credentials(JavaSparkContext sc, String path) {
    if (path.startsWith("s3n://")) {
        Configuration hadoopConf = sc.hadoopConfiguration();
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
        hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
    }
}
// Creates the initial combiner (the first value for a key)
private static Function<String, Float> createCombiner = new Function<String, Float>() {
    public Float call(String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        return value;
    }
};
// Merges a new data set into the running sum for a key
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
    public Float call(Float sumYet, String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        sumYet += value;
        return sumYet;
    }
};
// Merges two partial sums of the consumption
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
    public Float call(Float a, Float b) throws Exception {
        a += b;
        return a;
    }
};
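To double-check that the three functions compose the way I think, I can run them on a tiny in-memory dataset (a sketch with a local master; the float value sits at column index 2, exactly as in my real CSV data):

import java.util.Arrays;

private static void combineByKeyCheck() {
    JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setMaster("local[2]").setAppName("combineByKeyCheck"));
    JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(Arrays.asList(
            new Tuple2<Integer, String>(1, "x,y,1.5"),
            new Tuple2<Integer, String>(1, "x,y,2.5"),
            new Tuple2<Integer, String>(2, "x,y,4.0")));
    // Expected per-key sums: {1=4.0, 2=4.0}
    System.out.println(pairs.combineByKey(createCombiner, mergeValue, mergeCombiners).collectAsMap());
    sc.stop();
}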
private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) {
    if (partitioning) {
        return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
    } else {
        return pairRDD;
    }
}
private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
    return input.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
            String[] data = debsDataSet.split(",");
            int houseId = Integer.valueOf(data[6]);
            return new Tuple2<Integer, String>(houseId, debsDataSet);
        }
    });
}
And finally, the output of the Spark console:
part.   Count   input.p cons.p  Time
true    8       6       8       20 ms
true    8       6       8       23 ms
true    8       6       8       7 ms        // Too fast!!!
true    8       6       8       21 ms
true    8       6       8       13 ms
true    8       6       8       6 ms        // Too fast!!!
true    8       6       8       5 ms        // Too fast!!!
true    8       6       8       6 ms        // Too fast!!!
true    8       6       8       4 ms        // Too fast!!!
true    8       6       8       7 ms        // Too fast!!!