Skip to content

Improved Concurrency Support for MemoryJob* classes #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ com.springsource.sts.config.flow.prefs
s3.properties
.idea
*.iml
.*.swp
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Clone the git repository using the URL on the Github home page:
$ cd spring-batch

## Command Line
Use Maven 2.2.* (might work with 3.0, but we don't test it), then on the command line:
Use Maven 2.2 or 3.0, then on the command line:

$ mvn install

Expand All @@ -27,7 +27,7 @@ In STS (or any Eclipse distro or other IDE with Maven support), import the modul

This is the quickest way to get started. It requires an internet connection for download, and access to a Maven repository (remote or local).

* Download STS version 2.5.* (or better) from the [SpringSource website](http://www.springsource.com/products/sts). STS is a free Eclipse bundle with many features useful for Spring developers.
* Download STS version 2.5 (or better) from the [SpringSource website](http://www.springsource.com/products/sts). STS is a free Eclipse bundle with many features useful for Spring developers.
* Go to `File->New->Spring Template Project` from the menu bar (in the Spring perspective).
* The wizard has a drop down with a list of template projects. One of them is a "Simple Spring Batch Project". Select it and follow the wizard.
* A project is created with all dependencies and a simple input/output job configuration. It can be run using a unit test, or on the command line (see instructions in the pom.xml).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package org.springframework.batch.core.configuration.support;

import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.DuplicateJobException;
Expand All @@ -29,50 +29,46 @@
import org.springframework.util.Assert;

/**
* Simple map-based implementation of {@link JobRegistry}. Access to the map is
* synchronized, guarded by an internal lock.
* Simple, thread-safe, map-based implementation of {@link JobRegistry}.
*
* @author Dave Syer
* @author Robert Fischer
*
*/
public class MapJobRegistry implements JobRegistry {

private Map<String, JobFactory> map = new HashMap<String, JobFactory>();
private final ConcurrentMap<String, JobFactory> map = new ConcurrentHashMap<String, JobFactory>();

public void register(JobFactory jobFactory) throws DuplicateJobException {
Assert.notNull(jobFactory);
String name = jobFactory.getJobName();
Assert.notNull(name, "Job configuration must have a name.");
synchronized (map) {
if (map.containsKey(name)) {
throw new DuplicateJobException("A job configuration with this name [" + name
+ "] was already registered");
}
map.put(name, jobFactory);
JobFactory previousValue = map.putIfAbsent(name, jobFactory);
if(previousValue != null) {
throw new DuplicateJobException("A job configuration with this name [" + name
+ "] was already registered");
}
}

public void unregister(String name) {
Assert.notNull(name, "Job configuration must have a name.");
synchronized (map) {
map.remove(name);
}

Assert.notNull(name, "Must be given a name to unregister");
map.remove(name);
}

public Job getJob(String name) throws NoSuchJobException {
synchronized (map) {
if (!map.containsKey(name)) {
throw new NoSuchJobException("No job configuration with the name [" + name + "] was registered");
}
return map.get(name).createJob();
JobFactory factory = map.get(name);
if(factory == null) {
throw new NoSuchJobException("No job configuration with the name [" + name + "] was registered");
} else {
return factory.createJob();
}
}

public Collection<String> getJobNames() {
synchronized (map) {
return Collections.unmodifiableCollection(new HashSet<String>(map.keySet()));
}
/**
* Provides an unmodifiable view of the job names.
*/
public Set<String> getJobNames() {
return Collections.unmodifiableSet(map.keySet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package org.springframework.batch.core.repository.dao;

import java.util.concurrent.ConcurrentMap;
import java.util.Map;

import java.io.Serializable;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
Expand All @@ -32,40 +35,86 @@
*/
public class MapExecutionContextDao implements ExecutionContextDao {

private Map<Long, ExecutionContext> contextsByStepExecutionId = TransactionAwareProxyFactory
private final ConcurrentMap<ContextKey, ExecutionContext> contexts = TransactionAwareProxyFactory
.createAppendOnlyTransactionalMap();

private Map<Long, ExecutionContext> contextsByJobExecutionId = TransactionAwareProxyFactory
.createAppendOnlyTransactionalMap();
private static final class ContextKey implements Comparable<ContextKey>, Serializable {

private static enum Type { STEP, JOB; }

private final Type type;
private final long id;

private ContextKey(Type type, long id) {
if(type == null) throw new IllegalStateException("Need a non-null type for a context");
this.type = type;
this.id = id;
}

@Override
public int compareTo(ContextKey them) {
if(them == null) return 1;
final int idCompare = Long.compare(this.id, them.id);
if(idCompare != 0) return idCompare;
final int typeCompare = this.type.compareTo(them.type);
if(typeCompare != 0) return typeCompare;
return 0;
}

@Override
public boolean equals(Object them) {
if(them == null) return false;
if(them instanceof ContextKey) return this.equals((ContextKey)them);
return false;
}

public boolean equals(ContextKey them) {
if(them == null) return false;
return this.id == them.id && this.type.equals(them.type);
}

@Override
public int hashCode() {
int value = (int)(id^(id>>>32));
switch(type) {
case STEP: return value;
case JOB: return ~value;
default: throw new IllegalStateException("Unknown type encountered in switch: " + type);
}
}

public static ContextKey step(long id) { return new ContextKey(Type.STEP, id); }

public static ContextKey job(long id) { return new ContextKey(Type.JOB, id); }
}

public void clear() {
contextsByJobExecutionId.clear();
contextsByStepExecutionId.clear();
public void clear() {
contexts.clear();
}

private static ExecutionContext copy(ExecutionContext original) {
return (ExecutionContext) SerializationUtils.deserialize(SerializationUtils.serialize(original));
}

public ExecutionContext getExecutionContext(StepExecution stepExecution) {
return copy(contextsByStepExecutionId.get(stepExecution.getId()));
return copy(contexts.get(ContextKey.step(stepExecution.getId())));
}

public void updateExecutionContext(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
if (executionContext != null) {
contextsByStepExecutionId.put(stepExecution.getId(), copy(executionContext));
contexts.put(ContextKey.step(stepExecution.getId()), copy(executionContext));
}
}

public ExecutionContext getExecutionContext(JobExecution jobExecution) {
return copy(contextsByJobExecutionId.get(jobExecution.getId()));
return copy(contexts.get(ContextKey.job(jobExecution.getId())));
}

public void updateExecutionContext(JobExecution jobExecution) {
ExecutionContext executionContext = jobExecution.getExecutionContext();
if (executionContext != null) {
contextsByJobExecutionId.put(jobExecution.getId(), copy(executionContext));
contexts.put(ContextKey.job(jobExecution.getId()), copy(executionContext));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentMap;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
Expand All @@ -36,9 +38,9 @@
*/
public class MapJobExecutionDao implements JobExecutionDao {

private Map<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>();
private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentSkipListMap<Long, JobExecution>();

private long currentId = 0;
private final AtomicLong currentId = new AtomicLong(0L);

public void clear() {
executionsById.clear();
Expand All @@ -51,7 +53,7 @@ private static JobExecution copy(JobExecution original) {

public void saveJobExecution(JobExecution jobExecution) {
Assert.isTrue(jobExecution.getId() == null);
Long newId = currentId++;
Long newId = currentId.getAndIncrement();
jobExecution.setId(newId);
jobExecution.incrementVersion();
executionsById.put(newId, copy(jobExecution));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentSkipListSet;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
Expand All @@ -33,9 +34,9 @@
*/
public class MapJobInstanceDao implements JobInstanceDao {

private Collection<JobInstance> jobInstances = new CopyOnWriteArraySet<JobInstance>();
private final Collection<JobInstance> jobInstances = new ConcurrentSkipListSet<JobInstance>();

private long currentId = 0;
private final AtomicLong currentId = new AtomicLong(0L);

public void clear() {
jobInstances.clear();
Expand All @@ -45,7 +46,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters

Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist");

JobInstance jobInstance = new JobInstance(currentId++, jobParameters, jobName);
JobInstance jobInstance = new JobInstance(currentId.getAndIncrement(), jobParameters, jobName);
jobInstance.incrementVersion();
jobInstances.add(jobInstance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.Assert.*;

import org.junit.Test;
import org.junit.Ignore;
import org.junit.internal.runners.JUnit4ClassRunner;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
Expand Down Expand Up @@ -34,7 +35,31 @@ protected StepExecutionDao getStepExecutionDao() {
protected ExecutionContextDao getExecutionContextDao() {
return new MapExecutionContextDao();
}

@Test
public void testSaveBothJobAndStepContextWithSameId() throws Exception {
MapExecutionContextDao tested = new MapExecutionContextDao();
JobExecution jobExecution = new JobExecution(1L);
StepExecution stepExecution = new StepExecution("stepName", jobExecution, 1L);

assertTrue(stepExecution.getId() == jobExecution.getId());

jobExecution.getExecutionContext().put("type", "job");
stepExecution.getExecutionContext().put("type", "step");
assertTrue(!jobExecution.getExecutionContext().get("type").equals(stepExecution.getExecutionContext().get("type")));
assertEquals("job", jobExecution.getExecutionContext().get("type"));
assertEquals("step", stepExecution.getExecutionContext().get("type"));

tested.saveExecutionContext(jobExecution);
tested.saveExecutionContext(stepExecution);

ExecutionContext jobCtx = tested.getExecutionContext(jobExecution);
ExecutionContext stepCtx = tested.getExecutionContext(stepExecution);

assertEquals("job", jobCtx.get("type"));
assertEquals("step", stepCtx.get("type"));
}

@Test
public void testPersistentCopy() throws Exception {
MapExecutionContextDao tested = new MapExecutionContextDao();
Expand All @@ -54,5 +79,18 @@ public void testPersistentCopy() throws Exception {
assertTrue(retrieved.isEmpty());
}

@Ignore("Under discussion for JIRA ticket BATCH-1858")
@Test
public void testNullExecutionContextUpdate() throws Exception {
MapExecutionContextDao tested = new MapExecutionContextDao();
JobExecution jobExecution = new JobExecution((long)1);
assertNotNull(jobExecution.getExecutionContext());
tested.updateExecutionContext(jobExecution);
assertNotNull(tested.getExecutionContext(jobExecution));
jobExecution.setExecutionContext(null);
tested.updateExecutionContext(jobExecution);
//assert???(mapExecutionContextDao.getExecutionContext(jobExecution) == null);
}

}

Loading