Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7402,24 +7402,9 @@ func (r *rpcServer) AddFederationServer(ctx context.Context,
) (*unirpc.AddFederationServerResponse, error) {

serversToAdd := fn.Map(req.Servers, unmarshalUniverseServer)

for idx := range serversToAdd {
server := serversToAdd[idx]

// Before we add the server as a federation member, we check
// that we can actually connect to it and that it isn't
// ourselves.
err := CheckFederationServer(
r.cfg.RuntimeID, universe.DefaultTimeout, server,
)
if err != nil {
return nil, err
}
}

err := r.cfg.UniverseFederation.AddServer(serversToAdd...)
_, err := r.cfg.UniverseFederation.AddServer(ctx, true, serversToAdd...)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to add server: %w", err)
}

return &unirpc.AddFederationServerResponse{}, nil
Expand Down
4 changes: 3 additions & 1 deletion tapdb/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions tapdb/sqlc/queries/universe.sql
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,16 @@ ORDER BY
CASE WHEN sqlc.narg('sort_direction') = 1 THEN universe_roots.id END DESC
LIMIT @num_limit OFFSET @num_offset;

-- name: InsertUniverseServer :exec
-- name: UpsertUniverseServer :exec
-- Upserts a universe server by inserting or updating the last sync time for a
-- given server host.
INSERT INTO universe_servers(
server_host, last_sync_time
) VALUES (
@server_host, @last_sync_time
);
@server_host, @last_sync_time
)
ON CONFLICT(server_host)
DO UPDATE SET last_sync_time = EXCLUDED.last_sync_time;

-- name: DeleteUniverseServer :exec
DELETE FROM universe_servers
Expand Down
40 changes: 22 additions & 18 deletions tapdb/sqlc/universe.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 7 additions & 13 deletions tapdb/universe_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"sort"
"sync/atomic"
Expand Down Expand Up @@ -39,7 +38,7 @@ type (
ProofSyncLogEntry = sqlc.QueryFederationProofSyncLogRow

// NewUniverseServer is used to create a new universe server.
NewUniverseServer = sqlc.InsertUniverseServerParams
NewUniverseServer = sqlc.UpsertUniverseServerParams

// DelUniverseServer is used to delete a universe server.
DelUniverseServer = sqlc.DeleteUniverseServerParams
Expand Down Expand Up @@ -131,8 +130,8 @@ type UniverseServerStore interface {
FederationSyncConfigStore
FederationProofSyncLogStore

// InsertUniverseServer inserts a new universe server in to the DB.
InsertUniverseServer(ctx context.Context, arg NewUniverseServer) error
// UpsertUniverseServer upserts a new universe server in to the DB.
UpsertUniverseServer(ctx context.Context, arg NewUniverseServer) error

// DeleteUniverseServer removes a universe server from the store.
DeleteUniverseServer(ctx context.Context, r DelUniverseServer) error
Expand Down Expand Up @@ -249,19 +248,14 @@ func (u *UniverseFederationDB) AddServers(ctx context.Context,
return fn.ForEachErr(addrs, func(a universe.ServerAddr) error {
addr := NewUniverseServer{
ServerHost: a.HostStr(),
LastSyncTime: time.Now(),
LastSyncTime: time.Now().UTC(),
}
return db.InsertUniverseServer(ctx, addr)
return db.UpsertUniverseServer(ctx, addr)
})
})
if err != nil {
// Add context to unique constraint errors.
var uniqueConstraintErr *ErrSqlUniqueConstraintViolation
if errors.As(err, &uniqueConstraintErr) {
return universe.ErrDuplicateUniverse
}

return err
return fmt.Errorf("failed to upsert universe server addr: %w",
err)
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions tapdb/universe_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func TestUniverseFederationCRUD(t *testing.T) {
// Next, we'll try to add a new series of servers to the DB.
addrs := db.AddRandomServerAddrs(t, 10)

// If we try to insert them all again, then we should get an error as
// we ensure the host names are unique.
// Re-inserting the same addresses should not cause an error, since they
// are expected to be upserted.
err = fedDB.AddServers(ctx, addrs...)
require.ErrorIs(t, err, universe.ErrDuplicateUniverse)
require.NoError(t, err)

// Next, we should be able to fetch all the active hosts.
dbAddrs, err := fedDB.UniverseServers(ctx)
Expand Down
Loading
Loading