Skip to content

Commit

Permalink
add (#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db authored Nov 14, 2023
1 parent 138c133 commit e3c6d8c
Showing 1 changed file with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,15 @@ case class DeltaSharingSource(
val currentTimeMillis = System.currentTimeMillis()
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
latestTableVersion = deltaLog.client.getTableVersion(deltaLog.table)
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion.")
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
}
latestTableVersion
Expand Down Expand Up @@ -444,6 +452,10 @@ case class DeltaSharingSource(
}

val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
" delta sharing server."
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
appendToSortedFetchedFiles(
Expand Down Expand Up @@ -497,6 +509,11 @@ case class DeltaSharingSource(
TableRefreshResult(idToUrl, minUrlExpiration, None)
}
val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
"delta sharing server."
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
val numFiles = vAddFiles.size
Expand Down

0 comments on commit e3c6d8c

Please sign in to comment.