Skip to content

Commit

Permalink
Some finishing touches
Browse files Browse the repository at this point in the history
Rethrow IOException as UncheckedIOException instead of introducing our
own exception type.

Remove redundant flag endOfStreamReached, it was actually never
inspected, as the call to loadNextChunk was guarded by hasNext flag.

Some kaizen refactoring of InputStreamIteratorTest.
  • Loading branch information
runeflobakk committed Jun 13, 2024
1 parent 82851d7 commit 524aa49
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 85 deletions.
39 changes: 16 additions & 23 deletions src/main/java/no/digipost/io/InputStreamIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,56 @@
*/
package no.digipost.io;

import no.digipost.DiggBase;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import static java.lang.Math.toIntExact;
import static no.digipost.DiggBase.friendlyName;
import static no.digipost.DiggExceptions.exceptionNameAndMessage;

/**
* InputStreamIterator is an {@link Iterator} reading from an {@link InputStream} in chunks
* where each chunk is returned as the next element in the iterable.
* When the input stream is fully consumed the iterator has no more elements.
*/
public class InputStreamIterator implements Iterator<byte[]> {
private final InputStream inputStream;
private final int chunkSize;
private final int chunkSizeBytes;
private byte[] next;
private Boolean hasNext;
private boolean endOfStreamReached = false;

/**
* @param inputStream The input stream to iterate over
* @param chunkSize DataSize should not be too big since that defeats the purpose of this iterator.
*/
public InputStreamIterator(InputStream inputStream, DataSize chunkSize) {
this.inputStream = inputStream;
this.chunkSize = (int) chunkSize.toBytes();
this(inputStream, toIntExact(chunkSize.toBytes()));
}

public InputStreamIterator(InputStream inputStream, int chunkSizeBytes) {
this.inputStream = inputStream;
this.chunkSize = chunkSizeBytes;
this.chunkSizeBytes = chunkSizeBytes;
}

private byte[] loadNextChunk() {
if (endOfStreamReached) return null;

byte[] chunk = new byte[chunkSize];
byte[] chunk = new byte[chunkSizeBytes];
int bytesRead = 0;
try {
bytesRead = inputStream.read(chunk);
if (bytesRead == -1) {
endOfStreamReached = true;
return null;
}
} catch (IOException e) {
throw new WrappedInputStreamFailed(e, inputStream);
throw new UncheckedIOException(
"Failed reading next chunk of up to " + chunkSizeBytes +
" bytes from " + friendlyName(inputStream.getClass()) +
" because " + exceptionNameAndMessage(e), e);
}

if (bytesRead < chunkSize) {
if (bytesRead < chunkSizeBytes) {
// resize the buffer if less data was read
byte[] smallerBuffer = new byte[bytesRead];
System.arraycopy(chunk, 0, smallerBuffer, 0, bytesRead);
Expand All @@ -74,10 +75,10 @@ private byte[] loadNextChunk() {
}

/**
* If the iterator fails reading from the wrapped InputStream an
* {@link InputStreamIterator.WrappedInputStreamFailed} runtime exception is thrown.
*
* @return true if the iteration has more elements
*
* @throws UncheckedIOException if the wrapped InputStream throws an IOException
*/
@Override
public boolean hasNext() {
Expand All @@ -101,12 +102,4 @@ public byte[] next() {
return result;
}

public static final class WrappedInputStreamFailed extends RuntimeException {
private static final long serialVersionUID = 1L;

private WrappedInputStreamFailed(Throwable cause, InputStream inputStream) {
super("The InputStream " + DiggBase.friendlyName(inputStream.getClass()) +
" read failed. Cause: " + cause.getClass() + ": " + cause.getMessage(), cause);
}
}
}
104 changes: 42 additions & 62 deletions src/test/java/no/digipost/io/InputStreamIteratorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
*/
package no.digipost.io;

import no.digipost.io.InputStreamIterator.WrappedInputStreamFailed;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BrokenInputStream;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand All @@ -34,87 +35,55 @@
import java.util.zip.ZipOutputStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static no.digipost.DiggExceptions.runUnchecked;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsStringIgnoringCase;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static uk.co.probablyfine.matchers.Java8Matchers.where;
import static uk.co.probablyfine.matchers.Java8Matchers.whereNot;

class InputStreamIteratorTest {

@Test
void should_read_the_input_stream_fully() throws Exception {
StringBuilder sb = new StringBuilder();
void fully_reads_the_input_stream() throws Exception {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(UTF_8));) {
String consumedFromIterator = consumeToString(new InputStreamIterator(inputStream, DataSize.bytes(2)), UTF_8);

try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8));) {
InputStreamIterator iterator = new InputStreamIterator(inputStream, 2);

while (iterator.hasNext()) {
sb.append(new String(iterator.next()));
}
}

assertEquals("Some data", sb.toString());
}

@Test
void should_read_the_input_stream_fully_with_datasize() throws Exception {
StringBuilder sb = new StringBuilder();

try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8));) {

InputStreamIterator iterator = new InputStreamIterator(inputStream, DataSize.bytes(2));
while (iterator.hasNext()) {
sb.append(new String(iterator.next()));
}
assertThat(consumedFromIterator, is("Some data"));
}

