Skip to content

Commit

Permalink
* Ingest at the end of the task when the ensureNoDupBlobs option is set (#404)
Browse files: browse the repository at this point in the history
  • Loading branch information
ag-ramachandran authored Oct 8, 2024
1 parent 175ac60 commit da0963f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.concurrent.ConcurrentHashMap

object KustoWriter {
private val className = this.getClass.getSimpleName
Expand Down Expand Up @@ -392,6 +393,8 @@ object KustoWriter {
batchIdForTracing: String): Unit = {
val partitionId = TaskContext.getPartitionId
val partitionIdString = TaskContext.getPartitionId.toString
val taskMap = new ConcurrentHashMap[String, BlobWriteResource]()

def ingest(
blobResource: BlobWriteResource,
size: Long,
Expand Down Expand Up @@ -486,26 +489,30 @@ object KustoWriter {
if (shouldNotCommitBlockBlob) {
blobWriter
} else {
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob number ${row._2}, with size $count")
finalizeBlobWrite(blobWriter)
ingest(
blobWriter,
blobWriter.csvWriter.getCounter,
blobWriter.sas,
flushImmediately = !parameters.writeOptions.disableFlushImmediately,
curBlobUUID,
kustoClient)
curBlobUUID = UUID.randomUUID().toString
createBlobWriter(
parameters.coordinates,
parameters.tmpTableName,
kustoClient,
partitionIdString,
row._2,
curBlobUUID)
if (parameters.writeOptions.ensureNoDupBlobs) {
taskMap.put(curBlobUUID, blobWriter)
} else {
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob number ${row._2}, with size $count")
finalizeBlobWrite(blobWriter)
ingest(
blobWriter,
blobWriter.csvWriter.getCounter,
blobWriter.sas,
flushImmediately = !parameters.writeOptions.disableFlushImmediately,
curBlobUUID,
kustoClient)
curBlobUUID = UUID.randomUUID().toString
createBlobWriter(
parameters.coordinates,
parameters.tmpTableName,
kustoClient,
partitionIdString,
row._2,
curBlobUUID)
}
}
}

Expand All @@ -515,13 +522,22 @@ object KustoWriter {
s"requestId: '${parameters.writeOptions.requestId}' ")
finalizeBlobWrite(lastBlobWriter)
if (lastBlobWriter.csvWriter.getCounter > 0) {
ingest(
lastBlobWriter,
lastBlobWriter.csvWriter.getCounter,
lastBlobWriter.sas,
flushImmediately = false,
curBlobUUID,
kustoClient)
if (parameters.writeOptions.ensureNoDupBlobs) {
taskMap.put(curBlobUUID, lastBlobWriter)
} else {
ingest(
lastBlobWriter,
lastBlobWriter.csvWriter.getCounter,
lastBlobWriter.sas,
flushImmediately = false,
curBlobUUID,
kustoClient)
}
}
if (parameters.writeOptions.ensureNoDupBlobs && taskMap.size() > 0) {
taskMap.forEach((uuid, bw) => {
ingest(bw, bw.csvWriter.getCounter, bw.sas, flushImmediately = false, uuid, kustoClient)
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>5.2.2</revision>
<revision>5.2.3</revision>
<!-- Spark dependencies -->
<scala.version.major>2.12</scala.version.major>
<scalafmt.plugin.version>1.1.1640084764.9f463a9</scalafmt.plugin.version>
Expand Down

0 comments on commit da0963f

Please sign in to comment.