WIP println logging
Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
michaelbeaumont committed Oct 5, 2023
1 parent 3302d4c commit 81e8212
Showing 3 changed files with 23 additions and 4 deletions.
9 changes: 8 additions & 1 deletion pkg/kds/mux/client.go
@@ -296,17 +296,24 @@ func (c *client) startHealthCheck(
    log = log.WithValues("rpc", "healthcheck")
    log.Info("starting")

    count := 0

    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        for {
            log.Info("send health check")
            _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{})
            if err != nil && !errors.Is(err, context.Canceled) {
                errorCh <- err
            }

            select {
            case <-ticker.C:
                if count > 3 {
                    log.Info("simulating failing check")
                    return
                }
                count++

Check failure on line 316 in pkg/kds/mux/client.go (GitHub Actions / lint): File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/kumahq/kuma) --custom-order (gci)
                continue
            case <-stop:
                log.Info("stopping")
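For orientation, a minimal self-contained sketch of the pattern the hunk above adds: a goroutine fires a health check on every tick and deliberately bails out after a few iterations to simulate a zone that stops checking in. The healthCheck stub, the buffered errorCh, and the shortened intervals are placeholders for illustration, not the commit's actual code.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// healthCheck stands in for client.HealthCheck in the diff above.
func healthCheck(ctx context.Context) error {
    return ctx.Err()
}

func main() {
    ctx := context.Background()
    stop := make(chan struct{})
    errorCh := make(chan error, 1)

    go func() {
        ticker := time.NewTicker(500 * time.Millisecond) // 5s in the real code
        defer ticker.Stop()
        count := 0
        for {
            fmt.Println("send health check")
            if err := healthCheck(ctx); err != nil && !errors.Is(err, context.Canceled) {
                errorCh <- err
            }
            select {
            case <-ticker.C:
                // After a handful of ticks, stop sending health checks entirely.
                if count > 3 {
                    fmt.Println("simulating failing check")
                    return
                }
                count++
                continue
            case <-stop:
                fmt.Println("stopping")
                return
            }
        }
    }()

    time.Sleep(3 * time.Second) // let the loop run past the simulated failure
}

In the diff the tick is 5 seconds, so the simulated failure stops health checks after a handful of ticks; the 20-second staleness check in zone_watch.go is what then reports the zone as offline.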
5 changes: 5 additions & 0 deletions pkg/kds/mux/zone_watch.go
@@ -56,6 +56,7 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error {
    for {
        select {
        case <-timer.C:
            core.Log.Info("new poll!")
            for zone, firstSeen := range zw.zones {
                ctx := multitenant.WithTenant(context.TODO(), zone.tenantID)
                zoneInsight := system.NewZoneInsightResource()
@@ -64,14 +65,18 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error {
                }
                lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime()
                if lastHealthCheck.After(firstSeen) && time.Since(lastHealthCheck) > 20*time.Second {
                    core.Log.Info("TIMEOUT")
                    zw.bus.Send(service.ZoneWentOffline{
                        Zone:     zone.zone,
                        TenantID: zone.tenantID,
                    })
                    delete(zw.zones, zone)
                } else {
                    core.Log.Info("NO TIMEOUT")
                }
            }
        case e := <-connectionWatch.Recv():
            core.Log.Info("new conn")
            newStream := e.(service.ZoneOpenedStream)

            ctx := multitenant.WithTenant(context.TODO(), newStream.TenantID)
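For orientation, a small self-contained sketch of the timeout rule the poll loop applies above: a zone counts as offline only if its last health check arrived after the watch first saw the zone and is now older than the timeout. The zoneKey type and the map-based state are simplifications, not the commit's actual ZoneWatch types.

package main

import (
    "fmt"
    "time"
)

// zoneKey is a simplified stand-in for the per-zone key ZoneWatch tracks.
type zoneKey struct {
    tenantID string
    zone     string
}

func main() {
    timeout := 20 * time.Second
    now := time.Now()

    // firstSeen: when the watch first saw the zone's stream.
    // lastHealthCheck: the health-check timestamp read from the zone insight.
    firstSeen := map[zoneKey]time.Time{
        {tenantID: "default", zone: "zone-1"}: now.Add(-2 * time.Minute),
        {tenantID: "default", zone: "zone-2"}: now.Add(-2 * time.Minute),
    }
    lastHealthCheck := map[zoneKey]time.Time{
        {tenantID: "default", zone: "zone-1"}: now.Add(-30 * time.Second), // stale
        {tenantID: "default", zone: "zone-2"}: now.Add(-5 * time.Second),  // fresh
    }

    for zone, seen := range firstSeen {
        last := lastHealthCheck[zone]
        if last.After(seen) && time.Since(last) > timeout {
            fmt.Printf("%s: TIMEOUT, zone went offline\n", zone.zone)
            delete(firstSeen, zone) // stop watching it, as the diff does
        } else {
            fmt.Printf("%s: NO TIMEOUT\n", zone.zone)
        }
    }
}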
13 changes: 10 additions & 3 deletions pkg/kds/service/server.go
@@ -75,21 +75,27 @@ func NewGlobalKDSServiceServer(
var _ mesh_proto.GlobalKDSServiceServer = &GlobalKDSServiceServer{}

func (g *GlobalKDSServiceServer) StreamXDSConfigs(stream mesh_proto.GlobalKDSService_StreamXDSConfigsServer) error {
    return g.streamEnvoyAdminRPC(ConfigDumpRPC, g.envoyAdminRPCs.XDSConfigDump, stream, func() (util_grpc.ReverseUnaryMessage, error) {
    err := g.streamEnvoyAdminRPC(ConfigDumpRPC, g.envoyAdminRPCs.XDSConfigDump, stream, func() (util_grpc.ReverseUnaryMessage, error) {
        return stream.Recv()
    })
    log.Info("stream over")
    return err
}

func (g *GlobalKDSServiceServer) StreamStats(stream mesh_proto.GlobalKDSService_StreamStatsServer) error {
    return g.streamEnvoyAdminRPC(StatsRPC, g.envoyAdminRPCs.Stats, stream, func() (util_grpc.ReverseUnaryMessage, error) {
    err := g.streamEnvoyAdminRPC(StatsRPC, g.envoyAdminRPCs.Stats, stream, func() (util_grpc.ReverseUnaryMessage, error) {
        return stream.Recv()
    })
    log.Info("stream over")
    return err
}

func (g *GlobalKDSServiceServer) StreamClusters(stream mesh_proto.GlobalKDSService_StreamClustersServer) error {
    return g.streamEnvoyAdminRPC(ClustersRPC, g.envoyAdminRPCs.Clusters, stream, func() (util_grpc.ReverseUnaryMessage, error) {
    err := g.streamEnvoyAdminRPC(ClustersRPC, g.envoyAdminRPCs.Clusters, stream, func() (util_grpc.ReverseUnaryMessage, error) {
        return stream.Recv()
    })
    log.Info("stream over")
    return err
}

func (g *GlobalKDSServiceServer) HealthCheck(ctx context.Context, _ *mesh_proto.ZoneHealthCheckRequest) (*mesh_proto.ZoneHealthCheckResponse, error) {
@@ -169,6 +175,7 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC(
    go func() {
        select {
        case <-shouldDisconnectStream.Recv():
            logger.Info("disconnecting stream")
            streamResult <- nil
        case <-streamReadEnded:
            return
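The three Stream* changes above share one shape: capture the handler's error instead of returning it directly, log that the stream is over, then return the error. A tiny sketch of that shape follows, with withEndLog as an invented helper name; the commit itself inlines the logging in each method rather than using a helper.

package main

import (
    "errors"
    "fmt"
)

// withEndLog wraps a stream handler so a log line always runs when the
// handler returns, before its error is propagated to the caller.
func withEndLog(name string, handle func() error) error {
    err := handle()
    fmt.Printf("%s: stream over (err=%v)\n", name, err)
    return err
}

func main() {
    _ = withEndLog("StreamStats", func() error {
        return nil
    })
    _ = withEndLog("StreamClusters", func() error {
        return errors.New("connection reset")
    })
}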
