-
Notifications
You must be signed in to change notification settings - Fork 0
/
redrive.go
70 lines (54 loc) · 1.48 KB
/
redrive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package sqsdr
import (
"context"
"log"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
)
// Redrive holds the configuration for moving messages from a source SQS queue
// to a destination SQS queue.
type Redrive struct {
	// SourceClient and SourceQueueURL identify the queue messages are read from.
	SourceClient   *sqs.SQS
	SourceQueueURL string

	// DestClient and DestQueueURL identify the queue messages are sent to.
	DestClient   *sqs.SQS
	DestQueueURL string

	// JMESPath and Regex are passed to NewFilterChooser for a filtered redrive.
	// A non-empty Regex is what makes the Redrive method take the filtered path.
	JMESPath string
	Regex    string

	// concurrency is reserved for future use; it is not implemented yet.
	concurrency int
}
// Redrive runs the redrive and returns whatever error the chosen strategy
// reports. With an empty Regex it performs a simple redrive; otherwise it
// performs a filtered redrive.
func (r *Redrive) Redrive() error {
	if r.Regex == "" {
		return r.simpleRedrive()
	}
	return r.filteredRedrive()
}
// simpleRedrive polls the source queue and feeds messages through a Pipeline
// built with a PassthroughChooser, the destination queue as the left sink and
// a NoOpSink as the right sink. It returns the error from the poller's Process
// call, which is run with a background context.
func (r *Redrive) simpleRedrive() error {
	log.Println("starting simple redrive")

	dest := &SQSSink{Client: r.DestClient, QueueURL: r.DestQueueURL}
	p := &Pipeline{
		Chooser:   &PassthroughChooser{},
		LeftSink:  dest,
		RightSink: NoOpSink{},
	}

	return NewPoller(r.SourceQueueURL, r.SourceClient, p).Process(context.Background())
}
// filteredRedrive builds a filter chooser from the configured JMESPath and
// Regex and runs a FallthroughPipeline over the source queue. The destination
// queue is the left sink; the right sink is created on demand through a
// factory that wraps the given queue URL and client in an SQSSink.
// It returns the chooser construction error, if any, or the result of Run.
func (r *Redrive) filteredRedrive() error {
	filter, err := NewFilterChooser(r.JMESPath, r.Regex)
	if err != nil {
		return err
	}

	// Factory handed to the pipeline so it can build the right-hand sink itself.
	newSink := func(url string, api sqsiface.SQSAPI) Sinker {
		return &SQSSink{QueueURL: url, Client: api}
	}

	pipeline := FallthroughPipeline{
		Chooser:        filter,
		LeftSink:       &SQSSink{QueueURL: r.DestQueueURL, Client: r.DestClient},
		RightSinkFunc:  newSink,
		SourceClient:   r.SourceClient,
		SourceQueueURL: r.SourceQueueURL,
	}
	return pipeline.Run()
}