Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds more logging in DeltaSharingFileSystem #456

Merged
merged 5 commits into from
Apr 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ import org.apache.http.impl.conn.{DefaultRoutePlanner, DefaultSchemePortResolver
import org.apache.http.protocol.HttpContext
import org.apache.spark.SparkEnv
import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher}
import org.apache.spark.internal.Logging

import io.delta.sharing.client.model.FileAction
import io.delta.sharing.client.util.ConfUtils

/** Read-only file system for delta paths. */
private[sharing] class DeltaSharingFileSystem extends FileSystem {
private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {

import DeltaSharingFileSystem._

Expand All @@ -53,6 +54,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem {
.setConnectionRequestTimeout(timeoutInSeconds * 1000)
.setSocketTimeout(timeoutInSeconds * 1000).build()

logInfo(s"Creating delta sharing httpClient with timeoutInSeconds: $timeoutInSeconds.")
val clientBuilder = HttpClientBuilder.create()
.setMaxConnTotal(maxConnections)
.setMaxConnPerRoute(maxConnections)
Expand Down Expand Up @@ -105,11 +107,18 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem {
val path = DeltaSharingFileSystem.decode(f)
val fetcher =
new PreSignedUrlFetcher(preSignedUrlCacheRef, path.tablePath, path.fileId, refreshThresholdMs)

if (getConf.getBoolean("spark.delta.sharing.loadDataFilesInMemory", false)) {
logInfo(s"opening delta sharing path $path with InMemoryHttpInputStream.")
val start = System.currentTimeMillis()
// `InMemoryHttpInputStream` loads the content into the memory immediately, so we don't need
// to refresh urls.
new FSDataInputStream(new InMemoryHttpInputStream(new URI(fetcher.getUrl())))
val stream = new FSDataInputStream(new InMemoryHttpInputStream(new URI(fetcher.getUrl())))
logInfo(s"It took ${(System.currentTimeMillis() - start)/1000}s to build " +
s"InMemoryHttpInputStream for delta sharing path $path.")
stream
} else {
logInfo(s"opening delta sharing path [$path] with RandomAccessHttpInputStream.")
new FSDataInputStream(
new RandomAccessHttpInputStream(
httpClient,
Expand Down Expand Up @@ -154,6 +163,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem {
throw new UnsupportedOperationException("mkdirs")

override def getFileStatus(f: Path): FileStatus = {
logInfo(s"Checking delta sharing file status for path: $f.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Is it possible to guard these with some debug level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to logDebug

val resolved = makeQualified(f)
new FileStatus(decode(resolved).fileSize, false, 0, 1, 0, f)
}
Expand Down
Loading