Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

refactoring profile handler into entrypoints #113

Merged
merged 3 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cmd/entrypoints/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/datacatalog/pkg/config"
"github.com/flyteorg/datacatalog/pkg/rpc/datacatalogservice"
"github.com/flyteorg/datacatalog/pkg/runtime"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/spf13/cobra"
)

Expand All @@ -27,6 +29,16 @@ var serveCmd = &cobra.Command{
}
}()

// Serve profiling endpoint.
dataCatalogConfig := runtime.NewConfigurationProvider().ApplicationConfiguration().GetDataCatalogConfig()
go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(
context.Background(), dataCatalogConfig.ProfilerPort, nil)
if err != nil {
logger.Panicf(context.Background(), "Failed to Start profiling and Metrics server. Error, %v", err)
}
}()

// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey)

Expand Down
10 changes: 10 additions & 0 deletions cmd/entrypoints/serve_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/flyteorg/datacatalog/pkg/config"
"github.com/flyteorg/datacatalog/pkg/rpc/datacatalogservice"
"github.com/flyteorg/flytestdlib/logger"
"github.com/spf13/cobra"
)

Expand All @@ -14,6 +15,15 @@ var serveDummyCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
cfg := config.GetConfig()

// serve a http healthcheck endpoint
go func() {
err := datacatalogservice.ServeHTTPHealthCheck(ctx, cfg)
if err != nil {
logger.Errorf(ctx, "Unable to serve http", cfg.GetGrpcHostAddress(), err)
}
}()

return datacatalogservice.Serve(ctx, cfg)
},
}
Expand Down
18 changes: 0 additions & 18 deletions pkg/rpc/datacatalogservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
catalog "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/storage"
)
Expand Down Expand Up @@ -106,15 +105,6 @@ func NewDataCatalogService() *DataCatalogService {
repos := repositories.GetRepository(ctx, repositories.POSTGRES, *dbConfigValues, catalogScope)
logger.Infof(ctx, "Created DB connection.")

// Serve profiling endpoint.
go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(
context.Background(), dataCatalogConfig.ProfilerPort, nil)
if err != nil {
logger.Panicf(context.Background(), "Failed to Start profiling and Metrics server. Error, %v", err)
}
}()

return &DataCatalogService{
DatasetManager: impl.NewDatasetManager(repos, dataStorageClient, catalogScope.NewSubScope("dataset")),
ArtifactManager: impl.NewArtifactManager(repos, dataStorageClient, storagePrefix, catalogScope.NewSubScope("artifact")),
Expand Down Expand Up @@ -173,14 +163,6 @@ func ServeHTTPHealthCheck(ctx context.Context, cfg *config.Config) error {

// Create and start the gRPC server and http healthcheck endpoint
func Serve(ctx context.Context, cfg *config.Config) error {
// serve a http healthcheck endpoint
go func() {
err := ServeHTTPHealthCheck(ctx, cfg)
if err != nil {
logger.Errorf(ctx, "Unable to serve http", cfg.GetGrpcHostAddress(), err)
}
}()

grpcServer := newGRPCDummyServer(ctx, cfg)

grpcListener, err := net.Listen("tcp", cfg.GetGrpcHostAddress())
Expand Down