-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
server.go
4592 lines (4115 loc) · 131 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"hash/fnv"
"io"
"log"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
// Allow dynamic profiling.
_ "net/http/pprof"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
)
// Intervals used for the first PING of a connection. Client connections use
// a longer interval (2s) than non client connections (1s).
const (
// Interval for the first PING for non client connections.
firstPingInterval = time.Second
// This is for the first ping for client connections.
firstClientPingInterval = 2 * time.Second
)
// These are protocol versions sent between server connections: ROUTER, LEAF and
// GATEWAY. We may have protocol versions that have a meaning only for a certain
// type of connections, but we don't have to have separate enums for that.
// However, it is CRITICAL to not change the order of those constants since they
// are exchanged between servers. When adding a new protocol version, add to the
// end of the list, don't try to group them by connection types.
//
// The values are assigned with iota, so the numeric value of each constant is
// its position in the list (starting at 0).
const (
// RouteProtoZero is the original Route protocol from 2009.
// http://nats.io/documentation/internals/nats-protocol/
RouteProtoZero = iota
// RouteProtoInfo signals a route can receive more than the original INFO block.
// This can be used to update remote cluster permissions, etc...
RouteProtoInfo
// RouteProtoV2 is the new route/cluster protocol that provides account support.
RouteProtoV2
// MsgTraceProto indicates that this server understands distributed message tracing.
// It is currently the latest version and is what getServerProto() returns
// when no override is set.
MsgTraceProto
)
// getServerProto returns the server-to-server protocol version this server
// advertises. That is the latest version (MsgTraceProto) unless the
// overrideProto option has been set.
//
// overrideProto defaults to 0, which is also the value of RouteProtoZero, so
// tests cannot store the wanted protocol directly. Instead they store the
// negative value produced by setServerProtoForTest(): (wantedProto + 1) * -1.
// A negative option value is therefore decoded back to the real protocol
// level here; zero or positive values mean "no override".
func (s *Server) getServerProto() int {
	if override := s.getOpts().overrideProto; override < 0 {
		// Reverse the encoding done by setServerProtoForTest().
		return -override - 1
	}
	return MsgTraceProto
}
// setServerProtoForTest is used by tests. It encodes the wanted protocol
// level as a strictly negative value, (wantedProto + 1) * -1, suitable for
// the overrideProto option. getServerProto() decodes it back.
func setServerProtoForTest(wantedProto int) int {
	return -(wantedProto + 1)
}
// Info is the information sent to clients, routes, gateways, and leaf nodes,
// to help them understand information about this server.
//
// The struct is JSON encoded; the field tags define the names used on the
// wire. Most optional fields are tagged with omitempty and are therefore
// left out of the encoding when they hold their zero value.
type Info struct {
// General server identity and capabilities. In NewServer(), ID is set to the
// server's public nkey, Name to the configured server name (or the public
// nkey when none is configured), GoVersion to runtime.Version() and Headers
// to the negation of the NoHeaderSupport option.
ID string `json:"server_id"`
Name string `json:"server_name"`
Version string `json:"version"`
Proto int `json:"proto"`
GitCommit string `json:"git_commit,omitempty"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
Headers bool `json:"headers"`
AuthRequired bool `json:"auth_required,omitempty"`
// In NewServer(), TLSRequired is set when a TLS config is present and
// AllowNonTLS is not set; TLSAvailable is set when a TLS config is present
// but TLS is not required; TLSVerify is set when the TLS config requires
// and verifies client certificates.
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
TLSAvailable bool `json:"tls_available,omitempty"`
MaxPayload int32 `json:"max_payload"`
JetStream bool `json:"jetstream,omitempty"`
IP string `json:"ip,omitempty"`
CID uint64 `json:"client_id,omitempty"`
ClientIP string `json:"client_ip,omitempty"`
Nonce string `json:"nonce,omitempty"`
Cluster string `json:"cluster,omitempty"`
Dynamic bool `json:"cluster_dynamic,omitempty"`
Domain string `json:"domain,omitempty"`
ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to.
WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
LameDuckMode bool `json:"ldm,omitempty"`
Compression string `json:"compression,omitempty"`
// Route Specific
Import *SubjectPermission `json:"import,omitempty"`
Export *SubjectPermission `json:"export,omitempty"`
LNOC bool `json:"lnoc,omitempty"`
LNOCU bool `json:"lnocu,omitempty"`
InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the server INFO response to CONNECT
RoutePoolSize int `json:"route_pool_size,omitempty"`
RoutePoolIdx int `json:"route_pool_idx,omitempty"`
RouteAccount string `json:"route_account,omitempty"`
RouteAccReqID string `json:"route_acc_add_reqid,omitempty"`
GossipMode byte `json:"gossip_mode,omitempty"`
// Gateways Specific
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
GatewayURLs []string `json:"gateway_urls,omitempty"` // Gateway URLs in the originating cluster (sent by gateway's INFO)
GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO)
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
GatewayNRP bool `json:"gateway_nrp,omitempty"` // Uses new $GNR. prefix for mapped replies
GatewayIOM bool `json:"gateway_iom,omitempty"` // Indicate that all accounts will be switched to InterestOnly mode "right away"
// LeafNode Specific
LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
RemoteAccount string `json:"remote_account,omitempty"` // Lets the other side know the remote account that they bind to.
XKey string `json:"xkey,omitempty"` // Public server's x25519 key.
}
// Server is our main struct.
// An instance is created by NewServer(), which fills in the identity keys,
// the INFO block, the options and the tracking maps/channels declared below.
type Server struct {
// Fields accessed with atomic operations need to be 64-bit aligned
gcid uint64
// How often user logon fails due to the issuer account not being pinned.
pinnedAccFail uint64
// Embedded message/byte counters and slow consumer counters.
stats
scStats
// Server lock. NewServer() holds it while setting up most of the fields below.
mu sync.RWMutex
reloadMu sync.RWMutex // Write-locked when a config reload is taking place ONLY
// Server nkey identity (kp) and curve key pair with its public key (xkp/xpub),
// all created in NewServer().
kp nkeys.KeyPair
xkp nkeys.KeyPair
xpub string
info Info
configFile string
optsMu sync.RWMutex
opts *Options
running atomic.Bool
shutdown atomic.Bool
listener net.Listener
listenerErr error
gacc *Account
sys *internal
sysAcc atomic.Pointer[Account]
js atomic.Pointer[jetStream]
isMetaLeader atomic.Bool
jsClustered atomic.Bool
accounts sync.Map
tmpAccounts sync.Map // Temporarily stores accounts that are being built
activeAccounts int32
accResolver AccountResolver
clients map[uint64]*client
routes map[string][]*client
routesPoolSize int // Configured pool size
routesReject bool // During reload, we may want to reject adding routes until some conditions are met
routesNoPool int // Number of routes that don't use pooling (connecting to older server for instance)
accRoutes map[string]map[string]*client // Key is account name, value is key=remoteID/value=route connection
accRouteByHash sync.Map // Key is account name, value is nil or a pool index
accAddedCh chan struct{}
accAddedReqID string
leafs map[uint64]*client
users map[string]*User
nkeys map[string]*NkeyUser
totalClients uint64
closed *closedRingBuffer
done chan bool
start time.Time
http net.Listener
httpHandler http.Handler
httpBasePath string
profiler net.Listener
httpReqStats map[string]uint64
routeListener net.Listener
routeListenerErr error
routeInfo Info
routeResolver netResolver
routesToSelf map[string]struct{}
routeTLSName string
leafNodeListener net.Listener
leafNodeListenerErr error
leafNodeInfo Info
leafNodeInfoJSON []byte
leafURLsMap refCountedUrlSet
leafNodeOpts struct {
resolver netResolver
dialTimeout time.Duration
}
leafRemoteCfgs []*leafNodeCfg
leafRemoteAccounts sync.Map
leafNodeEnabled bool
leafDisableConnect bool // Used in test only
leafNoCluster bool // Indicate that this server has only remotes and no cluster defined
// quitCh is used to kick out go routines possibly waiting on server to
// shutdown. startupComplete is closed when startup is complete and
// shutdownComplete is closed when Shutdown() is complete.
quitCh chan struct{}
startupComplete chan struct{}
shutdownComplete chan struct{}
// Tracking Go routines
grMu sync.Mutex
grTmpClients map[uint64]*client
grRunning bool
grWG sync.WaitGroup // to wait on various go routines
cproto int64 // number of clients supporting async INFO
configTime time.Time // last time config was loaded
logging struct {
sync.RWMutex
logger Logger
trace int32
debug int32
traceSysAcc int32
}
clientConnectURLs []string
// Used internally for quick look-ups.
clientConnectURLsMap refCountedUrlSet
lastCURLsUpdate int64
// For Gateways
gatewayListener net.Listener // Accept listener
gatewayListenerErr error
gateway *srvGateway
// Used by tests to check that http.Servers do
// not set any timeout.
monitoringServer *http.Server
profilingServer *http.Server
// LameDuck mode
ldm bool
ldmCh chan bool
// Trusted public operator keys.
trustedKeys []string
// map of trusted keys to operator setting StrictSigningKeyUsage
strictSigningKeyUsage map[string]struct{}
// We use this to minimize mem copies for requests to monitoring
// endpoint /varz (when it comes from http).
varzMu sync.Mutex
varz *Varz
// This is set during a config reload if we detect that we have
// added/removed routes. The monitoring code then checks that
// to know if it should update the cluster's URLs array.
varzUpdateRouteURLs bool
// Keeps a sublist of subscriptions attached to leafnode connections
// for the $GNR.*.*.*.> subject so that a server can send back a mapped
// gateway reply.
gwLeafSubs *Sublist
// Used for expiration of mapped GW replies
gwrm struct {
w int32
ch chan time.Duration
m sync.Map
}
// For eventIDs
eventIds *nuid.NUID
// Websocket structure
websocket srvWebsocket
// MQTT structure
mqtt srvMQTT
// OCSP monitoring
ocsps []*OCSPMonitor
// OCSP peer verification (at least one TLS block)
ocspPeerVerify bool
// OCSP response cache
ocsprc OCSPResponseCache
// exporting account name the importer experienced issues with
incompleteAccExporterMap sync.Map
// Holds cluster name under different lock for mapping
cnMu sync.RWMutex
cn string
// For registering raft nodes with the server.
rnMu sync.RWMutex
raftNodes map[string]RaftNode
// For mapping from a raft node name back to a server name and cluster. Node has to be in the same domain.
nodeToInfo sync.Map
// For out of resources to not log errors too fast.
rerrMu sync.Mutex
rerrLast time.Time
// Set in NewServer() only when the TLSRateLimit option is greater than 0,
// nil otherwise.
connRateCounter *rateCounter
// If there is a system account configured, to still support the $G account,
// the server will create a fake user and add it to the list of users.
// Keep track of what that user name is for config reload purposes.
sysAccOnlyNoAuthUser string
// IPQueues map
ipQueues sync.Map
// To limit logging frequency
rateLimitLogging sync.Map
rateLimitLoggingCh chan time.Duration
// Total outstanding catchup bytes in flight.
gcbMu sync.RWMutex
gcbOut int64
gcbOutMax int64 // Taken from JetStreamMaxCatchup or defaultMaxTotalCatchupOutBytes
// A global channel to kick out stalled catchup sequences.
gcbKick chan struct{}
// Total outbound syncRequests. NewServer() creates this channel with a
// capacity of maxConcurrentSyncRequests and fills it up.
syncOutSem chan struct{}
// Queue to process JS API requests that come from routes (or gateways)
jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
// Delayed API responses.
delayedAPIResponses *ipQueue[*delayedAPIResponse]
// Whether moving NRG traffic into accounts is permitted on this server.
// Controls whether or not the account NRG capability is set in statsz.
// Currently used by unit tests to simulate nodes not supporting account NRG.
accountNRGAllowed atomic.Bool
}
// For tracking JS nodes.
//
// NOTE: NewServer() builds a nodeInfo value with an unkeyed (positional)
// composite literal, so the order and number of the fields below must be
// kept in sync with that literal.
type nodeInfo struct {
name string
version string
cluster string
domain string
id string
tags jwt.TagList
cfg *JetStreamConfig
stats *JetStreamStats
offline bool
js bool
binarySnapshots bool
accountNRG bool
}
// stats holds message, byte and slow consumer counters. It is embedded in
// the Server struct.
// Make sure all are 64bits for atomic use
type stats struct {
inMsgs int64
outMsgs int64
inBytes int64
outBytes int64
slowConsumers int64
}
// scStats includes the total and per connection counters of Slow Consumers.
// There is one atomic counter per kind of connection (clients, routes,
// leafnodes and gateways). It is embedded in the Server struct.
type scStats struct {
clients atomic.Uint64
routes atomic.Uint64
leafs atomic.Uint64
gateways atomic.Uint64
}
// This is used by tests so we can run all server tests with a default route
// or leafnode compression mode. For instance:
// go test -race -v ./server -cluster_compression=fast
// Both variables are empty strings unless set.
var (
// Default compression mode for routes (cluster) in tests.
testDefaultClusterCompression string
// Default compression mode for leafnode connections in tests.
testDefaultLeafNodeCompression string
)
// Compression modes.
// These are the normalized values that validateAndNormalizeCompressionOption()
// stores in CompressionOpts.Mode, with the exception of
// CompressionS2Uncompressed which is only produced by
// selectS2AutoModeBasedOnRTT() in this file.
const (
CompressionNotSupported = "not supported"
CompressionOff = "off"
CompressionAccept = "accept"
CompressionS2Auto = "s2_auto"
CompressionS2Uncompressed = "s2_uncompressed"
CompressionS2Fast = "s2_fast"
CompressionS2Better = "s2_better"
CompressionS2Best = "s2_best"
)
// defaultCompressionS2AutoRTTThresholds is the default of RTT thresholds for
// the CompressionS2Auto mode. It is used by
// validateAndNormalizeCompressionOption() when the user selects the "auto"
// mode without providing any threshold. The position of a threshold in the
// list selects the compression level (see selectS2AutoModeBasedOnRTT()).
var defaultCompressionS2AutoRTTThresholds = []time.Duration{
// [0..10ms] -> CompressionS2Uncompressed
10 * time.Millisecond,
// ]10ms..50ms] -> CompressionS2Fast
50 * time.Millisecond,
// ]50ms..100ms] -> CompressionS2Better
100 * time.Millisecond,
// ]100ms..] -> CompressionS2Best
}
// validateAndNormalizeCompressionOption matches the user provided compression
// mode in `c.Mode` (case insensitive) to one of the compression mode
// constants and updates `c.Mode` to that constant. It returns an error if the
// provided compression mode is not known. A nil `c` is accepted and is a
// no-op.
//
// The parameter `chosenModeForOn` indicates which compression mode to use
// when the user selects "on" (or enabled, true). This is because we may have
// different defaults depending on where the compression is used.
//
// For the "auto" mode, `c.RTTThresholds` is normalized too:
//   - when empty, it is set to a copy of the default thresholds;
//   - negative values are replaced with 0;
//   - non-zero values must be in ascending order (a 0 is allowed anywhere and
//     indicates that the corresponding compression level should not be used);
//   - trailing zeros are removed;
//   - the resulting list must have between 1 and 4 values.
func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error {
	if c == nil {
		return nil
	}
	cmtl := strings.ToLower(c.Mode)
	// First, check for the "on" case so that we set to the default compression
	// mode for that. The other switch/case will finish setup if needed (for
	// instance if the default mode is s2Auto).
	switch cmtl {
	case "on", "enabled", "true":
		cmtl = chosenModeForOn
	}
	// Check (again) with the proper mode.
	switch cmtl {
	case "not supported", "not_supported":
		c.Mode = CompressionNotSupported
	case "disabled", "off", "false":
		c.Mode = CompressionOff
	case "accept":
		c.Mode = CompressionAccept
	case "auto", "s2_auto":
		var rtts []time.Duration
		if len(c.RTTThresholds) == 0 {
			// Use a copy of the defaults: storing the package level slice
			// itself would let a later modification of c.RTTThresholds
			// silently alter the defaults used by every other option.
			rtts = append(rtts, defaultCompressionS2AutoRTTThresholds...)
		} else {
			// Highest threshold accepted so far.
			var highest time.Duration
			for _, n := range c.RTTThresholds {
				// Do not error on negative, but simply set to 0
				if n < 0 {
					n = 0
				}
				// Make sure they are properly ordered. However, it is possible
				// to have a "0" anywhere in the list to indicate that this
				// compression level should not be used.
				if n != 0 && n < highest {
					return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds)
				}
				if n > highest {
					highest = n
				}
				rtts = append(rtts, n)
			}
			// Trim 0 that are at the end.
			for len(rtts) > 0 && rtts[len(rtts)-1] == 0 {
				rtts = rtts[:len(rtts)-1]
			}
			if len(rtts) > 4 {
				// There should be at most values for "uncompressed", "fast",
				// "better" and "best" (when some 0 are present).
				return fmt.Errorf("compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds)
			} else if len(rtts) == 0 {
				// But there should be at least 1 if the user provided the slice.
				// We would be here only if it was provided by say with values
				// being a single or all zeros.
				return fmt.Errorf("compression mode %q requires at least one RTT threshold", c.Mode)
			}
		}
		c.Mode = CompressionS2Auto
		c.RTTThresholds = rtts
	case "fast", "s2_fast":
		c.Mode = CompressionS2Fast
	case "better", "s2_better":
		c.Mode = CompressionS2Better
	case "best", "s2_best":
		c.Mode = CompressionS2Best
	default:
		return fmt.Errorf("unsupported compression mode %q", c.Mode)
	}
	return nil
}
// needsCompression reports whether the compression mode `m` indicates that
// the server will negotiate compression with the remote server. This is the
// case for every mode except the empty string, "off" and "not supported".
// Note that the provided compression mode is assumed to have been
// normalized and validated.
func needsCompression(m string) bool {
	switch m {
	case _EMPTY_, CompressionOff, CompressionNotSupported:
		return false
	}
	return true
}
// selectCompressionMode returns the compression mode that this server should
// use for a connection, given its own mode `scm` and the mode `rcm`
// advertised by the remote.
//
// Compression is asymmetric, meaning that one side can have a different
// compression level than the other. However, the remote may not support
// compression (say older server, or test to make it behave as it is not),
// or may have the compression off:
//
//   - remote mode empty or "not supported": returns "not supported";
//   - remote mode "off": returns "off";
//   - remote mode "accept": returns "off" if we are in "accept" mode too,
//     otherwise our own mode;
//   - remote mode is one of the s2 modes: returns our own mode, unless we are
//     in "accept" mode, in which case the remote mode is used ("fast" if the
//     remote mode is "auto");
//   - anything else: returns an error.
//
// Note that `scm` is assumed to not be "off" or "not supported".
func selectCompressionMode(scm, rcm string) (mode string, err error) {
	switch rcm {
	case CompressionNotSupported, _EMPTY_:
		return CompressionNotSupported, nil
	case CompressionOff:
		// If the remote explicitly disables compression, then we won't
		// use compression.
		return CompressionOff, nil
	case CompressionAccept:
		// The remote is ok with compression but is not initiating it. If we
		// are not initiating it either, there is no compression.
		if scm == CompressionAccept {
			return CompressionOff, nil
		}
		return scm, nil
	case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best:
		// The known modes are listed explicitly so that a compression
		// setting that we don't recognize results in an error.
		if scm != CompressionAccept {
			// We have an explicit mode, use it.
			return scm, nil
		}
		// Our mode is "accept": use the remote's compression mode, except if
		// it is "auto", in which case we default to "fast". This is not a
		// configuration (auto in one side and accept in the other) that
		// would be recommended.
		if rcm == CompressionS2Auto {
			return CompressionS2Fast, nil
		}
		return rcm, nil
	}
	return _EMPTY_, fmt.Errorf("unsupported route compression mode %q", rcm)
}
// compressionModeForInfoProtocol returns the compression mode to place in an
// INFO protocol: "s2_auto" if that is the configured mode in `co`, otherwise
// the given `cm` compression mode. `co` must not be nil.
func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string {
	if co.Mode != CompressionS2Auto {
		return cm
	}
	return CompressionS2Auto
}
// selectS2AutoModeBasedOnRTT returns, given a connection RTT and a list of
// thresholds durations, an S2 compression level such as "uncompressed",
// "fast", "better" or "best". The position of a threshold in the list
// selects the level: index 0 is "uncompressed", 1 is "fast", 2 is "better"
// and anything else is "best". For instance, with the following slice:
// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result in the compression
// level "uncompressed", ]5ms..10ms] will result in "fast" compression, etc..
//
// However, the 0 value allows for disabling of some compression levels.
// For instance, the following slice: [0, 0, 20, 30] means that a RTT of
// [0..20ms] would result in the "better" compression - effectively disabling
// the use of "uncompressed" and "fast", then anything above 20ms would
// result in the use of "best" level (the 30 in the list has no effect
// and the list could have been simplified to [0, 0, 20]).
//
// If the RTT is above all thresholds, "best" is used when the list has at
// least 3 values, otherwise the level of the last value in the list is used.
// With an empty list, "best" is returned.
func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string {
	// -1 means that no threshold matched.
	idx := -1
	for i, d := range rttThresholds {
		// A 0 threshold means that the level is disabled, so it must never
		// be selected, not even for a RTT of 0.
		if d != 0 && rtt <= d {
			idx = i
			break
		}
	}
	if idx < 0 {
		// If we did not find but we have all levels, then use "best",
		// otherwise use the last one in array.
		if l := len(rttThresholds); l >= 3 {
			idx = 3
		} else {
			idx = l - 1
		}
	}
	switch idx {
	case 0:
		return CompressionS2Uncompressed
	case 1:
		return CompressionS2Fast
	case 2:
		return CompressionS2Better
	}
	return CompressionS2Best
}
// s2WriterOptions returns the list of s2.WriterOption to use for the given
// compression mode. A slice is returned (rather than a single option)
// because s2.NewWriter() can be called with a nil []s2.WriterOption, but not
// with a nil s2.WriterOption, so this is more versatile.
//
// For the "uncompressed", "better" and "best" modes, the list has two
// options: a writer concurrency of 1 followed by the option selecting the
// compression level.
//
// NOTE(review): for any other mode, including "fast", nil is returned, which
// means that the writer concurrency option is not part of the result in that
// case - confirm that this is intended.
func s2WriterOptions(cm string) []s2.WriterOption {
	var level s2.WriterOption
	switch cm {
	case CompressionS2Uncompressed:
		level = s2.WriterUncompressed()
	case CompressionS2Better:
		level = s2.WriterBetterCompression()
	case CompressionS2Best:
		level = s2.WriterBestCompression()
	default:
		return nil
	}
	return []s2.WriterOption{
		s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
		level,
	}
}
// New will setup a new server struct after parsing the options.
// The error returned by NewServer() is discarded, so the returned server
// is nil when the options can not be validated.
// DEPRECATED: Use NewServer(opts)
func New(opts *Options) *Server {
	srv, _ := NewServer(opts) // Error is dropped on purpose, srv is nil on failure.
	return srv
}
// NewServer will setup a new server struct after parsing the options.
// Could return an error if options can not be validated, if the server
// keys can not be created, or if one of the setup steps (trusted keys,
// OCSP, gateway, account resolver, accounts) fails. The returned server
// is nil when an error is returned.
// The provided Options type should not be re-used afterwards.
// Either use Options.Clone() to pass a copy, or make a new one.
func NewServer(opts *Options) (*Server, error) {
	setBaselineOptions(opts)

	// Process TLS options, including whether we require client certificates.
	tlsReq := opts.TLSConfig != nil
	verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)

	// Create our server's nkey identity. A failure here must be reported
	// since using the (nil) key pair would cause a panic.
	kp, err := nkeys.CreateServer()
	if err != nil {
		return nil, fmt.Errorf("error creating server nkey: %w", err)
	}
	pub, err := kp.PublicKey()
	if err != nil {
		return nil, fmt.Errorf("error getting server public nkey: %w", err)
	}

	// Create an xkey for encrypting messages from this server.
	xkp, err := nkeys.CreateCurveKeys()
	if err != nil {
		return nil, fmt.Errorf("error creating server curve keys: %w", err)
	}
	xpub, err := xkp.PublicKey()
	if err != nil {
		return nil, fmt.Errorf("error getting server public curve key: %w", err)
	}

	// The server name defaults to the server's public nkey.
	serverName := pub
	if opts.ServerName != _EMPTY_ {
		serverName = opts.ServerName
	}

	httpBasePath := normalizeBasePath(opts.HTTPBasePath)

	// Validate some options. This is here because we cannot assume that
	// server will always be started with configuration parsing (that could
	// report issues). Its options can be (incorrectly) set by hand when
	// server is embedded. If there is an error, return nil.
	if err := validateOptions(opts); err != nil {
		return nil, err
	}

	info := Info{
		ID:           pub,
		XKey:         xpub,
		Version:      VERSION,
		Proto:        PROTO,
		GitCommit:    gitCommit,
		GoVersion:    runtime.Version(),
		Name:         serverName,
		Host:         opts.Host,
		Port:         opts.Port,
		AuthRequired: false,
		TLSRequired:  tlsReq && !opts.AllowNonTLS,
		TLSVerify:    verify,
		MaxPayload:   opts.MaxPayload,
		JetStream:    opts.JetStream,
		Headers:      !opts.NoHeaderSupport,
		Cluster:      opts.Cluster.Name,
		Domain:       opts.JetStreamDomain,
	}

	// TLS is configured but not required (AllowNonTLS).
	if tlsReq && !info.TLSRequired {
		info.TLSAvailable = true
	}

	now := time.Now()

	s := &Server{
		kp:                 kp,
		xkp:                xkp,
		xpub:               xpub,
		configFile:         opts.ConfigFile,
		info:               info,
		opts:               opts,
		done:               make(chan bool, 1),
		start:              now,
		configTime:         now,
		gwLeafSubs:         NewSublistWithCache(),
		httpBasePath:       httpBasePath,
		eventIds:           nuid.New(),
		routesToSelf:       make(map[string]struct{}),
		httpReqStats:       make(map[string]uint64), // Used to track HTTP requests
		rateLimitLoggingCh: make(chan time.Duration, 1),
		leafNodeEnabled:    opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0,
		syncOutSem:         make(chan struct{}, maxConcurrentSyncRequests),
	}

	// Delayed API response queue. Create regardless if JetStream is configured
	// or not (since it can be enabled/disabled with config reload, we want this
	// queue to exist at all times).
	s.delayedAPIResponses = newIPQueue[*delayedAPIResponse](s, "delayed API responses")

	// By default we'll allow account NRG.
	s.accountNRGAllowed.Store(true)

	// Fill up the maximum in flight syncRequests for this server.
	// Used in JetStream catchup semantics.
	for i := 0; i < maxConcurrentSyncRequests; i++ {
		s.syncOutSem <- struct{}{}
	}

	// NOTE(review): this dereferences the unexported opts.tlsConfigOpts,
	// which is presumably only set by the configuration parsing - confirm
	// that it can not be nil when TLSRateLimit is set programmatically.
	if opts.TLSRateLimit > 0 {
		s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
	}

	// Trusted root operator keys.
	if !s.processTrustedKeys() {
		return nil, fmt.Errorf("Error processing trusted operator keys")
	}

	// If we have solicited leafnodes but no clustering and no clustername.
	// However we may need a stable clustername so use the server name.
	if len(opts.LeafNode.Remotes) > 0 && opts.Cluster.Port == 0 && opts.Cluster.Name == _EMPTY_ {
		s.leafNoCluster = true
		opts.Cluster.Name = opts.ServerName
	}

	if opts.Cluster.Name != _EMPTY_ {
		// Also place into mapping cn with cnMu lock.
		s.cnMu.Lock()
		s.cn = opts.Cluster.Name
		s.cnMu.Unlock()
	}

	// The server lock is held from here until the function returns, except
	// for the short window below where the system account is looked up.
	s.mu.Lock()
	defer s.mu.Unlock()

	// Place ourselves in the JetStream nodeInfo if needed.
	if opts.JetStream {
		ourNode := getHash(serverName)
		s.nodeToInfo.Store(ourNode, nodeInfo{
			serverName,
			VERSION,
			opts.Cluster.Name,
			opts.JetStreamDomain,
			info.ID,
			opts.Tags,
			&JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
			nil,
			false, true, true, true,
		})
	}

	s.routeResolver = opts.Cluster.resolver
	if s.routeResolver == nil {
		s.routeResolver = net.DefaultResolver
	}

	// Used internally for quick look-ups.
	s.clientConnectURLsMap = make(refCountedUrlSet)
	s.websocket.connectURLsMap = make(refCountedUrlSet)
	s.leafURLsMap = make(refCountedUrlSet)

	// Ensure that non-exported options (used in tests) are properly set.
	s.setLeafNodeNonExportedOptions()

	// Setup OCSP Stapling and OCSP Peer. This will abort server from starting if there
	// are no valid staples and OCSP Stapling policy is set to Always or MustStaple.
	if err := s.enableOCSP(); err != nil {
		return nil, err
	}

	// Call this even if there is no gateway defined. It will
	// initialize the structure so we don't have to check for
	// it to be nil or not in various places in the code.
	if err := s.newGateway(opts); err != nil {
		return nil, err
	}

	// If we have a cluster definition but do not have a cluster name, create one.
	if opts.Cluster.Port != 0 && opts.Cluster.Name == _EMPTY_ {
		s.info.Cluster = nuid.Next()
	} else if opts.Cluster.Name != _EMPTY_ {
		// Likewise here if we have a cluster name set.
		s.info.Cluster = opts.Cluster.Name
	}

	// This is normally done in the AcceptLoop, once the
	// listener has been created (possibly with random port),
	// but since some tests may expect the INFO to be properly
	// set after New(), let's do it now.
	s.setInfoHostPort()

	// For tracking clients
	s.clients = make(map[uint64]*client)

	// For tracking closed clients.
	s.closed = newClosedRingBuffer(opts.MaxClosedClients)

	// For tracking connections that are not yet registered
	// in s.routes, but for which readLoop has started.
	s.grTmpClients = make(map[uint64]*client)

	// For tracking routes and their remote ids
	s.initRouteStructures(opts)

	// For tracking leaf nodes.
	s.leafs = make(map[uint64]*client)

	// Used to kick out all go routines possibly waiting on server
	// to shutdown.
	s.quitCh = make(chan struct{})

	// Closed when startup is complete. ReadyForConnections() will block on
	// this before checking the presence of listening sockets.
	s.startupComplete = make(chan struct{})

	// Closed when Shutdown() is complete. Allows WaitForShutdown() to block
	// waiting for complete shutdown.
	s.shutdownComplete = make(chan struct{})

	// Check for configured account resolvers.
	if err := s.configureResolver(); err != nil {
		return nil, err
	}

	// If there is an URL account resolver, do basic test to see if anyone is home.
	if ar := opts.AccountResolver; ar != nil {
		if ur, ok := ar.(*URLAccResolver); ok {
			if _, err := ur.Fetch(_EMPTY_); err != nil {
				return nil, err
			}
		}
	}

	// For other resolver:
	// In operator mode, when the account resolver depends on an external system and
	// the system account can't be fetched, inject a temporary one.
	if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
		opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
		if _, ok := ar.(*MemAccResolver); !ok {
			// The lookup is done without the server lock, which is
			// re-acquired right after (the deferred unlock expects it).
			s.mu.Unlock()
			var a *Account
			// perform direct lookup to avoid warning trace
			if _, err := fetchAccount(ar, opts.SystemAccount); err == nil {
				a, _ = s.lookupAccount(opts.SystemAccount)
			}
			s.mu.Lock()
			if a == nil {
				sac := NewAccount(opts.SystemAccount)
				sac.Issuer = opts.TrustedOperators[0].Issuer
				sac.signingKeys = map[string]jwt.Scope{}
				sac.signingKeys[opts.SystemAccount] = nil
				s.registerAccountNoLock(sac)
			}
		}
	}

	// For tracking accounts
	if _, err := s.configureAccounts(false); err != nil {
		return nil, err
	}

	// Used to setup Authorization.
	s.configureAuthorization()

	// Start signal handler
	s.handleSignals()

	return s, nil
}
// initRouteStructures initializes the route structures based on pooling
// and/or per-account routes:
//   - s.routes is (re)created empty;
//   - s.routesPoolSize is set to the configured pool size if it is positive,
//     to 1 otherwise;
//   - if there are pinned accounts, s.accRoutes is created with an empty map
//     for each of them. The presence of an account as the key will allow us
//     to know if a given account is supposed to have dedicated routes.
//
// Server lock is held on entry
func (s *Server) initRouteStructures(opts *Options) {
	s.routes = make(map[string][]*client)

	s.routesPoolSize = 1
	if ps := opts.Cluster.PoolSize; ps > 0 {
		s.routesPoolSize = ps
	}

	pinned := opts.Cluster.PinnedAccounts
	if len(pinned) == 0 {
		// No per-account routes, s.accRoutes is left untouched.
		return
	}
	s.accRoutes = make(map[string]map[string]*client, len(pinned))
	for _, acc := range pinned {
		s.accRoutes[acc] = make(map[string]*client)
	}
}
// logRejectedTLSConns checks every second how many connections have been
// blocked by the connection rate counter and, if any, logs a warning with
// that number. It runs until the server's quit channel is closed and
// signals the server's go routines wait group when it returns.
// It expects s.connRateCounter to be set.
func (s *Server) logRejectedTLSConns() {
	defer s.grWG.Done()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if n := s.connRateCounter.countBlocked(); n > 0 {
				s.Warnf("Rejected %d connections due to TLS rate limiting", n)
			}
		case <-s.quitCh:
			return
		}
	}
}
// ClusterName returns our cluster name which could be dynamic.
// The name is read from the server's INFO under the server read lock.
func (s *Server) ClusterName() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.info.Cluster
}
// cachedClusterName returns the copy of the cluster name that is protected
// by the cluster name specific lock (cnMu) instead of the server lock.
func (s *Server) cachedClusterName() string {
	s.cnMu.RLock()
	defer s.cnMu.RUnlock()
	return s.cn
}
// setClusterName will update the cluster name for this server.
//
// Under the server lock, the name is stored in the server's and route's INFO
// and the solicited leafnode connections (the ones with a remote
// configuration) are collected. Then, outside of the server lock:
//   - the name is stored in the copy protected by cnMu;
//   - the collected leafnode connections are closed with the
//     ClusterNameConflict reason;
//   - if there is a system account and the name actually changed, a signal
//     is sent on the system's reset channel;
//   - the change is logged.
func (s *Server) setClusterName(name string) {
	var (
		resetCh   chan struct{}
		solicited []*client
	)

	s.mu.Lock()
	if s.sys != nil && s.info.Cluster != name {
		// Only grab the channel here: we can't send while holding the lock
		// as the go routine reading it may be waiting for the lock as well.
		resetCh = s.sys.resetCh
	}
	s.info.Cluster = name
	s.routeInfo.Cluster = name

	// Need to close solicited leaf nodes. The close has to be done outside
	// of the server lock, so simply collect them for now.
	for _, c := range s.leafs {
		c.mu.Lock()
		isSolicited := c.leaf != nil && c.leaf.remote != nil
		c.mu.Unlock()
		if isSolicited {
			solicited = append(solicited, c)
		}
	}
	s.mu.Unlock()

	// Also place into mapping cn with cnMu lock.
	s.cnMu.Lock()
	s.cn = name
	s.cnMu.Unlock()

	for _, c := range solicited {
		c.closeConnection(ClusterNameConflict)
	}
	if resetCh != nil {
		resetCh <- struct{}{}
	}
	s.Noticef("Cluster name updated to %s", name)
}