Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: introduce new topology #50

Merged
merged 3 commits into the base branch from the feature branch
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ cargo pike --help
replicasets = 2
replication_factor = 2

[[tier.default.services]]
name = "main"
plugin = "plugin_name"
[plugin.super_plugin]
migration_context = [
{ name = "example_name", value = "example_value" },
]

[plugin.super_plugin.service.main]
tiers = ["default"]
```

```bash
Expand Down
9 changes: 5 additions & 4 deletions plugin_template/topology.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
replicasets = 2
replication_factor = 2

[[tier.default.services]]
name = "main"
plugin = "{{ project_name }}"
migration_envs = [
[plugin.{{ project_name }}]
migration_context = [
{ name = "example_name", value = "example_value" },
]

[plugin.{{ project_name }}.service.main]
tiers = ["default"]
199 changes: 109 additions & 90 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,106 +13,124 @@ use std::process::{exit, Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::{collections::HashMap, fs, path::PathBuf};
use std::{fs, path::PathBuf};

use crate::commands::lib;

#[derive(Debug, Deserialize)]
struct Service {
name: String,
plugin: String,
struct Tier {
replicasets: u8,
replication_factor: u8,
}

#[derive(Debug, Deserialize)]
struct MigrationEnv {
struct MigrationContextVar {
name: String,
value: String,
}

#[derive(Debug, Deserialize)]
struct Tier {
replicasets: u8,
replication_factor: u8,
migration_envs: Option<Vec<MigrationEnv>>,
services: Option<Vec<Service>>,
struct Service {
tiers: Vec<String>,
}

/// One plugin declaration, parsed from a `[plugin.<name>]` table in topology.toml.
#[derive(Debug, Deserialize)]
struct Plugin {
    /// Variables later applied with `ALTER PLUGIN ... SET migration_context.<name>`;
    /// defaults to empty when the table has no `migration_context` array.
    #[serde(default)]
    migration_context: Vec<MigrationContextVar>,
    /// Services keyed by service name, from `[plugin.<name>.service.<svc>]` tables.
    #[serde(rename = "service", default)]
    services: BTreeMap<String, Service>,
    /// Newest installed version, discovered on disk after parsing — never deserialized.
    #[serde(skip)]
    version: Option<String>,
}

/// Root of topology.toml: the set of tiers plus optional plugin declarations.
#[derive(Debug, Deserialize)]
struct Topology {
    /// `[tier.<name>]` tables, keyed by tier name.
    #[serde(rename = "tier")]
    tiers: BTreeMap<String, Tier>,
    /// `[plugin.<name>]` tables, keyed by plugin name; empty when absent.
    #[serde(rename = "plugin", default)]
    plugins: BTreeMap<String, Plugin>,
}

fn enable_plugins(
topology: &Topology,
data_dir: &Path,
picodata_path: &PathBuf,
plugins_dir: &Path,
) -> Result<()> {
let mut plugins: HashMap<String, String> = HashMap::new();
for tier in topology.tiers.values() {
let Some(services) = &tier.services else {
continue;
};
for service in services {
let current_plugin_dir = plugins_dir.join(service.plugin.clone());
impl Topology {
fn find_plugin_versions(&mut self, plugins_dir: &Path) -> Result<()> {
for (plugin_name, plugin) in &mut self.plugins {
let current_plugin_dir = plugins_dir.join(plugin_name);

if !current_plugin_dir.exists() {
bail!(
"directory {} does not exist, run \"cargo build\" inside plugin directory",
"plugin directory {} does not exist",
current_plugin_dir.display()
);
}
plugins.entry(service.plugin.clone()).or_insert_with(|| {
let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)
.unwrap()
.map(|r| r.unwrap())
.collect();
versions.sort_by_key(std::fs::DirEntry::path);
versions
.last()
.unwrap()
.file_name()
.to_str()
.unwrap()
.to_string()
});
let mut versions: Vec<_> = fs::read_dir(current_plugin_dir)
.unwrap()
.map(|r| r.unwrap())
.collect();
versions.sort_by_key(std::fs::DirEntry::path);
let newest_version = versions
.last()
.unwrap()
.file_name()
.to_str()
.unwrap()
.to_string();
plugin.version = Some(newest_version);
}
Ok(())
}
}

fn enable_plugins(topology: &Topology, data_dir: &Path, picodata_path: &PathBuf) -> Result<()> {
let mut queries: Vec<String> = Vec::new();
// Queries to set migration variables, order of commands is not important
push_migration_envs_queries(&mut queries, topology, &plugins);

for (plugin, version) in &plugins {
queries.push(format!(r#"CREATE PLUGIN "{plugin}" {version};"#));
queries.push(format!(r#"ALTER PLUGIN "{plugin}" MIGRATE TO {version};"#));
queries.push(format!(r#"ALTER PLUGIN "{plugin}" {version} ENABLE;"#));
}
for (plugin_name, plugin) in &topology.plugins {
let plugin_version = plugin.version.as_ref().unwrap();

for (tier_name, tier) in &topology.tiers {
let Some(services) = &tier.services else {
continue;
};
for service in services {
let plugin_name = &service.plugin;
let plugin_version = plugins
.get(plugin_name)
.context("failed to find plugin version")?;
let service_name = &service.name;
queries.push(format!(r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ADD SERVICE "{service_name}" TO TIER "{tier_name}";"#));
// create plugin
queries.push(format!(
r#"CREATE PLUGIN "{plugin_name}" {plugin_version};"#
));

// add migration context
for migration_env in &plugin.migration_context {
queries.push(format!(
"ALTER PLUGIN {plugin_name} {plugin_version} SET migration_context.{}='{}';",
migration_env.name, migration_env.value
));
}

// run migrations
queries.push(format!(
r#"ALTER PLUGIN "{plugin_name}" MIGRATE TO {plugin_version};"#
));

// add services to tiers
for (service_name, service) in &plugin.services {
for tier_name in &service.tiers {
queries.push(format!(r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ADD SERVICE "{service_name}" TO TIER "{tier_name}";"#));
}
}

// enable plugin
queries.push(format!(
r#"ALTER PLUGIN "{plugin_name}" {plugin_version} ENABLE;"#
));
}

let admin_soket = data_dir.join("cluster").join("i1").join("admin.sock");

for query in queries {
log::info!("picodata admin: {query}");

let mut picodata_admin = Command::new(picodata_path)
.arg("admin")
.arg(admin_soket.to_str().unwrap())
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("failed to spawn child proccess of picodata admin")?;

Expand All @@ -127,36 +145,29 @@ fn enable_plugins(
.wait()
.context("failed to wait for picodata admin")?;

thread::sleep(Duration::from_secs(3));
let outputs: [Box<dyn Read + Send>; 2] = [
Box::new(picodata_admin.stdout.unwrap()),
Box::new(picodata_admin.stderr.unwrap()),
];
for output in outputs {
let reader = BufReader::new(output);
for line in reader.lines() {
let line = line.expect("failed to read picodata admin output");
log::info!("picodata admin: {line}");
}
}
}

for (plugin, version) in &plugins {
info!("Plugin {plugin}:{version} is enabled");
for (plugin_name, plugin) in &topology.plugins {
info!(
"Plugin {plugin_name}:{} is enabled",
plugin.version.as_ref().unwrap()
);
}

Ok(())
}

// Pre-refactor helper (removed by this PR): collects the SQL queries that set
// migration context variables, back when those variables lived on tiers rather
// than on plugins.
//
// NOTE(review): for every migration variable of every tier, this emits one
// `ALTER PLUGIN ... SET migration_context...` query per known plugin — i.e. a
// tier-level variable is broadcast to ALL plugins, and the same variable is
// re-applied once per tier that declares it. `plugins` maps plugin name to its
// resolved version string. Tiers without `migration_envs` are skipped.
fn push_migration_envs_queries(
queries: &mut Vec<String>,
topology: &Topology,
plugins: &HashMap<String, String>,
) {
for tier in topology.tiers.values() {
// Skip tiers that declare no migration variables.
let Some(migration_envs) = &tier.migration_envs else {
continue;
};
for migration_env in migration_envs {
// Broadcast each variable to every plugin at its resolved version.
for (plugin, version) in plugins {
queries.push(format!(
"ALTER PLUGIN {plugin} {version} SET migration_context.{}='{}';",
migration_env.name, migration_env.value
));
}
}
}
}

pub struct PicodataInstance {
instance_name: String,
tier: String,
Expand Down Expand Up @@ -351,7 +362,7 @@ pub struct Params {
}

pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
let topology: &Topology = &toml::from_str(
let mut topology: Topology = toml::from_str(
&fs::read_to_string(&params.topology_path)
.context(format!("failed to read {}", params.topology_path.display()))?,
)
Expand All @@ -368,8 +379,12 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
params.target_dir.join("debug")
};

topology.find_plugin_versions(&plugins_dir)?;

info!("Running the cluster...");

let is_clean_run = !params.data_dir.join("cluster").exists();

let mut picodata_processes = vec![];

let first_instance_bin_port = 3001;
Expand All @@ -391,20 +406,24 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {

picodata_processes.push(pico_instance);

// TODO: check is started by logs or iproto
thread::sleep(Duration::from_secs(5));
if is_clean_run {
// TODO: check is started by logs or iproto
thread::sleep(Duration::from_secs(5));
}

info!("i{instance_id} - started");
}
}

if !params.disable_plugin_install {
let result = enable_plugins(
topology,
&params.data_dir,
&params.picodata_path,
&plugins_dir,
);
info!("Enabling plugins...");

if !is_clean_run {
// TODO: check is started by logs or iproto
thread::sleep(Duration::from_secs(5));
}

let result = enable_plugins(&topology, &params.data_dir, &params.picodata_path);
if let Err(e) = result {
for process in &mut picodata_processes {
process.kill().unwrap_or_else(|e| {
Expand Down
Loading