Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ws): use controller-runtime for backend #43

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 25 additions & 22 deletions workspaces/backend/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package api

import (
"fmt"
"github.com/kubeflow/notebooks/workspaces/backend/config"
"github.com/kubeflow/notebooks/workspaces/backend/data"
"github.com/kubeflow/notebooks/workspaces/backend/integrations"
"log/slog"
"net/http"

"github.com/julienschmidt/httprouter"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubeflow/notebooks/workspaces/backend/internal/config"
"github.com/kubeflow/notebooks/workspaces/backend/internal/data"
)

const (
Expand All @@ -33,33 +34,35 @@ const (
)

type App struct {
config config.EnvConfig
logger *slog.Logger
models data.Models
kubernetesClient *integrations.KubernetesClient
}
Config config.EnvConfig
logger *slog.Logger
models data.Models

func NewApp(cfg config.EnvConfig, logger *slog.Logger) (*App, error) {
k8sClient, err := integrations.NewKubernetesClient()
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
}
client.Client
Scheme *runtime.Scheme
}

// NewApp creates a new instance of the app
func NewApp(cfg config.EnvConfig, logger *slog.Logger, client client.Client, scheme *runtime.Scheme) (*App, error) {
app := &App{
config: cfg,
logger: logger,
kubernetesClient: k8sClient,
Config: cfg,
logger: logger,
models: data.NewModels(),

Client: client,
Scheme: scheme,
}
return app, nil
}

