Skip to content

Commit

Permalink
Improve error handling and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Nov 13, 2023
1 parent d863b6e commit 2a489b6
Showing 1 changed file with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,15 @@ case class DeltaSharingSource(
val currentTimeMillis = System.currentTimeMillis()
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
latestTableVersion = deltaLog.client.getTableVersion(deltaLog.table)
val tmpVersion = deltaLog.client.getTableVersion(deltaLog.table)
if (tmpVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$tmpVersion.")
} else if (tmpVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$tmpVersion < " +
s"$latestTableVersion.")
}
latestTableVersion = tmpVersion
lastGetVersionTimestamp = currentTimeMillis
}
latestTableVersion
Expand Down Expand Up @@ -321,6 +329,10 @@ case class DeltaSharingSource(
}

val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
" delta sharing server."
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
appendToSortedFetchedFiles(
Expand Down Expand Up @@ -361,9 +373,14 @@ case class DeltaSharingSource(
.toMap
TableRefreshResult(idToUrl, None)
}

val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
"delta sharing server."
)
for (v <- fromVersion to endingVersionForQuery) {

val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
val numFiles = vAddFiles.size
appendToSortedFetchedFiles(
Expand Down

0 comments on commit 2a489b6

Please sign in to comment.