Skip to content

Commit

Permalink
Merge pull request #351 from sylwiaszunejko/invalidate-tablets-data
Browse files Browse the repository at this point in the history
Invalidate tablets data
  • Loading branch information
dkropachev authored Nov 26, 2024
2 parents ad9bddd + 64e47cc commit 5cd2a6c
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 96 deletions.
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
tablet.keyspaceName = qry.routingInfo.keyspace
tablet.tableName = qry.routingInfo.table

addTablet(c.session.hostSource, &tablet)
c.session.addTablet(&tablet)
}
}

Expand Down
3 changes: 1 addition & 2 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ type SetPartitioner interface {
}

// interface to implement to receive the tablets value
// Experimental, this interface and use may change
type SetTablets interface {
SetTablets(tablets []*TabletInfo)
SetTablets(tablets TabletInfoList)
}

type policyConnPool struct {
Expand Down
10 changes: 10 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (s *Session) handleSchemaEvent(frames []frame) {
s.handleKeyspaceChange(f.keyspace, f.change)
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
s.handleTableChange(f.keyspace, f.object, f.change)
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
case *schemaChangeFunction:
Expand All @@ -126,9 +127,18 @@ func (s *Session) handleSchemaEvent(frames []frame) {

func (s *Session) handleKeyspaceChange(keyspace, change string) {
s.control.awaitSchemaAgreement()
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithKeyspace(keyspace)
}
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
}

func (s *Session) handleTableChange(keyspace, table, change string) {
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithTable(keyspace, table)
}
}

// handleNodeEvent handles inbound status and topology change events.
//
// Status events are debounced by host IP; only the latest event is processed.
Expand Down
139 changes: 109 additions & 30 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,11 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

// Experimental, this interface and use may change
type ReplicaInfo struct {
hostId UUID
shardId int
}

// Experimental, this interface and use may change
type TabletInfo struct {
keyspaceName string
tableName string
Expand Down Expand Up @@ -539,12 +537,14 @@ func (t *TabletInfo) Replicas() []ReplicaInfo {
return t.replicas
}

type TabletInfoList []*TabletInfo

// Search for place in tablets table with specific Keyspace and Table name
func findTablets(tablets []*TabletInfo, k string, t string) (int, int) {
func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) {
l := -1
r := -1
for i, tablet := range tablets {
if tablet.KeyspaceName() == k && tablet.TableName() == t {
for i, tablet := range t {
if tablet.KeyspaceName() == keyspace && tablet.TableName() == table {
if l == -1 {
l = i
}
Expand All @@ -557,8 +557,8 @@ func findTablets(tablets []*TabletInfo, k string, t string) (int, int) {
return l, r
}

func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*TabletInfo {
l, r := findTablets(tablets, tablet.keyspaceName, tablet.tableName)
func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList {
l, r := t.findTablets(tablet.keyspaceName, tablet.tableName)
if l == -1 && r == -1 {
l = 0
r = 0
Expand All @@ -572,62 +572,110 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet
// find first overlaping range
for l1 < r1 {
mid := (l1 + r1) / 2
if tablets[mid].FirstToken() < tablet.FirstToken() {
if t[mid].FirstToken() < tablet.FirstToken() {
l1 = mid + 1
} else {
r1 = mid
}
}
start := l1

if start > l && tablets[start-1].LastToken() > tablet.FirstToken() {
if start > l && t[start-1].LastToken() > tablet.FirstToken() {
start = start - 1
}

// find last overlaping range
for l2 < r2 {
mid := (l2 + r2) / 2
if tablets[mid].LastToken() < tablet.LastToken() {
if t[mid].LastToken() < tablet.LastToken() {
l2 = mid + 1
} else {
r2 = mid
}
}
end := l2
if end < r && tablets[end].FirstToken() >= tablet.LastToken() {
if end < r && t[end].FirstToken() >= tablet.LastToken() {
end = end - 1
}
if end == len(tablets) {
if end == len(t) {
end = end - 1
}

updated_tablets := tablets
updated_tablets := t
if start <= end {
// Delete elements from index start to end
updated_tablets = append(tablets[:start], tablets[end+1:]...)
updated_tablets = append(t[:start], t[end+1:]...)
}
// Insert tablet element at index start
updated_tablets2 := append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return updated_tablets2
t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return t
}

// Remove all tablets that have given host as a replica
func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency

for _, tablet := range t {
// Check if any replica matches the given host ID
shouldExclude := false
for _, replica := range tablet.replicas {
if replica.hostId.String() == host.HostID() {
shouldExclude = true
break
}
}
if !shouldExclude {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if tablet.keyspaceName != keyspace {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if !(tablet.keyspaceName == keyspace && tablet.tableName == table) {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

// Search for place in tablets table for token starting from index l to index r
func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo {
func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo {
for l < r {
var m int
if r*l > 0 {
m = l + (r-l)/2
} else {
m = (r + l) / 2
}
if int64Token(tablets[m].LastToken()).Less(token) {
if int64Token(t[m].LastToken()).Less(token) {
l = m + 1
} else {
r = m
}
}

return tablets[l]
return t[l]
}

// Polls system.peers at a specific interval to find new hosts
Expand All @@ -636,8 +684,6 @@ type ringDescriber struct {
mu sync.Mutex
prevHosts []*HostInfo
prevPartitioner string
// Experimental, this interface and use may change
prevTablets []*TabletInfo
}

// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
Expand Down Expand Up @@ -1005,6 +1051,7 @@ func refreshRing(r *ringDescriber) error {
}

for _, host := range prevHosts {
r.session.removeTabletsWithHost(host)
r.session.removeHost(host)
}

Expand All @@ -1014,18 +1061,50 @@ func refreshRing(r *ringDescriber) error {
return nil
}

// Experimental, this interface and use may change
func addTablet(r *ringDescriber, tablet *TabletInfo) error {
r.mu.Lock()
defer r.mu.Unlock()
func (s *Session) addTablet(tablet *TabletInfo) error {
tablets := s.getTablets()
tablets = tablets.addTabletToTabletsList(tablet)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithHost(host *HostInfo) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithHostFromTabletsList(host)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithKeyspace(keyspace string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

tablets := r.session.getTablets()
tablets = addTabletToTabletsList(tablets, tablet)
func (s *Session) removeTabletsWithTable(keyspace string, table string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table)

r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)
s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

r.session.schemaDescriber.refreshTabletsSchema()
s.schemaDescriber.refreshTabletsSchema()

return nil
}
Expand Down
6 changes: 0 additions & 6 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,11 @@ type IndexMetadata struct {
}

// TabletsMetadata holds metadata for tablet list
// Experimental, this interface and use may change
type TabletsMetadata struct {
Tablets []*TabletMetadata
}

// TabletMetadata holds metadata for single tablet
// Experimental, this interface and use may change
type TabletMetadata struct {
KeyspaceName string
TableName string
Expand All @@ -151,7 +149,6 @@ type TabletMetadata struct {
}

// TabletMetadata holds metadata for single replica
// Experimental, this interface and use may change
type ReplicaMetadata struct {
HostId UUID
ShardId int
Expand Down Expand Up @@ -247,7 +244,6 @@ type schemaDescriber struct {

cache map[string]*KeyspaceMetadata

// Experimental, this interface and use may change
tabletsCache *TabletsMetadata
}

Expand Down Expand Up @@ -281,7 +277,6 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
return metadata, nil
}

// Experimental, this interface and use may change
func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -291,7 +286,6 @@ func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
return metadata
}

// Experimental, this interface and use may change
func (s *schemaDescriber) refreshTabletsSchema() {
tablets := s.session.getTablets()
s.tabletsCache.Tablets = []*TabletMetadata{}
Expand Down
Loading

0 comments on commit 5cd2a6c

Please sign in to comment.