Skip to content

Commit f758d14

Browse files
hpoettkerfmbenhassine
authored andcommitted
Refactor MultiResourceItemWriter
The `MultiResourceItemWriter` now writes at most `itemCountLimitPerResource` items per resource where it previously allowed more items when they were written within the same chunk. Resolves #1722
1 parent 3b0868d commit f758d14

File tree

4 files changed

+150
-137
lines changed

4 files changed

+150
-137
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiResourceItemWriter.java

+26-20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2024 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,9 +34,6 @@
3434
* {@link #setItemCountLimitPerResource(int)}. Suffix creation can be customized with
3535
* {@link #setResourceSuffixCreator(ResourceSuffixCreator)}.
3636
* <p>
37-
* Note that new resources are created only at chunk boundaries i.e. the number of items
38-
* written into one resource is between the limit set by
39-
* <p>
4037
* This writer will create an output file only when there are items to write, which means
4138
* there would be no empty file created if no items are passed (for example when all items
4239
* are filtered or skipped during the processing phase).
@@ -45,6 +42,7 @@
4542
* @param <T> item type
4643
* @author Robert Kasanicky
4744
* @author Mahmoud Ben Hassine
45+
* @author Henning Pöttker
4846
*/
4947
public class MultiResourceItemWriter<T> extends AbstractItemStreamItemWriter<T> {
5048

@@ -74,22 +72,30 @@ public MultiResourceItemWriter() {
7472

7573
@Override
7674
public void write(Chunk<? extends T> items) throws Exception {
77-
if (!opened) {
78-
File file = setResourceToDelegate();
79-
// create only if write is called
80-
file.createNewFile();
81-
Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable");
82-
delegate.open(new ExecutionContext());
83-
opened = true;
84-
}
85-
delegate.write(items);
86-
currentResourceItemCount += items.size();
87-
if (currentResourceItemCount >= itemCountLimitPerResource) {
88-
delegate.close();
89-
resourceIndex++;
90-
currentResourceItemCount = 0;
91-
setResourceToDelegate();
92-
opened = false;
75+
int writtenItems = 0;
76+
while (writtenItems < items.size()) {
77+
if (!opened) {
78+
File file = setResourceToDelegate();
79+
// create only if write is called
80+
file.createNewFile();
81+
Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable");
82+
delegate.open(new ExecutionContext());
83+
opened = true;
84+
}
85+
86+
int itemsToWrite = Math.min(itemCountLimitPerResource - currentResourceItemCount,
87+
items.size() - writtenItems);
88+
delegate.write(new Chunk<T>(items.getItems().subList(writtenItems, writtenItems + itemsToWrite)));
89+
currentResourceItemCount += itemsToWrite;
90+
writtenItems += itemsToWrite;
91+
92+
if (currentResourceItemCount >= itemCountLimitPerResource) {
93+
delegate.close();
94+
resourceIndex++;
95+
currentResourceItemCount = 0;
96+
setResourceToDelegate();
97+
opened = false;
98+
}
9399
}
94100
}
95101

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/file/MultiResourceItemWriterFlatFileTests.java

+65-64
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2008-2023 the original author or authors.
2+
* Copyright 2008-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,22 +78,22 @@ void testBasicMultiResourceWriteScenario() throws Exception {
7878

7979
tested.write(Chunk.of("1", "2", "3"));
8080

81-
File part1 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(1));
82-
assertTrue(part1.exists());
83-
assertEquals("123", readFile(part1));
81+
assertFileExistsAndContains(1, "12");
82+
assertFileExistsAndContains(2, "3");
8483

8584
tested.write(Chunk.of("4"));
86-
File part2 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(2));
87-
assertTrue(part2.exists());
88-
assertEquals("4", readFile(part2));
85+
86+
assertFileExistsAndContains(2, "34");
8987

9088
tested.write(Chunk.of("5"));
91-
assertEquals("45", readFile(part2));
89+
90+
assertFileExistsAndContains(3, "5");
9291

9392
tested.write(Chunk.of("6", "7", "8", "9"));
94-
File part3 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(3));
95-
assertTrue(part3.exists());
96-
assertEquals("6789", readFile(part3));
93+
94+
assertFileExistsAndContains(3, "56");
95+
assertFileExistsAndContains(4, "78");
96+
assertFileExistsAndContains(5, "9");
9797
}
9898

9999
@Test
@@ -107,7 +107,7 @@ void testUpdateAfterDelegateClose() throws Exception {
107107
assertEquals(1, executionContext.getInt(tested.getExecutionContextKey("resource.index")));
108108
tested.write(Chunk.of("1", "2", "3"));
109109
tested.update(executionContext);
110-
assertEquals(0, executionContext.getInt(tested.getExecutionContextKey("resource.item.count")));
110+
assertEquals(1, executionContext.getInt(tested.getExecutionContextKey("resource.item.count")));
111111
assertEquals(2, executionContext.getInt(tested.getExecutionContextKey("resource.index")));
112112

113113
}
@@ -121,17 +121,22 @@ void testMultiResourceWriteScenarioWithFooter() throws Exception {
121121

122122
tested.write(Chunk.of("1", "2", "3"));
123123

124-
File part1 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(1));
125-
assertTrue(part1.exists());
124+
assertFileExistsAndContains(1, "12f");
125+
assertFileExistsAndContains(2, "3");
126126

127127
tested.write(Chunk.of("4"));
128-
File part2 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(2));
129-
assertTrue(part2.exists());
128+
129+
assertFileExistsAndContains(2, "34f");
130+
131+
tested.write(Chunk.of("5"));
132+
133+
assertFileExistsAndContains(3, "5");
130134

131135
tested.close();
132136

133-
assertEquals("123f", readFile(part1));
134-
assertEquals("4f", readFile(part2));
137+
assertFileExistsAndContains(1, "12f");
138+
assertFileExistsAndContains(2, "34f");
139+
assertFileExistsAndContains(3, "5f");
135140

136141
}
137142

