importccl: Parallelize avro import #45269
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @miretskiy, and @spaskob)
pkg/ccl/importccl/read_import_avro.go, line 232 at r1 (raw file):
        return false
    }
    c.rejected <- fmt.Sprintf("%s\n", row)
I am confused, isn't avro a binary format? What is the meaning of row in this case?
pkg/ccl/importccl/read_import_base.go, line 356 at r1 (raw file):
type namedInput struct {
    reader *fileReader
    name   string
Could you add comments explaining what name and idx represent?
pkg/ccl/importccl/read_import_base.go, line 360 at r1 (raw file):
}

// A scanner over avro input.
why is the comment specific to avro?
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/ccl/importccl/read_import_avro.go, line 232 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
I am confused, isn't avro a binary format? What is the meaning of row in this case?
You are not confused; avro may or may not be binary.
However, at this stage we have already read the avro input file and converted that input into a native Go type (for example a map, an array, a byte array, an int, etc.). So I'm just printing the representation of the row that we failed to handle.
I wonder if I should print %v instead, though.
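To make the %s versus %v question concrete, here is a minimal sketch of how the two verbs render a decoded row. The helper formatRejected and the sample row are hypothetical and not taken from the PR; the sketch only assumes that a decoded avro record can surface as a map of native Go values.

```go
package main

import "fmt"

// formatRejected is a hypothetical helper (not from the PR) that renders a
// decoded row with the given verb, followed by a newline.
func formatRejected(verb string, row interface{}) string {
	return fmt.Sprintf(verb+"\n", row)
}

func main() {
	// Assumption: a decoded avro record may look like a map of native values.
	row := map[string]interface{}{"id": 1, "name": "a"}

	// %s only suits strings and byte slices; other values get a %!s(...) marker.
	fmt.Print(formatRejected("%s", row)) // map[id:%!s(int=1) name:a]

	// %v uses the default format for every value.
	fmt.Print(formatRejected("%v", row)) // map[id:1 name:a]
}
```

With mixed native types, %v produces the cleaner representation, which is the concern raised at the end of the comment above.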
pkg/ccl/importccl/read_import_base.go, line 356 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
Could you add comments explaining what name and idx represent?
Done.
pkg/ccl/importccl/read_import_base.go, line 360 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
why is the comment specific to avro?
Because of copy & paste.
Updated.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/ccl/importccl/read_import_base.go, line 360 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Because of copy & paste.
Updated.
Done.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @miretskiy, and @spaskob)
pkg/ccl/importccl/read_import_avro.go, line 232 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
You are not confused; avro may or may not be binary.
However, at this stage we have already read the avro input file and converted that input into a native Go type (for example a map, an array, a byte array, an int, etc.). So I'm just printing the representation of the row that we failed to handle.
I wonder if I should print %v instead, though.
We may need to discuss how the rejected file is going to be used. The initial idea was that it is in the same format as the input, so that the user can repeat the import reading only from the rejected file. If this is not feasible for avro, we should reconsider this approach.
pkg/ccl/importccl/read_import_csv.go, line 82 at r2 (raw file):
}

func (c *csvInputReader) readFile(
I wonder if this function can be refactored away in the future as well.
pkg/ccl/importccl/read_import_csv.go, line 192 at r2 (raw file):
    col := conv.VisibleCols[i]
    err = wrapRowErr(err, s.input.name, rowNum, pgcode.Syntax, "parse %q as %s", col.Name, col.Type.SQLString())
I wonder if we should also include the field in the error message.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/ccl/importccl/read_import_avro.go, line 232 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
We may need to discuss how the rejected file is going to be used. The initial idea was that it is in the same format as the input, so that the user can repeat the import reading only from the rejected file. If this is not feasible for avro, we should reconsider this approach.
It's not feasible for avro.
pkg/ccl/importccl/read_import_csv.go, line 82 at r2 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
I wonder if this function can be refactored away in the future as well.
Perhaps. I'm not sure it's possible to refactor e.g. the mysqldump or pgdump formats, but we could perhaps move this to parallelImporter in base and just call that.
pkg/ccl/importccl/read_import_csv.go, line 192 at r2 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
I wonder if we should also include the field in the error message.
Perhaps -- I tried to keep things the same where possible. If you want, I can change this error message.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/ccl/importccl/read_import_avro.go, line 232 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
It's not feasible for avro.
Done.
Reviewed 6 of 10 files at r1, 4 of 6 files at r3.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @miretskiy, and @spaskob)
pkg/ccl/importccl/read_import_base.go, line 431 at r3 (raw file):
    Scan() bool

    // Err returns an error (if any) encountered when processing avro stream.
avro mentioned in comment
pkg/ccl/importccl/read_import_base.go, line 523 at r3 (raw file):
    // Scan more data, unless we're done.
    select {
    case <-ctx.Done():
I'm surprised this didn't show up in profiling, but calling .Done() in hot loops is usually a performance problem. We usually pull it into a local above the loop (done := ctx.Done()), and even then sometimes avoid checking the channel on every iteration, as it is still somewhat expensive (I believe the prior CSV code only checked it on flushes).
pkg/ccl/importccl/read_import_base.go, line 568 at r3 (raw file):
    p.b.data = append(p.b.data, data)
    if len(p.b.data) == cap(p.b.data) {
This seems brittle, or at least subtle: I had to read this a couple of times before I'd convinced myself that it would actually behave correctly, i.e. that append or something else would never re-alloc the batch. IMO it'd be clearer just to add batchSize as a field on p.
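The suggestion can be sketched as follows. The type batcher and its fields are hypothetical simplifications of the producer under review; the point is only that the flush condition compares against an explicit batchSize rather than the slice's capacity.

```go
package main

import "fmt"

// batcher is a hypothetical, simplified stand-in for the producer discussed
// above; field names are illustrative only.
type batcher struct {
	batchSize int             // explicit flush threshold
	data      []interface{}   // rows accumulated for the current batch
	flushed   [][]interface{} // completed batches, kept here for inspection
}

// add appends a row and flushes once batchSize rows have accumulated.
// The check does not depend on how append grows the backing array.
func (p *batcher) add(row interface{}) {
	p.data = append(p.data, row)
	if len(p.data) == p.batchSize {
		p.flushed = append(p.flushed, p.data)
		p.data = make([]interface{}, 0, p.batchSize)
	}
}

func main() {
	p := &batcher{batchSize: 2}
	for i := 0; i < 5; i++ {
		p.add(i)
	}
	fmt.Println(len(p.flushed), len(p.data)) // 2 1
}
```

Note that the sketch starts with a nil data slice and still flushes at the right size, which a len == cap comparison would not guarantee.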
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/ccl/importccl/read_import_base.go, line 431 at r3 (raw file):
Previously, dt (David Taylor) wrote…
avro mentioned in comment
Done.
pkg/ccl/importccl/read_import_base.go, line 523 at r3 (raw file):
Previously, dt (David Taylor) wrote…
I'm surprised this didn't show up in profiling, but calling .Done() in hot loops is usually a performance problem. We usually pull it into a local above the loop (done := ctx.Done()), and even then sometimes avoid checking the channel on every iteration, as it is still somewhat expensive (I believe the prior CSV code only checked it on flushes).
Done.
pkg/ccl/importccl/read_import_base.go, line 568 at r3 (raw file):
Previously, dt (David Taylor) wrote…
This seems brittle, or at least subtle: I had to read this a couple of times before I'd convinced myself that it would actually behave correctly, i.e. that append or something else would never re-alloc the batch. IMO it'd be clearer just to add batchSize as a field on p.
Done.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt, @miretskiy, and @spaskob)
pkg/ccl/importccl/read_import_csv.go, line 192 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Perhaps -- I tried to keep things the same where possible. If you want, I can change this error message.
You have 4 things to show but only 2 placeholders
Rerun and update stats for csv import benchmark. Release notes: None
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/ccl/importccl/read_import_base.go, line 523 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Done.
Good point. Removed and, as per your suggestion, moved the check to flush.
pkg/ccl/importccl/read_import_csv.go, line 192 at r2 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
You have 4 things to show but only 2 placeholders
You misread the code a bit; I've changed the line wrapping to make it clear that I'm calling a function with 3 arguments (one of which is an error constructed with Wrapf(...)).
I reran and updated the before/after benchmark for csv import. Refactoring and the introduction of interface{} (instead of string) didn't have a significant impact:
name old speed new speed delta
name old alloc/op new alloc/op delta
name old allocs/op new allocs/op delta
Refactor csv importer and pull its parallel import related logic into separate helpers. This refactoring is done in preparation of making avro imports (and possibly others) use the same functionality. Release notes: None
Add a benchmark testing avro import performance. Release notes: None
Parallelize avro importer to improve its throughput (2.8x improvement). Fixes cockroachdb#45097 Release notes (performance): Faster avro import
bors r+
Canceled (will resume)
bors r+
Already running a review
Build failed (retrying...)
bors r+
Already running a review
Build succeeded
Parallelize avro importer to improve its throughput (2.8x improvement).
Touches #40374.
Fixes #45097.
Release notes (performance): Faster avro import