diff --git a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index b10b2fb5a..7bc053129 100644 --- a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -172,7 +172,15 @@ case class DeltaSharingSource( val currentTimeMillis = System.currentTimeMillis() if (lastGetVersionTimestamp == -1 || (currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) { - latestTableVersion = deltaLog.client.getTableVersion(deltaLog.table) + val serverVersion = deltaLog.client.getTableVersion(deltaLog.table) + if (serverVersion < 0) { + throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" + + s"$serverVersion.") + } else if (serverVersion < latestTableVersion) { + logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " + + s"$latestTableVersion.") + } + latestTableVersion = serverVersion lastGetVersionTimestamp = currentTimeMillis } latestTableVersion @@ -444,6 +452,10 @@ case class DeltaSharingSource( } val numFiles = tableFiles.files.size + logInfo( + s"Fetched ${numFiles} files for table version ${tableFiles.version} from" + + " delta sharing server." + ) tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach { case (file, index) if (index > fromIndex) => appendToSortedFetchedFiles( @@ -497,6 +509,11 @@ case class DeltaSharingSource( TableRefreshResult(idToUrl, minUrlExpiration, None) } val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version) + logInfo( + s"Fetched and filtered ${allAddFiles.size} files from startingVersion " + + s"${fromVersion} to endingVersion ${endingVersionForQuery} from " + + "delta sharing server." + ) for (v <- fromVersion to endingVersionForQuery) { val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]()) val numFiles = vAddFiles.size