From ed29fb64a77e71d8d9ad24b728725a3de5b9e122 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 22 Nov 2024 13:48:58 +0100 Subject: [PATCH 1/5] Invalidate tablets when host is removed --- host_source.go | 37 +++++++++++++++++++++++++++++++++++++ tablet_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/host_source.go b/host_source.go index 4217e8895..2ed07c565 100644 --- a/host_source.go +++ b/host_source.go @@ -611,6 +611,27 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet return updated_tablets2 } +// Remove all tablets that have given host as a replica +func removeTabletsWithHostFromTabletsList(tablets []*TabletInfo, host *HostInfo) []*TabletInfo { + filteredTablets := make([]*TabletInfo, 0, len(tablets)) // Preallocate for efficiency + + for _, tablet := range tablets { + // Check if any replica matches the given host ID + shouldExclude := false + for _, replica := range tablet.replicas { + if replica.hostId.String() == host.HostID() { + shouldExclude = true + break + } + } + if !shouldExclude { + filteredTablets = append(filteredTablets, tablet) + } + } + + return filteredTablets +} + // Search for place in tablets table for token starting from index l to index r func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo { for l < r { @@ -1000,6 +1021,7 @@ func refreshRing(r *ringDescriber) error { } for _, host := range prevHosts { + removeTabletsWithHost(r, host) r.session.removeHost(host) } @@ -1025,6 +1047,21 @@ func addTablet(r *ringDescriber, tablet *TabletInfo) error { return nil } +func removeTabletsWithHost(r *ringDescriber, host *HostInfo) error { + r.mu.Lock() + defer r.mu.Unlock() + + tablets := r.session.getTablets() + tablets = removeTabletsWithHostFromTabletsList(tablets, host) + + r.session.ring.setTablets(tablets) + r.session.policy.SetTablets(tablets) + + r.session.schemaDescriber.refreshTabletsSchema() + + return nil +} + const ( ringRefreshDebounceTime = 1 * time.Second ) diff --git a/tablet_test.go b/tablet_test.go index 1ec931d7b..0cddea380 100644 --- a/tablet_test.go +++ b/tablet_test.go @@ -337,3 +337,33 @@ func TestAddTabletIntersectingWithLast(t *testing.T) { assertTrue(t, "Token range in tablets table not correct", CompareRanges(tablets, [][]int64{{-8611686018427387905, -7917529027641081857}, {-5011686018427387905, -2987529027641081857}})) } + +func TestRemoveTabletsWithHost(t *testing.T) { + removed_host_id := TimeUUID() + + tablets := []*TabletInfo{{ + "test_ks", + "test_tb", + -8611686018427387905, + -7917529027641081857, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }, { + "test_ks", + "test_tb", + -6917529027641081857, + -4611686018427387905, + []ReplicaInfo{{removed_host_id, 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }, { + "test_ks", + "test_tb", + -4611686018427387905, + -2305843009213693953, + []ReplicaInfo{{TimeUUID(), 9}, {removed_host_id, 8}, {TimeUUID(), 3}}, + }} + + tablets = removeTabletsWithHostFromTabletsList(tablets, &HostInfo{ + hostId: removed_host_id.String(), + }) + + assertEqual(t, "TabletsList length", 1, len(tablets)) +} From 133e8eac6f2146a1430db0d3311378f9b1249fcd Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Mon, 25 Nov 2024 10:33:17 +0100 Subject: [PATCH 2/5] Invalidate tablets on `SCHEMA_CHANGE`, for target `keyspace` + `table`, for change types: `DROPPED` or `UPDATED` --- events.go | 10 ++++++++++ host_source.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ tablet_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+) diff --git a/events.go b/events.go index 73461f629..100ad7b81 100644 --- a/events.go +++ b/events.go @@ -114,6 +114,7 @@ func (s *Session) handleSchemaEvent(frames []frame) { s.handleKeyspaceChange(f.keyspace, f.change) case *schemaChangeTable: s.schemaDescriber.clearSchema(f.keyspace) + s.handleTableChange(f.keyspace, f.object, f.change) case *schemaChangeAggregate: s.schemaDescriber.clearSchema(f.keyspace) case *schemaChangeFunction: @@ -126,9 +127,18 @@ func (s *Session) handleSchemaEvent(frames []frame) { func (s *Session) handleKeyspaceChange(keyspace, change string) { s.control.awaitSchemaAgreement() + if change == "DROPPED" || change == "UPDATED" { + removeTabletsWithKeyspace(s.hostSource, keyspace) + } s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change}) } +func (s *Session) handleTableChange(keyspace, table, change string) { + if change == "DROPPED" || change == "UPDATED" { + removeTabletsWithTable(s.hostSource, keyspace, table) + } +} + // handleNodeEvent handles inbound status and topology change events. // // Status events are debounced by host IP; only the latest event is processed. diff --git a/host_source.go b/host_source.go index 2ed07c565..17b9086d8 100644 --- a/host_source.go +++ b/host_source.go @@ -632,6 +632,30 @@ func removeTabletsWithHostFromTabletsList(tablets []*TabletInfo, host *HostInfo) return filteredTablets } +func removeTabletsWithKeyspaceFromTabletsList(tablets []*TabletInfo, keyspace string) []*TabletInfo { + filteredTablets := make([]*TabletInfo, 0, len(tablets)) + + for _, tablet := range tablets { + if tablet.keyspaceName != keyspace { + filteredTablets = append(filteredTablets, tablet) + } + } + + return filteredTablets +} + +func removeTabletsWithTableFromTabletsList(tablets []*TabletInfo, keyspace string, table string) []*TabletInfo { + filteredTablets := make([]*TabletInfo, 0, len(tablets)) + + for _, tablet := range tablets { + if !(tablet.keyspaceName == keyspace && tablet.tableName == table) { + filteredTablets = append(filteredTablets, tablet) + } + } + + return filteredTablets +} + // Search for place in tablets table for token starting from index l to index r func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo { for l < r { @@ -1062,6 +1086,36 @@ func removeTabletsWithHost(r *ringDescriber, host *HostInfo) error { return nil } +func removeTabletsWithKeyspace(r *ringDescriber, keyspace string) error { + r.mu.Lock() + defer r.mu.Unlock() + + tablets := r.session.getTablets() + tablets = removeTabletsWithKeyspaceFromTabletsList(tablets, keyspace) + + r.session.ring.setTablets(tablets) + r.session.policy.SetTablets(tablets) + + r.session.schemaDescriber.refreshTabletsSchema() + + return nil +} + +func removeTabletsWithTable(r *ringDescriber, keyspace string, table string) error { + r.mu.Lock() + defer r.mu.Unlock() + + tablets := r.session.getTablets() + tablets = removeTabletsWithTableFromTabletsList(tablets, keyspace, table) + + r.session.ring.setTablets(tablets) + r.session.policy.SetTablets(tablets) + + r.session.schemaDescriber.refreshTabletsSchema() + + return nil +} + const ( ringRefreshDebounceTime = 1 * time.Second ) diff --git a/tablet_test.go b/tablet_test.go index 0cddea380..09fd344ce 100644 --- a/tablet_test.go +++ b/tablet_test.go @@ -367,3 +367,55 @@ func TestRemoveTabletsWithHost(t *testing.T) { assertEqual(t, "TabletsList length", 1, len(tablets)) } + +func TestRemoveTabletsWithKeyspace(t *testing.T) { + tablets := []*TabletInfo{{ + "removed_ks", + "test_tb", + -8611686018427387905, + -7917529027641081857, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }, { + "removed_ks", + "test_tb", + -6917529027641081857, + -4611686018427387905, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }, { + "test_ks", + "test_tb", + -4611686018427387905, + -2305843009213693953, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }} + + tablets = removeTabletsWithKeyspaceFromTabletsList(tablets, "removed_ks") + + assertEqual(t, "TabletsList length", 1, len(tablets)) +} + +func TestRemoveTabletsWithTable(t *testing.T) { + tablets := []*TabletInfo{{ + "test_ks", + "test_tb", + -8611686018427387905, + -7917529027641081857, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }, { + "test_ks", + "test_tb", + -6917529027641081857, + -4611686018427387905, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }, { + "test_ks", + "removed_tb", + -4611686018427387905, + -2305843009213693953, + []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, + }} + + tablets = removeTabletsWithTableFromTabletsList(tablets, "test_ks", "removed_tb") + + assertEqual(t, "TabletsList length", 2, len(tablets)) +} From ef5fffaffb86401269a859f400f638de84caed29 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Mon, 25 Nov 2024 12:13:20 +0100 Subject: [PATCH 3/5] Remove information that tablets are experimental as they no longer are --- connectionpool.go | 1 - host_source.go | 5 ----- metadata_scylla.go | 6 ------ policies.go | 8 -------- ring.go | 2 -- session.go | 4 ---- 6 files changed, 26 deletions(-) diff --git a/connectionpool.go b/connectionpool.go index f085728d1..aec19d804 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -25,7 +25,6 @@ type SetPartitioner interface { } // interface to implement to receive the tablets value -// Experimental, this interface and use may change type SetTablets interface { SetTablets(tablets []*TabletInfo) } diff --git a/host_source.go b/host_source.go index 17b9086d8..598e836e9 100644 --- a/host_source.go +++ b/host_source.go @@ -504,13 +504,11 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 { return h.scyllaShardAwarePortTLS } -// Experimental, this interface and use may change type ReplicaInfo struct { hostId UUID shardId int } -// Experimental, this interface and use may change type TabletInfo struct { keyspaceName string tableName string @@ -681,8 +679,6 @@ type ringDescriber struct { mu sync.Mutex prevHosts []*HostInfo prevPartitioner string - // Experimental, this interface and use may change - prevTablets []*TabletInfo } // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces @@ -1055,7 +1051,6 @@ func refreshRing(r *ringDescriber) error { return nil } -// Experimental, this interface and use may change func addTablet(r *ringDescriber, tablet *TabletInfo) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/metadata_scylla.go b/metadata_scylla.go index 30a85574f..a62a8809e 100644 --- a/metadata_scylla.go +++ b/metadata_scylla.go @@ -135,13 +135,11 @@ type IndexMetadata struct { } // TabletsMetadata holds metadata for tablet list -// Experimental, this interface and use may change type TabletsMetadata struct { Tablets []*TabletMetadata } // TabletMetadata holds metadata for single tablet -// Experimental, this interface and use may change type TabletMetadata struct { KeyspaceName string TableName string @@ -151,7 +149,6 @@ type TabletMetadata struct { } // TabletMetadata holds metadata for single replica -// Experimental, this interface and use may change type ReplicaMetadata struct { HostId UUID ShardId int @@ -247,7 +244,6 @@ type schemaDescriber struct { cache map[string]*KeyspaceMetadata - // Experimental, this interface and use may change tabletsCache *TabletsMetadata } @@ -281,7 +277,6 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err return metadata, nil } -// Experimental, this interface and use may change func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata { s.mu.Lock() defer s.mu.Unlock() @@ -291,7 +286,6 @@ func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata { return metadata } -// Experimental, this interface and use may change func (s *schemaDescriber) refreshTabletsSchema() { tablets := s.session.getTablets() s.tabletsCache.Tablets = []*TabletMetadata{} diff --git a/policies.go b/policies.go index 6106bd826..a07455a16 100644 --- a/policies.go +++ b/policies.go @@ -95,7 +95,6 @@ func (c *cowHostList) remove(host *HostInfo) bool { } // cowTabletList implements a copy on write tablet list, its equivalent type is []*TabletInfo -// Experimental, this interface and use may change type cowTabletList struct { list atomic.Value mu sync.Mutex @@ -338,7 +337,6 @@ type HostTierer interface { type HostSelectionPolicy interface { HostStateNotifier SetPartitioner - // Experimental, this interface and use may change SetTablets KeyspaceChanged(KeyspaceUpdateEvent) Init(*Session) @@ -399,7 +397,6 @@ func (r *roundRobinHostPolicy) Init(*Session) {} func (r *roundRobinHostPolicy) Reset() {} func (r *roundRobinHostPolicy) IsOperational(*Session) error { return nil } -// Experimental, this interface and use may change func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {} func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { @@ -492,7 +489,6 @@ type tokenAwareHostPolicy struct { logger StdLogger - // Experimental, this interface and use may change tablets cowTabletList avoidSlowReplicas bool @@ -579,7 +575,6 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { } } -// Experimental, this interface and use may change func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) { t.mu.Lock() defer t.mu.Unlock() @@ -867,7 +862,6 @@ func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {} func (r *hostPoolHostPolicy) SetPartitioner(string) {} func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true } -// Experimental, this interface and use may change func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {} func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { @@ -1049,7 +1043,6 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool { return host.DataCenter() == d.local } -// Experimental, this interface and use may change func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {} func (d *dcAwareRR) AddHost(host *HostInfo) { @@ -1176,7 +1169,6 @@ func (d *rackAwareRR) setDCFailoverDisabled() { d.disableDCFailover = true } -// Experimental, this interface and use may change func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} func (d *rackAwareRR) HostTier(host *HostInfo) uint { diff --git a/ring.go b/ring.go index 86970a766..58c76124d 100644 --- a/ring.go +++ b/ring.go @@ -22,7 +22,6 @@ type ring struct { hostList []*HostInfo pos uint32 - // Experimental, this interface and use may change tabletList []*TabletInfo // TODO: we should store the ring metadata here also. @@ -145,7 +144,6 @@ func (c *clusterMetadata) setPartitioner(partitioner string) { } } -// Experimental, this interface and use may change func (r *ring) setTablets(newTablets []*TabletInfo) { r.mu.Lock() defer r.mu.Unlock() diff --git a/session.go b/session.go index 4724d0df8..edfc3e77e 100644 --- a/session.go +++ b/session.go @@ -608,7 +608,6 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { } // TabletsMetadata returns the metadata about tablets -// Experimental, this interface and use may change func (s *Session) TabletsMetadata() (*TabletsMetadata, error) { // fail fast if s.Closed() { @@ -638,7 +637,6 @@ func (s *Session) getConn() *Conn { return nil } -// Experimental, this interface and use may change func (s *Session) getTablets() []*TabletInfo { s.ring.mu.Lock() defer s.ring.mu.Unlock() @@ -2288,8 +2286,6 @@ type ObservedQuery struct { } // QueryObserver is the interface implemented by query observers / stat collectors. -// -// Experimental, this interface and use may change type QueryObserver interface { // ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled. // It doesn't get called if there is no query because the session is closed or there are no connections available. From 81d67ac774c3e41a39512c057b81f1ce5ccaf53b Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Mon, 25 Nov 2024 16:01:18 +0100 Subject: [PATCH 4/5] Introduce new TabletInfoList type to replace []*TabletInfo --- connectionpool.go | 2 +- host_source.go | 71 +++++++++++++++++++++++++---------------------- policies.go | 30 ++++++++++---------- ring.go | 4 +-- scylla.go | 4 +-- session.go | 4 +-- tablet_test.go | 56 ++++++++++++++++++------------------- 7 files changed, 88 insertions(+), 83 deletions(-) diff --git a/connectionpool.go b/connectionpool.go index aec19d804..ff33feec9 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -26,7 +26,7 @@ type SetPartitioner interface { // interface to implement to receive the tablets value type SetTablets interface { - SetTablets(tablets []*TabletInfo) + SetTablets(tablets TabletInfoList) } type policyConnPool struct { diff --git a/host_source.go b/host_source.go index 598e836e9..514be2c97 100644 --- a/host_source.go +++ b/host_source.go @@ -537,12 +537,14 @@ func (t *TabletInfo) Replicas() []ReplicaInfo { return t.replicas } +type TabletInfoList []*TabletInfo + // Search for place in tablets table with specific Keyspace and Table name -func findTablets(tablets []*TabletInfo, k string, t string) (int, int) { +func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) { l := -1 r := -1 - for i, tablet := range tablets { - if tablet.KeyspaceName() == k && tablet.TableName() == t { + for i, tablet := range t { + if tablet.KeyspaceName() == keyspace && tablet.TableName() == table { if l == -1 { l = i } @@ -555,8 +557,8 @@ func findTablets(tablets []*TabletInfo, k string, t string) (int, int) { return l, r } -func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*TabletInfo { - l, r := findTablets(tablets, tablet.keyspaceName, tablet.tableName) +func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList { + l, r := t.findTablets(tablet.keyspaceName, tablet.tableName) if l == -1 && r == -1 { l = 0 r = 0 @@ -570,7 +572,7 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet // find first overlaping range for l1 < r1 { mid := (l1 + r1) / 2 - if tablets[mid].FirstToken() < tablet.FirstToken() { + if t[mid].FirstToken() < tablet.FirstToken() { l1 = mid + 1 } else { r1 = mid @@ -578,42 +580,42 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet } start := l1 - if start > l && tablets[start-1].LastToken() > tablet.FirstToken() { + if start > l && t[start-1].LastToken() > tablet.FirstToken() { start = start - 1 } // find last overlaping range for l2 < r2 { mid := (l2 + r2) / 2 - if tablets[mid].LastToken() < tablet.LastToken() { + if t[mid].LastToken() < tablet.LastToken() { l2 = mid + 1 } else { r2 = mid } } end := l2 - if end < r && tablets[end].FirstToken() >= tablet.LastToken() { + if end < r && t[end].FirstToken() >= tablet.LastToken() { end = end - 1 } - if end == len(tablets) { + if end == len(t) { end = end - 1 } - updated_tablets := tablets + updated_tablets := t if start <= end { // Delete elements from index start to end - updated_tablets = append(tablets[:start], tablets[end+1:]...) + updated_tablets = append(t[:start], t[end+1:]...) } // Insert tablet element at index start - updated_tablets2 := append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...) - return updated_tablets2 + t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...) + return t } // Remove all tablets that have given host as a replica -func removeTabletsWithHostFromTabletsList(tablets []*TabletInfo, host *HostInfo) []*TabletInfo { - filteredTablets := make([]*TabletInfo, 0, len(tablets)) // Preallocate for efficiency +func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList { + filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency - for _, tablet := range tablets { + for _, tablet := range t { // Check if any replica matches the given host ID shouldExclude := false for _, replica := range tablet.replicas { @@ -627,35 +629,38 @@ func removeTabletsWithHostFromTabletsList(tablets []*TabletInfo, host *HostInfo) } } - return filteredTablets + t = filteredTablets + return t } -func removeTabletsWithKeyspaceFromTabletsList(tablets []*TabletInfo, keyspace string) []*TabletInfo { - filteredTablets := make([]*TabletInfo, 0, len(tablets)) +func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList { + filteredTablets := make([]*TabletInfo, 0, len(t)) - for _, tablet := range tablets { + for _, tablet := range t { if tablet.keyspaceName != keyspace { filteredTablets = append(filteredTablets, tablet) } } - return filteredTablets + t = filteredTablets + return t } -func removeTabletsWithTableFromTabletsList(tablets []*TabletInfo, keyspace string, table string) []*TabletInfo { - filteredTablets := make([]*TabletInfo, 0, len(tablets)) +func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList { + filteredTablets := make([]*TabletInfo, 0, len(t)) - for _, tablet := range tablets { + for _, tablet := range t { if !(tablet.keyspaceName == keyspace && tablet.tableName == table) { filteredTablets = append(filteredTablets, tablet) } } - return filteredTablets + t = filteredTablets + return t } // Search for place in tablets table for token starting from index l to index r -func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo { +func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo { for l < r { var m int if r*l > 0 { @@ -663,14 +668,14 @@ func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *Table } else { m = (r + l) / 2 } - if int64Token(tablets[m].LastToken()).Less(token) { + if int64Token(t[m].LastToken()).Less(token) { l = m + 1 } else { r = m } } - return tablets[l] + return t[l] } // Polls system.peers at a specific interval to find new hosts @@ -1056,7 +1061,7 @@ func addTablet(r *ringDescriber, tablet *TabletInfo) error { defer r.mu.Unlock() tablets := r.session.getTablets() - tablets = addTabletToTabletsList(tablets, tablet) + tablets = tablets.addTabletToTabletsList(tablet) r.session.ring.setTablets(tablets) r.session.policy.SetTablets(tablets) @@ -1071,7 +1076,7 @@ func removeTabletsWithHost(r *ringDescriber, host *HostInfo) error { defer r.mu.Unlock() tablets := r.session.getTablets() - tablets = removeTabletsWithHostFromTabletsList(tablets, host) + tablets = tablets.removeTabletsWithHostFromTabletsList(host) r.session.ring.setTablets(tablets) r.session.policy.SetTablets(tablets) @@ -1086,7 +1091,7 @@ func removeTabletsWithKeyspace(r *ringDescriber, keyspace string) error { defer r.mu.Unlock() tablets := r.session.getTablets() - tablets = removeTabletsWithKeyspaceFromTabletsList(tablets, keyspace) + tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace) r.session.ring.setTablets(tablets) r.session.policy.SetTablets(tablets) @@ -1101,7 +1106,7 @@ func removeTabletsWithTable(r *ringDescriber, keyspace string, table string) err defer r.mu.Unlock() tablets := r.session.getTablets() - tablets = removeTabletsWithTableFromTabletsList(tablets, keyspace, table) + tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table) r.session.ring.setTablets(tablets) r.session.policy.SetTablets(tablets) diff --git a/policies.go b/policies.go index a07455a16..0d4d9393b 100644 --- a/policies.go +++ b/policies.go @@ -94,31 +94,31 @@ func (c *cowHostList) remove(host *HostInfo) bool { return true } -// cowTabletList implements a copy on write tablet list, its equivalent type is []*TabletInfo +// cowTabletList implements a copy on write tablet list, its equivalent type is TabletInfoList type cowTabletList struct { list atomic.Value mu sync.Mutex } -func (c *cowTabletList) get() []*TabletInfo { - l, ok := c.list.Load().(*[]*TabletInfo) +func (c *cowTabletList) get() TabletInfoList { + l, ok := c.list.Load().(TabletInfoList) if !ok { return nil } - return *l + return l } -func (c *cowTabletList) set(tablets []*TabletInfo) { +func (c *cowTabletList) set(tablets TabletInfoList) { c.mu.Lock() defer c.mu.Unlock() n := len(tablets) - l := make([]*TabletInfo, n) + t := make(TabletInfoList, n) for i := 0; i < n; i++ { - l[i] = tablets[i] + t[i] = tablets[i] } - c.list.Store(&l) + c.list.Store(t) } // RetryableQuery is an interface that represents a query or batch statement that @@ -397,7 +397,7 @@ func (r *roundRobinHostPolicy) Init(*Session) {} func (r *roundRobinHostPolicy) Reset() {} func (r *roundRobinHostPolicy) IsOperational(*Session) error { return nil } -func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {} +func (r *roundRobinHostPolicy) SetTablets(tablets TabletInfoList) {} func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1) @@ -575,7 +575,7 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { } } -func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) { +func (t *tokenAwareHostPolicy) SetTablets(tablets TabletInfoList) { t.mu.Lock() defer t.mu.Unlock() @@ -705,9 +705,9 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { tablets := t.tablets.get() // Search for tablets with Keyspace and Table from the Query - l, r := findTablets(tablets, qry.Keyspace(), qry.Table()) + l, r := tablets.findTablets(qry.Keyspace(), qry.Table()) if l != -1 { - tablet := findTabletForToken(tablets, token, l, r) + tablet := tablets.findTabletForToken(token, l, r) hosts := t.hosts.get() for _, replica := range tablet.Replicas() { for _, host := range hosts { @@ -862,7 +862,7 @@ func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {} func (r *hostPoolHostPolicy) SetPartitioner(string) {} func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true } -func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {} +func (r *hostPoolHostPolicy) SetTablets(tablets TabletInfoList) {} func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { peers := make([]string, len(hosts)) @@ -1043,7 +1043,7 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool { return host.DataCenter() == d.local } -func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {} +func (d *dcAwareRR) SetTablets(tablets TabletInfoList) {} func (d *dcAwareRR) AddHost(host *HostInfo) { if d.IsLocal(host) { @@ -1169,7 +1169,7 @@ func (d *rackAwareRR) setDCFailoverDisabled() { d.disableDCFailover = true } -func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {} +func (d *rackAwareRR) SetTablets(tablets TabletInfoList) {} func (d *rackAwareRR) HostTier(host *HostInfo) uint { if host.DataCenter() == d.localDC { diff --git a/ring.go b/ring.go index 58c76124d..0a02bdefb 100644 --- a/ring.go +++ b/ring.go @@ -22,7 +22,7 @@ type ring struct { hostList []*HostInfo pos uint32 - tabletList []*TabletInfo + tabletList TabletInfoList // TODO: we should store the ring metadata here also. } @@ -144,7 +144,7 @@ func (c *clusterMetadata) setPartitioner(partitioner string) { } } -func (r *ring) setTablets(newTablets []*TabletInfo) { +func (r *ring) setTablets(newTablets TabletInfoList) { r.mu.Lock() defer r.mu.Unlock() diff --git a/scylla.go b/scylla.go index 03fc8737b..745949705 100644 --- a/scylla.go +++ b/scylla.go @@ -379,10 +379,10 @@ func (p *scyllaConnPicker) Pick(t Token, qry ExecutableQuery) *Conn { tablets := conn.session.getTablets() // Search for tablets with Keyspace and Table from the Query - l, r := findTablets(tablets, qry.Keyspace(), qry.Table()) + l, r := tablets.findTablets(qry.Keyspace(), qry.Table()) if l != -1 { - tablet := findTabletForToken(tablets, mmt, l, r) + tablet := tablets.findTabletForToken(mmt, l, r) for _, replica := range tablet.replicas { if replica.hostId.String() == p.hostId { diff --git a/session.go b/session.go index edfc3e77e..a60a1206c 100644 --- a/session.go +++ b/session.go @@ -276,7 +276,7 @@ func (s *Session) init() error { hosts = filteredHosts if s.tabletsRoutingV1 { - tablets := []*TabletInfo{} + tablets := TabletInfoList{} s.ring.setTablets(tablets) s.policy.SetTablets(tablets) } @@ -637,7 +637,7 @@ func (s *Session) getConn() *Conn { return nil } -func (s *Session) getTablets() []*TabletInfo { +func (s *Session) getTablets() TabletInfoList { s.ring.mu.Lock() defer s.ring.mu.Unlock() diff --git a/tablet_test.go b/tablet_test.go index 09fd344ce..005e5dac3 100644 --- a/tablet_test.go +++ b/tablet_test.go @@ -7,7 +7,7 @@ import ( "testing" ) -var tablets = []*TabletInfo{ +var tablets = TabletInfoList{ { "test1", "table1", @@ -123,31 +123,31 @@ var tablets = []*TabletInfo{ } func TestFindTablets(t *testing.T) { - id, id2 := findTablets(tablets, "test1", "table1") + id, id2 := tablets.findTablets("test1", "table1") assertEqual(t, "id", 0, id) assertEqual(t, "id2", 7, id2) - id, id2 = findTablets(tablets, "test2", "table1") + id, id2 = tablets.findTablets("test2", "table1") assertEqual(t, "id", 8, id) assertEqual(t, "id2", 15, id2) - id, id2 = findTablets(tablets, "test3", "table1") + id, id2 = tablets.findTablets("test3", "table1") assertEqual(t, "id", -1, id) assertEqual(t, "id2", -1, id2) } func TestFindTabletForToken(t *testing.T) { - tablet := findTabletForToken(tablets, parseInt64Token("0"), 0, 7) + tablet := tablets.findTabletForToken(parseInt64Token("0"), 0, 7) assertTrue(t, "tablet.lastToken == 2305843009213693951", tablet.lastToken == 2305843009213693951) - tablet = findTabletForToken(tablets, parseInt64Token("9223372036854775807"), 0, 7) + tablet = tablets.findTabletForToken(parseInt64Token("9223372036854775807"), 0, 7) assertTrue(t, "tablet.lastToken == 9223372036854775807", tablet.lastToken == 9223372036854775807) - tablet = findTabletForToken(tablets, parseInt64Token("-4611686018427387904"), 0, 7) + tablet = tablets.findTabletForToken(parseInt64Token("-4611686018427387904"), 0, 7) assertTrue(t, "tablet.lastToken == -2305843009213693953", tablet.lastToken == -2305843009213693953) } -func CompareRanges(tablets []*TabletInfo, ranges [][]int64) bool { +func CompareRanges(tablets TabletInfoList, ranges [][]int64) bool { if len(tablets) != len(ranges) { return false } @@ -160,9 +160,9 @@ func CompareRanges(tablets []*TabletInfo, ranges [][]int64) bool { return true } func TestAddTabletToEmptyTablets(t *testing.T) { - tablets := []*TabletInfo{} + tablets := TabletInfoList{} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -6917529027641081857, @@ -174,7 +174,7 @@ func TestAddTabletToEmptyTablets(t *testing.T) { } func TestAddTabletAtTheBeggining(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -6917529027641081857, @@ -182,7 +182,7 @@ func TestAddTabletAtTheBeggining(t *testing.T) { []ReplicaInfo{}, }} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -8611686018427387905, @@ -195,7 +195,7 @@ func TestAddTabletAtTheBeggining(t *testing.T) { } func TestAddTabletAtTheEnd(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -6917529027641081857, @@ -203,7 +203,7 @@ func TestAddTabletAtTheEnd(t *testing.T) { []ReplicaInfo{}, }} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -1, @@ -216,7 +216,7 @@ func TestAddTabletAtTheEnd(t *testing.T) { } func TestAddTabletInTheMiddle(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -6917529027641081857, @@ -230,7 +230,7 @@ func TestAddTabletInTheMiddle(t *testing.T) { []ReplicaInfo{}, }} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -4611686018427387905, @@ -244,7 +244,7 @@ func TestAddTabletInTheMiddle(t *testing.T) { } func TestAddTabletIntersecting(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -6917529027641081857, @@ -270,7 +270,7 @@ func TestAddTabletIntersecting(t *testing.T) { []ReplicaInfo{}, }} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -3611686018427387905, @@ -285,7 +285,7 @@ func TestAddTabletIntersecting(t *testing.T) { } func TestAddTabletIntersectingWithFirst(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -8611686018427387905, @@ -299,7 +299,7 @@ func TestAddTabletIntersectingWithFirst(t *testing.T) { []ReplicaInfo{}, }} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -8011686018427387905, @@ -312,7 +312,7 @@ func TestAddTabletIntersectingWithFirst(t *testing.T) { } func TestAddTabletIntersectingWithLast(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -8611686018427387905, @@ -326,7 +326,7 @@ func TestAddTabletIntersectingWithLast(t *testing.T) { []ReplicaInfo{}, }} - tablets = addTabletToTabletsList(tablets, &TabletInfo{ + tablets = tablets.addTabletToTabletsList(&TabletInfo{ "test_ks", "test_tb", -5011686018427387905, @@ -341,7 +341,7 @@ func TestAddTabletIntersectingWithLast(t *testing.T) { func TestRemoveTabletsWithHost(t *testing.T) { removed_host_id := TimeUUID() - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -8611686018427387905, @@ -361,7 +361,7 @@ func TestRemoveTabletsWithHost(t *testing.T) { []ReplicaInfo{{TimeUUID(), 9}, {removed_host_id, 8}, {TimeUUID(), 3}}, }} - tablets = removeTabletsWithHostFromTabletsList(tablets, &HostInfo{ + tablets = tablets.removeTabletsWithHostFromTabletsList(&HostInfo{ hostId: removed_host_id.String(), }) @@ -369,7 +369,7 @@ func TestRemoveTabletsWithHost(t *testing.T) { } func TestRemoveTabletsWithKeyspace(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "removed_ks", "test_tb", -8611686018427387905, @@ -389,13 +389,13 @@ func TestRemoveTabletsWithKeyspace(t *testing.T) { []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, }} - tablets = removeTabletsWithKeyspaceFromTabletsList(tablets, "removed_ks") + tablets = tablets.removeTabletsWithKeyspaceFromTabletsList("removed_ks") assertEqual(t, "TabletsList length", 1, len(tablets)) } func TestRemoveTabletsWithTable(t *testing.T) { - tablets := []*TabletInfo{{ + tablets := TabletInfoList{{ "test_ks", "test_tb", -8611686018427387905, @@ -415,7 +415,7 @@ func TestRemoveTabletsWithTable(t *testing.T) { []ReplicaInfo{{TimeUUID(), 9}, {TimeUUID(), 8}, {TimeUUID(), 3}}, }} - tablets = removeTabletsWithTableFromTabletsList(tablets, "test_ks", "removed_tb") + tablets = tablets.removeTabletsWithTableFromTabletsList("test_ks", "removed_tb") assertEqual(t, "TabletsList length", 2, len(tablets)) } From 64e47cc1dfbf14e7062022595766a6f4044bce32 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Mon, 25 Nov 2024 16:55:46 +0100 Subject: [PATCH 5/5] Make tablets addition/removal a method of Session --- conn.go | 2 +- events.go | 4 ++-- host_source.go | 54 ++++++++++++++++++++------------------------------ 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/conn.go b/conn.go index e0a9c7ada..404389e98 100644 --- a/conn.go +++ b/conn.go @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { tablet.keyspaceName = qry.routingInfo.keyspace tablet.tableName = qry.routingInfo.table - addTablet(c.session.hostSource, &tablet) + c.session.addTablet(&tablet) } } diff --git a/events.go b/events.go index 100ad7b81..ef0d3a325 100644 --- a/events.go +++ b/events.go @@ -128,14 +128,14 @@ func (s *Session) handleSchemaEvent(frames []frame) { func (s *Session) handleKeyspaceChange(keyspace, change string) { s.control.awaitSchemaAgreement() if change == "DROPPED" || change == "UPDATED" { - removeTabletsWithKeyspace(s.hostSource, keyspace) + s.removeTabletsWithKeyspace(keyspace) } s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change}) } func (s *Session) handleTableChange(keyspace, table, change string) { if change == "DROPPED" || change == "UPDATED" { - removeTabletsWithTable(s.hostSource, keyspace, table) + s.removeTabletsWithTable(keyspace, table) } } diff --git a/host_source.go b/host_source.go index 514be2c97..5bad8a204 100644 --- a/host_source.go +++ b/host_source.go @@ -1046,7 +1046,7 @@ func refreshRing(r *ringDescriber) error { } for _, host := range prevHosts { - removeTabletsWithHost(r, host) + r.session.removeTabletsWithHost(host) r.session.removeHost(host) } @@ -1056,62 +1056,50 @@ func refreshRing(r *ringDescriber) error { return nil } -func addTablet(r *ringDescriber, tablet *TabletInfo) error { - r.mu.Lock() - defer r.mu.Unlock() - - tablets := r.session.getTablets() +func (s *Session) addTablet(tablet *TabletInfo) error { + tablets := s.getTablets() tablets = tablets.addTabletToTabletsList(tablet) - r.session.ring.setTablets(tablets) - r.session.policy.SetTablets(tablets) + s.ring.setTablets(tablets) + s.policy.SetTablets(tablets) - r.session.schemaDescriber.refreshTabletsSchema() + s.schemaDescriber.refreshTabletsSchema() return nil } -func removeTabletsWithHost(r *ringDescriber, host *HostInfo) error { - r.mu.Lock() - defer r.mu.Unlock() - - tablets := r.session.getTablets() +func (s *Session) removeTabletsWithHost(host *HostInfo) error { + tablets := s.getTablets() tablets = tablets.removeTabletsWithHostFromTabletsList(host) - r.session.ring.setTablets(tablets) - r.session.policy.SetTablets(tablets) + s.ring.setTablets(tablets) + s.policy.SetTablets(tablets) - r.session.schemaDescriber.refreshTabletsSchema() + s.schemaDescriber.refreshTabletsSchema() return nil } -func removeTabletsWithKeyspace(r *ringDescriber, keyspace string) error { - r.mu.Lock() - defer r.mu.Unlock() - - tablets := r.session.getTablets() +func (s *Session) removeTabletsWithKeyspace(keyspace string) error { + tablets := s.getTablets() tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace) - r.session.ring.setTablets(tablets) - r.session.policy.SetTablets(tablets) + s.ring.setTablets(tablets) + s.policy.SetTablets(tablets) - r.session.schemaDescriber.refreshTabletsSchema() + s.schemaDescriber.refreshTabletsSchema() return nil } -func removeTabletsWithTable(r *ringDescriber, keyspace string, table string) error { - r.mu.Lock() - defer r.mu.Unlock() - - tablets := r.session.getTablets() +func (s *Session) removeTabletsWithTable(keyspace string, table string) error { + tablets := s.getTablets() tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table) - r.session.ring.setTablets(tablets) - r.session.policy.SetTablets(tablets) + s.ring.setTablets(tablets) + s.policy.SetTablets(tablets) - r.session.schemaDescriber.refreshTabletsSchema() + s.schemaDescriber.refreshTabletsSchema() return nil }