-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathhandler_func_pool.go
124 lines (98 loc) · 3.2 KB
/
handler_func_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package simplefixgo
import (
"errors"
"sync"
)
// ErrHandleNotFound is returned when a required handler is not found.
// HandlerPool.Remove returns it when no entry exists for the requested
// message type.
var ErrHandleNotFound = errors.New("handler not found")
// HandlerPool is used for managing the pool of message handlers.
// Handlers are grouped by message type and kept in the order they were added.
type HandlerPool struct {
// mu guards handlers; add takes the write lock and handlersByMsgType takes
// the read lock.
mu sync.RWMutex
// handlers maps a message type to the handlers registered for it. Values are
// stored as interface{} and type-asserted by the typed pools' Range methods.
handlers map[string][]interface{}
// counter is allocated by NewHandlerPool.
// NOTE(review): no method shown in this file reads or updates it — confirm
// whether it is still needed.
counter *int64
}
// NewHandlerPool returns an empty HandlerPool with its handler map and
// counter already allocated.
func NewHandlerPool() *HandlerPool {
	pool := new(HandlerPool)
	pool.handlers = map[string][]interface{}{}
	pool.counter = new(int64)
	return pool
}
// free deletes the map entry for msgType when no handlers are left under it.
// It is a no-op while at least one handler is still registered.
// free does not lock p.mu itself; synchronization is up to the caller.
func (p *HandlerPool) free(msgType string) {
	if len(p.handlers[msgType]) == 0 {
		delete(p.handlers, msgType)
	}
}
// Remove is used to remove a handler with a specified identifier.
//
// It returns ErrHandleNotFound when nothing is registered for msgType and
// nil otherwise.
//
// NOTE(review): the identifier argument is currently ignored. The only
// mutation performed is dropping the msgType entry when its handler list is
// already empty, so individual handlers are not removed by this call —
// confirm the intended removal semantics.
func (p *HandlerPool) Remove(msgType string, _ int64) error {
	// The map is read and possibly mutated below, and add/handlersByMsgType
	// access it under the same mutex, so the write lock is required here to
	// avoid a data race.
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, ok := p.handlers[msgType]; !ok {
		return ErrHandleNotFound
	}

	p.free(msgType)

	return nil
}
// handlersByMsgType returns a copy of the handlers registered for msgType,
// taken under the read lock so callers can iterate without holding it.
// The result is nil when msgType has no entry in the pool.
func (p *HandlerPool) handlersByMsgType(msgType string) (result []interface{}) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	registered, found := p.handlers[msgType]
	if !found {
		return nil
	}

	// Copy so that later appends to the pool cannot alias the returned slice.
	snapshot := make([]interface{}, len(registered))
	copy(snapshot, registered)

	return snapshot
}
// add appends handle to the list of handlers for msgType while holding the
// write lock and returns an identifier for the registration.
//
// NOTE(review): the returned value is the number of distinct message types
// in the pool minus one, so two handlers added for the same message type get
// the same value — confirm whether a per-handler identifier was intended.
func (p *HandlerPool) add(msgType string, handle interface{}) int64 {
	p.mu.Lock()
	defer p.mu.Unlock()

	// A missing key yields a nil slice, which append grows just like an
	// explicitly created empty one.
	p.handlers[msgType] = append(p.handlers[msgType], handle)

	typeCount := len(p.handlers)

	return int64(typeCount - 1)
}
// IncomingHandlerPool is used to manage the pool of incoming messages stored in the form of byte arrays.
// It embeds *HandlerPool and exposes Add and Range methods typed for
// IncomingHandlerFunc.
type IncomingHandlerPool struct {
*HandlerPool
}
// NewIncomingHandlerPool creates a new IncomingHandlerPool backed by a fresh,
// empty HandlerPool.
func NewIncomingHandlerPool() IncomingHandlerPool {
	return IncomingHandlerPool{HandlerPool: NewHandlerPool()}
}
// Range calls f for each handler registered for msgType, in registration
// order, and stops as soon as f returns false.
// It iterates over a copy of the handler list. The type assertion panics if
// a stored handler is not an IncomingHandlerFunc.
func (p IncomingHandlerPool) Range(msgType string, f func(IncomingHandlerFunc) bool) {
	handlers := p.handlersByMsgType(msgType)
	for i := range handlers {
		keepGoing := f(handlers[i].(IncomingHandlerFunc))
		if !keepGoing {
			return
		}
	}
}
// Add registers handle as an incoming-message handler for msgType.
// It returns the identifier reported by the underlying HandlerPool.
func (p *IncomingHandlerPool) Add(msgType string, handle IncomingHandlerFunc) int64 {
	id := p.HandlerPool.add(msgType, handle)
	return id
}
// OutgoingHandlerPool is used to manage the pool of outgoing messages stored as structures.
// It embeds *HandlerPool and exposes a Range method typed for
// OutgoingHandlerFunc; Add is inherited from the embedded HandlerPool.
type OutgoingHandlerPool struct {
*HandlerPool
}
// NewOutgoingHandlerPool creates a new OutgoingHandlerPool backed by a fresh,
// empty HandlerPool.
func NewOutgoingHandlerPool() OutgoingHandlerPool {
	return OutgoingHandlerPool{HandlerPool: NewHandlerPool()}
}
// Range calls f for each handler registered for msgType, in registration
// order, and stops as soon as f returns false.
// It returns false if f returned false for some handler and true otherwise,
// including when no handlers are registered. The type assertion panics if a
// stored handler is not an OutgoingHandlerFunc.
func (p OutgoingHandlerPool) Range(msgType string, f func(OutgoingHandlerFunc) bool) (res bool) {
	handlers := p.handlersByMsgType(msgType)
	for i := range handlers {
		if accepted := f(handlers[i].(OutgoingHandlerFunc)); !accepted {
			return false
		}
	}

	return true
}
// Add registers handle as an outgoing-message handler for msgType.
// It returns the identifier reported by the internal add method.
//
// NOTE(review): the receiver is *HandlerPool rather than *OutgoingHandlerPool,
// so OutgoingHandlerPool obtains this method through embedding — confirm this
// is intentional.
func (p *HandlerPool) Add(msgType string, handle OutgoingHandlerFunc) int64 {
	id := p.add(msgType, handle)
	return id
}