I'm using Spring Batch which loads data from Oracle and put it into the MongoDB. I'm looking to use the Spring Cloud Data Flow, but SCDF doesn't have support for the MongoDB.
Is there any way if we can maintain the SCDF Metadata into the Postgres (as its has good support) or H2, in this example I used H2, and run the actual batch jobs which load data from Oracle to MongoDB.
When I tried to run this combination, I get the below error, certainly some config changes needs to be tweek. Any quick workaround ?
Error:
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:152) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$3c62d122.open(<generated>) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103) ~[spring-batch-infrastructure-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.1.RELEASE.jar!/:4.1.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.5.RELEASE.jar!/:5.1.5.RELEASE]
at com.sun.proxy.$Proxy66.run(Unknown Source) [na:na]
at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.run(ReferenceMongoBatchApplication.java:63) [classes!/:1.0]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.3.RELEASE.jar!/:2.1.3.RELEASE]
at com.mastercard.customer.data.management.refdata.ReferenceMongoBatchApplication.main(ReferenceMongoBatchApplication.java:51) [classes!/:1.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [reference-mongo-batch-1.0.jar:1.0]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [reference-mongo-batch-1.0.jar:1.0]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [reference-mongo-batch-1.0.jar:1.0]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [reference-mongo-batch-1.0.jar:1.0]
application.properties
# ORACLE DATASOURCE
spring.datasource.url=jdbc:oracle:thin:@//XXXXXX:1527/XXXX
spring.datasource.username=user
spring.datasource.password=password
spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.validation-query= select 1
#spring.datasource.hikari.validationTimeout=30000
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=mgdb_drefdata_99
Config
@Configuration
public class EmployeeJob {
@Value( "${spring.chunk.size}")
private String chunkSize;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JdbcCursorItemReader<Employee> EmployeeReader;
@Autowired
public AsyncItemProcessor<Employee, Employee> asyncItemProcessor;
@Autowired
public AsyncItemWriter<Employee> asyncItemWriter;
@Bean
public EmployeeStepExecuListner EmployeeStepExecuListner() {
return new EmployeeStepExecuListner();
}
@Bean("readEmployeeJob")
@Primary
public Job readEmployeeJob() {
return jobBuilderFactory.get("readEmployeeJob")
.incrementer(new RunIdIncrementer())
.start(EmployeeStepOne())
.build();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Bean
public Step EmployeeStepOne() {
return stepBuilderFactory.get("EmployeeStepOne")
.<Employee, Employee>chunk(Integer.parseInt(chunkSize))
.reader(EmployeeReader)
.processor((ItemProcessor) asyncItemProcessor)
.writer(asyncItemWriter)
.build();
}
}
Config
@Configuration
public class EmployeeBatchConfig {
@Autowired
@Qualifier(value="oracleDS")
private DataSource dataSource;
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<Employee> EmployeeReader() throws Exception {
JdbcCursorItemReader<Employee> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource);
reader.setSql(SELECT_Employee_SQL);
reader.setRowMapper(new EmployeeRowMapper());
reader.afterPropertiesSet();
return reader;
}
@Bean
public ItemProcessor<Employee, Employee> EmployeeProcessor() {
return new EmployeeProcessor();
}
@Bean
public AsyncItemProcessor<Employee, Employee> asyncItemProcessor() throws Exception{
AsyncItemProcessor<Employee, Employee> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(EmployeeProcessor());
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public EmployeeWriter EmployeeWriter() {
return new EmployeeWriter();
}
@Bean
public AsyncItemWriter<Employee> asyncItemWriter() throws Exception{
AsyncItemWriter<Employee> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(EmployeeWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
}