Skip to content

Commit

Permalink
KafkaV2SourceConnector (#38748)
Browse files Browse the repository at this point in the history
* kafkaV2SourceConnector

---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
  • Loading branch information
xinlian12 and annie-mac authored Feb 22, 2024
1 parent 92e2511 commit 30835d9
Show file tree
Hide file tree
Showing 55 changed files with 5,159 additions and 41 deletions.
3 changes: 2 additions & 1 deletion .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
"sdk/cosmos/azure-cosmos-encryption/**",
"sdk/cosmos/azure-cosmos-spark_3_2-12/**",
"sdk/spring/azure-spring-data-cosmos/**",
"sdk/cosmos/azure-cosmos-kafka-connect/**",
"sdk/core/azure-json/**",
"sdk/deviceupdate/azure-iot-deviceupdate/**",
"sdk/e2e/src/**",
Expand Down Expand Up @@ -716,7 +717,7 @@
"words": [
"Pfast",
"Pdirect",
"Pmulti",
"Pmulti",
"Psplit",
"Pquery",
"Pcfp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ the main ServiceBusClientBuilder. -->
files="com.azure.cosmos.ClientSideRequestStatistics"/> <!-- Need OperatingSystemMXBean from sun to obtain cpu info -->
<suppress checks="EnforceFinalFields" files="com.azure.spring.cloud.config.AppConfigurationPropertySourceLocator"/>
<suppress checks="ConstantName" files="com.azure.spring.cloud.config.AppConfigurationPropertySourceLocator"/>
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="[/\\]azure-cosmos-kafka-connect[/\\]"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSourceConnector"/>

<!-- Checkstyle suppressions for resource manager package -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.resourcemanager.*"/>
Expand Down
1 change: 1 addition & 0 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
# Cosmos Kafka connector runtime dependencies
cosmos_org.apache.kafka:connect-api;3.6.0
# Cosmos Kafka connector tests only
cosmos_org.apache.kafka:connect-runtime;3.6.0
# Maven Tools for Cosmos Kafka connector only
cosmos_io.confluent:kafka-connect-maven-plugin;0.12.0

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 1.0.0-beta.1 (Unreleased)

#### Features Added
* Added Source connector. See [PR 38748](https://github.com/Azure/azure-sdk-for-java/pull/38748)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
## Configuration Reference:

## Generic Configuration
| Config Property Name | Default | Description |
|:---------------------------------------------|:--------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Endpoint Uri |
| `kafka.connect.cosmos.accountEndpoint` | None | Cosmos DB Account Key |
| `kafka.connect.cosmos.useGatewayMode` | `false` | Flag to indicate whether to use gateway mode. By default it is false. |
| `kafka.connect.cosmos.preferredRegionsList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). |
| `kafka.connect.cosmos.applicationName` | `""` | Application name. Will be added as the userAgent suffix. |

## Source Connector Configuration
| Config Property Name | Default | Description |
|:----------------------------------------------------------|:-------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kafka.connect.cosmos.source.database.name` | None | Cosmos DB database name. |
| `kafka.connect.cosmos.source.containers.includeAll` | `false` | Flag to indicate whether reading from all containers. |
| `kafka.connect.cosmos.source.containers.includedList` | `[]` | Containers included. This config will be ignored if kafka.connect.cosmos.source.includeAllContainers is true. |
| `kafka.connect.cosmos.source.containers.topicMap` | `[]` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. By default, container name is used as the name of the kafka topic to publish data to, can use this property to override the default config |
| `kafka.connect.cosmos.source.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (Now, Beginning or a certain point in time (UTC) for example 2020-02-10T14:15:03) - the default value is 'Beginning'. |
| `kafka.connect.cosmos.source.changeFeed.mode` | `LatestVersion` | ChangeFeed mode (LatestVersion or AllVersionsAndDeletes). |
| `kafka.connect.cosmos.source.changeFeed.maxItemCountHint` | `1000` | The maximum number of documents returned in a single change feed request. But the number of items received might be higher than the specified value if multiple items are changed by the same transaction. |
| `kafka.connect.cosmos.source.metadata.poll.delay.ms` | `300000` | Indicates how often to check the metadata changes (including container split/merge, adding/removing/recreated containers). When changes are detected, it will reconfigure the tasks. Default is 5 minutes. |
| `kafka.connect.cosmos.source.metadata.storage.topic` | `_cosmos.metadata.topic` | The name of the topic where the metadata are stored. The metadata topic will be created if it does not already exist, else it will use the pre-created topic. |
| `kafka.connect.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. |
| `kafka.connect.cosmos.source.messageKey.field` | `id` | The field to use as the message key. |
21 changes: 18 additions & 3 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ Licensed under the MIT License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jacoco.min.linecoverage>0.01</jacoco.min.linecoverage>
<jacoco.min.branchcoverage>0.02</jacoco.min.branchcoverage>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<shadingPrefix>azure_cosmos_kafka_connect</shadingPrefix>

<!-- CosmosSkip - This is not a module we want/expect external customers to consume. Skip breaking API checks. -->
Expand All @@ -48,7 +46,9 @@ Licensed under the MIT License.
<javaModulesSurefireArgLine>
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.models=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
</javaModulesSurefireArgLine>
</properties>

Expand Down Expand Up @@ -94,6 +94,19 @@ Licensed under the MIT License.
<version>1.10.0</version> <!-- {x-version-update;org.apache.commons:commons-text;external_dependency} -->
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>3.6.0</version> <!-- {x-version-update;cosmos_org.apache.kafka:connect-runtime;external_dependency} -->
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down Expand Up @@ -160,6 +173,7 @@ Licensed under the MIT License.
<version>1.14.8</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy-agent;external_dependency} -->
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down Expand Up @@ -221,6 +235,7 @@ Licensed under the MIT License.
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
<filters>
<filter>
<artifact>*:*:*:*</artifact>
Expand Down
8 changes: 8 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/src/docker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
connectors/
log.txt

# Exclude all temporary files in resources
!resources/*example
resources/sink.properties
resources/source.properties
resources/standalone.properties
7 changes: 7 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/src/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Build the Cosmos DB Connectors on top of the Kafka Connect image
FROM confluentinc/cp-kafka-connect:7.5.0

# Install datagen connector
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

COPY connectors/ /etc/kafka-connect/jars
Loading

0 comments on commit 30835d9

Please sign in to comment.