util/parquet: add support for all types #101946
Conversation
Force-pushed from 7c227a0 to ebd58ea
Reviewed 8 of 9 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)
pkg/sql/sem/tree/datum.go
line 1424 at r1 (raw file):
}

// AsDCollatedString attempts to retrieve a DString from an Expr, returning a AsDCollatedString and
nit: comments wrap at 80?
pkg/sql/sem/tree/datum.go
line 5083 at r1 (raw file):
// a flag signifying whether the assertion was successful. The function should
// // be used instead of direct type assertions wherever a *DEnum wrapped by a
// // *DOidWrapper is possible.

nit: extra '//' in the comment above
pkg/sql/sem/tree/datum.go
line 5092 at r1 (raw file):
	}
	return nil, false
}
I was hoping that we can get some template magic going... but maybe for another time.
pkg/sql/sem/tree/format.go
line 670 at r1 (raw file):
// CloseAndGetBytes is the same as CloseAndGetString, except it returns bytes. // This avoids an allocation by not calling ctx.String().
Let's expand this comment to say that the returned byte array is only valid until the next call to the fmtCtx.
pkg/sql/sem/tree/format.go
line 673 at r1 (raw file):
func (ctx *FmtCtx) CloseAndGetBytes() []byte {
	s := ctx.Bytes()
	ctx.Close()
This is definitely not safe. You cannot have a CloseAndGetBytes method; as a matter of fact, you should not have this method at all.
Unless I'm misreading the code, ctx.Close() resets the buffer (that's fine because you're still pointing to the "old" slice), but more importantly it returns it to the pool; that means the context can be reused, and the data in the underlying array will be overwritten.
As I say above, just have a Writer-level fmtCtx (which you close when you close the writer), and pass fmtCtx on every call. You can then call ctx.Bytes() yourself, and that's safe because you complete encoding of the datum before you encode the next one.
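The hazard described above can be reproduced with a plain bytes.Buffer, which, like the pooled FmtCtx, retains its backing array across Reset. This is a minimal standalone sketch (not CockroachDB's actual FmtCtx API): a byte slice obtained before the reset still aliases storage that a later write clobbers, which is exactly what happens when the pool hands the context to the next caller.

```go
package main

import (
	"bytes"
	"fmt"
)

// clobberDemo mimics the unsafe CloseAndGetBytes pattern with a plain
// bytes.Buffer: the slice returned by Bytes() aliases the buffer's backing
// array, so reusing the buffer (as a pool would) overwrites the "returned"
// bytes in place.
func clobberDemo() (stale, fresh string) {
	b := new(bytes.Buffer)
	b.WriteString("first datum")
	view := b.Bytes() // aliases the backing array, like ctx.Bytes()

	b.Reset()                     // analogous to Close(): storage is retained
	b.WriteString("SECOND DATUM") // a later user reuses the same array

	return string(view), b.String()
}

func main() {
	stale, fresh := clobberDemo()
	fmt.Println(stale) // the original "first datum" has been clobbered
	fmt.Println(fresh)
}
```

Passing the fmtCtx explicitly and reading Bytes() before the next datum is encoded avoids this, since the slice is consumed before anyone reuses the array.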
pkg/util/parquet/decoders.go
line 96 at r1 (raw file):
dtStr := string(v)
d, dependsOnCtx, err := tree.ParseDTimestampTZ(nil, dtStr, time.Microsecond)
if dependsOnCtx {
Hmm... should we return an error here?
Should we use the default parse context?
Note: changefeeds (queries at least) serialize this information in the proto buffer, so you probably have access to it. If it's a test-only thing, I'd just use the default context.
pkg/util/parquet/schema.go
line 356 at r1 (raw file):
result.decoder = collatedStringDecoder{}
return result, nil
case types.ArrayFamily:
Let's add a test that calls this function for all valid type families.
You can iterate over the types.Family_values map and call this function for each one. (I'm sure there will be some corner cases, like arrays, but I know you can figure out a way to make sure we don't miss any types, or any new types that may be added in the future.)
pkg/util/parquet/testutils.go
line 271 at r1 (raw file):
PhysicalRepresentations: [][]byte{
	{0x42, 0x1},
	{0x42},
Is there no utility method out there to help you generate those 0x42 values, etc.?
pkg/util/parquet/write_functions.go
line 179 at r1 (raw file):
fmtCtx := tree.NewFmtCtx(tree.FmtExport)
d.Format(fmtCtx)
a.byteArrayBatch[0] = fmtCtx.CloseAndGetBytes()
fmtCtx should be Closed() at some point; otherwise it is never returned to the pool.
You have a bit more work to do, but I think it would make sense to allocate a single fmtCtx in the Writer (the top-level parquet datum writer), close this fmtCtx in the close method of the writer, and pass an explicit FmtCtx argument to each write function.
pkg/util/parquet/writer_test.go
line 63 at r1 (raw file):
// We will manually add array types which are supported below.
// Excluding types.TupleFamily and types.ArrayFamily leaves us with only
// scalar types so far.
So, we still have a few more follow-on PRs to handle the above (tuples, arrays of arrays, tuples of arrays of tuples, etc.), correct?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)
pkg/util/parquet/write_functions.go
line 346 at r1 (raw file):
}

func writeJson(
nit: capitalize JSON?
Force-pushed from ebd58ea to 5e07b02
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/sql/sem/tree/datum.go
line 5092 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I was hoping that we can get some template magic going... but maybe for another time.
Good idea :)
pkg/util/parquet/decoders.go
line 96 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Hmm... should we return an error here?
Should we use the default parse context?
Note: changefeeds (queries at least) serialize this information in the proto buffer, so you probably have access to it. If it's a test-only thing, I'd just use the default context.
I think what I have is good. This is only used in testing. When we write parquet timestamps, we write the timestamp explicitly (we don't encode them as "NOW()" or anything like that). Thus we should assert in tests that the parse context is not required. If I'm reading the code correctly, the parse context is only needed for the current time or current timezone when those are not available.
See
cockroach/pkg/util/timeutil/pgdate/field_extract.go, lines 167 to 170 in 048f833:

case keywordNow:
	if err := fe.matchedSentinel(fe.now(), match); err != nil {
		return err
	}

cockroach/pkg/util/timeutil/pgdate/parsing.go, lines 246 to 250 in 048f833:

if err := fe.Extract(s); err != nil {
	return TimeEpoch, false, parseError(err, "timestamp", s)
}
res := fe.MakeTimestampWithoutTimezone()
return res, fe.currentTimeUsed, nil
pkg/util/parquet/schema.go
line 356 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Let's add a test that calls this function for all valid type families.
You can iterate over the types.Family_values map and call this function for each one. (I'm sure there will be some corner cases, like arrays, but I know you can figure out a way to make sure we don't miss any types, or any new types that may be added in the future.)
I think the existing randomized test that uses randgen.SeedTypes gets the job done already, as it is made by iterating over all types - see below. The set of all types should cover all families.
I could use something like types.Family_name to iterate over all the families and call this function in a test, but this function takes a *types.T. I would have to randomly generate a type from the family, which can get a bit messy. I feel like the existing randomized test is sufficient.
cockroach/pkg/sql/randgen/type.go, lines 41 to 80 in 85b666e:
for _, typ := range types.OidToType {
	switch typ.Oid() {
	case oid.T_regnamespace:
		// Temporarily don't include this.
		// TODO(msirek): Remove this exclusion once
		// https://github.com/cockroachdb/cockroach/issues/55791 is fixed.
	case oid.T_unknown, oid.T_anyelement:
		// Don't include these.
	case oid.T_anyarray, oid.T_oidvector, oid.T_int2vector:
		// Include these.
		SeedTypes = append(SeedTypes, typ)
	default:
		// Only include scalar types.
		if typ.Family() != types.ArrayFamily {
			SeedTypes = append(SeedTypes, typ)
		}
	}
}
for _, typ := range types.OidToType {
	if IsAllowedForArray(typ) {
		arrayContentsTypes = append(arrayContentsTypes, typ)
	}
}
// Add a collated string separately (since it shares the oid with the STRING
// type and, thus, wasn't included above).
collatedStringType := types.MakeCollatedString(types.String, "en" /* locale */)
SeedTypes = append(SeedTypes, collatedStringType)
if IsAllowedForArray(collatedStringType) {
	arrayContentsTypes = append(arrayContentsTypes, collatedStringType)
}
// Sort these so randomly chosen indexes always point to the same element.
sort.Slice(SeedTypes, func(i, j int) bool {
	return SeedTypes[i].String() < SeedTypes[j].String()
})
sort.Slice(arrayContentsTypes, func(i, j int) bool {
	return arrayContentsTypes[i].String() < arrayContentsTypes[j].String()
})
pkg/util/parquet/testutils.go
line 271 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
there is no utility method out there to help you generate those 0x42, etc?
Yes there is. Done.
pkg/util/parquet/write_functions.go
line 179 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
fmtCtx should be Closed() at some point; otherwise it is never returned to the pool.
You have a bit more work to do, but I think it would make sense to allocate a single fmtCtx in the Writer (the top-level parquet datum writer), close this fmtCtx in the close method of the writer, and pass an explicit FmtCtx argument to each write function.
Done.
pkg/util/parquet/writer.go
line 123 at r2 (raw file):
}
fmtCtx.Close()
I decided not to store one fmtCtx in the Writer and reuse it. I want to avoid this situation: say we write a large JSON, so the []byte in the fmtCtx grows, and then we write small datums. The memory required to store the JSON is never released. To avoid the problem, I create a new fmtCtx when we call write and close it after writing. I assume that the underlying pool is smart enough to release large allocs - at least when memory is required elsewhere.
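The retention concern is visible with a plain bytes.Buffer (the kind of growable buffer a FmtCtx wraps): after one large write, Reset empties the buffer but keeps the grown backing array, so a single long-lived, reused context would pin that memory across subsequent small datums. A standalone sketch:

```go
package main

import (
	"bytes"
	"fmt"
)

// retainedCap writes one large "JSON datum", resets the buffer, then
// writes a small datum; it returns the resulting length and capacity
// to show that the large backing array is still held.
func retainedCap() (length, capacity int) {
	b := new(bytes.Buffer)
	b.Write(make([]byte, 1<<20)) // large write grows the array to >= 1 MiB
	b.Reset()                    // length drops to zero; capacity is retained
	b.WriteString("small datum") // small writes reuse the big array
	return b.Len(), b.Cap()
}

func main() {
	l, c := retainedCap()
	fmt.Println(l, c >= 1<<20) // capacity stays at or above 1 MiB
}
```

Creating a fresh fmtCtx per write and closing it afterwards hands the grown buffer back to the pool, where the runtime can eventually reclaim it under memory pressure.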
pkg/util/parquet/writer_test.go
line 63 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
So, we still have a few more follow-on PRs to handle the above (tuples, arrays of arrays, tuples of arrays of tuples, etc.), correct?
Yes ofc... But I think we should integrate these with changefeeds and EXPORT
first.
pkg/sql/sem/tree/format.go
line 670 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Let's expand this comment to say that the returned byte array is only valid until the next call to the fmtCtx.
Deleted this method.
pkg/sql/sem/tree/format.go
line 673 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
This is definitely not safe. You cannot have a CloseAndGetBytes method; as a matter of fact, you should not have this method at all.
Unless I'm misreading the code, ctx.Close() resets the buffer (that's fine because you're still pointing to the "old" slice), but more importantly it returns it to the pool; that means the context can be reused, and the data in the underlying array will be overwritten.
As I say above, just have a Writer-level fmtCtx (which you close when you close the writer), and pass fmtCtx on every call. You can then call ctx.Bytes() yourself, and that's safe because you complete encoding of the datum before you encode the next one.
Done.
Reviewed 3 of 6 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)
pkg/util/parquet/schema.go
line 356 at r1 (raw file):
Previously, jayshrivastava (Jayant) wrote…
I think the existing randomized test that uses randgen.SeedTypes gets the job done already, as it is made by iterating over all types - see below. The set of all types should cover all families. I could use something like types.Family_name to iterate over all the families and call this function in a test, but this function takes a *types.T. I would have to randomly generate a type from the family, which can get a bit messy. I feel like the existing randomized test is sufficient.

cockroach/pkg/sql/randgen/type.go, lines 41 to 80 in 85b666e (code quoted above)
It's fine; it is sufficient in terms of coverage.
Explicitly iterating over all types could produce better error messages, though - like a "missing parser for newly added type Foo" type of error.
Up to you.
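The shape of the suggested exhaustiveness test can be sketched as below. familyValues and handledFamilies are stand-ins for the generated types.Family_value map and the set of families the parquet schema constructor supports; a real test would import those from the types and parquet packages and t.Errorf instead of returning.

```go
package main

import "fmt"

// familyValues stands in for the generated types.Family_value map
// (family name -> enum value); the real map lists every family, so
// newly added families show up here automatically.
var familyValues = map[string]int32{
	"IntFamily":   1,
	"FloatFamily": 2,
	"ArrayFamily": 3,
}

// handledFamilies stands in for the families the parquet schema
// constructor currently knows how to write.
var handledFamilies = map[string]bool{
	"IntFamily":   true,
	"FloatFamily": true,
	"ArrayFamily": true,
}

// checkCoverage returns an error naming the first family without parquet
// support, producing the "newly added type" style of failure message.
func checkCoverage() error {
	for name := range familyValues {
		if !handledFamilies[name] {
			return fmt.Errorf("missing parquet support for newly added type family %s", name)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkCoverage())
}
```

The payoff is the failure mode: when a new family is added to the enum but not to the writer, the test names the missing family directly instead of failing somewhere inside a randomized run.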
pkg/util/parquet/writer.go
line 122 at r2 (raw file):
	return err
}
fmtCtx.Close()
defer Close right after you get the NewFmtCtx, since you may return an error above.
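The suggested fix looks roughly like this. fmtCtx here is a stand-in for the pooled tree.FmtCtx and writeRow for the write path under review; deferring Close immediately after acquisition guarantees the context is returned to the pool on every path, including the early error return.

```go
package main

import (
	"errors"
	"fmt"
)

// fmtCtx is a stand-in for tree.FmtCtx: a pooled object that must be
// closed on every return path.
type fmtCtx struct{ closed bool }

func (c *fmtCtx) Close() { c.closed = true }

// writeRow defers Close right after receiving the context, so the early
// error return below cannot leak it out of the pool.
func writeRow(ctx *fmtCtx, fail bool) error {
	defer ctx.Close()
	if fail {
		return errors.New("row write failed") // ctx is still closed
	}
	return nil
}

func main() {
	ctx := &fmtCtx{}
	err := writeRow(ctx, true)
	fmt.Println(ctx.closed, err != nil) // closed even though an error occurred
}
```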
This change adds support for the following type families to the `util/parquet` library: types.INetFamily, types.JsonFamily, types.FloatFamily, types.BytesFamily, types.BitFamily, types.EnumFamily, types.Box2DFamily, types.GeographyFamily, types.GeometryFamily, types.DateFamily, types.TimeFamily, types.TimeTZFamily, types.IntervalFamily, types.TimestampTZFamily.

Release note: None

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Force-pushed from 5e07b02 to 7017d52
bors r=miretskiy

Build succeeded.
This change adds support for the following type families to the util/parquet library: types.INetFamily, types.JsonFamily, types.FloatFamily, types.BytesFamily, types.BitFamily, types.EnumFamily, types.Box2DFamily, types.GeographyFamily, types.GeometryFamily, types.DateFamily, types.TimeFamily, types.TimeTZFamily, types.IntervalFamily, types.TimestampTZFamily.
Release note: None
Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071