Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invalidate tablets data #351

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
tablet.keyspaceName = qry.routingInfo.keyspace
tablet.tableName = qry.routingInfo.table

addTablet(c.session.hostSource, &tablet)
c.session.addTablet(&tablet)
}
}

Expand Down
3 changes: 1 addition & 2 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ type SetPartitioner interface {
}

// interface to implement to receive the tablets value
// Experimental, this interface and use may change
type SetTablets interface {
SetTablets(tablets []*TabletInfo)
SetTablets(tablets TabletInfoList)
}

type policyConnPool struct {
Expand Down
10 changes: 10 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (s *Session) handleSchemaEvent(frames []frame) {
s.handleKeyspaceChange(f.keyspace, f.change)
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
s.handleTableChange(f.keyspace, f.object, f.change)
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
case *schemaChangeFunction:
Expand All @@ -126,9 +127,18 @@ func (s *Session) handleSchemaEvent(frames []frame) {

func (s *Session) handleKeyspaceChange(keyspace, change string) {
s.control.awaitSchemaAgreement()
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithKeyspace(keyspace)
}
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
}

func (s *Session) handleTableChange(keyspace, table, change string) {
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithTable(keyspace, table)
}
}

// handleNodeEvent handles inbound status and topology change events.
//
// Status events are debounced by host IP; only the latest event is processed.
Expand Down
139 changes: 109 additions & 30 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,11 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

// Experimental, this interface and use may change
type ReplicaInfo struct {
hostId UUID
shardId int
}

// Experimental, this interface and use may change
type TabletInfo struct {
keyspaceName string
tableName string
Expand Down Expand Up @@ -539,12 +537,14 @@ func (t *TabletInfo) Replicas() []ReplicaInfo {
return t.replicas
}

type TabletInfoList []*TabletInfo

// Search for place in tablets table with specific Keyspace and Table name
func findTablets(tablets []*TabletInfo, k string, t string) (int, int) {
func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) {
l := -1
r := -1
for i, tablet := range tablets {
if tablet.KeyspaceName() == k && tablet.TableName() == t {
for i, tablet := range t {
if tablet.KeyspaceName() == keyspace && tablet.TableName() == table {
if l == -1 {
l = i
}
Expand All @@ -557,8 +557,8 @@ func findTablets(tablets []*TabletInfo, k string, t string) (int, int) {
return l, r
}

func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*TabletInfo {
l, r := findTablets(tablets, tablet.keyspaceName, tablet.tableName)
func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList {
l, r := t.findTablets(tablet.keyspaceName, tablet.tableName)
if l == -1 && r == -1 {
l = 0
r = 0
Expand All @@ -572,62 +572,110 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet
// find first overlaping range
for l1 < r1 {
mid := (l1 + r1) / 2
if tablets[mid].FirstToken() < tablet.FirstToken() {
if t[mid].FirstToken() < tablet.FirstToken() {
l1 = mid + 1
} else {
r1 = mid
}
}
start := l1

if start > l && tablets[start-1].LastToken() > tablet.FirstToken() {
if start > l && t[start-1].LastToken() > tablet.FirstToken() {
start = start - 1
}

// find last overlaping range
for l2 < r2 {
mid := (l2 + r2) / 2
if tablets[mid].LastToken() < tablet.LastToken() {
if t[mid].LastToken() < tablet.LastToken() {
l2 = mid + 1
} else {
r2 = mid
}
}
end := l2
if end < r && tablets[end].FirstToken() >= tablet.LastToken() {
if end < r && t[end].FirstToken() >= tablet.LastToken() {
end = end - 1
}
if end == len(tablets) {
if end == len(t) {
end = end - 1
}

updated_tablets := tablets
updated_tablets := t
if start <= end {
// Delete elements from index start to end
updated_tablets = append(tablets[:start], tablets[end+1:]...)
updated_tablets = append(t[:start], t[end+1:]...)
}
// Insert tablet element at index start
updated_tablets2 := append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return updated_tablets2
t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return t
}

// Remove all tablets that have given host as a replica
func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency

for _, tablet := range t {
// Check if any replica matches the given host ID
shouldExclude := false
for _, replica := range tablet.replicas {
if replica.hostId.String() == host.HostID() {
shouldExclude = true
break
}
}
if !shouldExclude {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if tablet.keyspaceName != keyspace {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if !(tablet.keyspaceName == keyspace && tablet.tableName == table) {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

// Search for place in tablets table for token starting from index l to index r
func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo {
func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo {
for l < r {
var m int
if r*l > 0 {
m = l + (r-l)/2
} else {
m = (r + l) / 2
}
if int64Token(tablets[m].LastToken()).Less(token) {
if int64Token(t[m].LastToken()).Less(token) {
l = m + 1
} else {
r = m
}
}

return tablets[l]
return t[l]
}

// Polls system.peers at a specific interval to find new hosts
Expand All @@ -636,8 +684,6 @@ type ringDescriber struct {
mu sync.Mutex
prevHosts []*HostInfo
prevPartitioner string
// Experimental, this interface and use may change
prevTablets []*TabletInfo
}

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
Expand Down Expand Up @@ -1000,6 +1046,7 @@ func refreshRing(r *ringDescriber) error {
}

for _, host := range prevHosts {
r.session.removeTabletsWithHost(host)
r.session.removeHost(host)
}

Expand All @@ -1009,18 +1056,50 @@ func refreshRing(r *ringDescriber) error {
return nil
}

// Experimental, this interface and use may change
func addTablet(r *ringDescriber, tablet *TabletInfo) error {
r.mu.Lock()
defer r.mu.Unlock()
func (s *Session) addTablet(tablet *TabletInfo) error {
tablets := s.getTablets()
tablets = tablets.addTabletToTabletsList(tablet)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithHost(host *HostInfo) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithHostFromTabletsList(host)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithKeyspace(keyspace string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

tablets := r.session.getTablets()
tablets = addTabletToTabletsList(tablets, tablet)
func (s *Session) removeTabletsWithTable(keyspace string, table string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table)

r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)
s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

r.session.schemaDescriber.refreshTabletsSchema()
s.schemaDescriber.refreshTabletsSchema()

return nil
}
Expand Down
6 changes: 0 additions & 6 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,11 @@ type IndexMetadata struct {
}

// TabletsMetadata holds metadata for tablet list
// Experimental, this interface and use may change
type TabletsMetadata struct {
Tablets []*TabletMetadata
}

// TabletMetadata holds metadata for single tablet
// Experimental, this interface and use may change
type TabletMetadata struct {
KeyspaceName string
TableName string
Expand All @@ -151,7 +149,6 @@ type TabletMetadata struct {
}

// TabletMetadata holds metadata for single replica
// Experimental, this interface and use may change
type ReplicaMetadata struct {
HostId UUID
ShardId int
Expand Down Expand Up @@ -247,7 +244,6 @@ type schemaDescriber struct {

cache map[string]*KeyspaceMetadata

// Experimental, this interface and use may change
tabletsCache *TabletsMetadata
}

Expand Down Expand Up @@ -281,7 +277,6 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
return metadata, nil
}

// Experimental, this interface and use may change
func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -291,7 +286,6 @@ func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
return metadata
}

// Experimental, this interface and use may change
func (s *schemaDescriber) refreshTabletsSchema() {
tablets := s.session.getTablets()
s.tabletsCache.Tablets = []*TabletMetadata{}
Expand Down
Loading
Loading