Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunked encoding of large directories #16

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
go-version: [1.21.x,1.22.x]
go-version: [1.21.x,1.22.x,1.23.x]
os: [ubuntu-latest]
steps:
- name: Set up Go ${{ matrix.go-version }} on ${{ matrix.os }}
Expand All @@ -30,6 +30,6 @@ jobs:
env:
GO111MODULE: on
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.59.0
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.60.1
$(go env GOPATH)/bin/golangci-lint run --timeout=5m --config ./.golangci.yml
go test -race ./...
9 changes: 4 additions & 5 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
linters-settings:
golint:
min-confidence: 0

staticcheck:
checks:
- all
- '-SA6002' # disable the rule SA6002, slices are fine.
misspell:
locale: US

Expand All @@ -24,5 +25,3 @@ issues:
- comment on exported method
- should have comment or be unexported
- error strings should not be capitalized or end with punctuation or a newline
service:
golangci-lint-version: 1.59.0 # use the fixed version to not introduce new linters unexpectedly
236 changes: 217 additions & 19 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"

"github.com/klauspost/compress/zstd"
"github.com/tinylib/msgp/msgp"
Expand Down Expand Up @@ -92,6 +93,7 @@ type files []File
const currentVerPlain = 1
const currentVerCompressed = 2
const currentVerCompressedStructs = 3
const currentVerCompressedStructsChunked = 4

var zstdEnc, _ = zstd.NewWriter(nil, zstd.WithWindowSize(128<<10), zstd.WithEncoderConcurrency(2), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
var zstdDec, _ = zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxMemory(MaxIndexSize), zstd.WithDecoderMaxWindow(8<<20))
Expand Down Expand Up @@ -132,7 +134,44 @@ func (f Files) Serialize() ([]byte, error) {
res = append(res, currentVerCompressed)
return zstdEnc.EncodeAll(payload, res), nil
}
const chunkN = 25000
if len(f) < chunkN {
return f.encodeAoS(nil)
}
f.SortByName()
res := append([]byte{}, currentVerCompressedStructsChunked)
left := f
var tmp []byte
for len(left) > 0 {
todo := left
if len(left) < chunkN*2 && len(left) > chunkN {
todo = left[:len(left)/2]
} else if len(left) > chunkN {
todo = left[:chunkN]
}
ch := chunk{
Files: len(todo),
First: todo[0].Name,
Last: todo[len(todo)-1].Name,
}
// Sort each chunk for smaller offsets.
todo.Sort()
var err error
tmp, err = todo.encodeAoS(tmp[:0])
if err != nil {
return nil, err
}
ch.Payload = tmp
res, err = ch.MarshalMsg(res)
if err != nil {
return nil, err
}
left = left[len(todo):]
}
return res, nil
}

