Skip to content

Commit 40f1dc4

Browse files
authored
fix: reconcile bug (#41)
1 parent a7eb784 commit 40f1dc4

File tree

9 files changed

+63
-53
lines changed

9 files changed

+63
-53
lines changed

charts/tensor-fusion/Chart.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type: application
1515
# This is the chart version. This version number should be incremented each time you make changes
1616
# to the chart and its templates, including the app version.
1717
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18-
version: 1.1.3
18+
version: 1.1.4
1919

2020
# This is the version number of the application being deployed. This version number should be
2121
# incremented each time you make changes to the application. Versions are not expected to

charts/tensor-fusion/values.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ serviceAccount:
2020
annotations: {}
2121

2222
hypervisor:
23-
greptimedbEndpoint: greptimedb-standalone.greptimedb.svc.cluster.local:4001
23+
greptimedbEndpoint: greptimedb-standalone.tensor-fusion.svc.cluster.local:4001
2424
image:
2525
repository: tensorfusion/tensor-fusion-hypervisor
2626
# Overrides the image tag whose default is the chart appVersion.
@@ -51,7 +51,7 @@ controller:
5151
tolerations: []
5252
affinity: {}
5353

54-
greptimedbEndpoint: http://greptimedb-standalone.greptimedb.svc.cluster.local:4000/v1/prometheus/write?db=public
54+
greptimedbEndpoint: http://greptimedb-standalone.tensor-fusion.svc.cluster.local:4000/v1/prometheus/write?db=public
5555
admissionWebhooks:
5656
failurePolicy: Fail
5757
secretName: tensor-fusion-webhook-secret

internal/cloudprovider/common/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func CalculateLeastCostGPUNodes(ctx context.Context, provider types.GPUNodeProvi
128128
for i := int64(0); i < bestNumInstances; i++ {
129129
// Zone and region should ideally be determined from nodeClass's subnet selectors
130130
nodes = append(nodes, types.NodeCreationParam{
131-
NodeName: fmt.Sprintf("%s-%s-%d", pool.Name, generateRandomString(6), i+1),
131+
NodeName: fmt.Sprintf("%s-%s", pool.Name, generateRandomString(8)),
132132
InstanceType: bestInstance.InstanceType,
133133
NodeClass: nodeClass,
134134
Region: region,

internal/controller/gpunode_controller.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ func (r *GPUNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
116116
}
117117
}
118118

119+
// Only reconcile if the node has a kubernetes node name, otherwise the DaemonSet like workloads can not be scheduled
120+
if node.Status.KubernetesNodeName == "" {
121+
return ctrl.Result{}, nil
122+
}
123+
119124
if err := r.reconcileNodeDiscoveryJob(ctx, node, poolName); err != nil {
120125
return ctrl.Result{}, err
121126
}
@@ -173,7 +178,7 @@ func (r *GPUNodeReconciler) reconcileNodeDiscoveryJob(
173178
{
174179
Key: "metadata.name",
175180
Operator: corev1.NodeSelectorOpIn,
176-
Values: []string{gpunode.Name},
181+
Values: []string{gpunode.Status.KubernetesNodeName},
177182
},
178183
},
179184
})
@@ -227,10 +232,9 @@ func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tf
227232
if poolName == "" {
228233
return nil
229234
}
230-
231235
namespace := utils.CurrentNamespace()
232236
log := log.FromContext(ctx)
233-
hypervisorPodName := fmt.Sprintf("%s-%s-hypervisor", poolName, node.Name)
237+
hypervisorPodName := fmt.Sprintf("%s-hypervisor", node.Name)
234238
pool := r.GpuPoolState.Get(poolName)
235239
if pool == nil {
236240
return fmt.Errorf("failed to get tensor-fusion pool, can not create hypervisor pod, pool: %s", poolName)
@@ -245,7 +249,7 @@ func (r *GPUNodeReconciler) reconcileHypervisorPod(ctx context.Context, node *tf
245249
if spec.NodeSelector == nil {
246250
spec.NodeSelector = make(map[string]string)
247251
}
248-
spec.NodeSelector["kubernetes.io/hostname"] = node.Name
252+
spec.NodeSelector["kubernetes.io/hostname"] = node.Status.KubernetesNodeName
249253
newPod := &corev1.Pod{
250254
ObjectMeta: metav1.ObjectMeta{
251255
Name: hypervisorPodName,

internal/controller/gpupool_controller.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type GPUPoolReconciler struct {
5151
func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
5252
log := log.FromContext(ctx)
5353

54+
// TODO, debounce the reconcile when lots of GPUNodes created/updated, GPUPool should not updated too frequently (it owns Job)
5455
log.Info("Reconciling GPUPool", "name", req.NamespacedName.Name)
5556
defer func() {
5657
log.Info("Finished reconciling GPUPool", "name", req.NamespacedName.Name)
@@ -77,6 +78,11 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7778
return ctrl.Result{}, nil
7879
}
7980

81+
// Update pool capacity at first
82+
if err := r.reconcilePoolCurrentCapacityAndReadiness(ctx, pool); err != nil {
83+
return ctrl.Result{}, err
84+
}
85+
8086
// For provisioning mode, check if need to scale up GPUNodes upon AvailableCapacity changed
8187
isProvisioningMode := pool.Spec.NodeManagerConfig.ProvisioningMode == tfv1.ProvisioningModeProvisioned
8288

@@ -91,11 +97,8 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
9197
}
9298
// Set phase to updating and let GPUNode event trigger the check and update capacity loop, util all nodes are ready
9399
if newNodeCreated {
100+
// Refresh the capacity again since new node has been created
94101
pool.Status.Phase = tfv1.TensorFusionPoolPhaseUpdating
95-
err = r.Client.Status().Update(ctx, pool)
96-
if err != nil {
97-
return ctrl.Result{}, err
98-
}
99102
if err := r.reconcilePoolCurrentCapacityAndReadiness(ctx, pool); err != nil {
100103
return ctrl.Result{}, err
101104
}
@@ -109,10 +112,6 @@ func (r *GPUPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
109112
// TODO, when componentConfig changed, it should notify corresponding resource to upgrade
110113
// eg. when hypervisor changed, should change all owned GPUNode's status.phase to Updating
111114

112-
if err := r.reconcilePoolCurrentCapacityAndReadiness(ctx, pool); err != nil {
113-
return ctrl.Result{}, err
114-
}
115-
116115
return ctrl.Result{}, nil
117116
}
118117

internal/controller/gpupool_node_provision.go

+29-26
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,6 @@ func (r *GPUPoolReconciler) reconcilePoolCapacityWithProvisioner(ctx context.Con
140140
ManageMode: tfv1.GPUNodeManageModeProvisioned,
141141
CostPerHour: strconv.FormatFloat(costPerHour, 'f', 6, 64),
142142
},
143-
Status: tfv1.GPUNodeStatus{
144-
TotalTFlops: node.TFlopsOffered,
145-
TotalVRAM: node.VRAMOffered,
146-
TotalGPUs: node.GPUDeviceOffered,
147-
},
148143
}
149144
_ = controllerutil.SetControllerReference(pool, gpuNodeRes, r.Scheme)
150145
err := r.Client.Create(ctx, gpuNodeRes)
@@ -153,27 +148,35 @@ func (r *GPUPoolReconciler) reconcilePoolCapacityWithProvisioner(ctx context.Con
153148
return
154149
}
155150
// Update GPUNode status to set the resource quantity
156-
// gpuNodeRes.Status =
157-
// if err := r.Client.Status().Update(ctx, gpuNodeRes); err != nil {
158-
// errList = append(errList, err)
159-
// return
160-
// }
161-
162-
// // Create node on cloud provider
163-
// status, err := provider.CreateNode(ctx, &node)
164-
// if err != nil {
165-
// errList = append(errList, err)
166-
// return
167-
// }
168-
169-
// // Update GPUNode status about the cloud vendor info
170-
// // To match GPUNode - K8S node, the --node-label in Kubelet is MUST-have, like Karpenter, it force set userdata to add a provisionerId label, k8s node controller then can set its ownerReference to the GPUNode
171-
// gpuNodeRes.Status.NodeInfo.IP = status.PrivateIP
172-
// gpuNodeRes.Status.NodeInfo.InstanceID = status.InstanceID
173-
// gpuNodeRes.Status.NodeInfo.Region = node.Region
174-
// r.Client.Status().Update(ctx, gpuNodeRes)
175-
176-
// r.Recorder.Eventf(pool, corev1.EventTypeNormal, "GPUNodeCreated", "Created node: %s, IP: %s", status.InstanceID, status.PrivateIP)
151+
gpuNodeRes.Status = tfv1.GPUNodeStatus{
152+
Phase: tfv1.TensorFusionGPUNodePhasePending,
153+
TotalTFlops: node.TFlopsOffered,
154+
TotalVRAM: node.VRAMOffered,
155+
TotalGPUs: node.GPUDeviceOffered,
156+
AllocationDetails: []tfv1.GPUNodeAllocationDetails{},
157+
LoadedModels: []string{},
158+
ManagedGPUDeviceIDs: []string{},
159+
}
160+
if err := r.Client.Status().Patch(ctx, gpuNodeRes, client.Merge); err != nil {
161+
errList = append(errList, err)
162+
return
163+
}
164+
165+
// Create node on cloud provider
166+
status, err := provider.CreateNode(ctx, &node)
167+
if err != nil {
168+
errList = append(errList, err)
169+
return
170+
}
171+
172+
// Update GPUNode status about the cloud vendor info
173+
// To match GPUNode - K8S node, the --node-label in Kubelet is MUST-have, like Karpenter, it force set userdata to add a provisionerId label, k8s node controller then can set its ownerReference to the GPUNode
174+
gpuNodeRes.Status.NodeInfo.IP = status.PrivateIP
175+
gpuNodeRes.Status.NodeInfo.InstanceID = status.InstanceID
176+
gpuNodeRes.Status.NodeInfo.Region = node.Region
177+
r.Client.Status().Update(ctx, gpuNodeRes)
178+
179+
r.Recorder.Eventf(pool, corev1.EventTypeNormal, "GPUNodeCreated", "Created node: %s, IP: %s", status.InstanceID, status.PrivateIP)
177180
}(node)
178181
}
179182

internal/controller/node_controller.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
9696
if e != nil {
9797
return ctrl.Result{}, fmt.Errorf("failed to create or patch GPUNode: %w", e)
9898
}
99+
100+
gpuNode.Status.KubernetesNodeName = node.Name
101+
if err := r.Client.Status().Patch(ctx, gpuNode, client.Merge); err != nil {
102+
return ctrl.Result{}, fmt.Errorf("can not add Kubernetes Node info into gpuNode(%s) status : %w", gpuNode.Name, err)
103+
}
99104
log.Info("Created GPUNode due to selector matched", "name", gpuNode.Name)
100105
}
101106

@@ -118,10 +123,6 @@ func (r *NodeReconciler) generateGPUNode(ctx context.Context, node *corev1.Node,
118123
Spec: tfv1.GPUNodeSpec{
119124
ManageMode: tfv1.GPUNodeManageModeAutoSelect,
120125
},
121-
Status: tfv1.GPUNodeStatus{
122-
KubernetesNodeName: node.Name,
123-
ObservedGeneration: node.Generation,
124-
},
125126
}
126127
return gpuNode
127128
}

internal/controller/tensorfusioncluster_controller.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func (r *TensorFusionClusterReconciler) Reconcile(ctx context.Context, req ctrl.
153153
return ctrl.Result{RequeueAfter: delay}, nil
154154
} else {
155155
// all components are ready, set cluster as ready
156+
tfc.Status.RetryCount = 0
156157
tfc.SetAsReady(conditions...)
157158
gpupools, err := r.mustListOwnedGPUPools(ctx, tfc)
158159
if err != nil {
@@ -349,10 +350,6 @@ func (r *TensorFusionClusterReconciler) checkTFClusterComponentsReady(ctx contex
349350
Type: constants.ConditionStatusTypeTimeSeriesDatabase,
350351
Status: metav1.ConditionTrue,
351352
},
352-
{
353-
Type: constants.ConditionStatusTypeCloudVendorConnection,
354-
Status: metav1.ConditionTrue,
355-
},
356353
}
357354

358355
// check if all conditions are true, any not ready component will make allPass false
@@ -387,7 +384,8 @@ func (r *TensorFusionClusterReconciler) checkTFClusterComponentsReady(ctx contex
387384
func (r *TensorFusionClusterReconciler) updateTFClusterStatus(ctx context.Context, tfc *tfv1.TensorFusionCluster, prevStatus *tfv1.TensorFusionClusterStatus) error {
388385
// Update the cluster status, ignore retryCount, keep other fields to compare
389386
prevStatus.RetryCount = tfc.Status.RetryCount
390-
if equality.Semantic.DeepEqual(tfc.Status, prevStatus) {
387+
388+
if equality.Semantic.DeepEqual(tfc.Status, *prevStatus) {
391389
return nil
392390
}
393391
if err := r.Status().Update(ctx, tfc); err != nil {

internal/utils/reconcile.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"context"
55
"crypto/sha256"
66
"encoding/hex"
7+
"encoding/json"
78
"errors"
8-
"fmt"
99
"math"
1010
"math/rand/v2"
1111
"os"
@@ -91,6 +91,11 @@ func CurrentNamespace() string {
9191

9292
// GetObjectHash returns the hex-encoded SHA-256 digest of obj's JSON
// encoding. Because json.Marshal emits map keys in sorted order and is
// deterministic for plain data structs, the result is a stable content
// hash suitable for change detection (unlike the previous
// fmt.Sprintf("%v", obj) form, whose output is not a stable contract).
//
// Panics if obj cannot be marshaled to JSON (e.g. it contains channels,
// funcs, or cyclic references); callers are expected to pass plain,
// serializable spec/config values.
func GetObjectHash(obj any) string {
	jsonBytes, err := json.Marshal(obj)
	if err != nil {
		// Programmer error: only serializable values should reach here.
		panic(err)
	}
	hasher := sha256.New()
	// Hash the JSON bytes directly; the previous []byte -> string -> []byte
	// round trip copied the entire encoding twice for no behavioral gain.
	hasher.Write(jsonBytes)
	return hex.EncodeToString(hasher.Sum(nil))
}

0 commit comments

Comments
 (0)