Skip to content

Proposal for BATCH-1674 #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,25 @@
<email>dhgarrette@gmail.com</email>
</developer>
</developers>

<repositories>
<repository>
<id>com.springsource.repository.bundles.release</id>
<name>SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/release</url>
</repository>
<repository>
<id>foundrylogic.vpp</id>
<name>Repository used to download foundrylogic.vpp</name>
<url>http://objectstyle.org/maven2</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>com.springsource.repository.bundles.release</id>
<name>SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/release</url>
</pluginRepository>
<pluginRepository>
<id>com.springsource.repository.bundles.milestone</id>
<name> SpringSource Enterprise Bundle Repository - SpringSource Bundle Milestones</name>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.core.partition.support;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.util.FileUtils;
import org.springframework.core.io.Resource;

import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;

/**
* Creates a set of partitions for a flat file.
* <p/>
* By default, assumes that each record is stored on one and only
* one line. First computes the number of items and then split them
* in the number of requested partition(s).
* <p/>
* The {@link org.springframework.batch.item.file.MultiThreadedFlatFileItemReader}
* can be used to read the file concurrently, each using a <tt>startAt</tt>
* cursor and a number of items to read as defined by the <tt>itemsCount</tt>
* property.
*
* @author Stephane Nicoll
*/
public class FlatFilePartitioner implements Partitioner {

/**
* The number of items to partition should skip on startup.
*/
public static final String START_AT_KEY = "startAt";

/**
* The number of items to read in the partition.
*/
public static final String ITEMS_COUNT_KEY = "itemsCount";

/**
* The common partition prefix name to use.
*/
public static final String PARTITION_PREFIX = "partition-";

private final Logger logger = LoggerFactory.getLogger(FlatFilePartitioner.class);

private Resource resource;

/**
* Creates a set of {@link ExecutionContext} according to the provided
* <tt>gridSize</tt> if there are enough elements.
* <p/>
* First computes the total number of items to process for the resource
* and then split equality these in each partition. The returned context
* hold the {@link #START_AT_KEY} and {@link #ITEMS_COUNT_KEY} properties
* defining the number of elements to skip and the number of elements to
* read respectively.
*
* @param gridSize the requested size of the grid
* @return the execution contexts
* @see #countItems(org.springframework.core.io.Resource)
*/
public Map<String, ExecutionContext> partition(int gridSize) {
final String partitionNumberFormat = "%0" + String.valueOf(gridSize).length() + "d";
final Map<String, ExecutionContext> result = new LinkedHashMap<String, ExecutionContext>();

checkResource(this.resource);
if (logger.isDebugEnabled()) {
logger.debug("Splitting [" + resource.getDescription() + "]");
}

final long lines = countItems(resource);
if (lines == 0) {
logger.info("Empty input file [" + resource.getDescription() + "] no partition will be created.");
return result;
}

final long linesPerFile = lines / gridSize;

// Check the case that the set is to small for the number of request partition(s)
if (linesPerFile == 0) {
logger.info("Not enough lines (" + lines + ") for the requested gridSize [" + gridSize + "]");
final String partitionName = PARTITION_PREFIX + String.format(partitionNumberFormat, 0);
result.put(partitionName, createExecutionContext(partitionName, 0, lines));
return result;
}

if (logger.isDebugEnabled()) {
logger.debug("Has to split [" + lines + "] line(s) in [" + gridSize + "] " +
"grid(s) (" + linesPerFile + " each)");
}

for (int i = 0; i < gridSize; i++) {
final String partitionName = PARTITION_PREFIX + String.format(partitionNumberFormat, i);

// Start at i * COUNT items (0, COUNT*1, COUNT*2, etc)
final long startAt = i * linesPerFile;
long itemsCount = linesPerFile;
// If this is the last partition, it gets all remaining items
if (i == gridSize - 1) {
itemsCount = lines - ((gridSize - 1) * linesPerFile);
}
result.put(partitionName, createExecutionContext(partitionName, startAt, itemsCount));
}
return result;
}

/**
* Creates a standard {@link ExecutionContext} with the specified parameters.
*
* @param partitionName the name of the partition
* @param startAt the number of items to skip
* @param itemsCount the number of items to read
* @return the execution context
*/
protected ExecutionContext createExecutionContext(String partitionName, long startAt, long itemsCount) {
final ExecutionContext executionContext = new ExecutionContext();
executionContext.putLong(START_AT_KEY, startAt);
executionContext.putLong(ITEMS_COUNT_KEY, itemsCount);
if (logger.isDebugEnabled()) {
logger.debug("Added partition [" + partitionName + "] with [" + executionContext + "]");
}
return executionContext;
}

/**
* Returns the number of elements in the specified {@link Resource}.
*
* @param resource the resource
* @return the number of items contained in the resource
*/
protected long countItems(Resource resource) {
try {
final InputStream in = resource.getInputStream();
try {
return FileUtils.countLines( in );
} finally {
in.close();
}
} catch (IOException e) {
throw new IllegalStateException("Unexpected IO exception while counting items for ["
+ resource.getDescription() + "]", e);
}
}

/**
* Checks whether the specified {@link Resource} is valid.
*
* @param resource the resource to check
* @throws IllegalStateException if the resource is invalid
*/
protected void checkResource(Resource resource) {
if (!resource.exists()) {
throw new IllegalStateException("Input resource must exist: " + resource);
}
if (!resource.isReadable()) {
throw new IllegalStateException("Input resource must be readable: " + resource);
}
}

/**
* Sets the input {@link Resource} to use.
*
* @param resource the resource to partition
*/
public void setResource(Resource resource) {
this.resource = resource;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.item.file;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.util.ExecutionContextUserSupport;
import org.springframework.util.ClassUtils;

/**
* A multi-threaded aware {@link FlatFileItemReader} implementation
* that uses start and end boundaries to delimit the portion of the
* file that should be read.
* <p/>
* Reads all the file by default.
*
* @author Stephane Nicoll
*/
public class MultiThreadedFlatFileItemReader<T> extends FlatFileItemReader<T> {

// Protected field would alleviate copy/paste here
private static final String READ_COUNT = "read.count";

private final Logger logger = LoggerFactory.getLogger(MultiThreadedFlatFileItemReader.class);

private int startAt = 0;
private int itemsCount = Integer.MAX_VALUE;

// Would be better if the base ecSupport was protected somehow.
private final ExecutionContextUserSupport ecSupport;

public MultiThreadedFlatFileItemReader() {
this.ecSupport = new ExecutionContextUserSupport();
setName(ClassUtils.getShortName(MultiThreadedFlatFileItemReader.class));
}


@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
super.open(executionContext);

/*
Since we are dealing with multiple chunk in the same area, let's make
sure we will jump to the right item.

The problem is that the maxItemCount and currentItemCount do not take this
notion into account. Say a chunk starts at item #1000 and must read 100
elements, the currentItemCount should be 1000 at beginning (ok) but the
maxItemCount must be 1100 (and not 100 like it should be)
*/
if (!executionContext.containsKey(ecSupport.getKey(READ_COUNT))) {
// We need to jump and this is a fresh start (nothing in the context)
if (startAt > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping to item [" + startAt + "]");
}
// Make sure to register the maxItemCount properly
final int maxItemCount = startAt + itemsCount;
setMaxItemCount(maxItemCount);
try {
for (int i = 0; i < startAt; i++) {
read();
}
} catch (Exception e) {
throw new ItemStreamException(
"Could not move to stored position on restart", e);
}
if (logger.isDebugEnabled()) {
logger.debug("Ready to read from [" + getCurrentItemCount() + "] to [" + maxItemCount + "]");
}
} else {
// Fresh start on the first item so let's state the max item count is simply the
// itemsCount
setMaxItemCount(itemsCount);
if (logger.isDebugEnabled()) {
logger.debug("Ready to read from [" + getCurrentItemCount() + "] to [" + itemsCount + "]");
}
}
}
}

/**
* Sets the item number at which this instance should start reading. Set
* to 0 by default so that this instance starts at the first item.
*
* @param startAt the number of the item at which this instance should
* start reading
*/
public void setStartAt(int startAt) {
this.startAt = startAt;
}

/**
* Sets the number of items this instance should read.
*
* @param itemsCount the number of items to read
*/
public void setItemsCount(int itemsCount) {
this.itemsCount = itemsCount;
}

@Override
public void setName(String name) {
super.setName(name);
// default constructor of the parent calls this before the instance is
// actually initialized. With a protected ecSupport, this can go away
// altogether.
if (ecSupport != null) {
ecSupport.setName(name);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package org.springframework.batch.item.util;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -110,4 +112,32 @@ public static boolean createNewFile(File file) throws IOException {

}

/**
* Returns the number of lines found in the specified stream.
* <p/>
* The caller is responsible to close the stream.
*
* @param in the input stream to use
* @return the number of lines found in the stream
* @throws IOException if an error occurred
*/
public static long countLines(InputStream in) throws IOException {
final InputStream is = new BufferedInputStream(in);
byte[] c = new byte[1024];
long count = 0;
int readChars;
while ((readChars = is.read(c)) != -1) {
for (int i = 0; i < readChars; ++i) {
// We're dealing with the char here, it's \n on Unix and \r\n on Windows
if (c[i] == '\n')
++count;
}
}
// Last line
if (count > 0) {
count++;
}
return count;
}

}