From efa78803f8dabeef11fdb208d15d334259b3bf0f Mon Sep 17 00:00:00 2001 From: Ryan Nowak Date: Sat, 9 Dec 2023 17:27:50 -0800 Subject: [PATCH] Implement tracked resources (#6204) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description This change implements 'tracked' resources for for every resource created in the Radius plane. This means that UCP will observe each operation and maintain its own state for the lifecycle of each resource. The purpose of this is to power multiple other pieces of infrastructure. New in this pull-request it's possible to list the resources (of all types) in a resource group. This is served from the 'tracked' resources maintained by UCP and does not need to query all resource providers or their databases. Future changes will implement cascading deletion for resource groups as well as notifications for resource modifications. ## Type of change - This pull request fixes a bug in Radius and has an approved issue (issue link required). - This pull request adds or changes features of Radius and has an approved issue (issue link required). ## Auto-generated summary ### 🤖 Generated by Copilot at 969afb9 ### Summary 📝🌐🔄 This pull request adds support for generic and proxy resources in the UCP service. It introduces a new `GenericResource` type in the UCP API and datamodel packages, and implements the conversion, serialization, and client functions for it. It also adds a new `Resources` group to the UCP API, which allows listing and querying resources in UCP. It updates the UCP backend service and controller to use the new resource type and operation method for tracked resources. It adds unit tests, documentation, and configuration settings for the new features. It also improves the logging and error handling of async operations, and removes some unused dependencies. > _Sing, O Muse, of the mighty deeds of the UCP service_ > _That tracks and lists the myriad resources in the cloud_ > _And how the skillful coders added new fields and types_ > _To make the `GenericResource` more versatile and proud_ ### Walkthrough * Add a new `GenericResource` type and its conversion functions to support storing and listing resources in UCP regardless of their actual type ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-f2d1781ff7fc84fff544f3a071c4bad77c26a30777d6d95f39cbd49ee16b6ba7R1-R49), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-f845c7096c463030307454c370c23460c9a806f229fe959e226cc9e81222ad70R1-R73), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-893ab293df1ca8739cc792cd8e8c5c5691cb0a308cec7f4ef2f06ecce4019761L1-R21), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-6f7e7aae5544d15eeb60fd6452c8cdcdd56c68c1c6c5a9910511ef0aef14aa77R170-R188), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-6f7e7aae5544d15eeb60fd6452c8cdcdd56c68c1c6c5a9910511ef0aef14aa77R232-R258), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-6f300088351b430ca2464beef84ad3069228f02d95a3dcd3641926ad1c2620fbR387-R429), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-6f300088351b430ca2464beef84ad3069228f02d95a3dcd3641926ad1c2620fbR558-R631), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-4f5a8ccf4f4002068030786c91856b61579168b6c60a91b815f6d525552bf907R1-R47), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-c71c1646c3c381abc13b22a43803aadd6725bdc1b956f957452fa0955032a723R1-R86)) * Add a new `Resources` group and its client to the UCP API, which contains the methods for listing and querying resources in UCP ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-df683afb7070d6ba64f3b2a742db37199c9827b9ba03cc95b48777ad2e601644R57-R61), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-416356e403db5b572d2c0f372baef2abfeb3000363efc6d41f55bdc65f47ebafR121-R125), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-a0503cdfd9eb8e97f731ed907b35daf38dde994f995701b0fd69ca1699883adfR1-R108), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-c74c23d29da9a5ac2ea4388c4cfd23f42018e45c93a29f71c4c5eeadebdfa4c6R132-R137)) * Add a new `TrackedResourceProcessController` type and its tests to the UCP backend, which performs background processing on tracked resources ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-58df6e92977a97ec7d0156ae7e0dafc869190c32b82b448d3e684ad7586170bfR1-R94), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-f966a886f9d05281b158e9739ff6c1e1e0bffe560230e7d5d8305dbedef6e309R1-R180), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-8aded038840f1f8802bdb81f53055a340da414b4d53a57b2af19bc6463599388L68-R93)) * Update the `UCPProviderName` constant to `System.Resources` to align with the new resource type for tracked resources ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-8aded038840f1f8802bdb81f53055a340da414b4d53a57b2af19bc6463599388L23-R33)) * Add some logging and error handling to the `updateResourceAndOperationStatus` function in the `worker.go` file, which updates the resource and operation status in the storage ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-5d9b9964242d77606f4868e3fe049978ea1c3f1903afcfeb60c76e19d9785fc5R254-R261), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-5d9b9964242d77606f4868e3fe049978ea1c3f1903afcfeb60c76e19d9785fc5L350-R360)) * Add a comment to the `generate-openapi-spec` target in the `Makefile` to describe its purpose ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-cc76a2a14994ce1ed06bd12ba9665a1d20a17992f345a7f8ca06afc934da2a92L35-R35)) * Add a new field `location` to the UCP service configuration and deployment files, and document its meaning ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-ceba0600c7e49ffd65b0a2fd7bf1798d9f6f6f531db64e051bb00ff29c7dcd93R12), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-be9c0b61a26165c87503e667cec005520aea94da66f0dd1eedfff863efd266acR13), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-c5a6e900bac29ec26476b731e62862ee9afb8f9f67225da0aa8fd1d052f8183fR220)) * Remove some unused dependencies and imports from the `go.mod` and `worker.go` files ([link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6L14), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6L70), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6R99), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6L107), [link](https://github.com/radius-project/radius/pull/6204/files?diff=unified&w=0#diff-5d9b9964242d77606f4868e3fe049978ea1c3f1903afcfeb60c76e19d9785fc5L24)) Signed-off-by: ytimocin Signed-off-by: Ryan Nowak Co-authored-by: Yetkin Timocin --- cmd/ucpd/ucp-self-hosted-dev.yaml | 1 + deploy/Chart/templates/ucp/configmaps.yaml | 1 + .../configSettings.md | 1 + pkg/armrpc/asyncoperation/worker/worker.go | 16 +- .../resourcegroups/trackedresourceprocess.go | 94 ++++ .../trackedresourceprocess_test.go | 180 +++++++ pkg/ucp/backend/service.go | 26 +- pkg/ucp/datamodel/genericresource.go | 40 +- pkg/ucp/datamodel/genericresource_test.go | 37 ++ pkg/ucp/frontend/api/server.go | 10 +- pkg/ucp/frontend/controller/radius/proxy.go | 323 +++++++++++++ .../frontend/controller/radius/proxy_test.go | 357 ++++++++++++++ .../controller/resourcegroups/util.go | 111 +++++ .../controller/resourcegroups/util_test.go | 204 ++++++++ pkg/ucp/frontend/modules/types.go | 4 + pkg/ucp/frontend/radius/routes.go | 13 +- pkg/ucp/integrationtests/radius/proxy_test.go | 139 +++++- pkg/ucp/integrationtests/testrp/async.go | 14 +- .../integrationtests/testserver/testserver.go | 31 +- pkg/ucp/server/server.go | 12 +- pkg/ucp/trackedresource/doc.go | 19 + pkg/ucp/trackedresource/name.go | 85 ++++ pkg/ucp/trackedresource/name_test.go | 38 ++ pkg/ucp/trackedresource/update.go | 294 ++++++++++++ pkg/ucp/trackedresource/update_test.go | 442 ++++++++++++++++++ test/functional/shared/test.go | 3 +- test/functional/ucp/tracked_resource_test.go | 174 +++++++ 27 files changed, 2634 insertions(+), 35 deletions(-) create mode 100644 pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go create mode 100644 pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go create mode 100644 pkg/ucp/datamodel/genericresource_test.go create mode 100644 pkg/ucp/frontend/controller/radius/proxy.go create mode 100644 pkg/ucp/frontend/controller/radius/proxy_test.go create mode 100644 pkg/ucp/frontend/controller/resourcegroups/util.go create mode 100644 pkg/ucp/frontend/controller/resourcegroups/util_test.go create mode 100644 pkg/ucp/trackedresource/doc.go create mode 100644 pkg/ucp/trackedresource/name.go create mode 100644 pkg/ucp/trackedresource/name_test.go create mode 100644 pkg/ucp/trackedresource/update.go create mode 100644 pkg/ucp/trackedresource/update_test.go create mode 100644 test/functional/ucp/tracked_resource_test.go diff --git a/cmd/ucpd/ucp-self-hosted-dev.yaml b/cmd/ucpd/ucp-self-hosted-dev.yaml index f5311cbee1..d69c202251 100644 --- a/cmd/ucpd/ucp-self-hosted-dev.yaml +++ b/cmd/ucpd/ucp-self-hosted-dev.yaml @@ -9,6 +9,7 @@ # - Talk to Portable Resources' Providers on port 8081 # - Disables metrics and profiler # +location: 'global' storageProvider: provider: "apiserver" apiserver: diff --git a/deploy/Chart/templates/ucp/configmaps.yaml b/deploy/Chart/templates/ucp/configmaps.yaml index 96b012f2b7..18fe1d60e1 100644 --- a/deploy/Chart/templates/ucp/configmaps.yaml +++ b/deploy/Chart/templates/ucp/configmaps.yaml @@ -10,6 +10,7 @@ data: ucp-config.yaml: |- # Radius configuration file. # See https://github.com/radius-project/radius/blob/main/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md for more information. + location: 'global' storageProvider: provider: "apiserver" apiserver: diff --git a/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md b/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md index 307e0e09b0..d6245a508a 100644 --- a/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md +++ b/docs/contributing/contributing-code/contributing-code-control-plane/configSettings.md @@ -218,6 +218,7 @@ ucp: ### UCP ```yaml +location: 'global' storageProvider: provider: "apiserver" apiserver: diff --git a/pkg/armrpc/asyncoperation/worker/worker.go b/pkg/armrpc/asyncoperation/worker/worker.go index 36486b73b3..20640b57d7 100644 --- a/pkg/armrpc/asyncoperation/worker/worker.go +++ b/pkg/armrpc/asyncoperation/worker/worker.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "runtime/debug" "strings" "time" @@ -252,6 +251,14 @@ func (w *AsyncRequestProcessWorker) runOperation(ctx context.Context, message *q logger.Info("Start processing operation.") result, err := asyncCtrl.Run(asyncReqCtx, asyncReq) + + code := "" + if result.Error != nil { + code = result.Error.Code + } + + logger.Info("Operation returned", "success", result.Error == nil, "code", code, "provisioningState", result.ProvisioningState(), "err", err) + // There are two cases when asyncReqCtx is canceled. // 1. When the operation is timed out, w.completeOperation will be called in L186 // 2. When parent context is canceled or done, we need to requeue the operation to reprocess the request. @@ -262,6 +269,7 @@ func (w *AsyncRequestProcessWorker) runOperation(ctx context.Context, message *q result.SetFailed(armErr, false) logger.Error(err, "Operation Failed") } + w.completeOperation(ctx, message, result, asyncCtrl.StorageClient()) } trace.SetAsyncResultStatus(result, span) @@ -347,10 +355,10 @@ func (w *AsyncRequestProcessWorker) updateResourceAndOperationStatus(ctx context return err } - opType, _ := v1.ParseOperationType(req.OperationType) - err = updateResourceState(ctx, sc, rID.String(), state) - if err != nil && !(opType.Method == http.MethodDelete && errors.Is(&store.ErrNotFound{ID: rID.String()}, err)) { + if errors.Is(err, &store.ErrNotFound{}) { + logger.Info("failed to update the provisioningState in resource because it no longer exists.") + } else if err != nil { logger.Error(err, "failed to update the provisioningState in resource.") return err } diff --git a/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go new file mode 100644 index 0000000000..5623d88518 --- /dev/null +++ b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess.go @@ -0,0 +1,94 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcegroups + +import ( + "context" + "errors" + "fmt" + "net/http" + + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/frontend/controller/resourcegroups" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/pkg/ucp/ucplog" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +var _ ctrl.Controller = (*TrackedResourceProcessController)(nil) + +type updater interface { + Update(ctx context.Context, downstreamURL string, originalID resources.ID, version string) error +} + +// TrackedResourceProcessController is the async operation controller to perform background processing on tracked resources. +type TrackedResourceProcessController struct { + ctrl.BaseController + + // Updater is the utility struct that can perform updates on tracked resources. This can be modified for testing. + updater updater +} + +// NewTrackedResourceProcessController creates a new TrackedResourceProcessController controller which is used to process resources asynchronously. +func NewTrackedResourceProcessController(opts ctrl.Options) (ctrl.Controller, error) { + transport := otelhttp.NewTransport(http.DefaultTransport) + return &TrackedResourceProcessController{ctrl.NewBaseAsyncController(opts), trackedresource.NewUpdater(opts.StorageClient, &http.Client{Transport: transport})}, nil +} + +// Run retrieves a resource from storage, parses the resource ID, and updates our tracked resource entry in the background. +func (c *TrackedResourceProcessController) Run(ctx context.Context, request *ctrl.Request) (ctrl.Result, error) { + resource, err := store.GetResource[datamodel.GenericResource](ctx, c.StorageClient(), request.ResourceID) + if errors.Is(err, &store.ErrNotFound{}) { + return ctrl.NewFailedResult(v1.ErrorDetails{Code: v1.CodeNotFound, Message: fmt.Sprintf("resource %q not found", request.ResourceID), Target: request.ResourceID}), nil + } else if err != nil { + return ctrl.Result{}, err + } + + originalID, err := resources.Parse(resource.Properties.ID) + if err != nil { + return ctrl.Result{}, err + } + + downstreamURL, err := resourcegroups.ValidateDownstream(ctx, c.StorageClient(), originalID) + if errors.Is(err, &resourcegroups.NotFoundError{}) { + return ctrl.NewFailedResult(v1.ErrorDetails{Code: v1.CodeNotFound, Message: err.Error(), Target: request.ResourceID}), nil + } else if errors.Is(err, &resourcegroups.InvalidError{}) { + return ctrl.NewFailedResult(v1.ErrorDetails{Code: v1.CodeInvalid, Message: err.Error(), Target: request.ResourceID}), nil + } else if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to validate downstream: %w", err) + } + + logger := ucplog.FromContextOrDiscard(ctx) + logger.Info("Processing tracked resource", "resourceID", originalID) + err = c.updater.Update(ctx, downstreamURL.String(), originalID, resource.Properties.APIVersion) + if errors.Is(err, &trackedresource.InProgressErr{}) { + // The resource is still being processed, so we can sleep for a while. + result := ctrl.Result{} + result.SetFailed(v1.ErrorDetails{Code: v1.CodeConflict, Message: err.Error(), Target: request.ResourceID}, true) + + return result, nil + } else if err != nil { + return ctrl.Result{}, err + } + + logger.Info("Completed processing tracked resource", "resourceID", originalID) + return ctrl.Result{}, nil +} diff --git a/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go new file mode 100644 index 0000000000..6ab1ad7cea --- /dev/null +++ b/pkg/ucp/backend/controller/resourcegroups/trackedresourceprocess_test.go @@ -0,0 +1,180 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcegroups + +import ( + "context" + "fmt" + "testing" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +func Test_Run(t *testing.T) { + setup := func(t *testing.T) (*TrackedResourceProcessController, *mockUpdater, *store.MockStorageClient) { + ctrl := gomock.NewController(t) + storageClient := store.NewMockStorageClient(ctrl) + + pc, err := NewTrackedResourceProcessController(controller.Options{StorageClient: storageClient}) + require.NoError(t, err) + + updater := mockUpdater{} + pc.(*TrackedResourceProcessController).updater = &updater + return pc.(*TrackedResourceProcessController), &updater, storageClient + } + + id := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/Applications.Test/testResources/my-resource") + trackingID := trackedresource.IDFor(id) + + plane := datamodel.Plane{ + Properties: datamodel.PlaneProperties{ + Kind: datamodel.PlaneKind(v20231001preview.PlaneKindUCPNative), + ResourceProviders: map[string]*string{ + "Applications.Test": to.Ptr("https://localhost:1234"), + }, + }, + } + resourceGroup := datamodel.ResourceGroup{} + data := datamodel.GenericResourceFromID(id, trackingID) + + // Most of the heavy lifting is done by the updater. We just need to test that we're calling it correctly. + t.Run("Success", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, controller.Result{}, result) + require.NoError(t, err) + }) + + t.Run("retry", func(t *testing.T) { + pc, updater, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + // Force a retry. + updater.Result = &trackedresource.InProgressErr{} + + expected := controller.Result{} + expected.SetFailed(v1.ErrorDetails{Code: v1.CodeConflict, Message: updater.Result.Error(), Target: trackingID.String()}, true) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) + + t.Run("Failure (resource not found)", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + + expected := controller.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeNotFound, + Message: fmt.Sprintf("resource %q not found", trackingID.String()), + Target: trackingID.String(), + }) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) + + t.Run("Failure (validate downstream: not found)", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + + expected := controller.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeNotFound, + Message: "plane \"/planes/test/local\" not found", + Target: trackingID.String(), + }) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) + + t.Run("Failure (validate downstream: invalid downstream)", func(t *testing.T) { + pc, _, storageClient := setup(t) + + storageClient.EXPECT(). + Get(gomock.Any(), trackingID.String(), gomock.Any()). + Return(&store.Object{Data: data}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+trackingID.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: datamodel.Plane{}}, nil).Times(1) + + expected := controller.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeInvalid, + Message: "unexpected plane type ", + Target: trackingID.String(), + }) + + result, err := pc.Run(testcontext.New(t), &controller.Request{ResourceID: trackingID.String()}) + require.Equal(t, expected, result) + require.NoError(t, err) + }) +} + +type mockUpdater struct { + Result error +} + +func (u *mockUpdater) Update(ctx context.Context, downstreamURL string, originalID resources.ID, version string) error { + return u.Result +} diff --git a/pkg/ucp/backend/service.go b/pkg/ucp/backend/service.go index ad3039e447..66c902fac7 100644 --- a/pkg/ucp/backend/service.go +++ b/pkg/ucp/backend/service.go @@ -20,12 +20,17 @@ import ( "context" "fmt" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" "github.com/radius-project/radius/pkg/armrpc/asyncoperation/worker" "github.com/radius-project/radius/pkg/armrpc/hostoptions" + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/backend/controller/resourcegroups" + "github.com/radius-project/radius/pkg/ucp/datamodel" ) const ( - UCPProviderName = "ucp" + UCPProviderName = "System.Resources" ) // Service is a service to run AsyncReqeustProcessWorker. @@ -65,5 +70,24 @@ func (w *Service) Run(ctx context.Context) error { } } + opts := ctrl.Options{ + DataProvider: w.StorageProvider, + } + + err := RegisterControllers(ctx, w.Controllers, opts) + if err != nil { + return err + } + return w.Start(ctx, workerOpts) } + +// RegisterControllers registers the controllers for the UCP backend. +func RegisterControllers(ctx context.Context, registry *worker.ControllerRegistry, opts ctrl.Options) error { + err := registry.Register(ctx, v20231001preview.ResourceType, v1.OperationMethod(datamodel.OperationProcess), resourcegroups.NewTrackedResourceProcessController, opts) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/ucp/datamodel/genericresource.go b/pkg/ucp/datamodel/genericresource.go index f80d60a3b2..f561569a54 100644 --- a/pkg/ucp/datamodel/genericresource.go +++ b/pkg/ucp/datamodel/genericresource.go @@ -16,7 +16,17 @@ limitations under the License. package datamodel -import v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" +import ( + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/ucp/resources" +) + +const ( + // OperationProcess is the operation type for processing a tracked resource. + OperationProcess = "PROCESS" + // ResourceType is the resource type for a generic resource. + ResourceType = "System.Resources/resources" +) // GenericResource represents a stored "tracked resource" within a UCP resource group. // @@ -37,7 +47,7 @@ type GenericResource struct { // ResourceTypeName gives the type of ucp resource. func (r *GenericResource) ResourceTypeName() string { - return "System.Resources/resources" + return ResourceType } // GenericResourceProperties stores the properties of the resource being tracked. @@ -52,4 +62,30 @@ type GenericResourceProperties struct { Name string `json:"name"` // Type is the resource type. Type string `json:"type"` + + // APIVersion is the version of the API that can be used to query the resource. + APIVersion string `json:"apiVersion"` + + // OperationID is the last operation that updated this entry. This is used when an operation + // is enqueued as a way to force a different Etag to be returned. This data doesn't need to be + // read or used, it's just acting as a "salt" for the Etag. + OperationID string `json:"operationId"` +} + +// GenericResourceFromID creates a new GenericResource from the given original resource ID and tracking ID. +func GenericResourceFromID(originalID resources.ID, trackingID resources.ID) *GenericResource { + return &GenericResource{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: trackingID.String(), + Type: trackingID.Type(), + Name: trackingID.Name(), + }, + }, + Properties: GenericResourceProperties{ + ID: originalID.String(), + Name: originalID.Name(), + Type: originalID.Type(), + }, + } } diff --git a/pkg/ucp/datamodel/genericresource_test.go b/pkg/ucp/datamodel/genericresource_test.go new file mode 100644 index 0000000000..e1e0054a38 --- /dev/null +++ b/pkg/ucp/datamodel/genericresource_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamodel + +import ( + "testing" + + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/stretchr/testify/require" +) + +func Test_GenericResourceFromID(t *testing.T) { + id := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/Applications.Test/testResources/my-resource") + trackingID := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/System.Resources/genericResources/asdf") + + actual := GenericResourceFromID(id, trackingID) + require.Equal(t, trackingID.String(), actual.ID) + require.Equal(t, trackingID.Type(), actual.Type) + require.Equal(t, trackingID.Name(), actual.Name) + require.Equal(t, id.String(), actual.Properties.ID) + require.Equal(t, id.Type(), actual.Properties.Type) + require.Equal(t, id.Name(), actual.Properties.Name) +} diff --git a/pkg/ucp/frontend/api/server.go b/pkg/ucp/frontend/api/server.go index 78f88b04a6..8222a316b1 100644 --- a/pkg/ucp/frontend/api/server.go +++ b/pkg/ucp/frontend/api/server.go @@ -25,6 +25,7 @@ import ( "net/http" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" armrpc_controller "github.com/radius-project/radius/pkg/armrpc/frontend/controller" "github.com/radius-project/radius/pkg/armrpc/frontend/defaultoperation" "github.com/radius-project/radius/pkg/armrpc/servicecontext" @@ -113,7 +114,6 @@ func (s *Service) Name() string { // registers the routes, configures the default planes, and sets up the http server with the appropriate middleware. It // returns an http server and an error if one occurs. func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { - var err error r := chi.NewRouter() s.storageProvider = dataprovider.NewStorageProvider(s.options.StorageProviderOptions) @@ -125,6 +125,13 @@ func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { return nil, err } + queueClient, err := s.queueProvider.GetClient(ctx) + if err != nil { + return nil, err + } + + statusManager := statusmanager.New(s.storageProvider, queueClient, s.options.Location) + moduleOptions := modules.Options{ Address: s.options.Address, PathBase: s.options.PathBase, @@ -134,6 +141,7 @@ func (s *Service) Initialize(ctx context.Context) (*http.Server, error) { QueueProvider: s.queueProvider, SecretProvider: s.secretProvider, SpecLoader: specLoader, + StatusManager: statusManager, UCPConnection: s.options.UCPConnection, } diff --git a/pkg/ucp/frontend/controller/radius/proxy.go b/pkg/ucp/frontend/controller/radius/proxy.go new file mode 100644 index 0000000000..a11bd92c04 --- /dev/null +++ b/pkg/ucp/frontend/controller/radius/proxy.go @@ -0,0 +1,323 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package radius + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + armrpc_controller "github.com/radius-project/radius/pkg/armrpc/frontend/controller" + armrpc_rest "github.com/radius-project/radius/pkg/armrpc/rest" + "github.com/radius-project/radius/pkg/middleware" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/frontend/controller/resourcegroups" + "github.com/radius-project/radius/pkg/ucp/proxy" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/pkg/ucp/ucplog" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +const ( + PlanesPath = "/planes" + + // ProcessOperationTimeout is the timeout for processing a tracked resource in the background. + ProcessOperationTimeout = 12 * time.Hour + + // ProcessOperationRetryAfter is the retry interval for processing a tracked resource in the background. + // This is used when the tracked resource is not in a terminal state. + ProcessOperationRetryAfter = 5 * time.Second + + // EnqueueOperationRetryCount is the number of times to retry enqueueing an async operation before giving up. + EnqueueOperationRetryCount = 10 +) + +type updater interface { + Update(ctx context.Context, downstream string, id resources.ID, version string) error +} + +var _ armrpc_controller.Controller = (*ProxyController)(nil) + +// ProxyController is the controller implementation to proxy requests to appropriate RP or URL. +type ProxyController struct { + armrpc_controller.Operation[*datamodel.Plane, datamodel.Plane] + + // transport is the http.RoundTripper to use for proxying requests. Can be overridden for testing. + transport http.RoundTripper + + // updater is used to process tracked resources. Can be overridden for testing. + updater updater +} + +// # Function Explanation +// +// NewProxyController creates a new ProxyPlane controller with the given options and returns it, or returns an error if the +// controller cannot be created. +func NewProxyController(opts armrpc_controller.Options) (armrpc_controller.Controller, error) { + transport := otelhttp.NewTransport(http.DefaultTransport) + updater := trackedresource.NewUpdater(opts.StorageClient, &http.Client{Transport: transport}) + return &ProxyController{ + Operation: armrpc_controller.NewOperation(opts, armrpc_controller.ResourceOptions[datamodel.Plane]{}), + transport: transport, + updater: updater, + }, nil +} + +// # Function Explanation +// +// Run() takes in a request object and context, looks up the plane and resource provider associated with the +// request, and proxies the request to the appropriate resource provider. +func (p *ProxyController) Run(ctx context.Context, w http.ResponseWriter, req *http.Request) (armrpc_rest.Response, error) { + logger := ucplog.FromContextOrDiscard(ctx) + + logger.V(ucplog.LevelDebug).Info("starting proxy request") + for key, value := range req.Header { + logger.V(ucplog.LevelDebug).Info("incoming request header", "key", key, "value", value) + } + + // NOTE: avoid using the request URL directly as the casing may have been normalized. + // use the original URL instead. + requestCtx := v1.ARMRequestContextFromContext(ctx) + id := requestCtx.ResourceID + relativePath := middleware.GetRelativePath(p.Options().PathBase, requestCtx.OrignalURL.Path) + + downstreamURL, err := resourcegroups.ValidateDownstream(ctx, p.StorageClient(), id) + if errors.Is(err, &resourcegroups.NotFoundError{}) { + return armrpc_rest.NewNotFoundResponse(id), nil + } else if errors.Is(err, &resourcegroups.InvalidError{}) { + response := v1.ErrorResponse{Error: v1.ErrorDetails{Code: v1.CodeInvalid, Message: err.Error(), Target: id.String()}} + return armrpc_rest.NewBadRequestARMResponse(response), nil + } else if err != nil { + return nil, fmt.Errorf("failed to validate downstream: %w", err) + } + + proxyReq, err := p.PrepareProxyRequest(ctx, req, downstreamURL.String(), relativePath) + if err != nil { + return nil, err + } + + interceptor := &responseInterceptor{Inner: p.transport} + + sender := proxy.NewARMProxy(proxy.ReverseProxyOptions{RoundTripper: interceptor}, downstreamURL, nil) + sender.ServeHTTP(w, proxyReq) + + if interceptor.Response == nil { + logger.V(ucplog.LevelDebug).Error(err, "failed to proxy request") + return nil, nil + } + + // If we get here then we've successfully proxied the request. Now we interpret the response. + logger.V(ucplog.LevelDebug).Info("finished proxy request", "http.statuscode", interceptor.Response.StatusCode) + for key, value := range req.Header { + logger.V(ucplog.LevelDebug).Info("outgoing response header", "key", key, "value", value) + } + + if !p.ShouldTrackRequest(req.Method, id, interceptor.Response) { + logger.V(ucplog.LevelDebug).Info("request does not need to be tracked") + return nil, nil + } + + if p.IsTerminalResponse(interceptor.Response) { + logger.V(ucplog.LevelDebug).Info("response is terminal, updating tracked resource synchronously") + err = p.UpdateTrackedResource(ctx, downstreamURL.String(), id, requestCtx.APIVersion) + if errors.Is(err, &trackedresource.InProgressErr{}) { + logger.V(ucplog.LevelDebug).Info("synchronous update failed, updating tracked resource asynchronously") + // Continue executing + } else if err != nil { + // We can't return the response to the client if we failed to update the tracked resource. Instead + // fallback to the async path. + logger.Error(err, "failed to update tracked resource synchronously") + // Continue executing + } else { + logger.V(ucplog.LevelDebug).Info("tracked resource updated synchronously") + return nil, nil + } + } else { + logger.V(ucplog.LevelDebug).Info("response is not terminal, updating tracked resource asynchronously") + } + + // If we get here then we need to update the tracked resource, but the operation is not yet complete. + err = p.EnqueueTrackedResourceUpdate(ctx, id, requestCtx.APIVersion) + if err != nil { + logger.Error(err, "failed to enqueue tracked resource update") + return nil, nil + } + + return nil, nil +} + +// PrepareProxyRequest constructs and initializes the proxy request. +func (p *ProxyController) PrepareProxyRequest(ctx context.Context, originalReq *http.Request, downstream string, relativePath string) (*http.Request, error) { + proxyReq := originalReq.Clone(ctx) + requestURL, err := url.Parse(downstream) + if err != nil { + return nil, fmt.Errorf("failed to parse downstream URL: %w", err) + } + proxyReq.URL = requestURL + proxyReq.URL.Path = relativePath + proxyReq.URL.RawQuery = originalReq.URL.RawQuery + + refererURL := url.URL{ + Scheme: "http", + Host: originalReq.Host, + Path: originalReq.URL.Path, + RawQuery: originalReq.URL.RawQuery, + } + + // As per https://github.com/golang/go/issues/28940#issuecomment-441749380, the way to check + // for http vs https is check the TLS field + if originalReq.TLS != nil { + refererURL.Scheme = "https" + } + + proxyReq.Header.Set("X-Forwarded-Proto", refererURL.Scheme) + proxyReq.Header.Set(v1.RefererHeader, refererURL.String()) + + return proxyReq, nil +} + +// ShouldTrackRequest returns true if the request should be tracked. +func (p *ProxyController) ShouldTrackRequest(httpMethod string, id resources.ID, resp *http.Response) bool { + // Only track mutating requests. + if !strings.EqualFold(httpMethod, http.MethodPut) && !strings.EqualFold(httpMethod, http.MethodPatch) && !strings.EqualFold(httpMethod, http.MethodDelete) { + return false + } + + // For now we just track top-level resources. + if len(id.TypeSegments()) != 1 || !id.IsResource() { + return false + } + + if resp.StatusCode < 200 && resp.StatusCode >= 300 { + return false // Not a success + } + + return true +} + +// IsTerminalResponse returns true if the response is terminal. +func (p *ProxyController) IsTerminalResponse(resp *http.Response) bool { + return resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted +} + +// UpdateTrackedResource updates the tracked resource synchronously. +func (p *ProxyController) UpdateTrackedResource(ctx context.Context, downstream string, id resources.ID, apiVersion string) error { + return p.updater.Update(ctx, downstream, id, apiVersion) +} + +// EnqueueTrackedResourceUpdate enqueues an async operation to update the tracked resource. +func (p *ProxyController) EnqueueTrackedResourceUpdate(ctx context.Context, id resources.ID, apiVersion string) error { + logger := ucplog.FromContextOrDiscard(ctx) + + trackingID := trackedresource.IDFor(id) + + // Create a serviceCtx for the operation that we're going to process on the resource. + serviceCtx := *v1.ARMRequestContextFromContext(ctx) + serviceCtx.ResourceID = trackingID + serviceCtx.OperationType = v1.OperationType{Type: trackingID.Type(), Method: datamodel.OperationProcess} + + // Create the database entry for the tracked resource. + // + // If a non-terminal response was returned from the RP then at this instant the resource exists, even if it is + // being deleted. + entry := datamodel.GenericResourceFromID(id, trackingID) + entry.Properties.APIVersion = apiVersion + entry.Properties.OperationID = serviceCtx.OperationID.String() + + // We need to update the tracked resource entry in the database using optimistic concurrency. This means that we + // need to read the existing entry, update it, and then write it back. If the write fails then we need to retry. + // + // This concurrency scheme ensures that the background process will "observe" the last state of the resource. + // + // Think of it like this, each time the resource is changing we poke the background process and say "hey, the + // resource is changing, you should check it out". The background process then reads the resource and updates the + // state. + queueOperation := false +retry: + for retryCount := 1; retryCount <= EnqueueOperationRetryCount; retryCount++ { + obj, err := p.StorageClient().Get(ctx, trackingID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + // Safe to ignore. This means that the resource has not been tracked yet. + } else if err != nil { + return err + } + + etag := "" + if obj != nil { + etag = obj.ETag + err = obj.As(&entry) + if err != nil { + return err + } + } + + // Keep the existing provisioningState if possible. + if entry.InternalMetadata.AsyncProvisioningState == "" || entry.InternalMetadata.AsyncProvisioningState.IsTerminal() { + queueOperation = true + entry.InternalMetadata.AsyncProvisioningState = v1.ProvisioningStateAccepted + } + + logger.V(ucplog.LevelDebug).Info("enqueuing tracked resource update") + err = p.StorageClient().Save(ctx, &store.Object{Metadata: store.Metadata{ID: trackingID.String()}, Data: entry}, store.WithETag(etag)) + if errors.Is(err, &store.ErrConcurrency{}) { + // This means we hit a concurrency error saving the tracked resource entry. This means that the resource + // was updated in the background. We should retry. + logger.V(ucplog.LevelDebug).Info("enqueue tracked resource update failed due to concurrency error", "retryCount", retryCount) + continue + } else if err != nil { + return err + } + + break retry + } + + // Only queue an operation if necessary, eg: if we changed the provisioningState. + if !queueOperation { + return nil + } + + err := p.StatusManager().QueueAsyncOperation(ctx, &serviceCtx, statusmanager.QueueOperationOptions{OperationTimeout: ProcessOperationTimeout, RetryAfter: ProcessOperationRetryAfter}) + if err != nil { + return err + } + + return nil +} + +// responseInterceptor is a http.RoundTripper that records the response and error from the inner http.RoundTripper. +// +// This type is NOT thread-safe and should be created and used per-request. +type responseInterceptor struct { + Inner http.RoundTripper + + Response *http.Response + Error error +} + +// RoundTrip implements http.RoundTripper by capturing the response and error. +func (i *responseInterceptor) RoundTrip(req *http.Request) (*http.Response, error) { + i.Response, i.Error = i.Inner.RoundTrip(req) + return i.Response, i.Error +} diff --git a/pkg/ucp/frontend/controller/radius/proxy_test.go b/pkg/ucp/frontend/controller/radius/proxy_test.go new file mode 100644 index 0000000000..69212994e7 --- /dev/null +++ b/pkg/ucp/frontend/controller/radius/proxy_test.go @@ -0,0 +1,357 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package radius + +import ( + "context" + "crypto/tls" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + "github.com/radius-project/radius/pkg/armrpc/frontend/controller" + "github.com/radius-project/radius/pkg/armrpc/rest" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/trackedresource" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +// The Run function is also tested by integration tests in the pkg/ucp/integrationtests/radius package. + +func createController(t *testing.T) (*ProxyController, *store.MockStorageClient, *mockUpdater, *mockRoundTripper, *statusmanager.MockStatusManager) { + ctrl := gomock.NewController(t) + storageClient := store.NewMockStorageClient(ctrl) + statusManager := statusmanager.NewMockStatusManager(ctrl) + + p, err := NewProxyController(controller.Options{StorageClient: storageClient, StatusManager: statusManager}) + require.NoError(t, err) + + updater := mockUpdater{} + roundTripper := mockRoundTripper{} + + pc := p.(*ProxyController) + pc.updater = &updater + pc.transport = &roundTripper + + return pc, storageClient, &updater, &roundTripper, statusManager +} + +func Test_Run(t *testing.T) { + id := resources.MustParse("/planes/test/local/resourceGroups/test-rg/providers/Applications.Test/testResources/my-resource") + + plane := datamodel.Plane{ + Properties: datamodel.PlaneProperties{ + Kind: datamodel.PlaneKind(v20231001preview.PlaneKindUCPNative), + ResourceProviders: map[string]*string{ + "Applications.Test": to.Ptr("https://localhost:1234"), + }, + }, + } + resourceGroup := datamodel.ResourceGroup{} + + t.Run("success (non-tracked)", func(t *testing.T) { + p, storageClient, _, roundTripper, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Not a mutating request + req := httptest.NewRequest(http.MethodGet, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusOK) + roundTripper.Response = downstreamResponse.Result() + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("success (tracked terminal response)", func(t *testing.T) { + p, storageClient, updater, roundTripper, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Mutating request that will complete synchronously + req := httptest.NewRequest(http.MethodDelete, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusOK) + roundTripper.Response = downstreamResponse.Result() + + // Successful update + updater.Result = nil + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("success (fallback to async)", func(t *testing.T) { + p, storageClient, updater, roundTripper, statusManager := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Mutating request that will complete synchronously + req := httptest.NewRequest(http.MethodDelete, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + // Tracking entry created + storageClient.EXPECT(). + Get(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + storageClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusOK) + roundTripper.Response = downstreamResponse.Result() + + // Contended update, fallback to async + updater.Result = &trackedresource.InProgressErr{} + + statusManager.EXPECT(). + QueueAsyncOperation(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).Times(1) + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("success (fallback to async without workitem)", func(t *testing.T) { + p, storageClient, updater, roundTripper, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + + // Mutating request that will complete asynchronously + req := httptest.NewRequest(http.MethodDelete, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: plane}, nil).Times(1) + + storageClient.EXPECT(). + Get(gomock.Any(), id.RootScope(), gomock.Any()). + Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + // Tracking entry created + existingEntry := &store.Object{ + Data: &datamodel.GenericResource{ + BaseResource: v1.BaseResource{ + InternalMetadata: v1.InternalMetadata{ + AsyncProvisioningState: v1.ProvisioningStateAccepted, + }, + }, + }, + } + storageClient.EXPECT(). + Get(gomock.Any(), gomock.Any(), gomock.Any()). + Return(existingEntry, nil).Times(1) + storageClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil).Times(1) + + downstreamResponse := httptest.NewRecorder() + downstreamResponse.WriteHeader(http.StatusAccepted) + roundTripper.Response = downstreamResponse.Result() + + // Contended update, fallback to async + updater.Result = &trackedresource.InProgressErr{} + + // No work item created, it was already in the queue. + + response, err := p.Run(ctx, w, req.WithContext(ctx)) + require.NoError(t, err) + require.Nil(t, response) + }) + + t.Run("failure (validate downstream: not found)", func(t *testing.T) { + p, storageClient, _, _, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPut, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(nil, &store.ErrNotFound{}).Times(1) + + expected := rest.NewNotFoundResponse(id) + + response, err := p.Run(ctx, w, req) + require.NoError(t, err) + require.Equal(t, expected, response) + }) + + t.Run("failure (validate downstream: invalid downstream)", func(t *testing.T) { + p, storageClient, _, _, _ := createController(t) + + svcContext := &v1.ARMRequestContext{ + ResourceID: id, + } + ctx := testcontext.New(t) + ctx = v1.WithARMRequestContext(ctx, svcContext) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPut, id.String(), nil) + + storageClient.EXPECT(). + Get(gomock.Any(), "/planes/"+id.PlaneNamespace(), gomock.Any()). + Return(&store.Object{Data: datamodel.Plane{}}, nil).Times(1) + + expected := rest.NewBadRequestARMResponse(v1.ErrorResponse{Error: v1.ErrorDetails{Code: v1.CodeInvalid, Message: "unexpected plane type ", Target: id.String()}}) + response, err := p.Run(ctx, w, req) + require.NoError(t, err) + require.Equal(t, expected, response) + }) +} + +func Test_ProxyController_PrepareProxyRequest(t *testing.T) { + downstream := "http://localhost:7443" + relativePath := "/planes/radius/local/resourceGroups/test-group/providers/System.TestRP" + t.Run("success (http)", func(t *testing.T) { + originalURL, err := url.Parse("http://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes") + require.NoError(t, err) + originalReq := &http.Request{ + Host: originalURL.Host, + Header: http.Header{"Copied": []string{"yes"}}, + TLS: nil, + URL: originalURL} + + p, _, _, _, _ := createController(t) + proxyReq, err := p.PrepareProxyRequest(testcontext.New(t), originalReq, downstream, relativePath) + require.NoError(t, err) + require.NotNil(t, proxyReq) + + require.Equal(t, "http://localhost:7443/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.URL.String()) + require.Equal(t, "http", proxyReq.Header.Get("X-Forwarded-Proto")) + require.Equal(t, "http://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.Header.Get("Referer")) + require.Equal(t, "yes", proxyReq.Header.Get("Copied")) + }) + + t.Run("success (http)", func(t *testing.T) { + originalURL, err := url.Parse("http://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes") + require.NoError(t, err) + originalReq := &http.Request{ + Host: originalURL.Host, + Header: http.Header{"Copied": []string{"yes"}}, + TLS: &tls.ConnectionState{}, + URL: originalURL} + + p, _, _, _, _ := createController(t) + proxyReq, err := p.PrepareProxyRequest(testcontext.New(t), originalReq, downstream, relativePath) + require.NoError(t, err) + require.NotNil(t, proxyReq) + + require.Equal(t, "http://localhost:7443/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.URL.String()) + require.Equal(t, "https", proxyReq.Header.Get("X-Forwarded-Proto")) + require.Equal(t, "https://localhost:9443/path/base/planes/radius/local/resourceGroups/test-group/providers/System.TestRP?test=yes", proxyReq.Header.Get("Referer")) + require.Equal(t, "yes", proxyReq.Header.Get("Copied")) + }) + + t.Run("invalid downstream URL", func(t *testing.T) { + originalReq := &http.Request{Header: http.Header{}, URL: &url.URL{}} + + p, _, _, _, _ := createController(t) + proxyReq, err := p.PrepareProxyRequest(testcontext.New(t), originalReq, "\ninvalid", relativePath) + require.Error(t, err) + require.Equal(t, "failed to parse downstream URL: parse \"\\ninvalid\": net/url: invalid control character in URL", err.Error()) + require.Nil(t, proxyReq) + }) +} + +type mockUpdater struct { + Result error +} + +func (u *mockUpdater) Update(ctx context.Context, downstreamURL string, originalID resources.ID, version string) error { + return u.Result +} + +type mockRoundTripper struct { + Response *http.Response + Err error +} + +func (rt *mockRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + if rt.Response != nil { + rt.Response.Request = r + } + return rt.Response, rt.Err +} diff --git a/pkg/ucp/frontend/controller/resourcegroups/util.go b/pkg/ucp/frontend/controller/resourcegroups/util.go new file mode 100644 index 0000000000..185500b8d9 --- /dev/null +++ b/pkg/ucp/frontend/controller/resourcegroups/util.go @@ -0,0 +1,111 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package resourcegroups + +import ( + "context" + "errors" + "fmt" + "net/url" + + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + resources_radius "github.com/radius-project/radius/pkg/ucp/resources/radius" + "github.com/radius-project/radius/pkg/ucp/rest" + "github.com/radius-project/radius/pkg/ucp/store" +) + +// NotFoundError is returned when a resource group or plane is not found. +type NotFoundError struct { + Message string +} + +// Error returns the error message. +func (e *NotFoundError) Error() string { + return e.Message +} + +// Is returns true if the error is a NotFoundError. +func (e *NotFoundError) Is(err error) bool { + _, ok := err.(*NotFoundError) + return ok +} + +// InvalidError is returned when the data is invalid. +type InvalidError struct { + Message string +} + +// Error returns the error message. +func (e *InvalidError) Error() string { + return e.Message +} + +// Is returns true if the error is a InvalidError. +func (e *InvalidError) Is(err error) bool { + _, ok := err.(*InvalidError) + return ok +} + +// ValidateDownstream can be used to find and validate the downstream URL for a resource. +// Returns NotFoundError for the case where the plane or resource group does not exist. +// Returns InvalidError for cases where the data is invalid, like when the resource provider is not configured. +func ValidateDownstream(ctx context.Context, client store.StorageClient, id resources.ID) (*url.URL, error) { + planeID, err := resources.ParseScope(id.PlaneScope()) + if err != nil { + // Not expected to happen. + return nil, err + } + + plane, err := store.GetResource[datamodel.Plane](ctx, client, planeID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + return nil, &NotFoundError{Message: fmt.Sprintf("plane %q not found", planeID.String())} + } else if err != nil { + return nil, fmt.Errorf("failed to find plane %q: %w", planeID.String(), err) + } + + if plane.Properties.Kind != rest.PlaneKindUCPNative { + return nil, &InvalidError{Message: fmt.Sprintf("unexpected plane type %s", plane.Properties.Kind)} + } + + // If the ID contains a resource group, validate it now. + if id.FindScope(resources_radius.ScopeResourceGroups) != "" { + resourceGroupID, err := resources.ParseScope(id.RootScope()) + if err != nil { + // Not expected to happen. + return nil, err + } + + _, err = store.GetResource[datamodel.ResourceGroup](ctx, client, resourceGroupID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + return nil, &NotFoundError{Message: fmt.Sprintf("resource group %q not found", resourceGroupID.String())} + } else if err != nil { + return nil, fmt.Errorf("failed to find resource group %q: %w", resourceGroupID.String(), err) + } + } + + downstream := plane.LookupResourceProvider(id.ProviderNamespace()) + if downstream == "" { + return nil, &InvalidError{Message: fmt.Sprintf("resource provider %s not configured", id.ProviderNamespace())} + } + + downstreamURL, err := url.Parse(downstream) + if err != nil { + return nil, &InvalidError{Message: fmt.Sprintf("failed to parse downstream URL: %v", err.Error())} + } + + return downstreamURL, nil +} diff --git a/pkg/ucp/frontend/controller/resourcegroups/util_test.go b/pkg/ucp/frontend/controller/resourcegroups/util_test.go new file mode 100644 index 0000000000..ff751721ff --- /dev/null +++ b/pkg/ucp/frontend/controller/resourcegroups/util_test.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package resourcegroups + +import ( + "errors" + "fmt" + "net/url" + "testing" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/rest" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +func Test_ValidateDownstream(t *testing.T) { + id, err := resources.ParseResource("/planes/radius/local/resourceGroups/test-group/providers/System.TestRP/testResources/name") + require.NoError(t, err) + + idWithoutResourceGroup, err := resources.Parse("/planes/radius/local/providers/System.TestRP/testResources") + require.NoError(t, err) + + downstream := "http://localhost:7443" + + plane := &datamodel.Plane{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.PlaneScope(), + }, + }, + Properties: datamodel.PlaneProperties{ + Kind: rest.PlaneKindUCPNative, + ResourceProviders: map[string]*string{ + "System.TestRP": to.Ptr(downstream), + }, + }, + } + + setup := func(t *testing.T) *store.MockStorageClient { + ctrl := gomock.NewController(t) + return store.NewMockStorageClient(ctrl) + } + + t.Run("success (resource group)", func(t *testing.T) { + resourceGroup := &datamodel.ResourceGroup{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.RootScope(), + }, + }, + } + + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + expectedURL, err := url.Parse(downstream) + require.NoError(t, err) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.NoError(t, err) + require.Equal(t, expectedURL, downstreamURL) + }) + + t.Run("success (non resource group)", func(t *testing.T) { + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), idWithoutResourceGroup.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + + expectedURL, err := url.Parse(downstream) + require.NoError(t, err) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, idWithoutResourceGroup) + require.NoError(t, err) + require.Equal(t, expectedURL, downstreamURL) + }) + + t.Run("plane not found", func(t *testing.T) { + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(nil, &store.ErrNotFound{}).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &NotFoundError{Message: "plane \"/planes/radius/local\" not found"}, err) + require.Nil(t, downstreamURL) + }) + + t.Run("plane retreival failure", func(t *testing.T) { + mock := setup(t) + + expected := fmt.Errorf("failed to find plane \"/planes/radius/local\": %w", errors.New("test error")) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(nil, errors.New("test error")).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, expected, err) + require.Nil(t, downstreamURL) + }) + + t.Run("resource group not found", func(t *testing.T) { + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(nil, &store.ErrNotFound{}).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &NotFoundError{Message: "resource group \"/planes/radius/local/resourceGroups/test-group\" not found"}, err) + require.Nil(t, downstreamURL) + }) + + t.Run("resource group err", func(t *testing.T) { + mock := setup(t) + + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(nil, errors.New("test error")).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, "failed to find resource group \"/planes/radius/local/resourceGroups/test-group\": test error", err.Error()) + require.Nil(t, downstreamURL) + }) + + t.Run("resource provider not found", func(t *testing.T) { + plane := &datamodel.Plane{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.PlaneScope(), + }, + }, + Properties: datamodel.PlaneProperties{ + Kind: rest.PlaneKindUCPNative, + ResourceProviders: map[string]*string{}, + }, + } + + resourceGroup := &datamodel.ResourceGroup{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.RootScope(), + }, + }, + } + + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &InvalidError{Message: "resource provider System.TestRP not configured"}, err) + require.Nil(t, downstreamURL) + }) + + t.Run("resource provider invalid URL", func(t *testing.T) { + plane := &datamodel.Plane{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.PlaneScope(), + }, + }, + Properties: datamodel.PlaneProperties{ + Kind: rest.PlaneKindUCPNative, + ResourceProviders: map[string]*string{ + "System.TestRP": to.Ptr("\ninvalid"), + }, + }, + } + + resourceGroup := &datamodel.ResourceGroup{ + BaseResource: v1.BaseResource{ + TrackedResource: v1.TrackedResource{ + ID: id.RootScope(), + }, + }, + } + + mock := setup(t) + mock.EXPECT().Get(gomock.Any(), id.PlaneScope()).Return(&store.Object{Data: plane}, nil).Times(1) + mock.EXPECT().Get(gomock.Any(), id.RootScope()).Return(&store.Object{Data: resourceGroup}, nil).Times(1) + + downstreamURL, err := ValidateDownstream(testcontext.New(t), mock, id) + require.Error(t, err) + require.Equal(t, &InvalidError{Message: "failed to parse downstream URL: parse \"\\ninvalid\": net/url: invalid control character in URL"}, err) + require.Nil(t, downstreamURL) + }) +} diff --git a/pkg/ucp/frontend/modules/types.go b/pkg/ucp/frontend/modules/types.go index a8d4cd7d05..6b0a0d6e9a 100644 --- a/pkg/ucp/frontend/modules/types.go +++ b/pkg/ucp/frontend/modules/types.go @@ -20,6 +20,7 @@ import ( "context" "net/http" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" "github.com/radius-project/radius/pkg/sdk" "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/hostoptions" @@ -71,6 +72,9 @@ type Options struct { // SpecLoader is the OpenAPI spec loader containing specs for the UCP APIs. SpecLoader *validator.Loader + // StatusManager is the async operation status manager. + StatusManager statusmanager.StatusManager + // UCPConnection is the connection used to communicate with UCP APIs. UCPConnection sdk.Connection } diff --git a/pkg/ucp/frontend/radius/routes.go b/pkg/ucp/frontend/radius/routes.go index b36810dff9..a6135f7c65 100644 --- a/pkg/ucp/frontend/radius/routes.go +++ b/pkg/ucp/frontend/radius/routes.go @@ -27,7 +27,7 @@ import ( "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" "github.com/radius-project/radius/pkg/ucp/datamodel" "github.com/radius-project/radius/pkg/ucp/datamodel/converter" - planes_ctrl "github.com/radius-project/radius/pkg/ucp/frontend/controller/planes" + radius_ctrl "github.com/radius-project/radius/pkg/ucp/frontend/controller/radius" resourcegroups_ctrl "github.com/radius-project/radius/pkg/ucp/frontend/controller/resourcegroups" "github.com/radius-project/radius/pkg/validator" ) @@ -117,21 +117,22 @@ func (m *Module) Initialize(ctx context.Context) (http.Handler, error) { ParentRouter: resourceGroupResourceRouter, Path: server.CatchAllPath, OperationType: &v1.OperationType{Type: OperationTypeUCPRadiusProxy, Method: v1.OperationProxy}, - ControllerFactory: planes_ctrl.NewProxyController, + ControllerFactory: radius_ctrl.NewProxyController, }, { // Proxy request should use CatchAllPath(/*) to process all requests under /planes/radius/{planeName}/. ParentRouter: baseRouter, Path: server.CatchAllPath, OperationType: &v1.OperationType{Type: OperationTypeUCPRadiusProxy, Method: v1.OperationProxy}, - ControllerFactory: planes_ctrl.NewProxyController, + ControllerFactory: radius_ctrl.NewProxyController, }, } ctrlOptions := controller.Options{ - Address: m.options.Address, - PathBase: m.options.PathBase, - DataProvider: m.options.DataProvider, + Address: m.options.Address, + PathBase: m.options.PathBase, + DataProvider: m.options.DataProvider, + StatusManager: m.options.StatusManager, } for _, h := range handlerOptions { diff --git a/pkg/ucp/integrationtests/radius/proxy_test.go b/pkg/ucp/integrationtests/radius/proxy_test.go index 462d0dc538..47d2dbd0ef 100644 --- a/pkg/ucp/integrationtests/radius/proxy_test.go +++ b/pkg/ucp/integrationtests/radius/proxy_test.go @@ -42,6 +42,9 @@ const ( testResourceGroupID = testRadiusPlaneID + "/resourceGroups/test-rg" testResourceCollectionID = testResourceGroupID + "/providers/System.Test/testResources" testResourceID = testResourceCollectionID + "/test-resource" + + assertTimeout = time.Second * 10 + assertRetry = time.Second * 2 ) func Test_RadiusPlane_Proxy_ResourceGroupDoesNotExist(t *testing.T) { @@ -72,6 +75,12 @@ func Test_RadiusPlane_ResourceSync(t *testing.T) { message := "here is some test data" + expectedTrackedResource := v20231001preview.GenericResource{ + ID: to.Ptr(testResourceID), + Name: to.Ptr("test-resource"), + Type: to.Ptr("System.Test/testResources"), + } + t.Run("PUT", func(t *testing.T) { data := testrp.TestResource{ Properties: testrp.TestResourceProperties{ @@ -101,6 +110,17 @@ func Test_RadiusPlane_ResourceSync(t *testing.T) { require.Equal(t, message, *resources.Value[0].Properties.Message) }) + t.Run("List - Tracked Resources", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) + }) + t.Run("GET", func(t *testing.T) { response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusOK) @@ -121,6 +141,16 @@ func Test_RadiusPlane_ResourceSync(t *testing.T) { response.EqualsStatusCode(http.StatusNotFound) }) + t.Run("List - Tracked Resources (after delete)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Empty(t, resources.Value) + }) + t.Run("DELETE (again)", func(t *testing.T) { response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusNoContent) @@ -132,16 +162,19 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { rp := testrp.Start(t) // Block background work item completion until we're ready. - putCh := make(chan struct{}) - deleteCh := make(chan struct{}) + putCh := make(chan backend_ctrl.Result) + deleteCh := make(chan backend_ctrl.Result) onPut := func(ctx context.Context, request *backend_ctrl.Request) (backend_ctrl.Result, error) { t.Log("PUT operation is waiting for completion") - <-putCh - return backend_ctrl.Result{}, nil + result := <-putCh + return result, nil } onDelete := func(ctx context.Context, request *backend_ctrl.Request) (backend_ctrl.Result, error) { t.Log("DELETE operation is waiting for completion") - <-deleteCh + result := <-deleteCh + if result.Requeue || result.Error != nil { + return result, nil + } client, err := ucp.Clients.StorageProvider.GetStorageClient(ctx, "System.Test/testResources") require.NoError(t, err) @@ -162,7 +195,14 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { message := "here is some test data" + expectedTrackedResource := v20231001preview.GenericResource{ + ID: to.Ptr(testResourceID), + Name: to.Ptr("test-resource"), + Type: to.Ptr("System.Test/testResources"), + } + t.Run("PUT", func(t *testing.T) { + t.Log("starting PUT operation") data := testrp.TestResource{ Properties: testrp.TestResourceProperties{ Message: to.Ptr(message), @@ -178,7 +218,7 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { err = json.Unmarshal(response.Body.Bytes(), resource) require.NoError(t, err) require.Equal(t, message, *resource.Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resource.Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resource.Properties.ProvisioningState).IsTerminal()) location := response.Raw.Header.Get("Location") azureAsyncOperation := response.Raw.Header.Get("Azure-AsyncOperation") @@ -195,7 +235,18 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { require.NoError(t, err) require.Len(t, resources.Value, 1) require.Equal(t, message, *resources.Value[0].Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resources.Value[0].Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resources.Value[0].Properties.ProvisioningState).IsTerminal()) + }) + + t.Run("List - Tracked Resources (during PUT)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) }) t.Run("GET (during PUT)", func(t *testing.T) { @@ -210,7 +261,8 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { }) t.Run("Complete PUT", func(t *testing.T) { - putCh <- struct{}{} + t.Log("completing PUT operation") + putCh <- backend_ctrl.Result{} require.EventuallyWithT(t, func(collect *assert.CollectT) { response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) assert.Equal(collect, http.StatusOK, response.Raw.StatusCode) @@ -219,10 +271,46 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { err := json.Unmarshal(response.Body.Bytes(), resource) assert.NoError(collect, err) assert.Equal(collect, string(v1.ProvisioningStateSucceeded), *resource.Properties.ProvisioningState) - }, time.Second*5, time.Millisecond*100) + }, assertTimeout, assertRetry) + }) + + t.Run("DELETE FAILURE", func(t *testing.T) { + t.Log("starting DELETE FAILURE operation") + response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) + response.EqualsStatusCode(http.StatusAccepted) + }) + + t.Run("Complete DELETE FAILURE", func(t *testing.T) { + t.Log("completing DELETE FAILURE operation") + deleteCh <- backend_ctrl.NewFailedResult(v1.ErrorDetails{ + Code: v1.CodeInternal, + Message: "Oh no!", + }) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) + assert.Equal(collect, http.StatusOK, response.Raw.StatusCode) + + resource := &testrp.TestResource{} + err := json.Unmarshal(response.Body.Bytes(), resource) + assert.NoError(collect, err) + assert.Equal(collect, string(v1.ProvisioningStateFailed), *resource.Properties.ProvisioningState) + t.Logf("Resource provisioning state: %s", *resource.Properties.ProvisioningState) + }, assertTimeout, assertRetry) + }) + + t.Run("List - Tracked Resources (after failed delete)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) }) t.Run("DELETE", func(t *testing.T) { + t.Log("starting DELETE operation") response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusAccepted) }) @@ -236,7 +324,18 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { require.NoError(t, err) require.Len(t, resources.Value, 1) require.Equal(t, message, *resources.Value[0].Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resources.Value[0].Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resources.Value[0].Properties.ProvisioningState).IsTerminal()) + }) + + t.Run("List - Tracked Resources (during delete)", func(t *testing.T) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + response.EqualsStatusCode(http.StatusOK) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + require.NoError(t, err) + require.Len(t, resources.Value, 1) + require.Equal(t, expectedTrackedResource, *resources.Value[0]) }) t.Run("GET (during delete)", func(t *testing.T) { @@ -247,15 +346,16 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { err := json.Unmarshal(response.Body.Bytes(), resource) require.NoError(t, err) require.Equal(t, message, *resource.Properties.Message) - require.Equal(t, string(v1.ProvisioningStateAccepted), *resource.Properties.ProvisioningState) + require.False(t, v1.ProvisioningState(*resource.Properties.ProvisioningState).IsTerminal()) }) t.Run("Complete DELETE", func(t *testing.T) { - deleteCh <- struct{}{} + t.Log("completing DELETE operation") + deleteCh <- backend_ctrl.Result{} require.EventuallyWithT(t, func(collect *assert.CollectT) { response := ucp.MakeRequest(http.MethodGet, testResourceID+"?api-version="+testrp.Version, nil) assert.Equal(collect, http.StatusNotFound, response.Raw.StatusCode) - }, time.Second*5, time.Millisecond*100) + }, assertTimeout, assertRetry) }) t.Run("GET (after delete)", func(t *testing.T) { @@ -263,6 +363,19 @@ func Test_RadiusPlane_ResourceAsync(t *testing.T) { response.EqualsStatusCode(http.StatusNotFound) }) + t.Run("List - Tracked Resources (after delete)", func(t *testing.T) { + // This is eventually consistent. + require.EventuallyWithT(t, func(collect *assert.CollectT) { + response := ucp.MakeRequest(http.MethodGet, testResourceGroupID+"/resources?api-version="+v20231001preview.Version, nil) + assert.Equal(collect, http.StatusOK, response.Raw.StatusCode) + + resources := &v20231001preview.GenericResourceListResult{} + err := json.Unmarshal(response.Body.Bytes(), resources) + assert.NoError(collect, err) + assert.Empty(collect, resources.Value) + }, assertTimeout, assertRetry) + }) + t.Run("DELETE (again)", func(t *testing.T) { response := ucp.MakeRequest(http.MethodDelete, testResourceID+"?api-version="+testrp.Version, nil) response.EqualsStatusCode(http.StatusNoContent) diff --git a/pkg/ucp/integrationtests/testrp/async.go b/pkg/ucp/integrationtests/testrp/async.go index 4723b359e6..144b598c8c 100644 --- a/pkg/ucp/integrationtests/testrp/async.go +++ b/pkg/ucp/integrationtests/testrp/async.go @@ -33,6 +33,7 @@ import ( "github.com/radius-project/radius/pkg/armrpc/servicecontext" "github.com/radius-project/radius/pkg/middleware" "github.com/radius-project/radius/pkg/ucp/integrationtests/testserver" + queueprovider "github.com/radius-project/radius/pkg/ucp/queue/provider" "github.com/radius-project/radius/test/testcontext" "github.com/stretchr/testify/require" ) @@ -58,7 +59,18 @@ func AsyncResource(t *testing.T, ts *testserver.TestServer, rootScope string, pu resourceType := "System.Test/testResources" - queueClient, err := ts.Clients.QueueProvider.GetClient(ctx) + // We can share the storage provider with the test server. + _, err := ts.Clients.StorageProvider.GetStorageClient(ctx, "System.Test/operationStatuses") + require.NoError(t, err) + + // Do not share the queue. + queueOptions := queueprovider.QueueProviderOptions{ + Provider: queueprovider.TypeInmemory, + InMemory: &queueprovider.InMemoryQueueOptions{}, + Name: "System.Test", + } + queueProvider := queueprovider.New(queueOptions) + queueClient, err := queueProvider.GetClient(ctx) require.NoError(t, err) statusManager := statusmanager.New(ts.Clients.StorageProvider, queueClient, v1.LocationGlobal) diff --git a/pkg/ucp/integrationtests/testserver/testserver.go b/pkg/ucp/integrationtests/testserver/testserver.go index fa38494e83..cbd0a9a8f7 100644 --- a/pkg/ucp/integrationtests/testserver/testserver.go +++ b/pkg/ucp/integrationtests/testserver/testserver.go @@ -37,9 +37,13 @@ import ( etcdclient "go.etcd.io/etcd/client/v3" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + backend_ctrl "github.com/radius-project/radius/pkg/armrpc/asyncoperation/controller" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/statusmanager" + "github.com/radius-project/radius/pkg/armrpc/asyncoperation/worker" "github.com/radius-project/radius/pkg/armrpc/rpctest" "github.com/radius-project/radius/pkg/armrpc/servicecontext" "github.com/radius-project/radius/pkg/middleware" + "github.com/radius-project/radius/pkg/ucp/backend" "github.com/radius-project/radius/pkg/ucp/data" "github.com/radius-project/radius/pkg/ucp/dataprovider" "github.com/radius-project/radius/pkg/ucp/frontend/api" @@ -50,6 +54,7 @@ import ( queueprovider "github.com/radius-project/radius/pkg/ucp/queue/provider" "github.com/radius-project/radius/pkg/ucp/secret" secretprovider "github.com/radius-project/radius/pkg/ucp/secret/provider" + "github.com/radius-project/radius/pkg/ucp/server" "github.com/radius-project/radius/pkg/ucp/store" "github.com/radius-project/radius/pkg/validator" "github.com/radius-project/radius/swagger" @@ -156,6 +161,8 @@ func StartWithMocks(t *testing.T, configureModules func(options modules.Options) secretProvider := secretprovider.NewSecretProvider(secretprovider.SecretProviderOptions{}) secretProvider.SetClient(secretClient) + statusManager := statusmanager.NewMockStatusManager(ctrl) + router := chi.NewRouter() router.Use(servicecontext.ARMRequestCtx(pathBase, "global")) @@ -176,6 +183,7 @@ func StartWithMocks(t *testing.T, configureModules func(options modules.Options) DataProvider: dataProvider, SecretProvider: secretProvider, SpecLoader: specLoader, + StatusManager: statusManager, } if configureModules == nil { @@ -222,6 +230,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) }) ctx, cancel := testcontext.NewWithCancel(t) + t.Cleanup(cancel) stoppedChan := make(chan struct{}) defer close(stoppedChan) @@ -235,7 +244,9 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) // and you'll be able to see the spam from etcd. // // This is caught by the race checker and will fail your pr if you do it. - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err := etcd.Run(ctx) if err != nil { t.Logf("error from etcd: %v", err) @@ -254,7 +265,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) ETCD: storageOptions.ETCD, } queueOptions := queueprovider.QueueProviderOptions{ - Name: "System.Resources", + Name: server.UCPProviderName, Provider: queueprovider.TypeInmemory, InMemory: &queueprovider.InMemoryQueueOptions{}, } @@ -265,6 +276,21 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) secretProvider := secretprovider.NewSecretProvider(secretOptions) queueProvider := queueprovider.New(queueOptions) + queueClient, err := queueProvider.GetClient(ctx) + require.NoError(t, err) + + statusManager := statusmanager.New(dataProvider, queueClient, v1.LocationGlobal) + + registry := worker.NewControllerRegistry(dataProvider) + err = backend.RegisterControllers(ctx, registry, backend_ctrl.Options{DataProvider: dataProvider}) + require.NoError(t, err) + + w := worker.New(worker.Options{}, statusManager, queueClient, registry) + go func() { + err = w.Start(ctx) + require.NoError(t, err) + }() + router := chi.NewRouter() router.Use(servicecontext.ARMRequestCtx(pathBase, "global")) @@ -285,6 +311,7 @@ func StartWithETCD(t *testing.T, configureModules func(options modules.Options) SecretProvider: secretProvider, SpecLoader: specLoader, QueueProvider: queueProvider, + StatusManager: statusManager, } if configureModules == nil { diff --git a/pkg/ucp/server/server.go b/pkg/ucp/server/server.go index 0896121804..a24af00caf 100644 --- a/pkg/ucp/server/server.go +++ b/pkg/ucp/server/server.go @@ -23,7 +23,7 @@ import ( "strings" "time" - hostOpts "github.com/radius-project/radius/pkg/armrpc/hostoptions" + hostopts "github.com/radius-project/radius/pkg/armrpc/hostoptions" "github.com/radius-project/radius/pkg/kubeutil" metricsprovider "github.com/radius-project/radius/pkg/metrics/provider" metricsservice "github.com/radius-project/radius/pkg/metrics/service" @@ -69,7 +69,7 @@ type Options struct { Location string } -const UCPProviderName = "ucp" +const UCPProviderName = "System.Resources" // NewServerOptionsFromEnvironment creates a new Options struct from environment variables and returns it along with any errors. func NewServerOptionsFromEnvironment() (Options, error) { @@ -187,8 +187,12 @@ func NewServer(options *Options) (*hosting.Host, error) { hostingServices = append(hostingServices, profilerservice.NewService(profilerOptions)) } - backendServiceOptions := hostOpts.HostOptions{ - Config: &hostOpts.ProviderConfig{ + backendServiceOptions := hostopts.HostOptions{ + + Config: &hostopts.ProviderConfig{ + Env: hostopts.EnvironmentOptions{ + RoleLocation: options.Config.Location, + }, StorageProvider: options.StorageProviderOptions, SecretProvider: options.SecretProviderOptions, QueueProvider: options.QueueProviderOptions, diff --git a/pkg/ucp/trackedresource/doc.go b/pkg/ucp/trackedresource/doc.go new file mode 100644 index 0000000000..2b93fc134b --- /dev/null +++ b/pkg/ucp/trackedresource/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// package trackedresource provides utility functionality for working with tracked resources. +// This functionality is shared between frontend and backend controllers for tracked resources. +package trackedresource diff --git a/pkg/ucp/trackedresource/name.go b/pkg/ucp/trackedresource/name.go new file mode 100644 index 0000000000..4e94796a7b --- /dev/null +++ b/pkg/ucp/trackedresource/name.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "crypto/sha1" + "fmt" + "strings" + + "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/resources" +) + +// NameFor computes the resource name of a tracked resource from its ID. +// +// This can be used to compute the name of a tracked resource based on the resource that it is tracking. +// +// Names are computed by taking the name of the resource being tracked and appending a suffix to it based +// on the hash of the resource ID. This ensures that the name is unique and deterministic. +func NameFor(id resources.ID) string { + if id.IsEmpty() { + return "" + } + + // We need to generate a valid ARM/UCP name. The original resource name is used as a prefix for readability + // followed by the hash of the resource ID. + // + // example: my-resource-ec291e26078b7ea8a74abfac82530005a0ecbf15 + // + // We want this to fit in 63 characters so we allow a prefix of 22 characters a separator and a hash of 40 characters. + const prefixLength = 22 + + prefix := strings.ToLower(id.Name()) + if len(prefix) > prefixLength { + prefix = prefix[:prefixLength] + } + + hasher := sha1.New() + + // It's OK to ignore the error here, it's part of the API because io.Writer is being used, but the implementation + // does not return errors. + _, err := hasher.Write([]byte(strings.ToLower(id.String()))) + if err != nil { + panic("unexpected error writing to hash: " + err.Error()) + } + + hash := hasher.Sum(nil) + + return fmt.Sprintf("%s-%x", prefix, hash) +} + +// IDFor computes the resource ID of a tracked resource entry from the original resource ID. +func IDFor(id resources.ID) resources.ID { + if id.IsEmpty() { + return resources.ID{} + } + + // Tracking ID is the ID of the entry that will store the data. + // + // Example: + // id: /planes/radius/local/resourceGroups/test-group/providers/Applications.Core/applications/test-app + // trackingID: /planes/radius/local/resourceGroups/test-group/providers/System.Resources/genericResources/test-app-ec291e26078b7ea8a74abfac82530005a0ecbf15 + return resources.MustParse(resources.MakeUCPID( + id.ScopeSegments(), + []resources.TypeSegment{ + { + Type: v20231001preview.ResourceType, + Name: NameFor(id), + }, + }, nil)) +} diff --git a/pkg/ucp/trackedresource/name_test.go b/pkg/ucp/trackedresource/name_test.go new file mode 100644 index 0000000000..f23263253f --- /dev/null +++ b/pkg/ucp/trackedresource/name_test.go @@ -0,0 +1,38 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "testing" + + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/stretchr/testify/require" +) + +var ( + testID = resources.MustParse("/planes/radius/local/resourceGroups/test-group/providers/Applications.Core/applications/test-app") +) + +func Test_NameFor(t *testing.T) { + name := NameFor(testID) + require.Equal(t, "test-app-303153687ee5adbcf353bc6c2caa4373f31e04c6", name) +} + +func Test_IDFor(t *testing.T) { + id := IDFor(testID) + require.Equal(t, resources.MustParse("/planes/radius/local/resourceGroups/test-group/providers/System.Resources/resources/test-app-303153687ee5adbcf353bc6c2caa4373f31e04c6"), id) +} diff --git a/pkg/ucp/trackedresource/update.go b/pkg/ucp/trackedresource/update.go new file mode 100644 index 0000000000..49f9de3c8c --- /dev/null +++ b/pkg/ucp/trackedresource/update.go @@ -0,0 +1,294 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "mime" + "net/http" + "net/url" + "strings" + "time" + + "github.com/go-logr/logr" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/ucplog" +) + +const ( + retryCount = 10 + retryDelay = time.Second * 3 + requestTimeout = time.Second * 10 +) + +// NewUpdater creates a new Updater. +func NewUpdater(storeClient store.StorageClient, httpClient *http.Client) *Updater { + return &Updater{ + Store: storeClient, + Client: httpClient, + AttemptCount: retryCount, + RetryDelay: retryDelay, + RequestTimeout: requestTimeout, + } +} + +// Updater is a utility struct that can perform updates on tracked resources. +type Updater struct { + // Store is the storage client used to access the database. + Store store.StorageClient + + // Client is the HTTP client used to make requests to the downstream API. + Client *http.Client + + // AttemptCount is the number of times to attempt a request and database update. + AttemptCount int + + // RetryDelay is the delay between retries. + RetryDelay time.Duration + + // RequestTimeout is the timeout used for requests to the downstream API. + RequestTimeout time.Duration +} + +// InProgressErr signifies that the resource is currently in a non-terminal state. +type InProgressErr struct { +} + +// Error returns the error message. +func (e *InProgressErr) Error() string { + return "resource is still being provisioned" +} + +// Is returns true if the other error is an InProgressErr. +func (e *InProgressErr) Is(other error) bool { + _, ok := other.(*InProgressErr) + return ok +} + +// trackedResourceState holds the state of a tracked resource as reported by the downstream API. +// This only defines the fields we use, so many fields returned by the API are omitted. +type trackedResourceState struct { + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + Properties trackedResourceStateProperties `json:"properties,omitempty"` +} + +type trackedResourceStateProperties struct { + ProvisioningState *v1.ProvisioningState `json:"provisioningState,omitempty"` +} + +// Update updates a tracked resource. +// +// This function return attempt to update the state using optimistic concurrency and will retry on the following +// conditions: +// +// - Downstream failure or timeout +// - Database failure +// - Optimistic concurrency failure +// - Resource is still being provisioned (provisioning state is non-terminal) +func (u *Updater) Update(ctx context.Context, downstream string, id resources.ID, apiVersion string) error { + logger := ucplog.FromContextOrDiscard(ctx) + destination, err := url.Parse(downstream) + if err != nil { + return err + } + + destination = destination.JoinPath(id.String()) + + query := destination.Query() + query.Set("api-version", apiVersion) + destination.RawQuery = query.Encode() + + // Tracking ID is the ID of the TrackedResourceEntry that will store the data. + // + // Example: + // id: /planes/radius/local/resourceGroups/test-group/providers/Applications.Core/applications/test-app + // trackingID: /planes/radius/local/resourceGroups/test-group/providers/System.Resources/trackingResourceEntries/test-app-ec291e26078b7ea8a74abfac82530005a0ecbf15 + trackingID := IDFor(id) + + logger = logger.WithValues("id", id, "trackingID", trackingID, "destination", destination.String()) + logger.V(ucplog.LevelDebug).Info("updating tracked resource") + for attempt := 1; attempt <= u.AttemptCount; attempt++ { + logger.WithValues("attempt", attempt) + ctx := logr.NewContext(ctx, logger) + logger.V(ucplog.LevelDebug).Info("beginning attempt") + + err := u.run(ctx, id, trackingID, destination, apiVersion) + if errors.Is(err, &InProgressErr{}) && attempt == u.AttemptCount { + // Preserve the InprogressErr for the last attempt. + return err + } else if err != nil { + logger.Error(err, "attempt failed", "delay", u.RetryDelay) + time.Sleep(u.RetryDelay) + continue + } + + logger.V(ucplog.LevelDebug).Info("tracked resource processing completed successfully") + return nil + } + + return fmt.Errorf("failed to update tracked resource after %d attempts", u.AttemptCount) +} + +func (u *Updater) run(ctx context.Context, id resources.ID, trackingID resources.ID, destination *url.URL, apiVersion string) error { + logger := ucplog.FromContextOrDiscard(ctx) + obj, err := u.Store.Get(ctx, trackingID.String()) + if errors.Is(err, &store.ErrNotFound{}) { + // This is fine. It might be a new resource. + } else if err != nil { + return err + } + + etag := "" + entry := datamodel.GenericResourceFromID(id, trackingID) + entry.Properties.APIVersion = apiVersion + if obj != nil { + etag = obj.ETag + err := obj.As(&entry) + if err != nil { + return err + } + } + + data, err := u.fetch(ctx, destination) + if err != nil { + return err + } + + if data == nil { + // Resource was not found. We can delete the tracked resource entry. + logger.V(ucplog.LevelDebug).Info("deleting tracked resource entry") + err = u.Store.Delete(ctx, trackingID.String(), store.WithETag(etag)) + if errors.Is(err, &store.ErrNotFound{}) { + return nil + } else if err != nil { + return err + } + + return nil + } else if data.Properties.ProvisioningState != nil && !data.Properties.ProvisioningState.IsTerminal() { + // Resource is still being provisioned. We should not update anything yet. + logger.V(ucplog.LevelDebug).Info("resource is still being provisioned") + return &InProgressErr{} + } + + // If we get here we're ready to save the changes for a create/update. + // + // Mark the resource as provisioned. This will will "reset" the lock on the resource. + entry.AsyncProvisioningState = v1.ProvisioningStateSucceeded + if data.Properties.ProvisioningState != nil { + entry.AsyncProvisioningState = *data.Properties.ProvisioningState + } + + obj = &store.Object{ + Metadata: store.Metadata{ + ID: trackingID.String(), + }, + Data: entry, + } + logger.V(ucplog.LevelDebug).Info("updating tracked resource entry") + err = u.Store.Save(ctx, obj, store.WithETag(etag)) + if errors.Is(err, &store.ErrConcurrency{}) { + logger.V(ucplog.LevelDebug).Info("tracked resource was updated concurrently") + return &InProgressErr{} + } else if err != nil { + return err + } + + return nil +} + +func (u *Updater) fetch(ctx context.Context, destination *url.URL) (*trackedResourceState, error) { + logger := ucplog.FromContextOrDiscard(ctx) + + ctx, cancel := context.WithTimeout(ctx, requestTimeout) + defer cancel() + + logger.V(ucplog.LevelDebug).Info("fetching resource") + request, err := http.NewRequestWithContext(ctx, http.MethodGet, destination.String(), nil) + if err != nil { + return nil, err + } + response, err := u.Client.Do(request) + if err != nil { + return nil, err + } + logger.V(ucplog.LevelDebug).Info("resource fetched", "status", response.StatusCode) + + defer response.Body.Close() + if !u.isJSONResponse(response) { + return nil, fmt.Errorf("response is not JSON. Content-Type: %q", response.Header.Get("Content-Type")) + } + + if response.StatusCode == 404 { + return nil, nil + } + + if response.StatusCode >= 400 { + return nil, u.reportRequestFailure(response) + } + + data := &trackedResourceState{} + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(data) + if err != nil { + return nil, err + } + + return data, nil +} + +func (u *Updater) isJSONResponse(response *http.Response) bool { + contentType, _, err := mime.ParseMediaType(response.Header.Get("Content-Type")) + if err != nil { + return false + } + + if contentType == "application/json" { + return true + } else if contentType == "text/json" { + return true + } else if strings.HasSuffix(contentType, "+json") { + return true + } + + return false +} + +func (u *Updater) reportRequestFailure(response *http.Response) error { + data := v1.ErrorResponse{} + + decoder := json.NewDecoder(response.Body) + err := decoder.Decode(&data) + if err != nil { + return err + } + + body, err := json.MarshalIndent(data, "", " ") + if err != nil { + return err + } + + return fmt.Errorf("request failed with status code %s:\n%s", response.Status, body) +} diff --git a/pkg/ucp/trackedresource/update_test.go b/pkg/ucp/trackedresource/update_test.go new file mode 100644 index 0000000000..bb252688a5 --- /dev/null +++ b/pkg/ucp/trackedresource/update_test.go @@ -0,0 +1,442 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trackedresource + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/golang/mock/gomock" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/datamodel" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/require" +) + +var ( + testURL = func() *url.URL { + u, err := url.Parse("http://example.com/some-url") + if err != nil { + panic(err) + } + return u + }() +) + +func setupUpdater(t *testing.T) (*Updater, *store.MockStorageClient, *mockRoundTripper) { + ctrl := gomock.NewController(t) + + storeClient := store.NewMockStorageClient(ctrl) + roundTripper := &mockRoundTripper{} + updater := NewUpdater(storeClient, &http.Client{Transport: roundTripper}) + + // Optimize these values for testability. We don't want to wait for retries or timeouts unless + // the test is specifically testing that behavior. + updater.RetryDelay = time.Millisecond * 100 + updater.AttemptCount = 1 + updater.RequestTimeout = time.Microsecond * 100 + + return updater, storeClient, roundTripper +} + +func Test_Update(t *testing.T) { + t.Run("successful update", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.NoError(t, err) + }) + + t.Run("retry then success", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + updater.AttemptCount = 2 + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + // Fail once, then succeed. + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, errors.New("this will be retried")). + Times(1) + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.NoError(t, err) + }) + + t.Run("resource still provisioning", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{ + "provisioningState": v1.ProvisioningStateAccepted, + }, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (non-terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.Error(t, err) + require.ErrorIs(t, err, &InProgressErr{}) + }) + + t.Run("tracked resource updated concurrently", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + apiVersion := "1234" + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Fail the "Save" operation due to a concurrent update. + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&store.ErrConcurrency{}). + Times(1) + + // Mock a successful (non-terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.Error(t, err) + require.ErrorIs(t, err, &InProgressErr{}) + }) + + t.Run("retries exhausted", func(t *testing.T) { + updater, storeClient, _ := setupUpdater(t) + updater.AttemptCount = 3 + + apiVersion := "1234" + + // Fail enough times to exhaust our retries. + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, errors.New("this will be retried")). + Times(3) + + err := updater.Update(testcontext.New(t), testURL.String(), testID, apiVersion) + require.Error(t, err) + require.Equal(t, "failed to update tracked resource after 3 attempts", err.Error()) + }) +} + +func Test_run(t *testing.T) { + apiVersion := "1234" + + t.Run("successful update (new resource)", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.NoError(t, err) + }) + + t.Run("successful update (existing resource)", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{}, + } + + etag := "some-etag" + dm := datamodel.GenericResourceFromID(testID, IDFor(testID)) + dm.Properties.APIVersion = apiVersion + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(&store.Object{Metadata: store.Metadata{ETag: etag}, Data: dm}, nil). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + storeClient.EXPECT(). + Save(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + require.Equal(t, IDFor(testID).String(), obj.ID) + + dm := obj.Data.(*datamodel.GenericResource) + require.Equal(t, IDFor(testID).String(), dm.ID) + require.Equal(t, testID.String(), dm.Properties.ID) + require.Equal(t, apiVersion, dm.Properties.APIVersion) + return nil + }). + Times(1) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.NoError(t, err) + }) + + t.Run("successful delete", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + etag := "some-etag" + dm := datamodel.GenericResourceFromID(testID, IDFor(testID)) + dm.Properties.APIVersion = apiVersion + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(&store.Object{Metadata: store.Metadata{ETag: etag}, Data: dm}, nil). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusNotFound, &v1.ErrorResponse{Error: v1.ErrorDetails{Code: v1.CodeNotFound}}) + + storeClient.EXPECT(). + Delete(gomock.Any(), IDFor(testID).String(), gomock.Any()). + Return(nil). + Times(1) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.NoError(t, err) + }) + + t.Run("resource still provisioning", func(t *testing.T) { + updater, storeClient, roundTripper := setupUpdater(t) + + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{ + "provisioningState": v1.ProvisioningStateAccepted, + }, + } + + storeClient.EXPECT(). + Get(gomock.Any(), IDFor(testID).String()). + Return(nil, &store.ErrNotFound{}). + Times(1) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + err := updater.run(testcontext.New(t), testID, IDFor(testID), testURL, apiVersion) + require.Error(t, err) + require.ErrorIs(t, err, &InProgressErr{}) + }) +} + +func Test_fetch(t *testing.T) { + resource := map[string]any{ + "id": testID.String(), + "name": testID.Name(), + "type": testID.Type(), + "properties": map[string]any{ + "provisioningState": v1.ProvisioningStateAccepted, + }, + } + + errorResponse := &v1.ErrorResponse{ + Error: v1.ErrorDetails{ + Code: "SomeErrorCode", + Message: "This is a test.", + }, + } + b, err := json.MarshalIndent(errorResponse, "", " ") + require.NoError(t, err) + errorResponseText := string(b) + + t.Run("successful fetch (200)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + // Mock a successful (terminal) response from the downstream API. + roundTripper.RespondWithJSON(t, http.StatusOK, resource) + + expected := &trackedResourceState{ + ID: testID.String(), + Name: testID.Name(), + Type: testID.Type(), + Properties: trackedResourceStateProperties{ + ProvisioningState: to.Ptr(v1.ProvisioningStateAccepted), + }, + } + + state, err := updater.fetch(testcontext.New(t), testURL) + require.NoError(t, err) + require.Equal(t, expected, state) + }) + + t.Run("successful fetch (404)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + // We consider 404 a success case. + roundTripper.RespondWithJSON(t, http.StatusNotFound, errorResponse) + + state, err := updater.fetch(testcontext.New(t), testURL) + require.NoError(t, err) + require.Nil(t, state) + }) + + t.Run("failure (non-JSON)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + w := httptest.NewRecorder() + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("LOL here's some not-JSON")) + roundTripper.Response = w.Result() + + state, err := updater.fetch(testcontext.New(t), testURL) + require.Error(t, err) + require.Equal(t, "response is not JSON. Content-Type: \"text/plain\"", err.Error()) + require.Nil(t, state) + }) + + t.Run("failure (non-JSON)", func(t *testing.T) { + updater, _, roundTripper := setupUpdater(t) + + roundTripper.RespondWithJSON(t, http.StatusBadRequest, errorResponse) + + state, err := updater.fetch(testcontext.New(t), testURL) + require.Error(t, err) + require.Equal(t, "request failed with status code 400 Bad Request:\n"+errorResponseText, err.Error()) + require.Nil(t, state) + }) +} + +type mockRoundTripper struct { + Response *http.Response + Err error +} + +func (rt *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return rt.Response, rt.Err +} + +func (rt *mockRoundTripper) RespondWithJSON(t *testing.T, statusCode int, body any) { + t.Helper() + + b, err := json.Marshal(body) + require.NoError(t, err) + + w := httptest.NewRecorder() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + _, _ = w.Write(b) + + rt.Response = w.Result() +} diff --git a/test/functional/shared/test.go b/test/functional/shared/test.go index 87b795a6e5..5bec9096f6 100644 --- a/test/functional/shared/test.go +++ b/test/functional/shared/test.go @@ -82,16 +82,17 @@ func NewRPTestOptions(t *testing.T) RPTestOptions { return RPTestOptions{ TestOptions: test.NewTestOptions(t), + Workspace: workspace, CustomAction: customAction, ManagementClient: client, AWSClient: awsClient, Connection: connection, - Workspace: workspace, } } type RPTestOptions struct { test.TestOptions + CustomAction *clientv2.CustomActionClient ManagementClient clients.ApplicationsManagementClient AWSClient aws.AWSCloudControlClient diff --git a/test/functional/ucp/tracked_resource_test.go b/test/functional/ucp/tracked_resource_test.go new file mode 100644 index 0000000000..5ed7375afe --- /dev/null +++ b/test/functional/ucp/tracked_resource_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ucp + +import ( + "encoding/json" + "fmt" + "sort" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/google/uuid" + v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" + aztoken "github.com/radius-project/radius/pkg/azure/tokencredentials" + corerp "github.com/radius-project/radius/pkg/corerp/api/v20231001preview" + "github.com/radius-project/radius/pkg/sdk" + ucp "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/test/functional/shared" + "github.com/radius-project/radius/test/testcontext" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_TrackedResources(t *testing.T) { + log := func(message string, obj any) { + j, err := json.MarshalIndent(&obj, "", " ") + require.NoError(t, err) + t.Logf("%s:\n\n%+v", message, string(j)) + } + + ctx := testcontext.New(t) + options := shared.NewRPTestOptions(t) + resourceGroupID := resources.MustParse("/planes/radius/local/resourcegroups/test-" + uuid.New().String()) + + rgc, err := ucp.NewResourceGroupsClient(&aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + rc, err := ucp.NewResourcesClient(&aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + ac, err := corerp.NewApplicationsClient(resourceGroupID.String(), &aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + exc, err := corerp.NewExtendersClient(resourceGroupID.String(), &aztoken.AnonymousCredential{}, sdk.NewClientOptions(options.Connection)) + require.NoError(t, err) + + rg, err := rgc.CreateOrUpdate(ctx, "radius", "local", resourceGroupID.Name(), ucp.ResourceGroupResource{Location: to.Ptr(v1.LocationGlobal)}, nil) + require.NoError(t, err) + log("Created resource group", rg) + + t.Run("Resource group starts empty", func(t *testing.T) { + resources := []*ucp.GenericResource{} + pager := rc.NewListPager("radius", "local", resourceGroupID.Name(), nil) + for pager.More() { + page, err := pager.NextPage(ctx) + require.NoError(t, err) + log("Got resource page", page) + resources = append(resources, page.Value...) + } + require.Empty(t, resources) + }) + + t.Run("Create resources", func(t *testing.T) { + for i := 0; i < 3; i++ { + a, err := ac.CreateOrUpdate(ctx, fmt.Sprintf("app-%d", i), corerp.ApplicationResource{ + Location: to.Ptr(v1.LocationGlobal), + Properties: &corerp.ApplicationProperties{ + Environment: to.Ptr(options.Workspace.Environment), + }, + }, nil) + require.NoError(t, err) + log("Got application", a) + + // We're using extender here because its operations are asynchronous. + poller, err := exc.BeginCreateOrUpdate(ctx, fmt.Sprintf("ex-%d", i), corerp.ExtenderResource{ + Location: to.Ptr(v1.LocationGlobal), + Properties: &corerp.ExtenderProperties{ + Environment: to.Ptr(options.Workspace.Environment), + Application: to.Ptr(*a.ID), + ResourceProvisioning: to.Ptr(corerp.ResourceProvisioningManual), + }, + }, nil) + require.NoError(t, err) + + ex, err := poller.PollUntilDone(ctx, nil) + require.NoError(t, err) + log("Got extender", ex) + } + }) + + t.Run("Resource group contains resources", func(t *testing.T) { + expected := []*ucp.GenericResource{} + + for i := 0; i < 3; i++ { + expected = append(expected, &ucp.GenericResource{ + ID: to.Ptr(resourceGroupID.Append(resources.TypeSegment{Type: "Applications.Core/applications", Name: fmt.Sprintf("app-%d", i)}).String()), + Name: to.Ptr(fmt.Sprintf("app-%d", i)), + Type: to.Ptr("Applications.Core/applications"), + }) + expected = append(expected, &ucp.GenericResource{ + ID: to.Ptr(resourceGroupID.Append(resources.TypeSegment{Type: "Applications.Core/extenders", Name: fmt.Sprintf("ex-%d", i)}).String()), + Name: to.Ptr(fmt.Sprintf("ex-%d", i)), + Type: to.Ptr("Applications.Core/extenders"), + }) + } + + sort.Slice(expected, func(i, j int) bool { + return *expected[i].ID < *expected[j].ID + }) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + resources := []*ucp.GenericResource{} + pager := rc.NewListPager("radius", "local", resourceGroupID.Name(), nil) + for pager.More() { + page, err := pager.NextPage(ctx) + require.NoError(t, err) + log("Got resource page", page) + resources = append(resources, page.Value...) + } + + sort.Slice(resources, func(i, j int) bool { + return *resources[i].ID < *resources[j].ID + }) + assert.Equal(t, expected, resources) + }, time.Second*30, time.Millisecond*500) + }) + + t.Run("Delete resources", func(t *testing.T) { + for i := 0; i < 3; i++ { + // Delete in reverse order to make sure the extender is deleted before the application it + // belongs to. + poller, err := exc.BeginDelete(ctx, fmt.Sprintf("ex-%d", i), nil) + require.NoError(t, err) + + _, err = poller.PollUntilDone(ctx, nil) + require.NoError(t, err) + + _, err = ac.Delete(ctx, fmt.Sprintf("app-%d", i), nil) + require.NoError(t, err) + } + }) + + t.Run("Resource group is empty again", func(t *testing.T) { + require.EventuallyWithT(t, func(t *assert.CollectT) { + resources := []*ucp.GenericResource{} + pager := rc.NewListPager("radius", "local", resourceGroupID.Name(), nil) + for pager.More() { + page, err := pager.NextPage(ctx) + require.NoError(t, err) + log("Got resource page", page) + resources = append(resources, page.Value...) + } + assert.Empty(t, resources) + }, time.Second*30, time.Millisecond*500) + }) + + t.Run("Delete resource group", func(t *testing.T) { + _, err := rgc.Delete(ctx, "radius", "local", resourceGroupID.Name(), nil) + require.NoError(t, err) + }) +}