@@ -25,14 +25,17 @@ const ignoreStreamId = 0
25
25
const (
26
26
connDisconnected = 0
27
27
connConnected = 1
28
- connClosed = 2
28
+ connShutdown = 2
29
+ connClosed = 3
29
30
)
30
31
31
32
const (
32
33
connTransportNone = ""
33
34
connTransportSsl = "ssl"
34
35
)
35
36
37
+ const shutdownEventKey = "box.shutdown"
38
+
36
39
type ConnEventKind int
37
40
type ConnLogKind int
38
41
@@ -45,6 +48,8 @@ const (
45
48
ReconnectFailed
46
49
// Either reconnect attempts exhausted, or explicit Close is called.
47
50
Closed
51
+ // Shutdown signals that shutdown callback is processing.
52
+ Shutdown
48
53
49
54
// LogReconnectFailed is logged when reconnect attempt failed.
50
55
LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134
139
// always returns array of array (array of tuples for space related methods).
135
140
// For Eval* and Call* Tarantool always returns array, but does not forces
136
141
// array of arrays.
142
+ //
143
+ // If connected to Tarantool 2.10 or newer, connection supports server graceful
144
+ // shutdown. In this case, server will wait until all client requests will be
145
+ // finished and client disconnects before going down (server also may go down
146
+ // by timeout). Client reconnect will happen if connection options enable
147
+ // reconnect. Beware that graceful shutdown event initialization is asynchronous.
148
+ //
149
+ // More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137
150
type Connection struct {
138
151
addr string
139
152
c net.Conn
140
153
mutex sync.Mutex
154
+ cond * sync.Cond
141
155
// Schema contains schema loaded on connection.
142
156
Schema * Schema
143
157
// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
162
176
serverProtocolInfo ProtocolInfo
163
177
// watchMap is a map of key -> chan watchState.
164
178
watchMap sync.Map
179
+
180
+ // shutdownWatcher is the "box.shutdown" event watcher.
181
+ shutdownWatcher Watcher
182
+ // requestCnt is a counter of active requests.
183
+ requestCnt int64
165
184
}
166
185
167
186
var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
385
404
conn .opts .Logger = defaultLogger {}
386
405
}
387
406
407
+ conn .cond = sync .NewCond (& conn .mutex )
408
+
388
409
if err = conn .createConnection (false ); err != nil {
389
410
ter , ok := err .(Error )
390
411
if conn .opts .Reconnect <= 0 {
@@ -589,10 +610,20 @@ func (conn *Connection) dial() (err error) {
589
610
conn .lockShards ()
590
611
conn .c = connection
591
612
atomic .StoreUint32 (& conn .state , connConnected )
613
+ conn .cond .Broadcast ()
592
614
conn .unlockShards ()
593
615
go conn .writer (w , connection )
594
616
go conn .reader (r , connection )
595
617
618
+ // Subscribe shutdown event to process graceful shutdown.
619
+ if conn .shutdownWatcher == nil && isFeatureInSlice (WatchersFeature , conn .serverProtocolInfo .Features ) {
620
+ watcher , werr := conn .newWatcherImpl (shutdownEventKey , shutdownEventCallback )
621
+ if werr != nil {
622
+ return werr
623
+ }
624
+ conn .shutdownWatcher = watcher
625
+ }
626
+
596
627
return
597
628
}
598
629
@@ -762,10 +793,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
762
793
if conn .state != connClosed {
763
794
close (conn .control )
764
795
atomic .StoreUint32 (& conn .state , connClosed )
796
+ conn .cond .Broadcast ()
797
+ // Free the resources.
798
+ if conn .shutdownWatcher != nil {
799
+ go conn .shutdownWatcher .Unregister ()
800
+ conn .shutdownWatcher = nil
801
+ }
765
802
conn .notify (Closed )
766
803
}
767
804
} else {
768
805
atomic .StoreUint32 (& conn .state , connDisconnected )
806
+ conn .cond .Broadcast ()
769
807
conn .notify (Disconnected )
770
808
}
771
809
if conn .c != nil {
@@ -784,9 +822,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
784
822
return
785
823
}
786
824
787
- func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
788
- conn .mutex .Lock ()
789
- defer conn .mutex .Unlock ()
825
+ func (conn * Connection ) reconnectImpl (neterr error , c net.Conn ) {
790
826
if conn .opts .Reconnect > 0 {
791
827
if c == conn .c {
792
828
conn .closeConnection (neterr , false )
@@ -799,6 +835,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
799
835
}
800
836
}
801
837
838
+ func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
839
+ conn .mutex .Lock ()
840
+ defer conn .mutex .Unlock ()
841
+ conn .reconnectImpl (neterr , c )
842
+ conn .cond .Broadcast ()
843
+ }
844
+
802
845
func (conn * Connection ) lockShards () {
803
846
for i := range conn .shard {
804
847
conn .shard [i ].rmut .Lock ()
@@ -1026,6 +1069,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
1026
1069
fut .done = nil
1027
1070
shard .rmut .Unlock ()
1028
1071
return
1072
+ case connShutdown :
1073
+ fut .err = ClientError {
1074
+ ErrConnectionShutdown ,
1075
+ "server shutdown in progress" ,
1076
+ }
1077
+ fut .ready = nil
1078
+ fut .done = nil
1079
+ shard .rmut .Unlock ()
1080
+ return
1029
1081
}
1030
1082
pos := (fut .requestId / conn .opts .Concurrency ) & (requestsMap - 1 )
1031
1083
if ctx != nil {
@@ -1081,23 +1133,43 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
1081
1133
}
1082
1134
}
1083
1135
1136
+ func (conn * Connection ) incrementRequestCnt () {
1137
+ atomic .AddInt64 (& conn .requestCnt , int64 (1 ))
1138
+ }
1139
+
1140
+ func (conn * Connection ) decrementRequestCnt () {
1141
+ if atomic .AddInt64 (& conn .requestCnt , int64 (- 1 )) == 0 {
1142
+ conn .cond .Broadcast ()
1143
+ }
1144
+ }
1145
+
1084
1146
func (conn * Connection ) send (req Request , streamId uint64 ) * Future {
1147
+ conn .incrementRequestCnt ()
1148
+
1085
1149
fut := conn .newFuture (req .Ctx ())
1086
1150
if fut .ready == nil {
1151
+ conn .decrementRequestCnt ()
1087
1152
return fut
1088
1153
}
1154
+
1089
1155
if req .Ctx () != nil {
1090
1156
select {
1091
1157
case <- req .Ctx ().Done ():
1092
1158
conn .cancelFuture (fut , fmt .Errorf ("context is done" ))
1159
+ // future here does not belong to any shard yet,
1160
+ // so cancelFuture don't call markDone.
1161
+ conn .decrementRequestCnt ()
1093
1162
return fut
1094
1163
default :
1095
1164
}
1096
1165
}
1166
+
1097
1167
conn .putFuture (fut , req , streamId )
1168
+
1098
1169
if req .Ctx () != nil {
1099
1170
go conn .contextWatchdog (fut , req .Ctx ())
1100
1171
}
1172
+
1101
1173
return fut
1102
1174
}
1103
1175
@@ -1164,6 +1236,7 @@ func (conn *Connection) markDone(fut *Future) {
1164
1236
if conn .rlimit != nil {
1165
1237
<- conn .rlimit
1166
1238
}
1239
+ conn .decrementRequestCnt ()
1167
1240
}
1168
1241
1169
1242
func (conn * Connection ) peekFuture (reqid uint32 ) (fut * Future ) {
@@ -1458,6 +1531,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
1458
1531
return st , nil
1459
1532
}
1460
1533
1534
+ func isFeatureInSlice (expected ProtocolFeature , actualSlice []ProtocolFeature ) bool {
1535
+ for _ , actual := range actualSlice {
1536
+ if expected == actual {
1537
+ return true
1538
+ }
1539
+ }
1540
+ return false
1541
+ }
1542
+
1461
1543
// NewWatcher creates a new Watcher object for the connection.
1462
1544
//
1463
1545
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,20 +1578,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1496
1578
// asynchronous. We do not expect any response from a Tarantool instance
1497
1579
// That's why we can't just check the Tarantool response for an unsupported
1498
1580
// request error.
1499
- watchersRequired := false
1500
- for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1501
- if feature == WatchersFeature {
1502
- watchersRequired = true
1503
- break
1504
- }
1505
- }
1506
-
1507
- if ! watchersRequired {
1581
+ if ! isFeatureInSlice (WatchersFeature , conn .opts .RequiredProtocolInfo .Features ) {
1508
1582
err := fmt .Errorf ("the feature %s must be required by connection " +
1509
1583
"options to create a watcher" , WatchersFeature )
1510
1584
return nil , err
1511
1585
}
1512
1586
1587
+ return conn .newWatcherImpl (key , callback )
1588
+ }
1589
+
1590
+ func (conn * Connection ) newWatcherImpl (key string , callback WatchCallback ) (Watcher , error ) {
1513
1591
st , err := subscribeWatchChannel (conn , key )
1514
1592
if err != nil {
1515
1593
return nil , err
@@ -1563,7 +1641,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1563
1641
1564
1642
if state .cnt == 0 {
1565
1643
// The last one sends IPROTO_UNWATCH.
1566
- conn .Do (newUnwatchRequest (key )).Get ()
1644
+ if ! conn .ClosedNow () {
1645
+ // conn.ClosedNow() check is a workaround for calling
1646
+ // Unregister from connectionClose().
1647
+ conn .Do (newUnwatchRequest (key )).Get ()
1648
+ }
1567
1649
conn .watchMap .Delete (key )
1568
1650
close (state .unready )
1569
1651
}
@@ -1666,3 +1748,51 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
1666
1748
func (conn * Connection ) ClientProtocolInfo () ProtocolInfo {
1667
1749
return clientProtocolInfo .Clone ()
1668
1750
}
1751
+
1752
+ func shutdownEventCallback (event WatchEvent ) {
1753
+ // Receives "true" on server shutdown.
1754
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1755
+ // step 2.
1756
+ val , ok := event .Value .(bool )
1757
+ if ok && val {
1758
+ go event .Conn .shutdown ()
1759
+ }
1760
+ }
1761
+
1762
+ func (conn * Connection ) shutdown () {
1763
+ // Forbid state changes.
1764
+ conn .mutex .Lock ()
1765
+ defer conn .mutex .Unlock ()
1766
+
1767
+ if ! atomic .CompareAndSwapUint32 (& (conn .state ), connConnected , connShutdown ) {
1768
+ return
1769
+ }
1770
+ conn .cond .Broadcast ()
1771
+ conn .notify (Shutdown )
1772
+
1773
+ c := conn .c
1774
+ for {
1775
+ if (atomic .LoadUint32 (& conn .state ) != connShutdown ) || (c != conn .c ) {
1776
+ return
1777
+ }
1778
+ if atomic .LoadInt64 (& conn .requestCnt ) == 0 {
1779
+ break
1780
+ }
1781
+ // Use cond var on conn.mutex since request execution may
1782
+ // call reconnect(). It is ok if state changes as part of
1783
+ // reconnect since Tarantool server won't allow to reconnect
1784
+ // in the middle of shutting down.
1785
+ conn .cond .Wait ()
1786
+ }
1787
+
1788
+ // Start to reconnect based on common rules, same as in net.box.
1789
+ // Reconnect also closes the connection: server waits until all
1790
+ // subscribed connections are terminated.
1791
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1792
+ // step 3.
1793
+ conn .reconnectImpl (
1794
+ ClientError {
1795
+ ErrConnectionClosed ,
1796
+ "connection closed after server shutdown" ,
1797
+ }, conn .c )
1798
+ }
0 commit comments