Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtgateproxy: add conn warmup time #465

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ const PoolTypeAttr = "PoolType"

// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type JSONGateResolver struct {
target resolver.Target
clientConn resolver.ClientConn
poolType string
target resolver.Target
clientConn resolver.ClientConn
poolType string
currentAddrs []resolver.Address
}

func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {}
Expand Down Expand Up @@ -210,15 +211,24 @@ func (b *JSONGateResolverBuilder) start() error {
}
parseCount.Add("changed", 1)

var wg sync.WaitGroup

// notify all the resolvers that the targets changed in parallel, since each update might sleep for
// the warmup time
b.mu.RLock()
// notify all the resolvers that the targets changed
for _, r := range b.resolvers {
err = b.update(r)
if err != nil {
log.Errorf("Failed to update resolver: %v", err)
}
wg.Add(1)
go func(r *JSONGateResolver) {
defer wg.Done()

err = b.update(r)
if err != nil {
log.Errorf("Failed to update resolver: %v", err)
}
}(r)
}
b.mu.RUnlock()
wg.Wait()
}
}()

Expand Down Expand Up @@ -393,8 +403,17 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)})
}

log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets)
// If we've already selected some targets, give the new addresses some time to warm up before removing
// the old ones from the list
if r.currentAddrs != nil && warmupTime.Seconds() > 0 {
combined := append(r.currentAddrs, addrs...)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good idea to de-duplicate and skip if there's nothing new in addrs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure we could. I thought about that in the original implementation too but there's no real difference since that's basically what UpdateState is going to do under the covers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Ignoring the sleep of course, but I think that's ok since we shouldn't have no-op updates to the file).

log.V(100).Infof("updating targets for %s to warmup %v", r.target.URL.String(), targets)
r.clientConn.UpdateState(resolver.State{Addresses: combined})
time.Sleep(*warmupTime)
}

log.V(100).Infof("updating targets for %s after warmup to %v", r.target.URL.String(), targets)
r.currentAddrs = addrs
return r.clientConn.UpdateState(resolver.State{Addresses: addrs})
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
balancerType = flag.String("balancer", "round_robin", "load balancing algorithm to use")
warmupTime = flag.Duration("warmup_time", 30*time.Second, "time to maintain connections to previously selected hosts")

timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")

Expand Down
Loading