[Feature Request]: Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK #24210

Closed
creste opened this issue Nov 16, 2022 · 4 comments · Fixed by #24212

Comments

@creste
Contributor

creste commented Nov 16, 2022

Problem

Currently, the Azure Filesystem for the Python SDK only supports authenticating using the AZURE_STORAGE_CONNECTION_STRING environment variable. That approach has several limitations:

  • The AZURE_STORAGE_CONNECTION_STRING environment variable must be defined on all systems where the pipeline executes. This is difficult to configure when using Beam worker-pool sidecar containers with the FlinkRunner because Flink may be running in session mode with different Beam pipelines needing different connection strings.
  • The call to BlobServiceClient.from_connection_string() does not support all of the authentication methods supported by DefaultAzureCredential. For my use case in particular, it does not support Managed Identity credentials.

Solution

I plan to address the above limitations in a PR by adding new Azure-specific pipeline options described below.

--azure_connection_string

Specifies the Azure Storage Connection String.

Can be used instead of the AZURE_STORAGE_CONNECTION_STRING environment variable or the new --blob_service_endpoint pipeline option described below.

Example:

python -m apache_beam.examples.wordcount \
  --input azfs://devstoreaccount1/container/* \
  --output azfs://devstoreaccount1/container/py-wordcount-integration \
  --azure_connection_string "DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://azurite:10000/devstoreaccount1;"
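
The relationship between the new flag and the existing environment variable could be sketched roughly as follows. This is only an illustration: the helper names `resolve_connection_string` and `parse_connection_string` are hypothetical, and the precedence shown (flag first, then environment variable) is an assumption that this issue does not spell out.

```python
import os
from typing import Dict, Mapping, Optional

ENV_VAR = "AZURE_STORAGE_CONNECTION_STRING"


def resolve_connection_string(
    flag_value: Optional[str],
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the connection string to use, or None if neither source is set.

    Assumption: an explicit --azure_connection_string value wins over the
    AZURE_STORAGE_CONNECTION_STRING environment variable.
    """
    if environ is None:
        environ = os.environ
    if flag_value:
        return flag_value
    return environ.get(ENV_VAR) or None


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Split a 'Key=Value;Key=Value;' connection string into a dict.

    Only the first '=' separates key from value, because base64 account
    keys usually end in '=' padding.
    """
    fields: Dict[str, str] = {}
    for part in conn_str.split(";"):
        if not part.strip():
            continue  # tolerate the trailing ';'
        key, sep, value = part.partition("=")
        if not sep:
            raise ValueError(f"Malformed connection string segment: {part!r}")
        fields[key.strip()] = value
    return fields
```

Passing the environment in as a parameter keeps the sketch easy to exercise without touching the real process environment.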

--blob_service_endpoint

Specifies the Azure Blob Storage Account Endpoint URL.

Can be used instead of the AZURE_STORAGE_CONNECTION_STRING environment variable or the new --azure_connection_string pipeline option described above.

This pipeline option uses DefaultAzureCredential() to authenticate.

Example:

python -m apache_beam.examples.wordcount \
  --input azfs://devstoreaccount1/container/* \
  --output azfs://devstoreaccount1/container/py-wordcount-integration \
  --blob_service_endpoint https://mystorageaccount.blob.core.windows.net/

--azure_managed_identity_client_id

Specifies the Managed Identity Client ID. Can only be used with --blob_service_endpoint.

This pipeline option uses DefaultAzureCredential(managed_identity_client_id=client_id) to authenticate.

Example:

python -m apache_beam.examples.wordcount \
  --input azfs://devstoreaccount1/container/* \
  --output azfs://devstoreaccount1/container/py-wordcount-integration \
  --blob_service_endpoint https://devstoreaccount1.blob.core.windows.net/ \
  --azure_managed_identity_client_id ca6cc1a3-4b82-48bd-97ca-8e799c0abff6
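
To make the interaction between the three proposed options concrete, here is a minimal sketch of how they might map onto a client. It is not the code from the PR: `AzureAuthOptions`, `select_auth_method`, and `build_blob_service_client` are hypothetical names, and rejecting `--azure_connection_string` together with `--blob_service_endpoint` is an assumption. The Azure calls used (`BlobServiceClient.from_connection_string`, `BlobServiceClient(account_url=..., credential=...)`, and `DefaultAzureCredential(managed_identity_client_id=...)`) come from the `azure-storage-blob` and `azure-identity` packages and are imported lazily so the selection logic can run without them installed.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AzureAuthOptions:
    """Hypothetical container mirroring the three proposed pipeline options."""
    azure_connection_string: Optional[str] = None
    blob_service_endpoint: Optional[str] = None
    azure_managed_identity_client_id: Optional[str] = None


def select_auth_method(options: AzureAuthOptions) -> str:
    """Validate the option combination and name the auth path it implies."""
    if options.azure_managed_identity_client_id and not options.blob_service_endpoint:
        raise ValueError(
            "--azure_managed_identity_client_id can only be used with "
            "--blob_service_endpoint")
    # Assumption: the two options are alternatives, so both at once is an error.
    if options.azure_connection_string and options.blob_service_endpoint:
        raise ValueError(
            "--azure_connection_string and --blob_service_endpoint "
            "should not be combined")
    if options.azure_connection_string:
        return "connection_string"
    if options.blob_service_endpoint:
        if options.azure_managed_identity_client_id:
            return "managed_identity"
        return "default_credential"
    # Fall back to the pre-existing environment variable behaviour.
    return "environment"


def build_blob_service_client(options: AzureAuthOptions):
    """Create a BlobServiceClient for the selected auth method."""
    from azure.storage.blob import BlobServiceClient  # lazy import

    method = select_auth_method(options)
    if method == "connection_string":
        return BlobServiceClient.from_connection_string(
            options.azure_connection_string)
    if method == "environment":
        return BlobServiceClient.from_connection_string(
            os.environ["AZURE_STORAGE_CONNECTION_STRING"])

    from azure.identity import DefaultAzureCredential  # lazy import

    if method == "managed_identity":
        credential = DefaultAzureCredential(
            managed_identity_client_id=options.azure_managed_identity_client_id)
    else:
        credential = DefaultAzureCredential()
    return BlobServiceClient(
        account_url=options.blob_service_endpoint, credential=credential)
```

Keeping validation separate from client construction means the option rules can be unit tested without network access or Azure credentials.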

Testing

Per #20511, the Azure Filesystem does not have integration tests against Azure or Azurite. I plan to add integration tests for the new pipeline options to run against Azurite, similar to how HDFS does its integration tests.
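
For the Azurite-based tests, the connection string can be assembled from Azurite's well-known development account, the same one used in the `--azure_connection_string` example above. The helper below is a hypothetical sketch; the default host, port, and protocol are assumptions matching a locally running Azurite rather than anything decided in this issue.

```python
# Azurite's publicly documented development account (not a secret).
AZURITE_ACCOUNT_NAME = "devstoreaccount1"
AZURITE_ACCOUNT_KEY = (
    "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/"
    "K1SZFPTOtr/KBHBeksoGMGw=="
)


def azurite_connection_string(
    host: str = "127.0.0.1", port: int = 10000, protocol: str = "http"
) -> str:
    """Build a connection string pointing at an Azurite blob endpoint."""
    blob_endpoint = f"{protocol}://{host}:{port}/{AZURITE_ACCOUNT_NAME}"
    return (
        f"DefaultEndpointsProtocol={protocol};"
        f"AccountName={AZURITE_ACCOUNT_NAME};"
        f"AccountKey={AZURITE_ACCOUNT_KEY};"
        f"BlobEndpoint={blob_endpoint};"
    )
```

A test harness could pass the result to `--azure_connection_string`, for example `azurite_connection_string(host="azurite", protocol="https")` when Azurite runs as a sidecar container with TLS enabled.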

Issue Priority

Priority: 2

Issue Component

Component: io-py-ideas

@creste
Contributor Author

creste commented Nov 16, 2022

.take-issue

@Abacn
Contributor

Abacn commented Nov 16, 2022

Thanks @creste for raising this! It is an important issue. Our Java SDK has --azureConnectionString and --blobServiceEndpoint options, along with some other options that are available in Java but not in Python. Ideally we would like similar naming and specifications across SDKs. Could we use --azure_connection_string and --blob_service_endpoint? For --azure_managed_identity_client_id I have not found a corresponding option in our Java SDK, but we do have --azureCredentialsProvider, which includes a client_id value.

@creste
Contributor Author

creste commented Nov 16, 2022

Thank you for pointing out those flags, @Abacn ! I renamed the flags to --azure_connection_string and --blob_service_endpoint.

I can try changing --azure_managed_identity_client_id to do something similar to --azureCredentialsProvider. However, I don't have a good way to test all of those authentication methods using Azurite or Azure. How do you think I should proceed?

@Abacn
Contributor

Abacn commented Nov 16, 2022

> Thank you for pointing out those flags, @Abacn ! I renamed the flags to --azure_connection_string and --blob_service_endpoint.
>
> I can try changing --azure_managed_identity_client_id to do something similar to --azureCredentialsProvider. However, I don't have a good way to test all of those authentication methods using Azurite or Azure. How do you think I should proceed?

That's fine. We can support only the managed identity client ID for now and document it.

@github-actions github-actions bot added this to the 2.44.0 Release milestone Nov 22, 2022
Miuler added a commit to Miuler/beam that referenced this issue Nov 24, 2022
Refs: apache#23604

diff --git a/.gitignore b/.gitignore
index f5fc63f9de..7dd6ee80d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,4 +135,3 @@ playground/frontend/playground_components/pubspec.lock
 **/*.tfvars

 # Ignore Katas auto-generated files
-**/*-remote-info.yaml
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index db9f37eeec..a5a1cfac94 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -62,6 +62,7 @@
 * S3 implementation of the Beam filesystem (Go) ([apache#23991](apache#23991)).
 * Support for SingleStoreDB source and sink added (Java) ([apache#22617](apache#22617)).
 * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([apache#24210](apache#24210)).
+* Support for read from Cosmos DB Core SQL API [apache#23610](apache#23610)

 ## New Features / Improvements

diff --git a/sdks/java/io/azure-cosmosdb/README.md b/sdks/java/io/azure-cosmosdb/README.md
new file mode 100644
index 0000000000..f7c61c73ab
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/README.md
@@ -0,0 +1,40 @@
+# Cosmos DB Core SQL API
+
+Compile all module azure-cosmosdb
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:build
+```
+
+## Test
+
+Run TEST for this module (Cosmos DB Core SQL API):
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:test
+```
+
+
+## Publish in Maven Local
+
+Publish this module
+
+```shell
+# apache beam core
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/  publishToMavenLocal
+
+# apache beam azure-cosmosdb
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/  publishToMavenLocal
+```
+
+Publish all modules of apache beam
+
+```shell
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal
+```
+
+
diff --git a/sdks/java/io/azure-cosmosdb/build.gradle b/sdks/java/io/azure-cosmosdb/build.gradle
new file mode 100644
index 0000000000..3875532652
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id("org.apache.beam.module")
+  id("scala")
+}
+
+ext {
+  junitVersion = "5.9.1"
+  cosmosVersion = "4.37.1"
+  cosmosContainerVersion = "1.17.5"
+  bsonMongoVersion = "4.7.2"
+  log4jVersion = "2.19.0"
+}
+
+applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
+ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."
+
+dependencies {
+  implementation("org.scala-lang:scala-library:2.12.17")
+  implementation("com.azure:azure-cosmos:${cosmosVersion}")
+  implementation library.java.commons_io
+  permitUnusedDeclared library.java.commons_io // BEAM-11761
+  implementation library.java.slf4j_api
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation("org.mongodb:bson:${bsonMongoVersion}")
+}
+
+// TEST
+dependencies {
+  testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
+  testImplementation("com.outr:scribe_2.12:3.10.4")
+  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+  testImplementation library.java.mockito_core
+  testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
+  testRuntimeOnly library.java.slf4j_jdk14
+  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
new file mode 100644
index 0000000000..f25b903068
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.models.CosmosQueryRequestOptions
+import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+
+private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
+  private val log = LoggerFactory.getLogger(getClass)
+  private var maybeClient: Option[CosmosClient] = None
+  private var maybeIterator: Option[java.util.Iterator[Document]] = None
+
+  override def start(): Boolean = {
+    maybeClient = Some(
+      new CosmosClientBuilder()
+        .gatewayMode
+        .endpointDiscoveryEnabled(false)
+        .endpoint(cosmosSource.readCosmos.endpoint)
+        .key(cosmosSource.readCosmos.key)
+        .buildClient
+    )
+
+    maybeIterator = maybeClient.map { client =>
+      log.info("Get the container name")
+
+      log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}")
+      client
+        .getDatabase(cosmosSource.readCosmos.database)
+        .getContainer(cosmosSource.readCosmos.container)
+        .queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document])
+        .iterator()
+    }
+
+    true
+  }
+
+  override def advance(): Boolean = maybeIterator.exists(_.hasNext)
+
+  override def getCurrent: Document = maybeIterator
+    .filter(_.hasNext)
+    //.map(iterator => new Document(iterator.next()))
+    .map(_.next())
+    .orNull
+
+  override def getCurrentSource: CosmosBoundedSource = cosmosSource
+
+  override def close(): Unit = maybeClient.foreach(_.close())
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
new file mode 100644
index 0000000000..39fafc9039
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
@@ -0,0 +1,25 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.apache.beam.sdk.options.PipelineOptions
+import org.bson.Document
+
+import java.util
+import java.util.Collections
+
+class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {
+
+  /** @inheritdoc
+   * TODO: You have to find a better way, maybe by partition key */
+  override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)
+
+  /** @inheritdoc
+   * The Cosmos DB Core (SQL) API does not expose this metric for queries */
+  override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L
+
+  override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])
+
+  override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
+    new CosmosBoundedReader(this)
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
new file mode 100644
index 0000000000..fcd3a7abfc
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
@@ -0,0 +1,8 @@
+package org.apache.beam.sdk.io.azure.cosmos;;
+
+object CosmosIO {
+  def read(): CosmosRead = {
+    CosmosRead()
+  }
+}
+
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
new file mode 100644
index 0000000000..48a0ece0d4
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
@@ -0,0 +1,45 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.io.Read
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.values.{PBegin, PCollection}
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+case class CosmosRead(private[cosmos] val endpoint: String = null,
+                      private[cosmos] val key: String = null,
+                      private[cosmos] val database: String = null,
+                      private[cosmos] val container: String = null,
+                      private[cosmos] val query: String = null)
+  extends PTransform[PBegin, PCollection[Document]] {
+
+
+  private val log = LoggerFactory.getLogger(classOf[CosmosRead])
+
+  /** Create a new CosmosRead based on the previous CosmosRead, with the endpoint changed */
+  def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint)
+
+  def withCosmosKey(key: String): CosmosRead = this.copy(key = key)
+
+  def withDatabase(database: String): CosmosRead = this.copy(database = database)
+
+  def withQuery(query: String): CosmosRead = this.copy(query = query)
+
+  def withContainer(container: String): CosmosRead = this.copy(container = container)
+
+  override def expand(input: PBegin): PCollection[Document] = {
+    log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query")
+    validate()
+
+    // input.getPipeline.apply(Read.from(new CosmosSource(this)))
+    input.apply(Read.from(new CosmosBoundedSource(this)))
+  }
+
+  private def validate(): Unit = {
+    require(endpoint != null, "CosmosDB endpoint is required")
+    require(key != null, "CosmosDB key is required")
+    require(database != null, "CosmosDB database is required")
+    require(container != null, "CosmosDB container is required")
+    require(query != null, "CosmosDB query is required")
+  }
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..28f1bb63f7
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+#log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
new file mode 100644
index 0000000000..7de8dda145
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
@@ -0,0 +1,103 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.CosmosClientBuilder
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.transforms.Count
+import org.apache.beam.sdk.values.PCollection
+import org.bson.Document
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.CosmosDBEmulatorContainer
+import org.testcontainers.utility.DockerImageName
+
+import java.nio.file.Files
+import scala.util.Using
+
+@RunWith(classOf[JUnit4])
+class CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest")
+  //  @(Rule @Getter)
+  //  val pipelineWrite: TestPipeline = TestPipeline.create
+  //  @(Rule @Getter)
+  //  val pipelineRead: TestPipeline = TestPipeline.create
+
+  @Test
+  def readFromCosmosCoreSqlApi(): Unit = {
+    val read = CosmosIO
+      .read()
+      .withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
+      .withQuery(s"SELECT * FROM c")
+      .withContainer(CONTAINER)
+      .withDatabase(DATABASE)
+
+    val pipeline = Pipeline.create()
+    val count: PCollection[java.lang.Long] = pipeline
+      .apply(read)
+      .apply(Count.globally())
+
+    PAssert.thatSingleton(count).isEqualTo(10)
+
+    pipeline.run().waitUntilFinish()
+  }
+}
+
+/** Initialization of static fields and methods */
+@RunWith(classOf[JUnit4])
+object CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
+  private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
+  private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
+  private val DATABASE = "test"
+  private val CONTAINER = "test"
+
+  @BeforeClass
+  def setup(): Unit = {
+    log.info("Starting CosmosDB emulator")
+    cosmosDBEmulatorContainer.start()
+
+    val tempFolder = new TemporaryFolder
+    tempFolder.create()
+    val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
+    val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
+    keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
+    System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
+    System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
+    System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")
+
+
+    log.info("Creating the data -------------------------------------------------------->")
+    val triedCreateData = Using(new CosmosClientBuilder()
+      .gatewayMode
+      .endpointDiscoveryEnabled(false)
+      .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .key(cosmosDBEmulatorContainer.getEmulatorKey)
+      .buildClient) { client =>
+
+      client.createDatabase(DATABASE)
+      val db = client.getDatabase(DATABASE)
+      db.createContainer(CONTAINER, "/id")
+      val container = db.getContainer(CONTAINER)
+      for (i <- 1 to 10) {
+        container.createItem(new Document("id", i.toString))
+      }
+    }
+    if (triedCreateData.isFailure) {
+      val throwable = triedCreateData.failed.get
+      log.error("Error creating the data", throwable)
+      throw throwable
+    }
+    log.info("Data created ------------------------------------------------------------<")
+  }
+
+  @AfterClass
+  def close(): Unit = {
+    log.info("Stop CosmosDB emulator")
+    cosmosDBEmulatorContainer.stop()
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 8527d17d3c..033c9dc24b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -157,6 +157,7 @@ include(":sdks:java:io:amazon-web-services")
 include(":sdks:java:io:amazon-web-services2")
 include(":sdks:java:io:amqp")
 include(":sdks:java:io:azure")
+include(":sdks:java:io:azure-cosmosdb")
 include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
Miuler added a commit to Miuler/beam that referenced this issue Dec 12, 2022
Refs: apache#23604

+
+import java.nio.file.Files
+import scala.util.Using
+
+@RunWith(classOf[JUnit4])
+class CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest")
+  //  @(Rule @Getter)
+  //  val pipelineWrite: TestPipeline = TestPipeline.create
+  //  @(Rule @Getter)
+  //  val pipelineRead: TestPipeline = TestPipeline.create
+
+  @Test
+  def readFromCosmosCoreSqlApi(): Unit = {
+    val read = CosmosIO
+      .read()
+      .withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
+      .withQuery(s"SELECT * FROM c")
+      .withContainer(CONTAINER)
+      .withDatabase(DATABASE)
+
+    val pipeline = Pipeline.create()
+    val count: PCollection[java.lang.Long] = pipeline
+      .apply(read)
+      .apply(Count.globally())
+
+    PAssert.thatSingleton(count).isEqualTo(10)
+
+    pipeline.run().waitUntilFinish()
+  }
+}
+
+/** Initialization of static fields and methods */
+@RunWith(classOf[JUnit4])
+object CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
+  private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
+  private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
+  private val DATABASE = "test"
+  private val CONTAINER = "test"
+
+  @BeforeClass
+  def setup(): Unit = {
+    log.info("Starting CosmosDB emulator")
+    cosmosDBEmulatorContainer.start()
+
+    val tempFolder = new TemporaryFolder
+    tempFolder.create()
+    val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
+    val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
+    keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
+    System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
+    System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
+    System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")
+
+
+    log.info("Creating the data -------------------------------------------------------->")
+    val triedCreateData = Using(new CosmosClientBuilder()
+      .gatewayMode
+      .endpointDiscoveryEnabled(false)
+      .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .key(cosmosDBEmulatorContainer.getEmulatorKey)
+      .buildClient) { client =>
+
+      client.createDatabase(DATABASE)
+      val db = client.getDatabase(DATABASE)
+      db.createContainer(CONTAINER, "/id")
+      val container = db.getContainer(CONTAINER)
+      for (i <- 1 to 10) {
+        container.createItem(new Document("id", i.toString))
+      }
+    }
+    if (triedCreateData.isFailure) {
+      val throwable = triedCreateData.failed.get
+      log.error("Error creating the data", throwable)
+      throw throwable
+    }
+    log.info("Data created ------------------------------------------------------------<")
+  }
+
+  @AfterClass
+  def close(): Unit = {
+    log.info("Stop CosmosDB emulator")
+    cosmosDBEmulatorContainer.stop()
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 8527d17d3c..033c9dc24b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -157,6 +157,7 @@ include(":sdks:java:io:amazon-web-services")
 include(":sdks:java:io:amazon-web-services2")
 include(":sdks:java:io:amqp")
 include(":sdks:java:io:azure")
+include(":sdks:java:io:azure-cosmosdb")
 include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
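
The `CosmosRead` transform in the diff above is configured through `with...` methods that each return a modified copy, and it checks for missing settings only when `expand` runs. The same pattern can be sketched without the Beam dependency. `ReadConfig`, `missing`, and the `Option` fields below are hypothetical names chosen for illustration (the commit itself uses `null` defaults and one `require` per field), so treat this as a sketch of the idea rather than the module's API.

```scala
// Hypothetical, Beam-free stand-in for the copy-based configuration in CosmosRead.
// Option replaces the null defaults used in the commit; this is an assumption of the sketch.
final case class ReadConfig(endpoint: Option[String] = None,
                            key: Option[String] = None,
                            database: Option[String] = None,
                            container: Option[String] = None,
                            query: Option[String] = None) {

  // Each setter returns a new instance and leaves the receiver unchanged.
  def withEndpoint(value: String): ReadConfig = copy(endpoint = Some(value))
  def withKey(value: String): ReadConfig = copy(key = Some(value))
  def withDatabase(value: String): ReadConfig = copy(database = Some(value))
  def withContainer(value: String): ReadConfig = copy(container = Some(value))
  def withQuery(value: String): ReadConfig = copy(query = Some(value))

  /** Names of the required settings that have not been set yet. */
  def missing: List[String] =
    List("endpoint" -> endpoint, "key" -> key, "database" -> database,
      "container" -> container, "query" -> query)
      .collect { case (name, None) => name }

  /** Throws IllegalArgumentException naming every missing setting at once. */
  def validate(): Unit =
    require(missing.isEmpty, s"CosmosDB settings required: ${missing.mkString(", ")}")
}
```

Because every `with...` call returns a fresh value, a partially configured instance can be shared and specialised per pipeline without one caller affecting another. Reporting all missing settings in a single message is a design choice of this sketch, not something the commit does.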
Miuler added a commit to Miuler/beam that referenced this issue Dec 28, 2022
Refs: apache#23604

diff --git a/.gitignore b/.gitignore
index f5fc63f9de..7dd6ee80d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,4 +135,3 @@ playground/frontend/playground_components/pubspec.lock
 **/*.tfvars

 # Ignore Katas auto-generated files
-**/*-remote-info.yaml
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index db9f37eeec..a5a1cfac94 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -62,6 +62,7 @@
 * S3 implementation of the Beam filesystem (Go) ([apache#23991](apache#23991)).
 * Support for SingleStoreDB source and sink added (Java) ([apache#22617](apache#22617)).
 * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([apache#24210](apache#24210)).
+* Support for reading from the Cosmos DB Core SQL API ([apache#23610](apache#23610)).

 ## New Features / Improvements

diff --git a/sdks/java/io/azure-cosmosdb/README.md b/sdks/java/io/azure-cosmosdb/README.md
new file mode 100644
index 0000000000..f7c61c73ab
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/README.md
@@ -0,0 +1,40 @@
+# Cosmos DB Core SQL API
+
+Compile the entire azure-cosmosdb module:
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:build
+```
+
+## Test
+
+Run the tests for this module (Cosmos DB Core SQL API):
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:test
+```
+
+
+## Publish in Maven Local
+
+Publish this module (together with the Beam core it depends on):
+
+```shell
+# apache beam core
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/  publishToMavenLocal
+
+# apache beam azure-cosmosdb
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/  publishToMavenLocal
+```
+
+Publish all Apache Beam modules:
+
+```shell
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal
+```
+
+
diff --git a/sdks/java/io/azure-cosmosdb/build.gradle b/sdks/java/io/azure-cosmosdb/build.gradle
new file mode 100644
index 0000000000..3875532652
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id("org.apache.beam.module")
+  id("scala")
+}
+
+ext {
+  junitVersion = "5.9.1"
+  cosmosVersion = "4.37.1"
+  cosmosContainerVersion = "1.17.5"
+  bsonMongoVersion = "4.7.2"
+  log4jVersion = "2.19.0"
+}
+
+applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
+ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."
+
+dependencies {
+  implementation("org.scala-lang:scala-library:2.12.17")
+  implementation("com.azure:azure-cosmos:${cosmosVersion}")
+  implementation library.java.commons_io
+  permitUnusedDeclared library.java.commons_io // BEAM-11761
+  implementation library.java.slf4j_api
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation("org.mongodb:bson:${bsonMongoVersion}")
+}
+
+// TEST
+dependencies {
+  testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
+  testImplementation("com.outr:scribe_2.12:3.10.4")
+  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+  testImplementation library.java.mockito_core
+  testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
+  testRuntimeOnly library.java.slf4j_jdk14
+  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
new file mode 100644
index 0000000000..f25b903068
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.models.CosmosQueryRequestOptions
+import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+
+private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
+  private val log = LoggerFactory.getLogger(getClass)
+  private var maybeClient: Option[CosmosClient] = None
+  private var maybeIterator: Option[java.util.Iterator[Document]] = None
+
+  override def start(): Boolean = {
+    maybeClient = Some(
+      new CosmosClientBuilder()
+        .gatewayMode
+        .endpointDiscoveryEnabled(false)
+        .endpoint(cosmosSource.readCosmos.endpoint)
+        .key(cosmosSource.readCosmos.key)
+        .buildClient
+    )
+
+    maybeIterator = maybeClient.map { client =>
+      log.info("Get the container name")
+
+      log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}")
+      client
+        .getDatabase(cosmosSource.readCosmos.database)
+        .getContainer(cosmosSource.readCosmos.container)
+        .queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document])
+        .iterator()
+    }
+
+    true
+  }
+
+  override def advance(): Boolean = maybeIterator.exists(_.hasNext)
+
+  override def getCurrent: Document = maybeIterator
+    .filter(_.hasNext)
+    //.map(iterator => new Document(iterator.next()))
+    .map(_.next())
+    .orNull
+
+  override def getCurrentSource: CosmosBoundedSource = cosmosSource
+
+  override def close(): Unit = maybeClient.foreach(_.close())
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
new file mode 100644
index 0000000000..39fafc9039
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
@@ -0,0 +1,25 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.apache.beam.sdk.options.PipelineOptions
+import org.bson.Document
+
+import java.util
+import java.util.Collections
+
+class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {
+
+  /** @inheritdoc
+   * TODO: Find a better way to split, perhaps by partition key */
+  override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)
+
+  /** @inheritdoc
+   * The Cosmos DB Core (SQL) API does not provide this metric for queries */
+  override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L
+
+  override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])
+
+  override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
+    new CosmosBoundedReader(this)
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
new file mode 100644
index 0000000000..fcd3a7abfc
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
@@ -0,0 +1,8 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+object CosmosIO {
+  def read(): CosmosRead = {
+    CosmosRead()
+  }
+}
+
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
new file mode 100644
index 0000000000..48a0ece0d4
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
@@ -0,0 +1,45 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.io.Read
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.values.{PBegin, PCollection}
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+case class CosmosRead(private[cosmos] val endpoint: String = null,
+                      private[cosmos] val key: String = null,
+                      private[cosmos] val database: String = null,
+                      private[cosmos] val container: String = null,
+                      private[cosmos] val query: String = null)
+  extends PTransform[PBegin, PCollection[Document]] {
+
+
+  private val log = LoggerFactory.getLogger(classOf[CosmosRead])
+
+  /** Creates a new CosmosRead based on the previous CosmosRead, with the endpoint replaced */
+  def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint)
+
+  def withCosmosKey(key: String): CosmosRead = this.copy(key = key)
+
+  def withDatabase(database: String): CosmosRead = this.copy(database = database)
+
+  def withQuery(query: String): CosmosRead = this.copy(query = query)
+
+  def withContainer(container: String): CosmosRead = this.copy(container = container)
+
+  override def expand(input: PBegin): PCollection[Document] = {
+    log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query")
+    validate()
+
+    // input.getPipeline.apply(Read.from(new CosmosSource(this)))
+    input.apply(Read.from(new CosmosBoundedSource(this)))
+  }
+
+  private def validate(): Unit = {
+    require(endpoint != null, "CosmosDB endpoint is required")
+    require(key != null, "CosmosDB key is required")
+    require(database != null, "CosmosDB database is required")
+    require(container != null, "CosmosDB container is required")
+    require(query != null, "CosmosDB query is required")
+  }
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..28f1bb63f7
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+#log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
new file mode 100644
index 0000000000..7de8dda145
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
@@ -0,0 +1,103 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.CosmosClientBuilder
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.transforms.Count
+import org.apache.beam.sdk.values.PCollection
+import org.bson.Document
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.CosmosDBEmulatorContainer
+import org.testcontainers.utility.DockerImageName
+
+import java.nio.file.Files
+import scala.util.Using
+
+@RunWith(classOf[JUnit4])
+class CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest")
+  //  @(Rule @Getter)
+  //  val pipelineWrite: TestPipeline = TestPipeline.create
+  //  @(Rule @Getter)
+  //  val pipelineRead: TestPipeline = TestPipeline.create
+
+  @Test
+  def readFromCosmosCoreSqlApi(): Unit = {
+    val read = CosmosIO
+      .read()
+      .withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
+      .withQuery(s"SELECT * FROM c")
+      .withContainer(CONTAINER)
+      .withDatabase(DATABASE)
+
+    val pipeline = Pipeline.create()
+    val count: PCollection[java.lang.Long] = pipeline
+      .apply(read)
+      .apply(Count.globally())
+
+    PAssert.thatSingleton(count).isEqualTo(10)
+
+    pipeline.run().waitUntilFinish()
+  }
+}
+
+/** Initialization of static fields and methods */
+@RunWith(classOf[JUnit4])
+object CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
+  private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
+  private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
+  private val DATABASE = "test"
+  private val CONTAINER = "test"
+
+  @BeforeClass
+  def setup(): Unit = {
+    log.info("Starting CosmosDB emulator")
+    cosmosDBEmulatorContainer.start()
+
+    val tempFolder = new TemporaryFolder
+    tempFolder.create()
+    val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
+    val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
+    keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
+    System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
+    System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
+    System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")
+
+
+    log.info("Creating the data -------------------------------------------------------->")
+    val triedCreateData = Using(new CosmosClientBuilder()
+      .gatewayMode
+      .endpointDiscoveryEnabled(false)
+      .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .key(cosmosDBEmulatorContainer.getEmulatorKey)
+      .buildClient) { client =>
+
+      client.createDatabase(DATABASE)
+      val db = client.getDatabase(DATABASE)
+      db.createContainer(CONTAINER, "/id")
+      val container = db.getContainer(CONTAINER)
+      for (i <- 1 to 10) {
+        container.createItem(new Document("id", i.toString))
+      }
+    }
+    if (triedCreateData.isFailure) {
+      val throwable = triedCreateData.failed.get
+      log.error("Error creating the data", throwable)
+      throw throwable
+    }
+    log.info("Data created ------------------------------------------------------------<")
+  }
+
+  @AfterClass
+  def close(): Unit = {
+    log.info("Stop CosmosDB emulator")
+    cosmosDBEmulatorContainer.stop()
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 8527d17d3c..033c9dc24b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -157,6 +157,7 @@ include(":sdks:java:io:amazon-web-services")
 include(":sdks:java:io:amazon-web-services2")
 include(":sdks:java:io:amqp")
 include(":sdks:java:io:azure")
+include(":sdks:java:io:azure-cosmosdb")
 include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
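
A note on `CosmosBoundedReader` in the commit above: `start()` always returns `true`, `advance()` only checks `hasNext`, and `getCurrent` is what actually consumes the iterator. As far as I understand the Beam `Source.Reader` contract, `start()` and `advance()` are expected to move to the next record and report whether one exists, while `getCurrent` should return the same record on repeated calls. With the code as committed, an empty query result would appear to yield one `null` element. The sketch below shows the intended shape over a plain Scala iterator. `IteratorReader` and its `open` parameter are hypothetical names, and the class deliberately does not extend any Beam type so that it stays self-contained.

```scala
// Hypothetical, Beam-free sketch of the start/advance/getCurrent protocol.
// `open` stands in for creating the client and running the query.
final class IteratorReader[T](open: () => Iterator[T]) {
  private var iterator: Iterator[T] = Iterator.empty
  private var current: Option[T] = None

  /** Opens the underlying iterator and moves to the first record, if any. */
  def start(): Boolean = {
    iterator = open()
    advance()
  }

  /** Moves to the next record; returns false once the iterator is exhausted. */
  def advance(): Boolean = {
    current = if (iterator.hasNext) Some(iterator.next()) else None
    current.isDefined
  }

  /** Returns the current record without consuming anything, so repeated calls agree. */
  def getCurrent: T =
    current.getOrElse(throw new NoSuchElementException("no current record"))
}
```

A caller would loop with `var available = reader.start()` followed by `while (available) { use(reader.getCurrent); available = reader.advance() }`, where `use` is whatever the caller does with each record. Keeping the consumed element in `current` is what makes `getCurrent` idempotent.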