Support specifying Service type (#228)
# Description

For stackabletech/issues#360
sbernauer committed Apr 13, 2023
1 parent bd19c4a commit fd8e931
Showing 9 changed files with 88 additions and 11 deletions.
CHANGELOG.md (7 additions, 1 deletion)

@@ -7,11 +7,16 @@ All notable changes to this project will be documented in this file.
 ### Added
 
 - Deploy default and support custom affinities ([#217])
-- BREAKING: Dropped support for old spec.{driver,executor}.nodeSelector field. Use spec.{driver,executor}.affinity.nodeSelector instead ([#217])
 - Log aggregation added ([#226]).
 
 ### Changed
 
+- [BREAKING] Support specifying the Service type for the HistoryServer.
+  This enables us to later switch to `ListenerClasses` for exposing Services without another breaking change.
+  This change is breaking because, for security reasons, we now default to the `cluster-internal` `ListenerClass`.
+  If you need your cluster to be accessible from outside of Kubernetes, set `clusterConfig.listenerClass`
+  to `external-unstable` or `external-stable` ([#228]).
+- [BREAKING]: Dropped support for the old `spec.{driver,executor}.nodeSelector` field. Use `spec.{driver,executor}.affinity.nodeSelector` instead ([#217])
 - Revert openshift settings ([#207])
 - BUGFIX: assign service account to history pods ([#207])
 - Merging and validation of the configuration refactored ([#223])
@@ -21,6 +26,7 @@ All notable changes to this project will be documented in this file.
 [#217]: https://github.com/stackabletech/spark-k8s-operator/pull/217
 [#223]: https://github.com/stackabletech/spark-k8s-operator/pull/223
 [#226]: https://github.com/stackabletech/spark-k8s-operator/pull/226
+[#228]: https://github.com/stackabletech/spark-k8s-operator/pull/228
 
 ## [23.1.0] - 2023-01-23

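To make the upgrade note above concrete, here is a minimal SparkHistoryServer manifest sketch that opts out of the new cluster-internal default. Everything apart from clusterConfig.listenerClass is illustrative: the apiVersion/kind are assumed, and the remaining values are assembled from the test fixtures in this diff rather than taken verbatim from the docs:

---
apiVersion: spark.stackable.tech/v1alpha1  # assumed CRD group/version
kind: SparkHistoryServer
metadata:
  name: spark-history  # hypothetical cluster name
spec:
  image:
    productVersion: 3.3.0  # illustrative
  clusterConfig:
    # New in #228; defaults to cluster-internal (ClusterIP) for security.
    # Use external-unstable (NodePort) or external-stable (LoadBalancer)
    # if the history server must be reachable from outside Kubernetes.
    listenerClass: external-unstable
  logFileDirectory:  # illustrative S3 event-log location
    s3:
      prefix: eventlogs/
      bucket:
        reference: spark-history-s3-bucket
  nodes:
    roleGroups:
      default:
        replicas: 1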
deploy/helm/spark-k8s-operator/crds/crds.yaml (21 additions, 0 deletions)

@@ -2933,6 +2933,27 @@ spec:
       properties:
         spec:
           properties:
+            clusterConfig:
+              default:
+                listenerClass: cluster-internal
+              description: Global Spark history server configuration that applies to all roles and role groups
+              properties:
+                listenerClass:
+                  default: cluster-internal
+                  description: |-
+                    In the future this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service. Currently only a subset of ListenerClasses is supported: the type of the created Service is chosen from the specified ListenerClass name. In a future release, support for custom ListenerClasses will be introduced without a breaking change:
+
+                    * cluster-internal: Use a ClusterIP service
+
+                    * external-unstable: Use a NodePort service
+
+                    * external-stable: Use a LoadBalancer service
+                  enum:
+                    - cluster-internal
+                    - external-unstable
+                    - external-stable
+                  type: string
+              type: object
             image:
               anyOf:
                 - required:
(docs example; file name not shown)

@@ -23,7 +23,7 @@ spec:
   sparkConf: # <4>
   nodes:
     roleGroups:
-      cleaner:
+      default:
         replicas: 1 # <5>
         config:
           cleaner: true # <6>
rust/crd/src/affinity.rs (2 additions, 2 deletions)

@@ -56,7 +56,7 @@ mod test {
         reference: spark-history-s3-bucket
       nodes:
         roleGroups:
-          cleaner:
+          default:
             replicas: 1
             config:
               cleaner: true
@@ -102,7 +102,7 @@
         let rolegroup_ref = RoleGroupRef {
             cluster: ObjectRef::from_obj(&history),
             role: HISTORY_ROLE_NAME.to_string(),
-            role_group: "cleaner".to_string(),
+            role_group: "default".to_string(),
         };
 
         let affinity = history.merged_config(&rolegroup_ref).unwrap().affinity;
rust/crd/src/history.rs (44 additions, 0 deletions)

@@ -63,6 +63,9 @@ pub enum Error {
 #[serde(rename_all = "camelCase")]
 pub struct SparkHistoryServerSpec {
     pub image: ProductImage,
+    /// Global Spark history server configuration that applies to all roles and role groups
+    #[serde(default)]
+    pub cluster_config: SparkHistoryServerClusterConfig,
     /// Name of the Vector aggregator discovery ConfigMap.
     /// It must contain the key `ADDRESS` with the address of the Vector aggregator.
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -73,6 +76,47 @@ pub struct SparkHistoryServerSpec {
     pub nodes: Role<HistoryConfigFragment>,
 }
 
+#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SparkHistoryServerClusterConfig {
+    /// In the future this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
+    /// will be used to expose the service.
+    /// Currently only a subset of ListenerClasses is supported:
+    /// the type of the created Service is chosen from the specified ListenerClass name.
+    /// In a future release, support for custom ListenerClasses will be introduced without a breaking change:
+    ///
+    /// * cluster-internal: Use a ClusterIP service
+    ///
+    /// * external-unstable: Use a NodePort service
+    ///
+    /// * external-stable: Use a LoadBalancer service
+    #[serde(default)]
+    pub listener_class: CurrentlySupportedListenerClasses,
+}
+
+// TODO: Temporary solution until listener-operator is finished
+#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "PascalCase")]
+pub enum CurrentlySupportedListenerClasses {
+    #[default]
+    #[serde(rename = "cluster-internal")]
+    ClusterInternal,
+    #[serde(rename = "external-unstable")]
+    ExternalUnstable,
+    #[serde(rename = "external-stable")]
+    ExternalStable,
+}
+
+impl CurrentlySupportedListenerClasses {
+    pub fn k8s_service_type(&self) -> String {
+        match self {
+            CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
+            CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
+            CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
+        }
+    }
+}
+
 impl SparkHistoryServer {
     pub fn merged_config(
         &self,
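Because cluster_config carries #[serde(default)] and ClusterInternal is the enum's #[default], omitting the block entirely is equivalent to spelling it out. A sketch of the two equivalent spec fragments (hypothetical, trimmed to this field only):

---
# clusterConfig omitted entirely; serde fills in the defaults
spec: {}
---
# the explicit equivalent for this field
spec:
  clusterConfig:
    listenerClass: cluster-internal  # k8s_service_type() -> "ClusterIP"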
rust/operator-binary/src/history_controller.rs (10 additions, 4 deletions)

@@ -430,11 +430,16 @@ fn build_service(
         None => "global".to_owned(),
     };
 
-    let (service_name, service_type) = match group {
-        Some(rgr) => (rgr.object_name(), "ClusterIP".to_string()),
+    let (service_name, service_type, service_cluster_ip) = match group {
+        Some(rgr) => (
+            rgr.object_name(),
+            "ClusterIP".to_string(),
+            Some("None".to_string()),
+        ),
         None => (
             format!("{}-{}", shs.metadata.name.as_ref().unwrap(), role),
-            "NodePort".to_string(),
+            shs.spec.cluster_config.listener_class.k8s_service_type(),
+            None,
         ),
     };
 
@@ -452,13 +457,14 @@
             .with_recommended_labels(labels(shs, app_version_label, &group_name))
             .build(),
         spec: Some(ServiceSpec {
-            type_: Some(service_type),
+            cluster_ip: service_cluster_ip,
             ports: Some(vec![ServicePort {
                 name: Some(String::from("http")),
                 port: 18080,
                 ..ServicePort::default()
             }]),
             selector: Some(selector),
+            type_: Some(service_type),
             ..ServiceSpec::default()
         }),
         status: None,
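For illustration, the two Services that build_service now renders for a cluster hypothetically named spark-history, with role node and role group default, would look roughly like this. The name formats, port, and clusterIP handling follow the code above; the selector labels are abridged:

---
# Role-group Service: named via rgr.object_name(), always headless ClusterIP
apiVersion: v1
kind: Service
metadata:
  name: spark-history-node-default
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - name: http
      port: 18080
  selector:
    app.kubernetes.io/instance: spark-history  # abridged recommended labels
---
# Role-level Service: "{name}-{role}", type from clusterConfig.listenerClass
# (cluster-internal -> ClusterIP, external-unstable -> NodePort,
#  external-stable -> LoadBalancer)
apiVersion: v1
kind: Service
metadata:
  name: spark-history-node
spec:
  type: NodePort  # assuming listenerClass: external-unstable
  ports:
    - name: http
      port: 18080
  selector:
    app.kubernetes.io/instance: spark-history  # abridged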
tests/templates/kuttl/spark-history-server/06-assert.yaml (1 addition, 1 deletion)

@@ -6,6 +6,6 @@ timeout: 900
 apiVersion: apps/v1
 kind: StatefulSet
 metadata:
-  name: spark-history-node-cleaner
+  name: spark-history-node-default
 status:
   readyReplicas: 1
(test template; file name not shown)

@@ -22,7 +22,7 @@ spec:
     logging:
       enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
     roleGroups:
-      cleaner:
+      default:
         replicas: 1
         config:
           cleaner: true
(test step; file name not shown)

@@ -16,5 +16,5 @@ spec:
"bash",
"-x",
"-c",
"test 2 == $(curl http://spark-history-node-cleaner:18080/api/v1/applications | jq length)",
"test 2 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)",
]
