diff --git a/AUTHORS b/AUTHORS index 4ee2adc..94eddb1 100644 --- a/AUTHORS +++ b/AUTHORS @@ -7,4 +7,5 @@ # # Please keep the list sorted. +Hengfei Yang Marty Schoch diff --git a/contentcoder.go b/contentcoder.go index 2dc7169..6996198 100644 --- a/contentcoder.go +++ b/contentcoder.go @@ -18,8 +18,6 @@ import ( "bytes" "encoding/binary" "io" - - "github.com/golang/snappy" ) var termSeparator byte = 0xff @@ -39,7 +37,7 @@ type chunkedContentCoder struct { chunkMeta []metaData - compressed []byte // temp buf for snappy compression + compressed []byte // temp buf for compression } // metaData represents the data information inside a @@ -107,18 +105,25 @@ func (c *chunkedContentCoder) flushContents() error { } // write out the metaData slice + diffDocNum := uint64(0) + diffDvOffset := uint64(0) for _, meta := range c.chunkMeta { - err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset) + err = writeUvarints(&c.chunkMetaBuf, meta.DocNum-diffDocNum, meta.DocDvOffset-diffDvOffset) if err != nil { return err } + diffDocNum = meta.DocNum + diffDvOffset = meta.DocDvOffset } // write the metadata to final data metaData := c.chunkMetaBuf.Bytes() c.final = append(c.final, c.chunkMetaBuf.Bytes()...) // write the compressed data to the final data - c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes()) + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), ZSTDCompressionLevel) + if err != nil { + return err + } c.final = append(c.final, c.compressed...) c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData)) diff --git a/contentcoder_test.go b/contentcoder_test.go index 1b7fd31..8a4ac79 100644 --- a/contentcoder_test.go +++ b/contentcoder_test.go @@ -33,10 +33,12 @@ func TestChunkedContentCoder(t *testing.T) { docNums: []uint64{0}, vals: [][]byte{[]byte("bluge")}, // 1 chunk, chunk-0 length 11(b), value - expected: []byte{0x1, 0x0, 0x5, 0x5, 0x10, 'b', 'l', 'u', 'g', 'e', - 0xa, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + expected: []byte{ + 0x1, 0x0, 0x5, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x29, 0x0, 0x0, + 'b', 'l', 'u', 'g', 'e', + 0x7e, 0xde, 0xed, 0x4a, 0x15, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, + }, }, { maxDocNum: 1, @@ -47,11 +49,13 @@ func TestChunkedContentCoder(t *testing.T) { []byte("scorch"), }, - expected: []byte{0x1, 0x0, 0x6, 0x6, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64, - 0x65, 0x1, 0x1, 0x6, 0x6, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, - 0xb, 0x16, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2}, + expected: []byte{ + 0x1, 0x0, 0x6, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x31, 0x0, 0x0, + 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x35, 0x89, 0x5a, 0xd, + 0x1, 0x1, 0x6, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x31, 0x0, 0x0, + 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, 0xc4, 0x46, 0x89, 0x39, 0x16, 0x2c, + 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, + }, }, } @@ -61,7 +65,7 @@ func TestChunkedContentCoder(t *testing.T) { for i, docNum := range test.docNums { err := cic.Add(docNum, test.vals[i]) if err != nil { - t.Fatalf("error adding to intcoder: %v", err) + t.Fatalf("error adding to contentcoder: %v", err) } } _ = cic.Close() @@ -98,11 +102,11 @@ func TestChunkedContentCoders(t *testing.T) { for i, docNum := range docNums { err := cic1.Add(docNum, vals[i]) if err != nil { - t.Fatalf("error adding to intcoder: %v", err) + t.Fatalf("error adding to contentcoder: %v", 
err) } err = cic2.Add(docNum, vals[i]) if err != nil { - t.Fatalf("error adding to intcoder: %v", err) + t.Fatalf("error adding to contentcoder: %v", err) } } _ = cic1.Close() diff --git a/documentcoder.go b/documentcoder.go new file mode 100644 index 0000000..b55daec --- /dev/null +++ b/documentcoder.go @@ -0,0 +1,139 @@ +package ice + +import ( + "bytes" + "encoding/binary" + "io" +) + +const defaultDocumentChunkSize uint32 = 128 + +type chunkedDocumentCoder struct { + chunkSize uint64 + w io.Writer + buf *bytes.Buffer + metaBuf []byte + n uint64 + bytes uint64 + compressed []byte + offsets []uint64 +} + +func newChunkedDocumentCoder(chunkSize uint64, w io.Writer) *chunkedDocumentCoder { + c := &chunkedDocumentCoder{ + chunkSize: chunkSize, + w: w, + } + c.buf = bytes.NewBuffer(nil) + c.metaBuf = make([]byte, binary.MaxVarintLen64) + c.offsets = append(c.offsets, 0) + return c +} + +func (c *chunkedDocumentCoder) Add(docNum uint64, meta, data []byte) (int, error) { + var wn, n int + var err error + n = binary.PutUvarint(c.metaBuf, uint64(len(meta))) + if n, err = c.writeToBuf(c.metaBuf[:n]); err != nil { + return 0, err + } + wn += n + n = binary.PutUvarint(c.metaBuf, uint64(len(data))) + if n, err = c.writeToBuf(c.metaBuf[:n]); err != nil { + return 0, err + } + wn += n + if n, err = c.writeToBuf(meta); err != nil { + return 0, err + } + wn += n + if n, err = c.writeToBuf(data); err != nil { + return 0, err + } + wn += n + + return wn, c.newLine() +} + +func (c *chunkedDocumentCoder) writeToBuf(data []byte) (int, error) { + return c.buf.Write(data) +} + +func (c *chunkedDocumentCoder) newLine() error { + c.n++ + if c.n%c.chunkSize != 0 { + return nil + } + return c.flush() +} + +func (c *chunkedDocumentCoder) flush() error { + if c.buf.Len() > 0 { + var err error + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.buf.Bytes(), ZSTDCompressionLevel) + if err != nil { + return err + } + n, err := c.w.Write(c.compressed) + if err != nil { + return err + } + c.bytes += uint64(n) + c.buf.Reset() + } + c.offsets = append(c.offsets, c.bytes) + return nil +} + +func (c *chunkedDocumentCoder) Write() error { + // flush first + if err := c.flush(); err != nil { + return err + } + var err error + var wn, n int + // write chunk offsets + for _, offset := range c.offsets { + n = binary.PutUvarint(c.metaBuf, offset) + if _, err = c.w.Write(c.metaBuf[:n]); err != nil { + return err + } + wn += n + } + // write chunk offset length + err = binary.Write(c.w, binary.BigEndian, uint32(wn)) + if err != nil { + return err + } + // write chunk num + err = binary.Write(c.w, binary.BigEndian, uint32(len(c.offsets))) + if err != nil { + return err + } + return nil +} + +func (c *chunkedDocumentCoder) Reset() { + c.compressed = c.compressed[:0] + c.offsets = c.offsets[:0] + c.n = 0 + c.bytes = 0 + c.buf.Reset() +} + +// Size returns buffer size of current chunk +func (c *chunkedDocumentCoder) Size() uint64 { + return uint64(c.buf.Len()) +} + +// Len returns chunks num +func (c *chunkedDocumentCoder) Len() int { + return len(c.offsets) +} + +// Len returns chunks num +func (c *chunkedDocumentCoder) Offsets() []uint64 { + m := make([]uint64, 0, len(c.offsets)) + m = append(m, c.offsets...) 
+ return m +} diff --git a/documentcoder_test.go b/documentcoder_test.go new file mode 100644 index 0000000..3067a29 --- /dev/null +++ b/documentcoder_test.go @@ -0,0 +1,124 @@ +package ice + +import ( + "bytes" + "testing" +) + +func TestChunkedDocumentCoder(t *testing.T) { + tests := []struct { + chunkSize uint64 + docNums []uint64 + metas [][]byte + datas [][]byte + expected []byte + expectedChunkNum int + }{ + { + chunkSize: 1, + docNums: []uint64{0}, + metas: [][]byte{{0}}, + datas: [][]byte{[]byte("bluge")}, + expected: []byte{ + 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x41, + 0x0, 0x0, 0x1, 0x5, 0x0, 0x62, 0x6c, 0x75, 0x67, 0x65, 0x2b, 0x30, 0x97, 0x33, 0x0, 0x15, 0x15, + 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x3, + }, + expectedChunkNum: 3, // left, chunk, right + }, + { + chunkSize: 1, + docNums: []uint64{0, 1}, + metas: [][]byte{{0}, {1}}, + datas: [][]byte{[]byte("upside"), []byte("scorch")}, + expected: []byte{ + 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x49, + 0x0, 0x0, 0x1, 0x6, 0x0, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, + 0x36, 0x6e, 0x7e, 0x39, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x49, + 0x0, 0x0, 0x1, 0x6, 0x1, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, + 0x8f, 0x83, 0xa3, 0x37, 0x0, 0x16, 0x2c, 0x2c, + 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x4, + }, + expectedChunkNum: 4, // left, chunk, chunk, right + }, + } + + for _, test := range tests { + var actual bytes.Buffer + cic := newChunkedDocumentCoder(test.chunkSize, &actual) + for i, docNum := range test.docNums { + _, err := cic.Add(docNum, test.metas[i], test.datas[i]) + if err != nil { + t.Fatalf("error adding to documentcoder: %v", err) + } + } + err := cic.Write() + if err != nil { + t.Fatalf("error writing: %v", err) + } + if !bytes.Equal(test.expected, actual.Bytes()) { + t.Errorf("got:%s, expected:%s", actual.String(), string(test.expected)) + } + if test.expectedChunkNum != cic.Len() { + t.Errorf("got:%d, expected:%d", cic.Len(), test.expectedChunkNum) + } + } +} + +func TestChunkedDocumentCoders(t *testing.T) { + chunkSize := uint64(2) + docNums := []uint64{0, 1, 2, 3, 4, 5} + metas := [][]byte{ + {0}, + {1}, + {2}, + {3}, + {4}, + {5}, + } + datas := [][]byte{ + []byte("scorch"), + []byte("does"), + []byte("better"), + []byte("than"), + []byte("upside"), + []byte("down"), + } + chunkNum := 5 // left, chunk, chunk, chunk, right + + var actual1, actual2 bytes.Buffer + // chunkedDocumentCoder that writes out at the end + cic1 := newChunkedDocumentCoder(chunkSize, &actual1) + // chunkedContentCoder that writes out in chunks + cic2 := newChunkedDocumentCoder(chunkSize, &actual2) + + for i, docNum := range docNums { + _, err := cic1.Add(docNum, metas[i], datas[i]) + if err != nil { + t.Fatalf("error adding to documentcoder: %v", err) + } + _, err = cic2.Add(docNum, metas[i], datas[i]) + if err != nil { + t.Fatalf("error adding to documentcoder: %v", err) + } + } + + err := cic1.Write() + if err != nil { + t.Fatalf("error writing: %v", err) + } + err = cic2.Write() + if err != nil { + t.Fatalf("error writing: %v", err) + } + + if !bytes.Equal(actual1.Bytes(), actual2.Bytes()) { + t.Errorf("%s != %s", actual1.String(), actual2.String()) + } + if chunkNum != cic1.Len() { + t.Errorf("got:%d, expected:%d", cic1.Len(), chunkNum) + } + if chunkNum != cic2.Len() { + t.Errorf("got:%d, expected:%d", cic2.Len(), chunkNum) + } +} diff --git a/docvalues.go b/docvalues.go index 382f30d..b8923c6 100644 --- a/docvalues.go +++ b/docvalues.go @@ -22,7 +22,6 @@ import ( "sort" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) type 
docNumTermsVisitor func(docNum uint64, terms []byte) error @@ -39,7 +38,7 @@ type docValueReader struct { dvDataLoc uint64 curChunkHeader []metaData curChunkData []byte // compressed data cache - uncompressed []byte // temp buf for snappy decompression + uncompressed []byte // temp buf for decompression } func (di *docValueReader) size() int { @@ -163,6 +162,9 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { } else { di.curChunkHeader = di.curChunkHeader[:int(numDocs)] } + + diffDocNum := uint64(0) + diffDvOffset := uint64(0) for i := 0; i < int(numDocs); i++ { var docNumData []byte docNumData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) @@ -170,6 +172,8 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { return err } di.curChunkHeader[i].DocNum, read = binary.Uvarint(docNumData) + di.curChunkHeader[i].DocNum += diffDocNum + diffDocNum = di.curChunkHeader[i].DocNum offset += uint64(read) var docDvOffsetData []byte docDvOffsetData, err = s.data.Read(int(chunkMetaLoc+offset), int(chunkMetaLoc+offset+binary.MaxVarintLen64)) @@ -177,6 +181,8 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *Segment) error { return err } di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(docDvOffsetData) + di.curChunkHeader[i].DocDvOffset += diffDvOffset + diffDvOffset = di.curChunkHeader[i].DocDvOffset offset += uint64(read) } @@ -203,7 +209,7 @@ func (di *docValueReader) iterateAllDocValues(s *Segment, visitor docNumTermsVis } // uncompress the already loaded data - uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + uncompressed, err := ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) if err != nil { return err } @@ -238,7 +244,7 @@ func (di *docValueReader) visitDocValues(docNum uint64, uncompressed = di.uncompressed } else { // uncompress the already loaded data - uncompressed, err = snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) + uncompressed, err = ZSTDDecompress(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) if err != nil { return err } diff --git a/go.mod b/go.mod index b0e28d4..d01abe7 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,12 @@ module github.com/blugelabs/ice -go 1.12 +go 1.16 require ( - github.com/RoaringBitmap/roaring v0.9.1 - github.com/blevesearch/mmap-go v1.0.4 - github.com/blevesearch/vellum v1.0.5 + github.com/RoaringBitmap/roaring v0.9.4 + github.com/blevesearch/mmap-go v1.0.3 + github.com/blevesearch/vellum v1.0.7 github.com/blugelabs/bluge_segment_api v0.2.0 - github.com/golang/snappy v0.0.1 + github.com/klauspost/compress v1.15.2 github.com/spf13/cobra v0.0.5 ) diff --git a/go.sum b/go.sum index d5928e5..d452fcd 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,16 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eWBKuPVSXZIpsaMwCI= github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE= -github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM= github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc= +github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo= +github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= 
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= -github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0= -github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA= -github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= -github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= -github.com/blevesearch/vellum v1.0.5 h1:L5dJ7hKauRVbuH7I8uqLeSK92CPPY6FfrbAmLhAug8A= -github.com/blevesearch/vellum v1.0.5/go.mod h1:atE0EH3fvk43zzS7t1YNdNC7DbmcC3uz+eMD5xZ2OyQ= +github.com/blevesearch/mmap-go v1.0.3 h1:7QkALgFNooSq3a46AE+pWeKASAZc9SiNFJhDGF1NDx4= +github.com/blevesearch/mmap-go v1.0.3/go.mod h1:pYvKl/grLQrBxuaRYgoTssa4rVujYYeenDp++2E+yvs= +github.com/blevesearch/vellum v1.0.7 h1:+vn8rfyCRHxKVRgDLeR0FAXej2+6mEb5Q15aQE/XESQ= +github.com/blevesearch/vellum v1.0.7/go.mod h1:doBZpmRhwTsASB4QdUZANlJvqVAUdUyX0ZK7QJCTeBE= github.com/blugelabs/bluge_segment_api v0.2.0 h1:cCX1Y2y8v0LZ7+EEJ6gH7dW6TtVTW4RhG0vp3R+N2Lo= github.com/blugelabs/bluge_segment_api v0.2.0/go.mod h1:95XA+ZXfRj/IXADm7gZ+iTcWOJPg5jQTY1EReIzl3LA= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -22,11 +21,11 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/klauspost/compress v1.15.2 h1:3WH+AG7s2+T8o3nrM/8u2rdqUEcQhmga7smjrT41nAw= +github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -52,13 +51,9 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys 
v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/intcoder.go b/intcoder.go index 28749b1..9dfc104 100644 --- a/intcoder.go +++ b/intcoder.go @@ -33,7 +33,8 @@ type chunkedIntCoder struct { chunkLens []uint64 currChunk uint64 - buf []byte + buf []byte + compressed []byte } // newChunkedIntCoder returns a new chunk int coder which packs data into @@ -101,11 +102,16 @@ func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error { // Close indicates you are done calling Add() this allows the final chunk // to be encoded. -func (c *chunkedIntCoder) Close() { - encodingBytes := c.chunkBuf.Bytes() - c.chunkLens[c.currChunk] = uint64(len(encodingBytes)) - c.final = append(c.final, encodingBytes...) +func (c *chunkedIntCoder) Close() error { + var err error + c.compressed, err = ZSTDCompress(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes(), ZSTDCompressionLevel) + if err != nil { + return err + } + c.chunkLens[c.currChunk] = uint64(len(c.compressed)) + c.final = append(c.final, c.compressed...) c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close + return nil } // Write commits all the encoded chunked integers to the provided writer. diff --git a/intcoder_test.go b/intcoder_test.go index 044194c..7f91e1b 100644 --- a/intcoder_test.go +++ b/intcoder_test.go @@ -35,8 +35,11 @@ func TestChunkIntCoder(t *testing.T) { vals: [][]uint64{ {3}, }, - // 1 chunk, chunk-0 length 1, value 3 - expected: []byte{0x1, 0x1, 0x3}, + // 1 chunk, chunk-0 length 1, value 3, but compressed + expected: []byte{ + 0x1, 0xe, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x9, + 0x0, 0x0, 0x3, 0xb6, 0x4b, 0x1f, 0xbc, + }, }, { maxDocNum: 1, @@ -46,8 +49,12 @@ func TestChunkIntCoder(t *testing.T) { {3}, {7}, }, - // 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7 - expected: []byte{0x2, 0x1, 0x2, 0x3, 0x7}, + // 2 chunks, chunk-0 offset 1, chunk-1 offset 2, value 3, value 7, but compressed + expected: []byte{ + 0x2, 0xe, 0x1c, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x9, + 0x0, 0x0, 0x3, 0xb6, 0x4b, 0x1f, 0xbc, 0x28, 0xb5, 0x2f, 0xfd, 0x4, 0x0, 0x9, + 0x0, 0x0, 0x7, 0xb7, 0xbb, 0x58, 0xe8, + }, }, } diff --git a/intdecoder.go b/intdecoder.go index b9a0575..690a8b3 100644 --- a/intdecoder.go +++ b/intdecoder.go @@ -26,6 +26,7 @@ type chunkedIntDecoder struct { dataStartOffset uint64 chunkOffsets []uint64 curChunkBytes []byte + uncompressed []byte // temp buf for decompression data *segment.Data r *memUvarintReader } @@ -86,7 +87,11 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error { if err != nil { return err } - d.curChunkBytes = curChunkBytesData + d.uncompressed, err = ZSTDDecompress(d.uncompressed[:cap(d.uncompressed)], curChunkBytesData) + if err != nil { + return err + } + d.curChunkBytes = d.uncompressed if d.r == nil { d.r = newMemUvarintReader(d.curChunkBytes) } else { @@ -101,6 +106,7 @@ func (d *chunkedIntDecoder) reset() { d.dataStartOffset = 0 d.chunkOffsets = d.chunkOffsets[:0] d.curChunkBytes 
= d.curChunkBytes[:0] + d.uncompressed = d.uncompressed[:0] // FIXME what? // d.data = d.data[:0] diff --git a/load.go b/load.go index 97b041f..b242efb 100644 --- a/load.go +++ b/load.go @@ -55,6 +55,11 @@ func load(data *segment.Data) (*Segment, error) { return nil, err } + err = rv.loadStoredFieldChunk() + if err != nil { + return nil, err + } + err = rv.loadDvReaders() if err != nil { return nil, err @@ -128,3 +133,36 @@ func (s *Segment) loadFields() error { } return nil } + +// loadStoredFieldChunk load storedField chunk offsets +func (s *Segment) loadStoredFieldChunk() error { + // read chunk num + chunkOffsetPos := int(s.footer.storedIndexOffset - uint64(sizeOfUint32)) + chunkData, err := s.data.Read(chunkOffsetPos, chunkOffsetPos+sizeOfUint32) + if err != nil { + return err + } + chunkNum := binary.BigEndian.Uint32(chunkData) + chunkOffsetPos -= sizeOfUint32 + // read chunk offsets length + chunkData, err = s.data.Read(chunkOffsetPos, chunkOffsetPos+sizeOfUint32) + if err != nil { + return err + } + chunkOffsetsLen := binary.BigEndian.Uint32(chunkData) + // read chunk offsets + chunkOffsetPos -= int(chunkOffsetsLen) + var offset, read int + var offsetata []byte + s.storedFieldChunkOffsets = make([]uint64, chunkNum) + for i := 0; i < int(chunkNum); i++ { + offsetata, err = s.data.Read(chunkOffsetPos+offset, chunkOffsetPos+offset+binary.MaxVarintLen64) + if err != nil { + return err + } + s.storedFieldChunkOffsets[i], read = binary.Uvarint(offsetata) + offset += read + } + + return nil +} diff --git a/merge.go b/merge.go index a65264c..1397439 100644 --- a/merge.go +++ b/merge.go @@ -26,7 +26,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) const docDropped = math.MaxInt64 // sentinel docNum to represent a deleted doc @@ -624,7 +623,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, w *countHashWriter, closeCh chan struct{}) (storedIndexOffset uint64, newDocNums [][]uint64, err error) { var newDocNum uint64 - var data, compressed []byte + var data []byte var metaBuf bytes.Buffer varBuf := make([]byte, binary.MaxVarintLen64) metaEncode := func(val uint64) (int, error) { @@ -639,6 +638,9 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, vdc := visitDocumentCtxPool.Get().(*visitDocumentCtx) defer visitDocumentCtxPool.Put(vdc) + // document chunk coder + docChunkCoder := newChunkedDocumentCoder(uint64(defaultDocumentChunkSize), w) + // for each segment for segI, seg := range segments { // check for the closure in meantime @@ -654,7 +656,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, // segments and there are no deletions, via byte-copying // of stored docs bytes directly to the writer if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) { - err := seg.copyStoredDocs(newDocNum, docNumOffsets, w) + err := seg.copyStoredDocs(newDocNum, docNumOffsets, docChunkCoder) if err != nil { return 0, nil, err } @@ -670,7 +672,7 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, var err2 error newDocNum, err2 = mergeStoredAndRemapSegment(seg, dropsI, segNewDocNums, newDocNum, &metaBuf, data, - fieldsInv, vals, vdc, fieldsMap, metaEncode, compressed, docNumOffsets, w) + fieldsInv, vals, vdc, fieldsMap, metaEncode, docNumOffsets, docChunkCoder) if err2 != nil { return 0, nil, err2 } @@ -678,6 +680,11 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, 
newDocNums = append(newDocNums, segNewDocNums) } + // document chunk coder + if err := docChunkCoder.Write(); err != nil { + return 0, nil, err + } + // return value is the start of the stored index storedIndexOffset = uint64(w.Count()) @@ -694,8 +701,8 @@ func mergeStoredAndRemap(segments []*Segment, drops []*roaring.Bitmap, func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocNums []uint64, newDocNum uint64, metaBuf *bytes.Buffer, data []byte, fieldsInv []string, vals [][][]byte, vdc *visitDocumentCtx, - fieldsMap map[string]uint16, metaEncode func(val uint64) (int, error), compressed []byte, docNumOffsets []uint64, - w *countHashWriter) (uint64, error) { + fieldsMap map[string]uint16, metaEncode func(val uint64) (int, error), docNumOffsets []uint64, + docChunkCoder *chunkedDocumentCoder) (uint64, error) { // for each doc num for docNum := uint64(0); docNum < seg.footer.numDocs; docNum++ { // TODO: roaring's API limits docNums to 32-bits? @@ -737,26 +744,10 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN metaBytes := metaBuf.Bytes() - compressed = snappy.Encode(compressed[:cap(compressed)], data) - // record where we're about to start writing - docNumOffsets[newDocNum] = uint64(w.Count()) - - // write out the meta len and compressed data len - err = writeUvarints(w, - uint64(len(metaBytes)), - uint64(len(compressed))) - if err != nil { - return 0, err - } - // now write the meta - _, err = w.Write(metaBytes) - if err != nil { - return 0, err - } - // now write the compressed data - _, err = w.Write(compressed) - if err != nil { + docNumOffsets[newDocNum] = docChunkCoder.Size() + // document chunk line + if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { return 0, err } @@ -768,46 +759,46 @@ func mergeStoredAndRemapSegment(seg *Segment, dropsI *roaring.Bitmap, segNewDocN // copyStoredDocs writes out a segment's stored doc info, optimized by // using a single Write() call for the entire set of bytes. The // newDocNumOffsets is filled with the new offsets for each doc. 
-func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, - w *countHashWriter) error { +func (s *Segment) copyStoredDocs(newDocNum uint64, newDocNumOffsets []uint64, docChunkCoder *chunkedDocumentCoder) error { if s.footer.numDocs <= 0 { return nil } - indexOffset0, storedOffset0, err := s.getDocStoredOffsetsOnly(0) // the segment's first doc - if err != nil { - return err - } - - indexOffsetN, storedOffsetN, readN, metaLenN, dataLenN, err := - s.getDocStoredOffsets(s.footer.numDocs - 1) // the segment's last doc - if err != nil { - return err - } - - storedOffset0New := uint64(w.Count()) - - storedBytesData, err := s.data.Read(int(storedOffset0), int(storedOffsetN+readN+metaLenN+dataLenN)) - if err != nil { - return err - } - storedBytes := storedBytesData - _, err = w.Write(storedBytes) - if err != nil { - return err - } - - // remap the storedOffset's for the docs into new offsets relative - // to storedOffset0New, filling the given docNumOffsetsOut array - for indexOffset := indexOffset0; indexOffset <= indexOffsetN; indexOffset += fileAddrWidth { - storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) + // visit documents and rewrite to chunk + uncompressed := make([]byte, 0) + for i := 0; i < len(s.storedFieldChunkOffsets)-1; i++ { + chunkOffstart := s.storedFieldChunkOffsets[i] + chunkOffend := s.storedFieldChunkOffsets[i+1] + if chunkOffstart == chunkOffend { + continue + } + compressed, err := s.data.Read(int(chunkOffstart), int(chunkOffend)) if err != nil { return err } - storedOffset := binary.BigEndian.Uint64(storedOffsetData) - storedOffsetNew := storedOffset - storedOffset0 + storedOffset0New - newDocNumOffsets[newDocNum] = storedOffsetNew // PANIC - newDocNum++ + uncompressed, err = ZSTDDecompress(uncompressed[:cap(uncompressed)], compressed) + if err != nil { + return err + } + storedOffset := 0 + n := 0 + for storedOffset < len(uncompressed) { + n = 0 + metaLenData := uncompressed[storedOffset : storedOffset+int(binary.MaxVarintLen64)] + metaLen, read := binary.Uvarint(metaLenData) + n += read + dataLenData := uncompressed[storedOffset+n : storedOffset+n+int(binary.MaxVarintLen64)] + dataLen, read := binary.Uvarint(dataLenData) + n += read + newDocNumOffsets[newDocNum] = docChunkCoder.Size() + metaBytes := uncompressed[storedOffset+n : storedOffset+n+int(metaLen)] + data := uncompressed[storedOffset+n+int(metaLen) : storedOffset+n+int(metaLen+dataLen)] + if _, err := docChunkCoder.Add(newDocNum, metaBytes, data); err != nil { + return err + } + storedOffset += n + int(metaLen+dataLen) + newDocNum++ + } } return nil diff --git a/new.go b/new.go index e888c7b..4b46ee6 100644 --- a/new.go +++ b/new.go @@ -24,7 +24,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) var newSegmentBufferNumResultsBump = 100 @@ -62,7 +61,7 @@ func newWithChunkMode(results []segment.Document, normCalc func(string, int) flo s.w = newCountHashWriter(&br) var footer *footer - footer, dictOffsets, err := s.convert() + footer, dictOffsets, storedFieldChunkOffsets, err := s.convert() if err != nil { return nil, uint64(0), err } @@ -73,7 +72,7 @@ func newWithChunkMode(results []segment.Document, normCalc func(string, int) flo sb, err := initSegmentBase(br.Bytes(), footer, s.FieldsMap, s.FieldsInv, s.FieldDocs, s.FieldFreqs, - dictOffsets) + dictOffsets, storedFieldChunkOffsets) if err == nil && s.reset() == nil { s.lastNumDocs = len(results) 
@@ -87,17 +86,18 @@ func newWithChunkMode(results []segment.Document, normCalc func(string, int) flo func initSegmentBase(mem []byte, footer *footer, fieldsMap map[string]uint16, fieldsInv []string, fieldsDocs, fieldsFreqs map[uint16]uint64, - dictLocs []uint64) (*Segment, error) { + dictLocs []uint64, storedFieldChunkOffsets []uint64) (*Segment, error) { sb := &Segment{ - data: segment.NewDataBytes(mem), - footer: footer, - fieldsMap: fieldsMap, - fieldsInv: fieldsInv, - fieldDocs: fieldsDocs, - fieldFreqs: fieldsFreqs, - dictLocs: dictLocs, - fieldDvReaders: make(map[uint16]*docValueReader), - fieldFSTs: make(map[uint16]*vellum.FST), + data: segment.NewDataBytes(mem), + footer: footer, + fieldsMap: fieldsMap, + fieldsInv: fieldsInv, + fieldDocs: fieldsDocs, + fieldFreqs: fieldsFreqs, + dictLocs: dictLocs, + fieldDvReaders: make(map[uint16]*docValueReader), + fieldFSTs: make(map[uint16]*vellum.FST), + storedFieldChunkOffsets: storedFieldChunkOffsets, } sb.updateSize() @@ -249,7 +249,7 @@ type interimLoc struct { end uint64 } -func (s *interim) convert() (*footer, []uint64, error) { +func (s *interim) convert() (*footer, []uint64, []uint64, error) { s.FieldsMap = map[string]uint16{} s.FieldDocs = map[uint16]uint64{} s.FieldFreqs = map[uint16]uint64{} @@ -284,9 +284,9 @@ func (s *interim) convert() (*footer, []uint64, error) { s.processDocuments() - storedIndexOffset, err := s.writeStoredFields() + storedIndexOffset, storedFieldChunkOffsets, err := s.writeStoredFields() if err != nil { - return nil, nil, err + return nil, nil, nil, err } var fdvIndexOffset uint64 @@ -295,7 +295,7 @@ func (s *interim) convert() (*footer, []uint64, error) { if len(s.results) > 0 { fdvIndexOffset, dictOffsets, err = s.writeDicts() if err != nil { - return nil, nil, err + return nil, nil, nil, err } } else { dictOffsets = make([]uint64, len(s.FieldsInv)) @@ -303,7 +303,7 @@ func (s *interim) convert() (*footer, []uint64, error) { fieldsIndexOffset, err := persistFields(s.FieldsInv, s.FieldDocs, s.FieldFreqs, s.w, dictOffsets) if err != nil { - return nil, nil, err + return nil, nil, nil, err } return &footer{ @@ -311,7 +311,7 @@ func (s *interim) convert() (*footer, []uint64, error) { fieldsIndexOffset: fieldsIndexOffset, docValueOffset: fdvIndexOffset, version: Version, - }, dictOffsets, nil + }, dictOffsets, storedFieldChunkOffsets, nil } func (s *interim) getOrDefineField(fieldName string) int { @@ -553,7 +553,7 @@ func (s *interim) processDocument(docNum uint64, } func (s *interim) writeStoredFields() ( - storedIndexOffset uint64, err error) { + storedIndexOffset uint64, storedFieldChunkOffsets []uint64, err error) { varBuf := make([]byte, binary.MaxVarintLen64) metaEncode := func(val uint64) (int, error) { wb := binary.PutUvarint(varBuf, val) @@ -569,6 +569,9 @@ func (s *interim) writeStoredFields() ( // keyed by fieldID, for the current doc in the loop docStoredFields := map[uint16]interimStoredField{} + // document chunk coder + docChunkCoder := newChunkedDocumentCoder(uint64(defaultDocumentChunkSize), s.w) + for docNum, result := range s.results { for fieldID := range docStoredFields { // reset for next doc delete(docStoredFields, fieldID) @@ -601,45 +604,36 @@ func (s *interim) writeStoredFields() ( fieldID, isf.vals, curr, metaEncode, data) if err != nil { - return 0, err + return 0, nil, err } } } metaBytes := s.metaBuf.Bytes() - - compressed = snappy.Encode(compressed[:cap(compressed)], data) - - docStoredOffsets[docNum] = uint64(s.w.Count()) - - err = writeUvarints(s.w, - uint64(len(metaBytes)), - 
uint64(len(compressed))) - if err != nil { - return 0, err - } - - _, err = s.w.Write(metaBytes) + docStoredOffsets[docNum] = docChunkCoder.Size() + _, err = docChunkCoder.Add(uint64(docNum), metaBytes, data) if err != nil { - return 0, err + return 0, nil, err } + } - _, err = s.w.Write(compressed) - if err != nil { - return 0, err - } + // document chunk coder + err = docChunkCoder.Write() + if err != nil { + return 0, nil, err } + storedFieldChunkOffsets = docChunkCoder.Offsets() storedIndexOffset = uint64(s.w.Count()) for _, docStoredOffset := range docStoredOffsets { err = binary.Write(s.w, binary.BigEndian, docStoredOffset) if err != nil { - return 0, err + return 0, nil, err } } - return storedIndexOffset, nil + return storedIndexOffset, storedFieldChunkOffsets, nil } func (s *interim) writeDicts() (fdvIndexOffset uint64, dictOffsets []uint64, err error) { diff --git a/posting.go b/posting.go index c0dc2d4..404cfbb 100644 --- a/posting.go +++ b/posting.go @@ -258,6 +258,9 @@ func (p *PostingsList) read(postingsOffset uint64, d *Dictionary) error { return err } p.locOffset, read = binary.Uvarint(locOffsetData) + if p.locOffset > 0 && p.freqOffset > 0 { + p.locOffset += p.freqOffset + } n += uint64(read) postingsLenData, err := d.sb.data.Read(int(postingsOffset+n), int(postingsOffset+n+binary.MaxVarintLen64)) diff --git a/read.go b/read.go index 0887433..809555a 100644 --- a/read.go +++ b/read.go @@ -14,57 +14,55 @@ package ice -import "encoding/binary" +import ( + "encoding/binary" +) -func (s *Segment) getDocStoredMetaAndCompressed(docNum uint64) (meta, data []byte, err error) { +func (s *Segment) getDocStoredMetaAndUnCompressed(docNum uint64) (meta, data []byte, err error) { _, storedOffset, n, metaLen, dataLen, err := s.getDocStoredOffsets(docNum) if err != nil { return nil, nil, err } - meta, err = s.data.Read(int(storedOffset+n), int(storedOffset+n+metaLen)) - if err != nil { - return nil, nil, err - } - data, err = s.data.Read(int(storedOffset+n+metaLen), int(storedOffset+n+metaLen+dataLen)) - if err != nil { - return nil, nil, err - } - + meta = s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)] + data = s.storedFieldChunkUncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)] return meta, data, nil } -func (s *Segment) getDocStoredOffsets(docNum uint64) ( - indexOffset, storedOffset, n, metaLen, dataLen uint64, err error) { - indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * docNum) - - storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) +func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, n, metaLen, dataLen uint64, err error) { + indexOffset, storedOffset, err = s.getDocStoredOffsetsOnly(docNum) if err != nil { return 0, 0, 0, 0, 0, err } - storedOffset = binary.BigEndian.Uint64(storedOffsetData) - metaLenData, err := s.data.Read(int(storedOffset), int(storedOffset+binary.MaxVarintLen64)) + // document chunk coder + chunkI := docNum / uint64(defaultDocumentChunkSize) + chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)] + chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1] + compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd)) if err != nil { return 0, 0, 0, 0, 0, err } + s.storedFieldChunkUncompressed = s.storedFieldChunkUncompressed[:0] + s.storedFieldChunkUncompressed, err = ZSTDDecompress(s.storedFieldChunkUncompressed[:cap(s.storedFieldChunkUncompressed)], compressed) + if err != nil { + return 0, 0, 0, 0, 0, 
err + } + + metaLenData := s.storedFieldChunkUncompressed[int(storedOffset):int(storedOffset+binary.MaxVarintLen64)] var read int metaLen, read = binary.Uvarint(metaLenData) n += uint64(read) - dataLenData, err := s.data.Read(int(storedOffset+n), int(storedOffset+n+binary.MaxVarintLen64)) - if err != nil { - return 0, 0, 0, 0, 0, err - } + dataLenData := s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+binary.MaxVarintLen64)] dataLen, read = binary.Uvarint(dataLenData) n += uint64(read) return indexOffset, storedOffset, n, metaLen, dataLen, nil } -func (s *Segment) getDocStoredOffsetsOnly(docNum int) (indexOffset, storedOffset uint64, err error) { - indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * uint64(docNum)) - +func (s *Segment) getDocStoredOffsetsOnly(docNum uint64) (indexOffset, storedOffset uint64, err error) { + indexOffset = s.footer.storedIndexOffset + (fileAddrWidth * docNum) storedOffsetData, err := s.data.Read(int(indexOffset), int(indexOffset+fileAddrWidth)) if err != nil { return 0, 0, err diff --git a/segment.go b/segment.go index 5b77791..19b768c 100644 --- a/segment.go +++ b/segment.go @@ -25,10 +25,9 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/blevesearch/vellum" segment "github.com/blugelabs/bluge_segment_api" - "github.com/golang/snappy" ) -const Version uint32 = 1 +const Version uint32 = 2 const Type string = "ice" @@ -41,6 +40,9 @@ type Segment struct { fieldDocs map[uint16]uint64 // fieldID -> # docs with value in field fieldFreqs map[uint16]uint64 // fieldID -> # total tokens in field + storedFieldChunkOffsets []uint64 // stored field chunk offset + storedFieldChunkUncompressed []byte // for uncompress cache + dictLocs []uint64 fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field fieldDvNames []string // field names cached in fieldDvReaders @@ -192,18 +194,13 @@ func (s *Segment) visitDocument(vdc *visitDocumentCtx, num uint64, visitor segment.StoredFieldVisitor) error { // first make sure this is a valid number in this segment if num < s.footer.numDocs { - meta, compressed, err := s.getDocStoredMetaAndCompressed(num) + meta, uncompressed, err := s.getDocStoredMetaAndUnCompressed(num) if err != nil { return err } vdc.reader.Reset(meta) - uncompressed, err := snappy.Decode(vdc.buf[:cap(vdc.buf)], compressed) - if err != nil { - return err - } - var keepGoing = true for keepGoing { field, err := binary.ReadUvarint(&vdc.reader) diff --git a/sizes.go b/sizes.go index e851daa..b8ade57 100644 --- a/sizes.go +++ b/sizes.go @@ -25,6 +25,8 @@ func init() { sizeOfString = int(reflect.TypeOf(str).Size()) var u16 uint16 sizeOfUint16 = int(reflect.TypeOf(u16).Size()) + var u32 uint32 + sizeOfUint32 = int(reflect.TypeOf(u32).Size()) var u64 uint64 sizeOfUint64 = int(reflect.TypeOf(u64).Size()) reflectStaticSizeSegment = int(reflect.TypeOf(Segment{}).Size()) @@ -45,6 +47,7 @@ func init() { var sizeOfPtr int var sizeOfString int var sizeOfUint16 int +var sizeOfUint32 int var sizeOfUint64 int var reflectStaticSizeSegment int var reflectStaticSizeMetaData int diff --git a/write.go b/write.go index b42f214..d177380 100644 --- a/write.go +++ b/write.go @@ -90,7 +90,11 @@ func writePostings(postings *roaring.Bitmap, tfEncoder, locEncoder *chunkedIntCo return 0, err } - n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + if locOffset > 0 && tfOffset > 0 { + n = binary.PutUvarint(bufMaxVarintLen64, locOffset-tfOffset) + } else { + n = binary.PutUvarint(bufMaxVarintLen64, locOffset) + } _, err = 
w.Write(bufMaxVarintLen64[:n]) if err != nil { return 0, err @@ -127,6 +131,7 @@ func numUvarintBytes(x uint64) (n int) { // then writes out the roaring bitmap itself func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer, reuseBufVarint []byte) (int, error) { + r.RunOptimize() buf, err := r.ToBytes() if err != nil { return 0, err diff --git a/zstd.go b/zstd.go new file mode 100644 index 0000000..7e053da --- /dev/null +++ b/zstd.go @@ -0,0 +1,71 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ice + +import ( + "log" + "sync" + + "github.com/klauspost/compress/zstd" +) + +const ZSTDCompressionLevel = 3 // 1, 3, 9 + +var ( + decoder *zstd.Decoder + encoder *zstd.Encoder + + encOnce, decOnce sync.Once +) + +// ZSTDDecompress decompresses a block using ZSTD algorithm. +func ZSTDDecompress(dst, src []byte) ([]byte, error) { + decOnce.Do(func() { + var err error + decoder, err = zstd.NewReader(nil) + if err != nil { + log.Panicf("ZSTDDecompress: %+v", err) + } + }) + return decoder.DecodeAll(src, dst[:0]) +} + +// ZSTDCompress compresses a block using ZSTD algorithm. +func ZSTDCompress(dst, src []byte, compressionLevel int) ([]byte, error) { + encOnce.Do(func() { + var err error + level := zstd.EncoderLevelFromZstd(compressionLevel) + encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) + if err != nil { + log.Panicf("ZSTDCompress: %+v", err) + } + }) + return encoder.EncodeAll(src, dst[:0]), nil +} + +// ZSTDCompressBound returns the worst case size needed for a destination buffer. +// Klauspost ZSTD library does not provide any API for Compression Bound. This +// calculation is based on the DataDog ZSTD library. +// See https://pkg.go.dev/github.com/DataDog/zstd#CompressBound +func ZSTDCompressBound(srcSize int) int { + lowLimit := 128 << 10 // 128 kB + var margin int + if srcSize < lowLimit { + margin = (lowLimit - srcSize) >> 11 + } + return srcSize + (srcSize >> 8) + margin +}
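
Note (illustrative, not part of the patch): every call site above hands a reusable destination slice (buf[:cap(buf)]) to the zstd.go helpers so encode/decode buffers are recycled across chunks. A minimal sketch of that round-trip pattern, assuming only the ZSTDCompress, ZSTDDecompress, ZSTDCompressBound and ZSTDCompressionLevel helpers defined in zstd.go above; the roundTrip name is hypothetical:

func roundTrip(src []byte) ([]byte, error) {
	// pre-size the destination with the worst-case bound so EncodeAll
	// does not have to grow it while compressing
	compressed := make([]byte, 0, ZSTDCompressBound(len(src)))
	compressed, err := ZSTDCompress(compressed[:cap(compressed)], src, ZSTDCompressionLevel)
	if err != nil {
		return nil, err
	}
	// decompression follows the same pattern: pass the full capacity,
	// get back a slice sized to the uncompressed data
	var uncompressed []byte
	uncompressed, err = ZSTDDecompress(uncompressed[:cap(uncompressed)], compressed)
	if err != nil {
		return nil, err
	}
	return uncompressed, nil
}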
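
Note (illustrative, not part of the patch): contentcoder.go and docvalues.go now store each chunk's metadata pairs (DocNum, DocDvOffset) as deltas against the previous entry rather than absolute values, which keeps the uvarints short. A sketch of both sides of that scheme inside package ice, reusing the existing metaData type and writeUvarints helper; encodeMetas and decodeMetas are hypothetical names, and binary/io are the stdlib packages already imported in those files:

// writer side: emit gaps rather than absolute values, as flushContents does
func encodeMetas(w io.Writer, metas []metaData) error {
	var prevDocNum, prevOffset uint64
	for _, m := range metas {
		if err := writeUvarints(w, m.DocNum-prevDocNum, m.DocDvOffset-prevOffset); err != nil {
			return err
		}
		prevDocNum, prevOffset = m.DocNum, m.DocDvOffset
	}
	return nil
}

// reader side: re-accumulate the gaps, mirroring loadDvChunk
func decodeMetas(buf []byte, n int) []metaData {
	metas := make([]metaData, n)
	var prevDocNum, prevOffset uint64
	for i := 0; i < n; i++ {
		gapDocNum, read := binary.Uvarint(buf)
		buf = buf[read:]
		gapOffset, read := binary.Uvarint(buf)
		buf = buf[read:]
		prevDocNum += gapDocNum
		prevOffset += gapOffset
		metas[i] = metaData{DocNum: prevDocNum, DocDvOffset: prevOffset}
	}
	return metas
}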