Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log additional queryId and table id and table name #576

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, Unexpecte

/** An interface to fetch Delta metadata from remote server. */
trait DeltaSharingClient {

protected var dsQueryId: Option[String] = None

/**
 * Returns the query id currently stored in `dsQueryId`, or the placeholder string
 * "dsQueryIdNotSet" when `dsQueryId` is `None`.
 */
def getQueryId: String = dsQueryId match {
  case Some(id) => id
  case None => "dsQueryIdNotSet"
}
def listAllTables(): Seq[Table]

def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long
Expand Down Expand Up @@ -175,8 +181,6 @@ class DeltaSharingRestClient(
// Convert the responseFormat to a Set to be used later.
private val responseFormatSet = responseFormat.split(",").toSet

private var queryId: Option[String] = None

private lazy val client = {
val clientBuilder: HttpClientBuilder = if (sslTrustAll) {
val sslBuilder = new SSLContextBuilder()
Expand Down Expand Up @@ -281,10 +285,10 @@ class DeltaSharingRestClient(
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table, queryId[$queryId].")
s"responseFormat($responseFormat) for $rpc for table $table, dsQueryId[$dsQueryId].")
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat), queryId[$queryId].")
s" != requestedFormat($responseFormat), dsQueryId[$dsQueryId].")
}
}

