Skip to content

Commit

Permalink
Limit Gcs.Write to single storage.create api call
Browse files Browse the repository at this point in the history
  • Loading branch information
relud committed Apr 9, 2020
1 parent bb3d8b8 commit e8ffa8c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.mozilla.telemetry.ingestion.sink.io;

import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
Expand All @@ -10,9 +9,9 @@
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode;
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString;
import com.mozilla.telemetry.ingestion.sink.util.BatchWrite;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
Expand All @@ -31,6 +30,8 @@ public abstract static class Write extends BatchWrite<PubsubMessage, byte[], Str

public static class Ndjson extends Write {

private static final byte[] newline = "\n".getBytes(StandardCharsets.UTF_8);

private final PubsubMessageToObjectNode encoder;

public Ndjson(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
Expand All @@ -43,8 +44,7 @@ public Ndjson(Storage storage, long maxBytes, int maxMessages, Duration maxDelay
@Override
protected byte[] encodeInput(PubsubMessage input) {
try {
return ArrayUtils.addAll(Json.asBytes(encoder.apply(input)),
"\n".getBytes(StandardCharsets.UTF_8));
return ArrayUtils.addAll(Json.asBytes(encoder.apply(input)), newline);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -80,7 +80,7 @@ protected Batch getBatch(String gcsPrefix) {
String.format("Gcs prefix must match \"%s\" but got \"%s\" from: %s",
BLOB_ID_PATTERN.pattern(), gcsPrefix, batchKeyTemplate.template));
}
return new Batch(storage, gcsPrefixMatcher.group(BUCKET), gcsPrefixMatcher.group(NAME));
return new Batch(gcsPrefixMatcher.group(BUCKET), gcsPrefixMatcher.group(NAME));
}

@VisibleForTesting
Expand All @@ -89,35 +89,24 @@ class Batch extends BatchWrite<PubsubMessage, byte[], String, Void>.Batch {
@VisibleForTesting
final BlobInfo blobInfo;

private final WriteChannel writer;
private final ByteArrayOutputStream content = new ByteArrayOutputStream();

private Batch(Storage storage, String bucket, String keyPrefix) {
private Batch(String bucket, String keyPrefix) {
super();
// save blobInfo for batchCloseHook
blobInfo = BlobInfo
.newBuilder(BlobId.of(bucket, keyPrefix + UUID.randomUUID().toString() + ".ndjson"))
.setContentType("application/json").build();
writer = storage.writer(blobInfo);
}

@Override
protected CompletableFuture<Void> close(Void ignore) {
try {
writer.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
BlobInfo blobInfoWithSize = storage.get(blobInfo.getBlobId());
return batchCloseHook.apply(blobInfoWithSize);
return batchCloseHook.apply(storage.create(blobInfo, content.toByteArray()));
}

@Override
protected void write(byte[] encodedInput) {
try {
writer.write(ByteBuffer.wrap(encodedInput));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
content.write(encodedInput, 0, encodedInput.length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode;
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,7 +33,6 @@ public class GcsWriteTest {

private Storage storage;
private Gcs.Write.Ndjson output;
private WriteChannel writer;

private CompletableFuture<Void> batchCloseHook(BlobInfo ignore) {
return CompletableFuture.completedFuture(null);
Expand All @@ -47,8 +42,6 @@ private CompletableFuture<Void> batchCloseHook(BlobInfo ignore) {
@Before
public void mockBigQueryResponse() {
storage = mock(Storage.class);
writer = mock(WriteChannel.class);
when(storage.writer(any(BlobInfo.class))).thenReturn(writer);
output = new Gcs.Write.Ndjson(storage, MAX_BYTES, MAX_MESSAGES, MAX_DELAY, BATCH_KEY_TEMPLATE,
PubsubMessageToObjectNode.Raw.of(), this::batchCloseHook);
}
Expand Down Expand Up @@ -94,19 +87,21 @@ public void canLimitBatchByteSize() {
assertThat((int) output.batches.get(BATCH_KEY).byteSize, lessThanOrEqualTo(MAX_BYTES));
}

@Test(expected = UncheckedIOException.class)
public void failsOnInsertErrors() throws Throwable {
doThrow(new IOException()).when(writer).close();

@Test
public void failsOnInsertErrors() {
final Throwable expect = new RuntimeException("fail");
doThrow(expect).when(storage).create(any(BlobInfo.class), any(byte[].class));
Throwable cause = null;
try {
output.apply(EMPTY_MESSAGE).join();
} catch (CompletionException e) {
throw e.getCause();
cause = e.getCause();
}
assertEquals(expect, cause);
}

@Test
public void canHandleOversizeMessage() {
output.apply(PubsubMessage.newBuilder().putAttributes("meta", "data").build());
output.apply(PubsubMessage.newBuilder().putAttributes("meta", "data").build()).join();
}
}

0 comments on commit e8ffa8c

Please sign in to comment.