diff --git a/gcloud-java-logging/src/main/java/com/google/cloud/logging/SinkInfo.java b/gcloud-java-logging/src/main/java/com/google/cloud/logging/SinkInfo.java
new file mode 100644
index 000000000000..669ba9f58c51
--- /dev/null
+++ b/gcloud-java-logging/src/main/java/com/google/cloud/logging/SinkInfo.java
@@ -0,0 +1,635 @@
+/*
+ * Copyright 2016 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.logging;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.logging.v2.LogSink;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Google Cloud Logging sinks can be used to control the export of your logs. Each sink specifies
+ * the export of a set of log entries to a certain destination. A sink consists of a name, unique to
+ * the project, a filter for choosing the log entries to export and a destination for the log
+ * entries.
+ *
+ *
Sink destination can either be a Google Cloud Storage bucket (see
+ * {@link Destination.BucketDestination}, a Google Cloud BigQuery dataset (see
+ * {@link Destination.DatasetDestination}) or a Google Cloud Pub/Sub topic (see
+ * {@link Destination.TopicDestination}).
+ *
+ * @see About
+ * Sinks
+ */
+public class SinkInfo implements Serializable {
+
+ private static final long serialVersionUID = 6652676315712662729L;
+
+ private final String name;
+ private final Destination destination;
+ private final String filter;
+ private final VersionFormat versionFormat;
+
+ public abstract static class Destination implements Serializable {
+
+ private static final long serialVersionUID = 5257964588379880017L;
+
+ private final Type type;
+
+ /**
+ * Type of destination for Google Cloud Logging sink.
+ */
+ public enum Type {
+ /**
+ * Specifies a Google Cloud Storage bucket as destination for the sink.
+ */
+ BUCKET,
+
+ /**
+ * Specifies a Google Cloud BigQuery dataset as destination for the sink.
+ */
+ DATASET,
+
+ /**
+ * Specifies a Google Cloud Pub/Sub topic as destination for the sink.
+ */
+ TOPIC;
+ }
+
+ /**
+ * Class for specifying a Google Cloud Storage bucket as destination for the sink.
+ */
+ public static final class BucketDestination extends Destination {
+
+ private static final long serialVersionUID = -7614931032119779091L;
+ private static final String BASE_NAME = "storage.googleapis.com/";
+ private static final String REGEX = BASE_NAME + "([^/]+)";
+ private static final Pattern PATTERN = Pattern.compile(REGEX);
+
+ private final String bucket;
+
+ BucketDestination(String bucket) {
+ super(Type.BUCKET);
+ this.bucket = checkNotNull(bucket);
+ }
+
+ /**
+ * Returns the name of the Google Cloud Storage bucket this destination represents.
+ */
+ public String bucket() {
+ return bucket;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || !(obj instanceof BucketDestination)) {
+ return false;
+ }
+ BucketDestination other = (BucketDestination) obj;
+ return baseEquals(other) && bucket.equals(other.bucket);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baseHashCode(), bucket);
+ }
+
+ static boolean matchesDestination(String destinationPb) {
+ return PATTERN.matcher(destinationPb).matches();
+ }
+
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("bucket", bucket).toString();
+ }
+
+ @Override
+ String toPb(String projectId) {
+ return BASE_NAME + bucket;
+ }
+
+ /**
+ * Creates a {@code BucketDestination} object given the name of the bucket to be used as sink
+ * destination.
+ */
+ public static BucketDestination of(String bucket) {
+ return new BucketDestination(bucket);
+ }
+
+ static BucketDestination fromPb(String destinationPb) {
+ Matcher matcher = PATTERN.matcher(destinationPb);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(destinationPb + " is not a valid sink destination");
+ }
+ return new BucketDestination(matcher.group(1));
+ }
+ }
+
+ /**
+ * Class for specifying a Google Cloud BigQuery dataset as destination for the sink.
+ */
+ public static final class DatasetDestination extends Destination {
+
+ private static final long serialVersionUID = 6952354643801154411L;
+ private static final String BASE_NAME = "bigquery.googleapis.com/";
+ private static final String REGEX = BASE_NAME + "projects/([^/]+)/datasets/([^/]+)";
+ private static final Pattern PATTERN = Pattern.compile(REGEX);
+
+ private final String project;
+ private final String dataset;
+
+ DatasetDestination(String project, String dataset) {
+ super(Type.DATASET);
+ this.project = project;
+ this.dataset = checkNotNull(dataset);
+ }
+
+ /**
+ * Returns the name of the project where the Google Cloud BigQuery dataset resides. If
+ * {@code null}, the default project is used.
+ */
+ public String project() {
+ return project;
+ }
+
+ /**
+ * Returns the name of the Google Cloud BigQuery dataset this destination represents.
+ */
+ public String dataset() {
+ return dataset;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || !(obj instanceof DatasetDestination)) {
+ return false;
+ }
+ DatasetDestination other = (DatasetDestination) obj;
+ return baseEquals(other)
+ && Objects.equals(project, other.project)
+ && Objects.equals(dataset, other.dataset);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baseHashCode(), project, dataset);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("project", project)
+ .add("dataset", dataset)
+ .toString();
+ }
+
+ @Override
+ String toPb(String projectId) {
+ String project = this.project == null ? projectId : this.project;
+ return BASE_NAME + "projects/" + project + "/datasets/" + dataset;
+ }
+
+ /**
+ * Creates a {@code DatasetDestination} object given the name of the project and dataset to be
+ * used as sink destination.
+ */
+ public static DatasetDestination of(String project, String dataset) {
+ return new DatasetDestination(project, dataset);
+ }
+
+ /**
+ * Creates a {@code DatasetDestination} object given the name of the dataset to be used as
+ * sink destination. Dataset is assumed to reside in the default project.
+ */
+ public static DatasetDestination of(String dataset) {
+ return new DatasetDestination(null, dataset);
+ }
+
+ static boolean matchesDestination(String destinationPb) {
+ return PATTERN.matcher(destinationPb).matches();
+ }
+
+ static DatasetDestination fromPb(String destinationPb) {
+ Matcher matcher = PATTERN.matcher(destinationPb);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(destinationPb + " is not a valid sink destination");
+ }
+ return new DatasetDestination(matcher.group(1), matcher.group(2));
+ }
+ }
+
+ /**
+ * Class for specifying a Google Cloud BigQuery dataset as destination for the sink.
+ */
+ public static final class TopicDestination extends Destination {
+
+ private static final long serialVersionUID = -8252473597084887048L;
+ private static final String BASE_NAME = "pubsub.googleapis.com/";
+ private static final String REGEX = BASE_NAME + "projects/([^/]+)/topics/([^/]+)";
+ private static final Pattern PATTERN = Pattern.compile(REGEX);
+
+ private final String project;
+ private final String topic;
+
+ TopicDestination(String project, String topic) {
+ super(Type.TOPIC);
+ this.project = project;
+ this.topic = checkNotNull(topic);
+ }
+
+ /**
+ * Returns the name of the project where the Google Cloud Pub/Sub topic resides. If
+ * {@code null}, the default project is used.
+ */
+ public String project() {
+ return project;
+ }
+
+ /**
+ * Returns the name of the Google Cloud Pub/Sub topic this destination represents.
+ */
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || !(obj instanceof TopicDestination)) {
+ return false;
+ }
+ TopicDestination other = (TopicDestination) obj;
+ return baseEquals(other)
+ && Objects.equals(project, other.project)
+ && Objects.equals(topic, other.topic);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baseHashCode(), project, topic);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("project", project)
+ .add("topic", topic)
+ .toString();
+ }
+
+ @Override
+ String toPb(String projectId) {
+ String project = this.project == null ? projectId : this.project;
+ return BASE_NAME + "projects/" + project + "/topics/" + topic;
+ }
+
+ /**
+ * Creates a {@code TopicDestination} object given the name of the project and topic to be
+ * used as sink destination.
+ */
+ public static TopicDestination of(String project, String topic) {
+ return new TopicDestination(project, topic);
+ }
+
+ /**
+ * Creates a {@code DatasetDestination} object given the name of the topic to be used as
+ * sink destination. Topic is assumed to reside in the default project.
+ */
+ public static TopicDestination of(String topic) {
+ return new TopicDestination(null, topic);
+ }
+
+ static boolean matchesDestination(String destinationPb) {
+ return PATTERN.matcher(destinationPb).matches();
+ }
+
+ static TopicDestination fromPb(String destinationPb) {
+ Matcher matcher = PATTERN.matcher(destinationPb);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(destinationPb + " is not a valid sink destination");
+ }
+ return new TopicDestination(matcher.group(1), matcher.group(2));
+ }
+ }
+
+ Destination(Type type) {
+ this.type = checkNotNull(type);
+ }
+
+ /**
+ * Returns the type of this destination.
+ */
+ public Type type() {
+ return type;
+ }
+
+ final boolean baseEquals(Destination other) {
+ return type.equals(other.type);
+ }
+
+ final int baseHashCode() {
+ return Objects.hash(type);
+ }
+
+ abstract String toPb(String projectId);
+
+ @SuppressWarnings("unchecked")
+ static T fromPb(String destinationPb) {
+ if (BucketDestination.matchesDestination(destinationPb)) {
+ return (T) BucketDestination.fromPb(destinationPb);
+ } else if (DatasetDestination.matchesDestination(destinationPb)) {
+ return (T) DatasetDestination.fromPb(destinationPb);
+ } else if (TopicDestination.matchesDestination(destinationPb)) {
+ return (T) TopicDestination.fromPb(destinationPb);
+ }
+ throw new IllegalArgumentException(destinationPb + " is not a valid sink destination");
+ }
+ }
+
+ /**
+ * Available log entry formats. Log entries can be written to Cloud Logging in either format and
+ * can be exported in either format. Version 2 is the preferred format.
+ */
+ public enum VersionFormat {
+ V1(LogSink.VersionFormat.V1),
+ V2(LogSink.VersionFormat.V2);
+
+ private LogSink.VersionFormat versionPb;
+
+ VersionFormat(LogSink.VersionFormat versionPb) {
+ this.versionPb = versionPb;
+ }
+
+ LogSink.VersionFormat toPb() {
+ return versionPb;
+ }
+
+ static VersionFormat fromPb(LogSink.VersionFormat versionPb) {
+ switch (versionPb) {
+ case V1:
+ return VersionFormat.V1;
+ case V2:
+ return VersionFormat.V2;
+ case VERSION_FORMAT_UNSPECIFIED:
+ return null;
+ default:
+ throw new IllegalArgumentException(versionPb + " is not a valid version");
+ }
+ }
+ }
+
+ /**
+ * A builder for {@code SinkInfo} objects.
+ */
+ public abstract static class Builder {
+
+ /**
+ * Sets the name of the sink. Example: {@code my-severe-errors-to-pubsub}. Sink identifiers are
+ * limited to 1000 characters and can include only the following characters: {@code A-Z},
+ * {@code a-z}, {@code 0-9}, and the special characters {@code _-.}.
+ */
+ public abstract Builder name(String name);
+
+ /**
+ * Sets the export destination. Use a {@link Destination.BucketDestination} object to create a
+ * sink that exports logs to a Google Cloud Storage bucket. Use a
+ * {@link Destination.DatasetDestination} object to create a sink that exports logs to a Google
+ * Cloud BigQuery dataset. Use a {@link Destination.TopicDestination} object to create a sink
+ * that exports logs to a Google Cloud Pub/Sub topic.
+ *
+ * @see
+ * Exporting Logs
+ */
+ public abstract Builder destination(Destination destination);
+
+ /**
+ * Sets an advanced logs filter. Only log entries matching that filter are exported. The filter
+ * must be consistent with the log entry format specified with
+ * {@link #versionFormat(VersionFormat)}, regardless of the format of the log entry that was
+ * originally written to Google Cloud Logging. Example (V2 format):
+ * {@code logName=projects/my-projectid/logs/syslog AND severity>=ERROR}.
+ *
+ * @see Advanced Log
+ * Filters
+ */
+ public abstract Builder filter(String filter);
+
+ /**
+ * Sets the log entry version to use for this sink's exported log entries. This version does
+ * not have to correspond to the version of the log entry when it was written to Google Cloud
+ * Logging.
+ */
+ public abstract Builder versionFormat(VersionFormat versionFormat);
+
+ /**
+ * Creates a {@code SinkInfo} object for this builder.
+ */
+ public abstract SinkInfo build();
+ }
+
+ static final class BuilderImpl extends Builder {
+
+ private String name;
+ private Destination destination;
+ private String filter;
+ private VersionFormat versionFormat;
+
+ BuilderImpl(String name, Destination destination) {
+ this.name = name;
+ this.destination = destination;
+ }
+
+ BuilderImpl(SinkInfo sink) {
+ this.name = sink.name;
+ this.destination = sink.destination;
+ this.filter = sink.filter;
+ this.versionFormat = sink.versionFormat;
+ }
+
+ @Override
+ public Builder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public Builder destination(Destination destination) {
+ this.destination = destination;
+ return this;
+ }
+
+ @Override
+ public Builder filter(String filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ @Override
+ public Builder versionFormat(VersionFormat versionFormat) {
+ this.versionFormat = versionFormat;
+ return this;
+ }
+
+ @Override
+ public SinkInfo build() {
+ return new SinkInfo(this);
+ }
+ }
+
+ SinkInfo(BuilderImpl builder) {
+ this.name = checkNotNull(builder.name);
+ this.destination = checkNotNull(builder.destination);
+ this.filter = builder.filter;
+ this.versionFormat = builder.versionFormat;
+ }
+
+ /**
+ * Returns the name of the sink. Example: {@code my-severe-errors-to-pubsub}. Sink identifiers are
+ * limited to 1000 characters and can include only the following characters: {@code A-Z},
+ * {@code a-z}, {@code 0-9}, and the special characters {@code _-.}.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the export destination. This method returns a {@link Destination.BucketDestination} for
+ * sinks that export logs to Google Cloud Storage buckets. Returns
+ * {@link Destination.DatasetDestination} for sinks that export logs to Google Cloud BigQuery
+ * datasets. Returns {@link Destination.TopicDestination} for sinks that export logs to Google
+ * Cloud Pub/Sub topics.
+ *
+ * @see
+ * Exporting Logs
+ */
+ @SuppressWarnings("unchecked")
+ public T destination() {
+ return (T) destination;
+ }
+
+ /**
+ * Returns an advanced logs filter. Only log entries matching that filter are exported. The filter
+ * must be consistent with the log entry format specified in {@link #versionFormat()}, regardless
+ * of the format of the log entry that wa originally written to Google Cloud Logging. Example (V2
+ * format): {@code logName=projects/my-projectid/logs/syslog AND severity>=ERROR}.
+ *
+ * @see Advanced Log
+ * Filters
+ */
+ public String filter() {
+ return filter;
+ }
+
+ /**
+ * Returns the log entry version to use for this sink's exported log entries. This version does
+ * not have to correspond to the version of the log entry when it was written to Google Cloud
+ * Logging.
+ */
+ public VersionFormat versionFormat() {
+ return versionFormat;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("name", name)
+ .add("destination", destination)
+ .add("filter", filter)
+ .add("versionFormat", versionFormat)
+ .toString();
+ }
+
+ final boolean baseEquals(SinkInfo sinkInfo) {
+ return Objects.equals(name, sinkInfo.name)
+ && Objects.equals(destination, sinkInfo.destination)
+ && Objects.equals(filter, sinkInfo.filter)
+ && Objects.equals(versionFormat, sinkInfo.versionFormat);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || !(obj.getClass().equals(SinkInfo.class))) {
+ return false;
+ }
+ return baseEquals((SinkInfo) obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, destination, filter, versionFormat);
+ }
+
+ /**
+ * Returns a builder for this {@code SinkInfo} object.
+ */
+ public Builder toBuilder() {
+ return new BuilderImpl(this);
+ }
+
+ /**
+ * Returns a builder for {@code SinkInfo} objects given the name of the sink and its destination.
+ */
+ public static Builder builder(String name, Destination destination) {
+ return new BuilderImpl(name, destination);
+ }
+
+ /**
+ * Creates a {@code SinkInfo} object given the name of the sink and its destination.
+ */
+ public static SinkInfo of(String name, Destination destination) {
+ return new BuilderImpl(name, destination).build();
+ }
+
+ LogSink toPb(String projectId) {
+ LogSink.Builder builder = LogSink.newBuilder()
+ .setName(name)
+ .setDestination(destination.toPb(projectId))
+ .setOutputVersionFormat(versionFormat == null
+ ? LogSink.VersionFormat.VERSION_FORMAT_UNSPECIFIED : versionFormat.toPb());
+ if (filter != null) {
+ builder.setFilter(filter);
+ }
+ return builder.build();
+ }
+
+ static SinkInfo fromPb(LogSink sinkPb) {
+ Builder builder = builder(sinkPb.getName(),
+ Destination.fromPb(sinkPb.getDestination()))
+ .versionFormat(VersionFormat.fromPb(sinkPb.getOutputVersionFormat()));
+ if (!sinkPb.getFilter().equals("")) {
+ builder.filter(sinkPb.getFilter());
+ }
+ return builder.build();
+ }
+}
diff --git a/gcloud-java-logging/src/test/java/com/google/cloud/logging/SinkInfoTest.java b/gcloud-java-logging/src/test/java/com/google/cloud/logging/SinkInfoTest.java
new file mode 100644
index 000000000000..e22b2259e82f
--- /dev/null
+++ b/gcloud-java-logging/src/test/java/com/google/cloud/logging/SinkInfoTest.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2016 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.logging;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.cloud.logging.SinkInfo.Destination;
+import com.google.cloud.logging.SinkInfo.Destination.BucketDestination;
+import com.google.cloud.logging.SinkInfo.Destination.DatasetDestination;
+import com.google.cloud.logging.SinkInfo.Destination.TopicDestination;
+import com.google.cloud.logging.SinkInfo.VersionFormat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class SinkInfoTest {
+
+ private static final String NAME = "name";
+ private static final String FILTER =
+ "logName=projects/my-projectid/logs/syslog AND severity>=ERROR";
+ private static final VersionFormat VERSION = VersionFormat.V1;
+ private static final BucketDestination BUCKET_DESTINATION = BucketDestination.of("bucket");
+ private static final DatasetDestination DATASET_DESTINATION =
+ DatasetDestination.of("project", "dataset");
+ private static final TopicDestination TOPIC_DESTINATION =
+ TopicDestination.of("project", "topic");
+ private static final SinkInfo BUCKET_SINK_INFO = SinkInfo.builder(NAME, BUCKET_DESTINATION)
+ .filter(FILTER)
+ .versionFormat(VERSION)
+ .build();
+ private static final SinkInfo DATASET_SINK_INFO = SinkInfo.builder(NAME, DATASET_DESTINATION)
+ .filter(FILTER)
+ .versionFormat(VERSION)
+ .build();
+ private static final SinkInfo TOPIC_SINK_INFO = SinkInfo.builder(NAME, TOPIC_DESTINATION)
+ .filter(FILTER)
+ .versionFormat(VERSION)
+ .build();
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testOfBucketDestination() {
+ assertEquals(Destination.Type.BUCKET, BUCKET_DESTINATION.type());
+ assertEquals("bucket", BUCKET_DESTINATION.bucket());
+ }
+
+ @Test
+ public void testOfDatasetDestination() {
+ assertEquals(Destination.Type.DATASET, DATASET_DESTINATION.type());
+ assertEquals("project", DATASET_DESTINATION.project());
+ assertEquals("dataset", DATASET_DESTINATION.dataset());
+ DatasetDestination datasetDestination = DatasetDestination.of("dataset");
+ assertNull(datasetDestination.project());
+ assertEquals("dataset", datasetDestination.dataset());
+ }
+
+ @Test
+ public void testOfTopicDestination() {
+ assertEquals(Destination.Type.TOPIC, TOPIC_DESTINATION.type());
+ assertEquals("project", TOPIC_DESTINATION.project());
+ assertEquals("topic", TOPIC_DESTINATION.topic());
+ TopicDestination topicDestination = TopicDestination.of("topic");
+ assertNull(topicDestination.project());
+ assertEquals("topic", topicDestination.topic());
+ }
+
+ @Test
+ public void testToAndFromPbDestination() {
+ BucketDestination bucketDestination = Destination.fromPb(BUCKET_DESTINATION.toPb("other"));
+ assertEquals(Destination.Type.BUCKET, bucketDestination.type());
+ assertEquals("bucket", bucketDestination.bucket());
+ compareBucketDestination(BUCKET_DESTINATION, bucketDestination);
+ DatasetDestination datasetDestination = Destination.fromPb(DATASET_DESTINATION.toPb("other"));
+ assertEquals(Destination.Type.DATASET, datasetDestination.type());
+ assertEquals("project", datasetDestination.project());
+ assertEquals("dataset", datasetDestination.dataset());
+ compareDatasetDestination(DATASET_DESTINATION, datasetDestination);
+ TopicDestination topicDestination = Destination.fromPb(TOPIC_DESTINATION.toPb("other"));
+ assertEquals(Destination.Type.TOPIC, topicDestination.type());
+ assertEquals("project", topicDestination.project());
+ assertEquals("topic", topicDestination.topic());
+ compareTopicDestination(TOPIC_DESTINATION, topicDestination);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("wrongDestination is not a valid sink destination");
+ Destination.fromPb("wrongDestination");
+ }
+
+ @Test
+ public void testToAndFromPbDestination_NoProjectId() {
+ DatasetDestination datasetDestination =
+ DatasetDestination.fromPb(DatasetDestination.of("dataset").toPb("project"));
+ compareDatasetDestination(DATASET_DESTINATION, datasetDestination);
+ assertEquals("project", datasetDestination.project());
+ TopicDestination topicDestination =
+ TopicDestination.fromPb(TopicDestination.of("topic").toPb("project"));
+ assertEquals("project", topicDestination.project());
+ compareTopicDestination(TOPIC_DESTINATION, topicDestination);
+ }
+
+ @Test
+ public void testBuilder() {
+ assertEquals(NAME, BUCKET_SINK_INFO.name());
+ assertEquals(BUCKET_DESTINATION, BUCKET_SINK_INFO.destination());
+ assertEquals(FILTER, BUCKET_SINK_INFO.filter());
+ assertEquals(VERSION, BUCKET_SINK_INFO.versionFormat());
+ assertEquals(NAME, DATASET_SINK_INFO.name());
+ assertEquals(DATASET_DESTINATION, DATASET_SINK_INFO.destination());
+ assertEquals(FILTER, DATASET_SINK_INFO.filter());
+ assertEquals(VERSION, DATASET_SINK_INFO.versionFormat());
+ assertEquals(NAME, TOPIC_SINK_INFO.name());
+ assertEquals(TOPIC_DESTINATION, TOPIC_SINK_INFO.destination());
+ assertEquals(FILTER, TOPIC_SINK_INFO.filter());
+ assertEquals(VERSION, TOPIC_SINK_INFO.versionFormat());
+ }
+
+ @Test
+ public void testToBuilder() {
+ compareSinkInfo(BUCKET_SINK_INFO, BUCKET_SINK_INFO.toBuilder().build());
+ compareSinkInfo(DATASET_SINK_INFO, DATASET_SINK_INFO.toBuilder().build());
+ compareSinkInfo(TOPIC_SINK_INFO, TOPIC_SINK_INFO.toBuilder().build());
+ SinkInfo updatedSinkInfo = BUCKET_SINK_INFO.toBuilder()
+ .destination(TOPIC_DESTINATION)
+ .name("newName")
+ .filter("logName=projects/my-projectid/logs/syslog")
+ .versionFormat(VersionFormat.V2)
+ .build();
+ assertEquals("newName", updatedSinkInfo.name());
+ assertEquals(TOPIC_DESTINATION, updatedSinkInfo.destination());
+ assertEquals("logName=projects/my-projectid/logs/syslog", updatedSinkInfo.filter());
+ assertEquals(VersionFormat.V2, updatedSinkInfo.versionFormat());
+ updatedSinkInfo = BUCKET_SINK_INFO.toBuilder()
+ .destination(BUCKET_DESTINATION)
+ .name(NAME)
+ .filter(FILTER)
+ .versionFormat(VersionFormat.V1)
+ .build();
+ assertEquals(BUCKET_SINK_INFO, updatedSinkInfo);
+ }
+
+ @Test
+ public void testToAndFromPb() {
+ compareSinkInfo(BUCKET_SINK_INFO, SinkInfo.fromPb(BUCKET_SINK_INFO.toPb("project")));
+ compareSinkInfo(DATASET_SINK_INFO, SinkInfo.fromPb(DATASET_SINK_INFO.toPb("project")));
+ compareSinkInfo(TOPIC_SINK_INFO, SinkInfo.fromPb(TOPIC_SINK_INFO.toPb("project")));
+ SinkInfo sinkInfo = SinkInfo.of("name", BUCKET_DESTINATION);
+ compareSinkInfo(sinkInfo, SinkInfo. fromPb(sinkInfo.toPb("project")));
+ sinkInfo = SinkInfo.of("name", DATASET_DESTINATION);
+ compareSinkInfo(sinkInfo, SinkInfo.fromPb(sinkInfo.toPb("project")));
+ sinkInfo = SinkInfo.of("name", TOPIC_DESTINATION);
+ compareSinkInfo(sinkInfo, SinkInfo.fromPb(sinkInfo.toPb("project")));
+ }
+
+ @Test
+ public void testToAndFromPb_NoProjectId() {
+ DatasetDestination datasetDestination = DatasetDestination.of("dataset");
+ SinkInfo sinkInfo = SinkInfo.of("name", DATASET_DESTINATION);
+ compareSinkInfo(sinkInfo,
+ SinkInfo.fromPb(SinkInfo.of("name", datasetDestination).toPb("project")));
+ TopicDestination topicDestination = TopicDestination.of("topic");
+ sinkInfo = SinkInfo.of("name", TOPIC_DESTINATION);
+ compareSinkInfo(sinkInfo,
+ SinkInfo.fromPb(SinkInfo.of("name", topicDestination).toPb("project")));
+ }
+
+ private void compareBucketDestination(BucketDestination expected, BucketDestination value) {
+ assertEquals(expected, value);
+ assertEquals(expected.bucket(), value.bucket());
+ assertEquals(expected.hashCode(), value.hashCode());
+ assertEquals(expected.toString(), value.toString());
+ }
+
+ private void compareDatasetDestination(DatasetDestination expected, DatasetDestination value) {
+ assertEquals(expected, value);
+ assertEquals(expected.project(), value.project());
+ assertEquals(expected.dataset(), value.dataset());
+ assertEquals(expected.hashCode(), value.hashCode());
+ assertEquals(expected.toString(), value.toString());
+ }
+
+ private void compareTopicDestination(TopicDestination expected, TopicDestination value) {
+ assertEquals(expected, value);
+ assertEquals(expected.project(), value.project());
+ assertEquals(expected.topic(), value.topic());
+ assertEquals(expected.hashCode(), value.hashCode());
+ assertEquals(expected.toString(), value.toString());
+ }
+
+ private void compareSinkInfo(SinkInfo expected, SinkInfo value) {
+ assertEquals(expected, value);
+ assertEquals(expected.name(), value.name());
+ assertEquals(expected.destination(), value.destination());
+ assertEquals(expected.filter(), value.filter());
+ assertEquals(expected.versionFormat(), value.versionFormat());
+ assertEquals(expected.hashCode(), value.hashCode());
+ assertEquals(expected.toString(), value.toString());
+ }
+}