-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add "open" method #1895
Add "open" method #1895
Conversation
813b1d0
to
a7783a6
Compare
Manual testing: Start a Spark shell configured to run everything: spark-shell --packages org.apache.hadoop:hadoop-aws:2.7.7 --jars ./target/hadoopfs-0.1.0.jar --master local --conf=spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem --conf=spark.driver.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' --conf=spark.hadoop.lakefs.api.url=http://localhost:8000/api/v1 --conf=spark.hadoop.fs.lakefs.access.key=AKIAlakeFS --conf=spark.hadoop.fs.lakefs.secret.key=lakeFShhhh --conf=spark.hadoop.fs.s3a.access.key=AKIApleaseDONTstealMYawsKEY --conf=spark.hadoop.fs.s3a.secret.key=Shhh... --conf=spark.hadoop.fs.s3a.endpoint=s3.eu-central-1.amazonaws.com Now run this in the shell: scala> import org.apache.hadoop.fs.Path; val p = new Path("lakefs://moo/main/yes"); val fs = p.getFileSystem(sc.hadoopConfiguration); val c = fs.open(p)
import org.apache.hadoop.fs.Path
p: org.apache.hadoop.fs.Path = lakefs://moo/main/yes
fs: org.apache.hadoop.fs.FileSystem = io.lakefs.LakeFSFileSystem@3b59d246
c: org.apache.hadoop.fs.FSDataInputStream = org.apache.hadoop.fs.FSDataInputStream@3fbf4614
scala> val buf = new Array[Byte](5555)
buf: Array[Byte] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...
scala> val len = c.read(buf)
len: Int = 5
scala> new String(buf.slice(0, len))
res0: String =
"huh!
" |
[+ @nopcoder for general layout, try to share |
private io.lakefs.clients.api.ApiClient apiClient; | ||
private ApiClient apiClient; | ||
|
||
private static class ParsedPath { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- made something like that 😃
- think there is indent issue started after this line
if (!parts[0].equals("")) { | ||
throw new RuntimeException("path does not start with / (" + String.join(" [/] ", parts) + ")"); | ||
} | ||
return new ParsedPath(pathUri.getAuthority(), parts[1], parts[2]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
used the getHost for this one as we don't use the port for the repository - not sure which one is the better
URI physicalUri = translateUri(new URI(stats.getPhysicalAddress())); | ||
|
||
Path physicalPath = new Path(physicalUri.toString()); | ||
FileSystem physicalFs = physicalPath.getFileSystem(conf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the part where I thought we need to initialize the underlying filesystem once in the initialize
and use it here and just call open
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was hoping to do that too. Disappointed that this is actually surprisingly difficult to do correctly. Now according to S3AFileSystem.initialize
(please look at the code), the repository is guaranteed to exist on the URL. (TBH, I don't understand why, but we might assume as much). So we could generate the underlying file system at that point.
Let's circle back to this point once we have a working FS? Because what this would do is (only) speed up multiple calls to open
(and create
, after #1858 ) when they occur on the same FileSystem. I.e. it speeds up the already-fast part of multi-part file read/write operations -- if it works.
Opened #1906.
FileSystem physicalFs = physicalPath.getFileSystem(conf); | ||
return physicalFs.open(physicalPath, bufSize); | ||
} catch (io.lakefs.clients.api.ApiException e) { | ||
throw new RuntimeException("API exception: " + e.getResponseBody()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think you can pass e
as inner exception here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, forgot about exception causes (aka "Go %w
done right"...). Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arielshaqed I reviewed this, and currently don't have comments to add to Barak's comments
a7783a6
to
d3da830
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
PTAL...
URI physicalUri = translateUri(new URI(stats.getPhysicalAddress())); | ||
|
||
Path physicalPath = new Path(physicalUri.toString()); | ||
FileSystem physicalFs = physicalPath.getFileSystem(conf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was hoping to do that too. Disappointed that this is actually surprisingly difficult to do correctly. Now according to S3AFileSystem.initialize
(please look at the code), the repository is guaranteed to exist on the URL. (TBH, I don't understand why, but we might assume as much). So we could generate the underlying file system at that point.
Let's circle back to this point once we have a working FS? Because what this would do is (only) speed up multiple calls to open
(and create
, after #1858 ) when they occur on the same FileSystem. I.e. it speeds up the already-fast part of multi-part file read/write operations -- if it works.
Opened #1906.
FileSystem physicalFs = physicalPath.getFileSystem(conf); | ||
return physicalFs.open(physicalPath, bufSize); | ||
} catch (io.lakefs.clients.api.ApiException e) { | ||
throw new RuntimeException("API exception: " + e.getResponseBody()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, forgot about exception causes (aka "Go %w
done right"...). Thanks!
Untested -- need an integration test (vs. e.g. minIO and/or true S3).
(Was concurrently added to our sources, 1 implementation is better than 2)
257f9b5
to
340e7b2
Compare
Thanks! Pulling... |
Untested -- need an integration test (vs. e.g. minIO and/or true S3).
Closes #1856.