Skip to content

Commit

Permalink
Bugfix for: Druid connection not established (#187)
Browse files Browse the repository at this point in the history
## Description

Fixes #174 

A test for the bug is found in this PR https://github.com/stackabletech/integration-tests/pull/235

- the DruidConnection controller now watches the ConfigMaps, to be notified if a druid instance that should be connected is deployed; to connect it then.
- the DruidConnnection namespaces are also made optional in this PR
  • Loading branch information
Felix Hennig committed May 9, 2022
1 parent e1df37b commit 2423aa5
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@
### Changed

- Required product image version changed to 2 ([#182]).
- DruidConnection namespace properties are optional now ([#187]).

### Fixed

- A DruidConnection was not established if the Druid instance was started after
the Superset instance, this was fixed ([#187]).
- The correct secret key is used when upgrading the Superset database. This
issue was introduced in [#173] ([#190]).

[#173]: https://github.com/stackabletech/superset-operator/pull/173
[#178]: https://github.com/stackabletech/superset-operator/pull/178
[#179]: https://github.com/stackabletech/superset-operator/pull/179
[#182]: https://github.com/stackabletech/superset-operator/pull/182
[#187]: https://github.com/stackabletech/superset-operator/pull/187
[#190]: https://github.com/stackabletech/superset-operator/pull/190

## [0.4.0] - 2022-04-05
Expand Down
4 changes: 2 additions & 2 deletions deploy/crd/druidconnection.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ spec:
name:
type: string
namespace:
nullable: true
type: string
required:
- name
- namespace
type: object
superset:
properties:
name:
type: string
namespace:
nullable: true
type: string
required:
- name
- namespace
type: object
required:
- druid
Expand Down
4 changes: 2 additions & 2 deletions deploy/helm/superset-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,20 @@ spec:
name:
type: string
namespace:
nullable: true
type: string
required:
- name
- namespace
type: object
superset:
properties:
name:
type: string
namespace:
nullable: true
type: string
required:
- name
- namespace
type: object
required:
- druid
Expand Down
4 changes: 2 additions & 2 deletions deploy/manifests/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,20 @@ spec:
name:
type: string
namespace:
nullable: true
type: string
required:
- name
- namespace
type: object
superset:
properties:
name:
type: string
namespace:
nullable: true
type: string
required:
- name
- namespace
type: object
required:
- druid
Expand Down
6 changes: 5 additions & 1 deletion docs/modules/ROOT/pages/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ spec:
----

The `name` and `namespace` in `spec.superset` refer to the superset cluster that you want to connect. Following our example above, the name is `superset`.
The `name` and `namespace` in `spec.superset` refer to the Superset cluster that you want to connect. Following our example above, the name is `superset`.

In `spec.druid` you specify the `name` and `namespace` of your Druid cluster.

The `namespace` part is optional; if it is omitted it will default to the namespace of the DruidConnection.

The namespace for the Superset and Druid cluster can be omitted, in that case the Operator will assume that they are in the same namespace as the DruidConnection.

Once the database is initialized, the connection will be added to the cluster by the operator. You can see it in the user interface under Data > Databases:

image::superset-databases.png[Superset databases showing the connected Druid cluster]
Expand Down
45 changes: 44 additions & 1 deletion rust/crd/src/druidconnection.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use stackable_operator::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use stackable_operator::k8s_openapi::chrono::Utc;
use stackable_operator::kube::CustomResource;
use stackable_operator::kube::ResourceExt;
use stackable_operator::schemars::{self, JsonSchema};

#[derive(Snafu, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("{druid_connection} is missing a namespace, this should not happen!"))]
NoNamespace { druid_connection: String },
}
type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterRef {
pub name: String,
pub namespace: String,
pub namespace: Option<String>,
}

#[derive(Clone, CustomResource, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
Expand All @@ -36,6 +45,40 @@ impl DruidConnection {
pub fn job_name(&self) -> String {
format!("{}-import", self.name())
}

pub fn superset_name(&self) -> String {
self.spec.superset.name.clone()
}

pub fn superset_namespace(&self) -> Result<String> {
if let Some(superset_ns) = &self.spec.superset.namespace {
Ok(superset_ns.clone())
} else if let Some(ns) = self.namespace() {
Ok(ns)
} else {
NoNamespaceSnafu {
druid_connection: self.name(),
}
.fail()
}
}

pub fn druid_name(&self) -> String {
self.spec.druid.name.clone()
}

pub fn druid_namespace(&self) -> Result<String> {
if let Some(druid_ns) = &self.spec.druid.namespace {
Ok(druid_ns.clone())
} else if let Some(ns) = self.namespace() {
Ok(ns)
} else {
NoNamespaceSnafu {
druid_connection: self.name(),
}
.fail()
}
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
Expand Down
50 changes: 39 additions & 11 deletions rust/operator-binary/src/druid_connection_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ pub enum Error {
SupersetDBRetrieval {
source: stackable_operator::error::Error,
},
#[snafu(display("namespace missing on DruidConnection {druid_connection}"))]
DruidConnectionNoNamespace {
source: stackable_superset_crd::druidconnection::Error,
druid_connection: ObjectRef<DruidConnection>,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -89,6 +94,9 @@ impl ReconcilerError for Error {
Error::GetImportJob { import_job, .. } => Some(import_job.clone().erase()),
Error::DruidDiscoveryCheck { .. } => None,
Error::SupersetDBRetrieval { .. } => None,
Error::DruidConnectionNoNamespace {
druid_connection, ..
} => Some(druid_connection.clone().erase()),
}
}
}
Expand All @@ -108,8 +116,12 @@ pub async fn reconcile_druid_connection(
let mut superset_db_ready = false;
if let Some(status) = client
.get::<SupersetDB>(
&druid_connection.spec.superset.name,
Some(&druid_connection.spec.superset.namespace),
&druid_connection.superset_name(),
Some(&druid_connection.superset_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
},
)?),
)
.await
.context(SupersetDBRetrievalSnafu)?
Expand All @@ -120,24 +132,36 @@ pub async fn reconcile_druid_connection(
// Is the referenced druid discovery configmap there?
let druid_discovery_cm_exists = client
.exists::<ConfigMap>(
&druid_connection.spec.druid.name,
Some(&druid_connection.spec.druid.namespace),
&druid_connection.druid_name(),
Some(&druid_connection.druid_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
},
)?),
)
.await
.context(DruidDiscoveryCheckSnafu)?;

if superset_db_ready && druid_discovery_cm_exists {
let superset_db = client
.get::<SupersetDB>(
&druid_connection.spec.superset.name,
Some(&druid_connection.spec.superset.namespace),
&druid_connection.superset_name(),
Some(&druid_connection.superset_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
},
)?),
)
.await
.context(SupersetDBRetrievalSnafu)?;
// Everything is there, retrieve all necessary info and start the job
let sqlalchemy_str = get_sqlalchemy_uri_for_druid_cluster(
&druid_connection.spec.druid.name,
&druid_connection.spec.druid.namespace,
&druid_connection.druid_name(),
Some(&druid_connection.druid_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
},
)?),
client,
)
.await?;
Expand Down Expand Up @@ -201,14 +225,18 @@ pub async fn reconcile_druid_connection(
/// Takes a druid cluster name and namespace and returns the SQLAlchemy connect string
async fn get_sqlalchemy_uri_for_druid_cluster(
cluster_name: &str,
namespace: &str,
namespace: Option<&str>,
client: &Client,
) -> Result<String> {
client
.get::<ConfigMap>(cluster_name, Some(namespace))
.get::<ConfigMap>(cluster_name, namespace)
.await
.context(GetDruidConnStringConfigMapSnafu {
config_map: ObjectRef::<ConfigMap>::new(cluster_name).within(namespace),
config_map: if let Some(ns) = namespace {
ObjectRef::<ConfigMap>::new(cluster_name).within(ns)
} else {
ObjectRef::<ConfigMap>::new(cluster_name)
},
})?
.data
.and_then(|mut data| data.remove("DRUID_SQLALCHEMY"))
Expand Down
31 changes: 22 additions & 9 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use stackable_operator::{
k8s_openapi::api::{
apps::v1::StatefulSet,
batch::v1::Job,
core::v1::{Secret, Service},
core::v1::{ConfigMap, Secret, Service},
},
kube::{
api::ListParams,
Expand Down Expand Up @@ -169,6 +169,7 @@ async fn main() -> anyhow::Result<()> {
);
let druid_connection_store1 = druid_connection_controller_builder.store();
let druid_connection_store2 = druid_connection_controller_builder.store();
let druid_connection_store3 = druid_connection_controller_builder.store();
let druid_connection_controller = druid_connection_controller_builder
.shutdown_on_signal()
.watches(
Expand All @@ -179,10 +180,8 @@ async fn main() -> anyhow::Result<()> {
.state()
.into_iter()
.filter(move |druid_connection| {
&druid_connection.spec.superset.namespace
== sdb.metadata.namespace.as_ref().unwrap()
&& &druid_connection.spec.superset.name
== sdb.metadata.name.as_ref().unwrap()
druid_connection.superset_namespace().ok() == sdb.metadata.namespace
&& Some(druid_connection.superset_name()) == sdb.metadata.name
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
Expand All @@ -195,10 +194,24 @@ async fn main() -> anyhow::Result<()> {
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.metadata.namespace.as_ref().unwrap()
== job.metadata.namespace.as_ref().unwrap()
&& &druid_connection.job_name()
== job.metadata.name.as_ref().unwrap()
druid_connection.metadata.namespace == job.metadata.namespace
&& Some(druid_connection.job_name()) == job.metadata.name
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
)
.watches(
watch_namespace.get_api::<ConfigMap>(&client),
ListParams::default(),
move |config_map| {
druid_connection_store3
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.druid_namespace().ok()
== config_map.metadata.namespace
&& Some(druid_connection.druid_name())
== config_map.metadata.name
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
Expand Down

0 comments on commit 2423aa5

Please sign in to comment.