/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package inst
import (
"bytes"
"database/sql"
"errors"
"fmt"
"github.com/go-sql-driver/mysql"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/openark/golib/log"
"github.com/openark/golib/math"
"github.com/openark/golib/sqlutils"
"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"github.com/sjmudd/stopwatch"
"github.com/openark/orchestrator/go/attributes"
"github.com/openark/orchestrator/go/collection"
"github.com/openark/orchestrator/go/config"
"github.com/openark/orchestrator/go/db"
"github.com/openark/orchestrator/go/kv"
"github.com/openark/orchestrator/go/metrics/query"
"github.com/openark/orchestrator/go/util"
)
const (
backendDBConcurrency = 20
retryInstanceFunctionCount = 5
retryInterval = 500 * time.Millisecond
error1045AccessDenied = "Error 1045: Access denied for user"
errorConnectionRefused = "getsockopt: connection refused"
errorNoSuchHost = "no such host"
errorIOTimeout = "i/o timeout"
)
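// instanceReadChan and instanceWriteChan act as counting semaphores bounding concurrent
// backend reads and writes to backendDBConcurrency: a slot is acquired by sending into the
// channel and released by receiving from it (see ExecDBWriteFunc below for the write-side
// pattern; the read channel is presumably used the same way elsewhere in this file).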
var instanceReadChan = make(chan bool, backendDBConcurrency)
var instanceWriteChan = make(chan bool, backendDBConcurrency)
// InstancesByCountReplicas is a sortable type for Instance
type InstancesByCountReplicas [](*Instance)
func (this InstancesByCountReplicas) Len() int { return len(this) }
func (this InstancesByCountReplicas) Swap(i, j int) { this[i], this[j] = this[j], this[i] }
func (this InstancesByCountReplicas) Less(i, j int) bool {
return len(this[i].Replicas) < len(this[j].Replicas)
}
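// A minimal usage sketch (illustrative, not part of the original file): since the type
// implements sort.Interface, a slice of instances can be ordered by replica count with
// the standard library's sort package, e.g.
//
//	sort.Sort(InstancesByCountReplicas(instances))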
// Constant strings for Group Replication information
// See https://dev.mysql.com/doc/refman/8.0/en/replication-group-members-table.html for additional information.
const (
// Group member roles
GroupReplicationMemberRolePrimary = "PRIMARY"
GroupReplicationMemberRoleSecondary = "SECONDARY"
// Group member states
GroupReplicationMemberStateOnline = "ONLINE"
GroupReplicationMemberStateRecovering = "RECOVERING"
GroupReplicationMemberStateOffline = "OFFLINE"
GroupReplicationMemberStateError = "ERROR"
)
// We use this map to identify whether the query failed because the server does not support group replication or
// for some other reason.
var GroupReplicationNotSupportedErrors = map[uint16]bool{
// If either the group replication global variables are not known or the
// performance_schema.replication_group_members table does not exist, the host does not support group
// replication, at least in the form supported here.
1193: true, // ERROR: 1193 (HY000): Unknown system variable 'group_replication_group_name'
1146: true, // ERROR: 1146 (42S02): Table 'performance_schema.replication_group_members' doesn't exist
}
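// A hedged sketch of how a caller might consult this map (illustrative only; actual call
// sites are elsewhere in this package): the go-sql-driver/mysql error type exposes the
// server error number, which matches the uint16 keys above.
//
//	if merr, ok := err.(*mysql.MySQLError); ok && GroupReplicationNotSupportedErrors[merr.Number] {
//		// the host does not support group replication in the form probed here; skip GR discovery
//	}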
// instanceKeyInformativeClusterName is a non-authoritative cache; used for auditing or general purpose.
var instanceKeyInformativeClusterName *cache.Cache
var forgetInstanceKeys *cache.Cache
var clusterInjectedPseudoGTIDCache *cache.Cache
var accessDeniedCounter = metrics.NewCounter()
var readTopologyInstanceCounter = metrics.NewCounter()
var readInstanceCounter = metrics.NewCounter()
var writeInstanceCounter = metrics.NewCounter()
var backendWrites = collection.CreateOrReturnCollection("BACKEND_WRITES")
var writeBufferMetrics = collection.CreateOrReturnCollection("WRITE_BUFFER")
var writeBufferLatency = stopwatch.NewNamedStopwatch()
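// emptyQuotesRegexp matches the two-character string `""`, which is what strconv.QuoteToASCII
// produces for an empty input; it is used further below to normalize empty replication error
// strings back to the empty string.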
var emptyQuotesRegexp = regexp.MustCompile(`^""$`)
func init() {
metrics.Register("instance.access_denied", accessDeniedCounter)
metrics.Register("instance.read_topology", readTopologyInstanceCounter)
metrics.Register("instance.read", readInstanceCounter)
metrics.Register("instance.write", writeInstanceCounter)
writeBufferLatency.AddMany([]string{"wait", "write"})
writeBufferLatency.Start("wait")
go initializeInstanceDao()
}
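// initializeInstanceDao waits for the configuration to be loaded, then sets up the instance
// write buffer and the informative cluster-name, forget-instance and injected-pseudo-GTID
// caches, and spins off the goroutine that periodically flushes buffered instance writes.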
func initializeInstanceDao() {
config.WaitForConfigurationToBeLoaded()
instanceWriteBuffer = make(chan instanceUpdateObject, config.Config.InstanceWriteBufferSize)
instanceKeyInformativeClusterName = cache.New(time.Duration(config.Config.InstancePollSeconds/2)*time.Second, time.Second)
forgetInstanceKeys = cache.New(time.Duration(config.Config.InstancePollSeconds*3)*time.Second, time.Second)
clusterInjectedPseudoGTIDCache = cache.New(time.Minute, time.Second)
// spin off instance write buffer flushing
go func() {
flushTick := time.Tick(time.Duration(config.Config.InstanceFlushIntervalMilliseconds) * time.Millisecond)
for {
// it is time to flush
select {
case <-flushTick:
flushInstanceWriteBuffer()
case <-forceFlushInstanceWriteBuffer:
flushInstanceWriteBuffer()
}
}
}()
}
// ExecDBWriteFunc chooses how to execute a write onto the database: whether synchronously or not
func ExecDBWriteFunc(f func() error) error {
m := query.NewMetric()
instanceWriteChan <- true
m.WaitLatency = time.Since(m.Timestamp)
// catch the exec time and error if there is one
defer func() {
if r := recover(); r != nil {
if _, ok := r.(runtime.Error); ok {
panic(r)
}
if s, ok := r.(string); ok {
m.Err = errors.New(s)
} else {
m.Err = r.(error)
}
}
m.ExecuteLatency = time.Since(m.Timestamp.Add(m.WaitLatency))
backendWrites.Append(m)
<-instanceWriteChan // assume this takes no time
}()
res := f()
return res
}
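// A minimal usage sketch for ExecDBWriteFunc (this is the pattern used by ExpireTableData
// just below; the table name, column name and days variable are placeholders):
//
//	writeFunc := func() error {
//		_, err := db.ExecOrchestrator(`delete from some_table where some_timestamp_column < NOW() - INTERVAL ? DAY`, days)
//		return err
//	}
//	return ExecDBWriteFunc(writeFunc)
//
// ExpireTableData deletes rows from the given table whose timestamp column is older than
// config.Config.AuditPurgeDays days.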
func ExpireTableData(tableName string, timestampColumn string) error {
query := fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn)
writeFunc := func() error {
_, err := db.ExecOrchestrator(query, config.Config.AuditPurgeDays)
return err
}
return ExecDBWriteFunc(writeFunc)
}
// logReadTopologyInstanceError logs an error, if applicable, for a ReadTopologyInstance operation,
// providing context and a hint as to the source of the error. If there's no hint, just log the
// original error.
func logReadTopologyInstanceError(instanceKey *InstanceKey, hint string, err error) error {
if err == nil {
return nil
}
if !util.ClearToLog("ReadTopologyInstance", instanceKey.StringCode()) {
return err
}
var msg string
if hint == "" {
msg = fmt.Sprintf("ReadTopologyInstance(%+v): %+v", *instanceKey, err)
} else {
msg = fmt.Sprintf("ReadTopologyInstance(%+v) %+v: %+v",
*instanceKey,
strings.Replace(hint, "%", "%%", -1), // escape %
err)
}
return log.Errorf(msg)
}
// ReadTopologyInstance collects information on the state of a MySQL
// server and writes the result synchronously to the orchestrator
// backend.
func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) {
return ReadTopologyInstanceBufferable(instanceKey, false, nil)
}
// ReadTopologyInstances is a convenience method that calls ReadTopologyInstance
// for all the instance keys and returns a slice of Instance.
func ReadTopologyInstances(instanceKeys []InstanceKey) ([]*Instance, error) {
instances := make([]*Instance, 0)
for _, instanceKey := range instanceKeys {
instance, err := ReadTopologyInstance(&instanceKey)
if err != nil {
return nil, err
}
instances = append(instances, instance)
}
return instances, nil
}
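// RetryInstanceFunction calls the given function up to retryInstanceFunctionCount times,
// returning the first successful result, or the last result and error if all attempts fail.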
func RetryInstanceFunction(f func() (*Instance, error)) (instance *Instance, err error) {
for i := 0; i < retryInstanceFunctionCount; i++ {
if instance, err = f(); err == nil {
return instance, nil
}
}
return instance, err
}
// Is this an error which means that we shouldn't attempt any more queries for this discovery attempt?
func unrecoverableError(err error) bool {
contains := []string{
error1045AccessDenied,
errorConnectionRefused,
errorIOTimeout,
errorNoSuchHost,
}
for _, k := range contains {
if strings.Contains(err.Error(), k) {
return true
}
}
return false
}
// Check if the instance is a MaxScale binlog server (a proxy, not a real
// MySQL server) and also update the resolved hostname
func (instance *Instance) checkMaxScale(db *sql.DB, latency *stopwatch.NamedStopwatch) (isMaxScale bool, resolvedHostname string, err error) {
if config.Config.SkipMaxScaleCheck {
return isMaxScale, resolvedHostname, err
}
latency.Start("instance")
err = sqlutils.QueryRowsMap(db, "show variables like 'maxscale%'", func(m sqlutils.RowMap) error {
if m.GetString("Variable_name") == "MAXSCALE_VERSION" {
originalVersion := m.GetString("Value")
if originalVersion == "" {
originalVersion = m.GetString("value")
}
if originalVersion == "" {
originalVersion = "0.0.0"
}
instance.Version = originalVersion + "-maxscale"
instance.ServerID = 0
instance.ServerUUID = ""
instance.Uptime = 0
instance.Binlog_format = "INHERIT"
instance.ReadOnly = true
instance.LogBinEnabled = true
instance.LogReplicationUpdatesEnabled = true
resolvedHostname = instance.Key.Hostname
latency.Start("backend")
UpdateResolvedHostname(resolvedHostname, resolvedHostname)
latency.Stop("backend")
isMaxScale = true
}
return nil
})
latency.Stop("instance")
// Detect failed connection attempts and don't report the command
// we are executing as that might be confusing.
if err != nil {
if strings.Contains(err.Error(), error1045AccessDenied) {
accessDeniedCounter.Inc(1)
}
if unrecoverableError(err) {
logReadTopologyInstanceError(&instance.Key, "", err)
} else {
logReadTopologyInstanceError(&instance.Key, "show variables like 'maxscale%'", err)
}
}
return isMaxScale, resolvedHostname, err
}
// expectReplicationThreadsState checks that both replication threads are in the given expected state.
// Specifically, it looks for both to be "Yes" or for both to be "No".
func expectReplicationThreadsState(instanceKey *InstanceKey, expectedState ReplicationThreadState) (expectationMet bool, err error) {
db, err := db.OpenTopology(instanceKey.Hostname, instanceKey.Port)
if err != nil {
return false, err
}
err = sqlutils.QueryRowsMap(db, "show slave status", func(m sqlutils.RowMap) error {
ioThreadState := ReplicationThreadStateFromStatus(m.GetString("Slave_IO_Running"))
sqlThreadState := ReplicationThreadStateFromStatus(m.GetString("Slave_SQL_Running"))
if ioThreadState == expectedState && sqlThreadState == expectedState {
expectationMet = true
}
return nil
})
return expectationMet, err
}
// ReadTopologyInstanceBufferable connects to a topology MySQL instance
// and collects information on the server and its replication state.
// It writes the information retrieved into orchestrator's backend.
// - writes are optionally buffered.
// - timing information can be collected for the stages performed.
func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) {
defer func() {
if r := recover(); r != nil {
err = logReadTopologyInstanceError(instanceKey, "Unexpected, aborting", fmt.Errorf("%+v", r))
}
}()
var waitGroup sync.WaitGroup
var serverUuidWaitGroup sync.WaitGroup
readingStartTime := time.Now()
instance := NewInstance()
instanceFound := false
partialSuccess := false
foundByShowSlaveHosts := false
resolvedHostname := ""
maxScaleMasterHostname := ""
isMaxScale := false
isMaxScale110 := false
slaveStatusFound := false
errorChan := make(chan error, 32)
var resolveErr error
if !instanceKey.IsValid() {
latency.Start("backend")
if err := UpdateInstanceLastAttemptedCheck(instanceKey); err != nil {
log.Errorf("ReadTopologyInstanceBufferable: %+v: %v", instanceKey, err)
}
latency.Stop("backend")
return instance, fmt.Errorf("ReadTopologyInstance will not act on invalid instance key: %+v", *instanceKey)
}
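// If reading this instance takes longer than a second, asynchronously record the attempted
// check in the backend; the timer is stopped further down once the instance has been read
// and written successfully.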
lastAttemptedCheckTimer := time.AfterFunc(time.Second, func() {
go UpdateInstanceLastAttemptedCheck(instanceKey)
})
latency.Start("instance")
db, err := db.OpenDiscovery(instanceKey.Hostname, instanceKey.Port)
latency.Stop("instance")
if err != nil {
goto Cleanup
}
instance.Key = *instanceKey
if isMaxScale, resolvedHostname, err = instance.checkMaxScale(db, latency); err != nil {
// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.session_variables.
// There is a wrong decision making in this design and the migration path to 5.7 will be difficult.
// I don't want orchestrator to put even more burden on this.
// If the statement errors, then we are unable to determine that this is maxscale, hence assume it is not.
// In which case there would be other queries sent to the server that are not affected by 5.7 behavior, and that will fail.
// Certain errors are not recoverable (for this discovery process) so it's fine to go to Cleanup
if unrecoverableError(err) {
goto Cleanup
}
}
latency.Start("instance")
if isMaxScale {
if strings.Contains(instance.Version, "1.1.0") {
isMaxScale110 = true
// Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted.
// Therefore we (currently) take @@hostname (which is masquerading as master host anyhow)
err = db.QueryRow("select @@hostname").Scan(&maxScaleMasterHostname)
if err != nil {
goto Cleanup
}
}
if isMaxScale110 {
// Only this is supported:
db.QueryRow("select @@server_id").Scan(&instance.ServerID)
} else {
db.QueryRow("select @@global.server_id").Scan(&instance.ServerID)
db.QueryRow("select @@global.server_uuid").Scan(&instance.ServerUUID)
}
} else {
// NOT MaxScale
// We begin with a few operations we can run concurrently, and which do not depend on anything
{
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
var dummy string
// show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema)
err := db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)
if err != nil {
logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err)
// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables.
// There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult.
// I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important
// so as to completely fail reading a 5.7 instance.
// This is supposed to be fixed in 5.7.9
}
errorChan <- err
}()
}
var mysqlHostname, mysqlReportHost string
err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan(
&mysqlHostname, &mysqlReportHost, &instance.ServerID, &instance.Version, &instance.VersionComment, &instance.ReadOnly, &instance.Binlog_format, &instance.LogBinEnabled, &instance.LogReplicationUpdatesEnabled)
if err != nil {
goto Cleanup
}
partialSuccess = true // We at least managed to read something from the server.
switch strings.ToLower(config.Config.MySQLHostnameResolveMethod) {
case "none":
resolvedHostname = instance.Key.Hostname
case "default", "hostname", "@@hostname":
resolvedHostname = mysqlHostname
case "report_host", "@@report_host":
if mysqlReportHost == "" {
err = fmt.Errorf("MySQLHostnameResolveMethod configured to use @@report_host but %+v has NULL/empty @@report_host", instanceKey)
goto Cleanup
}
resolvedHostname = mysqlReportHost
default:
resolvedHostname = instance.Key.Hostname
}
if instance.LogBinEnabled {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, "show master status", func(m sqlutils.RowMap) error {
var err error
instance.SelfBinlogCoordinates.LogFile = m.GetString("File")
instance.SelfBinlogCoordinates.LogPos = m.GetInt64("Position")
return err
})
errorChan <- err
}()
}
{
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
semiSyncMasterPluginLoaded := false
semiSyncReplicaPluginLoaded := false
err := sqlutils.QueryRowsMap(db, "show global variables like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error {
if m.GetString("Variable_name") == "rpl_semi_sync_master_enabled" {
instance.SemiSyncMasterEnabled = (m.GetString("Value") == "ON")
semiSyncMasterPluginLoaded = true
} else if m.GetString("Variable_name") == "rpl_semi_sync_master_timeout" {
instance.SemiSyncMasterTimeout = m.GetUint64("Value")
} else if m.GetString("Variable_name") == "rpl_semi_sync_master_wait_for_slave_count" {
instance.SemiSyncMasterWaitForReplicaCount = m.GetUint("Value")
} else if m.GetString("Variable_name") == "rpl_semi_sync_slave_enabled" {
instance.SemiSyncReplicaEnabled = (m.GetString("Value") == "ON")
semiSyncReplicaPluginLoaded = true
}
return nil
})
instance.SemiSyncAvailable = (semiSyncMasterPluginLoaded && semiSyncReplicaPluginLoaded)
errorChan <- err
}()
}
{
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, "show global status like 'rpl_semi_sync_%'", func(m sqlutils.RowMap) error {
if m.GetString("Variable_name") == "Rpl_semi_sync_master_status" {
instance.SemiSyncMasterStatus = (m.GetString("Value") == "ON")
} else if m.GetString("Variable_name") == "Rpl_semi_sync_master_clients" {
instance.SemiSyncMasterClients = m.GetUint("Value")
} else if m.GetString("Variable_name") == "Rpl_semi_sync_slave_status" {
instance.SemiSyncReplicaStatus = (m.GetString("Value") == "ON")
}
return nil
})
errorChan <- err
}()
}
if (instance.IsOracleMySQL() || instance.IsPercona()) && !instance.IsSmallerMajorVersionByString("5.6") {
waitGroup.Add(1)
serverUuidWaitGroup.Add(1)
go func() {
defer waitGroup.Done()
defer serverUuidWaitGroup.Done()
var masterInfoRepositoryOnTable bool
// Stuff only supported on Oracle MySQL >= 5.6
// ...
// @@gtid_mode is only available in Oracle MySQL >= 5.6
// Previous versions just issued this query brute-force, but I don't like errors being issued where they shouldn't be.
_ = db.QueryRow("select @@global.gtid_mode, @@global.server_uuid, @@global.gtid_executed, @@global.gtid_purged, @@global.master_info_repository = 'TABLE', @@global.binlog_row_image").Scan(&instance.GTIDMode, &instance.ServerUUID, &instance.ExecutedGtidSet, &instance.GtidPurged, &masterInfoRepositoryOnTable, &instance.BinlogRowImage)
if instance.GTIDMode != "" && instance.GTIDMode != "OFF" {
instance.SupportsOracleGTID = true
}
if config.Config.ReplicationCredentialsQuery != "" {
instance.ReplicationCredentialsAvailable = true
} else if masterInfoRepositoryOnTable {
_ = db.QueryRow("select count(*) > 0 and MAX(User_name) != '' from mysql.slave_master_info").Scan(&instance.ReplicationCredentialsAvailable)
}
}()
}
}
if resolvedHostname != instance.Key.Hostname {
latency.Start("backend")
UpdateResolvedHostname(instance.Key.Hostname, resolvedHostname)
latency.Stop("backend")
instance.Key.Hostname = resolvedHostname
}
if instance.Key.Hostname == "" {
err = fmt.Errorf("ReadTopologyInstance: empty hostname (%+v). Bailing out", *instanceKey)
goto Cleanup
}
go ResolveHostnameIPs(instance.Key.Hostname)
if config.Config.DataCenterPattern != "" {
if pattern, err := regexp.Compile(config.Config.DataCenterPattern); err == nil {
match := pattern.FindStringSubmatch(instance.Key.Hostname)
if len(match) != 0 {
instance.DataCenter = match[1]
}
}
// This can be overridden by a later invocation of DetectDataCenterQuery
}
if config.Config.RegionPattern != "" {
if pattern, err := regexp.Compile(config.Config.RegionPattern); err == nil {
match := pattern.FindStringSubmatch(instance.Key.Hostname)
if len(match) != 0 {
instance.Region = match[1]
}
}
// This can be overridden by a later invocation of DetectRegionQuery
}
if config.Config.PhysicalEnvironmentPattern != "" {
if pattern, err := regexp.Compile(config.Config.PhysicalEnvironmentPattern); err == nil {
match := pattern.FindStringSubmatch(instance.Key.Hostname)
if len(match) != 0 {
instance.PhysicalEnvironment = match[1]
}
}
// This can be overridden by a later invocation of DetectPhysicalEnvironmentQuery
}
instance.ReplicationIOThreadState = ReplicationThreadStateNoThread
instance.ReplicationSQLThreadState = ReplicationThreadStateNoThread
err = sqlutils.QueryRowsMap(db, "show slave status", func(m sqlutils.RowMap) error {
instance.HasReplicationCredentials = (m.GetString("Master_User") != "")
instance.ReplicationIOThreadState = ReplicationThreadStateFromStatus(m.GetString("Slave_IO_Running"))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromStatus(m.GetString("Slave_SQL_Running"))
instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadState.IsRunning()
if isMaxScale110 {
// Covering buggy MaxScale 1.1.0
instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadRuning && (m.GetString("Slave_IO_State") == "Binlog Dump")
}
instance.ReplicationSQLThreadRuning = instance.ReplicationSQLThreadState.IsRunning()
instance.ReadBinlogCoordinates.LogFile = m.GetString("Master_Log_File")
instance.ReadBinlogCoordinates.LogPos = m.GetInt64("Read_Master_Log_Pos")
instance.ExecBinlogCoordinates.LogFile = m.GetString("Relay_Master_Log_File")
instance.ExecBinlogCoordinates.LogPos = m.GetInt64("Exec_Master_Log_Pos")
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
instance.RelaylogCoordinates.LogFile = m.GetString("Relay_Log_File")
instance.RelaylogCoordinates.LogPos = m.GetInt64("Relay_Log_Pos")
instance.RelaylogCoordinates.Type = RelayLog
instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_SQL_Error")), "")
instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_IO_Error")), "")
instance.SQLDelay = m.GetUintD("SQL_Delay", 0)
instance.UsingOracleGTID = (m.GetIntD("Auto_Position", 0) == 1)
instance.UsingMariaDBGTID = (m.GetStringD("Using_Gtid", "No") != "No")
instance.MasterUUID = m.GetStringD("Master_UUID", "No")
instance.HasReplicationFilters = ((m.GetStringD("Replicate_Do_DB", "") != "") || (m.GetStringD("Replicate_Ignore_DB", "") != "") || (m.GetStringD("Replicate_Do_Table", "") != "") || (m.GetStringD("Replicate_Ignore_Table", "") != "") || (m.GetStringD("Replicate_Wild_Do_Table", "") != "") || (m.GetStringD("Replicate_Wild_Ignore_Table", "") != ""))
masterHostname := m.GetString("Master_Host")
if isMaxScale110 {
// Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted.
// Therefore we (currently) take @@hostname (which is masquerading as master host anyhow)
masterHostname = maxScaleMasterHostname
}
masterKey, err := NewResolveInstanceKey(masterHostname, m.GetInt("Master_Port"))
if err != nil {
logReadTopologyInstanceError(instanceKey, "NewResolveInstanceKey", err)
}
masterKey.Hostname, resolveErr = ResolveHostname(masterKey.Hostname)
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, fmt.Sprintf("ResolveHostname(%q)", masterKey.Hostname), resolveErr)
}
instance.MasterKey = *masterKey
instance.IsDetachedMaster = instance.MasterKey.IsDetached()
instance.SecondsBehindMaster = m.GetNullInt64("Seconds_Behind_Master")
if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 {
log.Warningf("Host: %+v, instance.SecondsBehindMaster < 0 [%+v], correcting to 0", instanceKey, instance.SecondsBehindMaster.Int64)
instance.SecondsBehindMaster.Int64 = 0
}
// And until told otherwise:
instance.ReplicationLagSeconds = instance.SecondsBehindMaster
instance.AllowTLS = (m.GetString("Master_SSL_Allowed") == "Yes")
// Not breaking the flow even on error
slaveStatusFound = true
return nil
})
if err != nil {
goto Cleanup
}
// Populate GR information for the instance in Oracle MySQL 8.0+. To do this we need to wait for the Server UUID to
// be populated to be able to find this instance's information in performance_schema.replication_group_members by
// comparing UUIDs. We could instead resolve the MEMBER_HOST and MEMBER_PORT columns into an InstanceKey and compare
// those instead, but this could require external calls for name resolving, whereas comparing UUIDs does not.
serverUuidWaitGroup.Wait()
if instance.IsOracleMySQL() && !instance.IsSmallerMajorVersionByString("8.0") {
err := PopulateGroupReplicationInformation(instance, db)
if err != nil {
goto Cleanup
}
}
if isMaxScale && !slaveStatusFound {
err = fmt.Errorf("No 'SHOW SLAVE STATUS' output found for a MaxScale instance: %+v", instanceKey)
goto Cleanup
}
if config.Config.ReplicationLagQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
if err := db.QueryRow(config.Config.ReplicationLagQuery).Scan(&instance.ReplicationLagSeconds); err == nil {
if instance.ReplicationLagSeconds.Valid && instance.ReplicationLagSeconds.Int64 < 0 {
log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.ReplicationLagSeconds.Int64)
instance.ReplicationLagSeconds.Int64 = 0
}
} else {
instance.ReplicationLagSeconds = instance.SecondsBehindMaster
logReadTopologyInstanceError(instanceKey, "ReplicationLagQuery", err)
}
}()
}
instanceFound = true
// -------------------------------------------------------------------------
// Anything after this point does not affect the fact the instance is found.
// No `goto Cleanup` after this point.
// -------------------------------------------------------------------------
// Get replicas, either by SHOW SLAVE HOSTS or via PROCESSLIST
// MaxScale does not support PROCESSLIST, so SHOW SLAVE HOSTS is the only option
if config.Config.DiscoverByShowSlaveHosts || isMaxScale {
err := sqlutils.QueryRowsMap(db, `show slave hosts`,
func(m sqlutils.RowMap) error {
// MaxScale 1.1 may trigger an error with this command, but
// we may also see issues if anything on the MySQL server locks up.
// Consequently it's important to validate that the values received look
// good prior to calling ResolveHostname()
host := m.GetString("Host")
port := m.GetIntD("Port", 0)
if host == "" || port == 0 {
if isMaxScale && host == "" && port == 0 {
// MaxScale reports a bad response sometimes so ignore it.
// - seen in 1.1.0 and 1.4.3.4
return nil
}
// otherwise report the error to the caller
return fmt.Errorf("ReadTopologyInstance(%+v) 'show slave hosts' returned row with <host,port>: <%v,%v>", instanceKey, host, port)
}
replicaKey, err := NewResolveInstanceKey(host, port)
if err == nil && replicaKey.IsValid() {
if !FiltersMatchInstanceKey(replicaKey, config.Config.DiscoveryIgnoreReplicaHostnameFilters) {
instance.AddReplicaKey(replicaKey)
}
foundByShowSlaveHosts = true
}
return err
})
logReadTopologyInstanceError(instanceKey, "show slave hosts", err)
}
if !foundByShowSlaveHosts && !isMaxScale {
// Either not configured to read SHOW SLAVE HOSTS or nothing was there.
// Discover by information_schema.processlist
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, `
select
substring_index(host, ':', 1) as slave_hostname
from
information_schema.processlist
where
command IN ('Binlog Dump', 'Binlog Dump GTID')
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("slave_hostname"))
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr)
}
replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port}
if !FiltersMatchInstanceKey(&replicaKey, config.Config.DiscoveryIgnoreReplicaHostnameFilters) {
instance.AddReplicaKey(&replicaKey)
}
return err
})
logReadTopologyInstanceError(instanceKey, "processlist", err)
}()
}
if instance.IsNDB() {
// Discover by ndbinfo about MySQL Cluster SQL nodes
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, `
select
substring(service_URI,9) mysql_host
from
ndbinfo.processes
where
process_name='mysqld'
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("mysql_host"))
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, "ResolveHostname: ndbinfo", resolveErr)
}
replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port}
instance.AddReplicaKey(&replicaKey)
return err
})
logReadTopologyInstanceError(instanceKey, "ndbinfo", err)
}()
}
if config.Config.DetectDataCenterQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectDataCenterQuery).Scan(&instance.DataCenter)
logReadTopologyInstanceError(instanceKey, "DetectDataCenterQuery", err)
}()
}
if config.Config.DetectRegionQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectRegionQuery).Scan(&instance.Region)
logReadTopologyInstanceError(instanceKey, "DetectRegionQuery", err)
}()
}
if config.Config.DetectPhysicalEnvironmentQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectPhysicalEnvironmentQuery).Scan(&instance.PhysicalEnvironment)
logReadTopologyInstanceError(instanceKey, "DetectPhysicalEnvironmentQuery", err)
}()
}
if config.Config.DetectInstanceAliasQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectInstanceAliasQuery).Scan(&instance.InstanceAlias)
logReadTopologyInstanceError(instanceKey, "DetectInstanceAliasQuery", err)
}()
}
if config.Config.DetectSemiSyncEnforcedQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := db.QueryRow(config.Config.DetectSemiSyncEnforcedQuery).Scan(&instance.SemiSyncPriority)
logReadTopologyInstanceError(instanceKey, "DetectSemiSyncEnforcedQuery", err)
}()
}
{
latency.Start("backend")
err = ReadInstanceClusterAttributes(instance)
latency.Stop("backend")
logReadTopologyInstanceError(instanceKey, "ReadInstanceClusterAttributes", err)
}
{
// Pseudo GTID
// Depends on ReadInstanceClusterAttributes above
instance.UsingPseudoGTID = false
if config.Config.AutoPseudoGTID {
var err error
instance.UsingPseudoGTID, err = isInjectedPseudoGTID(instance.ClusterName)
log.Errore(err)
} else if config.Config.DetectPseudoGTIDQuery != "" {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
if resultData, err := sqlutils.QueryResultData(db, config.Config.DetectPseudoGTIDQuery); err == nil {
if len(resultData) > 0 {
if len(resultData[0]) > 0 {
if resultData[0][0].Valid && resultData[0][0].String == "1" {
instance.UsingPseudoGTID = true
}
}
}
} else {
logReadTopologyInstanceError(instanceKey, "DetectPseudoGTIDQuery", err)
}
}()
}
}
// First read the current PromotionRule from candidate_database_instance.
{
latency.Start("backend")
err = ReadInstancePromotionRule(instance)
latency.Stop("backend")
logReadTopologyInstanceError(instanceKey, "ReadInstancePromotionRule", err)
}
// Then check if the instance wants to set a different PromotionRule.
// We'll set it here on their behalf so there's no race between the first
// time an instance is discovered, and setting a rule like "must_not".
if config.Config.DetectPromotionRuleQuery != "" && !isMaxScale {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
var value string
err := db.QueryRow(config.Config.DetectPromotionRuleQuery).Scan(&value)
logReadTopologyInstanceError(instanceKey, "DetectPromotionRuleQuery", err)
promotionRule, err := ParseCandidatePromotionRule(value)
logReadTopologyInstanceError(instanceKey, "ParseCandidatePromotionRule", err)
if err == nil {
// We need to update candidate_database_instance.
// We register the rule even if it hasn't changed,
// to bump the last_suggested time.
instance.PromotionRule = promotionRule
err = RegisterCandidateInstance(NewCandidateDatabaseInstance(instanceKey, promotionRule).WithCurrentTime())
logReadTopologyInstanceError(instanceKey, "RegisterCandidateInstance", err)
}
}()
}
ReadClusterAliasOverride(instance)
if !isMaxScale {
if instance.SuggestedClusterAlias == "" {
// Only needed on masters
if config.Config.DetectClusterAliasQuery != "" {
clusterAlias := ""
if err := db.QueryRow(config.Config.DetectClusterAliasQuery).Scan(&clusterAlias); err != nil {
logReadTopologyInstanceError(instanceKey, "DetectClusterAliasQuery", err)
} else {
instance.SuggestedClusterAlias = clusterAlias
}
}
}
if instance.SuggestedClusterAlias == "" {
// Not found by DetectClusterAliasQuery...
// See if a ClusterNameToAlias configuration applies
if clusterAlias := mappedClusterNameToAlias(instance.ClusterName); clusterAlias != "" {
instance.SuggestedClusterAlias = clusterAlias
}
}
}
if instance.ReplicationDepth == 0 && config.Config.DetectClusterDomainQuery != "" && !isMaxScale {
// Only needed on masters
domainName := ""
if err := db.QueryRow(config.Config.DetectClusterDomainQuery).Scan(&domainName); err != nil {
domainName = ""
logReadTopologyInstanceError(instanceKey, "DetectClusterDomainQuery", err)
}
if domainName != "" {
latency.Start("backend")
err := WriteClusterDomainName(instance.ClusterName, domainName)
latency.Stop("backend")
logReadTopologyInstanceError(instanceKey, "WriteClusterDomainName", err)
}
}
Cleanup:
waitGroup.Wait()
close(errorChan)
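// Prefer an error already set above (e.g. one that triggered a goto Cleanup); otherwise
// surface the first error reported by any of the concurrent readers through errorChan.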
err = func() error {
if err != nil {
return err
}
for err := range errorChan {
if err != nil {
return err
}
}
return nil
}()
if instanceFound {
if instance.IsCoMaster {
// Take co-master into account, and avoid infinite loop
instance.AncestryUUID = fmt.Sprintf("%s,%s", instance.MasterUUID, instance.ServerUUID)
} else {
instance.AncestryUUID = fmt.Sprintf("%s,%s", instance.AncestryUUID, instance.ServerUUID)
}
// Add the replication group ancestry UUID as well. Otherwise, Orchestrator thinks there are errant GTIDs in group
// members and their replicas, even though there are none.
instance.AncestryUUID = fmt.Sprintf("%s,%s", instance.AncestryUUID, instance.ReplicationGroupName)
instance.AncestryUUID = strings.Trim(instance.AncestryUUID, ",")
if instance.ExecutedGtidSet != "" && instance.masterExecutedGtidSet != "" {
// Compare master & replica GTID sets, but ignore the sets that present the master's UUID.
// This is because orchestrator may pool master and replica at an inconvenient timing,
// such that the replica may _seems_ to have more entries than the master, when in fact
// it's just that the master's probing is stale.
redactedExecutedGtidSet, _ := NewOracleGtidSet(instance.ExecutedGtidSet)
for _, uuid := range strings.Split(instance.AncestryUUID, ",") {
if uuid != instance.ServerUUID {
redactedExecutedGtidSet.RemoveUUID(uuid)
}
if instance.IsCoMaster && uuid == instance.ServerUUID {
// If this is a co-master, then this server is likely to show its own generated GTIDs as errant,
// because its co-master has not applied them yet
redactedExecutedGtidSet.RemoveUUID(uuid)
}
}
// Avoid querying the database if there's no point:
if !redactedExecutedGtidSet.IsEmpty() {
redactedMasterExecutedGtidSet, _ := NewOracleGtidSet(instance.masterExecutedGtidSet)
redactedMasterExecutedGtidSet.RemoveUUID(instance.MasterUUID)
db.QueryRow("select gtid_subtract(?, ?)", redactedExecutedGtidSet.String(), redactedMasterExecutedGtidSet.String()).Scan(&instance.GtidErrant)
}
}
}
latency.Stop("instance")
readTopologyInstanceCounter.Inc(1)
if instanceFound {
instance.LastDiscoveryLatency = time.Since(readingStartTime)
instance.IsLastCheckValid = true
instance.IsRecentlyChecked = true
instance.IsUpToDate = true
latency.Start("backend")
if bufferWrites {
enqueueInstanceWrite(instance, instanceFound, err)
} else {
WriteInstance(instance, instanceFound, err)
}
lastAttemptedCheckTimer.Stop()
latency.Stop("backend")
return instance, nil
}
// Something is wrong, possibly network-related. Record that we
// tried to check the instance. last_attempted_check is also
// updated on success by writeInstance.
latency.Start("backend")
_ = UpdateInstanceLastChecked(&instance.Key, partialSuccess)
latency.Stop("backend")
return nil, err
}
// ReadClusterAliasOverride reads and applies SuggestedClusterAlias based on cluster_alias_override
func ReadClusterAliasOverride(instance *Instance) (err error) {
aliasOverride := ""
query := `
select
alias
from
cluster_alias_override
where
cluster_name = ?
`
err = db.QueryOrchestrator(query, sqlutils.Args(instance.ClusterName), func(m sqlutils.RowMap) error {
aliasOverride = m.GetString("alias")
return nil
})
if aliasOverride != "" {
instance.SuggestedClusterAlias = aliasOverride
}
return err
}