I'm testing the restart feature of my Spring Batch job. The job processes a total of nine records with a chunk size of 5; after the first chunk is processed, I intentionally fail the second chunk to test the failure scenario. As expected, the batch failed after the first chunk was processed successfully, and the batch_job_execution table contains a record with the execution id and a status of FAILED. Now I'm running the restart job, passing that execution id, to verify whether the failed records get processed. But I get the exception below when I run it to restart the failed job:
2022-05-03 18:58:44,829 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [RestartJobTasklet]
Exception java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).while restart the failed job executionId8
Could you please help me understand what I'm missing here? Please find my code below:
I appreciate your help in advance!
TestJobConfig.java
/**
 * Job, step and reader wiring that is active under the {@code myJob-config} profile.
 *
 * <p>Declares two jobs: {@code myJob-config}, which runs a validation step followed by a
 * chunk-oriented CSV step, and {@code restart-myjob}, which runs a single tasklet step
 * backed by {@link RestartJobTasklet}.
 */
@Configuration
@Profile("myJob-config")
public class TestJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MyItemWriter myItemWriter;

    @Autowired
    private RestartJobTasklet restartJobTasklet;

    /**
     * Main job: ends when the validation step exits with {@code FAILED}; on any other
     * exit status it continues with the record-processing step.
     *
     * @param validateStep   the step bean qualified as {@code validateStep}
     * @param processRecords the step bean qualified as {@code processRecords}
     * @return the assembled job
     */
    @Bean("myJob-config")
    public Job job(@Qualifier("validateStep") Step validateStep,
                   @Qualifier("processRecords") Step processRecords) {
        return jobBuilderFactory.get("myJob-config")
                .incrementer(new RunIdIncrementer())
                .start(validateStep)
                .on("FAILED")
                .end()
                .from(validateStep).on("*").to(processRecords)
                .end()
                .build();
    }

    /**
     * Job consisting of the single restart step.
     *
     * @param restartMyJobStep the step bean qualified as {@code restartMyJobStep}
     * @return the assembled job
     */
    @Bean("restart-myjob")
    public Job restartJob(@Qualifier("restartMyJobStep") Step restartMyJobStep) {
        return jobBuilderFactory.get("restart-myjob")
                .incrementer(new RunIdIncrementer())
                .start(restartMyJobStep)
                .build();
    }

    /**
     * Tasklet step named {@code restart-failed-job} that delegates to the injected
     * {@link RestartJobTasklet}.
     */
    @Bean(name = "restartMyJobStep")
    public Step restartMyJobStep() {
        return stepBuilderFactory.get("restart-failed-job")
                .tasklet(restartJobTasklet)
                .build();
    }

    /**
     * Chunk-oriented step named {@code process-csv-records}: reads {@code Employee}
     * items and writes them in chunks of five. No processor is configured.
     */
    @Bean(name = "processRecords")
    public Step processRecords() {
        return stepBuilderFactory.get("process-csv-records")
                .<Employee, Employee>chunk(5)
                .reader(reader())
                .writer(itemWriter())
                .build();
    }

    /**
     * Validation step; flagged {@code allowStartIfComplete} so it may run again even
     * after it has already completed.
     *
     * @param validateTasklet the tasklet bean qualified as {@code validateTasklet}
     */
    @Bean(name = "validateStep")
    public Step validateStep(@Qualifier("validateTasklet") Tasklet validateTasklet) {
        return stepBuilderFactory.get("validateStep")
                .tasklet(validateTasklet)
                .allowStartIfComplete(true)
                .build();
    }

    /**
     * Placeholder validation tasklet: performs no work and reports that it is finished.
     */
    @Bean(name = "validateTasklet")
    public Tasklet validateTasklet() {
        return (contribution, chunkContext) -> RepeatStatus.FINISHED;
    }

    /**
     * Flat-file reader for the classpath resource {@code /csv/emps.csv}. The first line
     * is skipped and each remaining line is tokenized into the fields {@code id},
     * {@code firstName} and {@code lastName} before being mapped by
     * {@code EmployeeFieldSetMapper}.
     */
    @Bean
    public FlatFileItemReader<Employee> reader() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName"});

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        lineMapper.afterPropertiesSet();

        FlatFileItemReader<Employee> csvReader = new FlatFileItemReader<>();
        csvReader.setLinesToSkip(1);
        csvReader.setResource(new ClassPathResource("/csv/emps.csv"));
        csvReader.setLineMapper(lineMapper);
        return csvReader;
    }

    /**
     * Exposes the injected writer instance as the {@code itemWriter} bean.
     */
    @Bean
    public MyItemWriter<Employee> itemWriter() {
        return myItemWriter;
    }
}
RestartJobTasklet.java
/**
 * Tasklet that asks the {@link JobOperator} to restart a previously run job execution.
 *
 * <p>The execution id to restart is currently hard-coded inside {@link #execute}.
 */
