Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ configurations:
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
| cps.useEmulator | Boolean | false | When true, use the Pub/Sub emulator instead of the production service. The emulator endpoint will be determined by the PUBSUB_EMULATOR_HOST environment variable, or fallback to the cps.endpoint configuration. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class ConnectorUtils {
public static final String CPS_TOPIC_CONFIG = "cps.topic";
public static final String CPS_ENDPOINT = "cps.endpoint";
public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";
public static final String CPS_USE_EMULATOR = "cps.useEmulator";
public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key";
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,13 @@ public ConfigDef config() {
Type.STRING,
ConnectorUtils.CPS_DEFAULT_ENDPOINT,
Importance.LOW,
"The Pub/Sub endpoint to use.");
"The Pub/Sub endpoint to use.")
.define(
ConnectorUtils.CPS_USE_EMULATOR,
Type.BOOLEAN,
false,
Importance.LOW,
"When true, use the Pub/Sub emulator instead of the production service.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class CloudPubSubSinkTask extends SinkTask {
private String cpsProject;
private String cpsTopic;
private String cpsEndpoint;
private boolean useEmulator;
private String messageBodyName;
private long maxBufferSize;
private long maxBufferBytes;
Expand Down Expand Up @@ -118,6 +119,7 @@ public void start(Map<String, String> props) {
cpsProject = validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
cpsTopic = validatedProps.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString();
useEmulator = (Boolean) validatedProps.get(ConnectorUtils.CPS_USE_EMULATOR);
maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
maxOutstandingRequestBytes =
Expand Down Expand Up @@ -399,7 +401,6 @@ private void createPublisher() {

com.google.cloud.pubsub.v1.Publisher.Builder builder =
com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
.setCredentialsProvider(gcpCredentialsProvider)
.setBatchingSettings(batchingSettings.build())
.setRetrySettings(
RetrySettings.newBuilder()
Expand All @@ -413,8 +414,24 @@ private void createPublisher() {
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setRpcTimeoutMultiplier(2)
.build())
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
.setEndpoint(cpsEndpoint);
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()));

// Configure endpoint, credentials and channel based on whether we're using emulator or production
if (useEmulator) {
// For emulator: use PUBSUB_EMULATOR_HOST env var, fallback to configured cps.endpoint, then default
String emulatorHost = System.getenv(ConnectorUtils.PUBSUB_EMULATOR_HOST);
String endpoint = emulatorHost != null ? emulatorHost : cpsEndpoint;
builder.setCredentialsProvider(com.google.api.gax.core.NoCredentialsProvider.create())
.setChannelProvider(
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.newBuilder()
.setEndpoint(endpoint)
.setChannelConfigurator(channel -> channel.usePlaintext())
.build());
} else {
// For production: use configured credentials and endpoint
builder.setCredentialsProvider(gcpCredentialsProvider)
.setEndpoint(cpsEndpoint);
}
if (orderingKeySource != OrderingKeySource.NONE) {
builder.setEnableMessageOrdering(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.pubsub.kafka.sink;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -612,6 +614,42 @@ public void testPublisherShutdownOnStop() throws Exception {
verify(publisher, times(1)).awaitTermination(maxShutdownTimeoutMs, TimeUnit.MILLISECONDS);
}

/** Tests that the emulator configuration is properly defined and parsed. */
@Test
public void testEmulatorConfiguration() {
CloudPubSubSinkConnector connector = new CloudPubSubSinkConnector();
ConfigDef configDef = connector.config();

assertTrue("Emulator configuration should be defined",
configDef.names().contains(ConnectorUtils.CPS_USE_EMULATOR));

Map<String, String> emulatorProps = new HashMap<>();
emulatorProps.put(ConnectorUtils.CPS_TOPIC_CONFIG, CPS_TOPIC);
emulatorProps.put(ConnectorUtils.CPS_PROJECT_CONFIG, CPS_PROJECT);
emulatorProps.put(ConnectorUtils.CPS_USE_EMULATOR, "true");

Map<String, Object> parsedProps = configDef.parse(emulatorProps);
assertTrue("Emulator should be enabled", (Boolean) parsedProps.get(ConnectorUtils.CPS_USE_EMULATOR));
}

@Test
public void testCreatePublisherWithEmulatorEnabled() {
props.put(ConnectorUtils.CPS_USE_EMULATOR, "true");
props.put(ConnectorUtils.CPS_ENDPOINT, "localhost:8085");
CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher);
task.start(props);
assertEquals(CloudPubSubSinkTask.class, task.getClass());
}

@Test
public void testCreatePublisherWithEmulatorDisabled() {
props.put(ConnectorUtils.CPS_USE_EMULATOR, "false");
props.put(ConnectorUtils.CPS_ENDPOINT, "pubsub.googleapis.com:443");
CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher);
task.start(props);
assertEquals(CloudPubSubSinkTask.class, task.getClass());
}

/** Get some sample SinkRecords's to use in the tests. */
private List<SinkRecord> getSampleRecords() {
List<SinkRecord> records = new ArrayList<>();
Expand Down
Loading