Skip to content

Commit

Permalink
compress boltdb files to gzip while uploading from shipper (#2507)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Aug 24, 2020
1 parent 501f93f commit 97dfb29
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 18 deletions.
12 changes: 11 additions & 1 deletion pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/chunkenc"
)

// timeout for downloading the initial files of a table, so that a download cannot leak resources by being allowed to run indefinitely.
Expand Down Expand Up @@ -391,7 +393,15 @@ func (t *Table) getFileFromStorage(ctx context.Context, objectKey, destination s
return err
}

_, err = io.Copy(f, readCloser)
var objectReader io.Reader = readCloser
if strings.HasSuffix(objectKey, ".gz") {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)

objectReader = decompressedReader
}

_, err = io.Copy(f, objectReader)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestTableManager_QueryPages(t *testing.T) {
var queries []chunk.IndexQuery
for name, dbs := range tables {
queries = append(queries, chunk.IndexQuery{TableName: name})
testutil.SetupDBTablesAtPath(t, name, objectStoragePath, dbs)
testutil.SetupDBTablesAtPath(t, name, objectStoragePath, dbs, true)
}

tableManager, _, stopFunc := buildTestTableManager(t, tempDir)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestTable_Query(t *testing.T) {
},
}

testutil.SetupDBTablesAtPath(t, "test", objectStoragePath, testDBs)
testutil.SetupDBTablesAtPath(t, "test", objectStoragePath, testDBs, true)

table, _, stopFunc := buildTestTable(t, "test", tempDir)
defer func() {
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestTable_Sync(t *testing.T) {
}

// setup the table in storage with some records
testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, testDBs)
testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, testDBs, false)

// create table instance
table, boltdbClient, stopFunc := buildTestTable(t, "test", tempDir)
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestTable_doParallelDownload(t *testing.T) {
}
}

testutil.SetupDBTablesAtPath(t, fmt.Sprint(tc), objectStoragePath, testDBs)
testutil.SetupDBTablesAtPath(t, fmt.Sprint(tc), objectStoragePath, testDBs, true)

table, _, stopFunc := buildTestTable(t, fmt.Sprint(tc), tempDir)
defer func() {
Expand Down
29 changes: 28 additions & 1 deletion pkg/storage/stores/shipper/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package testutil

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
Expand All @@ -10,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
Expand Down Expand Up @@ -131,7 +135,7 @@ type DBRecords struct {
Start, NumRecords int
}

func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DBRecords) string {
func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DBRecords, compressRandomFiles bool) string {
boltIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: path})
require.NoError(t, err)

Expand All @@ -140,9 +144,32 @@ func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DB
tablePath := filepath.Join(path, tableName)
require.NoError(t, chunk_util.EnsureDirectory(tablePath))

var i int
for name, dbRecords := range dbs {
AddRecordsToDB(t, filepath.Join(tablePath, name), boltIndexClient, dbRecords.Start, dbRecords.NumRecords)
if compressRandomFiles && i%2 == 0 {
compressFile(t, filepath.Join(tablePath, name))
}
i++
}

return tablePath
}

// compressFile gzips the file at filepath into a sibling file named
// "<filepath>.gz" and then removes the uncompressed original, so that only the
// compressed copy is left on disk. Any failure aborts the calling test through
// require.NoError.
func compressFile(t *testing.T, filepath string) {
	// Attribute failures to the caller's line rather than to this helper.
	t.Helper()

	uncompressedFile, err := os.Open(filepath)
	require.NoError(t, err)
	// require.NoError stops the test via t.FailNow, which still runs deferred
	// calls, so this releases the handle on every failure path. On the success
	// path the file has already been closed explicitly below, and the error
	// from this second Close is deliberately ignored.
	defer func() { _ = uncompressedFile.Close() }()

	compressedFile, err := os.Create(fmt.Sprintf("%s.gz", filepath))
	require.NoError(t, err)
	// Same reasoning as above: cleanup for failure paths only.
	defer func() { _ = compressedFile.Close() }()

	compressedWriter := gzip.NewWriter(compressedFile)

	_, err = io.Copy(compressedWriter, uncompressedFile)
	require.NoError(t, err)

	// The gzip writer must be closed before the underlying file so that any
	// buffered data and the gzip trailer reach the file.
	require.NoError(t, compressedWriter.Close())
	require.NoError(t, uncompressedFile.Close())
	require.NoError(t, compressedFile.Close())

	// Drop the uncompressed original now that both handles are closed.
	require.NoError(t, os.Remove(filepath))
}
20 changes: 16 additions & 4 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/chunkenc"
)

