Add support for CRDs for the Karmada Descheduler #4905
Comments
I just ran a quick example as per the Flink Operator Quick Start. A simple FlinkDeployment looks like this (you can get it from here):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
I also dumped the resulting configuration here:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: "2024-05-07T08:59:32Z"
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 2
  name: basic-example
  namespace: default
  resourceVersion: "30397"
  uid: fa8d217e-9859-4ccd-ba39-f4670aa6f3f5
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  flinkVersion: v1_17
  image: flink:1.17
  job:
    args: []
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 2048m
  serviceAccount: flink
  taskManager:
    resource:
      cpu: 1
      memory: 2048m
status:
  clusterInfo:
    flink-revision: c0027e5 @ 2023-11-09T13:24:38+01:00
    flink-version: 1.17.2
    total-cpu: "2.0"
    total-memory: "4294967296"
  jobManagerDeploymentStatus: READY
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: fb11661e5eebb5c2aea39ab0405f9b85
    jobName: State machine job
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    startTime: "1715073594831"
    state: RUNNING
    updateTime: "1715073620122"
  lifecycleState: STABLE
  observedGeneration: 2
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    reconciliationTimestamp: 1715072375768
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=basic-example
    replicas: 1
I guess your use case might be:
You want a feature that Karmada can schedule the |
Thanks for the quick response @RainbowMango! Yes, the general use-case is as you've described -
We want to be able to:
For point 2, it becomes a little tricky as we would like to differentiate between application-level errors that result in pod failures (which Karmada shouldn't reschedule) vs. pod scheduling errors. Ideally the FlinkDeployment's status could reflect these differences so that Karmada can interpret the FlinkDeployment's health status accurately - will need to discuss further. |
For the active-active configuration, I guess you mean deploying 2 FlinkDeployments on 2 clusters, resulting in two Flink clusters. I'm not familiar with Flink; does that mean the two Flink clusters are collectively consuming a single data stream?
Cluster-level failover is based on taints; you can set tolerations to control how long a deployment should wait after a cluster failure before it fails over.
Yes, it would be great if we could observe the status (failover or not) from the FlinkDeployment's status. Can we do that? |
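To make the toleration point concrete, here is a rough sketch of a PropagationPolicy that tolerates a not-ready or unreachable member cluster for a fixed time. The policy name, the cluster names `member1` and `member2`, and the 60-second value are placeholders chosen for illustration, not settings taken from this thread, and cluster failover has to be enabled in your Karmada installation for this to have any effect.

```yaml
# Sketch only: names and the 60s value are illustrative assumptions.
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: flink-propagation          # hypothetical policy name
spec:
  resourceSelectors:
    - apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      name: basic-example
  placement:
    clusterAffinity:
      clusterNames:
        - member1                  # hypothetical member clusters
        - member2
    clusterTolerations:
      # How long the workload stays on a cluster after it is tainted
      # as not-ready or unreachable, before it is evicted.
      - key: cluster.karmada.io/not-ready
        operator: Exists
        effect: NoExecute
        tolerationSeconds: 60
      - key: cluster.karmada.io/unreachable
        operator: Exists
        effect: NoExecute
        tolerationSeconds: 60
```

A shorter `tolerationSeconds` moves the workload sooner but makes it more sensitive to brief cluster connectivity blips, so the value is something to tune per environment.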
Yes we would deploy 2 identical FlinkDeployments, both deployed with identical data stream sources that would feed the applications the same data. In an active-passive configuration, the setup would be similar except for only one of the data sources providing data to the active application.
Thanks! Yes we are actively tuning these parameters. :)
We used a custom health interpreter to determine the FlinkDeployment's health - which seems to be working. Will confirm if this is fully suited for our use-cases after we finish testing. |
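For readers who have not written one, a custom health interpreter in Karmada can be declared as a ResourceInterpreterCustomization with a Lua script. The sketch below is not the interpreter described in this comment; the object name and the health criteria (JobManager `READY` and job `RUNNING`, taken from the status fields in the dump earlier in this thread) are assumptions for illustration only.

```yaml
# Sketch only: the name and the health criteria are illustrative assumptions.
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
  name: flinkdeployment-health     # hypothetical name
spec:
  target:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
  customizations:
    healthInterpretation:
      luaScript: |
        function InterpretHealth(observedObj)
          -- No status reported yet: treat as unhealthy.
          if observedObj.status == nil or observedObj.status.jobStatus == nil then
            return false
          end
          -- Assumed criteria: JobManager deployment is ready and the job is running.
          return observedObj.status.jobManagerDeploymentStatus == 'READY'
            and observedObj.status.jobStatus.state == 'RUNNING'
        end
```

Criteria this simple would not separate application-level failures from scheduling failures; that would need additional checks on whatever status fields the operator exposes for each case.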
Hi @mszacillo, thanks for your explanation. To better understand your use case, I went through some materials about Flink and the Flink Kubernetes Operator. That raised more questions; I hope you can help.
For the active-active configuration, we can simply understand it as two identical systems (Flink clusters) processing the same data, which means each piece of data will be handled separately by both systems. My question here is how the two systems interact during the data sink process. Will the data streams be forwarded to the same external systems (files, sockets, etc.)? You still need a way to aggregate the data from the two systems, right?
Another question: do you know how to simulate a FlinkDeployment failure? |
Hello @RainbowMango, Apologies for the delay. Your understanding is correct that in active-active there would be two identical but separate Flink Clusters processing the same data. These applications would be writing to the same Data Sink, and it would be up to the user to run the correct de-duping / aggregation process. I just published a PR with a custom health interpreter for FlinkDeployment CRD here: #5023. You can simulate an application failure by:
|
I guess we don't need this anymore in favor of #5788. Please correct me if that's not the case. |
@RainbowMango: Closing this issue. In response to this:
|
What would you like to be added:
During our testing we noticed that descheduler will filter out all resource types that are not deployments. Because of this, we would like to propose adding descheduler support for CRDs (custom resources).
Why is this needed:
We aim to use Karmada specifically for its failover and deschedule capabilities, for the FlinkDeployment CRD. Looking at previous issues, there seems to be interest in this type of support, and we believe a generic solution could benefit the community. We wanted to create this ticket to start a conversation and get some feedback / opinions on this type of support.