Skip to content

Commit 88184ab

Browse files
committed
feat: add support for Pub/Sub emulator configuration
Introduced a new configuration option cps.useEmulator to allow users to switch between the production service and the Pub/Sub emulator. Updated the README to document this feature and modified the relevant classes to handle the emulator endpoint based on the configuration. Added a test to verify the emulator configuration is correctly defined and parsed.
1 parent 7534e69 commit 88184ab

File tree

5 files changed

+68
-4
lines changed

5 files changed

+68
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ configurations:
196196
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
197197
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
198198
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
199+
| cps.useEmulator | Boolean | false | When true, use the Pub/Sub emulator instead of the production service. The emulator endpoint will be determined by the PUBSUB_EMULATOR_HOST environment variable, or fallback to the cps.endpoint configuration. |
199200
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
200201
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
201202
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |

src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class ConnectorUtils {
2929
public static final String CPS_TOPIC_CONFIG = "cps.topic";
3030
public static final String CPS_ENDPOINT = "cps.endpoint";
3131
public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";
32+
public static final String CPS_USE_EMULATOR = "cps.useEmulator";
33+
public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
3234
public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key";
3335
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
3436
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,13 @@ public ConfigDef config() {
273273
Type.STRING,
274274
ConnectorUtils.CPS_DEFAULT_ENDPOINT,
275275
Importance.LOW,
276-
"The Pub/Sub endpoint to use.");
276+
"The Pub/Sub endpoint to use.")
277+
.define(
278+
ConnectorUtils.CPS_USE_EMULATOR,
279+
Type.BOOLEAN,
280+
false,
281+
Importance.LOW,
282+
"When true, use the Pub/Sub emulator instead of the production service.");
277283
}
278284

279285
@Override

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class CloudPubSubSinkTask extends SinkTask {
6969
private String cpsProject;
7070
private String cpsTopic;
7171
private String cpsEndpoint;
72+
private boolean useEmulator;
7273
private String messageBodyName;
7374
private long maxBufferSize;
7475
private long maxBufferBytes;
@@ -118,6 +119,7 @@ public void start(Map<String, String> props) {
118119
cpsProject = validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
119120
cpsTopic = validatedProps.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
120121
cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString();
122+
useEmulator = (Boolean) validatedProps.get(ConnectorUtils.CPS_USE_EMULATOR);
121123
maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
122124
maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
123125
maxOutstandingRequestBytes =
@@ -399,7 +401,6 @@ private void createPublisher() {
399401

400402
com.google.cloud.pubsub.v1.Publisher.Builder builder =
401403
com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
402-
.setCredentialsProvider(gcpCredentialsProvider)
403404
.setBatchingSettings(batchingSettings.build())
404405
.setRetrySettings(
405406
RetrySettings.newBuilder()
@@ -413,8 +414,24 @@ private void createPublisher() {
413414
.setInitialRpcTimeout(Duration.ofSeconds(10))
414415
.setRpcTimeoutMultiplier(2)
415416
.build())
416-
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
417-
.setEndpoint(cpsEndpoint);
417+
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()));
418+
419+
// Configure endpoint, credentials and channel based on whether we're using emulator or production
420+
if (useEmulator) {
421+
// For emulator: use PUBSUB_EMULATOR_HOST env var, fallback to configured cps.endpoint, then default
422+
String emulatorHost = System.getenv(ConnectorUtils.PUBSUB_EMULATOR_HOST);
423+
String endpoint = emulatorHost != null ? emulatorHost : cpsEndpoint;
424+
builder.setCredentialsProvider(com.google.api.gax.core.NoCredentialsProvider.create())
425+
.setChannelProvider(
426+
com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.newBuilder()
427+
.setEndpoint(endpoint)
428+
.setChannelConfigurator(channel -> channel.usePlaintext())
429+
.build());
430+
} else {
431+
// For production: use configured credentials and endpoint
432+
builder.setCredentialsProvider(gcpCredentialsProvider)
433+
.setEndpoint(cpsEndpoint);
434+
}
418435
if (orderingKeySource != OrderingKeySource.NONE) {
419436
builder.setEnableMessageOrdering(true);
420437
}

src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.pubsub.kafka.sink;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
2122
import static org.mockito.Mockito.mock;
@@ -38,6 +39,7 @@
3839
import java.util.concurrent.TimeUnit;
3940
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
4041
import org.apache.kafka.common.TopicPartition;
42+
import org.apache.kafka.common.config.ConfigDef;
4143
import org.apache.kafka.common.record.TimestampType;
4244
import org.apache.kafka.connect.data.Schema;
4345
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -612,6 +614,42 @@ public void testPublisherShutdownOnStop() throws Exception {
612614
verify(publisher, times(1)).awaitTermination(maxShutdownTimeoutMs, TimeUnit.MILLISECONDS);
613615
}
614616

617+
/** Tests that the emulator configuration is properly defined and parsed. */
618+
@Test
619+
public void testEmulatorConfiguration() {
620+
CloudPubSubSinkConnector connector = new CloudPubSubSinkConnector();
621+
ConfigDef configDef = connector.config();
622+
623+
assertTrue("Emulator configuration should be defined",
624+
configDef.names().contains(ConnectorUtils.CPS_USE_EMULATOR));
625+
626+
Map<String, String> emulatorProps = new HashMap<>();
627+
emulatorProps.put(ConnectorUtils.CPS_TOPIC_CONFIG, CPS_TOPIC);
628+
emulatorProps.put(ConnectorUtils.CPS_PROJECT_CONFIG, CPS_PROJECT);
629+
emulatorProps.put(ConnectorUtils.CPS_USE_EMULATOR, "true");
630+
631+
Map<String, Object> parsedProps = configDef.parse(emulatorProps);
632+
assertTrue("Emulator should be enabled", (Boolean) parsedProps.get(ConnectorUtils.CPS_USE_EMULATOR));
633+
}
634+
635+
@Test
636+
public void testCreatePublisherWithEmulatorEnabled() {
637+
props.put(ConnectorUtils.CPS_USE_EMULATOR, "true");
638+
props.put(ConnectorUtils.CPS_ENDPOINT, "localhost:8085");
639+
CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher);
640+
task.start(props);
641+
assertEquals(CloudPubSubSinkTask.class, task.getClass());
642+
}
643+
644+
@Test
645+
public void testCreatePublisherWithEmulatorDisabled() {
646+
props.put(ConnectorUtils.CPS_USE_EMULATOR, "false");
647+
props.put(ConnectorUtils.CPS_ENDPOINT, "pubsub.googleapis.com:443");
648+
CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher);
649+
task.start(props);
650+
assertEquals(CloudPubSubSinkTask.class, task.getClass());
651+
}
652+
615653
/** Get some sample SinkRecords's to use in the tests. */
616654
private List<SinkRecord> getSampleRecords() {
617655
List<SinkRecord> records = new ArrayList<>();

0 commit comments

Comments
 (0)