Picture the following senario: A Spark application (Java implementation) is using Cassandra database to load, convert to RDD and process the data. Also the application is steaming new data from the database which are also processed by a custom receiver. The output of the streaming process is stored in the database. The implementation is using Spring Data Cassandra from the integration with the database.
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
    @Autowired
    private Environment env;
    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
        return cluster;
    }
    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }
    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }
    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);
        return session;
    }
    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }
}
DataProcessor.main method:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
It is expected to have a big amount of data for the initial loading. For this reason the data are paginated, loaded and distributed in rddBuffer.
There are also the following options available:
- The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), although there is minimum amount of documentation for this example.
- The Calliope project (http://tuplejump.github.io/calliope/)
I would like to know what is the best practice for the integration of Spark with Cassandra. What would be the best option to follow in my implementation?
Apache Spark 1.0.0, Apache Cassandra 2.0.8
 
     
    