From 10c59a6b90310b8c58babf5beb108b59f4d74e4d Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Wed, 13 Dec 2023 15:54:00 +0400 Subject: [PATCH] fix: leave discovery service later in the reset sequence Fixes #8057 I went back and forth on the way to fix it exactly, and ended up with a pretty simple version of a fix. The problem was that discovery service was removing the member at the initial phase of reset, which actually still requires KubeSpan to be up: * leaving `etcd` (need to talk to other members) * stopping pods (might need to talk to Kubernetes API with some CNIs) Now leaving discovery service happens way later, when network interactions are no longer required. Signed-off-by: Andrey Smirnov --- .drone.jsonnet | 4 +- .../controllers/cluster/discovery_service.go | 12 ++-- .../cluster/discovery_service_test.go | 5 +- .../runtime/v1alpha1/v1alpha1_sequencer.go | 3 + .../v1alpha1/v1alpha1_sequencer_tasks.go | 7 +++ .../pkg/runtime/v1alpha2/v1alpha2_state.go | 1 + internal/integration/api/discovery.go | 52 ++++------------ internal/integration/base/api.go | 7 ++- .../resources/runtime/deep_copy.generated.go | 8 ++- .../resources/runtime/machine_reset_signal.go | 59 +++++++++++++++++++ pkg/machinery/resources/runtime/runtime.go | 2 +- .../resources/runtime/runtime_test.go | 1 + 12 files changed, 106 insertions(+), 55 deletions(-) create mode 100644 pkg/machinery/resources/runtime/machine_reset_signal.go diff --git a/.drone.jsonnet b/.drone.jsonnet index 2819f8c807..e0edf078ab 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -547,7 +547,6 @@ local integration_kubespan = Step('e2e-kubespan', target='e2e-qemu', privileged= WITH_CLUSTER_DISCOVERY: 'true', WITH_KUBESPAN: 'true', IMAGE_REGISTRY: local_registry, - WITH_CONFIG_PATCH: '[{"op": "replace", "path": "/cluster/discovery/registries/kubernetes/disabled", "value": false}]', // use Kubernetes discovery backend }); local integration_default_hostname = Step('e2e-default-hostname', target='e2e-qemu', privileged=true, depends_on=[integration_kubespan], environment={ // regression test: make sure Talos works in maintenance mode when no hostname is set @@ -557,9 +556,10 @@ local integration_default_hostname = Step('e2e-default-hostname', target='e2e-qe DISABLE_DHCP_HOSTNAME: 'true', }); -local integration_qemu_encrypted_vip = Step('e2e-encrypted-vip', target='e2e-qemu', privileged=true, depends_on=[load_artifacts], environment={ +local integration_qemu_encrypted_vip = Step('e2e-encrypted-kubespan-vip', target='e2e-qemu', privileged=true, depends_on=[load_artifacts], environment={ WITH_DISK_ENCRYPTION: 'true', WITH_VIRTUAL_IP: 'true', + WITH_KUBESPAN: 'true', IMAGE_REGISTRY: local_registry, }); diff --git a/internal/app/machined/pkg/controllers/cluster/discovery_service.go b/internal/app/machined/pkg/controllers/cluster/discovery_service.go index 6963af5165..4870da0a8b 100644 --- a/internal/app/machined/pkg/controllers/cluster/discovery_service.go +++ b/internal/app/machined/pkg/controllers/cluster/discovery_service.go @@ -68,8 +68,8 @@ func (ctrl *DiscoveryServiceController) Inputs() []controller.Input { }, { Namespace: runtime.NamespaceName, - Type: runtime.MachineStatusType, - ID: optional.Some(runtime.MachineStatusID), + Type: runtime.MachineResetSignalType, + ID: optional.Some(runtime.MachineResetSignalID), Kind: controller.InputWeak, }, } @@ -218,9 +218,9 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru return fmt.Errorf("error listing endpoints: %w", err) } - machineStatus, err := safe.ReaderGet[*runtime.MachineStatus](ctx, r, resource.NewMetadata(runtime.NamespaceName, runtime.MachineStatusType, runtime.MachineStatusID, resource.VersionUndefined)) + machineResetSginal, err := safe.ReaderGetByID[*runtime.MachineResetSignal](ctx, r, runtime.MachineResetSignalID) if err != nil && !state.IsNotFoundError(err) { - return fmt.Errorf("error getting machine status: %w", err) + return fmt.Errorf("error getting machine reset signal: %w", err) } if client == nil { @@ -257,9 +257,9 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru // delete/update local affiliate // - // if the node enters resetting stage, cleanup the local affiliate + // if the node enters final resetting stage, cleanup the local affiliate // otherwise, update local affiliate data - if machineStatus != nil && machineStatus.TypedSpec().Stage == runtime.MachineStageResetting { + if machineResetSginal != nil { client.DeleteLocalAffiliate() } else { localData := pbAffiliate(affiliateSpec) diff --git a/internal/app/machined/pkg/controllers/cluster/discovery_service_test.go b/internal/app/machined/pkg/controllers/cluster/discovery_service_test.go index 37f94d2b66..f1307b4d3b 100644 --- a/internal/app/machined/pkg/controllers/cluster/discovery_service_test.go +++ b/internal/app/machined/pkg/controllers/cluster/discovery_service_test.go @@ -242,9 +242,8 @@ func (suite *DiscoveryServiceSuite) TestReconcile() { ) // pretend that machine is being reset - machineStatus := runtime.NewMachineStatus() - machineStatus.TypedSpec().Stage = runtime.MachineStageResetting - suite.Require().NoError(suite.state.Create(suite.ctx, machineStatus)) + machineResetSignal := runtime.NewMachineResetSignal() + suite.Require().NoError(suite.state.Create(suite.ctx, machineResetSignal)) // client should see the affiliate being deleted suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry( diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go index 77b2c88044..712f18a9e1 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go @@ -342,6 +342,9 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph in.GetGraceful() && (r.Config().Machine().Type() != machine.TypeWorker), "leave", LeaveEtcd, + ).Append( + "preReset", + SendResetSignal, ).AppendList( phaseListErrorHandler(logError, stopAllPhaselist(r, withKexec)...), ).Append( diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index d9f4b2ca49..53f2cdca53 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -2303,6 +2303,13 @@ func StoreShutdownEmergency(runtime.Sequence, any) (runtime.TaskExecutionFunc, s }, "storeShutdownEmergency" } +// SendResetSignal func represents the task to send the final reset signal. +func SendResetSignal(runtime.Sequence, any) (runtime.TaskExecutionFunc, string) { + return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) { + return r.State().V1Alpha2().Resources().Create(ctx, resourceruntime.NewMachineResetSignal()) + }, "sendResetSignal" +} + func pauseOnFailure(callback func(runtime.Sequence, any) (runtime.TaskExecutionFunc, string), timeout time.Duration, ) func(seq runtime.Sequence, data any) (runtime.TaskExecutionFunc, string) { diff --git a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go index 3258f4c2f2..4aca522a43 100644 --- a/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go +++ b/internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go @@ -179,6 +179,7 @@ func NewState() (*State, error) { &runtime.KmsgLogConfig{}, &runtime.MaintenanceServiceConfig{}, &runtime.MaintenanceServiceRequest{}, + &runtime.MachineResetSignal{}, &runtime.MachineStatus{}, &runtime.MetaKey{}, &runtime.MetaLoaded{}, diff --git a/internal/integration/api/discovery.go b/internal/integration/api/discovery.go index ccdbea8b42..9e080ede45 100644 --- a/internal/integration/api/discovery.go +++ b/internal/integration/api/discovery.go @@ -14,10 +14,12 @@ import ( "time" "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/resource/rtestutils" "github.com/cosi-project/runtime/pkg/safe" "github.com/siderolabs/gen/maps" "github.com/siderolabs/gen/value" "github.com/siderolabs/gen/xslices" + "github.com/stretchr/testify/assert" "github.com/siderolabs/talos/internal/integration/base" "github.com/siderolabs/talos/pkg/machinery/client" @@ -262,18 +264,16 @@ func (suite *DiscoverySuite) TestKubeSpanPeers() { for _, node := range nodes { nodeCtx := client.WithNode(suite.ctx, node) - peerSpecs := suite.getKubeSpanPeerSpecs(nodeCtx) - suite.Assert().Len(peerSpecs, len(nodes)-1) + rtestutils.AssertLength[*kubespan.PeerSpec](nodeCtx, suite.T(), suite.Client.COSI, len(nodes)-1) + rtestutils.AssertLength[*kubespan.PeerStatus](nodeCtx, suite.T(), suite.Client.COSI, len(nodes)-1) - peerStatuses := suite.getKubeSpanPeerStatuses(nodeCtx) - suite.Assert().Len(peerStatuses, len(nodes)-1) - - for _, status := range peerStatuses { - suite.Assert().Equal(kubespan.PeerStateUp, status.TypedSpec().State) - suite.Assert().False(value.IsZero(status.TypedSpec().Endpoint)) - suite.Assert().Greater(status.TypedSpec().ReceiveBytes, int64(0)) - suite.Assert().Greater(status.TypedSpec().TransmitBytes, int64(0)) - } + rtestutils.AssertAll[*kubespan.PeerStatus](nodeCtx, suite.T(), suite.Client.COSI, + func(status *kubespan.PeerStatus, asrt *assert.Assertions) { + asrt.Equal(kubespan.PeerStateUp, status.TypedSpec().State) + asrt.False(value.IsZero(status.TypedSpec().Endpoint)) + asrt.Greater(status.TypedSpec().ReceiveBytes, int64(0)) + asrt.Greater(status.TypedSpec().TransmitBytes, int64(0)) + }) } } @@ -310,36 +310,6 @@ func (suite *DiscoverySuite) getAffiliates(nodeCtx context.Context, namespace re return result } -func (suite *DiscoverySuite) getKubeSpanPeerSpecs(nodeCtx context.Context) []*kubespan.PeerSpec { - var result []*kubespan.PeerSpec - - items, err := safe.StateListAll[*kubespan.PeerSpec](nodeCtx, suite.Client.COSI) - suite.Require().NoError(err) - - it := items.Iterator() - - for it.Next() { - result = append(result, it.Value()) - } - - return result -} - -func (suite *DiscoverySuite) getKubeSpanPeerStatuses(nodeCtx context.Context) []*kubespan.PeerStatus { - var result []*kubespan.PeerStatus - - items, err := safe.StateListAll[*kubespan.PeerStatus](nodeCtx, suite.Client.COSI) - suite.Require().NoError(err) - - it := items.Iterator() - - for it.Next() { - result = append(result, it.Value()) - } - - return result -} - func init() { allSuites = append(allSuites, new(DiscoverySuite)) } diff --git a/internal/integration/base/api.go b/internal/integration/base/api.go index 3197b835c4..0eb2911119 100644 --- a/internal/integration/base/api.go +++ b/internal/integration/base/api.go @@ -25,6 +25,7 @@ import ( "github.com/siderolabs/go-retry/retry" "github.com/stretchr/testify/suite" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers" "github.com/siderolabs/talos/internal/app/machined/pkg/runtime" @@ -399,7 +400,11 @@ func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...s continue } - if strings.Contains(err.Error(), "connection refused") { + if client.StatusCode(err) == codes.Unavailable { + return retry.ExpectedError(err) + } + + if strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "connection reset by peer") { return retry.ExpectedError(err) } diff --git a/pkg/machinery/resources/runtime/deep_copy.generated.go b/pkg/machinery/resources/runtime/deep_copy.generated.go index 66724f67ca..709abb1d65 100644 --- a/pkg/machinery/resources/runtime/deep_copy.generated.go +++ b/pkg/machinery/resources/runtime/deep_copy.generated.go @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Code generated by "deep-copy -type DevicesStatusSpec -type EventSinkConfigSpec -type KernelModuleSpecSpec -type KernelParamSpecSpec -type KernelParamStatusSpec -type KmsgLogConfigSpec -type MaintenanceServiceConfigSpec -type MaintenanceServiceRequestSpec -type MachineStatusSpec -type MetaKeySpec -type MountStatusSpec -type PlatformMetadataSpec -type SecurityStateSpec -type MetaLoadedSpec -type UniqueMachineTokenSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go ."; DO NOT EDIT. +// Code generated by "deep-copy -type DevicesStatusSpec -type EventSinkConfigSpec -type KernelModuleSpecSpec -type KernelParamSpecSpec -type KernelParamStatusSpec -type KmsgLogConfigSpec -type MaintenanceServiceConfigSpec -type MaintenanceServiceRequestSpec -type MachineResetSignalSpec -type MachineStatusSpec -type MetaKeySpec -type MountStatusSpec -type PlatformMetadataSpec -type SecurityStateSpec -type MetaLoadedSpec -type UniqueMachineTokenSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go ."; DO NOT EDIT. package runtime @@ -81,6 +81,12 @@ func (o MaintenanceServiceRequestSpec) DeepCopy() MaintenanceServiceRequestSpec return cp } +// DeepCopy generates a deep copy of MachineResetSignalSpec. +func (o MachineResetSignalSpec) DeepCopy() MachineResetSignalSpec { + var cp MachineResetSignalSpec = o + return cp +} + // DeepCopy generates a deep copy of MachineStatusSpec. func (o MachineStatusSpec) DeepCopy() MachineStatusSpec { var cp MachineStatusSpec = o diff --git a/pkg/machinery/resources/runtime/machine_reset_signal.go b/pkg/machinery/resources/runtime/machine_reset_signal.go new file mode 100644 index 0000000000..58436a9c52 --- /dev/null +++ b/pkg/machinery/resources/runtime/machine_reset_signal.go @@ -0,0 +1,59 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime + +import ( + "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/resource/meta" + "github.com/cosi-project/runtime/pkg/resource/protobuf" + "github.com/cosi-project/runtime/pkg/resource/typed" + + "github.com/siderolabs/talos/pkg/machinery/proto" +) + +// MachineResetSignalType is type of MachineResetSignal resource. +const MachineResetSignalType = resource.Type("MachineResetSignals.runtime.talos.dev") + +// MachineResetSignalID is singleton MachineResetSignal resource ID. +const MachineResetSignalID = resource.ID("machine") + +// MachineResetSignal resource is created to signal that the machine is going to be reset soon. +// +// This resource is created when all remaining actions are local to the node, and network communication is not required. +type MachineResetSignal = typed.Resource[MachineResetSignalSpec, MachineResetSignalExtension] + +// MachineResetSignalSpec describes the spec of MachineResetSignal. +// +//gotagsrewrite:gen +type MachineResetSignalSpec struct{} + +// NewMachineResetSignal initializes a MachineResetSignal resource. +func NewMachineResetSignal() *MachineResetSignal { + return typed.NewResource[MachineResetSignalSpec, MachineResetSignalExtension]( + resource.NewMetadata(NamespaceName, MachineResetSignalType, MachineResetSignalID, resource.VersionUndefined), + MachineResetSignalSpec{}, + ) +} + +// MachineResetSignalExtension is auxiliary resource data for MachineResetSignal. +type MachineResetSignalExtension struct{} + +// ResourceDefinition implements meta.ResourceDefinitionProvider interface. +func (MachineResetSignalExtension) ResourceDefinition() meta.ResourceDefinitionSpec { + return meta.ResourceDefinitionSpec{ + Type: MachineResetSignalType, + Aliases: []resource.Type{}, + DefaultNamespace: NamespaceName, + } +} + +func init() { + proto.RegisterDefaultTypes() + + err := protobuf.RegisterDynamic[MachineResetSignalSpec](MachineResetSignalType, &MachineResetSignal{}) + if err != nil { + panic(err) + } +} diff --git a/pkg/machinery/resources/runtime/runtime.go b/pkg/machinery/resources/runtime/runtime.go index b92d5e90cb..49f92ba026 100644 --- a/pkg/machinery/resources/runtime/runtime.go +++ b/pkg/machinery/resources/runtime/runtime.go @@ -4,4 +4,4 @@ package runtime -//go:generate deep-copy -type DevicesStatusSpec -type EventSinkConfigSpec -type KernelModuleSpecSpec -type KernelParamSpecSpec -type KernelParamStatusSpec -type KmsgLogConfigSpec -type MaintenanceServiceConfigSpec -type MaintenanceServiceRequestSpec -type MachineStatusSpec -type MetaKeySpec -type MountStatusSpec -type PlatformMetadataSpec -type SecurityStateSpec -type MetaLoadedSpec -type UniqueMachineTokenSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go . +//go:generate deep-copy -type DevicesStatusSpec -type EventSinkConfigSpec -type KernelModuleSpecSpec -type KernelParamSpecSpec -type KernelParamStatusSpec -type KmsgLogConfigSpec -type MaintenanceServiceConfigSpec -type MaintenanceServiceRequestSpec -type MachineResetSignalSpec -type MachineStatusSpec -type MetaKeySpec -type MountStatusSpec -type PlatformMetadataSpec -type SecurityStateSpec -type MetaLoadedSpec -type UniqueMachineTokenSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go . diff --git a/pkg/machinery/resources/runtime/runtime_test.go b/pkg/machinery/resources/runtime/runtime_test.go index 142205775f..2f007f625e 100644 --- a/pkg/machinery/resources/runtime/runtime_test.go +++ b/pkg/machinery/resources/runtime/runtime_test.go @@ -33,6 +33,7 @@ func TestRegisterResource(t *testing.T) { &runtime.KernelParamStatus{}, &runtime.KmsgLogConfig{}, &runtime.MachineStatus{}, + &runtime.MachineResetSignal{}, &runtime.MaintenanceServiceConfig{}, &runtime.MaintenanceServiceRequest{}, &runtime.MetaKey{},