Skip to content

Commit

Permalink
STAC-21470: Make sure new mirrors start with the right state
Browse files Browse the repository at this point in the history
  • Loading branch information
craffit committed Aug 20, 2024
1 parent 7d94f76 commit d7cf5f6
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
4 changes: 2 additions & 2 deletions internal/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type MirrorStatus struct {
URL string
}

func NewMirror(targetURL string, config *config.Config, failureCh chan<- string, persistent bool) *Mirror {
func NewMirror(targetURL string, config *config.Config, failureCh chan<- string, persistent bool, sendQueue *SendQueue) *Mirror {
retryAfter := time.Duration(config.RetryAfter) * time.Minute
persistentFailureTimeout := time.Duration(config.PersistentFailureTimeout) * time.Minute

Expand All @@ -51,7 +51,7 @@ func NewMirror(targetURL string, config *config.Config, failureCh chan<- string,
persistentFailureTimeout: persistentFailureTimeout,
targetURL: targetURL,
failureCh: failureCh,
sendQueue: MakeSendQueue(config.MaxQueuedRequests),
sendQueue: sendQueue,
}

settings := gobreaker.Settings{
Expand Down
23 changes: 22 additions & 1 deletion internal/mirror/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type Reflector struct {
DoneCh chan bool
MirrorFailureChan chan string
config *config.Config
// This sendQueue is kept to keep exact state of what epochs were sent by the handler. This is used when we make a
// new mirror so we the state of that new mirror is in sync.
templateSendQueue *SendQueue
}

func NewReflector(config *config.Config) *Reflector {
Expand All @@ -23,6 +26,7 @@ func NewReflector(config *config.Config) *Reflector {
DoneCh: make(chan bool),
MirrorFailureChan: make(chan string),
config: config,
templateSendQueue: MakeSendQueue(config.MaxQueuedRequests),
}
}

Expand All @@ -32,6 +36,7 @@ func (r *Reflector) Reflect() {
for {
select {
case req := <-r.IncomingCh:
r.updateTemplateQueue(req)
r.sendToMirrors(req)
case url := <-r.MirrorFailureChan:
log.Printf("Mirror '%s' has persistent failures", url)
Expand All @@ -42,6 +47,22 @@ func (r *Reflector) Reflect() {
}
}

func (r *Reflector) updateTemplateQueue(req *Request) {
// Update the
r.templateSendQueue.AddToQueue(req, "template")
// Execute all possible items
for {
requests := r.templateSendQueue.NextExecuteItems()
if len(requests) > 0 {
for _, req := range requests {
r.templateSendQueue.ExecutionCompleted(req)
}
} else {
break
}
}
}

func (r *Reflector) sendToMirrors(req *Request) {
r.RLock()
defer r.RUnlock()
Expand All @@ -57,7 +78,7 @@ func (r *Reflector) AddMirrors(urls []string, persistent bool) {

for _, url := range urls {
log.Printf("Adding '%s' to mirror list.", url)
r.mirrors[url] = NewMirror(url, r.config, r.MirrorFailureChan, persistent)
r.mirrors[url] = NewMirror(url, r.config, r.MirrorFailureChan, persistent, r.templateSendQueue)
}
}

Expand Down
29 changes: 25 additions & 4 deletions internal/mirror/sendqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ import (

type SendQueue struct {
sync.Mutex
completedEpochsUntil uint64 // Keep track of until which epoch we saw all data
epochsCompleted map[uint64]interface{} // Map of epochs that are completed, but the completedEpochsUntil could not be updated yet.
requestsQueued []*Request // Slice with queued requests, ordered by epoch from old to new
maxQueueSize int

completedEpochsUntil uint64 // Keep track of until which epoch we saw all data

// Map of epochs that are completed, but the completedEpochsUntil could not be updated yet.
// Should only contain epochs bigger than completedEpochsUntil. Elements directly
// following completedEpochsUntil will be removed and cleaned up form this map
// For this to work all epoch should be consecutive
epochsCompleted map[uint64]interface{}

requestsQueued []*Request // Slice with queued requests, ordered by epoch from old to new
maxQueueSize int
}

func MakeSendQueue(maxQueueSize int) *SendQueue {
Expand All @@ -22,6 +29,20 @@ func MakeSendQueue(maxQueueSize int) *SendQueue {
}
}

func (s *SendQueue) Clone() *SendQueue {
completedCopied := make(map[uint64]interface{}, 0)
for c, _ := range s.epochsCompleted {
completedCopied[c] = nil
}

return &SendQueue{
completedEpochsUntil: s.completedEpochsUntil,
epochsCompleted: completedCopied,
requestsQueued: append([]*Request(nil), s.requestsQueued...),
maxQueueSize: s.maxQueueSize,
}
}

func (s *SendQueue) AddToQueue(req *Request, targetURL string) {
s.Lock()
defer s.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func ReverseProxyHandler(reflector *mirror.Reflector, url *url.URL) func(res htt
// Server the request to main target
proxyTo.ServeHTTP(res, req)

// At this point the request has been served to the main target, so we remove this as active reuqest
// At this point the request has been served to the main target, so we remove this as active request
activeRequests.Remove(requestEpoch)

reflector.IncomingCh <- mirror.NewRequest(req, body, requestEpoch, activeSnapshot)
Expand Down

0 comments on commit d7cf5f6

Please sign in to comment.