Skip to content

Commit 5115db0

Browse files
xinlian12 and annie-mac authored
support feedrange cache refresh interval config (#46759)
* support feedrange cache refresh interval config

---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
1 parent adb7e32 commit 5115db0

31 files changed

+281
-149
lines changed

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.40.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Added support for feed range cache refresh interval config. - See [46759](https://github.com/Azure/azure-sdk-for-java/pull/46759)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private class ChangeFeedMicroBatchStream
210210
assertNotNullOrEmpty(checkpointLocation, "checkpointLocation"))
211211
val offsetJson = metadataLog.get(0).getOrElse {
212212
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(
213-
container, changeFeedConfig, partitioningConfig, Some(streamId))
213+
container, containerConfig, changeFeedConfig, partitioningConfig, Some(streamId))
214214
metadataLog.add(0, newOffsetJson)
215215
newOffsetJson
216216
}

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.40.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Added support for feed range cache refresh interval config. - See [46759](https://github.com/Azure/azure-sdk-for-java/pull/46759)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ private class ChangeFeedMicroBatchStream
211211
assertNotNullOrEmpty(checkpointLocation, "checkpointLocation"))
212212
val offsetJson = metadataLog.get(0).getOrElse {
213213
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(
214-
container, changeFeedConfig, partitioningConfig, Some(streamId))
214+
container, containerConfig, changeFeedConfig, partitioningConfig, Some(streamId))
215215
metadataLog.add(0, newOffsetJson)
216216
newOffsetJson
217217
}

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.40.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Added support for feed range cache refresh interval config. - See [46759](https://github.com/Azure/azure-sdk-for-java/pull/46759)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos-spark_3-5_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private class ChangeFeedMicroBatchStream
212212
assertNotNullOrEmpty(checkpointLocation, "checkpointLocation"))
213213
val offsetJson = metadataLog.get(0).getOrElse {
214214
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(
215-
container, changeFeedConfig, partitioningConfig, Some(streamId))
215+
container, containerConfig, changeFeedConfig, partitioningConfig, Some(streamId))
216216
metadataLog.add(0, newOffsetJson)
217217
newOffsetJson
218218
}

sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md

Lines changed: 14 additions & 13 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import scala.collection.JavaConverters._
5050
private class BulkWriter
5151
(
5252
container: CosmosAsyncContainer,
53+
containerConfig: CosmosContainerConfig,
5354
partitionKeyDefinition: PartitionKeyDefinition,
5455
writeConfig: CosmosWriteConfig,
5556
diagnosticsConfig: DiagnosticsConfig,
@@ -74,7 +75,7 @@ private class BulkWriter
7475
case Some(configuredMaxConcurrentPartitions) => 2 * configuredMaxConcurrentPartitions
7576
// using the total number of physical partitions
7677
// multiplied by 2 to leave space for partition splits during ingestion
77-
case None => 2 * ContainerFeedRangesCache.getFeedRanges(container).block().size
78+
case None => 2 * ContainerFeedRangesCache.getFeedRanges(container, containerConfig.feedRangeRefreshIntervalInSecondsOpt).block().size
7879
}
7980
log.logInfo(
8081
s"BulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " +

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedBatch.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private class ChangeFeedBatch
9999
SparkBridgeImplementationInternal.extractCollectionRid(changeFeedStateBase64))
100100

101101
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(
102-
container, changeFeedConfig, partitioningConfig, None)
102+
container, containerConfig, changeFeedConfig, partitioningConfig, None)
103103
log.logWarning(s"Invalid Start offset '$offsetJson' retrieved from file location " +
104104
s"'$startOffsetLocation' for batchId: $batchId -> New offset retrieved from " +
105105
s"service: '$newOffsetJson'.")
@@ -113,14 +113,14 @@ private class ChangeFeedBatch
113113
newOffsetJson
114114
}
115115
} else {
116-
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(container, changeFeedConfig, partitioningConfig, None)
116+
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(container, containerConfig, changeFeedConfig, partitioningConfig, None)
117117
log.logDebug(s"No Start offset retrieved from file location '$startOffsetLocation' for batchId: $batchId " +
118118
s"-> offset retrieved from service: '$newOffsetJson'.")
119119

120120
newOffsetJson
121121
}
122122
} else {
123-
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(container, changeFeedConfig, partitioningConfig, None)
123+
val newOffsetJson = CosmosPartitionPlanner.createInitialOffset(container, containerConfig, changeFeedConfig, partitioningConfig, None)
124124
log.logDebug(s"No offset file location provided for batchId: $batchId " +
125125
s"-> offset retrieved from service: '$newOffsetJson'.")
126126

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedTable.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import org.apache.spark.sql.connector.read.ScanBuilder
1515
import org.apache.spark.sql.util.CaseInsensitiveStringMap
1616

1717
import java.util
18-
import java.util.UUID
1918

2019
// scalastyle:off underscore.import
2120
import com.azure.cosmos.spark.CosmosTableSchemaInferrer._

0 commit comments

Comments
 (0)