feat: write cache in batches (#13)
* feat: write cache in batches

* refactor: property name

* infra: increase memory

* feat: add batch as a service

* feat: implement cache batch in current global services

* fix: wait for remaining sessions to be written

* fix: dispatch requests made synchronously

* refactor: batch functionality

* chore: change variable names

* docs: add comments
rem1niscence authored Apr 12, 2022
1 parent 57380fa commit 2ca5d21
Showing 5 changed files with 212 additions and 113 deletions.
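
Reviewer note: the batch writer itself is added in the cache package, which is one of the five changed files but is not rendered in this view. The call sites below only show its contract: a channel of *cache.Item going in, a flush every BatchSize items, a flush of the remainder when the channel closes, and a WaitGroup signal once the last write lands. A minimal sketch of a consumer satisfying that contract follows — Item, BatchWriterOptions, and BatchWriter mirror the names used below, but the Flush field is a stand-in for the package's actual Redis writes, so treat this as an illustration rather than the committed implementation.

    package cache

    import (
        "context"
        "sync"
        "time"
    )

    // Item is a single cache entry queued for a batched write.
    type Item struct {
        Key   string
        Value interface{}
        TTL   time.Duration
    }

    // BatchWriterOptions mirrors the option names used by the callers below.
    // Flush is a stand-in for the package's Redis pipeline writes.
    type BatchWriterOptions struct {
        Flush     func(ctx context.Context, items []*Item)
        BatchSize int
        WaitGroup *sync.WaitGroup
        RequestID string
    }

    // BatchWriter returns a channel that callers feed with items. Writes are
    // flushed once BatchSize items accumulate; the remainder is flushed when
    // the channel is closed or the context is cancelled, after which the
    // WaitGroup is signalled (matching the single cacheWg.Add(1) in the callers).
    func BatchWriter(ctx context.Context, opts *BatchWriterOptions) chan *Item {
        batch := make(chan *Item, opts.BatchSize)

        go func() {
            defer opts.WaitGroup.Done() // unblocks cacheWg.Wait() in the callers

            buffer := make([]*Item, 0, opts.BatchSize)
            flush := func() {
                if len(buffer) > 0 {
                    opts.Flush(ctx, buffer) // synchronous, so the buffer can be reused
                    buffer = buffer[:0]
                }
            }

            for {
                select {
                case item, ok := <-batch:
                    if !ok { // channel closed: write what is left and exit
                        flush()
                        return
                    }
                    buffer = append(buffer, item)
                    if len(buffer) >= opts.BatchSize {
                        flush()
                    }
                case <-ctx.Done(): // cancellation: best-effort flush
                    flush()
                    return
                }
            }
        }()

        return batch
    }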
31 changes: 25 additions & 6 deletions cmd/functions/dispatch-globally/main.go
@@ -2,12 +2,14 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Pocket/global-dispatcher/common/apigateway"
"github.com/Pocket/global-dispatcher/common/environment"
@@ -40,6 +42,7 @@
maxDispatchersErrorsAllowed = environment.GetInt64("MAX_DISPATCHER_ERRORS_ALLOWED", 2000)
dispatchGigastake = environment.GetBool("DISPATCH_GIGASTAKE", false)
maxClientsCacheCheck = environment.GetInt64("MAX_CLIENTS_CACHE_CHECK", 3)
cacheBatchSize = environment.GetInt64("CACHE_BATCH_SIZE", 100)
)

// LambdaHandler manages the DispatchSessions call and returns the result as an APIGatewayProxyResponse
@@ -98,6 +101,15 @@ func DispatchSessions(ctx context.Context, requestID string) (uint32, error) {
return 0, errors.New("error obtaining staked apps on db: " + err.Error())
}

var cacheWg sync.WaitGroup
cacheWg.Add(1)
cacheBatch := cache.BatchWriter(ctx, &cache.BatchWriterOptions{
Caches: caches,
BatchSize: int(cacheBatchSize),
WaitGroup: &cacheWg,
RequestID: requestID,
})

var failedDispatcherCalls uint32
var sem = semaphore.NewWeighted(dispatchConcurrency)
var wg sync.WaitGroup
@@ -131,25 +143,32 @@ func DispatchSessions(ctx context.Context, requestID string) (uint32, error) {
}

session := pocket.NewSessionCamelCase(dispatch.Session)

// Embed the current block height within the session so it can be checked against the cache
session.BlockHeight = dispatch.BlockHeight

-            err = cache.WriteJSONToCaches(ctx, caches, cacheKey, session, uint(cacheTTL))
+            marshalledSession, err := json.Marshal(session)
             if err != nil {
                 atomic.AddUint32(&failedDispatcherCalls, 1)
                 logger.Log.WithFields(log.Fields{
                     "appPublicKey": publicKey,
-                    "chain":        ch,
                     "error":        err.Error(),
                     "requestID":    requestID,
-                }).Error("error writing to cache: " + err.Error())
+                    "blockchainID": ch,
+                    "sessionKey":   session.Key,
+                }).Errorf("sync check: error marshalling nodes: %s", err.Error())
+                return
             }
 
+            cacheBatch <- &cache.Item{
+                Key:   cacheKey,
+                Value: marshalledSession,
+                TTL:   time.Duration(cacheTTL) * time.Second,
+            }
}(app.PublicKey, chain)
}
}

wg.Wait()
// Wait for the remaining items in the batch if any
cacheWg.Wait()

if failedDispatcherCalls > uint32(maxDispatchersErrorsAllowed) {
return failedDispatcherCalls, ErrMaxDispatchErrorsExceeded
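
Reviewer note: the shutdown ordering above matters — wg.Wait() must return before the batch channel is closed, because a send on a closed channel panics, and cacheWg.Wait() then blocks until the writer flushes the final partial batch. The second file below shows all three steps explicitly; this standalone sketch (hypothetical names) isolates the pattern:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        items := make(chan int)

        var consumerWg sync.WaitGroup
        consumerWg.Add(1)
        go func() { // consumer: drains the channel, then signals completion
            defer consumerWg.Done()
            for item := range items {
                fmt.Println("wrote", item)
            }
        }()

        var producerWg sync.WaitGroup
        for i := 0; i < 3; i++ {
            producerWg.Add(1)
            go func(n int) {
                defer producerWg.Done()
                items <- n
            }(i)
        }

        producerWg.Wait() // 1. all sends have finished
        close(items)      // 2. now safe: a send after close would panic
        consumerWg.Wait() // 3. consumer has flushed everything
    }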
197 changes: 116 additions & 81 deletions cmd/functions/run-application-checks/main.go
@@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
@@ -52,6 +53,12 @@ var (
minMetricsPoolSize = environment.GetInt64("MIN_METRICS_POOL_SIZE", 5)
maxMetricsPoolSize = environment.GetInt64("MAX_METRICS_POOL_SIZE", 20)
defaultTimeOut = environment.GetInt64("DEFAULT_TIMEOUT", 8)
cacheBatchSize = environment.GetInt64("CACHE_BATCH_SIZE", 50)

caches []*cache.Redis
metricsRecorder *metrics.Recorder
db *database.Mongo
rpcProvider *provider.JSONRPCProvider
)

type applicationChecks struct {
@@ -65,6 +72,7 @@ type applicationChecks struct {
RequestID string
SyncChecker *pocket.SyncChecker
ChainChecker *pocket.ChainChecker
CacheBatch chan *cache.Item
}

func lambdaHandler(ctx context.Context) (events.APIGatewayProxyResponse, error) {
@@ -87,23 +95,24 @@ func runApplicationChecks(ctx context.Context, requestID string) error {
if len(redisConnectionStrings) <= 0 {
return errNoCacheClientProvided
}
+    var err error
 
-    db, err := database.ClientFromURI(ctx, mongoConnectionString, mongoDatabase)
+    db, err = database.ClientFromURI(ctx, mongoConnectionString, mongoDatabase)
     if err != nil {
         return errors.New("error connecting to mongo: " + err.Error())
     }
 
-    metricsRecorder, err := metrics.NewMetricsRecorder(ctx, metricsConnection, int(minMetricsPoolSize), int(maxMetricsPoolSize))
+    metricsRecorder, err = metrics.NewMetricsRecorder(ctx, metricsConnection, int(minMetricsPoolSize), int(maxMetricsPoolSize))
     if err != nil {
         return errors.New("error connecting to metrics db: " + err.Error())
     }
 
-    caches, err := cache.ConnectoCacheClients(ctx, redisConnectionStrings, "", isRedisCluster)
+    caches, err = cache.ConnectoCacheClients(ctx, redisConnectionStrings, "", isRedisCluster)
     if err != nil {
         return errors.New("error connecting to redis: " + err.Error())
     }
 
-    rpcProvider := provider.NewJSONRPCProvider(rpcURL, dispatchURLs)
+    rpcProvider = provider.NewJSONRPCProvider(rpcURL, dispatchURLs)
rpcProvider.UpdateRequestConfig(0, time.Duration(defaultTimeOut)*time.Second)
wallet, err := signer.NewWalletFromPrivatekey(appPrivateKey)
if err != nil {
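
Reviewer note: the switch from := to = (with err predeclared via var err error) is what makes these assignments populate the new package-level db, metricsRecorder, caches, and rpcProvider variables; keeping := would declare fresh locals that shadow the globals and leave them nil for the other functions that now read them. A minimal demo of the shadowing pitfall (hypothetical names):

    package main

    import "fmt"

    var conn string // package-level; other functions are meant to read this

    func dial() (string, error) { return "open", nil }

    func setupShadowed() {
        conn, err := dial() // := declares a NEW local conn; the package variable stays ""
        _, _ = conn, err
    }

    func setupShared() error {
        var err error
        conn, err = dial() // = assigns to the package-level conn
        return err
    }

    func main() {
        setupShadowed()
        fmt.Printf("after setupShadowed: %q\n", conn) // ""
        _ = setupShared()
        fmt.Printf("after setupShared:   %q\n", conn) // "open"
    }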
@@ -131,13 +140,23 @@ func runApplicationChecks(ctx context.Context, requestID string) error {
return bc.ID
})

var cacheWg sync.WaitGroup
cacheWg.Add(1)
cacheBatch := cache.BatchWriter(ctx, &cache.BatchWriterOptions{
Caches: caches,
BatchSize: int(cacheBatchSize),
WaitGroup: &cacheWg,
RequestID: requestID,
})

appChecks := applicationChecks{
Caches: caches,
Provider: rpcProvider,
Relayer: pocketRelayer,
MetricsRecorder: metricsRecorder,
BlockHeight: blockHeight,
RequestID: requestID,
CacheBatch: cacheBatch,
SyncChecker: &pocket.SyncChecker{
Relayer: pocketRelayer,
DefaultSyncAllowance: int(defaultSyncAllowance),
@@ -152,63 +171,73 @@ func runApplicationChecks(ctx context.Context, requestID string) error {
},
}

-    var sem = semaphore.NewWeighted(dispatchConcurrency)
+    ntApps = ntApps[15:17]
+
     var wg sync.WaitGroup
+    var sem = semaphore.NewWeighted(dispatchConcurrency)
     for index, app := range ntApps {
         for _, chain := range app.Chains {
-            dbApp := dbApps[index]
-
-            session, err := appChecks.getSession(ctx, app.PublicKey, chain)
-            if err != nil {
-                logger.Log.WithFields(log.Fields{
-                    "appPublicKey": app.PublicKey,
-                    "chain":        chain,
-                    "error":        err.Error(),
-                }).Errorf("error dispatching: %s", err.Error())
-                continue
-            }
-
-            blockchain := *blockchains[chain]
-
-            pocketAAT := provider.PocketAAT{
-                AppPubKey:    dbApp.GatewayAAT.ApplicationPublicKey,
-                ClientPubKey: dbApp.GatewayAAT.ClientPublicKey,
-                Version:      dbApp.GatewayAAT.Version,
-                Signature:    dbApp.GatewayAAT.ApplicationSignature,
-            }
-
-            sem.Acquire(ctx, 1)
             wg.Add(1)
-            go func() {
-                defer sem.Release(1)
+            go func(publicKey, ch string, idx int) {
                 defer wg.Done()
-                appChecks.chainCheck(ctx, pocket.ChainCheckOptions{
-                    Session:    *session,
-                    Blockchain: blockchain.ID,
-                    Data:       blockchain.ChainIDCheck,
-                    ChainID:    blockchain.ChainID,
-                    Path:       blockchain.Path,
-                    PocketAAT:  pocketAAT,
-                }, blockchain, caches)
-            }()
-
-            sem.Acquire(ctx, 1)
-            wg.Add(1)
-            go func() {
-                defer sem.Release(1)
-                defer wg.Done()
-                appChecks.syncCheck(ctx, pocket.SyncCheckOptions{
-                    Session:          *session,
-                    PocketAAT:        pocketAAT,
-                    SyncCheckOptions: blockchain.SyncCheckOptions,
-                    AltruistURL:      blockchain.Altruist,
-                    Blockchain:       blockchain.ID,
-                }, blockchain, caches)
-            }()
+                dbApp := dbApps[idx]
+
+                session, err := appChecks.getSession(ctx, publicKey, ch)
+                if err != nil {
+                    logger.Log.WithFields(log.Fields{
+                        "appPublicKey": publicKey,
+                        "chain":        ch,
+                        "error":        err.Error(),
+                    }).Errorf("error dispatching: %s", err.Error())
+                    return
+                }
+
+                blockchain := *blockchains[ch]
+
+                pocketAAT := provider.PocketAAT{
+                    AppPubKey:    dbApp.GatewayAAT.ApplicationPublicKey,
+                    ClientPubKey: dbApp.GatewayAAT.ClientPublicKey,
+                    Version:      dbApp.GatewayAAT.Version,
+                    Signature:    dbApp.GatewayAAT.ApplicationSignature,
+                }
+
+                sem.Acquire(ctx, 1)
+                wg.Add(1)
+                go func() {
+                    defer sem.Release(1)
+                    defer wg.Done()
+                    appChecks.chainCheck(ctx, pocket.ChainCheckOptions{
+                        Session:    *session,
+                        Blockchain: blockchain.ID,
+                        Data:       blockchain.ChainIDCheck,
+                        ChainID:    blockchain.ChainID,
+                        Path:       blockchain.Path,
+                        PocketAAT:  pocketAAT,
+                    }, blockchain, caches)
+                }()
+
+                sem.Acquire(ctx, 1)
+                wg.Add(1)
+                go func() {
+                    defer sem.Release(1)
+                    defer wg.Done()
+                    appChecks.syncCheck(ctx, pocket.SyncCheckOptions{
+                        Session:          *session,
+                        PocketAAT:        pocketAAT,
+                        SyncCheckOptions: blockchain.SyncCheckOptions,
+                        AltruistURL:      blockchain.Altruist,
+                        Blockchain:       blockchain.ID,
+                    }, blockchain, caches)
+                }()
+            }(app.PublicKey, chain, index)
         }
     }
wg.Wait()

close(cacheBatch)
// Wait for the remaining items in the batch if any
cacheWg.Wait()

metricsRecorder.Conn.Close()
return cache.CloseConnections(caches)
}
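
Reviewer note: two things happen in this refactor. First, getSession now runs inside the goroutine, so session dispatch is no longer performed synchronously in the loop — the "dispatch requests made synchronously" fix from the commit list. Second, app.PublicKey, chain, and index are passed to the goroutine as arguments instead of being captured: before Go 1.22, range loop variables were reused across iterations, so a closure that captures them can observe a later iteration's values. A minimal demo of the capture fix (hypothetical values):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        chains := []string{"0001", "0021", "0040"}
        var wg sync.WaitGroup

        for _, chain := range chains {
            wg.Add(1)
            // go func() { fmt.Println(chain) }() could print the same chain
            // three times on Go versions before 1.22.
            go func(ch string) { // passing the loop variable as an argument...
                defer wg.Done()
                fmt.Println(ch) // ...gives each goroutine its own copy
            }(chain)
        }
        wg.Wait()
    }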
@@ -241,12 +270,21 @@ func (ac *applicationChecks) chainCheck(ctx context.Context, options pocket.Chai
ttl = 30
}

-    if err := cache.WriteJSONToCaches(ctx, caches, cacheKey, nodes, uint(ttl)); err != nil {
+    marshalledNodes, err := json.Marshal(nodes)
+    if err != nil {
         logger.Log.WithFields(log.Fields{
             "chain":        options.Session.Header.Chain,
-            "error":     err.Error(),
-            "requestID": ac.RequestID,
-        }).Errorf("sync check: error writing to cache: %s", err.Error())
+            "error":        err.Error(),
+            "requestID":    ac.RequestID,
+            "blockchainID": blockchain,
+            "sessionKey":   options.Session.Key,
+        }).Errorf("sync check: error marshalling nodes: %s", err.Error())
+        return nodes
     }
 
+    ac.CacheBatch <- &cache.Item{
+        Key:   cacheKey,
+        Value: marshalledNodes,
+        TTL:   time.Duration(ttl) * time.Second,
+    }

return nodes
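
Reviewer note: cache.Item.TTL is a time.Duration, so the integer TTL has to be multiplied by time.Second as the diff does; a bare time.Duration(ttl) would be interpreted as nanoseconds and expire almost immediately. For example:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        ttl := int64(300)
        fmt.Println(time.Duration(ttl))               // 300ns: wrong unit
        fmt.Println(time.Duration(ttl) * time.Second) // 5m0s: what the diff writes
    }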
@@ -264,43 +302,40 @@ func (ac *applicationChecks) syncCheck(ctx context.Context, options pocket.SyncC
ttl = 30
}

-    if err := cache.WriteJSONToCaches(ctx, caches, cacheKey, nodes, uint(ttl)); err != nil {
-        logger.Log.WithFields(log.Fields{
-            "chain":     options.Session.Header.Chain,
-            "error":     err.Error(),
-            "requestID": ac.RequestID,
-        }).Errorf("sync check: error writing to cache: %s", err.Error())
-    }
-
-    if err := ac.eraseNodesFailureMark(ctx, options, nodes, caches); err != nil {
+    marshalledNodes, err := json.Marshal(nodes)
+    if err != nil {
         logger.Log.WithFields(log.Fields{
             "appPublicKey": options.Session.Header.AppPublicKey,
             "chain":        options.Session.Header.Chain,
             "error":        err.Error(),
             "requestID":    ac.RequestID,
-        }).Errorf("sync check: error erasing failure mark on cache: %s", err.Error())
+            "blockchainID": blockchain,
+            "sessionKey":   options.Session.Key,
+        }).Errorf("sync check: error marshalling nodes: %s", err.Error())
+        return nodes
     }
+    ac.CacheBatch <- &cache.Item{
+        Key:   cacheKey,
+        Value: marshalledNodes,
+        TTL:   time.Duration(ttl) * time.Second,
+    }
 
+    ac.eraseNodesFailureMark(ctx, options, nodes, caches)
 
     return nodes
}

// eraseNodesFailureMark clears the failure status of nodes in the API that were failing
// a significant number of relays
-func (ac *applicationChecks) eraseNodesFailureMark(ctx context.Context, options pocket.SyncCheckOptions, nodes []string, caches []*cache.Redis) error {
-    getNodeFailureKey := func(blockchain, node string) string {
+func (ac *applicationChecks) eraseNodesFailureMark(ctx context.Context, options pocket.SyncCheckOptions, nodes []string, caches []*cache.Redis) {
+    nodeFailureKey := func(blockchain, node string) string {
         return fmt.Sprintf("%s%s-%s-failure", ac.CommitHash, blockchain, node)
     }
 
-    err := cache.RunFunctionOnAllClients(caches, func(ch *cache.Redis) error {
-        pipe := ch.Client.Pipeline()
-        for _, node := range nodes {
-            pipe.Set(ctx, getNodeFailureKey(options.Blockchain, node), "false", 1*time.Hour)
-        }
-        _, err := pipe.Exec(ctx)
-        return err
-    })
-
-    return err
-}
+    for _, node := range nodes {
+        ac.CacheBatch <- &cache.Item{
+            Key:   nodeFailureKey(options.Blockchain, node),
+            Value: node,
+            TTL:   1 * time.Hour,
+        }
+    }
+}
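
Reviewer note: the removed code built a Redis pipeline per call site and per client; routing the keys through CacheBatch hands that work to the shared batch writer (note the stored value also changes here, from the literal "false" to the node ID). Assuming the go-redis v8 client that the removed ch.Client.Pipeline() call suggests, the consumer-side flush might look like the sketch below — an illustration, not the committed code, with Item declared again so the snippet stands alone:

    package cache

    import (
        "context"
        "time"

        "github.com/go-redis/redis/v8"
    )

    // Item matches the shape the call sites in this diff rely on.
    type Item struct {
        Key   string
        Value interface{}
        TTL   time.Duration
    }

    // flushToRedis writes one batch of items per client in a single round
    // trip, mirroring the pipeline the removed code built inline.
    func flushToRedis(ctx context.Context, clients []*redis.Client, items []*Item) error {
        for _, client := range clients {
            pipe := client.Pipeline()
            for _, item := range items {
                pipe.Set(ctx, item.Key, item.Value, item.TTL)
            }
            if _, err := pipe.Exec(ctx); err != nil {
                return err
            }
        }
        return nil
    }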

func main() {
[The diff for the remaining 3 changed files is not rendered in this view.]

