Skip to content

Commit

Permalink
Update TTL of transactions upon finalisation to 500 ms (#235)
Browse files Browse the repository at this point in the history
* refactor(proxy): move dispatching logic to serve function

* feat(transaction-registry): update ttl to 1 second upon transaction completion

* fix: complete failures with empty fail reason

* chore: add test over cleanup of completed transaction
  • Loading branch information
gontarzpawel authored Sep 26, 2022
1 parent 3326055 commit 893ece8
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 66 deletions.
4 changes: 2 additions & 2 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
switch cfg.Mode {
case "file_system":
cache, err = newFilesSystemCache(cfg, graceTime)
transaction = newInMemoryTransactionRegistry(transactionDeadline)
transaction = newInMemoryTransactionRegistry(transactionDeadline, transactionEndedTTL)
case "redis":
var redisClient redis.UniversalClient
redisClient, err = clients.NewRedisClient(cfg.Redis)
cache = newRedisCache(redisClient, cfg)
transaction = newRedisTransactionRegistry(redisClient, transactionDeadline)
transaction = newRedisTransactionRegistry(redisClient, transactionDeadline, transactionEndedTTL)
default:
return nil, fmt.Errorf("unknown config mode")
}
Expand Down
12 changes: 6 additions & 6 deletions cache/async_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const asyncTestDir = "./async-test-data"

func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) {
graceTime := 100 * time.Millisecond
asyncCache := newAsyncTestCache(t, graceTime)
asyncCache := newAsyncTestCache(t, graceTime, graceTime/2)
defer func() {
asyncCache.Close()
os.RemoveAll(asyncTestDir)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) {

func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCompletion(t *testing.T) {
graceTime := 100 * time.Millisecond
asyncCache := newAsyncTestCache(t, graceTime)
asyncCache := newAsyncTestCache(t, graceTime, graceTime/2)

defer func() {
asyncCache.Close()
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCom

func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwaiting(t *testing.T) {
graceTime := 300 * time.Millisecond
asyncCache := newAsyncTestCache(t, graceTime)
asyncCache := newAsyncTestCache(t, graceTime, graceTime/2)

defer func() {
asyncCache.Close()
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwait

func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting(t *testing.T) {
graceTime := 300 * time.Millisecond
asyncCache := newAsyncTestCache(t, graceTime)
asyncCache := newAsyncTestCache(t, graceTime, graceTime/2)

defer func() {
asyncCache.Close()
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting
}
}

func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache {
func newAsyncTestCache(t *testing.T, graceTime, transactionEndedTime time.Duration) *AsyncCache {
t.Helper()
cfg := config.Cache{
Name: "foobar",
Expand All @@ -203,7 +203,7 @@ func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache {

asyncC := &AsyncCache{
Cache: c,
TransactionRegistry: newInMemoryTransactionRegistry(graceTime),
TransactionRegistry: newInMemoryTransactionRegistry(graceTime, transactionEndedTime),
graceTime: graceTime,
}
return asyncC
Expand Down
4 changes: 4 additions & 0 deletions cache/transaction_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"io"
"time"
)

// TransactionRegistry is a registry of ongoing queries identified by Key.
Expand All @@ -21,6 +22,9 @@ type TransactionRegistry interface {
Status(key *Key) (TransactionStatus, error)
}

// transactionEndedTTL amount of time transaction record is kept after being updated
const transactionEndedTTL = 500 * time.Millisecond

type TransactionStatus struct {
State TransactionState
FailReason string // filled in only if state of transaction is transactionFailed
Expand Down
21 changes: 12 additions & 9 deletions cache/transaction_registry_inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ type inMemoryTransactionRegistry struct {
pendingEntriesLock sync.Mutex
pendingEntries map[string]pendingEntry

deadline time.Duration
stopCh chan struct{}
wg sync.WaitGroup
deadline time.Duration
transactionEndedDeadline time.Duration
stopCh chan struct{}
wg sync.WaitGroup
}

func newInMemoryTransactionRegistry(deadline time.Duration) *inMemoryTransactionRegistry {
func newInMemoryTransactionRegistry(deadline, transactionEndedDeadline time.Duration) *inMemoryTransactionRegistry {
transaction := &inMemoryTransactionRegistry{
pendingEntriesLock: sync.Mutex{},
pendingEntries: make(map[string]pendingEntry),
deadline: deadline,
stopCh: make(chan struct{}),
pendingEntriesLock: sync.Mutex{},
pendingEntries: make(map[string]pendingEntry),
deadline: deadline,
transactionEndedDeadline: transactionEndedDeadline,
stopCh: make(chan struct{}),
}

transaction.wg.Add(1)
Expand Down Expand Up @@ -72,11 +74,12 @@ func (i *inMemoryTransactionRegistry) updateTransactionState(key *Key, state Tra
if entry, ok := i.pendingEntries[k]; ok {
entry.state = state
entry.failedReason = failReason
entry.deadline = time.Now().Add(i.transactionEndedDeadline)
i.pendingEntries[k] = entry
} else {
log.Errorf("[attempt to complete transaction] entry not found for key: %s, registering new entry with %v status", key.String(), state)
i.pendingEntries[k] = pendingEntry{
deadline: time.Now().Add(i.deadline),
deadline: time.Now().Add(i.transactionEndedDeadline),
state: state,
failedReason: failReason,
}
Expand Down
16 changes: 11 additions & 5 deletions cache/transaction_registry_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@ import (

type redisTransactionRegistry struct {
redisClient redis.UniversalClient
// deadline specifies TTL of record to be kept, no matter the associated transaction status

// deadline specifies TTL of the record to be kept
deadline time.Duration

// transactionEndedDeadline specifies TTL of the record to be kept that has been ended (either completed or failed)
transactionEndedDeadline time.Duration
}

func newRedisTransactionRegistry(redisClient redis.UniversalClient, deadline time.Duration) *redisTransactionRegistry {
func newRedisTransactionRegistry(redisClient redis.UniversalClient, deadline time.Duration,
endedDeadline time.Duration) *redisTransactionRegistry {
return &redisTransactionRegistry{
redisClient: redisClient,
deadline: deadline,
redisClient: redisClient,
deadline: deadline,
transactionEndedDeadline: endedDeadline,
}
}

Expand All @@ -41,7 +47,7 @@ func (r *redisTransactionRegistry) Fail(key *Key, reason string) error {
}

func (r *redisTransactionRegistry) updateTransactionState(key *Key, value []byte) error {
return r.redisClient.Set(context.Background(), toTransactionKey(key), value, r.deadline).Err()
return r.redisClient.Set(context.Background(), toTransactionKey(key), value, r.transactionEndedDeadline).Err()
}

func (r *redisTransactionRegistry) Status(key *Key) (TransactionStatus, error) {
Expand Down
94 changes: 92 additions & 2 deletions cache/transaction_registry_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRedisTransaction(t *testing.T) {
Query: []byte("SELECT pending entries"),
}

redisTransaction := newRedisTransactionRegistry(redisClient, graceTime)
redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, graceTime)

if err := redisTransaction.Create(key); err != nil {
t.Fatalf("unexpected error: %s while registering new transaction", err)
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestFailRedisTransaction(t *testing.T) {
Query: []byte("SELECT pending entries"),
}

redisTransaction := newRedisTransactionRegistry(redisClient, graceTime)
redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, graceTime)

if err := redisTransaction.Create(key); err != nil {
t.Fatalf("unexpected error: %s while registering new transaction", err)
Expand All @@ -79,3 +79,93 @@ func TestFailRedisTransaction(t *testing.T) {
t.Fatalf("unexpected: transaction should curry fail reason")
}
}

func TestCleanupFailedRedisTransaction(t *testing.T) {
s := miniredis.RunT(t)

redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{s.Addr()},
})

graceTime := 10 * time.Second
updatedTTL := 10 * time.Millisecond
key := &Key{
Query: []byte("SELECT pending entries"),
}

redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, updatedTTL)

if err := redisTransaction.Create(key); err != nil {
t.Fatalf("unexpected error: %s while registering new transaction", err)
}

status, err := redisTransaction.Status(key)
if err != nil || !status.State.IsPending() {
t.Fatalf("unexpected: transaction should be pending")
}

failReason := "failed for fun dudes"

if err := redisTransaction.Fail(key, failReason); err != nil {
t.Fatalf("unexpected error: %s while unregistering transaction", err)
}

status, err = redisTransaction.Status(key)
if err != nil || !status.State.IsFailed() {
t.Fatalf("unexpected: transaction should be failed")
}

if status.FailReason != failReason {
t.Fatalf("unexpected: transaction should curry fail reason")
}

// move ttls of mini redis to trigger the clean up transaction
s.FastForward(updatedTTL)

status, err = redisTransaction.Status(key)
if err != nil || !status.State.IsAbsent() {
t.Fatalf("unexpected: transaction should be cleaned up")
}
}

func TestCleanupCompletedRedisTransaction(t *testing.T) {
s := miniredis.RunT(t)

redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{s.Addr()},
})

graceTime := 10 * time.Second
updatedTTL := 10 * time.Millisecond
key := &Key{
Query: []byte("SELECT pending entries"),
}

redisTransaction := newRedisTransactionRegistry(redisClient, graceTime, updatedTTL)

if err := redisTransaction.Create(key); err != nil {
t.Fatalf("unexpected error: %s while registering new transaction", err)
}

status, err := redisTransaction.Status(key)
if err != nil || !status.State.IsPending() {
t.Fatalf("unexpected: transaction should be pending")
}

if err := redisTransaction.Complete(key); err != nil {
t.Fatalf("unexpected error: %s while unregistering transaction", err)
}

status, err = redisTransaction.Status(key)
if err != nil || !status.State.IsCompleted() {
t.Fatalf("unexpected: transaction should be failed")
}

// move ttls of mini redis to trigger the clean up transaction
s.FastForward(updatedTTL)

status, err = redisTransaction.Status(key)
if err != nil || !status.State.IsAbsent() {
t.Fatalf("unexpected: transaction should be cleaned up")
}
}
2 changes: 1 addition & 1 deletion cache/transaction_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func TestInMemoryTransaction(t *testing.T) {
key := &Key{
Query: []byte("SELECT pending entries"),
}
inMemoryTransaction := newInMemoryTransactionRegistry(graceTime)
inMemoryTransaction := newInMemoryTransactionRegistry(graceTime, graceTime)

if err := inMemoryTransaction.Create(key); err != nil {
t.Fatalf("unexpected error: %s while registering new transaction", err)
Expand Down
Loading

0 comments on commit 893ece8

Please sign in to comment.