-
Notifications
You must be signed in to change notification settings - Fork 7
/
indexed_iterator.go
135 lines (121 loc) · 3.9 KB
/
indexed_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package rosbag
import (
"bytes"
"compress/bzip2"
"container/heap"
"encoding/binary"
"fmt"
"io"
"github.com/pierrec/lz4/v4"
)
type indexedIterator struct {
rs io.ReadSeeker
info *Info
pq *messageHeap
compressedChunk []byte
lz4Reader *lz4.Reader
}
func (it *indexedIterator) More() bool {
return it.pq.Len() > 0
}
// Next extracts the next message from the bag. When there are no more messages,
// it returns io.EOF.
func (it *indexedIterator) Next() (*Connection, *Message, error) {
for it.pq.Len() > 0 {
entry, ok := heap.Pop(it.pq).(heapEntry)
if !ok {
return nil, nil, ErrInvalidHeapEntry
}
switch entry.op {
case OpMessageData:
offset := int(entry.offset())
chunkData := entry.chunkData
headerLength := int(u32(chunkData[offset:]))
dataLength := int(u32(chunkData[offset+4+headerLength:]))
recordEnd := offset + 4 + headerLength + 4 + dataLength
msg, err := ParseMessage(chunkData[offset:recordEnd])
if err != nil {
return nil, nil, err
}
return it.info.Connections[msg.Conn], msg, nil
case OpChunkInfo:
if _, err := it.rs.Seek(entry.offset(), io.SeekStart); err != nil {
return nil, nil, fmt.Errorf(
"failed to seek to chunk at offset %d: %w", entry.offset(), err)
}
var headerLen uint32
if err := binary.Read(it.rs, binary.LittleEndian, &headerLen); err != nil {
return nil, nil, fmt.Errorf("failed to read header length: %w", err)
}
headerData := make([]byte, headerLen)
if _, err := io.ReadFull(it.rs, headerData); err != nil {
return nil, nil, fmt.Errorf("failed to read header data: %w", err)
}
var compressedLen uint32
if err := binary.Read(it.rs, binary.LittleEndian, &compressedLen); err != nil {
return nil, nil, fmt.Errorf("failed to read compressed length: %w", err)
}
if len(it.compressedChunk) < int(compressedLen) {
it.compressedChunk = make([]byte, compressedLen*2)
}
if _, err := io.ReadFull(it.rs, it.compressedChunk[:compressedLen]); err != nil {
return nil, nil, fmt.Errorf("failed to read compressed chunk: %w", err)
}
header := readHeader(headerData)
compression := string(header["compression"])
size := int(u32(header["size"]))
decompressedChunk := make([]byte, size)
switch compression {
case CompressionNone:
copy(decompressedChunk, it.compressedChunk[:compressedLen])
case CompressionLZ4:
it.lz4Reader.Reset(bytes.NewReader(it.compressedChunk[:compressedLen]))
if _, err := io.ReadFull(it.lz4Reader, decompressedChunk); err != nil {
return nil, nil, fmt.Errorf("decompression failure: %w", err)
}
case CompressionBZ2:
bzw := bzip2.NewReader(bytes.NewReader(it.compressedChunk[:compressedLen]))
if _, err := io.ReadFull(bzw, decompressedChunk); err != nil {
return nil, nil, fmt.Errorf("decompression failure: %w", err)
}
default:
return nil, nil, ErrUnsupportedCompression{compression}
}
// now we're past the chunk, at the index data records. Read those
// and dump any messages we need onto the heap.
for i := 0; i < int(entry.chunkInfo.Count); i++ {
opcode, record, err := ReadRecord(it.rs)
if err != nil {
return nil, nil, err
}
if opcode != OpIndexData {
return nil, nil, ErrUnexpectedOpHeader{OpIndexData, opcode}
}
indexData, err := ParseIndexData(record)
if err != nil {
return nil, nil, err
}
for _, entry := range indexData.Data {
entry := entry
heap.Push(it.pq, newMessageHeapEntry(&entry, decompressedChunk))
}
}
continue
}
}
return nil, nil, io.EOF
}
func newIndexedIterator(rs io.ReadSeeker, info *Info) *indexedIterator {
pq := newMessageHeap()
heap.Init(pq)
for _, chunkInfo := range info.ChunkInfos {
heap.Push(pq, newChunkInfoHeapEntry(chunkInfo))
}
return &indexedIterator{
rs: rs,
info: info,
pq: pq,
compressedChunk: []byte{},
lz4Reader: lz4.NewReader(nil),
}
}