From 893ece87905ef31a9bcdfc16b575834fef5feedc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Gontarz?= Date: Mon, 26 Sep 2022 18:53:01 +0200 Subject: [PATCH] Update TTL of transactions upon finalisation to 500 ms (#235) * refactor(proxy): move dispatching logic to serve function * feat(transaction-registry): update ttl to 1 second upon transaction completion * fix: complete failures with empty fail reason * chore: add test over cleanup of completed transaction --- cache/async_cache.go | 4 +- cache/async_cache_test.go | 12 +-- cache/transaction_registry.go | 4 + cache/transaction_registry_inmem.go | 21 +++--- cache/transaction_registry_redis.go | 16 ++-- cache/transaction_registry_redis_test.go | 94 +++++++++++++++++++++++- cache/transaction_registry_test.go | 2 +- proxy.go | 88 +++++++++++----------- 8 files changed, 175 insertions(+), 66 deletions(-) diff --git a/cache/async_cache.go b/cache/async_cache.go index b5b04989..5be0f3ac 100644 --- a/cache/async_cache.go +++ b/cache/async_cache.go @@ -91,12 +91,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach switch cfg.Mode { case "file_system": cache, err = newFilesSystemCache(cfg, graceTime) - transaction = newInMemoryTransactionRegistry(transactionDeadline) + transaction = newInMemoryTransactionRegistry(transactionDeadline, transactionEndedTTL) case "redis": var redisClient redis.UniversalClient redisClient, err = clients.NewRedisClient(cfg.Redis) cache = newRedisCache(redisClient, cfg) - transaction = newRedisTransactionRegistry(redisClient, transactionDeadline) + transaction = newRedisTransactionRegistry(redisClient, transactionDeadline, transactionEndedTTL) default: return nil, fmt.Errorf("unknown config mode") } diff --git a/cache/async_cache_test.go b/cache/async_cache_test.go index 17fdbb41..a361f3b4 100644 --- a/cache/async_cache_test.go +++ b/cache/async_cache_test.go @@ -15,7 +15,7 @@ const asyncTestDir = "./async-test-data" func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) { graceTime := 100 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() os.RemoveAll(asyncTestDir) @@ -51,7 +51,7 @@ func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) { func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCompletion(t *testing.T) { graceTime := 100 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() @@ -94,7 +94,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCom func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwaiting(t *testing.T) { graceTime := 300 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() @@ -138,7 +138,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwait func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting(t *testing.T) { graceTime := 300 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() @@ -186,7 +186,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting } } -func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache { +func newAsyncTestCache(t *testing.T, graceTime, transactionEndedTime time.Duration) *AsyncCache { t.Helper() cfg := config.Cache{ Name: "foobar", @@ -203,7 +203,7 @@ func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache { asyncC := &AsyncCache{ Cache: c, - TransactionRegistry: newInMemoryTransactionRegistry(graceTime), + TransactionRegistry: newInMemoryTransactionRegistry(graceTime, transactionEndedTime), graceTime: graceTime, } return asyncC diff --git a/cache/transaction_registry.go b/cache/transaction_registry.go index 052d7951..2b34148d 100644 --- a/cache/transaction_registry.go +++ b/cache/transaction_registry.go @@ -2,6 +2,7 @@ package cache import ( "io" + "time" ) // TransactionRegistry is a registry of ongoing queries identified by Key. @@ -21,6 +22,9 @@ type TransactionRegistry interface { Status(key *Key) (TransactionStatus, error) } +// transactionEndedTTL amount of time transaction record is kept after being updated +const transactionEndedTTL = 500 * time.Millisecond + type TransactionStatus struct { State TransactionState FailReason string // filled in only if state of transaction is transactionFailed diff --git a/cache/transaction_registry_inmem.go b/cache/transaction_registry_inmem.go index 0244cc1a..8971cd15 100644 --- a/cache/transaction_registry_inmem.go +++ b/cache/transaction_registry_inmem.go @@ -17,17 +17,19 @@ type inMemoryTransactionRegistry struct { pendingEntriesLock sync.Mutex pendingEntries map[string]pendingEntry - deadline time.Duration - stopCh chan struct{} - wg sync.WaitGroup + deadline time.Duration + transactionEndedDeadline time.Duration + stopCh chan struct{} + wg sync.WaitGroup } -func newInMemoryTransactionRegistry(deadline time.Duration) *inMemoryTransactionRegistry { +func newInMemoryTransactionRegistry(deadline, transactionEndedDeadline time.Duration) *inMemoryTransactionRegistry { transaction := &inMemoryTransactionRegistry{ - pendingEntriesLock: sync.Mutex{}, - pendingEntries: make(map[string]pendingEntry), - deadline: deadline, - stopCh: make(chan struct{}), + pendingEntriesLock: sync.Mutex{}, + pendingEntries: make(map[string]pendingEntry), + deadline: deadline, + transactionEndedDeadline: transactionEndedDeadline, + stopCh: make(chan struct{}), } transaction.wg.Add(1) @@ -72,11 +74,12 @@ func (i *inMemoryTransactionRegistry) updateTransactionState(key *Key, state Tra if entry, ok := i.pendingEntries[k]; ok { entry.state = state entry.failedReason = failReason + entry.deadline = time.Now().Add(i.transactionEndedDeadline) i.pendingEntries[k] = entry } else { log.Errorf("[attempt to complete transaction] entry not found for key: %s, registering new entry with %v status", key.String(), state) i.pendingEntries[k] = pendingEntry{ - deadline: time.Now().Add(i.deadline), + deadline: time.Now().Add(i.transactionEndedDeadline), state: state, failedReason: failReason, } diff --git a/cache/transaction_registry_redis.go b/cache/transaction_registry_redis.go index d98cdcf6..70c08cc1 100644 --- a/cache/transaction_registry_redis.go +++ b/cache/transaction_registry_redis.go @@ -13,14 +13,20 @@ import ( type redisTransactionRegistry struct { redisClient redis.UniversalClient - // deadline specifies TTL of record to be kept, no matter the associated transaction status + + // deadline specifies TTL of the record to be kept deadline time.Duration + + // transactionEndedDeadline specifies TTL of the record to be kept that has been ended (either completed or failed) + transactionEndedDeadline time.Duration } -func newRedisTransactionRegistry(redisClient redis.UniversalClient, deadline time.Duration) *redisTransactionRegistry { +func newRedisTransactionRegistry(redisClient redis.UniversalClient, deadline time.Duration, + endedDeadline time.Duration) *redisTransactionRegistry { return &redisTransactionRegistry{ - redisClient: redisClient, - deadline: deadline, + redisClient: redisClient, + deadline: deadline, + transactionEndedDeadline: endedDeadline, } } @@ -41,7 +47,7 @@ func (r *redisTransactionRegistry) Fail(key *Key, reason string) error { } func (r *redisTransactionRegistry) updateTransactionState(key *Key, value []byte) error { - return r.redisClient.Set(context.Background(), toTransactionKey(key), value, r.deadline).Err() + return r.redisClient.Set(context.Background(), toTransactionKey(key), value, r.transactionEndedDeadline).Err() } func (r *redisTransactionRegistry) Status(key *Key) (TransactionStatus, error) { diff --git a/cache/transaction_registry_redis_test.go b/cache/transaction_registry_redis_test.go index 446e4278..a4adf260 100644 --- a/cache/transaction_registry_redis_test.go +++ b/cache/transaction_registry_redis_test.go @@ -20,7 +20,7 @@ func TestRedisTransaction(t *testing.T) { Query: []byte("SELECT pending entries"), } - redisTransaction := newRedisTransactionRegistry(redisClient, graceTime) + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, graceTime) if err := redisTransaction.Create(key); err != nil { t.Fatalf("unexpected error: %s while registering new transaction", err) @@ -53,7 +53,7 @@ func TestFailRedisTransaction(t *testing.T) { Query: []byte("SELECT pending entries"), } - redisTransaction := newRedisTransactionRegistry(redisClient, graceTime) + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, graceTime) if err := redisTransaction.Create(key); err != nil { t.Fatalf("unexpected error: %s while registering new transaction", err) @@ -79,3 +79,93 @@ func TestFailRedisTransaction(t *testing.T) { t.Fatalf("unexpected: transaction should curry fail reason") } } + +func TestCleanupFailedRedisTransaction(t *testing.T) { + s := miniredis.RunT(t) + + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: []string{s.Addr()}, + }) + + graceTime := 10 * time.Second + updatedTTL := 10 * time.Millisecond + key := &Key{ + Query: []byte("SELECT pending entries"), + } + + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, updatedTTL) + + if err := redisTransaction.Create(key); err != nil { + t.Fatalf("unexpected error: %s while registering new transaction", err) + } + + status, err := redisTransaction.Status(key) + if err != nil || !status.State.IsPending() { + t.Fatalf("unexpected: transaction should be pending") + } + + failReason := "failed for fun dudes" + + if err := redisTransaction.Fail(key, failReason); err != nil { + t.Fatalf("unexpected error: %s while unregistering transaction", err) + } + + status, err = redisTransaction.Status(key) + if err != nil || !status.State.IsFailed() { + t.Fatalf("unexpected: transaction should be failed") + } + + if status.FailReason != failReason { + t.Fatalf("unexpected: transaction should curry fail reason") + } + + // move ttls of mini redis to trigger the clean up transaction + s.FastForward(updatedTTL) + + status, err = redisTransaction.Status(key) + if err != nil || !status.State.IsAbsent() { + t.Fatalf("unexpected: transaction should be cleaned up") + } +} + +func TestCleanupCompletedRedisTransaction(t *testing.T) { + s := miniredis.RunT(t) + + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: []string{s.Addr()}, + }) + + graceTime := 10 * time.Second + updatedTTL := 10 * time.Millisecond + key := &Key{ + Query: []byte("SELECT pending entries"), + } + + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, updatedTTL) + + if err := redisTransaction.Create(key); err != nil { + t.Fatalf("unexpected error: %s while registering new transaction", err) + } + + status, err := redisTransaction.Status(key) + if err != nil || !status.State.IsPending() { + t.Fatalf("unexpected: transaction should be pending") + } + + if err := redisTransaction.Complete(key); err != nil { + t.Fatalf("unexpected error: %s while unregistering transaction", err) + } + + status, err = redisTransaction.Status(key) + if err != nil || !status.State.IsCompleted() { + t.Fatalf("unexpected: transaction should be failed") + } + + // move ttls of mini redis to trigger the clean up transaction + s.FastForward(updatedTTL) + + status, err = redisTransaction.Status(key) + if err != nil || !status.State.IsAbsent() { + t.Fatalf("unexpected: transaction should be cleaned up") + } +} diff --git a/cache/transaction_registry_test.go b/cache/transaction_registry_test.go index 1d1406c4..8de6f8b5 100644 --- a/cache/transaction_registry_test.go +++ b/cache/transaction_registry_test.go @@ -10,7 +10,7 @@ func TestInMemoryTransaction(t *testing.T) { key := &Key{ Query: []byte("SELECT pending entries"), } - inMemoryTransaction := newInMemoryTransactionRegistry(graceTime) + inMemoryTransaction := newInMemoryTransactionRegistry(graceTime, graceTime) if err := inMemoryTransaction.Create(key); err != nil { t.Fatalf("unexpected error: %s while registering new transaction", err) diff --git a/proxy.go b/proxy.go index a1e80fb0..f9ab32f8 100644 --- a/proxy.go +++ b/proxy.go @@ -117,7 +117,25 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if s.user.cache == nil || s.user.cache.Cache == nil { rp.proxyRequest(s, srw, srw, req) } else { - rp.serveFromCache(s, srw, req, origParams) + noCache := origParams.Get("no_cache") + if noCache == "1" || noCache == "true" { + // The response caching is disabled. + rp.proxyRequest(s, srw, srw, req) + } else { + q, err := getFullQuery(req) + if err != nil { + err = fmt.Errorf("%s: cannot read query: %w", s, err) + respondWith(srw, err, http.StatusBadRequest) + return + } + + if !canCacheQuery(q) { + // The query cannot be cached, so just proxy it. + rp.proxyRequest(s, srw, srw, req) + } else { + rp.serveFromCache(s, srw, req, origParams, q) + } + } } // It is safe calling getQuerySnippet here, since the request @@ -233,43 +251,9 @@ func (rp *reverseProxy) proxyRequest(s *scope, rw http.ResponseWriter, srw *stat } } -func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *http.Request, origParams url.Values) { - noCache := origParams.Get("no_cache") - if noCache == "1" || noCache == "true" { - // The response caching is disabled. - rp.proxyRequest(s, srw, srw, req) - return - } - - q, err := getFullQuery(req) - if err != nil { - err = fmt.Errorf("%s: cannot read query: %w", s, err) - respondWith(srw, err, http.StatusBadRequest) - return - } - if !canCacheQuery(q) { - // The query cannot be cached, so just proxy it. - rp.proxyRequest(s, srw, srw, req) - return - } - - // Do not store `replica` and `cluster_node` in labels, since they have - // no sense for cache metrics. - labels := prometheus.Labels{ - "cache": s.user.cache.Name(), - "user": s.labels["user"], - "cluster": s.labels["cluster"], - "cluster_user": s.labels["cluster_user"], - } - - var userParamsHash uint32 - if s.user.params != nil { - userParamsHash = s.user.params.key - } - - queryParamsHash := calcQueryParamsHash(origParams) - - key := cache.NewKey(skipLeadingComments(q), origParams, sortHeader(req.Header.Get("Accept-Encoding")), userParamsHash, queryParamsHash) +func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *http.Request, origParams url.Values, q []byte) { + labels := makeCacheLabels(s) + key := newCacheKey(s, origParams, q, req) startTime := time.Now() userCache := s.user.cache @@ -279,8 +263,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h // The response has been successfully served from cache. defer cachedData.Data.Close() cacheHit.With(labels).Inc() - since := time.Since(startTime).Seconds() - cachedResponseDuration.With(labels).Observe(since) + cachedResponseDuration.With(labels).Observe(time.Since(startTime).Seconds()) log.Debugf("%s: cache hit", s) _ = RespondWithData(srw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, http.StatusOK) return @@ -414,6 +397,28 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h } } +func makeCacheLabels(s *scope) prometheus.Labels { + // Do not store `replica` and `cluster_node` in labels, since they have + // no sense for cache metrics. + return prometheus.Labels{ + "cache": s.user.cache.Name(), + "user": s.labels["user"], + "cluster": s.labels["cluster"], + "cluster_user": s.labels["cluster_user"], + } +} + +func newCacheKey(s *scope, origParams url.Values, q []byte, req *http.Request) *cache.Key { + var userParamsHash uint32 + if s.user.params != nil { + userParamsHash = s.user.params.key + } + + queryParamsHash := calcQueryParamsHash(origParams) + + return cache.NewKey(skipLeadingComments(q), origParams, sortHeader(req.Header.Get("Accept-Encoding")), userParamsHash, queryParamsHash) +} + func toString(stream io.Reader) (string, error) { buf := new(bytes.Buffer) @@ -433,7 +438,8 @@ var clickhouseRecoverableStatusCodes = map[int]struct{}{http.StatusServiceUnavai func (rp *reverseProxy) completeTransaction(s *scope, statusCode int, userCache *cache.AsyncCache, key *cache.Key, q []byte, failReason string) { - if statusCode < 300 { + // complete successful transactions or those with empty fail reason + if statusCode < 300 || failReason == "" { if err := userCache.Complete(key); err != nil { log.Errorf("%s: %s; query: %q", s, err, q) }