diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java index 837657760..1a7ea81d5 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java @@ -1,6 +1,5 @@ package com.mozilla.telemetry.ingestion.sink.io; -import com.google.cloud.WriteChannel; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; @@ -10,9 +9,9 @@ import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode; import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString; import com.mozilla.telemetry.ingestion.sink.util.BatchWrite; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.UUID; @@ -31,6 +30,8 @@ public abstract static class Write extends BatchWrite.Batch { @VisibleForTesting final BlobInfo blobInfo; - private final WriteChannel writer; + private final ByteArrayOutputStream content = new ByteArrayOutputStream(); - private Batch(Storage storage, String bucket, String keyPrefix) { + private Batch(String bucket, String keyPrefix) { super(); // save blobInfo for batchCloseHook blobInfo = BlobInfo .newBuilder(BlobId.of(bucket, keyPrefix + UUID.randomUUID().toString() + ".ndjson")) .setContentType("application/json").build(); - writer = storage.writer(blobInfo); } @Override protected CompletableFuture close(Void ignore) { - try { - writer.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - BlobInfo blobInfoWithSize = storage.get(blobInfo.getBlobId()); - return batchCloseHook.apply(blobInfoWithSize); + return batchCloseHook.apply(storage.create(blobInfo, content.toByteArray())); } @Override protected void write(byte[] encodedInput) { - try { - writer.write(ByteBuffer.wrap(encodedInput)); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + content.write(encodedInput, 0, encodedInput.length); } @Override diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java index 148b8676b..3dca4d095 100644 --- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java @@ -7,16 +7,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import com.google.cloud.WriteChannel; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.pubsub.v1.PubsubMessage; import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode; import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString; -import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -37,7 +33,6 @@ public class GcsWriteTest { private Storage storage; private Gcs.Write.Ndjson output; - private WriteChannel writer; private CompletableFuture batchCloseHook(BlobInfo ignore) { return CompletableFuture.completedFuture(null); @@ -47,8 +42,6 @@ private CompletableFuture batchCloseHook(BlobInfo ignore) { @Before public void mockBigQueryResponse() { storage = mock(Storage.class); - writer = mock(WriteChannel.class); - when(storage.writer(any(BlobInfo.class))).thenReturn(writer); output = new Gcs.Write.Ndjson(storage, MAX_BYTES, MAX_MESSAGES, MAX_DELAY, BATCH_KEY_TEMPLATE, PubsubMessageToObjectNode.Raw.of(), this::batchCloseHook); } @@ -94,19 +87,21 @@ public void canLimitBatchByteSize() { assertThat((int) output.batches.get(BATCH_KEY).byteSize, lessThanOrEqualTo(MAX_BYTES)); } - @Test(expected = UncheckedIOException.class) - public void failsOnInsertErrors() throws Throwable { - doThrow(new IOException()).when(writer).close(); - + @Test + public void failsOnInsertErrors() { + final Throwable expect = new RuntimeException("fail"); + doThrow(expect).when(storage).create(any(BlobInfo.class), any(byte[].class)); + Throwable cause = null; try { output.apply(EMPTY_MESSAGE).join(); } catch (CompletionException e) { - throw e.getCause(); + cause = e.getCause(); } + assertEquals(expect, cause); } @Test public void canHandleOversizeMessage() { - output.apply(PubsubMessage.newBuilder().putAttributes("meta", "data").build()); + output.apply(PubsubMessage.newBuilder().putAttributes("meta", "data").build()).join(); } }