Skip to content

Commit

Permalink
Purge/resume index (#735)
Browse files Browse the repository at this point in the history
* added resume flag for reverse-lookup index build

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* implemented resume option, preloading all index files before rebuilding the index

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* parallelized index chunks download

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* split badger DB specifics

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* added KV implementation with pebble

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* added logging when retrieving index chunks

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* fix linting

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* fixed pebble merge operator

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* fixed options passing for resume mode

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* fixed typos

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>

* fixed gap in index numbering

Signed-off-by: Frederic BIDON <frederic@oneconcern.com>

* fixed retrieval of the latest index chunk index

Signed-off-by: Frederic BIDON <frederic@oneconcern.com>

Signed-off-by: Frédéric BIDON <fredbi@yahoo.com>
Signed-off-by: Frederic BIDON <frederic@oneconcern.com>
  • Loading branch information
fredbi authored Jan 4, 2023
1 parent 2912cbd commit a7d8684
Show file tree
Hide file tree
Showing 22 changed files with 1,228 additions and 382 deletions.
10 changes: 5 additions & 5 deletions cmd/datamon/cmd/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func testDownloadBundle(t *testing.T, files []uploadTree, bcnt int) {
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
exists, err = afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.True(t, exists, "filesystem entry at at destination path '"+dpc+"' after bundle upload")
require.True(t, exists, "filesystem entry at destination path '"+dpc+"' after bundle upload")
//
for _, file := range files {
expected := readTextFile(t, filePathStr(t, file))
Expand Down Expand Up @@ -826,7 +826,7 @@ func TestDownloadBundleNameFilter(t *testing.T) {
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
exists, err = afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.True(t, exists, "filesystem entry at at destination path '"+dpc+"' after bundle upload")
require.True(t, exists, "filesystem entry at destination path '"+dpc+"' after bundle upload")
//
fileInfos := make(map[string]os.FileInfo)
err = afero.Walk(destFS, dpc, func(path string, info os.FileInfo, err error) error {
Expand Down Expand Up @@ -904,7 +904,7 @@ func TestDiffBundle(t *testing.T) {
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
exists, err = afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.True(t, exists, "filesystem entry at at destination path '"+dpc+"' after bundle upload")
require.True(t, exists, "filesystem entry at destination path '"+dpc+"' after bundle upload")
//
srcFS := afero.NewBasePathFs(afero.NewOsFs(), dirPathStr(t, files[0]))
delFile := files[0]
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func TestUpdateBundle(t *testing.T) {
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
exists, err = afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.True(t, exists, "filesystem entry at at destination path '"+dpc+"' after bundle upload")
require.True(t, exists, "filesystem entry at destination path '"+dpc+"' after bundle upload")
//
srcFS := afero.NewBasePathFs(afero.NewOsFs(), dirPathStr(t, files[0]))
delFile := files[0]
Expand Down Expand Up @@ -1160,7 +1160,7 @@ func TestDownloadBundleByLabel(t *testing.T) {
}, "download bundle uploaded from "+dirPathStr(t, files[0]), false)
exists, err = afero.Exists(destFS, dpc)
require.NoError(t, err, "error out of afero upstream library. possibly programming error in test.")
require.True(t, exists, "filesystem entry at at destination path '"+dpc+"' after bundle upload")
require.True(t, exists, "filesystem entry at destination path '"+dpc+"' after bundle upload")
//
for _, file := range files {
expected := readTextFile(t, filePathStr(t, file))
Expand Down
9 changes: 9 additions & 0 deletions cmd/datamon/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type flagsT struct {
Force bool
LocalStorePath string
DryRun bool
Resume bool
}
}

Expand Down Expand Up @@ -603,6 +604,14 @@ func addPurgeLocalPathFlag(cmd *cobra.Command) string {
return c
}

// addPurgeResumeFlag declares the boolean "resume" flag on cmd, bound to
// datamonFlags.purge.Resume with a default of false, and returns the flag name.
//
// A nil cmd is accepted: nothing is registered and only the name is returned.
func addPurgeResumeFlag(cmd *cobra.Command) string {
	const flagName = "resume"

	// No command to decorate: the caller only gets the flag's name.
	if cmd == nil {
		return flagName
	}

	cmd.Flags().BoolVar(
		&datamonFlags.purge.Resume,
		flagName,
		false,
		"Resume index building: reload already uploaded index files (implies --force)",
	)

	return flagName
}

/** parameters struct from other formats */

// apply config file + env vars to structure used to parse cli flags
Expand Down
6 changes: 6 additions & 0 deletions cmd/datamon/cmd/purge_reverse_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ You MUST make sure that no concurrent build-reverse-lookup or delete job is stil
}
logger, err := optionInputs.getLogger()

if datamonFlags.purge.Resume {
datamonFlags.purge.Force = true
}

logger.Info("building reverse-lookup index",
zap.String("context", datamonFlags.context.Descriptor.Name),
zap.Bool("force?", datamonFlags.purge.Force),
zap.Bool("resume?", datamonFlags.purge.Resume),
zap.String("context BLOB bucket", datamonFlags.context.Descriptor.Blob),
zap.String("context metadata bucket", datamonFlags.context.Descriptor.Metadata),
)
Expand All @@ -62,6 +67,7 @@ You MUST make sure that no concurrent build-reverse-lookup or delete job is stil
core.WithPurgeLocalStore(datamonFlags.purge.LocalStorePath),
core.WithPurgeExtraContexts(extraContexts),
core.WithPurgeParallel(datamonFlags.bundle.ConcurrencyFactor),
core.WithPurgeResumeIndex(datamonFlags.purge.Resume),
}

err = core.PurgeLock(remoteStores, opts...)
Expand Down
2 changes: 2 additions & 0 deletions cmd/datamon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ func init() {
addConcurrencyFactorFlag(deleteUnusedCmd, 100)
addConcurrencyFactorFlag(reverseLookupCmd, 100)

addPurgeResumeFlag(reverseLookupCmd)

purgeCmd.AddCommand(reverseLookupCmd)
purgeCmd.AddCommand(deleteUnusedCmd)
purgeCmd.AddCommand(deleteLookupCmd)
Expand Down
13 changes: 10 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,37 @@ replace github.com/spf13/pflag => github.com/fredbi/pflag v1.0.6-0.2020110615442
require (
cloud.google.com/go v0.107.0 // indirect
cloud.google.com/go/storage v1.28.1
github.com/DataDog/zstd v1.5.2 // indirect
github.com/PuerkitoBio/goquery v1.8.0
github.com/aws/aws-sdk-go v1.44.171
github.com/blang/semver v3.5.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.0
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.0 // indirect
github.com/cockroachdb/pebble v0.0.0-20221229212011-811a8c0e741b
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/docker/go-units v0.5.0
github.com/getsentry/sentry-go v0.16.0 // indirect
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-openapi/runtime v0.25.0
github.com/gobuffalo/packd v1.0.2
github.com/gobuffalo/packr/v2 v2.8.3
github.com/google/go-querystring v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1
github.com/hashicorp/golang-lru v0.6.0
github.com/influxdata/influxdb v1.11.0
github.com/jacobsa/daemonize v0.0.0-20160101105449-e460293e890f
github.com/jacobsa/fuse v0.0.0-20220531202254-21122235c77a
github.com/karrick/godirwalk v1.17.0
github.com/klauspost/compress v1.15.13 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/nightlyone/lockfile v1.0.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rhysd/go-github-selfupdate v1.2.3
github.com/segmentio/ksuid v1.0.4
github.com/spf13/afero v1.9.3
Expand All @@ -39,8 +47,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/goleak v1.2.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/oauth2 v0.2.0 // indirect
golang.org/x/exp v0.0.0-20221230185412-738e83a70c30 // indirect
golang.org/x/sync v0.1.0
golang.org/x/sys v0.3.0
google.golang.org/api v0.105.0
Expand Down
Loading

0 comments on commit a7d8684

Please sign in to comment.