diff --git a/cache/async_cache.go b/cache/async_cache.go index b5b04989..5be0f3ac 100644 --- a/cache/async_cache.go +++ b/cache/async_cache.go @@ -91,12 +91,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach switch cfg.Mode { case "file_system": cache, err = newFilesSystemCache(cfg, graceTime) - transaction = newInMemoryTransactionRegistry(transactionDeadline) + transaction = newInMemoryTransactionRegistry(transactionDeadline, transactionEndedTTL) case "redis": var redisClient redis.UniversalClient redisClient, err = clients.NewRedisClient(cfg.Redis) cache = newRedisCache(redisClient, cfg) - transaction = newRedisTransactionRegistry(redisClient, transactionDeadline) + transaction = newRedisTransactionRegistry(redisClient, transactionDeadline, transactionEndedTTL) default: return nil, fmt.Errorf("unknown config mode") } diff --git a/cache/async_cache_test.go b/cache/async_cache_test.go index 17fdbb41..a361f3b4 100644 --- a/cache/async_cache_test.go +++ b/cache/async_cache_test.go @@ -15,7 +15,7 @@ const asyncTestDir = "./async-test-data" func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) { graceTime := 100 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() os.RemoveAll(asyncTestDir) @@ -51,7 +51,7 @@ func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) { func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCompletion(t *testing.T) { graceTime := 100 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() @@ -94,7 +94,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCom func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwaiting(t *testing.T) { graceTime := 300 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() @@ -138,7 +138,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwait func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting(t *testing.T) { graceTime := 300 * time.Millisecond - asyncCache := newAsyncTestCache(t, graceTime) + asyncCache := newAsyncTestCache(t, graceTime, graceTime/2) defer func() { asyncCache.Close() @@ -186,7 +186,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting } } -func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache { +func newAsyncTestCache(t *testing.T, graceTime, transactionEndedTime time.Duration) *AsyncCache { t.Helper() cfg := config.Cache{ Name: "foobar", @@ -203,7 +203,7 @@ func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache { asyncC := &AsyncCache{ Cache: c, - TransactionRegistry: newInMemoryTransactionRegistry(graceTime), + TransactionRegistry: newInMemoryTransactionRegistry(graceTime, transactionEndedTime), graceTime: graceTime, } return asyncC diff --git a/cache/transaction_registry.go b/cache/transaction_registry.go index 052d7951..2b34148d 100644 --- a/cache/transaction_registry.go +++ b/cache/transaction_registry.go @@ -2,6 +2,7 @@ package cache import ( "io" + "time" ) // TransactionRegistry is a registry of ongoing queries identified by Key. @@ -21,6 +22,9 @@ type TransactionRegistry interface { Status(key *Key) (TransactionStatus, error) } +// transactionEndedTTL amount of time transaction record is kept after being updated +const transactionEndedTTL = 500 * time.Millisecond + type TransactionStatus struct { State TransactionState FailReason string // filled in only if state of transaction is transactionFailed diff --git a/cache/transaction_registry_inmem.go b/cache/transaction_registry_inmem.go index 0244cc1a..8971cd15 100644 --- a/cache/transaction_registry_inmem.go +++ b/cache/transaction_registry_inmem.go @@ -17,17 +17,19 @@ type inMemoryTransactionRegistry struct { pendingEntriesLock sync.Mutex pendingEntries map[string]pendingEntry - deadline time.Duration - stopCh chan struct{} - wg sync.WaitGroup + deadline time.Duration + transactionEndedDeadline time.Duration + stopCh chan struct{} + wg sync.WaitGroup } -func newInMemoryTransactionRegistry(deadline time.Duration) *inMemoryTransactionRegistry { +func newInMemoryTransactionRegistry(deadline, transactionEndedDeadline time.Duration) *inMemoryTransactionRegistry { transaction := &inMemoryTransactionRegistry{ - pendingEntriesLock: sync.Mutex{}, - pendingEntries: make(map[string]pendingEntry), - deadline: deadline, - stopCh: make(chan struct{}), + pendingEntriesLock: sync.Mutex{}, + pendingEntries: make(map[string]pendingEntry), + deadline: deadline, + transactionEndedDeadline: transactionEndedDeadline, + stopCh: make(chan struct{}), } transaction.wg.Add(1) @@ -72,11 +74,12 @@ func (i *inMemoryTransactionRegistry) updateTransactionState(key *Key, state Tra if entry, ok := i.pendingEntries[k]; ok { entry.state = state entry.failedReason = failReason + entry.deadline = time.Now().Add(i.transactionEndedDeadline) i.pendingEntries[k] = entry } else { log.Errorf("[attempt to complete transaction] entry not found for key: %s, registering new entry with %v status", key.String(), state) i.pendingEntries[k] = pendingEntry{ - deadline: time.Now().Add(i.deadline), + deadline: time.Now().Add(i.transactionEndedDeadline), state: state, failedReason: failReason, } diff --git a/cache/transaction_registry_redis.go b/cache/transaction_registry_redis.go index d98cdcf6..70c08cc1 100644 --- a/cache/transaction_registry_redis.go +++ b/cache/transaction_registry_redis.go @@ -13,14 +13,20 @@ import ( type redisTransactionRegistry struct { redisClient redis.UniversalClient - // deadline specifies TTL of record to be kept, no matter the associated transaction status + + // deadline specifies TTL of the record to be kept deadline time.Duration + + // transactionEndedDeadline specifies TTL of the record to be kept that has been ended (either completed or failed) + transactionEndedDeadline time.Duration } -func newRedisTransactionRegistry(redisClient redis.UniversalClient, deadline time.Duration) *redisTransactionRegistry { +func newRedisTransactionRegistry(redisClient redis.UniversalClient, deadline time.Duration, + endedDeadline time.Duration) *redisTransactionRegistry { return &redisTransactionRegistry{ - redisClient: redisClient, - deadline: deadline, + redisClient: redisClient, + deadline: deadline, + transactionEndedDeadline: endedDeadline, } } @@ -41,7 +47,7 @@ func (r *redisTransactionRegistry) Fail(key *Key, reason string) error { } func (r *redisTransactionRegistry) updateTransactionState(key *Key, value []byte) error { - return r.redisClient.Set(context.Background(), toTransactionKey(key), value, r.deadline).Err() + return r.redisClient.Set(context.Background(), toTransactionKey(key), value, r.transactionEndedDeadline).Err() } func (r *redisTransactionRegistry) Status(key *Key) (TransactionStatus, error) { diff --git a/cache/transaction_registry_redis_test.go b/cache/transaction_registry_redis_test.go index 446e4278..9e03c173 100644 --- a/cache/transaction_registry_redis_test.go +++ b/cache/transaction_registry_redis_test.go @@ -20,7 +20,7 @@ func TestRedisTransaction(t *testing.T) { Query: []byte("SELECT pending entries"), } - redisTransaction := newRedisTransactionRegistry(redisClient, graceTime) + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, graceTime) if err := redisTransaction.Create(key); err != nil { t.Fatalf("unexpected error: %s while registering new transaction", err) @@ -53,7 +53,7 @@ func TestFailRedisTransaction(t *testing.T) { Query: []byte("SELECT pending entries"), } - redisTransaction := newRedisTransactionRegistry(redisClient, graceTime) + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, graceTime) if err := redisTransaction.Create(key); err != nil { t.Fatalf("unexpected error: %s while registering new transaction", err) @@ -79,3 +79,51 @@ func TestFailRedisTransaction(t *testing.T) { t.Fatalf("unexpected: transaction should curry fail reason") } } + +func TestCleanupRedisTransaction(t *testing.T) { + s := miniredis.RunT(t) + + redisClient := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: []string{s.Addr()}, + }) + + graceTime := 10 * time.Second + updatedTTL := 10 * time.Millisecond + key := &Key{ + Query: []byte("SELECT pending entries"), + } + + redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, updatedTTL) + + if err := redisTransaction.Create(key); err != nil { + t.Fatalf("unexpected error: %s while registering new transaction", err) + } + + status, err := redisTransaction.Status(key) + if err != nil || !status.State.IsPending() { + t.Fatalf("unexpected: transaction should be pending") + } + + failReason := "failed for fun dudes" + + if err := redisTransaction.Fail(key, failReason); err != nil { + t.Fatalf("unexpected error: %s while unregistering transaction", err) + } + + status, err = redisTransaction.Status(key) + if err != nil || !status.State.IsFailed() { + t.Fatalf("unexpected: transaction should be failed") + } + + if status.FailReason != failReason { + t.Fatalf("unexpected: transaction should curry fail reason") + } + + // move ttls of mini redis to trigger the clean up transaction + s.FastForward(updatedTTL) + + status, err = redisTransaction.Status(key) + if err != nil || !status.State.IsAbsent() { + t.Fatalf("unexpected: transaction should be cleaned up") + } +} diff --git a/cache/transaction_registry_test.go b/cache/transaction_registry_test.go index 1d1406c4..8de6f8b5 100644 --- a/cache/transaction_registry_test.go +++ b/cache/transaction_registry_test.go @@ -10,7 +10,7 @@ func TestInMemoryTransaction(t *testing.T) { key := &Key{ Query: []byte("SELECT pending entries"), } - inMemoryTransaction := newInMemoryTransactionRegistry(graceTime) + inMemoryTransaction := newInMemoryTransactionRegistry(graceTime, graceTime) if err := inMemoryTransaction.Create(key); err != nil { t.Fatalf("unexpected error: %s while registering new transaction", err)