Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invalidate tablets data #351

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type SetPartitioner interface {
}

// interface to implement to receive the tablets value
// Experimental, this interface and use may change
type SetTablets interface {
SetTablets(tablets []*TabletInfo)
}
Expand Down
10 changes: 10 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (s *Session) handleSchemaEvent(frames []frame) {
s.handleKeyspaceChange(f.keyspace, f.change)
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
s.handleTableChange(f.keyspace, f.object, f.change)
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
case *schemaChangeFunction:
Expand All @@ -126,9 +127,18 @@ func (s *Session) handleSchemaEvent(frames []frame) {

func (s *Session) handleKeyspaceChange(keyspace, change string) {
s.control.awaitSchemaAgreement()
if change == "DROPPED" || change == "UPDATED" {
removeTabletsWithKeyspace(s.hostSource, keyspace)
}
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
}

func (s *Session) handleTableChange(keyspace, table, change string) {
if change == "DROPPED" || change == "UPDATED" {
removeTabletsWithTable(s.hostSource, keyspace, table)
}
}

// handleNodeEvent handles inbound status and topology change events.
//
// Status events are debounced by host IP; only the latest event is processed.
Expand Down
96 changes: 91 additions & 5 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,11 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

// Experimental, this interface and use may change
type ReplicaInfo struct {
hostId UUID
shardId int
}

// Experimental, this interface and use may change
type TabletInfo struct {
keyspaceName string
tableName string
Expand Down Expand Up @@ -611,6 +609,51 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet
return updated_tablets2
}

// Remove all tablets that have given host as a replica
func removeTabletsWithHostFromTabletsList(tablets []*TabletInfo, host *HostInfo) []*TabletInfo {
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
filteredTablets := make([]*TabletInfo, 0, len(tablets)) // Preallocate for efficiency

for _, tablet := range tablets {
// Check if any replica matches the given host ID
shouldExclude := false
for _, replica := range tablet.replicas {
if replica.hostId.String() == host.HostID() {
shouldExclude = true
break
}
}
if !shouldExclude {
filteredTablets = append(filteredTablets, tablet)
}
}

return filteredTablets
}

func removeTabletsWithKeyspaceFromTabletsList(tablets []*TabletInfo, keyspace string) []*TabletInfo {
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
filteredTablets := make([]*TabletInfo, 0, len(tablets))

for _, tablet := range tablets {
if tablet.keyspaceName != keyspace {
filteredTablets = append(filteredTablets, tablet)
}
}

return filteredTablets
}

func removeTabletsWithTableFromTabletsList(tablets []*TabletInfo, keyspace string, table string) []*TabletInfo {
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
filteredTablets := make([]*TabletInfo, 0, len(tablets))

for _, tablet := range tablets {
if !(tablet.keyspaceName == keyspace && tablet.tableName == table) {
filteredTablets = append(filteredTablets, tablet)
}
}

return filteredTablets
}

// Search for place in tablets table for token starting from index l to index r
func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo {
for l < r {
Expand All @@ -636,8 +679,6 @@ type ringDescriber struct {
mu sync.Mutex
prevHosts []*HostInfo
prevPartitioner string
// Experimental, this interface and use may change
prevTablets []*TabletInfo
}

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
Expand Down Expand Up @@ -1000,6 +1041,7 @@ func refreshRing(r *ringDescriber) error {
}

for _, host := range prevHosts {
removeTabletsWithHost(r, host)
r.session.removeHost(host)
}

Expand All @@ -1009,7 +1051,6 @@ func refreshRing(r *ringDescriber) error {
return nil
}

// Experimental, this interface and use may change
func addTablet(r *ringDescriber, tablet *TabletInfo) error {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -1025,6 +1066,51 @@ func addTablet(r *ringDescriber, tablet *TabletInfo) error {
return nil
}

func removeTabletsWithHost(r *ringDescriber, host *HostInfo) error {
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
r.mu.Lock()
defer r.mu.Unlock()

tablets := r.session.getTablets()
tablets = removeTabletsWithHostFromTabletsList(tablets, host)

r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)

r.session.schemaDescriber.refreshTabletsSchema()

return nil
}

func removeTabletsWithKeyspace(r *ringDescriber, keyspace string) error {
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
r.mu.Lock()
defer r.mu.Unlock()

tablets := r.session.getTablets()
tablets = removeTabletsWithKeyspaceFromTabletsList(tablets, keyspace)

r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)

r.session.schemaDescriber.refreshTabletsSchema()

return nil
}

func removeTabletsWithTable(r *ringDescriber, keyspace string, table string) error {
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
r.mu.Lock()
defer r.mu.Unlock()

tablets := r.session.getTablets()
tablets = removeTabletsWithTableFromTabletsList(tablets, keyspace, table)

r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)

r.session.schemaDescriber.refreshTabletsSchema()

return nil
}

const (
ringRefreshDebounceTime = 1 * time.Second
)
Expand Down
6 changes: 0 additions & 6 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,11 @@ type IndexMetadata struct {
}

// TabletsMetadata holds metadata for tablet list
// Experimental, this interface and use may change
type TabletsMetadata struct {
Tablets []*TabletMetadata
}

// TabletMetadata holds metadata for single tablet
// Experimental, this interface and use may change
type TabletMetadata struct {
KeyspaceName string
TableName string
Expand All @@ -151,7 +149,6 @@ type TabletMetadata struct {
}

// TabletMetadata holds metadata for single replica
// Experimental, this interface and use may change
type ReplicaMetadata struct {
HostId UUID
ShardId int
Expand Down Expand Up @@ -247,7 +244,6 @@ type schemaDescriber struct {

cache map[string]*KeyspaceMetadata

// Experimental, this interface and use may change
tabletsCache *TabletsMetadata
}

Expand Down Expand Up @@ -281,7 +277,6 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
return metadata, nil
}

// Experimental, this interface and use may change
func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -291,7 +286,6 @@ func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
return metadata
}

// Experimental, this interface and use may change
func (s *schemaDescriber) refreshTabletsSchema() {
tablets := s.session.getTablets()
s.tabletsCache.Tablets = []*TabletMetadata{}
Expand Down
8 changes: 0 additions & 8 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func (c *cowHostList) remove(host *HostInfo) bool {
}

// cowTabletList implements a copy on write tablet list, its equivalent type is []*TabletInfo
// Experimental, this interface and use may change
type cowTabletList struct {
list atomic.Value
mu sync.Mutex
Expand Down Expand Up @@ -338,7 +337,6 @@ type HostTierer interface {
type HostSelectionPolicy interface {
HostStateNotifier
SetPartitioner
// Experimental, this interface and use may change
SetTablets
KeyspaceChanged(KeyspaceUpdateEvent)
Init(*Session)
Expand Down Expand Up @@ -399,7 +397,6 @@ func (r *roundRobinHostPolicy) Init(*Session) {}
func (r *roundRobinHostPolicy) Reset() {}
func (r *roundRobinHostPolicy) IsOperational(*Session) error { return nil }

// Experimental, this interface and use may change
func (r *roundRobinHostPolicy) SetTablets(tablets []*TabletInfo) {}

func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost {
Expand Down Expand Up @@ -492,7 +489,6 @@ type tokenAwareHostPolicy struct {

logger StdLogger

// Experimental, this interface and use may change
tablets cowTabletList

avoidSlowReplicas bool
Expand Down Expand Up @@ -579,7 +575,6 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
}
}

// Experimental, this interface and use may change
func (t *tokenAwareHostPolicy) SetTablets(tablets []*TabletInfo) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down Expand Up @@ -867,7 +862,6 @@ func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool { return true }

// Experimental, this interface and use may change
func (r *hostPoolHostPolicy) SetTablets(tablets []*TabletInfo) {}

func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
Expand Down Expand Up @@ -1049,7 +1043,6 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool {
return host.DataCenter() == d.local
}

// Experimental, this interface and use may change
func (d *dcAwareRR) SetTablets(tablets []*TabletInfo) {}

func (d *dcAwareRR) AddHost(host *HostInfo) {
Expand Down Expand Up @@ -1176,7 +1169,6 @@ func (d *rackAwareRR) setDCFailoverDisabled() {
d.disableDCFailover = true
}

// Experimental, this interface and use may change
func (d *rackAwareRR) SetTablets(tablets []*TabletInfo) {}

func (d *rackAwareRR) HostTier(host *HostInfo) uint {
Expand Down
2 changes: 0 additions & 2 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type ring struct {
hostList []*HostInfo
pos uint32

// Experimental, this interface and use may change
tabletList []*TabletInfo

// TODO: we should store the ring metadata here also.
Expand Down Expand Up @@ -145,7 +144,6 @@ func (c *clusterMetadata) setPartitioner(partitioner string) {
}
}

// Experimental, this interface and use may change
func (r *ring) setTablets(newTablets []*TabletInfo) {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down
4 changes: 0 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
}

// TabletsMetadata returns the metadata about tablets
// Experimental, this interface and use may change
func (s *Session) TabletsMetadata() (*TabletsMetadata, error) {
// fail fast
if s.Closed() {
Expand Down Expand Up @@ -638,7 +637,6 @@ func (s *Session) getConn() *Conn {
return nil
}

// Experimental, this interface and use may change
func (s *Session) getTablets() []*TabletInfo {
s.ring.mu.Lock()
defer s.ring.mu.Unlock()
Expand Down Expand Up @@ -2288,8 +2286,6 @@ type ObservedQuery struct {
}

// QueryObserver is the interface implemented by query observers / stat collectors.
//
// Experimental, this interface and use may change
type QueryObserver interface {
// ObserveQuery gets called on every query to cassandra, including all queries in an iterator when paging is enabled.
// It doesn't get called if there is no query because the session is closed or there are no connections available.
Expand Down
Loading
Loading