## Configuration settings

### Commonly used config options

| Name - V1-3 (Spark 2.4) | Name - V4 and later (Spark >= 3.1) | Notes |
|:---|:---|:---|
| endpoint | spark.cosmos.accountEndpoint | |
| masterkey | spark.cosmos.accountKey | |
| database | spark.cosmos.database | |
| collection | spark.cosmos.container | |
| preferredregions | spark.cosmos.preferredRegionsList | |
| application_name | spark.cosmos.applicationName | |
| connectionmode | spark.cosmos.useGatewayMode | Direct (over TCP) is the default - setting this to `true` switches to Gateway mode |
| changefeedstartfromthebeginning | spark.cosmos.changeFeed.startFrom | Set this config value to `Beginning` |
| changefeedstartfromdatetime | spark.cosmos.changeFeed.startFrom | Set this config value to the point in time you want to start from - for example `2020-02-10T14:15:03Z` |
| changefeedmaxpagesperbatch | spark.cosmos.changeFeed.itemCountPerTriggerHint | |
| WritingBatchSize | spark.cosmos.write.bulk.maxBatchSize | The recommendation is to start with the default of `100` (by not specifying this config entry) and only reduce it when really necessary |
| Upsert | spark.cosmos.write.strategy | Using `ItemOverwrite` here results in the same behavior as `Upsert == true` before |
| WriteThroughputBudget | spark.cosmos.throughputControl.* | See the [Throughput control](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3_2-12/docs/scenarios/Ingestion.md#throughput-control) section for more information |
| MaxIngestionTaskParallelism | n/a | Not relevant anymore - just remove this config entry |
| query_pagesize | n/a | Not relevant anymore - just remove this config entry |
| query_custom | spark.cosmos.read.customQuery | When provided, the custom query is executed against the Cosmos endpoint instead of a query generated dynamically via predicate push-down. It is usually recommended to rely on Spark's predicate push-down, because it generates the most efficient set of filters based on the query plan. But a couple of predicates, like aggregates (count, group by, avg, sum, etc.), cannot be pushed down yet (at least in Spark 3.1) - so the custom query is a fallback that allows pushing them into the query sent to Cosmos. |
| readchangefeed | n/a | See the `"One DataSource rules them all" vs. separate DataSource for ChangeFeed` section below. For change feed there is now a dedicated DataSource |
| changefeedqueryname | n/a | See the `Structured streaming` section below. Bookmarks/offsets are no longer stored in a proprietary way by the connector, but by Spark's metadata store |
| changefeedcheckpointlocation | n/a | See the `Structured streaming` section below. Bookmarks/offsets are no longer stored in a proprietary way by the connector, but by Spark's metadata store |
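For example, a minimal batch read with the new option names could look like the following sketch (PySpark; it assumes an active `spark` session with the connector on the classpath, and all account values are placeholders):

```python
# Hedged sketch: a batch read expressed with the V4 option names from the
# table above; endpoint/key/database/container values are placeholders.
cosmos_cfg = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",  # was: endpoint
    "spark.cosmos.accountKey": "<account-key>",                                    # was: masterkey
    "spark.cosmos.database": "<database>",                                         # was: database
    "spark.cosmos.container": "<container>",                                       # was: collection
    "spark.cosmos.applicationName": "MyMigratedJob",                               # was: application_name (hypothetical name)
}

df = (
    spark.read
    .format("cosmos.oltp")  # the DataSource identifier for items (see below)
    .options(**cosmos_cfg)
    .load()
)
df.printSchema()
```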
### Other config options

| Name - V1-3 (Spark 2.4) | Name - V4 and later (Spark >= 3.1) | Notes |
|:---|:---|:---|
| query_maxretryattemptsonthrottledrequests | n/a | The new connector always retries indefinitely when requests are throttled |
| query_maxretrywaittimeinseconds | n/a | The new connector always retries indefinitely when requests are throttled |
| query_maxdegreeofparallelism | n/a | Not relevant anymore - just remove this config entry |
| query_maxbuffereditemcount | n/a | Not relevant anymore - just remove this config entry |
| query_enablescan | n/a | Not relevant anymore - just remove this config entry |
| query_disableruperminuteusage | n/a | Not relevant anymore - just remove this config entry |
| query_emitverbosetraces | n/a | Not relevant anymore - just remove this config entry |
| consistencylevel | spark.cosmos.read.forceEventualConsistency | By default the new connector uses Eventual consistency for all read operations. If you instead want read operations to use the account's default consistency level, override the `spark.cosmos.read.forceEventualConsistency` property to `false`. |
| rollingchangefeed | n/a | Not relevant anymore - just remove this config entry |
| changefeedusenexttoken | n/a | Not relevant anymore - just remove this config entry |
| writestreamretrypolicy.* | n/a | Not relevant anymore - or not supported yet (poison message handling) |
| resourcetoken | n/a | Not supported yet with the new connector |
| connectionmaxpoolsize | n/a | Not relevant anymore - just remove this config entry |
| connectionidletimeout | n/a | Not relevant anymore - just remove this config entry |
| connectionrequesttimeout | n/a | Not relevant anymore - just remove this config entry |
| preservenullinwrite | spark.cosmos.serialization.inclusionMode | The old default value for `preservenullinwrite` was `false`, which corresponds to `NonNull` now. But the new default value is `Always`, which corresponds to `preservenullinwrite == true` - *so the default behavior has changed for this setting* |
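For example, this hedged sketch (reusing the hypothetical `cosmos_cfg` and an assumed `orders_df` input DataFrame) opts read operations back into the account's default consistency level and restores the old `preservenullinwrite == false` serialization behavior:

```python
# Use the account's default consistency level instead of the new connector's
# default of Eventual consistency for reads.
df = (
    spark.read
    .format("cosmos.oltp")
    .options(**cosmos_cfg)
    .option("spark.cosmos.read.forceEventualConsistency", "false")
    .load()
)

# Match the old preservenullinwrite == false behavior, i.e. drop null
# properties instead of serializing them (the new default is `Always`).
(
    orders_df.write
    .format("cosmos.oltp")
    .options(**cosmos_cfg)
    .option("spark.cosmos.serialization.inclusionMode", "NonNull")
    .mode("append")
    .save()
)
```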
## Conceptual differences
The list above shows the new names of the different config options. Below you can find an explanation of the conceptual differences in the new connector. The Cosmos DB connector for Spark 3 is implemented using the DataSource V2 API (vs. DataSource V1 in the old connector), which drives some changes, especially for structured streaming.
### Partitioning
When reading data in Spark via a DataSource, the number of partitions in the returned RDD is determined by the DataSource. In the Spark 2.4 version of the Cosmos DB connector the DataSource would create one Spark partition for each physical Cosmos DB partition. The same is true with the Spark 3.* connectors. For smaller Cosmos accounts this can lead to situations where the number of resulting Spark partitions is lower than the number of cores available for the executors - so the latency of the Spark job can be unnecessarily high because not all executors participate in processing the read operations from Cosmos. With the new connector for Spark 3 we now have a capability, available in the Cosmos DB backend and the Java V4 SDK, that allows scoping queries to just a fragment of a physical partition. The partitioning strategy `Aggressive` (config `spark.cosmos.read.partitioning.strategy`) will ensure that at least as many Spark partitions as executor cores are created. The number of Spark partitions created will depend on the number of available executor cores and the storage size of each physical partition.
For most use cases the default partitioning strategy `Default` should be sufficient. When Spark jobs would benefit from higher concurrency, the `Aggressive` partitioning strategy can help, as shown in the sketch below.
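Switching the strategy is a single config entry - a hedged sketch, again with the hypothetical `cosmos_cfg` from the first example:

```python
# Opt into the `Aggressive` partitioning strategy when the default creates
# fewer Spark partitions than there are executor cores.
df = (
    spark.read
    .format("cosmos.oltp")
    .options(**cosmos_cfg)
    .option("spark.cosmos.read.partitioning.strategy", "Aggressive")
    .load()
)
print(df.rdd.getNumPartitions())  # now at least the number of executor cores
```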
### Automatically populating `id` column
Unlike the Cosmos DB connector for Spark 2.*, the new connector for Spark 3.* and above (`cosmos.oltp`) requires you to pre-populate an `id` column in the data frame containing the data you want to write into Cosmos DB. In the previous version it was possible to let the Spark connector auto-generate a value for the `id` column. We changed this because auto-generation of `id` values in the connector has an intrinsic problem whenever Spark retries a task: a different `id` value would be generated for each retry, which means you might end up with duplicate records. A sample that explains how you can safely populate `id` values in a data frame is located here: [Ingestion best practices](https://aka.ms/azure-cosmos-spark-3-scenarios-ingestion#populating-id-column)
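As an illustration only (a hedged sketch, not the official sample from the link above): deriving the `id` deterministically from existing columns guarantees that a retried Spark task produces the same value; `tenantId` and `orderNumber` are hypothetical business-key columns of the assumed `orders_df` DataFrame:

```python
from pyspark.sql.functions import concat_ws, sha2

# Deterministic id: the same input row always yields the same id, so Spark
# task retries cannot create duplicate documents.
orders_with_id_df = orders_df.withColumn(
    "id", sha2(concat_ws("|", "tenantId", "orderNumber"), 256)
)

(
    orders_with_id_df.write
    .format("cosmos.oltp")
    .options(**cosmos_cfg)
    .option("spark.cosmos.write.strategy", "ItemOverwrite")  # upsert-like semantics
    .mode("append")
    .save()
)
```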
***NOTE: Please ensure that you add the `id` column only in the DataFrame/RDD you write to Cosmos DB - not in any DataFrame before you make transformations/aggregations. The `id` column is required by the Cosmos DB service, and the tuple of the partition key value and the `id` column value has to be globally unique. So, this column is only needed when you write data to Cosmos DB.***

### "One DataSource rules them all" vs. separate DataSource for ChangeFeed
In the Cosmos DB connector for Spark 2.4, all operations (writing or reading documents/items from the container as well as processing change feed) were surfaced by one DataSource. With the new Cosmos DB connector for Spark 3 we are using two different DataSources - `cosmos.oltp` and `cosmos.oltp.changeFeed` - to be able to express the supported capabilities correctly in the DataSource V2 API. For example, the change feed DataSource does not support writes but supports read stream operations, while the DataSource for items supports writing (both streaming and batch) but no read stream operations. This means that when migrating existing notebooks/programs, it is necessary to change not only the configuration but also the identifier in the `format(xxx)` clause - `cosmos.oltp` for operations on items and `cosmos.oltp.changeFeed` when processing change feed events, as in the sketch below.
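Put together, a hedged sketch of the two identifiers - a streaming read from the change feed DataSource, with offsets checkpointed through Spark's own metadata store (sink and paths are placeholders):

```python
# Change feed: streaming reads use the dedicated `cosmos.oltp.changeFeed`
# DataSource; item reads/writes use `cosmos.oltp` (see the earlier examples).
change_feed_df = (
    spark.readStream
    .format("cosmos.oltp.changeFeed")
    .options(**cosmos_cfg)
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")  # was: changefeedstartfromthebeginning
    .load()
)

# Offsets/bookmarks now live in Spark's checkpoint location - there is no
# connector-proprietary bookmark store (and no changefeedcheckpointlocation).
query = (
    change_feed_df.writeStream
    .format("parquet")  # any supported sink
    .option("path", "/tmp/cosmos/changefeed-output")
    .option("checkpointLocation", "/tmp/cosmos/changefeed-checkpoint")
    .start()
)
```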