Skip to content

Commit

Permalink
Extract the code getting schema versions to separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Sep 6, 2024
1 parent d0152a3 commit 9ff526d
Showing 1 changed file with 23 additions and 25 deletions.
48 changes: 23 additions & 25 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,27 +1840,22 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) error {
}
}

for time.Now().Before(endDeadline) {
getSchemaAgreement := func() error {
iter := c.querySystemPeers(ctx, c.host.version)

versions = make(map[string]struct{})

var rows []map[string]interface{}
rows, err = iter.SliceMap()
if err != nil {
if err == ErrConnectionClosed {
break
}
waitForNextTick()
continue
return err
}

for _, row := range rows {
var host *HostInfo
host, err = hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port}, c.session.cfg.translateAddressPort)
if err != nil {
waitForNextTick()
continue
return err
}
if !isValidPeer(host) || host.schemaVersion == "" {
c.logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
Expand All @@ -1871,8 +1866,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) error {
}

if err = iter.Close(); err != nil {
waitForNextTick()
continue
return err
}

iter = c.query(ctx, localSchemas)
Expand All @@ -1882,30 +1876,34 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) error {
}

if err = iter.Close(); err != nil {
waitForNextTick()
continue
return err
}

if len(versions) <= 1 {
return nil
}
if len(versions) > 1 {
schemas := make([]string, 0, len(versions))
for schema := range versions {
schemas = append(schemas, schema)
}

if err := waitForNextTick(); err != nil {
return err
return &ErrSchemaMismatch{schemas: schemas}
}
}

if err != nil {
return err
return nil
}

schemas := make([]string, 0, len(versions))
for schema := range versions {
schemas = append(schemas, schema)
for time.Now().Before(endDeadline) {
err = getSchemaAgreement()

if err == ErrConnectionClosed || err == nil {
return err
}

if tickerErr := waitForNextTick(); tickerErr != nil {
return tickerErr
}
}

// not exported
return &ErrSchemaMismatch{schemas: schemas}
return err
}

var (
Expand Down

0 comments on commit 9ff526d

Please sign in to comment.