From bf616b42da84a126c8054953d20a4fa14fd7bfe2 Mon Sep 17 00:00:00 2001 From: "Christian Whitehead (MSFT)" <35080559+chrwhit@users.noreply.github.com> Date: Mon, 7 Jun 2021 16:48:15 -0700 Subject: [PATCH] Chrwhit/master update two (#22123) * Exposes management node in azure-core-amqp (#22095) * Update AmqpConnection to have a getManagementNode. * Adding AmqpManagementNode. * Update AmqpConnection, AmqpManagementNode, AmqpSession to use AsyncCloseable. * Adding AsyncCloseable to AmqpLink. * ClaimsBasedSecurityNode.java uses AsyncCloseable. * Implements CbsNode's closeAsync() and adds tests. * ReactorSession implements closeAsync() * ReactorConnection uses closeAsync(). Renames dispose() to closeAsync(). Fixes errors where some close operations were not subscribed to. * RequestResponseChannel. Remove close operation with message. * Adding DeliveryOutcome models and DeliveryState enum. * Add authorization scope to connection options. * Add MessageUtils to serialize and deserialize AmqpAnnotatedMessage * Update AmqpManagementNode to expose delivery outcomes because they can be associated with messages. * Adding MessageUtil support for converting DeliveryOutcome and Outcomes. * Fixing build breaks from ConnectionOptions. * Adding management channel class. * Adding management channel into ReactorConnection. * Update ExceptionUtil to return instead of throwing on unknown amqp error codes. * Moving ManagementChannel formatting. * Add javadocs to ReceivedDeliveryOutcome. * Add tests for ManagementChannel * Adding tests for message utils. * Fix javadoc on ModifiedDeliveryOutcome * ReactorConnection: Hook up dispose method. * EventHubs: Fixing instances of ConnectionOptions. * ServiceBus: Fix build errors using ConnectionOptions. * Adding MessageUtilsTests. * Updating CHANGELOG. 
* Annotate HttpRange with Immutable (#22119) * Cosmos Spark: Changing inferSchema.forceNullableProperties default to true (#22049) * Changing default * Docs * Tests * new test * doc update * Change log * Make getScopes in the ARM Authentication Policy Public (#22120) Make getScopes in the ARM Authentication Policy Public Co-authored-by: Connie Yau Co-authored-by: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Co-authored-by: Matias Quaranta --- .../cosmos/spark/SparkE2EQueryITest.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala index a814e5e0e4f80..8b18c09306eea 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala @@ -524,6 +524,57 @@ class SparkE2EQueryITest } } + "spark query" can "throw exception when forceNullableProperties is false and rows have different schema" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val samplingSize = 100 + val expectedResults = samplingSize * 2 + val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer) + + // Inserting documents with slightly different schema + for( _ <- 1 to expectedResults) { + val objectNode = Utils.getSimpleObjectMapper.createObjectNode() + val arr = objectNode.putArray("object_array") + val nested = Utils.getSimpleObjectMapper.createObjectNode() + nested.put("A", "test") + nested.put("B", "test") + arr.add(nested) + objectNode.put("id", UUID.randomUUID().toString) + container.createItem(objectNode).block() + } + + for( _ <- 1 to samplingSize) { + val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode() + val arr = 
objectNode2.putArray("object_array") + val nested = Utils.getSimpleObjectMapper.createObjectNode() + nested.put("A", "test") + arr.add(nested) + objectNode2.put("id", UUID.randomUUID().toString) + container.createItem(objectNode2).block() + } + + val cfgWithInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.forceNullableProperties" -> "false", + "spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString, + "spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts", + "spark.cosmos.read.partitioning.strategy" -> "Restrictive" + ) + + val dfWithInference = spark.read.format("cosmos.oltp").options(cfgWithInference).load() + try { + dfWithInference.collect() + fail("Should have thrown an exception") + } + catch { + case inner: Exception => + inner.toString.contains("The 1th field 'B' of input row cannot be null") shouldBe true + } + } + "spark query" can "use custom sampling size" in { val cosmosEndpoint = TestConfigurations.HOST val cosmosMasterKey = TestConfigurations.MASTER_KEY