diff --git a/pom.xml b/pom.xml index 3fb0596fd6..59e789df14 100644 --- a/pom.xml +++ b/pom.xml @@ -521,7 +521,25 @@ dhgarrette@gmail.com + + + + com.springsource.repository.bundles.release + SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases + http://repository.springsource.com/maven/bundles/release + + + foundrylogic.vpp + Repository used to download foundrylogic.vpp + http://objectstyle.org/maven2 + + + + com.springsource.repository.bundles.release + SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases + http://repository.springsource.com/maven/bundles/release + com.springsource.repository.bundles.milestone SpringSource Enterprise Bundle Repository - SpringSource Bundle Milestones diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/FlatFilePartitioner.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/FlatFilePartitioner.java new file mode 100644 index 0000000000..40e100508c --- /dev/null +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/FlatFilePartitioner.java @@ -0,0 +1,185 @@ +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.batch.core.partition.support; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.util.FileUtils; +import org.springframework.core.io.Resource; + +import java.io.IOException; +import java.io.InputStream; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Creates a set of partitions for a flat file. + *

+ * By default, assumes that each record is stored on one and only + * one line. First computes the number of items and then splits them + * across the requested number of partition(s). + *

+ * The {@link org.springframework.batch.item.file.MultiThreadedFlatFileItemReader} + * can be used to read the file concurrently, each using a startAt + * cursor and a number of items to read as defined by the itemsCount + * property. + * + * @author Stephane Nicoll + */ +public class FlatFilePartitioner implements Partitioner { + + /** + * The number of items to partition should skip on startup. + */ + public static final String START_AT_KEY = "startAt"; + + /** + * The number of items to read in the partition. + */ + public static final String ITEMS_COUNT_KEY = "itemsCount"; + + /** + * The common partition prefix name to use. + */ + public static final String PARTITION_PREFIX = "partition-"; + + private final Logger logger = LoggerFactory.getLogger(FlatFilePartitioner.class); + + private Resource resource; + + /** + * Creates a set of {@link ExecutionContext} according to the provided + * gridSize if there are enough elements. + *

+ * First computes the total number of items to process for the resource + * and then split equality these in each partition. The returned context + * hold the {@link #START_AT_KEY} and {@link #ITEMS_COUNT_KEY} properties + * defining the number of elements to skip and the number of elements to + * read respectively. + * + * @param gridSize the requested size of the grid + * @return the execution contexts + * @see #countItems(org.springframework.core.io.Resource) + */ + public Map partition(int gridSize) { + final String partitionNumberFormat = "%0" + String.valueOf(gridSize).length() + "d"; + final Map result = new LinkedHashMap(); + + checkResource(this.resource); + if (logger.isDebugEnabled()) { + logger.debug("Splitting [" + resource.getDescription() + "]"); + } + + final long lines = countItems(resource); + if (lines == 0) { + logger.info("Empty input file [" + resource.getDescription() + "] no partition will be created."); + return result; + } + + final long linesPerFile = lines / gridSize; + + // Check the case that the set is to small for the number of request partition(s) + if (linesPerFile == 0) { + logger.info("Not enough lines (" + lines + ") for the requested gridSize [" + gridSize + "]"); + final String partitionName = PARTITION_PREFIX + String.format(partitionNumberFormat, 0); + result.put(partitionName, createExecutionContext(partitionName, 0, lines)); + return result; + } + + if (logger.isDebugEnabled()) { + logger.debug("Has to split [" + lines + "] line(s) in [" + gridSize + "] " + + "grid(s) (" + linesPerFile + " each)"); + } + + for (int i = 0; i < gridSize; i++) { + final String partitionName = PARTITION_PREFIX + String.format(partitionNumberFormat, i); + + // Start at i * COUNT items (0, COUNT*1, COUNT*2, etc) + final long startAt = i * linesPerFile; + long itemsCount = linesPerFile; + // If this is the last partition, it gets all remaining items + if (i == gridSize - 1) { + itemsCount = lines - ((gridSize - 1) * linesPerFile); + } + 
result.put(partitionName, createExecutionContext(partitionName, startAt, itemsCount)); + } + return result; + } + + /** + * Creates a standard {@link ExecutionContext} with the specified parameters. + * + * @param partitionName the name of the partition + * @param startAt the number of items to skip + * @param itemsCount the number of items to read + * @return the execution context + */ + protected ExecutionContext createExecutionContext(String partitionName, long startAt, long itemsCount) { + final ExecutionContext executionContext = new ExecutionContext(); + executionContext.putLong(START_AT_KEY, startAt); + executionContext.putLong(ITEMS_COUNT_KEY, itemsCount); + if (logger.isDebugEnabled()) { + logger.debug("Added partition [" + partitionName + "] with [" + executionContext + "]"); + } + return executionContext; + } + + /** + * Returns the number of elements in the specified {@link Resource}. + * + * @param resource the resource + * @return the number of items contained in the resource + */ + protected long countItems(Resource resource) { + try { + final InputStream in = resource.getInputStream(); + try { + return FileUtils.countLines( in ); + } finally { + in.close(); + } + } catch (IOException e) { + throw new IllegalStateException("Unexpected IO exception while counting items for [" + + resource.getDescription() + "]", e); + } + } + + /** + * Checks whether the specified {@link Resource} is valid. + * + * @param resource the resource to check + * @throws IllegalStateException if the resource is invalid + */ + protected void checkResource(Resource resource) { + if (!resource.exists()) { + throw new IllegalStateException("Input resource must exist: " + resource); + } + if (!resource.isReadable()) { + throw new IllegalStateException("Input resource must be readable: " + resource); + } + } + + /** + * Sets the input {@link Resource} to use. 
+ * + * @param resource the resource to partition + */ + public void setResource(Resource resource) { + this.resource = resource; + } +} diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiThreadedFlatFileItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiThreadedFlatFileItemReader.java new file mode 100644 index 0000000000..5e86a398f2 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiThreadedFlatFileItemReader.java @@ -0,0 +1,129 @@ +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.item.file; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.util.ExecutionContextUserSupport; +import org.springframework.util.ClassUtils; + +/** + * A multi-threaded aware {@link FlatFileItemReader} implementation + * that uses start and end boundaries to delimit the portion of the + * file that should be read. + *

+ * Reads all the file by default. + * + * @author Stephane Nicoll + */ +public class MultiThreadedFlatFileItemReader extends FlatFileItemReader { + + // Protected field would alleviate copy/paste here + private static final String READ_COUNT = "read.count"; + + private final Logger logger = LoggerFactory.getLogger(MultiThreadedFlatFileItemReader.class); + + private int startAt = 0; + private int itemsCount = Integer.MAX_VALUE; + + // Would be better if the base ecSupport was protected somehow. + private final ExecutionContextUserSupport ecSupport; + + public MultiThreadedFlatFileItemReader() { + this.ecSupport = new ExecutionContextUserSupport(); + setName(ClassUtils.getShortName(MultiThreadedFlatFileItemReader.class)); + } + + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + super.open(executionContext); + + /* + Since we are dealing with multiple chunk in the same area, let's make + sure we will jump to the right item. + + The problem is that the maxItemCount and currentItemCount do not take this + notion into account. 
Say a chunk starts at item #1000 and must read 100 + elements, the currentItemCount should be 1000 at beginning (ok) but the + maxItemCount must be 1100 (and not 100 like it should be) + */ + if (!executionContext.containsKey(ecSupport.getKey(READ_COUNT))) { + // We need to jump and this is a fresh start (nothing in the context) + if (startAt > 0) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping to item [" + startAt + "]"); + } + // Make sure to register the maxItemCount properly + final int maxItemCount = startAt + itemsCount; + setMaxItemCount(maxItemCount); + try { + for (int i = 0; i < startAt; i++) { + read(); + } + } catch (Exception e) { + throw new ItemStreamException( + "Could not move to stored position on restart", e); + } + if (logger.isDebugEnabled()) { + logger.debug("Ready to read from [" + getCurrentItemCount() + "] to [" + maxItemCount + "]"); + } + } else { + // Fresh start on the first item so let's state the max item count is simply the + // itemsCount + setMaxItemCount(itemsCount); + if (logger.isDebugEnabled()) { + logger.debug("Ready to read from [" + getCurrentItemCount() + "] to [" + itemsCount + "]"); + } + } + } + } + + /** + * Sets the item number at which this instance should start reading. Set + * to 0 by default so that this instance starts at the first item. + * + * @param startAt the number of the item at which this instance should + * start reading + */ + public void setStartAt(int startAt) { + this.startAt = startAt; + } + + /** + * Sets the number of items this instance should read. + * + * @param itemsCount the number of items to read + */ + public void setItemsCount(int itemsCount) { + this.itemsCount = itemsCount; + } + + @Override + public void setName(String name) { + super.setName(name); + // default constructor of the parent calls this before the instance is + // actually initialized. With a protected ecSupport, this can go away + // altogether. 
+ if (ecSupport != null) { + ecSupport.setName(name); + } + } +} + diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/util/FileUtils.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/util/FileUtils.java index 8b5df89701..b19cfba573 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/util/FileUtils.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/util/FileUtils.java @@ -16,8 +16,10 @@ package org.springframework.batch.item.util; +import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import org.springframework.batch.item.ItemStreamException; import org.springframework.util.Assert; @@ -110,4 +112,32 @@ public static boolean createNewFile(File file) throws IOException { } + /** + * Returns the number of lines found in the specified stream. + *

+ * The caller is responsible to close the stream. + * + * @param in the input stream to use + * @return the number of lines found in the stream + * @throws IOException if an error occurred + */ + public static long countLines(InputStream in) throws IOException { + final InputStream is = new BufferedInputStream(in); + byte[] c = new byte[1024]; + long count = 0; + int readChars; + while ((readChars = is.read(c)) != -1) { + for (int i = 0; i < readChars; ++i) { + // We're dealing with the char here, it's \n on Unix and \r\n on Windows + if (c[i] == '\n') + ++count; + } + } + // Last line + if (count > 0) { + count++; + } + return count; + } + }