const (
Expand Down Expand Up @@ -264,9 +266,19 @@ func (lt *Table) uploadDB(ctx context.Context, name string, db *bbolt.DB) error
}
}()

err = db.View(func(tx *bbolt.Tx) error {
_, err := tx.WriteTo(f)
return err
err = db.View(func(tx *bbolt.Tx) (err error) {
compressedWriter := chunkenc.Gzip.GetWriter(f)
defer chunkenc.Gzip.PutWriter(compressedWriter)

defer func() {
cerr := compressedWriter.Close()
if err == nil {
err = cerr
}
}()

_, err = tx.WriteTo(compressedWriter)
return
})
if err != nil {
return err
Expand Down Expand Up @@ -333,7 +345,7 @@ func (lt *Table) buildObjectKey(dbName string) string {
objectKey = fmt.Sprintf("%s/%s", lt.name, lt.uploader)
}

return objectKey
return fmt.Sprintf("%s.gz", objectKey)
}

func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package uploads

import (
"context"
"github.com/cortexproject/cortex/pkg/chunk/local"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -27,7 +27,7 @@ func buildTestTableManager(t *testing.T, testDir string) (*TableManager, *local.
IndexDir: indexPath,
UploadInterval: time.Hour,
}
tm, err := NewTableManager(cfg, boltDBIndexClient, storageClient, nil)
tm, err := NewTableManager(cfg, boltDBIndexClient, storageClient, nil)
require.NoError(t, err)

return tm, boltDBIndexClient, func() {
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestLoadTables(t *testing.T) {
Start: 20,
NumRecords: 10,
},
})
}, false)

// table2 with 2 dbs
testutil.SetupDBTablesAtPath(t, "table2", indexPath, map[string]testutil.DBRecords{
Expand All @@ -76,7 +76,7 @@ func TestLoadTables(t *testing.T) {
Start: 40,
NumRecords: 10,
},
})
}, false)

expectedTables := map[string]struct {
start, numRecords int
Expand Down
41 changes: 37 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package uploads
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
)

const (
Expand Down Expand Up @@ -74,7 +76,7 @@ func TestLoadTable(t *testing.T) {
Start: 10,
NumRecords: 10,
},
})
}, false)

// try loading the table.
table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient)
Expand Down Expand Up @@ -200,13 +202,44 @@ func TestTable_Upload(t *testing.T) {
}

// compareTableWithStorage checks, for every db held by table, that the object
// stored under storageDir at table.buildObjectKey(name) matches it. Each
// object is read through a gzip reader into a temporary file, which is then
// opened as a BoltDB file and compared with the in-memory db using
// testutil.CompareDBs. Any failure aborts the test through require.NoError.
func compareTableWithStorage(t *testing.T, table *Table, storageDir string) {
	// use a temp dir for decompressing the files before comparison.
	tempDir, err := ioutil.TempDir("", "compare-table")
	require.NoError(t, err)

	defer func() {
		require.NoError(t, os.RemoveAll(tempDir))
	}()

	for name, db := range table.dbs {
		objectKey := table.buildObjectKey(name)

		// open compressed file from storage
		compressedFile, err := os.Open(filepath.Join(storageDir, objectKey))
		require.NoError(t, err)

		// get a compressed reader
		compressedReader, err := gzip.NewReader(compressedFile)
		require.NoError(t, err)

		// create a temp file for writing decompressed file
		decompressedFilePath := filepath.Join(tempDir, filepath.Base(objectKey))
		decompressedFile, err := os.Create(decompressedFilePath)
		require.NoError(t, err)

		// do the decompression
		_, err = io.Copy(decompressedFile, compressedReader)
		require.NoError(t, err)

		// close the references; the gzip reader does not close the underlying
		// file, so both are closed explicitly.
		require.NoError(t, compressedReader.Close())
		require.NoError(t, compressedFile.Close())
		require.NoError(t, decompressedFile.Close())

		// the object in storage is compressed, so only the decompressed copy
		// can be opened as a BoltDB file.
		storageDB, err := local.OpenBoltdbFile(decompressedFilePath)
		require.NoError(t, err)

		testutil.CompareDBs(t, db, storageDB)
		require.NoError(t, storageDB.Close())
		require.NoError(t, os.Remove(decompressedFilePath))
	}
}

Expand Down

0 comments on commit 97dfb29

Please sign in to comment.