func (f Files) encodeAoS(res []byte) ([]byte, error) {
// Encode many files as struct of arrays...
x := filesAsStructs{
Names: make([][]byte, len(f)),
Expand Down Expand Up @@ -173,7 +212,6 @@ func (f Files) Serialize() ([]byte, error) {
if err != nil {
return nil, err
}
res := make([]byte, 0, len(payload))
res = append(res, currentVerCompressedStructs)
return zstdEnc.EncodeAll(payload, res), nil
}
Expand Down Expand Up @@ -209,6 +247,20 @@ func (f Files) Sort() {
}
}

// SortByName orders the files in place by ascending Name.
// Entries sharing the same Name are ordered by ascending Offset.
// Nothing is moved when the slice is already in that order.
func (f Files) SortByName() {
	byNameThenOffset := func(i, j int) bool {
		if f[i].Name == f[j].Name {
			// Same name: fall back to the position in the zip file.
			return f[i].Offset < f[j].Offset
		}
		return f[i].Name < f[j].Name
	}
	if sort.SliceIsSorted(f, byNameThenOffset) {
		return
	}
	sort.Slice(f, byNameThenOffset)
}

// Find the file with the provided name.
// Search is linear.
func (f Files) Find(name string) *File {
Expand Down Expand Up @@ -243,41 +295,52 @@ func (f Files) StripFlags(mask uint16) {
}
}

var decompBuffer sync.Pool

// unpackPayload unpacks and optionally decompresses the payload.
func unpackPayload(b []byte) ([]byte, bool, error) {
func unpackPayload(b []byte) (payload []byte, typ byte, newBuf bool, err error) {
if len(b) < 1 {
return nil, false, io.ErrUnexpectedEOF
return nil, 0, false, io.ErrUnexpectedEOF
}
if len(b) > MaxIndexSize {
return nil, false, ErrMaxSizeExceeded
return nil, 0, false, ErrMaxSizeExceeded
}
var out []byte
switch b[0] {
case currentVerPlain:
typ = b[0]
switch typ {
case currentVerPlain, currentVerCompressedStructsChunked:
out = b[1:]
case currentVerCompressed, currentVerCompressedStructs:
decoded, err := zstdDec.DecodeAll(b[1:], nil)
newBuf = true
dst, _ := decompBuffer.Get().([]byte)
// It is ok if we get a nil buffer, the decoder will allocate.

decoded, err := zstdDec.DecodeAll(b[1:], dst[:0])
if err != nil {
switch err {
case zstd.ErrDecoderSizeExceeded, zstd.ErrWindowSizeExceeded:
err = ErrMaxSizeExceeded
}
return nil, false, err
decompBuffer.Put(dst)
return nil, typ, false, err
}
out = decoded
default:
return nil, false, errors.New("unknown version")
return nil, typ, false, errors.New("unknown version")
}
return out, b[0] == currentVerCompressedStructs, nil
return out, typ, newBuf, nil
}

// DeserializeFiles will de-serialize the files.
func DeserializeFiles(b []byte) (Files, error) {
b, structs, err := unpackPayload(b)
b, typ, newBuf, err := unpackPayload(b)
if err != nil {
return nil, err
}
if !structs {
if newBuf {
defer decompBuffer.Put(b)
}
if typ == currentVerCompressed || typ == currentVerPlain {
var dst files
// Check number of files.
nFiles, _, err := msgp.ReadArrayHeaderBytes(b)
Expand All @@ -290,15 +353,53 @@ func DeserializeFiles(b []byte) (Files, error) {
_, err = dst.UnmarshalMsg(b)
return Files(dst), err
}
if typ == currentVerCompressedStructsChunked {
var dst Files
var c chunk
tmp, _ := decompBuffer.Get().([]byte)
defer func() {
if tmp != nil {
decompBuffer.Put(tmp)
}
}()
for len(b) > 0 {
b, err = c.UnmarshalMsgZC(b)
if err != nil {
return nil, err
}
if len(c.Payload) == 0 {
return nil, errors.New("missing payload")
}
switch c.Payload[0] {
case currentVerCompressedStructs:
tmp, err = zstdDec.DecodeAll(c.Payload[1:], tmp[:0])
if err != nil {
return nil, err
}
dst, err = deserializeAoS(tmp, dst)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unknown version")
}
}
return dst, nil
}
return deserializeAoS(b, nil)
}

func deserializeAoS(b []byte, files Files) (Files, error) {
var dst filesAsStructs
var err error
if _, err = dst.UnmarshalMsg(b); err != nil {
return nil, err
}

files := make(Files, len(dst.Names))
start := len(files)
files = append(files, make(Files, len(dst.Names))...)
add := files[start:]
var cur File
for i := range files {
for i := range add {
cur = File{
Name: string(dst.Names[i]),
CompressedSize64: uint64(dst.CSizes[i] + int64(cur.CompressedSize64)),
Expand All @@ -310,17 +411,18 @@ func DeserializeFiles(b []byte) (Files, error) {
if i == 0 {
cur.Offset = dst.Offsets[i]
} else {
cur.Offset = dst.Offsets[i] + files[i-1].Offset + int64(files[i-1].CompressedSize64) + fileHeaderLen + int64(len(files[i-1].Name)) + dataDescriptorLen
cur.Offset = dst.Offsets[i] + add[i-1].Offset + int64(add[i-1].CompressedSize64) + fileHeaderLen + int64(len(add[i-1].Name)) + dataDescriptorLen
}
if len(dst.Custom[i]) > 0 {
if cur.Custom, err = readCustomData(dst.Custom[i]); err != nil {
return nil, err
}
}
files[i] = cur
add[i] = cur

}
return files, err

}

func readCustomData(bts []byte) (dst map[string]string, err error) {
Expand Down Expand Up @@ -360,11 +462,14 @@ func readCustomData(bts []byte) (dst map[string]string, err error) {
// Expected speed scales O(n) for n files.
// Returns nil, io.EOF if not found.
func FindSerialized(b []byte, name string) (*File, error) {
buf, structs, err := unpackPayload(b)
buf, typ, newBuf, err := unpackPayload(b)
if err != nil {
return nil, err
}
if !structs {
if newBuf {
defer decompBuffer.Put(buf)
}
if typ == currentVerCompressed || typ == currentVerPlain {
n, buf, err := msgp.ReadArrayHeaderBytes(buf)
if err != nil {
return nil, err
Expand All @@ -385,6 +490,38 @@ func FindSerialized(b []byte, name string) (*File, error) {
return nil, io.EOF
}

if typ == currentVerCompressedStructsChunked {
var c chunk
for len(buf) > 0 {
buf, err = c.UnmarshalMsgZC(buf)
if err != nil {
return nil, err
}
if name < c.First {
return nil, io.EOF
}
if name >= c.First && name <= c.Last {
if len(c.Payload) == 0 {
return nil, errors.New("missing payload")
}
buf = c.Payload
if buf[0] != currentVerCompressedStructs {
return nil, errors.New("unknown chunk")
}
dst, _ := decompBuffer.Get().([]byte)
defer decompBuffer.Put(dst)
buf, err = zstdDec.DecodeAll(buf[1:], dst[:0])
if err != nil {
return nil, err
}
break
}
if len(buf) == 0 {
return nil, io.EOF
}
}
}

// Files are packed as an array of arrays...
idx := -1
var zb0001 uint32
Expand Down Expand Up @@ -812,3 +949,64 @@ func (z *filesAsStructs) UnmarshalMsg(bts []byte) (o []byte, err error) {
o = bts
return
}

// chunk is one independently compressed slice of a large, name-sorted file
// list. The chunked format (currentVerCompressedStructsChunked) is a version
// byte followed by a sequence of serialized chunks.
type chunk struct {
// Files is the number of entries encoded in Payload.
Files int
// First is the lowest file name in the chunk, taken from the name-sorted list.
First string
// Last is the highest file name in the chunk, taken from the name-sorted list.
// Lookups use First/Last to decide whether Payload needs to be decompressed.
Last string
// Payload is the encoded entries: a currentVerCompressedStructs version
// byte followed by the zstd-compressed struct-of-arrays data.
Payload []byte
}

// UnmarshalMsgZC decodes one msgpack-encoded chunk from the start of bts into z
// and returns the bytes that follow it.
//
// Payload is read with msgp.ReadBytesZC, so it is not copied and is expected to
// alias bts; it must not be used after bts is modified or recycled.
// Unknown map keys are skipped. On error the returned slice is nil and z may be
// partially populated.
func (z *chunk) UnmarshalMsgZC(bts []byte) (o []byte, err error) {
	var nFields uint32
	if nFields, bts, err = msgp.ReadMapHeaderBytes(bts); err != nil {
		return nil, msgp.WrapError(err)
	}
	for ; nFields > 0; nFields-- {
		var key []byte
		if key, bts, err = msgp.ReadMapKeyZC(bts); err != nil {
			return nil, msgp.WrapError(err)
		}
		switch msgp.UnsafeString(key) {
		case "Files":
			if z.Files, bts, err = msgp.ReadIntBytes(bts); err != nil {
				return nil, msgp.WrapError(err, "Files")
			}
		case "First":
			if z.First, bts, err = msgp.ReadStringBytes(bts); err != nil {
				return nil, msgp.WrapError(err, "First")
			}
		case "Last":
			if z.Last, bts, err = msgp.ReadStringBytes(bts); err != nil {
				return nil, msgp.WrapError(err, "Last")
			}
		case "Payload":
			// Zero copy: keep a view into the input instead of allocating.
			if z.Payload, bts, err = msgp.ReadBytesZC(bts); err != nil {
				return nil, msgp.WrapError(err, "Payload")
			}
		default:
			// Ignore fields this version does not know about.
			if bts, err = msgp.Skip(bts); err != nil {
				return nil, msgp.WrapError(err)
			}
		}
	}
	return bts, nil
}
Loading
Loading