First complete version of RecordIO #1
Conversation
header.go
Outdated
	checkSum       uint32
	compressor     uint32
	compressedSize uint32
	len            uint32
Would it be better to use record_num to represent the number of records? Or add a comment?
len is indeed a bit confusing. At first glance I thought it was the number of bytes, but it is actually the number of records.
Done
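For reference, a minimal sketch of what the renamed header could look like; the field name numRecords below is illustrative, not necessarily the one used in the final code:

// Header is the per-chunk metadata. Naming the count explicitly avoids
// confusing it with a byte length.
type Header struct {
	checkSum       uint32 // checksum of the compressed chunk payload
	compressor     uint32 // which compressor was used for this chunk
	compressedSize uint32 // number of bytes of the compressed payload
	numRecords     uint32 // number of records in the chunk, not bytes
}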
reader.go
Outdated
type Index struct {
	chunkOffsets []int64
	chunkLens    []uint32
	records      int
In the Chunk struct, records denotes the [][]byte. How about using total_record_num here?
Done. Index.records => Index.numRecords
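A sketch of the index after the rename, assuming the fields shown in the diff above:

// Index caches the layout of all chunks so a scanner can locate the
// chunk that holds a given record without re-reading the file.
type Index struct {
	chunkOffsets []int64  // byte offset of each chunk header in the file
	chunkLens    []uint32 // number of records in each chunk
	numRecords   int      // total number of records across all chunks
}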
reader.go
Outdated
f.chunkOffsets = append(f.chunkOffsets, offset)
f.chunkLens = append(f.chunkLens, hdr.len)
f.records += int(hdr.len)
offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
The chunk's maxChunkSize can be customized by the user, so the number of seeks here may not be well bounded, which carries a fairly large potential performance risk.
The index for a single chunk is really just chunkOffset and chunkLen, i.e. 12 bytes of reading; skipping ahead via one parseHeader call requires reading 16 bytes plus one seek.
If an index is needed, performance-wise it would be better to append it at the end of the file or store it in a separate file.
To keep the code testable, we need to allow fairly small chunks. I'll change it as follows: if the user does not specify a size, the default defaultMaxChunkSize is used.
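A minimal sketch of that fallback, assuming the writer takes the maximum chunk size as a constructor argument; the concrete value of defaultMaxChunkSize and the Writer fields shown are placeholders:

package recordio

import "io"

const defaultMaxChunkSize = 32 * 1024 * 1024 // placeholder default, e.g. 32 MB

// Writer is trimmed to the fields this sketch needs.
type Writer struct {
	w            io.Writer
	maxChunkSize int
	compressor   int
}

// NewWriter falls back to defaultMaxChunkSize when the caller passes a
// non-positive size, while tests can still request tiny chunks explicitly.
func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer {
	if maxChunkSize <= 0 {
		maxChunkSize = defaultMaxChunkSize
	}
	return &Writer{w: w, maxChunkSize: maxChunkSize, compressor: compressor}
}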
}

var buf bytes.Buffer
if _, e = io.CopyN(&buf, r, int64(hdr.compressedSize)); e != nil {
The checkSum is not being verified here.
Done
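A hedged sketch of what the added check could look like when reading the chunk payload, assuming checkSum is a CRC-32 (IEEE) of the compressed bytes; the actual checksum algorithm is whatever the writer uses:

package recordio

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
)

// readChunkPayload copies the compressed payload and verifies it against
// the checksum stored in the chunk header before it is decompressed.
func readChunkPayload(r io.Reader, compressedSize, checkSum uint32) (*bytes.Buffer, error) {
	var buf bytes.Buffer
	if _, e := io.CopyN(&buf, r, int64(compressedSize)); e != nil {
		return nil, e
	}
	if crc32.ChecksumIEEE(buf.Bytes()) != checkSum {
		return nil, fmt.Errorf("checksum mismatch: chunk appears corrupted")
	}
	return &buf, nil
}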
chunk.go
Outdated
	size int // sum of record lengths.
}

func newChunk() *Chunk {
OK either way for me, just a suggestion: maybe we don't need this function, since it takes no arguments. We can just use &Chunk{} where needed.
Done
header.go
Outdated
// NoCompression means writing raw chunk data into files.
// With other choices, chunks are compressed before written.
NoCompression = 0
These few lines can be changed to:
NoCompression = iota
Snappy
Gzip
Done
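Written out in full, the suggested constant block would be:

const (
	// NoCompression means writing raw chunk data into files.
	// With other choices, chunks are compressed before written.
	NoCompression = iota
	Snappy
	Gzip
)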
func (c *Header) write(w io.Writer) (int, error) {
	var buf [20]byte
	binary.LittleEndian.PutUint32(buf[0:4], magicNumber)
Feels like compressor and magicNumber are only necessary per file (not per chunk)? Do we allow the user to specify a different compressor per chunk?
You are right. But a point of RecordIO is that if some records were not correctly written, we can skip over to the next chunk. Therefore, we need to have a magic number and per-chunk metadata.
@wangkuiyi From my understanding it's the checksum that prevents the decoder from returning corrupted data, not the magic number? It's possible that the program crashes after writing the magic number but while writing the data.
I thought the magic number was for identifying the type of the file. E.g., gzip has a magic number to tell it's a gzip file.
I see, it's for constructing the correct index (since the checksum is not verified during index construction). Maybe it makes sense to put the magic number at the end of the header, to prevent the case where writing the magic number succeeded but writing the other header fields failed.
Yes. I think the magic number is for segmenting chunks. If a chunk is corrupted when it's written, we get a chance to skip to the next valid chunk by searching sequentially for the next magic number.
I see. Good!
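To make the recovery idea concrete, a resynchronization helper could look roughly like this; it is only a sketch, and the magicNumber value shown is a placeholder for the library's actual constant:

package recordio

import "io"

const magicNumber uint32 = 0x01020304 // placeholder; use the real constant

// skipToNextChunk reads forward byte by byte until it finds the next
// little-endian-encoded magic number, so the reader can resume at the
// next valid chunk after hitting a corrupted one.
func skipToNextChunk(r io.ByteReader) error {
	var window uint32
	for {
		b, e := r.ReadByte()
		if e != nil {
			return e // typically io.EOF: no further chunk found
		}
		// Slide a 4-byte little-endian window over the stream.
		window = window>>8 | uint32(b)<<24
		if window == magicNumber {
			return nil // positioned just past the magic number
		}
	}
}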
chunk.go
Outdated
// Clear the current chunk.
ch.records = nil
ch.size = 0
Isn't ch.size redundant with len(ch.records)?
Not redundant; my variable naming caused the confusion.
Done. ch.size ==> ch.numBytes
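After the rename, the chunk bookkeeping reads roughly as follows (a sketch based on the fields visible in this PR):

// Chunk holds the records buffered in memory before being dumped to a file.
type Chunk struct {
	records  [][]byte // the buffered records; len(records) is the record count
	numBytes int      // sum of record lengths, which is different from len(records)
}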
chunk.go
Outdated
// the next add invocation.
func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
	// Write raw records and their lengths into data buffer.
	var data bytes.Buffer
Maybe early return if len(ch.records) == 0. Otherwise an empty chunk will still get a header written.
Good point! Done.
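The early return might sit at the top of dump like this (a sketch that assumes the Chunk type from chunk.go; the rest of the function body is elided):

// dump writes the chunk to w, but writes nothing at all, not even a
// header, when the chunk holds no records.
func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
	if len(ch.records) == 0 {
		return nil // avoid emitting a header for an empty chunk
	}
	// ... compress the records and write the header and payload as before ...
	return nil
}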
	return nil, e
}

ch := &Chunk{}
This line doesn't use newChunk(); there would be no such inconsistency if newChunk() were removed :p
Done.
chunk.go
Outdated
ch := &Chunk{}
for i := 0; i < int(hdr.len); i++ {
	var rs [4]byte
	if _, e = deflated.Read(rs[:]); e != nil {
rs[:] is the same as rs.
Nope. deflated.Read requires a slice, but rs was defined as an array. rs[:] converts the array into a slice.
	return nil, fmt.Errorf("Failed to read record length: %v", e)
}

r := make([]byte, binary.LittleEndian.Uint32(rs[:]))
rs[:] is the same as rs.
rs[:] converts the array rs into a slice, as required by binary.LittleEndian.Uint32.
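A tiny standalone illustration of the array-to-slice conversion being discussed:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	var rs [4]byte                          // rs is an array, not a slice
	binary.LittleEndian.PutUint32(rs[:], 7) // rs[:] is a []byte view of the array
	fmt.Println(binary.LittleEndian.Uint32(rs[:])) // prints 7
	// binary.LittleEndian.Uint32(rs) would not compile: Uint32 takes a []byte.
}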
reader.go
Outdated
for {
	if hdr, e = parseHeader(r); e != nil {
		break
	} else {
Go style prefers early return over if {} else {}. I think the following is clearer (less indentation):
for {
	if ... {
		break
	}
	// no else, more code here.
}
Done
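Applied to the index-building loop shown earlier, the early-break version reads roughly like this (field names follow the diff and may since have been renamed):

for {
	if hdr, e = parseHeader(r); e != nil {
		break
	}
	// No else branch needed: just continue with the chunk we parsed.
	f.chunkOffsets = append(f.chunkOffsets, offset)
	f.chunkLens = append(f.chunkLens, hdr.len)
	f.records += int(hdr.len)
	if offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent); e != nil {
		break
	}
}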
reader.go
Outdated
}

// NewScanner creates a scanner that sequentially reads records in the
// range [start, start+len).
Perhaps document what will happen when len < 0.
Done
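For example, the doc comment could spell out the corner case explicitly. The semantics sketched below, negative len meaning "scan to the end", are an assumption for illustration, as is the exact signature:

// NewScanner creates a scanner that sequentially reads records in the
// range [start, start+len). If len is negative, the scanner reads from
// start through the last record in the file. (Assumed semantics and
// signature, shown only to illustrate the documentation.)
func NewScanner(r io.ReadSeeker, index *Index, start, len int) *Scanner {
	// body elided in this sketch
	return nil
}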
reader.go
Outdated
func (s *Scanner) Record() []byte {
	ci, ri := s.index.Locate(s.cur)
	if s.chunkIndex != ci {
		log.Fatalf("Must call Scan before Record")
Please do not panic when the application is not in an unrecoverable state. Please see bufio.Text() (also a scanner) for reference: https://golang.org/src/bufio/scan.go?s=4262:4293#L96, or here: https://golang.org/src/text/scanner/scanner.go?s=16921:16957#L652
I think returning nil here is reasonable.
Btw, from my understanding, logging inside a library does not make much sense (it should return an error if logging is needed) because it "pollutes" stdout, and developers may not see it during development (what if the case happens in production under some circumstance that is not triggered during development?). I have never seen it in the Go standard library, but I'm not 100% sure, so I created this question: http://stackoverflow.com/questions/43708125/should-golang-library-do-logging-instead-of-return-error
Good point for this case! That test was there to make sure there is no bug in the program; it's a development-time test and shouldn't be shipped to production code.
Done by deleting the test.
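For completeness, a sketch of the nil-returning alternative the comment suggests, assuming the Scanner fields visible in the diff and that the currently loaded chunk is kept in s.chunk (that field name is a guess):

// Record returns the record the scanner currently points to, or nil when
// Scan has not been called or has already returned false. It never
// terminates the process.
func (s *Scanner) Record() []byte {
	ci, ri := s.index.Locate(s.cur)
	if s.chunkIndex != ci {
		return nil // out of sync with Scan: report "no record" instead of exiting
	}
	return s.chunk.records[ri] // assumes the loaded chunk is cached in s.chunk
}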
LGTM++ Very nice interface!
This PR is based on discussions in wangkuiyi/sstable#1
Fixes PaddlePaddle/Paddle#1947
recordio_test.go shows the usage of the RecordIO API.