Skip to content

Commit

Permalink
feat: cache volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Aug 14, 2023
1 parent 2121ab7 commit 6f15d39
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 9 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.3 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.m
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
github.com/ory/dockertest/v3 v3.9.1 h1:v4dkG+dlu76goxMiTT2j8zV7s4oPPEppKT8K8p2f1kY=
github.com/ory/dockertest/v3 v3.9.1/go.mod h1:42Ir9hmvaAPm0Mgibk6mBPi7SFvTXxEcnztDYOJ//uM=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
Expand Down
13 changes: 13 additions & 0 deletions pkg/core/account.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"encoding/json"
"fmt"
"regexp"
)
Expand All @@ -20,6 +21,18 @@ type AccountWithVolumes struct {
Balances AssetsBalances `json:"balances" example:"COIN:100"`
}

func (v AccountWithVolumes) Copy() *AccountWithVolumes {
data, err := json.Marshal(v)
if err != nil {
panic(err)
}
ret := &AccountWithVolumes{}
if err := json.Unmarshal(data, ret); err != nil {
panic(err)
}
return ret
}

const accountPattern = "^[a-zA-Z_]+[a-zA-Z0-9_:]*$"

var accountRegexp = regexp.MustCompile(accountPattern)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func TestRevertTransaction(t *testing.T) {

newBal := world.Balances["COIN"]
expectedBal := originalBal.Add(revertAmt)
require.Equalf(t, expectedBal, newBal,
require.Equalf(t, expectedBal.Uint64(), newBal.Uint64(),
"COIN world balances expected %d, got %d", expectedBal, newBal)
})
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/sqlstorage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error {
}

func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata, at time.Time) error {

entry, ok := s.cache.Get(address)
if ok {
account := entry.(*core.AccountWithVolumes)
account.Metadata = account.Metadata.Merge(metadata)
}

ib := sqlbuilder.NewInsertBuilder()

metadataData, err := json.Marshal(metadata)
Expand Down
15 changes: 11 additions & 4 deletions pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,29 @@ import (
"github.com/huandu/go-sqlbuilder"
"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/ledger"
"github.com/patrickmn/go-cache"
)

func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) {
func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
account, ok := s.cache.Get(address)
if ok {
return account.(*core.AccountWithVolumes).Copy(), nil
}

acc := core.Account{
Address: core.AccountAddress(account),
Address: core.AccountAddress(address),
Metadata: core.Metadata{},
}
assetsVolumes := core.AssetsVolumes{}

if s.bloom.Test([]byte(account)) {
if s.bloom.Test([]byte(address)) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output")
sb.From(s.schema.Table("accounts"))
sb.JoinWithOption(sqlbuilder.LeftOuterJoin,
s.schema.Table("volumes"),
"accounts.address = volumes.account")
sb.Where(sb.E("accounts.address", account))
sb.Where(sb.E("accounts.address", address))

executor, err := s.executorProvider(ctx)
if err != nil {
Expand Down Expand Up @@ -85,6 +90,8 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*cor
}
res.Balances = res.Volumes.Balances()

s.cache.Set(address, res.Copy(), cache.NoExpiration)

return res, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/sqlstorage/store_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package sqlstorage

import (
"context"
"time"

"github.com/bits-and-blooms/bloom"
"github.com/formancehq/stack/libs/go-libs/logging"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/ledger"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
)

Expand All @@ -23,6 +25,7 @@ type Store struct {
lastLog *core.Log
lastTx *core.ExpandedTransaction
bloom *bloom.BloomFilter
cache *cache.Cache
}

func (s *Store) error(err error) error {
Expand Down Expand Up @@ -64,12 +67,14 @@ func (s *Store) Close(ctx context.Context) error {

func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error),
onClose, onDelete func(ctx context.Context) error) *Store {

return &Store{
executorProvider: executorProvider,
schema: schema,
onClose: onClose,
onDelete: onDelete,
bloom: bloom.NewWithEstimates(1000000, 0.01), // TODO: Configure
cache: cache.New(5*time.Minute, 10*time.Minute),
}
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/storage/sqlstorage/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,18 @@ import (
)

func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVolumes) error {
for account, accountVolumes := range volumes {
accountBy, err := json.Marshal(strings.Split(account, ":"))
for address, accountVolumes := range volumes {

entry, ok := s.cache.Get(address)
if ok {
account := entry.(*core.AccountWithVolumes)
for asset, volumes := range accountVolumes {
account.Volumes[asset] = volumes
account.Balances[asset] = volumes.Balance()
}
}

accountBy, err := json.Marshal(strings.Split(address, ":"))
if err != nil {
panic(err)
}
Expand All @@ -25,11 +35,11 @@ func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVo
switch s.schema.Flavor() {
case sqlbuilder.PostgreSQL:
ib = ib.Cols("account", "asset", "input", "output", "account_json").
Values(account, asset, volumes.Input.String(), volumes.Output.String(), accountBy).
Values(address, asset, volumes.Input.String(), volumes.Output.String(), accountBy).
SQL("ON CONFLICT (account, asset) DO UPDATE SET input = " + inputArg + ", output = " + outputArg)
case sqlbuilder.SQLite:
ib = ib.Cols("account", "asset", "input", "output").
Values(account, asset, volumes.Input.String(), volumes.Output.String()).
Values(address, asset, volumes.Input.String(), volumes.Output.String()).
SQL("ON CONFLICT (account, asset) DO UPDATE SET input = " + inputArg + ", output = " + outputArg)
}

Expand Down

0 comments on commit 6f15d39

Please sign in to comment.