@@ -144,19 +149,18 @@ void testTransactionalMultiResourceWriteScenarioWithFooter() throws Exception {
144149

145150
ResourcelessTransactionManager transactionManager = new ResourcelessTransactionManager();
146151

147-
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("1", "2", "3")));
152+
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("1", "2")));
148153

149-
File part1 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(1));
150-
assertTrue(part1.exists());
154+
assertFileExistsAndContains(1, "12f");
151155

152-
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("4")));
153-
File part2 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(2));
154-
assertTrue(part2.exists());
156+
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("3")));
157+
158+
assertFileExistsAndContains(2, "3");
155159

156160
tested.close();
157161

158-
assertEquals("123f", readFile(part1));
159-
assertEquals("4f", readFile(part2));
162+
assertFileExistsAndContains(1, "12f");
163+
assertFileExistsAndContains(2, "3f");
160164

161165
}
162166

@@ -168,27 +172,23 @@ void testRestart() throws Exception {
168172

169173
tested.write(Chunk.of("1", "2", "3"));
170174

171-
File part1 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(1));
172-
assertTrue(part1.exists());
173-
assertEquals("123", readFile(part1));
174-
175-
tested.write(Chunk.of("4"));
176-
File part2 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(2));
177-
assertTrue(part2.exists());
178-
assertEquals("4", readFile(part2));
175+
assertFileExistsAndContains(1, "12");
176+
assertFileExistsAndContains(2, "3");
179177

180178
tested.update(executionContext);
181179
tested.close();
182180

183181
tested.open(executionContext);
184182

185-
tested.write(Chunk.of("5"));
186-
assertEquals("45", readFile(part2));
183+
tested.write(Chunk.of("4"));
187184

188-
tested.write(Chunk.of("6", "7", "8", "9"));
189-
File part3 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(3));
190-
assertTrue(part3.exists());
191-
assertEquals("6789", readFile(part3));
185+
assertFileExistsAndContains(2, "34");
186+
187+
tested.write(Chunk.of("5", "6", "7", "8", "9"));
188+
189+
assertFileExistsAndContains(3, "56");
190+
assertFileExistsAndContains(4, "78");
191+
assertFileExistsAndContains(5, "9");
192192
}
193193

194194
@Test
@@ -201,27 +201,24 @@ void testRestartWithFooter() throws Exception {
201201

202202
tested.write(Chunk.of("1", "2", "3"));
203203

204-
File part1 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(1));
205-
assertTrue(part1.exists());
206-
assertEquals("123f", readFile(part1));
207-
208-
tested.write(Chunk.of("4"));
209-
File part2 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(2));
210-
assertTrue(part2.exists());
211-
assertEquals("4", readFile(part2));
204+
assertFileExistsAndContains(1, "12f");
205+
assertFileExistsAndContains(2, "3");
212206

