feat(table/scanner): Initial pass for planning a scan and returning the files to use #118
Conversation
@Fokko @nastra This should be ready for review now, though there's a weirdness in the number of data files being created for one of the integration-testing tables on CI here vs. when I run docker compose and the provisioning locally. I don't know enough about spark-iceberg internals to know whether that is a quirk, expected, or something I should change the tests for. Any ideas? I've added a comment in the test code below.
@nastra Any further comments?

Thanks for the patience here @zeroshade. I'll do a full review in the next 2-3 days.
io/s3.go (outdated):
```go
			HostnameImmutable: true,
		}, nil
	})))
	opts = append(opts, func(o *s3.Options) {
```
For the future, I think it would be great to extract such things into a separate (small) PR. That way we can get PRs reviewed faster; otherwise it's quite difficult to find long periods of time to review a huge chunk of new code that is mixed with other changes.
Sorry about this. I originally intended for this to be a separate change, but the CI started failing without it and I wasn't able to get CI testing of the new changes otherwise, so I ended up adding it to this PR.
literals.go (outdated):
```go
type TypedLiteral[T LiteralType] interface {
	Comparator() Comparator[T]
}

type NumericLiteral interface {
```
do we need this type?
It's used with the transforms, specifically for `truncateNumber`. Looking through this, I could probably change it to be unexported if we want, though.
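For illustration, here is a rough sketch of what this interface and its use in a truncate transform might look like; the method set and the helper below are assumptions, not necessarily the PR's definitions:

```go
// Hypothetical sketch: a numeric literal that can step to the
// adjacent representable value, which a truncate-transform
// projection could use to turn exclusive bounds into inclusive ones.
// Only the NumericLiteral name appears in the diff above; Literal is
// the package's existing literal interface.
type NumericLiteral interface {
	Literal
	Increment() Literal // next representable value
	Decrement() Literal // previous representable value
}

// toInclusiveUpper (hypothetical) converts an exclusive upper bound
// into an inclusive one before the truncate width is applied.
func toInclusiveUpper(lit NumericLiteral) Literal {
	return lit.Decrement()
}
```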
manifest.go (outdated):
```go
for k, v := range input {
	switch v := v.(type) {
	case map[string]any:
		for typname, val := range v {
```
nit: did you mean to call this `typeName`?
manifest.go (outdated):
```go
//
// Becomes:
//
// map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
```
are we losing the field-id here?
Currently yes, the hamba/avro library doesn't return the field-id back to us at all. I think I can possibly leverage the https://pkg.go.dev/github.com/hamba/avro/v2#Schema object in the library to get the field-id property, but I haven't spent enough time there yet to work out the best way to handle it.
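A rough sketch of that idea (an assumption for illustration, not code from this PR), parsing a schema with hamba/avro and reading the custom attribute back, assuming `Prop` surfaces non-reserved properties such as `field-id`:

```go
package main

import (
	"fmt"

	"github.com/hamba/avro/v2"
)

func main() {
	// Parse a schema whose fields carry Iceberg's custom "field-id"
	// attribute, then read it back from the parsed schema.
	schema := avro.MustParse(`{
		"type": "record",
		"name": "manifest_entry",
		"fields": [
			{"name": "status", "type": "int", "field-id": 0},
			{"name": "snapshot_id", "type": "long", "field-id": 1}
		]
	}`)

	rec := schema.(*avro.RecordSchema)
	for _, f := range rec.Fields() {
		// Prop returns custom attributes preserved during parsing
		// (assumed behavior for field-level properties).
		fmt.Println(f.Name(), "field-id:", f.Prop("field-id"))
	}
}
```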
manifest.go (outdated):
```go
	case map[string]any:
		for typname, val := range v {
			switch typname {
			case "int.date":
```
can you elaborate on where this string representation is coming from?
The representation is coming from the hamba/avro library. When we unmarshal the data, it constructs the key as `type.logical-type`. As described in the comment above, the avro schema has something like `"type": {"type": "int", "logicalType": "date"}`, so the hamba/avro library will denote that with the key `int.date`. All of the representations below come from the avro specification for logical types.
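Based on the example above, a minimal standalone sketch of unwrapping that representation (the helper name is hypothetical; it assumes the union decodes to a one-entry map whose value hamba/avro has already converted, e.g. to a time.Time, as shown in the `map[string]any{"int.date": time.Time{}}` example):

```go
package main

import (
	"fmt"
	"time"
)

// unwrapUnion unwraps hamba/avro's union encoding: a one-entry
// map[string]any keyed by "<type>.<logicalType>".
func unwrapUnion(v any) any {
	m, ok := v.(map[string]any)
	if !ok || len(m) != 1 {
		return v
	}
	for typName, val := range m {
		switch typName {
		case "int.date", "long.timestamp-micros":
			// The library has already decoded these logical types,
			// so the value arrives as a time.Time.
			return val.(time.Time)
		default:
			return val
		}
	}
	return v
}

func main() {
	decoded := map[string]any{"int.date": time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)}
	fmt.Println(unwrapUnion(decoded)) // 2024-01-01 00:00:00 +0000 UTC
}
```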
table/evaluators.go (outdated):
```go
	return (&manifestEvalVisitor{partitionFilter: boundFilter}).Eval, nil
}

type manifestEvalVisitor struct {
```
I'm a little confused: is this new code or just code that has been moved around?
This is just code that was moved around. It doesn't need to be exported, so I was able to shift it into the table package to limit access rather than export it from the main iceberg package.
(force-pushed from e7a8eba to 66797d6)
@zeroshade could you please rebase this one now that all the other PRs are merged?
(force-pushed from 7a111bd to 2087008)
@nastra All rebased already 😄
(force-pushed from 4c952ef to f4e008e)
```go
// for some reason when I run the provisioning locally i get 5 data files
// but GHA CI running spark provisioning ends up with only 4 files?
// anyone know why?
{"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4},
```
5 should be correct, right?
I think the default max parallelism of Spark is capped by the number of CPUs, so probably one of the data files contains two rows.
it's probably fine to follow up on this issue in a separate PR @zeroshade
```python
from pyiceberg.types import FixedType, NestedField, UUIDType

spark = SparkSession.builder.getOrCreate()
```
Love it
Very rough initial implementation of metrics evaluation and a simple scanner for Tables that produces the list of `FileScanTask`s needed to perform a scan, along with positional delete files and so on.

This also includes a framework and setup for integration testing, adapted from the approach used in pyiceberg: docker images plus a file of tests that are only executed when the `integration` build tag is set, which a new workflow uses to run them.

This provides an end-to-end case of using a table and a row-filter expression to perform manifest and metrics evaluations and create the plan for scanning. The next step would be actually fetching the data!
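To make the shape of this concrete, here is a hypothetical usage sketch; the module path, option and method names (`Scan`, `WithRowFilter`, `PlanFiles`), and the task fields are assumptions for illustration, not a description of the PR's final API:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
)

// planScan sketches planning a scan with a row filter: the scanner
// binds the filter to the table schema, prunes manifests and data
// files via the metrics evaluators, and returns file-scan tasks.
func planScan(ctx context.Context, tbl *table.Table) {
	scan := tbl.Scan(
		table.WithRowFilter(
			iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2024-01-01")))

	tasks, err := scan.PlanFiles(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range tasks {
		// Each task pairs a data file with any positional delete
		// files that apply to it (field names assumed).
		fmt.Println(t.File.FilePath(), "deletes:", len(t.DeleteFiles))
	}
}

func main() {} // placeholder; obtaining a *table.Table requires a catalog
```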