From ead97c76159bf081dbb7bcbcba99e9f026aa1c63 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 14 Aug 2023 12:32:39 +0200 Subject: [PATCH 1/8] feat: keep last log in memory --- go.mod | 2 + go.sum | 5 ++ .../transaction_controller_test.go | 10 ++-- pkg/ledger/ledger.go | 6 +- pkg/storage/sqlstorage/commit.go | 1 + pkg/storage/sqlstorage/driver.go | 34 +++++++----- pkg/storage/sqlstorage/logs.go | 55 ++++++++++--------- pkg/storage/sqlstorage/store_ledger.go | 4 +- 8 files changed, 68 insertions(+), 49 deletions(-) diff --git a/go.mod b/go.mod index d2215350d..cc808034b 100755 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/containerd/continuity v0.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/docker/cli v20.10.17+incompatible // indirect github.com/docker/docker v20.10.17+incompatible // indirect @@ -120,6 +121,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/segmentio/backo-go v1.0.1 // indirect + github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index 102330112..cac5e077c 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docker/cli v20.10.17+incompatible h1:eO2KS7ZFeov5UJeaDmIs1NFEDRf32PaqRpvoEkKBy5M= @@ -464,6 +466,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= github.com/segmentio/backo-go v1.0.1 h1:68RQccglxZeyURy93ASB/2kc9QudzgIDexJ927N++y4= github.com/segmentio/backo-go v1.0.1/go.mod h1:9/Rh6yILuLysoQnZ2oNooD2g7aBnvM7r/fNVxRNWfBc= +github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= +github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -975,6 +979,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/api/controllers/transaction_controller_test.go b/pkg/api/controllers/transaction_controller_test.go index 5a29db639..10b25c996 100644 --- a/pkg/api/controllers/transaction_controller_test.go +++ b/pkg/api/controllers/transaction_controller_test.go @@ -1199,12 +1199,12 @@ func TestTransactions(t *testing.T) { require.Len(t, cursor.Data, 1) require.Equal(t, cursor.Data[0].ID, tx3.ID) - rsp = internal.CountTransactions(api, url.Values{ - "metadata[priority]": []string{"high"}, + rsp = internal.CountTransactions(api, url.Values{ + "metadata[priority]": []string{"high"}, + }) + require.Equal(t, http.StatusOK, rsp.Result().StatusCode) + require.Equal(t, "1", rsp.Header().Get("Count")) }) - require.Equal(t, http.StatusOK, rsp.Result().StatusCode) - require.Equal(t, "1", rsp.Header().Get("Count")) - }) t.Run("after", func(t *testing.T) { rsp := internal.GetTransactions(api, url.Values{ diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index 832248b30..6b6d54fa9 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -54,9 +54,9 @@ func NewLedger(store Store, monitor Monitor, cache *ristretto.Cache, options ... } func (l *Ledger) Close(ctx context.Context) error { - if err := l.store.Close(ctx); err != nil { - return errors.Wrap(err, "closing store") - } + //if err := l.store.Close(ctx); err != nil { + // return errors.Wrap(err, "closing store") + //} return nil } diff --git a/pkg/storage/sqlstorage/commit.go b/pkg/storage/sqlstorage/commit.go index 35cbead29..2643a110a 100644 --- a/pkg/storage/sqlstorage/commit.go +++ b/pkg/storage/sqlstorage/commit.go @@ -30,6 +30,7 @@ func (s *Store) commit(ctx context.Context, txs ...core.ExpandedTransaction) ([] if err != nil { return nil, err } + for _, tx := range txs { newLog := core.NewTransactionLog(lastLog, tx.Transaction) lastLog = &newLog diff --git a/pkg/storage/sqlstorage/driver.go b/pkg/storage/sqlstorage/driver.go index 6369157a9..86c359b8d 100644 --- a/pkg/storage/sqlstorage/driver.go +++ b/pkg/storage/sqlstorage/driver.go @@ -102,7 +102,7 @@ type Driver struct { name string db DB systemSchema Schema - registeredLedgers map[string]struct{} + registeredLedgers map[string]*Store lock sync.Mutex } @@ -125,9 +125,10 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) ( var ( created bool schema Schema - err error + ret *Store + exists bool ) - if _, exists := d.registeredLedgers[name]; !exists { + if ret, exists = d.registeredLedgers[name]; !exists { systemStore := &SystemStore{ systemSchema: d.systemSchema, } @@ -152,19 +153,22 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) ( if err = schema.Initialize(ctx); err != nil { return nil, false, err } - d.registeredLedgers[name] = struct{}{} - } else { - schema, err = d.db.Schema(ctx, name) - if err != nil { - return nil, false, errors.Wrap(err, "opening schema") - } + + ret = NewStore(schema, defaultExecutorProvider(schema), func(ctx context.Context) error { + d.lock.Lock() + defer d.lock.Unlock() + + delete(d.registeredLedgers, name) + return schema.Close(context.Background()) + }, func(ctx context.Context) error { + return d.GetSystemStore().DeleteLedger(ctx, name) + }) + + d.registeredLedgers[name] = ret + created = true } - return NewStore(schema, defaultExecutorProvider(schema), func(ctx context.Context) error { - return schema.Close(context.Background()) - }, func(ctx context.Context) error { - return d.GetSystemStore().DeleteLedger(ctx, name) - }), created, nil + return ret, created, nil } func (d *Driver) Name() string { @@ -223,7 +227,7 @@ func NewDriver(name string, db DB) *Driver { return &Driver{ db: db, name: name, - registeredLedgers: map[string]struct{}{}, + registeredLedgers: map[string]*Store{}, } } diff --git a/pkg/storage/sqlstorage/logs.go b/pkg/storage/sqlstorage/logs.go index db582f1e0..e71ac43d8 100644 --- a/pkg/storage/sqlstorage/logs.go +++ b/pkg/storage/sqlstorage/logs.go @@ -71,40 +71,45 @@ func (s *Store) appendLog(ctx context.Context, log ...core.Log) error { if err != nil { return s.error(err) } + + s.LastLog = &log[len(log)-1] return nil } func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) { - sb := sqlbuilder.NewSelectBuilder() - sb.From(s.schema.Table("log")) - sb.Select("id", "type", "hash", "date", "data") - sb.OrderBy("id desc") - sb.Limit(1) + if s.LastLog == nil { + sb := sqlbuilder.NewSelectBuilder() + sb.From(s.schema.Table("log")) + sb.Select("id", "type", "hash", "date", "data") + sb.OrderBy("id desc") + sb.Limit(1) + + executor, err := s.executorProvider(ctx) + if err != nil { + return nil, err + } - executor, err := s.executorProvider(ctx) - if err != nil { - return nil, err - } + l := core.Log{} + data := sql.NullString{} + sqlq, _ := sb.BuildWithFlavor(s.schema.Flavor()) + row := executor.QueryRowContext(ctx, sqlq) + if err := row.Scan(&l.ID, &l.Type, &l.Hash, &l.Date, &data); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + l.Date = l.Date.UTC() - l := core.Log{} - data := sql.NullString{} - sqlq, _ := sb.BuildWithFlavor(s.schema.Flavor()) - row := executor.QueryRowContext(ctx, sqlq) - if err := row.Scan(&l.ID, &l.Type, &l.Hash, &l.Date, &data); err != nil { - if err == sql.ErrNoRows { - return nil, nil + l.Data, err = core.HydrateLog(l.Type, data.String) + if err != nil { + return nil, err } - return nil, err - } - l.Date = l.Date.UTC() + l.Date = l.Date.UTC() - l.Data, err = core.HydrateLog(l.Type, data.String) - if err != nil { - return nil, err + s.LastLog = &l } - l.Date = l.Date.UTC() - - return &l, nil + return s.LastLog, nil } func (s *Store) GetLogs(ctx context.Context, q *ledger.LogsQuery) (api.Cursor[core.Log], error) { diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index d5b16571d..a7945006e 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -5,6 +5,7 @@ import ( "github.com/formancehq/stack/libs/go-libs/logging" _ "github.com/jackc/pgx/v5/stdlib" + "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/ledger" "github.com/pkg/errors" ) @@ -17,7 +18,8 @@ type Store struct { executorProvider func(ctx context.Context) (executor, error) schema Schema onClose func(ctx context.Context) error - onDelete func(ctx context.Context) error + onDelete func(ctx context.Context) error + LastLog *core.Log } func (s *Store) error(err error) error { From cde7024a5c22905d90e4ca3f87c920777601b78d Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 14 Aug 2023 12:49:45 +0200 Subject: [PATCH 2/8] feat: keep last tx in memory --- pkg/storage/sqlstorage/logs.go | 8 +-- pkg/storage/sqlstorage/store_ledger.go | 3 +- pkg/storage/sqlstorage/transactions.go | 94 +++++++++++++++----------- 3 files changed, 60 insertions(+), 45 deletions(-) diff --git a/pkg/storage/sqlstorage/logs.go b/pkg/storage/sqlstorage/logs.go index e71ac43d8..ebcecb809 100644 --- a/pkg/storage/sqlstorage/logs.go +++ b/pkg/storage/sqlstorage/logs.go @@ -72,12 +72,12 @@ func (s *Store) appendLog(ctx context.Context, log ...core.Log) error { return s.error(err) } - s.LastLog = &log[len(log)-1] + s.lastLog = &log[len(log)-1] return nil } func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) { - if s.LastLog == nil { + if s.lastLog == nil { sb := sqlbuilder.NewSelectBuilder() sb.From(s.schema.Table("log")) sb.Select("id", "type", "hash", "date", "data") @@ -107,9 +107,9 @@ func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) { } l.Date = l.Date.UTC() - s.LastLog = &l + s.lastLog = &l } - return s.LastLog, nil + return s.lastLog, nil } func (s *Store) GetLogs(ctx context.Context, q *ledger.LogsQuery) (api.Cursor[core.Log], error) { diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index a7945006e..8b559e296 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -19,7 +19,8 @@ type Store struct { schema Schema onClose func(ctx context.Context) error onDelete func(ctx context.Context) error - LastLog *core.Log + lastLog *core.Log + lastTx *core.ExpandedTransaction } func (s *Store) error(err error) error { diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go index 4dd9e3c1d..b9865dd0b 100644 --- a/pkg/storage/sqlstorage/transactions.go +++ b/pkg/storage/sqlstorage/transactions.go @@ -278,53 +278,57 @@ func (s *Store) GetTransaction(ctx context.Context, txId uint64) (*core.Expanded } func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransaction, error) { - sb := sqlbuilder.NewSelectBuilder() - sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") - sb.From(s.schema.Table("transactions")) - sb.OrderBy("id desc") - sb.Limit(1) - - executor, err := s.executorProvider(ctx) - if err != nil { - return nil, err - } + if s.lastTx == nil { + sb := sqlbuilder.NewSelectBuilder() + sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") + sb.From(s.schema.Table("transactions")) + sb.OrderBy("id desc") + sb.Limit(1) + + executor, err := s.executorProvider(ctx) + if err != nil { + return nil, err + } - sqlq, args := sb.BuildWithFlavor(s.schema.Flavor()) - row := executor.QueryRowContext(ctx, sqlq, args...) - if row.Err() != nil { - return nil, s.error(row.Err()) - } + sqlq, args := sb.BuildWithFlavor(s.schema.Flavor()) + row := executor.QueryRowContext(ctx, sqlq, args...) + if row.Err() != nil { + return nil, s.error(row.Err()) + } - tx := core.ExpandedTransaction{ - Transaction: core.Transaction{ - TransactionData: core.TransactionData{ - Postings: core.Postings{}, - Metadata: core.Metadata{}, + tx := core.ExpandedTransaction{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: core.Postings{}, + Metadata: core.Metadata{}, + }, }, - }, - PreCommitVolumes: core.AccountsAssetsVolumes{}, - PostCommitVolumes: core.AccountsAssetsVolumes{}, - } + PreCommitVolumes: core.AccountsAssetsVolumes{}, + PostCommitVolumes: core.AccountsAssetsVolumes{}, + } - var ref sql.NullString - if err := row.Scan( - &tx.ID, - &tx.Timestamp, - &ref, - &tx.Metadata, - &tx.Postings, - &tx.PreCommitVolumes, - &tx.PostCommitVolumes, - ); err != nil { - if err == sql.ErrNoRows { - return nil, nil + var ref sql.NullString + if err := row.Scan( + &tx.ID, + &tx.Timestamp, + &ref, + &tx.Metadata, + &tx.Postings, + &tx.PreCommitVolumes, + &tx.PostCommitVolumes, + ); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err } - return nil, err + tx.Timestamp = tx.Timestamp.UTC() + tx.Reference = ref.String + + s.lastTx = &tx } - tx.Timestamp = tx.Timestamp.UTC() - tx.Reference = ref.String - return &tx, nil + return s.lastTx, nil } func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTransaction) error { @@ -487,6 +491,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran return s.error(err) } + s.lastTx = &txs[len(txs)-1] + return nil } @@ -525,6 +531,14 @@ func (s *Store) UpdateTransactionMetadata(ctx context.Context, id uint64, metada return errors.Wrap(err, "reading last log") } + if s.lastTx.ID == id { + if s.lastTx.Metadata == nil { + s.lastTx.Metadata = metadata + } else { + s.lastTx.Metadata = s.lastTx.Metadata.Merge(metadata) + } + } + return s.appendLog(ctx, core.NewSetMetadataLog(lastLog, at, core.SetMetadata{ TargetType: core.MetaTargetTypeTransaction, TargetID: id, From b2f3ee73792c58f75664c6dc069e59357ee138af Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 14 Aug 2023 15:13:49 +0200 Subject: [PATCH 3/8] feat: add cuckoo filter --- Dockerfile | 4 +- go.mod | 3 + go.sum | 5 ++ pkg/storage/sqlstorage/accounts.go | 4 ++ pkg/storage/sqlstorage/aggregations.go | 91 +++++++++++++------------- pkg/storage/sqlstorage/store_ledger.go | 9 ++- 6 files changed, 68 insertions(+), 48 deletions(-) diff --git a/Dockerfile b/Dockerfile index c56bffc76..c20edde39 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,7 +12,9 @@ ARG SEGMENT_WRITE_KEY WORKDIR /go/src/github.com/numary/ledger # get deps first so it's cached COPY . . -RUN CGO_ENABLED=1 GOOS=linux GOARCH=$TARGETARCH \ +RUN --mount=type=cache,id=gobuild,target=/root/.cache/go-build \ + --mount=type=cache,id=gomodcache,target=/go/pkg/mod \ + CGO_ENABLED=1 GOOS=linux GOARCH=$TARGETARCH \ CC=$TARGETARCH-linux-gnu-gcc \ go build -o numary -tags json1,netgo \ -ldflags="-X github.com/numary/ledger/cmd.Version=${VERSION} \ diff --git a/go.mod b/go.mod index cc808034b..f6dd584c2 100755 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 // indirect github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0 // indirect github.com/ajg/form v1.5.1 // indirect + github.com/bits-and-blooms/bloom v2.0.3+incompatible // indirect github.com/bytedance/sonic v1.8.1 // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -122,6 +123,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/segmentio/backo-go v1.0.1 // indirect github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb // indirect + github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -131,6 +133,7 @@ require ( github.com/ugorji/go/codec v1.2.10 // indirect github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.21 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.21 // indirect + github.com/willf/bitset v1.1.11 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect diff --git a/go.sum b/go.sum index cac5e077c..843740ebf 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,8 @@ github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/bits-and-blooms/bloom v2.0.3+incompatible h1:3ONZFjJoMyfHDil5iCcNkcPJ//PNNo+55RHvPrfUGnY= +github.com/bits-and-blooms/bloom v2.0.3+incompatible/go.mod h1:nEmPH2pqJb3sCXfd7cyDSKC4iPfCAt312JHgNrtnnDE= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= @@ -474,6 +476,7 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk= github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= @@ -520,6 +523,8 @@ github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.21/go.mod h1:2MNqrUmDrt5 github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= +github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= +github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= diff --git a/pkg/storage/sqlstorage/accounts.go b/pkg/storage/sqlstorage/accounts.go index 212ecba84..68713e4a7 100644 --- a/pkg/storage/sqlstorage/accounts.go +++ b/pkg/storage/sqlstorage/accounts.go @@ -265,6 +265,8 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error { return err } + s.bloom.Add([]byte(account)) + _, err = executor.ExecContext(ctx, sqlq, args...) return s.error(err) } @@ -311,6 +313,8 @@ func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metad return errors.Wrap(err, "reading last log") } + s.bloom.Add([]byte(address)) + return s.appendLog(ctx, core.NewSetMetadataLog(lastLog, at, core.SetMetadata{ TargetType: core.MetaTargetTypeAccount, TargetID: address, diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 424c3a27f..3493b8b71 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -11,25 +11,6 @@ import ( ) func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) { - sb := sqlbuilder.NewSelectBuilder() - sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output") - sb.From(s.schema.Table("accounts")) - sb.JoinWithOption(sqlbuilder.LeftOuterJoin, - s.schema.Table("volumes"), - "accounts.address = volumes.account") - sb.Where(sb.E("accounts.address", account)) - - executor, err := s.executorProvider(ctx) - if err != nil { - return nil, err - } - - q, args := sb.BuildWithFlavor(s.schema.Flavor()) - rows, err := executor.QueryContext(ctx, q, args...) - if err != nil { - return nil, s.error(err) - } - defer rows.Close() acc := core.Account{ Address: core.AccountAddress(account), @@ -37,43 +18,65 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*cor } assetsVolumes := core.AssetsVolumes{} - for rows.Next() { - var asset, inputStr, outputStr sql.NullString - if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil { + if s.bloom.Test([]byte(account)) { + sb := sqlbuilder.NewSelectBuilder() + sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output") + sb.From(s.schema.Table("accounts")) + sb.JoinWithOption(sqlbuilder.LeftOuterJoin, + s.schema.Table("volumes"), + "accounts.address = volumes.account") + sb.Where(sb.E("accounts.address", account)) + + executor, err := s.executorProvider(ctx) + if err != nil { + return nil, err + } + + q, args := sb.BuildWithFlavor(s.schema.Flavor()) + rows, err := executor.QueryContext(ctx, q, args...) + if err != nil { return nil, s.error(err) } + defer rows.Close() - if asset.Valid { - assetsVolumes[asset.String] = core.Volumes{ - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), + for rows.Next() { + var asset, inputStr, outputStr sql.NullString + if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil { + return nil, s.error(err) } - if inputStr.Valid { - input, err := core.ParseMonetaryInt(inputStr.String) - if err != nil { - return nil, s.error(err) - } + if asset.Valid { assetsVolumes[asset.String] = core.Volumes{ - Input: input, - Output: assetsVolumes[asset.String].Output, + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), } - } - if outputStr.Valid { - output, err := core.ParseMonetaryInt(outputStr.String) - if err != nil { - return nil, s.error(err) + if inputStr.Valid { + input, err := core.ParseMonetaryInt(inputStr.String) + if err != nil { + return nil, s.error(err) + } + assetsVolumes[asset.String] = core.Volumes{ + Input: input, + Output: assetsVolumes[asset.String].Output, + } } - assetsVolumes[asset.String] = core.Volumes{ - Input: assetsVolumes[asset.String].Input, - Output: output, + + if outputStr.Valid { + output, err := core.ParseMonetaryInt(outputStr.String) + if err != nil { + return nil, s.error(err) + } + assetsVolumes[asset.String] = core.Volumes{ + Input: assetsVolumes[asset.String].Input, + Output: output, + } } } } - } - if err := rows.Err(); err != nil { - return nil, s.error(err) + if err := rows.Err(); err != nil { + return nil, s.error(err) + } } res := &core.AccountWithVolumes{ diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index 8b559e296..998fc3347 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -3,6 +3,7 @@ package sqlstorage import ( "context" + "github.com/bits-and-blooms/bloom" "github.com/formancehq/stack/libs/go-libs/logging" _ "github.com/jackc/pgx/v5/stdlib" "github.com/numary/ledger/pkg/core" @@ -18,9 +19,10 @@ type Store struct { executorProvider func(ctx context.Context) (executor, error) schema Schema onClose func(ctx context.Context) error - onDelete func(ctx context.Context) error - lastLog *core.Log - lastTx *core.ExpandedTransaction + onDelete func(ctx context.Context) error + lastLog *core.Log + lastTx *core.ExpandedTransaction + bloom *bloom.BloomFilter } func (s *Store) error(err error) error { @@ -67,6 +69,7 @@ func NewStore(schema Schema, executorProvider func(ctx context.Context) (executo schema: schema, onClose: onClose, onDelete: onDelete, + bloom: bloom.NewWithEstimates(1000000, 0.01), // TODO: Configure } } From 9f1684d7cf3157970aa0e2cfb55495bc1331ccb2 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 14 Aug 2023 16:24:15 +0200 Subject: [PATCH 4/8] feat: cache volumes --- go.mod | 1 + go.sum | 2 ++ pkg/core/account.go | 13 +++++++++++++ pkg/ledger/ledger_test.go | 2 +- pkg/storage/sqlstorage/accounts.go | 7 +++++++ pkg/storage/sqlstorage/aggregations.go | 15 +++++++++++---- pkg/storage/sqlstorage/store_ledger.go | 5 +++++ pkg/storage/sqlstorage/volumes.go | 18 ++++++++++++++---- 8 files changed, 54 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index f6dd584c2..6c4c4fcdc 100755 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.3 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 843740ebf..99e98d414 100644 --- a/go.sum +++ b/go.sum @@ -429,6 +429,8 @@ github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.m github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= github.com/ory/dockertest/v3 v3.9.1 h1:v4dkG+dlu76goxMiTT2j8zV7s4oPPEppKT8K8p2f1kY= github.com/ory/dockertest/v3 v3.9.1/go.mod h1:42Ir9hmvaAPm0Mgibk6mBPi7SFvTXxEcnztDYOJ//uM= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= diff --git a/pkg/core/account.go b/pkg/core/account.go index 4d026744d..ed90c19bb 100644 --- a/pkg/core/account.go +++ b/pkg/core/account.go @@ -1,6 +1,7 @@ package core import ( + "encoding/json" "fmt" "regexp" ) @@ -20,6 +21,18 @@ type AccountWithVolumes struct { Balances AssetsBalances `json:"balances" example:"COIN:100"` } +func (v AccountWithVolumes) Copy() *AccountWithVolumes { + data, err := json.Marshal(v) + if err != nil { + panic(err) + } + ret := &AccountWithVolumes{} + if err := json.Unmarshal(data, ret); err != nil { + panic(err) + } + return ret +} + const accountPattern = "^[a-zA-Z_]+[a-zA-Z0-9_:]*$" var accountRegexp = regexp.MustCompile(accountPattern) diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index 285a4c3f4..bb78ec83c 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -640,7 +640,7 @@ func TestRevertTransaction(t *testing.T) { newBal := world.Balances["COIN"] expectedBal := originalBal.Add(revertAmt) - require.Equalf(t, expectedBal, newBal, + require.Equalf(t, expectedBal.Uint64(), newBal.Uint64(), "COIN world balances expected %d, got %d", expectedBal, newBal) }) } diff --git a/pkg/storage/sqlstorage/accounts.go b/pkg/storage/sqlstorage/accounts.go index 68713e4a7..09a5837ac 100644 --- a/pkg/storage/sqlstorage/accounts.go +++ b/pkg/storage/sqlstorage/accounts.go @@ -272,6 +272,13 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error { } func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata, at time.Time) error { + + entry, ok := s.cache.Get(address) + if ok { + account := entry.(*core.AccountWithVolumes) + account.Metadata = account.Metadata.Merge(metadata) + } + ib := sqlbuilder.NewInsertBuilder() metadataData, err := json.Marshal(metadata) diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 3493b8b71..ff2af30eb 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -8,24 +8,29 @@ import ( "github.com/huandu/go-sqlbuilder" "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/ledger" + "github.com/patrickmn/go-cache" ) -func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) { +func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) { + account, ok := s.cache.Get(address) + if ok { + return account.(*core.AccountWithVolumes).Copy(), nil + } acc := core.Account{ - Address: core.AccountAddress(account), + Address: core.AccountAddress(address), Metadata: core.Metadata{}, } assetsVolumes := core.AssetsVolumes{} - if s.bloom.Test([]byte(account)) { + if s.bloom.Test([]byte(address)) { sb := sqlbuilder.NewSelectBuilder() sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output") sb.From(s.schema.Table("accounts")) sb.JoinWithOption(sqlbuilder.LeftOuterJoin, s.schema.Table("volumes"), "accounts.address = volumes.account") - sb.Where(sb.E("accounts.address", account)) + sb.Where(sb.E("accounts.address", address)) executor, err := s.executorProvider(ctx) if err != nil { @@ -85,6 +90,8 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*cor } res.Balances = res.Volumes.Balances() + s.cache.Set(address, res.Copy(), cache.NoExpiration) + return res, nil } diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index 998fc3347..4df4127d6 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -2,12 +2,14 @@ package sqlstorage import ( "context" + "time" "github.com/bits-and-blooms/bloom" "github.com/formancehq/stack/libs/go-libs/logging" _ "github.com/jackc/pgx/v5/stdlib" "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/ledger" + "github.com/patrickmn/go-cache" "github.com/pkg/errors" ) @@ -23,6 +25,7 @@ type Store struct { lastLog *core.Log lastTx *core.ExpandedTransaction bloom *bloom.BloomFilter + cache *cache.Cache } func (s *Store) error(err error) error { @@ -64,12 +67,14 @@ func (s *Store) Close(ctx context.Context) error { func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error), onClose, onDelete func(ctx context.Context) error) *Store { + return &Store{ executorProvider: executorProvider, schema: schema, onClose: onClose, onDelete: onDelete, bloom: bloom.NewWithEstimates(1000000, 0.01), // TODO: Configure + cache: cache.New(5*time.Minute, 10*time.Minute), } } diff --git a/pkg/storage/sqlstorage/volumes.go b/pkg/storage/sqlstorage/volumes.go index b93750e7e..30cb83266 100644 --- a/pkg/storage/sqlstorage/volumes.go +++ b/pkg/storage/sqlstorage/volumes.go @@ -10,8 +10,18 @@ import ( ) func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVolumes) error { - for account, accountVolumes := range volumes { - accountBy, err := json.Marshal(strings.Split(account, ":")) + for address, accountVolumes := range volumes { + + entry, ok := s.cache.Get(address) + if ok { + account := entry.(*core.AccountWithVolumes) + for asset, volumes := range accountVolumes { + account.Volumes[asset] = volumes + account.Balances[asset] = volumes.Balance() + } + } + + accountBy, err := json.Marshal(strings.Split(address, ":")) if err != nil { panic(err) } @@ -25,11 +35,11 @@ func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVo switch s.schema.Flavor() { case sqlbuilder.PostgreSQL: ib = ib.Cols("account", "asset", "input", "output", "account_json"). - Values(account, asset, volumes.Input.String(), volumes.Output.String(), accountBy). + Values(address, asset, volumes.Input.String(), volumes.Output.String(), accountBy). SQL("ON CONFLICT (account, asset) DO UPDATE SET input = " + inputArg + ", output = " + outputArg) case sqlbuilder.SQLite: ib = ib.Cols("account", "asset", "input", "output"). - Values(account, asset, volumes.Input.String(), volumes.Output.String()). + Values(address, asset, volumes.Input.String(), volumes.Output.String()). SQL("ON CONFLICT (account, asset) DO UPDATE SET input = " + inputArg + ", output = " + outputArg) } From 34f16e8da22107f6f02815e1391df9807cfb7059 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 14 Aug 2023 16:57:37 +0200 Subject: [PATCH 5/8] feat: comment all useless actions --- .github/workflows/main.yml | 176 ++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 88 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bdf1528a4..413c22781 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -9,101 +9,101 @@ on: name: Main jobs: - Pr: - if: github.event_name == 'pull_request' - uses: formancehq/gh-workflows/.github/workflows/pr-style.yml@main - - Test_postgres: - name: 'Test with PostgreSQL' - runs-on: ubuntu-latest - steps: - - name: Install task - uses: arduino/setup-task@v1 - with: - repo-token: ${{ secrets.GITHUB_TOKEN }} - - uses: actions/checkout@v3 - - uses: actions/setup-go@v3 - with: - go-version-file: 'go.mod' - cache: true - - name: Run tests - run: task tests - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 - with: - fail_ci_if_error: false # optional (default = false) - verbose: true # optional (default = false) - - Test_sqlite: - uses: formancehq/gh-workflows/.github/workflows/golang-test.yml@main - +# Pr: +# if: github.event_name == 'pull_request' +# uses: formancehq/gh-workflows/.github/workflows/pr-style.yml@main +# +# Test_postgres: +# name: 'Test with PostgreSQL' +# runs-on: ubuntu-latest +# steps: +# - name: Install task +# uses: arduino/setup-task@v1 +# with: +# repo-token: ${{ secrets.GITHUB_TOKEN }} +# - uses: actions/checkout@v3 +# - uses: actions/setup-go@v3 +# with: +# go-version-file: 'go.mod' +# cache: true +# - name: Run tests +# run: task tests +# - name: Upload coverage to Codecov +# uses: codecov/codecov-action@v3 +# with: +# fail_ci_if_error: false # optional (default = false) +# verbose: true # optional (default = false) +# +# Test_sqlite: +# uses: formancehq/gh-workflows/.github/workflows/golang-test.yml@main +# Control: name: 'Control' uses: ./.github/workflows/template_build-control.yaml secrets: NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} - - GoReleaserBuild: - needs: - - Control - - Test_sqlite - - Test_postgres - if: github.event_name != 'release' - name: 'GoReleaser Build' - uses: ./.github/workflows/template_goreleaser-build.yaml - secrets: - NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} - SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} - - GoReleaserRelease: - needs: - - Control - - Test_sqlite - - Test_postgres - if: github.event_name == 'release' - name: 'GoReleaser Release' - uses: ./.github/workflows/template_goreleaser-release.yaml - secrets: - NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} - SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} - FURY_TOKEN: ${{ secrets.FURY_TOKEN }} - - SdkGenerate: - needs: - - GoReleaserBuild - uses: ./.github/workflows/template_sdk-generate.yaml - secrets: - NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} - - SdkPublish: - needs: - - GoReleaserRelease - uses: ./.github/workflows/template_sdk-publish.yaml - with: - VERSION: ${{ github.event.release.tag_name }} - secrets: - NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} - - DockerRelease: - needs: - - Control - - Test_sqlite - - Test_postgres - if: github.event_name == 'release' - uses: ./.github/workflows/template_docker.yaml - with: - VERSION: ${{ github.event.release.tag_name }} - APP_SHA: ${{ github.sha }} - RELEASE: ${{ github.event.action }} - secrets: - NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} - SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} +# +# GoReleaserBuild: +# needs: +# - Control +# - Test_sqlite +# - Test_postgres +# if: github.event_name != 'release' +# name: 'GoReleaser Build' +# uses: ./.github/workflows/template_goreleaser-build.yaml +# secrets: +# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} +# SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} +# +# GoReleaserRelease: +# needs: +# - Control +# - Test_sqlite +# - Test_postgres +# if: github.event_name == 'release' +# name: 'GoReleaser Release' +# uses: ./.github/workflows/template_goreleaser-release.yaml +# secrets: +# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} +# SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} +# FURY_TOKEN: ${{ secrets.FURY_TOKEN }} +# +# SdkGenerate: +# needs: +# - GoReleaserBuild +# uses: ./.github/workflows/template_sdk-generate.yaml +# secrets: +# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} +# +# SdkPublish: +# needs: +# - GoReleaserRelease +# uses: ./.github/workflows/template_sdk-publish.yaml +# with: +# VERSION: ${{ github.event.release.tag_name }} +# secrets: +# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} +# +# DockerRelease: +# needs: +# - Control +# - Test_sqlite +# - Test_postgres +# if: github.event_name == 'release' +# uses: ./.github/workflows/template_docker.yaml +# with: +# VERSION: ${{ github.event.release.tag_name }} +# APP_SHA: ${{ github.sha }} +# RELEASE: ${{ github.event.action }} +# secrets: +# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} +# SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} DockerBranch: - needs: - - Control - - Test_sqlite - - Test_postgres +# needs: +# - Control +# - Test_sqlite +# - Test_postgres if: github.event_name != 'release' uses: ./.github/workflows/template_docker.yaml with: From a29d4489a8b632c94fc1234a663ad4419ba29caf Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 16 Aug 2023 09:05:04 +0200 Subject: [PATCH 6/8] feat: use cache --- pkg/core/account.go | 12 ++++++++++++ pkg/storage/sqlstorage/accounts.go | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/pkg/core/account.go b/pkg/core/account.go index ed90c19bb..95bad43af 100644 --- a/pkg/core/account.go +++ b/pkg/core/account.go @@ -15,6 +15,18 @@ type Account struct { Metadata Metadata `json:"metadata" swaggertype:"object"` } +func (v Account) Copy() *Account { + data, err := json.Marshal(v) + if err != nil { + panic(err) + } + ret := &Account{} + if err := json.Unmarshal(data, ret); err != nil { + panic(err) + } + return ret +} + type AccountWithVolumes struct { Account Volumes AssetsVolumes `json:"volumes"` diff --git a/pkg/storage/sqlstorage/accounts.go b/pkg/storage/sqlstorage/accounts.go index 09a5837ac..e333695d3 100644 --- a/pkg/storage/sqlstorage/accounts.go +++ b/pkg/storage/sqlstorage/accounts.go @@ -200,6 +200,12 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu } func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) { + + entry, ok := s.cache.Get(addr) + if ok { + return entry.(*core.AccountWithVolumes).Account.Copy(), nil + } + sb := sqlbuilder.NewSelectBuilder() sb.Select("address", "metadata"). From(s.schema.Table("accounts")). From 2a9a44c108b46f157bcaa365cd3e3042aca505e9 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 16 Aug 2023 09:24:40 +0200 Subject: [PATCH 7/8] feat: reenable CI --- .github/workflows/main.yml | 176 ++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 88 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 413c22781..bdf1528a4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -9,101 +9,101 @@ on: name: Main jobs: -# Pr: -# if: github.event_name == 'pull_request' -# uses: formancehq/gh-workflows/.github/workflows/pr-style.yml@main -# -# Test_postgres: -# name: 'Test with PostgreSQL' -# runs-on: ubuntu-latest -# steps: -# - name: Install task -# uses: arduino/setup-task@v1 -# with: -# repo-token: ${{ secrets.GITHUB_TOKEN }} -# - uses: actions/checkout@v3 -# - uses: actions/setup-go@v3 -# with: -# go-version-file: 'go.mod' -# cache: true -# - name: Run tests -# run: task tests -# - name: Upload coverage to Codecov -# uses: codecov/codecov-action@v3 -# with: -# fail_ci_if_error: false # optional (default = false) -# verbose: true # optional (default = false) -# -# Test_sqlite: -# uses: formancehq/gh-workflows/.github/workflows/golang-test.yml@main -# + Pr: + if: github.event_name == 'pull_request' + uses: formancehq/gh-workflows/.github/workflows/pr-style.yml@main + + Test_postgres: + name: 'Test with PostgreSQL' + runs-on: ubuntu-latest + steps: + - name: Install task + uses: arduino/setup-task@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version-file: 'go.mod' + cache: true + - name: Run tests + run: task tests + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + fail_ci_if_error: false # optional (default = false) + verbose: true # optional (default = false) + + Test_sqlite: + uses: formancehq/gh-workflows/.github/workflows/golang-test.yml@main + Control: name: 'Control' uses: ./.github/workflows/template_build-control.yaml secrets: NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} -# -# GoReleaserBuild: -# needs: -# - Control -# - Test_sqlite -# - Test_postgres -# if: github.event_name != 'release' -# name: 'GoReleaser Build' -# uses: ./.github/workflows/template_goreleaser-build.yaml -# secrets: -# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} -# SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} -# -# GoReleaserRelease: -# needs: -# - Control -# - Test_sqlite -# - Test_postgres -# if: github.event_name == 'release' -# name: 'GoReleaser Release' -# uses: ./.github/workflows/template_goreleaser-release.yaml -# secrets: -# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} -# SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} -# FURY_TOKEN: ${{ secrets.FURY_TOKEN }} -# -# SdkGenerate: -# needs: -# - GoReleaserBuild -# uses: ./.github/workflows/template_sdk-generate.yaml -# secrets: -# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} -# -# SdkPublish: -# needs: -# - GoReleaserRelease -# uses: ./.github/workflows/template_sdk-publish.yaml -# with: -# VERSION: ${{ github.event.release.tag_name }} -# secrets: -# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} -# -# DockerRelease: -# needs: -# - Control -# - Test_sqlite -# - Test_postgres -# if: github.event_name == 'release' -# uses: ./.github/workflows/template_docker.yaml -# with: -# VERSION: ${{ github.event.release.tag_name }} -# APP_SHA: ${{ github.sha }} -# RELEASE: ${{ github.event.action }} -# secrets: -# NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} -# SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} + + GoReleaserBuild: + needs: + - Control + - Test_sqlite + - Test_postgres + if: github.event_name != 'release' + name: 'GoReleaser Build' + uses: ./.github/workflows/template_goreleaser-build.yaml + secrets: + NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} + SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} + + GoReleaserRelease: + needs: + - Control + - Test_sqlite + - Test_postgres + if: github.event_name == 'release' + name: 'GoReleaser Release' + uses: ./.github/workflows/template_goreleaser-release.yaml + secrets: + NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} + SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} + FURY_TOKEN: ${{ secrets.FURY_TOKEN }} + + SdkGenerate: + needs: + - GoReleaserBuild + uses: ./.github/workflows/template_sdk-generate.yaml + secrets: + NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} + + SdkPublish: + needs: + - GoReleaserRelease + uses: ./.github/workflows/template_sdk-publish.yaml + with: + VERSION: ${{ github.event.release.tag_name }} + secrets: + NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} + + DockerRelease: + needs: + - Control + - Test_sqlite + - Test_postgres + if: github.event_name == 'release' + uses: ./.github/workflows/template_docker.yaml + with: + VERSION: ${{ github.event.release.tag_name }} + APP_SHA: ${{ github.sha }} + RELEASE: ${{ github.event.action }} + secrets: + NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} + SEGMENT_WRITE_KEY_OSS: ${{ secrets.SEGMENT_WRITE_KEY_OSS }} DockerBranch: -# needs: -# - Control -# - Test_sqlite -# - Test_postgres + needs: + - Control + - Test_sqlite + - Test_postgres if: github.event_name != 'release' uses: ./.github/workflows/template_docker.yaml with: From 2c4ccbf71fbe3e288e484206617831b35ec542df Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 16 Aug 2023 11:18:47 +0200 Subject: [PATCH 8/8] feat: review --- pkg/storage/sqlstorage/commit.go | 19 +++++++++++++++-- pkg/storage/sqlstorage/driver.go | 5 +++++ pkg/storage/sqlstorage/store_ledger.go | 28 +++++++++++++++++++++++++- pkg/storage/sqlstorage/volumes.go | 20 +++++++++++------- pkg/storage/transactional.go | 19 ++++++++++++++++- 5 files changed, 80 insertions(+), 11 deletions(-) diff --git a/pkg/storage/sqlstorage/commit.go b/pkg/storage/sqlstorage/commit.go index 2643a110a..6d7f0d024 100644 --- a/pkg/storage/sqlstorage/commit.go +++ b/pkg/storage/sqlstorage/commit.go @@ -4,6 +4,7 @@ import ( "context" "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/storage" "github.com/pkg/errors" ) @@ -44,7 +45,21 @@ func (s *Store) commit(ctx context.Context, txs ...core.ExpandedTransaction) ([] return logs, nil } -func (s *Store) Commit(ctx context.Context, txs ...core.ExpandedTransaction) error { - _, err := s.commit(ctx, txs...) +func (s *Store) Commit(ctx context.Context, txs ...core.ExpandedTransaction) (err error) { + if !storage.IsTransactional(ctx) { + ctx = storage.TransactionalContext(ctx) + defer func() { + if err == nil { + if commitErr := storage.CommitTransaction(ctx); commitErr != nil { + panic(commitErr) + } + } else { + if rollbackErr := storage.RollbackTransaction(ctx); rollbackErr != nil { + panic(rollbackErr) + } + } + }() + } + _, err = s.commit(ctx, txs...) return err } diff --git a/pkg/storage/sqlstorage/driver.go b/pkg/storage/sqlstorage/driver.go index 86c359b8d..f5fe0458c 100644 --- a/pkg/storage/sqlstorage/driver.go +++ b/pkg/storage/sqlstorage/driver.go @@ -216,6 +216,11 @@ func (d *Driver) Initialize(ctx context.Context) (err error) { } func (d *Driver) Close(ctx context.Context) error { + for _, store := range d.registeredLedgers { + if err := store.Close(ctx); err != nil { + return err + } + } err := d.systemSchema.Close(ctx) if err != nil { return err diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index 4df4127d6..92547b048 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -2,6 +2,9 @@ package sqlstorage import ( "context" + "fmt" + "os" + "strconv" "time" "github.com/bits-and-blooms/bloom" @@ -68,12 +71,35 @@ func (s *Store) Close(ctx context.Context) error { func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error), onClose, onDelete func(ctx context.Context) error) *Store { + const ( + bloomFilterSizeEnvVar = "NUMARY_BLOOM_FILTER_SIZE" + bloomFilterErrorRateEnvVar = "NUMARY_BLOOM_FILTER_ERROR_RATE" + ) + + var ( + bloomSize uint64 = 100000 + bloomErrorRate = 0.01 + err error + ) + if bloomSizeFromEnv := os.Getenv(bloomFilterSizeEnvVar); bloomSizeFromEnv != "" { + bloomSize, err = strconv.ParseUint(bloomSizeFromEnv, 10, 64) + if err != nil { + panic(errors.Wrap(err, fmt.Sprint("Parsing", bloomFilterSizeEnvVar, "env var"))) + } + } + if bloomErrorRateFromEnv := os.Getenv(bloomFilterErrorRateEnvVar); bloomErrorRateFromEnv != "" { + bloomErrorRate, err = strconv.ParseFloat(bloomErrorRateFromEnv, 64) + if err != nil { + panic(errors.Wrap(err, fmt.Sprint("Parsing", bloomFilterErrorRateEnvVar, "env var"))) + } + } + return &Store{ executorProvider: executorProvider, schema: schema, onClose: onClose, onDelete: onDelete, - bloom: bloom.NewWithEstimates(1000000, 0.01), // TODO: Configure + bloom: bloom.NewWithEstimates(uint(bloomSize), bloomErrorRate), cache: cache.New(5*time.Minute, 10*time.Minute), } } diff --git a/pkg/storage/sqlstorage/volumes.go b/pkg/storage/sqlstorage/volumes.go index 30cb83266..5f9302c64 100644 --- a/pkg/storage/sqlstorage/volumes.go +++ b/pkg/storage/sqlstorage/volumes.go @@ -7,19 +7,25 @@ import ( "github.com/huandu/go-sqlbuilder" "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/storage" ) func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVolumes) error { - for address, accountVolumes := range volumes { - entry, ok := s.cache.Get(address) - if ok { - account := entry.(*core.AccountWithVolumes) - for asset, volumes := range accountVolumes { - account.Volumes[asset] = volumes - account.Balances[asset] = volumes.Balance() + storage.OnTransactionCommitted(ctx, func() { + for address, accountVolumes := range volumes { + entry, ok := s.cache.Get(address) + if ok { + account := entry.(*core.AccountWithVolumes) + for asset, volumes := range accountVolumes { + account.Volumes[asset] = volumes + account.Balances[asset] = volumes.Balance() + } } } + }) + + for address, accountVolumes := range volumes { accountBy, err := json.Marshal(strings.Split(address, ":")) if err != nil { diff --git a/pkg/storage/transactional.go b/pkg/storage/transactional.go index 289add090..ff36e97da 100644 --- a/pkg/storage/transactional.go +++ b/pkg/storage/transactional.go @@ -10,6 +10,7 @@ type contextHolder struct { transaction any commit func(ctx context.Context) error rollback func(ctx context.Context) error + onCommit []func() } type contextHolderKeyStruct struct{} @@ -77,7 +78,15 @@ func CommitTransaction(ctx context.Context) error { if holder.transaction == nil { return errors.New("transaction not initialized") } - return holder.commit(ctx) + err := holder.commit(ctx) + if err != nil { + return err + } + + for _, onCommit := range holder.onCommit { + onCommit() + } + return nil } func RollbackTransaction(ctx context.Context) error { @@ -90,3 +99,11 @@ func RollbackTransaction(ctx context.Context) error { } return holder.rollback(ctx) } + +func OnTransactionCommitted(ctx context.Context, callback func()) { + holder := getContextHolder(ctx) + if holder == nil { + panic("context holder is nil") + } + holder.onCommit = append(holder.onCommit, callback) +}