-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsinkworker.go
126 lines (99 loc) · 3.38 KB
/
sinkworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package conveyor
import (
"fmt"
"log"
"golang.org/x/sync/semaphore"
)
// SinkWorkerPool is the worker pool used for sink nodes. It embeds
// *ConcreteNodeWorker and adds the single input channel a sink reads from;
// GetOutputChannel and SetOutputChannel on this type always report
// ErrOutputChanDoesNotExist.
type SinkWorkerPool struct {
// Embedded worker; its fields and methods are promoted onto SinkWorkerPool.
*ConcreteNodeWorker
// inputChannel carries the maps handed to the sink. It is nil until
// CreateChannels or SetInputChannel assigns it.
inputChannel chan map[string]interface{}
}
// NewSinkWorkerPool builds a SinkWorkerPool around a ConcreteNodeWorker created
// from the given executor and mode, and returns it as a NodeWorker.
// The input channel is left unset; assign it with CreateChannels or
// SetInputChannel before starting the pool.
func NewSinkWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker {
	return &SinkWorkerPool{
		ConcreteNodeWorker: newConcreteNodeWorker(executor, mode),
	}
}
// CreateChannels allocates the sink's input channel with capacity buffer,
// replacing whatever channel was previously assigned.
func (swp *SinkWorkerPool) CreateChannels(buffer int) {
	in := make(chan map[string]interface{}, buffer)
	swp.inputChannel = in
}
// Start runs the pool using the routine that matches swp.Mode:
// transaction mode, loop mode, or ErrInvalidWorkerMode for anything else.
// The error of the selected routine is returned unchanged.
func (swp *SinkWorkerPool) Start(ctx CnvContext) error {
	switch swp.Mode {
	case WorkerModeTransaction:
		return swp.startTransactionMode(ctx)
	case WorkerModeLoop:
		return swp.startLoopMode(ctx)
	default:
		return ErrInvalidWorkerMode
	}
}
// startLoopMode delegates to the embedded ConcreteNodeWorker's loop-mode
// routine, passing the sink's input channel and nil as the final argument
// (a sink has no output channel to supply).
func (swp *SinkWorkerPool) startLoopMode(ctx CnvContext) error {
	worker := swp.ConcreteNodeWorker
	in := swp.inputChannel
	return worker.startLoopMode(ctx, in, nil)
}
// startTransactionMode runs the SinkWorkerPool in transaction mode: every map
// received on the input channel is passed to Executor.Execute in its own
// goroutine, with at most WorkerCount executions in flight at a time (bounded
// by swp.sem, which is re-created on every call).
//
// The receive loop ends when ctx is cancelled, when the input channel is
// closed, or when a semaphore slot cannot be acquired. The function always
// returns nil and does not itself wait for goroutines that are still running.
func (swp *SinkWorkerPool) startTransactionMode(ctx CnvContext) error {
	swp.sem = semaphore.NewWeighted(int64(swp.WorkerCount))

workerLoop:
	for {
		// Give cancellation priority: if ctx is already done, stop before
		// taking any further input, even when data is waiting.
		select {
		case <-ctx.Done():
			break workerLoop
		default:
		}

		var (
			in map[string]interface{}
			ok bool
		)
		// Wait for input OR cancellation. A bare receive here would keep the
		// worker parked on an idle (or nil) channel after ctx is cancelled.
		select {
		case <-ctx.Done():
			break workerLoop
		case in, ok = <-swp.inputChannel:
		}
		if !ok {
			ctx.SendLog(0, fmt.Sprintf("Executor:[%s] sink's input channel closed", swp.Executor.GetUniqueIdentifier()), nil)
			break workerLoop
		}

		// Blocks until a worker slot is free; fails if ctx ends while waiting.
		if err := swp.sem.Acquire(ctx, 1); err != nil {
			ctx.SendLog(0, fmt.Sprintf("Worker:[%s] for Executor:[%s] Failed to acquire semaphore", swp.Name, swp.Executor.GetUniqueIdentifier()), err)
			break workerLoop
		}

		go func(data map[string]interface{}) {
			// Deferred calls run in reverse order: the slot is released first,
			// then swp.recovery runs.
			defer swp.recovery(ctx, "SinkWorkerPool")
			defer swp.sem.Release(1)

			_, err := swp.Executor.Execute(ctx, data)
			if err == ErrExecuteNotImplemented {
				// A sink without Execute() is a setup error; log it and
				// terminate the process.
				ctx.SendLog(0, fmt.Sprintf("Executor:[%s]", swp.Executor.GetUniqueIdentifier()), err)
				log.Fatalf("Improper setup of Executor[%s], Execute() method is required", swp.Executor.GetUniqueIdentifier())
			}
			if err != nil {
				ctx.SendLog(2, fmt.Sprintf("Worker:[%s] for Executor:[%s] Execute() Call Failed.",
					swp.Name, swp.Executor.GetUniqueIdentifier()), err)
			}
		}(in)
	}

	return nil
}
// GetOutputChannel always returns a nil channel together with
// ErrOutputChanDoesNotExist; SinkWorkerPool has no output channel.
func (swp *SinkWorkerPool) GetOutputChannel() (chan map[string]interface{}, error) {
	var none chan map[string]interface{}
	return none, ErrOutputChanDoesNotExist
}
// GetInputChannel returns the channel the sink currently reads from. The
// channel may be nil if none has been created or set yet; the error is
// always nil.
func (swp *SinkWorkerPool) GetInputChannel() (chan map[string]interface{}, error) {
	in := swp.inputChannel
	return in, nil
}
// SetInputChannel replaces the sink's input channel with inChan.
// No validation is performed (a nil channel is stored as-is) and the
// returned error is always nil.
func (swp *SinkWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error {
swp.inputChannel = inChan
return nil
}
// SetOutputChannel ignores outChan and always returns
// ErrOutputChanDoesNotExist; SinkWorkerPool stores no output channel.
func (swp *SinkWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error {
return ErrOutputChanDoesNotExist
}
// WorkerType identifies this worker as a sink by returning WorkerTypeSink.
func (swp *SinkWorkerPool) WorkerType() string {
return WorkerTypeSink
}
// WaitAndStop delegates to the embedded ConcreteNodeWorker's WaitAndStop and
// then reports success unconditionally.
// NOTE(review): the error returned by the embedded worker is deliberately
// discarded here; confirm that callers never need to see it.
func (swp *SinkWorkerPool) WaitAndStop(ctx CnvContext) error {
	worker := swp.ConcreteNodeWorker
	_ = worker.WaitAndStop(ctx)
	return nil
}