forked from ravendb/ravendb-go-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
database_connection_state.go
157 lines (136 loc) · 3.77 KB
/
database_connection_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package ravendb
import (
"fmt"
"sync"
)
var (
_ IChangesConnectionState = &DatabaseConnectionState{}
)
type DatabaseConnectionState struct {
onError []func(error)
_onDisconnect Runnable
onConnect Runnable
_value atomicInteger
lastException error
onDocumentChangeNotification []func(interface{})
onIndexChangeNotification []func(interface{})
onOperationStatusChangeNotification []func(interface{})
// protects arrays
mu sync.Mutex
}
func (s *DatabaseConnectionState) addOnError(handler func(error)) int {
s.mu.Lock()
defer s.mu.Unlock()
s.onError = append(s.onError, handler)
return len(s.onError) - 1
}
func (s *DatabaseConnectionState) removeOnError(idx int) {
s.mu.Lock()
defer s.mu.Unlock()
s.onError[idx] = nil
}
func (s *DatabaseConnectionState) inc() {
s._value.incrementAndGet()
}
func (s *DatabaseConnectionState) dec() {
if s._value.decrementAndGet() == 0 {
if s._onDisconnect != nil {
s._onDisconnect()
}
}
}
func (s *DatabaseConnectionState) error(e error) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastException = e
for _, f := range s.onError {
if f != nil {
f(e)
}
}
}
func (s *DatabaseConnectionState) Close() {
// Note: not clearing as in Java because removeOnChangeNotification()
// can be called after Close()
}
func NewDatabaseConnectionState(onConnect Runnable, onDisconnect Runnable) *DatabaseConnectionState {
return &DatabaseConnectionState{
onConnect: onConnect,
_onDisconnect: onDisconnect,
}
}
func (s *DatabaseConnectionState) addOnChangeNotification(typ ChangesType, handler func(interface{})) int {
s.mu.Lock()
defer s.mu.Unlock()
var idx int
switch typ {
case ChangesType_DOCUMENT:
idx = len(s.onDocumentChangeNotification)
s.onDocumentChangeNotification = append(s.onDocumentChangeNotification, handler)
case ChangesType_INDEX:
idx = len(s.onIndexChangeNotification)
s.onIndexChangeNotification = append(s.onIndexChangeNotification, handler)
case ChangesType_OPERATION:
idx = len(s.onOperationStatusChangeNotification)
s.onOperationStatusChangeNotification = append(s.onOperationStatusChangeNotification, handler)
default:
//throw new IllegalStateException("ChangeType: " + type + " is not supported");
panicIf(true, "ChangeType: %s is not supported", typ)
}
return idx
}
func (s *DatabaseConnectionState) removeOnChangeNotification(typ ChangesType, idx int) {
s.mu.Lock()
defer s.mu.Unlock()
switch typ {
case ChangesType_DOCUMENT:
s.onDocumentChangeNotification[idx] = nil
case ChangesType_INDEX:
s.onIndexChangeNotification[idx] = nil
case ChangesType_OPERATION:
s.onOperationStatusChangeNotification[idx] = nil
default:
//throw new IllegalStateException("ChangeType: " + type + " is not supported");
panicIf(true, "ChangeType: %s is not supported", typ)
}
}
func (s *DatabaseConnectionState) send(v interface{}) error {
switch rv := v.(type) {
case *DocumentChange:
s.sendDocumentChange(rv)
case *IndexChange:
s.sendIndexChange(rv)
case *OperationStatusChange:
s.sendOperationStatusChange(rv)
default:
return fmt.Errorf("DatabaseConnectionState.send(): unsupporrted type %T", v)
}
return nil
}
func (s *DatabaseConnectionState) sendDocumentChange(documentChange *DocumentChange) {
s.mu.Lock()
defer s.mu.Unlock()
for _, f := range s.onDocumentChangeNotification {
if f != nil {
f(documentChange)
}
}
}
func (s *DatabaseConnectionState) sendIndexChange(indexChange *IndexChange) {
s.mu.Lock()
defer s.mu.Unlock()
for _, f := range s.onIndexChangeNotification {
if f != nil {
f(indexChange)
}
}
}
func (s *DatabaseConnectionState) sendOperationStatusChange(operationStatusChange *OperationStatusChange) {
s.mu.Lock()
defer s.mu.Unlock()
for _, f := range s.onOperationStatusChangeNotification {
if f != nil {
f(operationStatusChange)
}
}
}