Skip to content

Commit

Permalink
feat!: introduce new topology
Browse files Browse the repository at this point in the history
- add plugins section
- move migration context to plugin level
- move relation between services and tiers to plugin.service level

Closes #49
  • Loading branch information
lowitea committed Jan 28, 2025
1 parent ee1964b commit 1b24572
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 98 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ cargo pike --help
replicasets = 2
replication_factor = 2

[[tier.default.services]]
name = "main"
plugin = "plugin_name"
[plugin.super_plugin]
migration_context = [
{ name = "example_name", value = "example_value" },
]

[plugin.super_plugin.service.main]
tiers = ["default"]
```

```bash
Expand Down
9 changes: 5 additions & 4 deletions plugin_template/topology.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
replicasets = 2
replication_factor = 2

[[tier.default.services]]
name = "main"
plugin = "{{ project_name }}"
migration_envs = [
[plugin.{{ project_name }}]
migration_context = [
{ name = "example_name", value = "example_value" },
]

[plugin.{{ project_name }}.service.main]
tiers = ["default"]
168 changes: 81 additions & 87 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,95 +13,109 @@ use std::process::{exit, Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::{collections::HashMap, fs, path::PathBuf};
use std::{fs, path::PathBuf};

use crate::commands::lib;

#[derive(Debug, Deserialize)]
struct Service {
name: String,
plugin: String,
struct Tier {
replicasets: u8,
replication_factor: u8,
}

#[derive(Debug, Deserialize)]
struct MigrationEnv {
struct MigrationContextVar {
name: String,
value: String,
}

#[derive(Debug, Deserialize)]
struct Tier {
replicasets: u8,
replication_factor: u8,
migration_envs: Option<Vec<MigrationEnv>>,
services: Option<Vec<Service>>,
struct Service {
tiers: Vec<String>,
}

#[derive(Debug, Deserialize)]
struct Plugin {
#[serde(default)]
migration_context: Vec<MigrationContextVar>,
#[serde(default)]
#[serde(rename = "service")]
services: BTreeMap<String, Service>,
#[serde(skip)]
version: Option<String>,
}

#[derive(Debug, Deserialize)]
struct Topology {
#[serde(rename = "tier")]
tiers: BTreeMap<String, Tier>,
#[serde(default)]
#[serde(rename = "plugin")]
plugins: BTreeMap<String, Plugin>,
}

fn enable_plugins(
topology: &Topology,
data_dir: &Path,
picodata_path: &PathBuf,
plugins_dir: &Path,
) -> Result<()> {
let mut plugins: HashMap<String, String> = HashMap::new();
for tier in topology.tiers.values() {
let Some(services) = &tier.services else {
continue;
};
for service in services {
let current_plugin_dir = plugins_dir.join(service.plugin.clone());
impl Topology {
fn find_plugin_versions(&mut self, plugins_dir: &Path) -> Result<()> {
for (plugin_name, plugin) in &mut self.plugins {
let current_plugin_dir = plugins_dir.join(plugin_name);

if !current_plugin_dir.exists() {
bail!(
"directory {} does not exist, run \"cargo build\" inside plugin directory",
"plugin directory {} does not exist",
current_plugin_dir.display()
);
}
plugins.entry(service.plugin.clone()).or_insert_with(|| {
let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)
.unwrap()
.map(|r| r.unwrap())
.collect();
versions.sort_by_key(std::fs::DirEntry::path);
versions
.last()
.unwrap()
.file_name()
.to_str()
.unwrap()
.to_string()
});
let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)
.unwrap()
.map(|r| r.unwrap())
.collect();
versions.sort_by_key(std::fs::DirEntry::path);
let newest_version = versions
.last()
.unwrap()
.file_name()
.to_str()
.unwrap()
.to_string();
plugin.version = Some(newest_version);
}
Ok(())
}
}

fn enable_plugins(topology: &Topology, data_dir: &Path, picodata_path: &PathBuf) -> Result<()> {
let mut queries: Vec<String> = Vec::new();
// Queries to set migration variables, order of commands is not important
push_migration_envs_queries(&mut queries, topology, &plugins);

for (plugin, version) in &plugins {
queries.push(format!(r#"CREATE PLUGIN "{plugin}" {version};"#));
queries.push(format!(r#"ALTER PLUGIN "{plugin}" MIGRATE TO {version};"#));
queries.push(format!(r#"ALTER PLUGIN "{plugin}" {version} ENABLE;"#));
}
for (plugin_name, plugin) in &topology.plugins {
let plugin_version = plugin.version.as_ref().unwrap();

for (tier_name, tier) in &topology.tiers {
let Some(services) = &tier.services else {
continue;
};
for service in services {
let plugin_name = &service.plugin;
let plugin_version = plugins
.get(plugin_name)
.context("failed to find plugin version")?;
let service_name = &service.name;
queries.push(format!(r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ADD SERVICE "{service_name}" TO TIER "{tier_name}";"#));
// add migration context
for migration_env in &plugin.migration_context {
queries.push(format!(
"ALTER PLUGIN {plugin_name} {plugin_version} SET migration_context.{}='{}';",
migration_env.name, migration_env.value
));
}

// create and migrate plugin
queries.push(format!(
r#"CREATE PLUGIN "{plugin_name}" {plugin_version};"#
));
queries.push(format!(
r#"ALTER PLUGIN "{plugin_name}" MIGRATE TO {plugin_version};"#
));

// add services to tiers
for (service_name, service) in &plugin.services {
for tier_name in &service.tiers {
queries.push(format!(r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ADD SERVICE "{service_name}" TO TIER "{tier_name}";"#));
}
}

// enable plugin
queries.push(format!(
r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ENABLE;"#
));
}

let admin_soket = data_dir.join("cluster").join("i1").join("admin.sock");
Expand All @@ -126,37 +140,18 @@ fn enable_plugins(
picodata_admin
.wait()
.context("failed to wait for picodata admin")?;

thread::sleep(Duration::from_secs(3));
}

for (plugin, version) in &plugins {
info!("Plugin {plugin}:{version} is enabled");
for (plugin_name, plugin) in &topology.plugins {
info!(
"Plugin {plugin_name}:{} is enabled",
plugin.version.as_ref().unwrap()
);
}

Ok(())
}

fn push_migration_envs_queries(
queries: &mut Vec<String>,
topology: &Topology,
plugins: &HashMap<String, String>,
) {
for tier in topology.tiers.values() {
let Some(migration_envs) = &tier.migration_envs else {
continue;
};
for migration_env in migration_envs {
for (plugin, version) in plugins {
queries.push(format!(
"ALTER PLUGIN {plugin} {version} SET migration_context.{}='{}';",
migration_env.name, migration_env.value
));
}
}
}
}

pub struct PicodataInstance {
instance_name: String,
tier: String,
Expand Down Expand Up @@ -351,7 +346,7 @@ pub struct Params {
}

pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
let topology: &Topology = &toml::from_str(
let mut topology: Topology = toml::from_str(
&fs::read_to_string(&params.topology_path)
.context(format!("failed to read {}", params.topology_path.display()))?,
)
Expand All @@ -368,6 +363,8 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
params.target_dir.join("debug")
};

topology.find_plugin_versions(&plugins_dir)?;

info!("Running the cluster...");

let mut picodata_processes = vec![];
Expand Down Expand Up @@ -399,12 +396,9 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
}

if !params.disable_plugin_install {
let result = enable_plugins(
topology,
&params.data_dir,
&params.picodata_path,
&plugins_dir,
);
info!("Enabling plugins...");

let result = enable_plugins(&topology, &params.data_dir, &params.picodata_path);
if let Err(e) = result {
for process in &mut picodata_processes {
process.kill().unwrap_or_else(|e| {
Expand Down

0 comments on commit 1b24572

Please sign in to comment.