diff --git a/spring-batch-dynamic-composite/.gitignore b/spring-batch-dynamic-composite/.gitignore new file mode 100644 index 00000000..cd578f0c --- /dev/null +++ b/spring-batch-dynamic-composite/.gitignore @@ -0,0 +1,4 @@ +/target/ +.classpath +.project +.settings diff --git a/spring-batch-dynamic-composite/README.md b/spring-batch-dynamic-composite/README.md new file mode 100644 index 00000000..7a18b7bc --- /dev/null +++ b/spring-batch-dynamic-composite/README.md @@ -0,0 +1,19 @@ +# spring-batch-dynamic-composite + +This project is all about spring-batch' composite processor and writer. Currently, spring-batch' built-in composite processor and writer are not dynamic. Meaning, one set them in design time (and coding) and it cannot be changed during runtime. But there are cases that this is needed. + +Without loss of generality, we say here "processor" but mean "reader" and "writer" as well. + +The idea is to have the ability to replace a processor(s) at runtime. For example, in case of multiple processors (AKA composite-processor), to have the option to add/remove/replace/change-order of processors. + +In the implementation, there is `DynamicCompositeItemProcessor` (which is a real `ItemProcessor`) that uses a manager to read the list of processors bean-names from the DB. Thus, the processors list can be modified and reloaded. + +As mentioned, same for reader as well as for writer. + +## why do we need this? + +There are cases that processors are used as "filters", and it may occur that the business (the client) may change the requirements (yes, it is very annoying) and ask to switch among filters (change the priority). + +Other use case is having multiple readers, reading the data from different data warehouses, and again - the client changes the warehouse from time to time (integration phase), and I do not want my app to be restarted each and every time. + +There are many other use cases, of course. diff --git a/spring-batch-dynamic-composite/pom.xml b/spring-batch-dynamic-composite/pom.xml new file mode 100644 index 00000000..a33f213a --- /dev/null +++ b/spring-batch-dynamic-composite/pom.xml @@ -0,0 +1,77 @@ + + + 4.0.0 + org.springframewor.batch + spring-batch-dynamic-composite + 1.0-SNAPSHOT + jar + + + + 4.2.3.RELEASE + 3.0.5.RELEASE + + + + + + log4j + log4j + 1.2.17 + + + + org.springframework + spring-jdbc + ${spring.version} + + + + org.springframework.batch + spring-batch-core + ${spring-batch.version} + + + + + + + mysql + mysql-connector-java + 5.1.16 + + + + + + + + + org.apache.maven.plugins + 2.5.1 + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + + + + \ No newline at end of file diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/BatchBeanTypeEnum.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/BatchBeanTypeEnum.java new file mode 100644 index 00000000..6c2e03b6 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/BatchBeanTypeEnum.java @@ -0,0 +1,6 @@ +package com.ohadr.spring_batch_dynamic_composite.core; + +public enum BatchBeanTypeEnum +{ + READER, PROCESSOR, WRITER +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanEntity.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanEntity.java new file mode 100644 index 00000000..699dca94 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanEntity.java @@ -0,0 +1,78 @@ +package com.ohadr.spring_batch_dynamic_composite.core; + +import java.io.Serializable; +import org.springframework.batch.core.Entity; + +public class CompositeBatchBeanEntity + extends Entity + implements Comparable, Serializable +{ + private static final long serialVersionUID = 6582817624632934459L; + + private String name; + + private Integer priority; + + private String taskName; + + private BatchBeanTypeEnum batchBeanType; + + + public CompositeBatchBeanEntity(Long id, + String name, Integer priority, String taskName, BatchBeanTypeEnum batchBeanType) + { + super(id); + this.name = name; + this.priority = priority; + this.taskName = taskName; + this.batchBeanType = batchBeanType; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public Integer getPriority() + { + return priority; + } + + public void setPriority(Integer priority) + { + this.priority = priority; + } + + public String getTaskName() + { + return taskName; + } + + public void setTaskName(String taskName) + { + this.taskName = taskName; + } + + public BatchBeanTypeEnum getBatchBeanType() + { + return batchBeanType; + } + + public void setBatchBeanType(BatchBeanTypeEnum batchBeanType) + { + this.batchBeanType = batchBeanType; + } + + @Override + public int compareTo(CompositeBatchBeanEntity compositeProcessorEntity) + { + // descending order + return compositeProcessorEntity.getPriority() - this.getPriority(); + } + +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanManager.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanManager.java new file mode 100644 index 00000000..0a103664 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanManager.java @@ -0,0 +1,21 @@ +package com.ohadr.spring_batch_dynamic_composite.core; + +import java.util.List; +import java.util.Set; + +public interface CompositeBatchBeanManager +{ + + List getBatchBeanList(String task, BatchBeanTypeEnum batchBeanType); + + Set getAllTaskNames(); + + Set getAllValuesOfBeanTypes(); + + void deleteBatchBean(Long compositeBatchBeanId); + + CompositeBatchBeanEntity getBatchBean(Long compositeBatchBeanId); + + void addBatchBean(CompositeBatchBeanEntity processor); + +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanManagerImpl.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanManagerImpl.java new file mode 100644 index 00000000..04f61a12 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/core/CompositeBatchBeanManagerImpl.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2016, William Hill Online. All rights reserved + */ +package com.ohadr.spring_batch_dynamic_composite.core; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.apache.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +//import org.springframework.cache.annotation.Cacheable; +import org.springframework.util.StringUtils; + +import com.ohadr.spring_batch_dynamic_composite.repository.CompositeBatchBeanDao; + + +@Component +public class CompositeBatchBeanManagerImpl implements CompositeBatchBeanManager +{ + private static Logger log = Logger.getLogger(CompositeBatchBeanManagerImpl.class); + /** + * UID for CompositeBatchBeanManager + */ + public final static String UID = "compositeBatchBeanManager"; + + @Autowired + private CompositeBatchBeanDao compositeBatchBeanDao; + + + + @Override +// @Cacheable(value = "compositeProcessorCache", key = "#key1+#batchBeanType+#key2") + public List getBatchBeanList(String taskName, BatchBeanTypeEnum batchBeanType) + { + if (StringUtils.isEmpty(taskName)) + { + throw new IllegalArgumentException("taskName is null."); + } + + if (batchBeanType == null) + { + throw new IllegalArgumentException("batchBeanType is null."); + } + + List batchBeanEntities = compositeBatchBeanDao.getCompositeBatchBeans(taskName, batchBeanType); + + if (batchBeanEntities == null) + { + return new ArrayList<>(); + } + + Collections.sort(batchBeanEntities); + + log.info("beans list:" + batchBeanEntities); + + return batchBeanEntities; + } + + + @Override + public void addBatchBean(CompositeBatchBeanEntity processor) + { + if (processor == null) + { + throw new IllegalArgumentException("processor is null."); + } + + compositeBatchBeanDao.update(processor); + } + + @Override + public void deleteBatchBean(Long compositeBatchBeanId) + { + compositeBatchBeanDao.delete(compositeBatchBeanId); + } + + @Override + public CompositeBatchBeanEntity getBatchBean(Long compositeBatchBeanId) + { + return compositeBatchBeanDao.get(compositeBatchBeanId); + } + + + @Override + public Set getAllTaskNames() + { + return compositeBatchBeanDao.getAllTaskNames(); + } + + @Override + public Set getAllValuesOfBeanTypes() + { + return compositeBatchBeanDao.getAllValuesOfBeanTypes(); + } + + public void setTablePrefix(String tablePrefix) + { + compositeBatchBeanDao.setTablePrefix(tablePrefix); + } + +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/AbstractDynamicCompositeItem.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/AbstractDynamicCompositeItem.java new file mode 100644 index 00000000..effd05c8 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/AbstractDynamicCompositeItem.java @@ -0,0 +1,69 @@ +package com.ohadr.spring_batch_dynamic_composite.item; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.util.StringUtils; +import com.ohadr.spring_batch_dynamic_composite.core.CompositeBatchBeanManager; + +public abstract class AbstractDynamicCompositeItem + implements + StepExecutionListener, + ApplicationContextAware, + InitializingBean +{ + protected String taskName; + + @Autowired + protected CompositeBatchBeanManager compositeBatchBeanManager; + + // if false, filter result is false if any filter is defined + protected Boolean acceptEmptyFiltersList = false; + + protected ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException + { + this.applicationContext = applicationContext; + } + + //hide super's implementation + @Override + public void afterPropertiesSet() throws Exception + { + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) + { + return stepExecution != null ? stepExecution.getExitStatus() : null; + } + + /** + * get items list, which means processors, readers or writers. each implemetation should + * set the list that is read from DB to the "delegates" member (e.g. CompositeItemProcessor.delegates) + */ + protected abstract void getItemsList(); + + @Override + public void beforeStep(StepExecution stepExecution) + { + taskName = stepExecution.getJobExecution().getJobInstance().getJobName(); + + if (StringUtils.isEmpty(taskName)) + { + String message = getClass().getSimpleName() + " beforeStep: taskName is null or empty."; + // Log.error(message); + throw new RuntimeException(message); + } + + //get processors list from DB + getItemsList(); + } +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/DynamicCompositeItemProcessor.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/DynamicCompositeItemProcessor.java new file mode 100644 index 00000000..688d7578 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/DynamicCompositeItemProcessor.java @@ -0,0 +1,49 @@ +package com.ohadr.spring_batch_dynamic_composite.item; + +import java.util.ArrayList; +import java.util.List; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.support.CompositeItemProcessor; +import com.ohadr.spring_batch_dynamic_composite.core.BatchBeanTypeEnum; +import com.ohadr.spring_batch_dynamic_composite.core.CompositeBatchBeanEntity; + +public class DynamicCompositeItemProcessor extends AbstractDynamicCompositeItem + implements ItemProcessor +{ + //aggregate CompositeItemProcessor rather than extending it: + protected CompositeItemProcessor innerComposite = new CompositeItemProcessor(); + + protected void getProcessorsList() + { + List> delegates = new ArrayList>(); + + BatchBeanTypeEnum batchBeanType = BatchBeanTypeEnum.PROCESSOR; + List processorsList = compositeBatchBeanManager.getBatchBeanList(taskName, batchBeanType); + if (processorsList.isEmpty() && !acceptEmptyFiltersList) + { + String message = "No " + batchBeanType + " were found for taskName=" + taskName; + throw new RuntimeException(message); + } + + for (CompositeBatchBeanEntity compositeProcessorEntity : processorsList) + { + // load generic filter by name + ItemProcessor processor = applicationContext.getBean(compositeProcessorEntity.getName(), ItemProcessor.class); + delegates.add( processor ); + } + + innerComposite.setDelegates(delegates); + } + + @Override + protected void getItemsList() + { + getProcessorsList(); + } + + @Override + public O process(I item) throws Exception + { + return innerComposite.process(item); + } +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/DynamicCompositeItemWriter.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/DynamicCompositeItemWriter.java new file mode 100644 index 00000000..e05ce0cf --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/item/DynamicCompositeItemWriter.java @@ -0,0 +1,50 @@ +package com.ohadr.spring_batch_dynamic_composite.item; + +import java.util.ArrayList; +import java.util.List; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.CompositeItemWriter; +import com.ohadr.spring_batch_dynamic_composite.core.BatchBeanTypeEnum; +import com.ohadr.spring_batch_dynamic_composite.core.CompositeBatchBeanEntity; + +public class DynamicCompositeItemWriter extends AbstractDynamicCompositeItem + implements ItemWriter +{ + //aggregate CompositeItemWriter rather than extending it: + protected CompositeItemWriter innerComposite = new CompositeItemWriter<>(); + + protected void getWritersList() + { + List> delegates = new ArrayList>(); + + BatchBeanTypeEnum batchBeanType = BatchBeanTypeEnum.WRITER; + List processorsList = compositeBatchBeanManager.getBatchBeanList(taskName, batchBeanType); + if (processorsList.isEmpty() && !acceptEmptyFiltersList) + { + String message = "No " + batchBeanType + " were found for taskName=" + taskName; + throw new RuntimeException(message); + } + + for (CompositeBatchBeanEntity compositeProcessorEntity : processorsList) + { + // load generic filter by name + ItemWriter writer = applicationContext.getBean(compositeProcessorEntity.getName(), ItemWriter.class); + delegates.add( writer ); + } + + innerComposite.setDelegates(delegates); + } + + @Override + protected void getItemsList() + { + getWritersList(); + } + + @Override + public void write(List items) throws Exception + { + innerComposite.write(items); + } + +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/repository/CompositeBatchBeanDao.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/repository/CompositeBatchBeanDao.java new file mode 100644 index 00000000..febcc94e --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/repository/CompositeBatchBeanDao.java @@ -0,0 +1,25 @@ +package com.ohadr.spring_batch_dynamic_composite.repository; + +import java.util.List; +import java.util.Set; +import com.ohadr.spring_batch_dynamic_composite.core.BatchBeanTypeEnum; +import com.ohadr.spring_batch_dynamic_composite.core.CompositeBatchBeanEntity; + +public interface CompositeBatchBeanDao +{ + + List getCompositeBatchBeans(String taskName, BatchBeanTypeEnum batchBeanType); + + Set getAllTaskNames(); + + Set getAllValuesOfBeanTypes(); + + void update(CompositeBatchBeanEntity processor); + + void delete(Long compositeBatchBeanId); + + CompositeBatchBeanEntity get(Long compositeBatchBeanId); + + void setTablePrefix(String tablePrefix); + +} diff --git a/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/repository/JdbcCompositeBatchBeanDao.java b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/repository/JdbcCompositeBatchBeanDao.java new file mode 100644 index 00000000..f5898945 --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/java/com/ohadr/spring_batch_dynamic_composite/repository/JdbcCompositeBatchBeanDao.java @@ -0,0 +1,125 @@ +package com.ohadr.spring_batch_dynamic_composite.repository; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import javax.sql.DataSource; +import org.apache.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; +import com.ohadr.spring_batch_dynamic_composite.core.BatchBeanTypeEnum; +import com.ohadr.spring_batch_dynamic_composite.core.CompositeBatchBeanEntity; + +@Repository +public class JdbcCompositeBatchBeanDao implements CompositeBatchBeanDao +{ + private static Logger log = Logger.getLogger(JdbcCompositeBatchBeanDao.class); + + private static final String TABLE_NAME = "composite_batch_beans"; + private static final String DEFAULT_TABLE_PREFIX = ""; + private String tablePrefix = DEFAULT_TABLE_PREFIX; + + private static final String COMPOSITE_BATCH_BEANS_FIELDS = "ID, " + + "NAME, " + + "JOB_NAME, " + + "BATCH_BEAN_TYPE, " + + "PRIORITY"; + + private final String DEFAULT_COMPOSITE_BATCH_BEANS_SELECT_STATEMENT = "select " + COMPOSITE_BATCH_BEANS_FIELDS + + " from " + getTableName() + " where JOB_NAME = ? and BATCH_BEAN_TYPE = ?"; + + private JdbcOperations jdbcTemplate; + + + @Autowired + public void setDataSource(DataSource dataSource) + { + jdbcTemplate = new JdbcTemplate(dataSource); + } + + @Override + public List getCompositeBatchBeans(String taskName, BatchBeanTypeEnum batchBeanType) + { + List batchBeans = null; + try + { + log.info("query: " + DEFAULT_COMPOSITE_BATCH_BEANS_SELECT_STATEMENT + " taskName=" + taskName + " batchBeanType=" + batchBeanType); + batchBeans = jdbcTemplate.query(DEFAULT_COMPOSITE_BATCH_BEANS_SELECT_STATEMENT, + new CompositeBatchBeanEntityRowMapper(), + taskName, batchBeanType.name()); + } + catch (EmptyResultDataAccessException e) + { + log.info("No record was found for taskName: " + taskName); + throw new NoSuchElementException("no record was found for taskName: " + taskName); + } + + + return batchBeans; + } + + @Override + public Set getAllTaskNames() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public Set getAllValuesOfBeanTypes() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public void update(CompositeBatchBeanEntity processor) + { + // TODO Auto-generated method stub + + } + + @Override + public void delete(Long compositeBatchBeanId) + { + // TODO Auto-generated method stub + + } + + @Override + public CompositeBatchBeanEntity get(Long compositeBatchBeanId) + { + // TODO Auto-generated method stub + return null; + } + + private final class CompositeBatchBeanEntityRowMapper implements RowMapper + { + @Override + public CompositeBatchBeanEntity mapRow(ResultSet rs, int rowNum) throws SQLException + { + CompositeBatchBeanEntity compositeBatchBeanEntity = + new CompositeBatchBeanEntity(rs.getLong(1), rs.getString(2), rowNum, null, null); + // should always be at version=0 because they never get updated + compositeBatchBeanEntity.incrementVersion(); + return compositeBatchBeanEntity; + } + } + + @Override + public void setTablePrefix(String tablePrefix) + { + this.tablePrefix = tablePrefix; + } + + private String getTableName() + { + return tablePrefix + TABLE_NAME; + } +} diff --git a/spring-batch-dynamic-composite/src/main/resources/schema.sql b/spring-batch-dynamic-composite/src/main/resources/schema.sql new file mode 100644 index 00000000..002f2c4d --- /dev/null +++ b/spring-batch-dynamic-composite/src/main/resources/schema.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS COMPOSITE_BATCH_BEANS; + +CREATE TABLE `COMPOSITE_BATCH_BEANS` ( + `ID` BIGINT(20) NOT NULL AUTO_INCREMENT, + `NAME` VARCHAR(255) NOT NULL, + `JOB_NAME` VARCHAR(255) NOT NULL, + `BATCH_BEAN_TYPE` VARCHAR(50) NULL DEFAULT NULL, + `PRIORITY` INT(11) NULL DEFAULT NULL, + PRIMARY KEY (`ID`) +);