Skip to content

Commit

Permalink
Merge pull request #2933 from redpanda-data/pause-on-completion
Browse files Browse the repository at this point in the history
Add a GRPC HealthCheck endpoint
  • Loading branch information
Jeffail authored Nov 4, 2024
2 parents fd1838f + f020aa5 commit c09208b
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 0 deletions.
32 changes: 32 additions & 0 deletions cmd/redpanda-connect-ai/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/redpanda-data/connect/v4/internal/cli"
"github.com/redpanda-data/connect/v4/internal/protohealth"
"github.com/redpanda-data/connect/v4/public/schema"

// Only import a subset of components for execution.
Expand All @@ -35,5 +42,30 @@ var (

func main() {
schema := schema.CloudAI(Version, DateBuilt)
if len(os.Args) > 1 && os.Args[1] != "run" {
cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
return
}

status := protohealth.NewEndpoint(2999)
errC := make(chan error)
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
go func() {
errC <- status.Run(context.Background())
}()
cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
select {
case <-sigC:
// External termination should not cause the pipeline to be killed
fmt.Println("received interrupt signal, not marking as complete")
return
default:
}
fmt.Println("exited without interrupt signal, marking as complete")
status.MarkDone()
select {
case <-errC:
case <-sigC:
}
}
32 changes: 32 additions & 0 deletions cmd/redpanda-connect-cloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/redpanda-data/connect/v4/internal/cli"
"github.com/redpanda-data/connect/v4/internal/protohealth"
"github.com/redpanda-data/connect/v4/public/schema"

// Only import a subset of components for execution.
Expand All @@ -33,5 +40,30 @@ var (

func main() {
schema := schema.Cloud(Version, DateBuilt)
if len(os.Args) > 1 && os.Args[1] != "run" {
cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
return
}

status := protohealth.NewEndpoint(2999)
errC := make(chan error)
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
go func() {
errC <- status.Run(context.Background())
}()
cli.InitEnterpriseCLI(BinaryName, Version, DateBuilt, schema)
select {
case <-sigC:
// External termination should not cause the pipeline to be killed
fmt.Println("received interrupt signal, not marking as complete")
return
default:
}
fmt.Println("exited without interrupt signal, marking as complete")
status.MarkDone()
select {
case <-errC:
case <-sigC:
}
}
120 changes: 120 additions & 0 deletions internal/protohealth/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package protohealth

import (
"context"
"fmt"
"net"
"sync/atomic"

"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)

// Endpoint hosts a grpc health endpoint at the specified port.
// No TLS is wrapped around this; it's for k8s consumption.
type Endpoint struct {
port int16
srv *grpc.Server
running atomic.Bool
signal chan struct{}
grpc_health_v1.UnimplementedHealthServer
}

// NewEndpoint constructs the Endpoint
func NewEndpoint(port int16) *Endpoint {
srv := grpc.NewServer()
reflection.Register(srv)
e := &Endpoint{
port: port,
srv: srv,
signal: make(chan struct{}),
}
grpc_health_v1.RegisterHealthServer(srv, e)

return e
}

// Run listens on the supplied GRPC health endpoint for unencrypted connections
func (e *Endpoint) Run(ctx context.Context) error {
e.running.Store(true)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", e.port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
errC := make(chan error, 1)
go func() {
errC <- e.srv.Serve(lis)
}()
select {
case <-ctx.Done():
e.srv.Stop()
return ctx.Err()
case err := <-errC:
return err
}
}

// MarkDone should be called to latch the Endpoint into "not ready"
// status. This cannot be reversed. All watchers will be notified.
func (e *Endpoint) MarkDone() {
if e.running.Swap(false) {
close(e.signal)
}
}

// Check is the one-shot GRPC test endpoint.
func (e *Endpoint) Check(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
status := grpc_health_v1.HealthCheckResponse_NOT_SERVING
if e.running.Load() {
status = grpc_health_v1.HealthCheckResponse_SERVING
}
return &grpc_health_v1.HealthCheckResponse{
Status: status,
}, nil
}

// Watch is the streaming GRPC endpoint.
func (e *Endpoint) Watch(_ *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
status := grpc_health_v1.HealthCheckResponse_NOT_SERVING
if e.running.Load() {
status = grpc_health_v1.HealthCheckResponse_SERVING
}

err := server.Send(&grpc_health_v1.HealthCheckResponse{
Status: status,
})
if err != nil {
return err
}

watcher := e.signal
for {
select {
case <-server.Context().Done():
return server.Context().Err()
case <-watcher:
watcher = nil
err := server.Send(&grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
})
if err != nil {
return err
}
}
}
}

0 comments on commit c09208b

Please sign in to comment.