assertEquals("Some data", sb.toString());
}

@Test
void too_big_data_size_will_throw_NegativeArraySizeException() throws Exception {
try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8))) {
InputStreamIterator iterator = new InputStreamIterator(inputStream, DataSize.MAX);
void cannot_instantiate_with_too_big_chunk_size() throws Exception {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(UTF_8))) {
Exception thrown = assertThrows(ArithmeticException.class, () -> new InputStreamIterator(inputStream, DataSize.MAX));

assertThrows(NegativeArraySizeException.class, iterator::hasNext);
assertThat(thrown, where(Exception::getMessage, containsStringIgnoringCase("integer overflow")));
}
}

@Test
void should_throw_if_next_is_called_with_no_more_elements() throws Exception {
StringBuilder sb = new StringBuilder();

try (final ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(StandardCharsets.UTF_8));) {

void throws_if_next_is_called_with_no_more_elements() throws Exception {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(UTF_8));) {
InputStreamIterator iterator = new InputStreamIterator(inputStream, 2);

while (iterator.hasNext()) {
sb.append(new String(iterator.next()));
}
assertThat(consumeToString(iterator, UTF_8), is("Some data"));
assertThat(iterator, whereNot(Iterator::hasNext));

assertThrows(NoSuchElementException.class, iterator::next);
}

assertEquals("Some data", sb.toString());
}

@Test
void should_throw_exception_if_input_stream_fails() throws Exception {
try (final InputStream failingInputStream = new InputStream() {
void throws_exception_if_input_stream_fails() throws Exception {
InputStreamIterator iterator = new InputStreamIterator(new BrokenInputStream(), 3);

@Override
public int read() throws IOException {
throw new IOException("This input stream is broken");
}
}) {
InputStreamIterator iterator = new InputStreamIterator(failingInputStream, 1);

final WrappedInputStreamFailed ex = assertThrows(WrappedInputStreamFailed.class, iterator::next);
assertThat(ex, where(Exception::getMessage, containsString("InputStreamIteratorTest.")));
}
Exception ex = assertThrows(UncheckedIOException.class, iterator::next);

assertThat(ex, where(Exception::getMessage, containsString("BrokenInputStream")));
}

@Test
Expand All @@ -137,22 +106,29 @@ void worksWithInputStreamHavingMultipleEntries() throws IOException {
List<ZipEntryContent> entriesReadInChunks = new ArrayList<>();
try (ZipInputStream zipReader = new ZipInputStream(new ByteArrayInputStream(zipFile))) {
for (ZipEntry nextEntry = zipReader.getNextEntry(); nextEntry != null; nextEntry = zipReader.getNextEntry()) {
ByteArrayOutputStream entryConsumer = new ByteArrayOutputStream();
for (byte[] chunk : (Iterable<byte[]>) () -> new InputStreamIterator(zipReader, DataSize.bytes(2))) {
entryConsumer.write(chunk);
}
entriesReadInChunks.add(ZipEntryContent.read(nextEntry, entryConsumer.toByteArray()));
String content = consumeToString(new InputStreamIterator(zipReader, DataSize.bytes(2)), UTF_8);
entriesReadInChunks.add(new ZipEntryContent(nextEntry, content));
}
}

assertThat(entriesReadInChunks, containsInAnyOrder(file1, file2));
}

private static String consumeToString(InputStreamIterator iterator, Charset charset) {
byte[] bytes = consumeAndFlatten(iterator);
return new String(bytes, charset);
}

private static final class ZipEntryContent {
static ZipEntryContent read(ZipEntry entry, byte[] content) throws IOException {
return read(entry, new ByteArrayInputStream(content));
private static byte[] consumeAndFlatten(InputStreamIterator iterator) {
ByteArrayOutputStream chunkConsumer = new ByteArrayOutputStream();
for (byte[] chunk : (Iterable<byte[]>) () -> iterator) {
runUnchecked(() -> chunkConsumer.write(chunk));
}
return chunkConsumer.toByteArray();
}


private static final class ZipEntryContent {

static ZipEntryContent read(ZipEntry entry, InputStream contentStream) throws IOException {
return new ZipEntryContent(entry.getName(), IOUtils.toString(contentStream, UTF_8));
Expand All @@ -161,6 +137,10 @@ static ZipEntryContent read(ZipEntry entry, InputStream contentStream) throws IO
final String name;
final String content;

ZipEntryContent(ZipEntry entry, String content) {
this(entry.getName(), content);
}

ZipEntryContent(String name, String content) {
this.name = name;
this.content = content;
Expand Down

0 comments on commit 524aa49

Please sign in to comment.