diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala index a814e5e0e4f80..8b18c09306eea 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala @@ -524,6 +524,57 @@ class SparkE2EQueryITest } } + "spark query" can "when forceNullableProperties is false and rows have different schema" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val samplingSize = 100 + val expectedResults = samplingSize * 2 + val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer) + + // Inserting documents with slightly different schema + for( _ <- 1 to expectedResults) { + val objectNode = Utils.getSimpleObjectMapper.createObjectNode() + val arr = objectNode.putArray("object_array") + val nested = Utils.getSimpleObjectMapper.createObjectNode() + nested.put("A", "test") + nested.put("B", "test") + arr.add(nested) + objectNode.put("id", UUID.randomUUID().toString) + container.createItem(objectNode).block() + } + + for( _ <- 1 to samplingSize) { + val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode() + val arr = objectNode2.putArray("object_array") + val nested = Utils.getSimpleObjectMapper.createObjectNode() + nested.put("A", "test") + arr.add(nested) + objectNode2.put("id", UUID.randomUUID().toString) + container.createItem(objectNode2).block() + } + + val cfgWithInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.forceNullableProperties" -> "false", + "spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString, + "spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts", + "spark.cosmos.read.partitioning.strategy" -> "Restrictive" + ) + + val dfWithInference = spark.read.format("cosmos.oltp").options(cfgWithInference).load() + try { + dfWithInference.collect() + fail("Should have thrown an exception") + } + catch { + case inner: Exception => + inner.toString.contains("The 1th field 'B' of input row cannot be null") shouldBe true + } + } + "spark query" can "use custom sampling size" in { val cosmosEndpoint = TestConfigurations.HOST val cosmosMasterKey = TestConfigurations.MASTER_KEY