@Component
public class RestartJobTasklet implements Tasklet, StepExecutionListener {

    @Autowired
    JobExplorer jobExplorer;

    @Autowired
    JobOperator jobOperator;

    @Autowired
    private OpsJobProperties props;

    /** Step execution captured in {@link #beforeStep}. */
    private StepExecution stepExecution;

    /** Job execution that owns the captured step execution. */
    private JobExecution jobExecution;

    /**
     * Remembers the current step execution and its parent job execution.
     *
     * @param stepExecution the step execution that is about to start
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.jobExecution = stepExecution.getJobExecution();
    }

    /**
     * Restarts execution {@code 8} through the job operator, then looks the new
     * execution up through the job explorer (the lookup result is not used).
     *
     * @param stepContribution contribution of the current step (unused)
     * @param chunkContext     context of the current chunk (unused)
     * @return always {@link RepeatStatus#FINISHED}
     * @throws Exception any failure raised by the operator or the explorer, propagated
     *                   unchanged to the caller
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution,
                                ChunkContext chunkContext) throws Exception {
        final long failedExecutionId = 8L;
        final Long restartedExecutionId = jobOperator.restart(failedExecutionId);
        jobExplorer.getJobExecution(restartedExecutionId);
        return RepeatStatus.FINISHED;
    }

    /**
     * Reports {@link ExitStatus#COMPLETED} for the step.
     *
     * @param stepExecution the step execution that just ended (unused)
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }
}
DBConfig.java
@Configuration
public class DBConfig extends DefaultBatchConfigurer {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Bean
public JobRepository jobRepository(@Autowired DataSource dataSource,
@Autowired PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactory = new JobRepositoryFactoryBean();
jobRepositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
jobRepositoryFactory.setDataSource(dataSource);
jobRepositoryFactory.setTransactionManager(transactionManager);
jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
jobRepositoryFactory.setTablePrefix("BATCH_");
jobRepositoryFactory.setMaxVarCharLength(1000);
jobRepositoryFactory.setValidateTransactionState(Boolean.FALSE);
return jobRepositoryFactory.getObject();
}
@Bean()
public DataSource dataSource() {
PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setServerName("my-db-server");
pgSimpleDataSource.setDatabaseName("test-db");
pgSimpleDataSource.setUser("test");
pgSimpleDataSource.setPassword("test");
return pgSimpleDataSource;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
final SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobLauncher(jobLauncher);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobExplorer(jobExplorer);
return jobOperator;
}
@Bean
public JobExplorer jobExplorer(@Autowired DataSource dataSource) throws Exception {
final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
bean.setDataSource(dataSource);
bean.setTablePrefix("BATCH_");
bean.setJdbcOperations(new JdbcTemplate(dataSource));
bean.afterPropertiesSet();
return bean.getObject();
}
@Bean
public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
Error Log
2022-05-03 18:58:44,865 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [org.springframework.batch.core.step.AbstractStep]
Encountered an error executing step restart-failed-job in job restart-myjob
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:177)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy68.createJobExecution(Unknown Source)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy73.run(Unknown Source)
at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283)
at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$e5e87de1.restart(<generated>)