func (app *App) Routes() http.Handler {
// Routes returns the HTTP handler for the app
func (a *App) Routes() http.Handler {
router := httprouter.New()

router.NotFound = http.HandlerFunc(app.notFoundResponse)
router.MethodNotAllowed = http.HandlerFunc(app.methodNotAllowedResponse)
router.NotFound = http.HandlerFunc(a.notFoundResponse)
router.MethodNotAllowed = http.HandlerFunc(a.methodNotAllowedResponse)

router.GET(HealthCheckPath, app.HealthcheckHandler)
router.GET(HealthCheckPath, a.HealthcheckHandler)

return app.RecoverPanic(app.enableCORS(router))
return a.RecoverPanic(a.enableCORS(router))
}
32 changes: 16 additions & 16 deletions workspaces/backend/api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,40 @@ type ErrorResponse struct {
Message string `json:"message"`
}

func (app *App) LogError(r *http.Request, err error) {
func (a *App) LogError(r *http.Request, err error) {
var (
method = r.Method
uri = r.URL.RequestURI()
)

app.logger.Error(err.Error(), "method", method, "uri", uri)
a.logger.Error(err.Error(), "method", method, "uri", uri)
}

func (app *App) badRequestResponse(w http.ResponseWriter, r *http.Request, err error) {
func (a *App) badRequestResponse(w http.ResponseWriter, r *http.Request, err error) {
httpError := &HTTPError{
StatusCode: http.StatusBadRequest,
ErrorResponse: ErrorResponse{
Code: strconv.Itoa(http.StatusBadRequest),
Message: err.Error(),
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) errorResponse(w http.ResponseWriter, r *http.Request, error *HTTPError) {
func (a *App) errorResponse(w http.ResponseWriter, r *http.Request, error *HTTPError) {

env := Envelope{"error": error}

err := app.WriteJSON(w, error.StatusCode, env, nil)
err := a.WriteJSON(w, error.StatusCode, env, nil)

if err != nil {
app.LogError(r, err)
a.LogError(r, err)
w.WriteHeader(error.StatusCode)
}
}

func (app *App) serverErrorResponse(w http.ResponseWriter, r *http.Request, err error) {
app.LogError(r, err)
func (a *App) serverErrorResponse(w http.ResponseWriter, r *http.Request, err error) {
a.LogError(r, err)

httpError := &HTTPError{
StatusCode: http.StatusInternalServerError,
Expand All @@ -75,10 +75,10 @@ func (app *App) serverErrorResponse(w http.ResponseWriter, r *http.Request, err
Message: "the server encountered a problem and could not process your request",
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) notFoundResponse(w http.ResponseWriter, r *http.Request) {
func (a *App) notFoundResponse(w http.ResponseWriter, r *http.Request) {

httpError := &HTTPError{
StatusCode: http.StatusNotFound,
Expand All @@ -87,10 +87,10 @@ func (app *App) notFoundResponse(w http.ResponseWriter, r *http.Request) {
Message: "the requested resource could not be found",
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) methodNotAllowedResponse(w http.ResponseWriter, r *http.Request) {
func (a *App) methodNotAllowedResponse(w http.ResponseWriter, r *http.Request) {

httpError := &HTTPError{
StatusCode: http.StatusMethodNotAllowed,
Expand All @@ -99,10 +99,10 @@ func (app *App) methodNotAllowedResponse(w http.ResponseWriter, r *http.Request)
Message: fmt.Sprintf("the %s method is not supported for this resource", r.Method),
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) failedValidationResponse(w http.ResponseWriter, r *http.Request, errors map[string]string) {
func (a *App) failedValidationResponse(w http.ResponseWriter, r *http.Request, errors map[string]string) {

message, err := json.Marshal(errors)
if err != nil {
Expand All @@ -115,5 +115,5 @@ func (app *App) failedValidationResponse(w http.ResponseWriter, r *http.Request,
Message: string(message),
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}
10 changes: 6 additions & 4 deletions workspaces/backend/api/healthcheck__handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ package api

import (
"encoding/json"
"github.com/kubeflow/notebooks/workspaces/backend/config"
"github.com/kubeflow/notebooks/workspaces/backend/data"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/kubeflow/notebooks/workspaces/backend/internal/config"
"github.com/kubeflow/notebooks/workspaces/backend/internal/data"
)

func TestHealthCheckHandler(t *testing.T) {

app := App{config: config.EnvConfig{
app := App{Config: config.EnvConfig{
Port: 4000,
}}

Expand Down
10 changes: 5 additions & 5 deletions workspaces/backend/api/healthcheck_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import (
"net/http"
)

func (app *App) HealthcheckHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func (a *App) HealthcheckHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

healthCheck, err := app.models.HealthCheck.HealthCheck(Version)
healthCheck, err := a.models.HealthCheck.HealthCheck(Version)
if err != nil {
app.serverErrorResponse(w, r, err)
a.serverErrorResponse(w, r, err)
return
}

err = app.WriteJSON(w, http.StatusOK, healthCheck, nil)
err = a.WriteJSON(w, http.StatusOK, healthCheck, nil)

if err != nil {
app.serverErrorResponse(w, r, err)
a.serverErrorResponse(w, r, err)
}

}
4 changes: 2 additions & 2 deletions workspaces/backend/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

type Envelope map[string]any

func (app *App) WriteJSON(w http.ResponseWriter, status int, data any, headers http.Header) error {
func (a *App) WriteJSON(w http.ResponseWriter, status int, data any, headers http.Header) error {

js, err := json.MarshalIndent(data, "", "\t")

Expand All @@ -48,7 +48,7 @@ func (app *App) WriteJSON(w http.ResponseWriter, status int, data any, headers h
return nil
}

func (app *App) ReadJSON(w http.ResponseWriter, r *http.Request, dst any) error {
func (a *App) ReadJSON(w http.ResponseWriter, r *http.Request, dst any) error {

maxBytes := 1_048_576
r.Body = http.MaxBytesReader(w, r.Body, int64(maxBytes))
Expand Down
6 changes: 3 additions & 3 deletions workspaces/backend/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ import (
"net/http"
)

func (app *App) RecoverPanic(next http.Handler) http.Handler {
func (a *App) RecoverPanic(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
w.Header().Set("Connection", "close")
app.serverErrorResponse(w, r, fmt.Errorf("%s", err))
a.serverErrorResponse(w, r, fmt.Errorf("%s", err))
}
}()

next.ServeHTTP(w, r)
})
}

func (app *App) enableCORS(next http.Handler) http.Handler {
func (a *App) enableCORS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO(ederign) restrict CORS to a much smaller set of trusted origins.
// TODO(ederign) deal with preflight requests
Expand Down
64 changes: 45 additions & 19 deletions workspaces/backend/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package main

import (
"flag"
"fmt"
application "github.com/kubeflow/notebooks/workspaces/backend/api"
"github.com/kubeflow/notebooks/workspaces/backend/config"
"log/slog"
"net/http"
"os"
"strconv"
"time"

ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

application "github.com/kubeflow/notebooks/workspaces/backend/api"
"github.com/kubeflow/notebooks/workspaces/backend/internal/config"
"github.com/kubeflow/notebooks/workspaces/backend/internal/helper"
"github.com/kubeflow/notebooks/workspaces/backend/internal/server"
)

func main() {
Expand All @@ -34,26 +37,49 @@ func main() {

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

app, err := application.NewApp(cfg, logger)
kubeconfig, err := helper.GetKubeconfig()
if err != nil {
logger.Error(err.Error())
logger.Error("failed to get kubeconfig", "error", err)
os.Exit(1)
}

srv := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: app.Routes(),
IdleTimeout: time.Minute,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
ErrorLog: slog.NewLogLogger(logger.Handler(), slog.LevelError),
scheme, err := helper.BuildScheme()
if err != nil {
logger.Error("failed to build Kubernetes scheme", "error", err)
os.Exit(1)
}
mgr, err := ctrl.NewManager(kubeconfig, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: "0", // disable metrics serving
},
HealthProbeBindAddress: "0", // disable health probe serving
LeaderElection: false,
})
if err != nil {
logger.Error("unable to create manager", "error", err)
os.Exit(1)
}

logger.Info("starting server", "addr", srv.Addr)
app, err := application.NewApp(cfg, logger, mgr.GetClient(), mgr.GetScheme())
if err != nil {
logger.Error("failed to create app", "error", err)
os.Exit(1)
}
svr, err := server.NewServer(app, logger)
if err != nil {
logger.Error("failed to create server", "error", err)
os.Exit(1)
}
if err := svr.SetupWithManager(mgr); err != nil {
logger.Error("failed to setup server with manager", "error", err)
os.Exit(1)
}

err = srv.ListenAndServe()
logger.Error(err.Error())
os.Exit(1)
logger.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
logger.Error("problem running manager", "error", err)
os.Exit(1)
}
}

func getEnv(key, defaultVal string) string {
Expand Down
Loading