Commit 6862f03

razvan and maltesander authored
feat: add support for SparkConnect (#539)
* wip
* it builds
* kuttl test
* successfully create driver and executors
* use bundled spark-connect jar instead of --package
* kuttl test is successful
* simplify CRD: no roles and no role groups for spark connect servers
* test passes again
* move all config to properties file
* use deployment instead of stateful set
* implement jvm overrides for the connect server
* implement log aggregation
* implement pod overrides
* implement resource requests
* implement cluster operation
* implement connect server status tracking
* Refactor server related code in its own module.
* cleanup
* configure executors with pod templates
* split configuration between server and executor
* merge executor pod overrides into pod template
* implement executor affinity and resource properties
* bump op-rs to 0.88.0
* implement user provided command line args for the connect server
* spark connect usage guide
* update readme and fix typo
* cleanup, liveliness probe, do not use the iceberg test for now
* expose prometheus metrics
* Apply suggestions from code review

Co-authored-by: Malte Sander <malte.sander.it@gmail.com>

* remove duplicate constant
* rename argument
* GRPC and HTTP constants
* fix main merge problems
* remove unused error variant
* remove iceberg test script
* Apply suggestions from code review

Co-authored-by: Malte Sander <malte.sander.it@gmail.com>

* regenerate charts
* change visibility
* fix comment

---------

Co-authored-by: Malte Sander <malte.sander.it@gmail.com>
1 parent 4419c44 commit 6862f03

26 files changed: +2846 −11 lines changed

Diff for: CHANGELOG.md

+5
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

 ## [Unreleased]

+### Added
+
+- Experimental support for Spark Connect ([#539]).
+
 ### Changed

 - BREAKING: Replace stackable-operator `initialize_logging` with stackable-telemetry `Tracing` ([#547], [#554]).

@@ -19,6 +23,7 @@ All notable changes to this project will be documented in this file.

 - Use `json` file extension for log files ([#553]).

+[#539]: https://github.com/stackabletech/spark-k8s-operator/pull/539
 [#547]: https://github.com/stackabletech/spark-k8s-operator/pull/547
 [#551]: https://github.com/stackabletech/spark-k8s-operator/pull/551
 [#553]: https://github.com/stackabletech/spark-k8s-operator/pull/553

Diff for: crate-hashes.json

+1 −1
Some generated files are not rendered by default.

Diff for: deploy/helm/spark-k8s-operator/crds/crds.yaml

+582 −1
Large diffs are not rendered by default.

Diff for: deploy/helm/spark-k8s-operator/templates/roles.yaml

+3
@@ -53,6 +53,7 @@ rules:
       - apps
     resources:
       - statefulsets
+      - deployments
     verbs:
       - create
       - delete
@@ -102,6 +103,7 @@ rules:
     resources:
       - sparkapplications
      - sparkhistoryservers
+      - sparkconnectservers
     verbs:
       - get
       - list
@@ -111,6 +113,7 @@ rules:
       - spark.stackable.tech
     resources:
       - sparkapplications/status
+      - sparkconnectservers/status
    verbs:
      - patch
  - apiGroups:
Diff for: docs/modules/spark-k8s/examples/example-spark-connect.yaml

+44 (new file)
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect # <1>
spec:
  image:
    productVersion: "3.5.5" # <2>
    pullPolicy: IfNotPresent
  args:
    - "--package org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1" # <3>
  server:
    podOverrides:
      spec:
        containers:
          - name: spark
            env:
              - name: DEMO_GREETING # <4>
                value: "Hello"
    jvmArgumentOverrides:
      add:
        - -Dmy.custom.jvm.arg=customValue # <5>
    config:
      logging:
        enableVectorAgent: False
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config # <6>
    configOverrides:
      spark-defaults.conf:
        spark.driver.cores: "3" # <7>
  executor:
    configOverrides:
      spark-defaults.conf:
        spark.executor.memoryOverhead: "1m" # <8>
        spark.executor.instances: "3"
    config:
      logging:
        enableVectorAgent: False
        containers:
          spark:
            custom:
              configMap: spark-connect-log-config
Diff for: docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc

+59 (new file)
= Spark Connect
:description: Set up a Spark Connect server with Kubernetes as distributed execution engine and an external service to be used by clients
:page-aliases: spark_connect.adoc

WARNING: Support for Apache Spark Connect is considered experimental and is subject to change in future releases. Spark Connect is a young technology and there are still important questions to be answered, mostly related to security and multi-tenancy.

Apache Spark Connect is a remote procedure call (RPC) server that allows clients to run Spark applications on a remote cluster. Clients can connect to the Spark Connect server from a variety of programming languages, editors and IDEs without needing to install Spark locally.

The Stackable Spark operator can set up Spark Connect servers backed by Kubernetes as a distributed execution engine.

== Deployment

The example below demonstrates how to set up a Spark Connect server and apply some customizations.

[source,yaml]
----
include::example$example-spark-connect.yaml[]
----

<1> The name of the Spark Connect server.
<2> Version of the Spark Connect server.
<3> Additional package to install when starting the Spark Connect server and executors.
<4> Environment variable created via `podOverrides`. Alternatively, the environment variable can be set in the `spec.server.envOverrides` section.
<5> Additional argument passed to the JVM settings of the Spark Connect server. Do not use this to tweak heap settings; use `spec.server.jvmOptions` instead.
<6> A custom log4j configuration file to be used by the Spark Connect server. The ConfigMap must have an entry called `log4j.properties`.
<7> Customize the driver properties in the `server` role. The number of cores here is not related to Kubernetes cores!
<8> Customize `spark.executor.\*` and `spark.kubernetes.executor.*` properties in the `executor` role.

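Once the server is running, clients talk to it over the Spark Connect gRPC protocol. The following PySpark snippet is a minimal sketch of such a connection; the service name `spark-connect-server` and port `15002` (the Spark Connect default) are assumptions here, so replace them with the service and port actually exposed in your cluster.

[source,python]
----
# Requires the Spark Connect client, e.g.: pip install "pyspark[connect]==3.5.5"
from pyspark.sql import SparkSession

# sc:// URL of the Spark Connect gRPC endpoint.
# Service name and port are assumptions; check the Service created for your stacklet.
spark = (
    SparkSession.builder
    .remote("sc://spark-connect-server.default.svc.cluster.local:15002")
    .getOrCreate()
)

df = spark.range(10)
print(df.count())  # executed remotely on the Kubernetes-backed executors
----
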
== Metrics

The server pod exposes Prometheus metrics at the following endpoints:

* `/metrics/prometheus` for driver instances.
* `/metrics/executors/prometheus` for executor instances.

To customize the metrics configuration, use `spec.server.configOverrides` like this:

[source,yaml]
----
spec:
  server:
    configOverrides:
      metrics.properties:
        applications.sink.prometheusServlet.path: "/metrics/applications/prometheus"
----

The example above adds a new endpoint for application metrics.

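To check the endpoints manually, you can port-forward the server pod and scrape the driver endpoint. The sketch below is only an illustration under assumptions: it uses port `4040`, Spark's default UI port where the `PrometheusServlet` is typically registered; adjust it to the HTTP port your Spark Connect server actually exposes.

[source,python]
----
# Assumes a port-forward is active, e.g.:
#   kubectl port-forward svc/spark-connect-server 4040:4040
# Port 4040 is Spark's default UI port and an assumption here.
from urllib.request import urlopen

with urlopen("http://localhost:4040/metrics/prometheus") as response:
    metrics = response.read().decode()

# Print the first few Prometheus metric lines.
print("\n".join(metrics.splitlines()[:10]))
----
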
== Notable Omissions

The following features are not yet supported by the Stackable Spark operator:

* Integration with the Spark History Server.
* Authorization and authentication. Currently, anyone with access to the Spark Connect service can run jobs.
* Volumes and volume mounts can be added only with pod overrides.
* Job dependencies must be provisioned as custom images or via the `--packages` or `--jars` arguments.

== Known Issues

* Dynamically provisioning the Iceberg runtime leads to an `iceberg.SparkWrite$WriterFactory` `ClassNotFoundException` when clients attempt to use it.

Diff for: docs/modules/spark-k8s/partials/nav.adoc

+1
@@ -9,6 +9,7 @@
 ** xref:spark-k8s:usage-guide/security.adoc[]
 ** xref:spark-k8s:usage-guide/logging.adoc[]
 ** xref:spark-k8s:usage-guide/history-server.adoc[]
+** xref:spark-k8s:usage-guide/spark-connect.adoc[]
 ** xref:spark-k8s:usage-guide/examples.adoc[]
 ** xref:spark-k8s:usage-guide/overrides.adoc[]
 ** xref:spark-k8s:usage-guide/operations/index.adoc[]

Diff for: rust/operator-binary/src/connect/common.rs

+152
use std::collections::{BTreeMap, HashMap};

use product_config::writer::to_java_properties_string;
use snafu::{ResultExt, Snafu};
use stackable_operator::{
    kvp::ObjectLabels,
    role_utils::{JavaCommonConfig, JvmArgumentOverrides},
};
use strum::Display;

use super::crd::CONNECT_EXECUTOR_ROLE_NAME;
use crate::{
    connect::crd::{
        CONNECT_CONTROLLER_NAME, CONNECT_SERVER_ROLE_NAME, DUMMY_SPARK_CONNECT_GROUP_NAME,
    },
    crd::constants::{APP_NAME, OPERATOR_NAME},
};

#[derive(Snafu, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
    #[snafu(display("failed to merge jvm argument overrides"))]
    MergeJvmArgumentOverrides {
        source: stackable_operator::role_utils::Error,
    },

    #[snafu(display("failed to serialize spark properties"))]
    SparkProperties {
        source: product_config::writer::PropertiesWriterError,
    },

    #[snafu(display("failed to serialize jvm security properties"))]
    JvmSecurityProperties {
        source: product_config::writer::PropertiesWriterError,
    },
}

pub(crate) fn labels<'a, T>(
    scs: &'a T,
    app_version_label: &'a str,
    role: &'a str,
) -> ObjectLabels<'a, T> {
    ObjectLabels {
        owner: scs,
        app_name: APP_NAME,
        app_version: app_version_label,
        operator_name: OPERATOR_NAME,
        controller_name: CONNECT_CONTROLLER_NAME,
        role,
        role_group: DUMMY_SPARK_CONNECT_GROUP_NAME,
    }
}

// The dead_code annotation silences complaints about missing Executor instantiations.
// These will come in the future.
#[allow(dead_code)]
#[derive(Clone, Debug, Display)]
#[strum(serialize_all = "lowercase")]
pub(crate) enum SparkConnectRole {
    Server,
    Executor,
}

pub(crate) fn object_name(stacklet_name: &str, role: SparkConnectRole) -> String {
    match role {
        SparkConnectRole::Server => format!("{}-{}", stacklet_name, CONNECT_SERVER_ROLE_NAME),
        SparkConnectRole::Executor => format!("{}-{}", stacklet_name, CONNECT_EXECUTOR_ROLE_NAME),
    }
}

// Returns the JVM arguments provided by the user merged with the operator-generated properties.
pub(crate) fn jvm_args(
    jvm_args: &[String],
    user_java_config: Option<&JavaCommonConfig>,
) -> Result<String, Error> {
    if let Some(user_jvm_props) = user_java_config {
        let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args.to_vec());
        let mut user_jvm_props_copy = user_jvm_props.jvm_argument_overrides.clone();
        user_jvm_props_copy
            .try_merge(&operator_generated)
            .context(MergeJvmArgumentOverridesSnafu)?;
        Ok(user_jvm_props_copy
            .effective_jvm_config_after_merging()
            .join(" "))
    } else {
        Ok(jvm_args.join(" "))
    }
}

// Merges server and executor properties and renders the contents
// of the Spark properties file.
pub(crate) fn spark_properties(
    props: &[BTreeMap<String, Option<String>>; 2],
) -> Result<String, Error> {
    let mut result = BTreeMap::new();
    for p in props {
        result.extend(p);
    }
    to_java_properties_string(result.into_iter()).context(SparkPropertiesSnafu)
}

pub(crate) fn security_properties(
    config_overrides: Option<&HashMap<String, String>>,
) -> Result<String, Error> {
    let mut result: BTreeMap<String, Option<String>> = [
        (
            "networkaddress.cache.ttl".to_string(),
            Some("30".to_string()),
        ),
        (
            "networkaddress.cache.negative.ttl".to_string(),
            Some("0".to_string()),
        ),
    ]
    .into();

    if let Some(user_config) = config_overrides {
        result.extend(
            user_config
                .iter()
                .map(|(k, v)| (k.clone(), Some(v.clone()))),
        );
    }

    to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu)
}

pub(crate) fn metrics_properties(
    config_overrides: Option<&HashMap<String, String>>,
) -> Result<String, Error> {
    let mut result: BTreeMap<String, Option<String>> = [
        (
            "*.sink.prometheusServlet.class".to_string(),
            Some("org.apache.spark.metrics.sink.PrometheusServlet".to_string()),
        ),
        (
            "*.sink.prometheusServlet.path".to_string(),
            Some("/metrics/prometheus".to_string()),
        ),
    ]
    .into();

    if let Some(user_config) = config_overrides {
        result.extend(
            user_config
                .iter()
                .map(|(k, v)| (k.clone(), Some(v.clone()))),
        );
    }

    to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu)
}

0 commit comments