213207
tested.update(executionContext);
214208
tested.close();
215209

216210
tested.open(executionContext);
217211

218-
tested.write(Chunk.of("5"));
219-
assertEquals("45f", readFile(part2));
212+
tested.write(Chunk.of("4"));
220213

221-
tested.write(Chunk.of("6", "7", "8", "9"));
222-
File part3 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(3));
223-
assertTrue(part3.exists());
224-
assertEquals("6789f", readFile(part3));
214+
assertFileExistsAndContains(2, "34f");
215+
216+
tested.write(Chunk.of("5", "6", "7", "8", "9"));
217+
tested.close();
218+
219+
assertFileExistsAndContains(3, "56f");
220+
assertFileExistsAndContains(4, "78f");
221+
assertFileExistsAndContains(5, "9f");
225222
}
226223

227224
@Test
@@ -233,24 +230,28 @@ void testTransactionalRestartWithFooter() throws Exception {
233230

234231
ResourcelessTransactionManager transactionManager = new ResourcelessTransactionManager();
235232

236-
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("1", "2", "3")));
233+
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("1", "2")));
237234

238-
File part1 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(1));
239-
assertTrue(part1.exists());
240-
assertEquals("123f", readFile(part1));
235+
assertFileExistsAndContains(1, "12f");
241236

242-
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("4")));
243-
File part2 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(2));
244-
assertTrue(part2.exists());
245-
assertEquals("4", readFile(part2));
237+
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("3")));
238+
239+
assertFileExistsAndContains(2, "3");
246240

247241
tested.update(executionContext);
248242
tested.close();
249243

250244
tested.open(executionContext);
251245

252-
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("5")));
253-
assertEquals("45f", readFile(part2));
246+
new TransactionTemplate(transactionManager).execute(new WriterCallback(Chunk.of("4")));
247+
248+
assertFileExistsAndContains(2, "34f");
249+
}
250+
251+
private void assertFileExistsAndContains(int index, String expected) throws Exception {
252+
File part = new File(this.file.getAbsolutePath() + this.suffixCreator.getSuffix(index));
253+
assertTrue(part.exists());
254+
assertEquals(expected, readFile(part));
254255
}
255256

256257
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/file/MultiResourceItemWriterXmlTests.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2009-2022 the original author or authors.
2+
* Copyright 2009-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -108,21 +108,26 @@ void multiResourceWritingWithRestart() throws Exception {
108108
tested.update(executionContext);
109109
tested.close();
110110

111-
assertEquals(xmlDocStart + "<prefix:4/>" + xmlDocEnd, readFile(part2));
112-
assertEquals(xmlDocStart + "<prefix:1/><prefix:2/><prefix:3/>" + xmlDocEnd, readFile(part1));
111+
assertEquals(xmlDocStart + "<prefix:3/><prefix:4/>" + xmlDocEnd, readFile(part2));
112+
assertEquals(xmlDocStart + "<prefix:1/><prefix:2/>" + xmlDocEnd, readFile(part1));
113113

114114
tested.open(executionContext);
115115

116116
tested.write(Chunk.of("5"));
117-
118-
tested.write(Chunk.of("6", "7", "8", "9"));
119117
File part3 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(3));
120118
assertTrue(part3.exists());
121119

120+
tested.write(Chunk.of("6", "7", "8", "9"));
121+
File part4 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(4));
122+
assertTrue(part4.exists());
123+
File part5 = new File(file.getAbsolutePath() + suffixCreator.getSuffix(5));
124+
assertTrue(part5.exists());
125+
122126
tested.close();
123127

124-
assertEquals(xmlDocStart + "<prefix:4/><prefix:5/>" + xmlDocEnd, readFile(part2));
125-
assertEquals(xmlDocStart + "<prefix:6/><prefix:7/><prefix:8/><prefix:9/>" + xmlDocEnd, readFile(part3));
128+
assertEquals(xmlDocStart + "<prefix:5/><prefix:6/>" + xmlDocEnd, readFile(part3));
129+
assertEquals(xmlDocStart + "<prefix:7/><prefix:8/>" + xmlDocEnd, readFile(part4));
130+
assertEquals(xmlDocStart + "<prefix:9/>" + xmlDocEnd, readFile(part5));
126131
}
127132

128133
}

0 commit comments

Comments
 (0)