Skip to content

Commit

Permalink
refactor: implement volume mount controller
Browse files Browse the repository at this point in the history
Fixes #9602

Aggregate incoming volume mount requests, reconcile them with volume
status, perform actual mounting, and produce mount status.
  • Loading branch information
smira committed Dec 27, 2024
1 parent 01bf844 commit 549db24
Show file tree
Hide file tree
Showing 15 changed files with 2,439 additions and 465 deletions.
32 changes: 32 additions & 0 deletions api/resource/definitions/block/block.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,28 @@ message LocatorSpec {
google.api.expr.v1alpha1.CheckedExpr match = 1;
}

// MountRequestSpec is the spec for MountRequest.
message MountRequestSpec {
string source = 1;
string target = 2;
string fs_type = 3;
repeated string options = 4;
uint64 flags = 5;
repeated string requesters = 6;
repeated string requester_i_ds = 7;
string parent_id = 8;
}

// MountSpec is the spec for volume mount.
message MountSpec {
string target_path = 1;
string selinux_label = 2;
repeated string options = 3;
}

// MountStatusSpec is the spec for MountStatus.
message MountStatusSpec {
MountRequestSpec spec = 1;
}

// PartitionSpec is the spec for volume partitioning.
Expand Down Expand Up @@ -153,6 +171,19 @@ message VolumeConfigSpec {
EncryptionSpec encryption = 6;
}

// VolumeMountRequestSpec is the spec for VolumeMountRequest.
message VolumeMountRequestSpec {
string volume_id = 1;
string requester = 2;
}

// VolumeMountStatusSpec is the spec for VolumeMountStatus.
message VolumeMountStatusSpec {
string volume_id = 1;
string requester = 2;
string target = 3;
}

// VolumeStatusSpec is the spec for VolumeStatus resource.
message VolumeStatusSpec {
talos.resource.definitions.enums.BlockVolumePhase phase = 1;
Expand All @@ -169,5 +200,6 @@ message VolumeStatusSpec {
talos.resource.definitions.enums.BlockEncryptionProviderType encryption_provider = 12;
string pretty_size = 13;
repeated string encryption_failed_syncs = 14;
MountSpec mount_spec = 15;
}

107 changes: 107 additions & 0 deletions internal/app/machined/pkg/controllers/block/mount_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package block

import (
"context"
"fmt"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"go.uber.org/zap"

"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
)

// MountRequestController provides mount requests based on VolumeMountRequests and VolumeStatuses.
type MountRequestController struct{}

// Name implements controller.Controller interface.
func (ctrl *MountRequestController) Name() string {
return "block.MountRequestController"
}

// Inputs implements controller.Controller interface.
func (ctrl *MountRequestController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: block.NamespaceName,
Type: block.VolumeMountRequestType,
Kind: controller.InputStrong,
},
{
Namespace: block.NamespaceName,
Type: block.VolumeStatusType,
Kind: controller.InputWeak,
},
{
Namespace: block.NamespaceName,
Type: block.MountRequestType,
Kind: controller.InputDestroyReady,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *MountRequestController) Outputs() []controller.Output {
return []controller.Output{
{
Type: block.MountRequestType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *MountRequestController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-r.EventCh():
case <-ctx.Done():
return nil
}

volumeStatuses, err := safe.ReaderListAll[*block.VolumeStatus](ctx, r)
if err != nil {
return fmt.Errorf("failed to read volume statuses: %w", err)
}

volumeStatusMap := xslices.ToMap(
safe.ToSlice(
volumeStatuses,
func(v *block.VolumeStatus) *block.VolumeStatus {
return v
},
),
func(v *block.VolumeStatus) (string, *block.VolumeStatusSpec) {
return v.Metadata().ID(), v.TypedSpec()
},
)

volumeMountRequests, err := safe.ReaderListAll[*block.VolumeMountRequest](ctx, r)
if err != nil {
return fmt.Errorf("failed to read volume mount requests: %w", err)
}

desiredMountRequests := map[string]*block.MountRequestSpec{}

for volumeMountRequest := range volumeMountRequests.All() {
volumeStatus, ok := volumeStatusMap[volumeMountRequest.TypedSpec().VolumeID]
if !ok || volumeStatus.Phase != block.VolumePhaseReady {
continue
}

if _, exists := desiredMountRequests[volumeMountRequest.Metadata().ID()]; !exists {
desiredMountRequests[volumeMountRequest.Metadata().ID()] = &block.MountRequestSpec{
Source: volumeStatus.MountLocation,
}
}
}

}
}
4 changes: 4 additions & 0 deletions internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,14 @@ func NewState() (*State, error) {
&block.DiscoveryRefreshRequest{},
&block.DiscoveryRefreshStatus{},
&block.Disk{},
&block.MountRequest{},
&block.MountStatus{},
&block.SystemDisk{},
&block.UserDiskConfigStatus{},
&block.VolumeConfig{},
&block.VolumeLifecycle{},
&block.VolumeMountRequest{},
&block.VolumeMountStatus{},
&block.VolumeStatus{},
&cluster.Affiliate{},
&cluster.Config{},
Expand Down
Loading

0 comments on commit 549db24

Please sign in to comment.