diff --git a/.gitignore b/.gitignore
index a9b76ab254..79dad5b8ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,4 @@ com.springsource.sts.config.flow.prefs
 s3.properties
 .idea
 *.iml
+.*.swp
diff --git a/README.md b/README.md
index 1517178320..2759f557d5 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ Clone the git repository using the URL on the Github home page:
 
     $ cd spring-batch
 
 ## Command Line
-Use Maven 2.2.* (might work with 3.0, but we don't test it), then on the command line:
+Use Maven 2.2 or 3.0, then on the command line:
 
     $ mvn install
@@ -27,7 +27,7 @@ In STS (or any Eclipse distro or other IDE with Maven support), import the modul
 
 This is the quickest way to get started. It requires an internet connection for download, and access to a Maven repository (remote or local).
 
-* Download STS version 2.5.* (or better) from the [SpringSource website](http://www.springsource.com/products/sts). STS is a free Eclipse bundle with many features useful for Spring developers.
+* Download STS version 2.5 (or better) from the [SpringSource website](http://www.springsource.com/products/sts). STS is a free Eclipse bundle with many features useful for Spring developers.
 * Go to `File->New->Spring Template Project` from the menu bar (in the Spring perspective).
 * The wizard has a drop down with a list of template projects. One of them is a "Simple Spring Batch Project". Select it and follow the wizard.
 * A project is created with all dependencies and a simple input/output job configuration. It can be run using a unit test, or on the command line (see instructions in the pom.xml).
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MapJobRegistry.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MapJobRegistry.java
index daf4420001..96139770f2 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MapJobRegistry.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MapJobRegistry.java
@@ -15,11 +15,11 @@
  */
 package org.springframework.batch.core.configuration.support;
 
-import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.configuration.DuplicateJobException;
@@ -29,50 +29,46 @@
 import org.springframework.util.Assert;
 
 /**
- * Simple map-based implementation of {@link JobRegistry}. Access to the map is
- * synchronized, guarded by an internal lock.
+ * Simple, thread-safe, map-based implementation of {@link JobRegistry}.
  *
  * @author Dave Syer
+ * @author Robert Fischer
  *
  */
 public class MapJobRegistry implements JobRegistry {
 
-    private Map<String, JobFactory> map = new HashMap<String, JobFactory>();
+    private final ConcurrentMap<String, JobFactory> map = new ConcurrentHashMap<String, JobFactory>();
 
     public void register(JobFactory jobFactory) throws DuplicateJobException {
         Assert.notNull(jobFactory);
         String name = jobFactory.getJobName();
         Assert.notNull(name, "Job configuration must have a name.");
-        synchronized (map) {
-            if (map.containsKey(name)) {
-                throw new DuplicateJobException("A job configuration with this name [" + name
-                        + "] was already registered");
-            }
-            map.put(name, jobFactory);
+        JobFactory previousValue = map.putIfAbsent(name, jobFactory);
+        if(previousValue != null) {
+            throw new DuplicateJobException("A job configuration with this name [" + name
+                    + "] was already registered");
         }
     }
 
     public void unregister(String name) {
-        Assert.notNull(name, "Job configuration must have a name.");
-        synchronized (map) {
-            map.remove(name);
-        }
-
+        Assert.notNull(name, "Must be given a name to unregister");
+        map.remove(name);
     }
 
     public Job getJob(String name) throws NoSuchJobException {
-        synchronized (map) {
-            if (!map.containsKey(name)) {
-                throw new NoSuchJobException("No job configuration with the name [" + name + "] was registered");
-            }
-            return map.get(name).createJob();
+        JobFactory factory = map.get(name);
+        if(factory == null) {
+            throw new NoSuchJobException("No job configuration with the name [" + name + "] was registered");
+        } else {
+            return factory.createJob();
         }
     }
 
-    public Collection<String> getJobNames() {
-        synchronized (map) {
-            return Collections.unmodifiableCollection(new HashSet<String>(map.keySet()));
-        }
+    /**
+     * Provides an unmodifiable view of the job names.
+     */
+    public Set<String> getJobNames() {
+        return Collections.unmodifiableSet(map.keySet());
     }
 
 }
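The register() rewrite above relies on ConcurrentMap.putIfAbsent collapsing the old synchronized containsKey/put pair into a single atomic step, so two threads racing to register the same name can no longer both pass the check. A minimal, self-contained sketch of that pattern (illustrative class and key names, String values standing in for JobFactory; not part of the patch):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: a toy registry showing why a single putIfAbsent call can
// replace the synchronized containsKey/put pair. The check and the insert
// happen atomically, so a concurrent duplicate registration is reported to the
// caller instead of silently overwriting the first entry.
public class PutIfAbsentSketch {
    public static void main(String[] args) {
        ConcurrentMap<String, String> registry = new ConcurrentHashMap<String, String>();
        registry.putIfAbsent("footballJob", "first registration");
        // Returns the existing value when the key is already present.
        String previous = registry.putIfAbsent("footballJob", "second registration");
        if (previous != null) {
            System.out.println("Duplicate rejected, kept: " + previous);
        }
    }
}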
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapExecutionContextDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapExecutionContextDao.java
index 7904a19586..888269fb2c 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapExecutionContextDao.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapExecutionContextDao.java
@@ -16,8 +16,11 @@
 package org.springframework.batch.core.repository.dao;
 
+import java.util.concurrent.ConcurrentMap;
 import java.util.Map;
 
+import java.io.Serializable;
+
 import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.StepExecution;
 import org.springframework.batch.item.ExecutionContext;
@@ -32,15 +35,61 @@
  */
 public class MapExecutionContextDao implements ExecutionContextDao {
 
-    private Map<Long, ExecutionContext> contextsByStepExecutionId = TransactionAwareProxyFactory
+    private final ConcurrentMap<ContextKey, ExecutionContext> contexts = TransactionAwareProxyFactory
             .createAppendOnlyTransactionalMap();
 
-    private Map<Long, ExecutionContext> contextsByJobExecutionId = TransactionAwareProxyFactory
-            .createAppendOnlyTransactionalMap();
+    private static final class ContextKey implements Comparable<ContextKey>, Serializable {
+
+        private static enum Type { STEP, JOB; }
+
+        private final Type type;
+        private final long id;
+
+        private ContextKey(Type type, long id) {
+            if(type == null) throw new IllegalStateException("Need a non-null type for a context");
+            this.type = type;
+            this.id = id;
+        }
+
+        @Override
+        public int compareTo(ContextKey them) {
+            if(them == null) return 1;
+            final int idCompare = Long.compare(this.id, them.id);
+            if(idCompare != 0) return idCompare;
+            final int typeCompare = this.type.compareTo(them.type);
+            if(typeCompare != 0) return typeCompare;
+            return 0;
+        }
+
+        @Override
+        public boolean equals(Object them) {
+            if(them == null) return false;
+            if(them instanceof ContextKey) return this.equals((ContextKey)them);
+            return false;
+        }
+
+        public boolean equals(ContextKey them) {
+            if(them == null) return false;
+            return this.id == them.id && this.type.equals(them.type);
+        }
+
+        @Override
+        public int hashCode() {
+            int value = (int)(id^(id>>>32));
+            switch(type) {
+                case STEP: return value;
+                case JOB: return ~value;
+                default: throw new IllegalStateException("Unknown type encountered in switch: " + type);
+            }
+        }
+
+        public static ContextKey step(long id) { return new ContextKey(Type.STEP, id); }
+
+        public static ContextKey job(long id) { return new ContextKey(Type.JOB, id); }
+    }
 
-    public void clear() {
-        contextsByJobExecutionId.clear();
-        contextsByStepExecutionId.clear();
+    public void clear() {
+        contexts.clear();
     }
 
     private static ExecutionContext copy(ExecutionContext original) {
@@ -48,24 +97,24 @@ private static ExecutionContext copy(ExecutionContext original) {
     }
 
     public ExecutionContext getExecutionContext(StepExecution stepExecution) {
-        return copy(contextsByStepExecutionId.get(stepExecution.getId()));
+        return copy(contexts.get(ContextKey.step(stepExecution.getId())));
     }
 
     public void updateExecutionContext(StepExecution stepExecution) {
         ExecutionContext executionContext = stepExecution.getExecutionContext();
         if (executionContext != null) {
-            contextsByStepExecutionId.put(stepExecution.getId(), copy(executionContext));
+            contexts.put(ContextKey.step(stepExecution.getId()), copy(executionContext));
         }
     }
 
     public ExecutionContext getExecutionContext(JobExecution jobExecution) {
-        return copy(contextsByJobExecutionId.get(jobExecution.getId()));
+        return copy(contexts.get(ContextKey.job(jobExecution.getId())));
     }
 
     public void updateExecutionContext(JobExecution jobExecution) {
         ExecutionContext executionContext = jobExecution.getExecutionContext();
         if (executionContext != null) {
-            contextsByJobExecutionId.put(jobExecution.getId(), copy(executionContext));
+            contexts.put(ContextKey.job(jobExecution.getId()), copy(executionContext));
         }
     }
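The two Long-keyed maps above are merged into one map keyed by a (type, id) pair, since a JobExecution and one of its StepExecutions may share the same numeric id (the test added further below exercises exactly that collision). A trimmed-down sketch of the composite-key idea, with illustrative names only:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: a simplified version of the ContextKey idea. A job
// context and a step context with the same numeric id map to two distinct
// keys, which is the collision a single Long-keyed map could not express.
public class ContextKeySketch {
    enum Type { STEP, JOB }

    static final class Key {
        final Type type;
        final long id;
        Key(Type type, long id) { this.type = type; this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).type == type && ((Key) o).id == id;
        }
        @Override public int hashCode() {
            int value = (int) (id ^ (id >>> 32));
            // Mirror the patch: step and job keys for the same id hash apart.
            return type == Type.STEP ? value : ~value;
        }
    }

    public static void main(String[] args) {
        Map<Key, String> contexts = new HashMap<Key, String>();
        contexts.put(new Key(Type.JOB, 1L), "job context");
        contexts.put(new Key(Type.STEP, 1L), "step context");
        System.out.println(contexts.size()); // 2: same id, no collision
    }
}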
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobExecutionDao.java
index 2b4effdb51..eac221a665 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobExecutionDao.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobExecutionDao.java
@@ -23,7 +23,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.JobInstance;
@@ -36,9 +38,9 @@
  */
 public class MapJobExecutionDao implements JobExecutionDao {
 
-    private Map<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>();
+    private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentSkipListMap<Long, JobExecution>();
 
-    private long currentId = 0;
+    private final AtomicLong currentId = new AtomicLong(0L);
 
     public void clear() {
         executionsById.clear();
@@ -51,7 +53,7 @@ private static JobExecution copy(JobExecution original) {
 
     public void saveJobExecution(JobExecution jobExecution) {
         Assert.isTrue(jobExecution.getId() == null);
-        Long newId = currentId++;
+        Long newId = currentId.getAndIncrement();
         jobExecution.setId(newId);
         jobExecution.incrementVersion();
         executionsById.put(newId, copy(jobExecution));
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobInstanceDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobInstanceDao.java
index 552e71d78c..0961839981 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobInstanceDao.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapJobInstanceDao.java
@@ -21,7 +21,8 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.JobInstance;
@@ -33,9 +34,9 @@
  */
 public class MapJobInstanceDao implements JobInstanceDao {
 
-    private Collection<JobInstance> jobInstances = new CopyOnWriteArraySet<JobInstance>();
+    private final Collection<JobInstance> jobInstances = new ConcurrentSkipListSet<JobInstance>();
 
-    private long currentId = 0;
+    private final AtomicLong currentId = new AtomicLong(0L);
 
     public void clear() {
         jobInstances.clear();
@@ -45,7 +46,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
 
         Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist");
 
-        JobInstance jobInstance = new JobInstance(currentId++, jobParameters, jobName);
+        JobInstance jobInstance = new JobInstance(currentId.getAndIncrement(), jobParameters, jobName);
         jobInstance.incrementVersion();
 
         jobInstances.add(jobInstance);
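Both DAOs above also replace a plain long counter incremented with ++ by an AtomicLong, because incrementing a shared long is a non-atomic read-modify-write and can hand out duplicate ids under concurrent saves. A standalone sketch (hypothetical class name, not from the patch) of the atomic alternative:

import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: two threads incrementing a shared counter. With a plain
// "currentId++" both threads could read the same value and return the same id;
// AtomicLong.getAndIncrement() performs the read and the increment as one
// atomic operation, so every caller gets a distinct value.
public class AtomicIdSketch {
    private static final AtomicLong currentId = new AtomicLong(0L);

    public static void main(String[] args) throws InterruptedException {
        Runnable take = new Runnable() {
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    currentId.getAndIncrement(); // never hands out the same value twice
                }
            }
        };
        Thread a = new Thread(take);
        Thread b = new Thread(take);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(currentId.get()); // always 200000
    }
}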
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapExecutionContextDaoTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapExecutionContextDaoTests.java
index 7fd900d4c5..8180e1085e 100644
--- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapExecutionContextDaoTests.java
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapExecutionContextDaoTests.java
@@ -3,6 +3,7 @@
 import static org.junit.Assert.*;
 
 import org.junit.Test;
+import org.junit.Ignore;
 import org.junit.internal.runners.JUnit4ClassRunner;
 import org.junit.runner.RunWith;
 import org.springframework.batch.core.JobExecution;
@@ -34,7 +35,31 @@ protected StepExecutionDao getStepExecutionDao() {
     protected ExecutionContextDao getExecutionContextDao() {
         return new MapExecutionContextDao();
     }
+
+    @Test
+    public void testSaveBothJobAndStepContextWithSameId() throws Exception {
+        MapExecutionContextDao tested = new MapExecutionContextDao();
+        JobExecution jobExecution = new JobExecution(1L);
+        StepExecution stepExecution = new StepExecution("stepName", jobExecution, 1L);
+
+        assertTrue(stepExecution.getId() == jobExecution.getId());
+
+        jobExecution.getExecutionContext().put("type", "job");
+        stepExecution.getExecutionContext().put("type", "step");
+        assertTrue(!jobExecution.getExecutionContext().get("type").equals(stepExecution.getExecutionContext().get("type")));
+        assertEquals("job", jobExecution.getExecutionContext().get("type"));
+        assertEquals("step", stepExecution.getExecutionContext().get("type"));
+
+        tested.saveExecutionContext(jobExecution);
+        tested.saveExecutionContext(stepExecution);
+        ExecutionContext jobCtx = tested.getExecutionContext(jobExecution);
+        ExecutionContext stepCtx = tested.getExecutionContext(stepExecution);
+
+        assertEquals("job", jobCtx.get("type"));
+        assertEquals("step", stepCtx.get("type"));
+    }
+
     @Test
     public void testPersistentCopy() throws Exception {
         MapExecutionContextDao tested = new MapExecutionContextDao();
@@ -54,5 +79,18 @@ public void testPersistentCopy() throws Exception {
         assertTrue(retrieved.isEmpty());
     }
 
+    @Ignore("Under discussion for JIRA ticket BATCH-1858")
+    @Test
+    public void testNullExecutionContextUpdate() throws Exception {
+        MapExecutionContextDao tested = new MapExecutionContextDao();
+        JobExecution jobExecution = new JobExecution((long)1);
+        assertNotNull(jobExecution.getExecutionContext());
+        tested.updateExecutionContext(jobExecution);
+        assertNotNull(tested.getExecutionContext(jobExecution));
+        jobExecution.setExecutionContext(null);
+        tested.updateExecutionContext(jobExecution);
+        //assert???(mapExecutionContextDao.getExecutionContext(jobExecution) == null);
+    }
+
 }
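The concurrency test added in the next file uses a CountDownLatch as a starting gate: every worker thread blocks on await(), the test releases them all at once with countDown(), and the generated ids are collected into a concurrent sorted set for the assertions. A small sketch of that starting-gate pattern under toy assumptions (50 workers, plain long values instead of JobExecutions; illustrative class name):

import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;

// Illustrative only: workers block on the latch, are released together, and
// race to add their result to a concurrent set; lost or duplicate results
// would show up as a smaller-than-expected set size.
public class StartingGateSketch {
    public static void main(String[] args) throws InterruptedException {
        final int workers = 50;
        final CountDownLatch gate = new CountDownLatch(1);
        final SortedSet<Long> results = new ConcurrentSkipListSet<Long>();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final long value = i;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        gate.await();       // wait for the gate to open
                        results.add(value); // all threads race from here
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            threads[i].start();
        }
        gate.countDown(); // open the gate for every worker at once
        for (Thread t : threads) { t.join(); }
        System.out.println(results.size() == workers); // true
    }
}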
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapJobExecutionDaoTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapJobExecutionDaoTests.java
index d0a83d5f77..d788e67834 100644
--- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapJobExecutionDaoTests.java
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/MapJobExecutionDaoTests.java
@@ -1,6 +1,14 @@
 package org.springframework.batch.core.repository.dao;
 
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
+import java.util.SortedSet;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -45,4 +53,63 @@ public void testPersistentCopy() {
 
     }
 
+    /**
+     * Verify that the ids are properly generated even under heavy concurrent load
+     */
+    @Test
+    public void testConcurrentSaveJobExecution() throws Exception {
+        final int iterations = 5000;
+
+        // Object under test
+        final JobExecutionDao tested = new MapJobExecutionDao();
+
+        // Support objects for this testing
+        final CountDownLatch latch = new CountDownLatch(1);
+        final SortedSet<Long> ids = new ConcurrentSkipListSet<Long>();
+        final AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
+
+        // Implementation of the high-concurrency code
+        final Runnable codeUnderTest = new Runnable() {
+            public void run() {
+                try {
+                    JobExecution jobExecution = new JobExecution(new JobInstance((long) -1, new JobParameters(), "mapJob"));
+                    latch.await();
+                    tested.saveJobExecution(jobExecution);
+                    ids.add(jobExecution.getId());
+                } catch(Exception e) {
+                    exception.set(e);
+                }
+            }
+        };
+
+        // Create the threads
+        final Thread[] threads = new Thread[iterations];
+        for(int i = 0; i < iterations; i++) {
+            Thread t = new Thread(codeUnderTest, "Map Job Thread #" + (i+1));
+            t.setPriority(Thread.MAX_PRIORITY);
+            t.setDaemon(true);
+            t.start();
+            Thread.yield();
+            threads[i] = t;
+        }
+
+        // Let the high concurrency abuse begin!
+        do { latch.countDown(); } while(latch.getCount() > 0);
+        for(Thread t : threads) { t.join(); }
+
+        // Ensure no general exceptions arose
+        if(exception.get() != null) throw new RuntimeException("Exception occurred under high concurrency usage", exception.get());
+
+        // Validate the ids: we'd expect one of these three things to fail
+        if(ids.size() < iterations) {
+            fail("Duplicate id generated during high concurrency usage");
+        }
+        if(ids.first() < 0) {
+            fail("Generated an id less than zero during high concurrency usage: " + ids.first());
+        }
+        if(ids.last() > iterations) {
+            fail("Generated an id larger than expected during high concurrency usage: " + ids.last());
+        }
+    }
+
 }
diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/support/transaction/TransactionAwareProxyFactory.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/support/transaction/TransactionAwareProxyFactory.java
index efac6507a3..c54ac474e7 100644
--- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/support/transaction/TransactionAwareProxyFactory.java
+++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/support/transaction/TransactionAwareProxyFactory.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -165,8 +166,8 @@ public static <K, V> Map<K, V> createTransactionalMap(Map<K, V> map) {
         return (Map<K, V>) new TransactionAwareProxyFactory<Map<K, V>>(new ConcurrentHashMap<K, V>(map)).createInstance();
     }
 
-    public static <K, V> Map<K, V> createAppendOnlyTransactionalMap() {
-        return (Map<K, V>) new TransactionAwareProxyFactory<Map<K, V>>(new ConcurrentHashMap<K, V>(), true).createInstance();
+    public static <K, V> ConcurrentMap<K, V> createAppendOnlyTransactionalMap() {
+        return new TransactionAwareProxyFactory<ConcurrentMap<K, V>>(new ConcurrentHashMap<K, V>(), true).createInstance();
     }
 
     public static <T> Set<T> createAppendOnlyTransactionalSet() {