From d7cf5f6f1ab529ccc3656f4b317d00aa6133a66d Mon Sep 17 00:00:00 2001 From: Bram Schuur Date: Tue, 20 Aug 2024 10:56:55 +0200 Subject: [PATCH] STAC-21470: Make sure new mirrors start with the right state --- internal/mirror/mirror.go | 4 ++-- internal/mirror/reflector.go | 23 ++++++++++++++++++++++- internal/mirror/sendqueue.go | 29 +++++++++++++++++++++++++---- internal/proxy/handler.go | 2 +- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/internal/mirror/mirror.go b/internal/mirror/mirror.go index 3d3c27f..2113d3a 100644 --- a/internal/mirror/mirror.go +++ b/internal/mirror/mirror.go @@ -40,7 +40,7 @@ type MirrorStatus struct { URL string } -func NewMirror(targetURL string, config *config.Config, failureCh chan<- string, persistent bool) *Mirror { +func NewMirror(targetURL string, config *config.Config, failureCh chan<- string, persistent bool, sendQueue *SendQueue) *Mirror { retryAfter := time.Duration(config.RetryAfter) * time.Minute persistentFailureTimeout := time.Duration(config.PersistentFailureTimeout) * time.Minute @@ -51,7 +51,7 @@ func NewMirror(targetURL string, config *config.Config, failureCh chan<- string, persistentFailureTimeout: persistentFailureTimeout, targetURL: targetURL, failureCh: failureCh, - sendQueue: MakeSendQueue(config.MaxQueuedRequests), + sendQueue: sendQueue, } settings := gobreaker.Settings{ diff --git a/internal/mirror/reflector.go b/internal/mirror/reflector.go index 411a203..68e854a 100644 --- a/internal/mirror/reflector.go +++ b/internal/mirror/reflector.go @@ -14,6 +14,9 @@ type Reflector struct { DoneCh chan bool MirrorFailureChan chan string config *config.Config + // This sendQueue is kept to keep exact state of what epochs were sent by the handler. This is used when we make a + // new mirror so we the state of that new mirror is in sync. + templateSendQueue *SendQueue } func NewReflector(config *config.Config) *Reflector { @@ -23,6 +26,7 @@ func NewReflector(config *config.Config) *Reflector { DoneCh: make(chan bool), MirrorFailureChan: make(chan string), config: config, + templateSendQueue: MakeSendQueue(config.MaxQueuedRequests), } } @@ -32,6 +36,7 @@ func (r *Reflector) Reflect() { for { select { case req := <-r.IncomingCh: + r.updateTemplateQueue(req) r.sendToMirrors(req) case url := <-r.MirrorFailureChan: log.Printf("Mirror '%s' has persistent failures", url) @@ -42,6 +47,22 @@ func (r *Reflector) Reflect() { } } +func (r *Reflector) updateTemplateQueue(req *Request) { + // Update the + r.templateSendQueue.AddToQueue(req, "template") + // Execute all possible items + for { + requests := r.templateSendQueue.NextExecuteItems() + if len(requests) > 0 { + for _, req := range requests { + r.templateSendQueue.ExecutionCompleted(req) + } + } else { + break + } + } +} + func (r *Reflector) sendToMirrors(req *Request) { r.RLock() defer r.RUnlock() @@ -57,7 +78,7 @@ func (r *Reflector) AddMirrors(urls []string, persistent bool) { for _, url := range urls { log.Printf("Adding '%s' to mirror list.", url) - r.mirrors[url] = NewMirror(url, r.config, r.MirrorFailureChan, persistent) + r.mirrors[url] = NewMirror(url, r.config, r.MirrorFailureChan, persistent, r.templateSendQueue) } } diff --git a/internal/mirror/sendqueue.go b/internal/mirror/sendqueue.go index 7ed2e16..2935747 100644 --- a/internal/mirror/sendqueue.go +++ b/internal/mirror/sendqueue.go @@ -8,10 +8,17 @@ import ( type SendQueue struct { sync.Mutex - completedEpochsUntil uint64 // Keep track of until which epoch we saw all data - epochsCompleted map[uint64]interface{} // Map of epochs that are completed, but the completedEpochsUntil could not be updated yet. - requestsQueued []*Request // Slice with queued requests, ordered by epoch from old to new - maxQueueSize int + + completedEpochsUntil uint64 // Keep track of until which epoch we saw all data + + // Map of epochs that are completed, but the completedEpochsUntil could not be updated yet. + // Should only contain epochs bigger than completedEpochsUntil. Elements directly + // following completedEpochsUntil will be removed and cleaned up form this map + // For this to work all epoch should be consecutive + epochsCompleted map[uint64]interface{} + + requestsQueued []*Request // Slice with queued requests, ordered by epoch from old to new + maxQueueSize int } func MakeSendQueue(maxQueueSize int) *SendQueue { @@ -22,6 +29,20 @@ func MakeSendQueue(maxQueueSize int) *SendQueue { } } +func (s *SendQueue) Clone() *SendQueue { + completedCopied := make(map[uint64]interface{}, 0) + for c, _ := range s.epochsCompleted { + completedCopied[c] = nil + } + + return &SendQueue{ + completedEpochsUntil: s.completedEpochsUntil, + epochsCompleted: completedCopied, + requestsQueued: append([]*Request(nil), s.requestsQueued...), + maxQueueSize: s.maxQueueSize, + } +} + func (s *SendQueue) AddToQueue(req *Request, targetURL string) { s.Lock() defer s.Unlock() diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index 9b2ff22..8abfa6d 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -36,7 +36,7 @@ func ReverseProxyHandler(reflector *mirror.Reflector, url *url.URL) func(res htt // Server the request to main target proxyTo.ServeHTTP(res, req) - // At this point the request has been served to the main target, so we remove this as active reuqest + // At this point the request has been served to the main target, so we remove this as active request activeRequests.Remove(requestEpoch) reflector.IncomingCh <- mirror.NewRequest(req, body, requestEpoch, activeSnapshot)