Expand Down Expand Up @@ -716,7 +720,7 @@ class DeltaSharingRestClient(
|$expectedProtocol, $expectedMetadata. Actual: version $version,
|$respondedFormat, ${lines(0)}, ${lines(1)}""".stripMargin
logError(s"Error while fetching next page files at url $targetUrl " +
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg), queryId[$queryId].")
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg), dsQueryId[$dsQueryId].")
throw new IllegalStateException(errorMsg)
}

Expand Down Expand Up @@ -974,8 +978,8 @@ class DeltaSharingRestClient(
allowNoContent: Boolean = false,
fetchAsOneString: Boolean = false
): (Option[Long], Option[String], Seq[String]) = {
// Reset queryId before calling RetryUtils, and before prepareHeaders.
queryId = Some(UUID.randomUUID().toString().split('-').head)
// Reset dsQueryId before calling RetryUtils, and before prepareHeaders.
dsQueryId = Some(UUID.randomUUID().toString().split('-').head)
RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) {
val profile = profileProvider.getProfile
val response = client.execute(
Expand Down Expand Up @@ -1008,7 +1012,7 @@ class DeltaSharingRestClient(
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed for queryId[$queryId] " +
val error = s"Request to delta sharing server failed for dsQueryId[$dsQueryId] " +
s"due to ${e}."
logError(error)
lineBuffer += error
Expand Down Expand Up @@ -1059,7 +1063,7 @@ class DeltaSharingRestClient(
}

private def getQueryIdString: String = {
s"QueryId-${queryId.getOrElse("not_set")}"
s"QueryId-${dsQueryId.getOrElse("not_set")}"
}

// The value for delta-sharing-capabilities header, semicolon separated capabilities.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ case class DeltaSharingSource(
val intervalSeconds = ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf)
)
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}," +
getTableInfoForLogging)
if (intervalSeconds < ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS) {
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($intervalSeconds) " +
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds.")
Expand All @@ -167,6 +167,13 @@ case class DeltaSharingSource(
TableRefreshResult(Map.empty[String, String], None, None)
}

// Shared "for table(id:..., name:...)" suffix appended to this source's log messages.
// Declared as a lazy val, so the string is built on first access and reused afterwards.
private lazy val getTableInfoForLogging: String = {
  val id = tableId
  val name = deltaLog.table.toString
  s"for table(id:$id, name:$name)"
}

// Builds the ", with queryId(...)" suffix for log messages. Being a def, it reads the
// client's query id on every call rather than caching it.
private def getQueryIdForLogging: String = {
  val queryId = deltaLog.client.getQueryId
  s", with queryId($queryId)"
}

// Check the latest table version from the delta sharing server through the client.getTableVersion
// RPC. A minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS is enforced between two
// consecutive RPCs to avoid overloading the delta sharing server.
Expand All @@ -175,14 +182,14 @@ case class DeltaSharingSource(
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
logInfo(s"Got table version $serverVersion from Delta Sharing Server." +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"Got table version $serverVersion from Delta Sharing Server, " +
getTableInfoForLogging)
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion, for table(id:$tableId, name:${deltaLog.table.toString})")
s"$latestTableVersion, " + getTableInfoForLogging)
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
Expand Down Expand Up @@ -248,7 +255,7 @@ case class DeltaSharingSource(
s"$fromVersion, $fromIndex, $isStartingVersion) is not included in sortedFetchedFiles[" +
s"(${headFile.version}, ${headFile.index}, ${headFile.isSnapshot}) to " +
s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})], " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
sortedFetchedFiles = Seq.empty
} else {
return
Expand All @@ -267,7 +274,7 @@ case class DeltaSharingSource(
logInfo(s"Reducing ending version for delta sharing rpc from currentLatestVersion(" +
s"$currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" +
s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
getTableInfoForLogging
)
}

Expand Down Expand Up @@ -334,7 +341,7 @@ case class DeltaSharingSource(
): Unit = {
synchronized {
logInfo(s"Refreshing sortedFetchedFiles(size: ${sortedFetchedFiles.size}) with newIdToUrl(" +
s"size: ${newIdToUrl.size}), for table(id:$tableId, name:${deltaLog.table.toString}).")
s"size: ${newIdToUrl.size}), " + getTableInfoForLogging + getQueryIdForLogging)
lastQueryTableTimestamp = queryTimestamp
minUrlExpirationTimestamp = newMinUrlExpiration
if (!CachedTableManager.INSTANCE.isValidUrlExpirationTime(minUrlExpirationTimestamp)) {
Expand Down Expand Up @@ -384,7 +391,7 @@ case class DeltaSharingSource(
)
}
logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " +
s"${sortedFetchedFiles.size}), for table(id:$tableId, name:${deltaLog.table.toString})")
s"${sortedFetchedFiles.size}), " + getTableInfoForLogging)
}
}

Expand All @@ -411,7 +418,7 @@ case class DeltaSharingSource(
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
getTableInfoForLogging
)
resetGlobalTimestamp()
if (isStartingVersion) {
Expand Down Expand Up @@ -458,7 +465,7 @@ case class DeltaSharingSource(
val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
s" delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
s" delta sharing server, " + getTableInfoForLogging + getQueryIdForLogging
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
Expand Down Expand Up @@ -512,11 +519,13 @@ case class DeltaSharingSource(

TableRefreshResult(idToUrl, minUrlExpiration, None)
}
val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
val filteredAddFiles = validateCommitAndFilterAddFiles(tableFiles)
val allAddFiles = filteredAddFiles.groupBy(a => a.version)
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"Fetched ${tableFiles.addFiles.size} files, filtered ${filteredAddFiles.size} " +
s"in ${allAddFiles.size} versions from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
s"delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
s"delta sharing server, " + getTableInfoForLogging + getQueryIdForLogging
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
Expand Down Expand Up @@ -556,8 +565,7 @@ case class DeltaSharingSource(
fromIndex: Long,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching CDF files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString}).")
s"endingVersionForQuery($endingVersionForQuery), " + getTableInfoForLogging)
resetGlobalTimestamp()
val tableFiles = deltaLog.client.getCDFFiles(
deltaLog.table,
Expand Down Expand Up @@ -1003,7 +1011,7 @@ case class DeltaSharingSource(

override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
val endOffset = DeltaSharingSourceOffset(tableId, end)

val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if (
Expand All @@ -1030,7 +1038,7 @@ case class DeltaSharingSource(
val startOffset = DeltaSharingSourceOffset(tableId, startOffsetOption.get)
if (startOffset == endOffset) {
logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
previousOffset = endOffset
// This happens only if we recover from a failure and `MicroBatchExecution` tries to call
// us with the previous offsets. The returned DataFrame will be dropped immediately, so we
Expand Down Expand Up @@ -1130,7 +1138,7 @@ case class DeltaSharingSource(
} else if (options.startingTimestamp.isDefined) {
val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)
logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " +
s"from Delta Sharing Server, for table(id:$tableId, name:${deltaLog.table.toString})")
s"from Delta Sharing Server, " + getTableInfoForLogging)
Some(version)
} else {
None
Expand Down
Loading