This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Merge branch 'master' into GH-2268
jgrund authored Sep 24, 2020
2 parents 3994821 + b7bae22 commit 8ea6e40
Showing 8 changed files with 321 additions and 71 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -7,9 +7,9 @@ members = [
'iml-api',
'iml-change',
'iml-cmd',
'iml-command-utils',
'iml-fs',
'iml-graphql-queries',
'iml-command-utils',
'iml-influx',
'iml-job-scheduler-rpc',
'iml-mailbox',
188 changes: 122 additions & 66 deletions Pipfile.lock

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions iml-command-utils/src/lib.rs
@@ -12,6 +12,14 @@ use tokio::time::delay_for;
pub enum CmdUtilError {
#[error(transparent)]
ImlManagerClientError(#[from] iml_manager_client::ImlManagerClientError),
#[error("Failed commands: {0:?}")]
FailedCommandError(Vec<Command>),
}

impl From<Vec<Command>> for CmdUtilError {
fn from(xs: Vec<Command>) -> Self {
CmdUtilError::FailedCommandError(xs)
}
}

pub enum Progress {
@@ -61,6 +69,24 @@ pub async fn wait_for_cmds_progress(
}
}

/// Waits for command completion and prints progress messages.
/// Errors if any command failed or was cancelled; the failed commands are included in the error message.
pub async fn wait_for_cmds_success(
cmds: &[Command],
tx: Option<mpsc::UnboundedSender<Progress>>,
) -> Result<Vec<Command>, CmdUtilError> {
let cmds = wait_for_cmds_progress(cmds, tx).await?;

let (failed, passed): (Vec<_>, Vec<_>) =
cmds.into_iter().partition(|x| x.errored || x.cancelled);

if !failed.is_empty() {
Err(failed.into())
} else {
Ok(passed)
}
}

fn cmd_finished(cmd: &Command) -> bool {
cmd.complete
}
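
For context, a minimal usage sketch of the new wait_for_cmds_success helper (not part of this diff; the caller and the source of cmds are hypothetical, but the call shape matches how iml-snapshot's main.rs below invokes it):

use iml_command_utils::{wait_for_cmds_success, CmdUtilError};
use iml_wire_types::Command;

// Hypothetical caller: `cmds` would come from an earlier API or GraphQL call that
// kicked the commands off. Passing `None` skips progress reporting.
async fn settle(cmds: &[Command]) -> Result<(), CmdUtilError> {
    // Returns only the commands that completed without erroring or being cancelled;
    // otherwise the failed commands come back inside CmdUtilError::FailedCommandError.
    let passed = wait_for_cmds_success(cmds, None).await?;
    println!("{} command(s) completed successfully", passed.len());
    Ok(())
}
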
6 changes: 5 additions & 1 deletion iml-services/iml-snapshot/Cargo.toml
@@ -7,11 +7,15 @@ version = "0.1.0"
[dependencies]
futures = "0.3"
futures-util = "0.3"
iml-command-utils = {path = "../../iml-command-utils", version = "0.3"}
iml-graphql-queries = {path = "../../iml-graphql-queries", version = "0.1"}
iml-influx = {path = "../../iml-influx", version = "0.1"}
iml-manager-client = {path = "../../iml-manager-client", version = "0.3"}
iml-manager-env = {path = "../../iml-manager-env", version = "0.3"}
iml-postgres = {path = "../../iml-postgres", version = "0.3"}
iml-rabbit = {path = "../../iml-rabbit", version = "0.3"}
iml-service-queue = {path = "../iml-service-queue", version = "0.3"}
iml-tracing = {version = "0.2", path = "../../iml-tracing"}
iml-wire-types = {path = "../../iml-wire-types", version = "0.3"}
thiserror = "1.0"
tokio = {version = "0.2", features = ["rt-threaded"]}

127 changes: 125 additions & 2 deletions iml-services/iml-snapshot/src/main.rs
@@ -2,16 +2,126 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use futures_util::stream::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use iml_command_utils::{wait_for_cmds_success, CmdUtilError};
use iml_graphql_queries::snapshot as snapshot_queries;
use iml_manager_client::{get_influx, graphql, Client, ImlManagerClientError};
use iml_manager_env::get_pool_limit;
use iml_postgres::{get_db_pool, sqlx};
use iml_postgres::{get_db_pool, sqlx, PgPool};
use iml_service_queue::service_queue::consume_data;
use iml_tracing::tracing;
use iml_wire_types::snapshot;
use std::{collections::HashMap, fmt::Debug};
use thiserror::Error;
use tokio::{
time::Instant,
time::{interval, Duration},
};

// Default pool limit if not overridden by POOL_LIMIT
const DEFAULT_POOL_LIMIT: u32 = 2;

#[derive(Error, Debug)]
enum Error {
#[error(transparent)]
CmdUtilError(#[from] CmdUtilError),
#[error(transparent)]
ImlManagerClientError(#[from] ImlManagerClientError),
#[error(transparent)]
ImlGraphqlQueriesError(#[from] iml_graphql_queries::Errors),
#[error(transparent)]
ImlPostgresError(#[from] iml_postgres::sqlx::Error),
}

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
enum State {
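// Actively monitoring; holds the most recently observed client count.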
Monitoring(u64),
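// Client count hit zero; unmount when this deadline passes unless a client reconnects.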
CountingDown(Instant),
}

async fn tick(snapshot_client_counts: &mut HashMap<i32, State>, pool: PgPool) -> Result<(), Error> {
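// One pass of the auto-unmount check: pull per-filesystem client counts from InfluxDB,
// walk the snapshots recorded in Postgres, advance each snapshot's State, and issue the
// GraphQL unmount mutation for any snapshot whose grace period has expired.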
let client: Client = iml_manager_client::get_client()?;
let client_2 = client.clone();

let query = iml_influx::filesystems::query();
let stats_fut =
get_influx::<iml_influx::filesystems::InfluxResponse>(client, "iml_stats", query.as_str());

let influx_resp = stats_fut.await?;
let stats = iml_influx::filesystems::Response::from(influx_resp);

tracing::debug!("Influx stats: {:?}", stats);

let snapshots =
sqlx::query!("SELECT id, filesystem_name, snapshot_name, snapshot_fsname FROM snapshot")
.fetch_all(&pool)
.await?;
tracing::debug!("Fetched {} snapshots from DB", snapshots.len());

for snapshot in snapshots {
let snapshot_id = snapshot.id;
let snapshot_stats = match stats.get(&snapshot.snapshot_fsname) {
Some(x) => x,
None => continue,
};

let state = snapshot_client_counts.get_mut(&snapshot_id);
let clients = snapshot_stats.clients.unwrap_or(0);

match state {
Some(State::Monitoring(prev_clients)) => {
tracing::debug!(
"Monitoring. Snapshot {}: {} clients (previously {} clients)",
&snapshot.snapshot_fsname,
clients,
prev_clients,
);
if *prev_clients > 0 && clients == 0 {
tracing::trace!("counting down for job");
let instant = Instant::now() + Duration::from_secs(5 * 60);
state.map(|s| *s = State::CountingDown(instant));
} else {
*prev_clients = clients;
}
}
Some(State::CountingDown(when)) => {
tracing::debug!(
"Counting down. Snapshot {}: 0 clients (previously {} clients)",
&snapshot.snapshot_fsname,
clients
);
if clients > 0 {
tracing::trace!("changing state");
state.map(|s| *s = State::Monitoring(clients));
} else if Instant::now() >= *when {
tracing::trace!("running the job");
state.map(|s| *s = State::Monitoring(0));

let query = snapshot_queries::unmount::build(
&snapshot.filesystem_name,
&snapshot.snapshot_name,
);
let resp: iml_graphql_queries::Response<snapshot_queries::unmount::Resp> =
graphql(client_2.clone(), query).await?;
let command = Result::from(resp)?.data.unmount_snapshot;
wait_for_cmds_success(&[command], None).await?;
}
}
None => {
tracing::debug!(
"Just learnt about this snapshot. Snapshot {}: {} clients",
&snapshot.snapshot_fsname,
clients,
);

snapshot_client_counts.insert(snapshot_id, State::Monitoring(clients));
}
}
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();
@@ -25,8 +25,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut s = consume_data::<Vec<snapshot::Snapshot>>(&ch, "rust_agent_snapshot_rx");

let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;
let pool_2 = pool.clone();
sqlx::migrate!("../../migrations").run(&pool).await?;

tokio::spawn(async move {
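// Background task: every 60 seconds, run one auto-unmount pass over all known snapshots.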
let mut interval = interval(Duration::from_secs(60));
let mut snapshot_client_counts: HashMap<i32, State> = HashMap::new();

while let Some(_) = interval.next().await {
let tick_result = tick(&mut snapshot_client_counts, pool_2.clone()).await;
if let Err(e) = tick_result {
tracing::error!("Error while handling snapshot auto-unmount: {}", e);
}
}
});

while let Some((fqdn, snapshots)) = s.try_next().await? {
tracing::debug!("snapshots from {}: {:?}", fqdn, snapshots);

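
A standalone sketch of the countdown logic that tick implements above (an assumed simplification, not code from this commit; it uses std::time instead of tokio's timer types): a snapshot is only unmounted after its client count has stayed at zero for the full five-minute grace period, and a client reconnecting before the deadline cancels the countdown.

use std::time::{Duration, Instant};

const GRACE: Duration = Duration::from_secs(5 * 60);

#[derive(Debug)]
enum State {
    Monitoring(u64),
    CountingDown(Instant),
}

// Advance one snapshot's state given the latest client count; the bool says
// whether an unmount should be issued now.
fn step(state: State, clients: u64) -> (State, bool) {
    match state {
        State::Monitoring(prev) if prev > 0 && clients == 0 => {
            // Clients just dropped to zero: start the grace-period countdown.
            (State::CountingDown(Instant::now() + GRACE), false)
        }
        State::Monitoring(_) => (State::Monitoring(clients), false),
        State::CountingDown(_) if clients > 0 => {
            // A client came back before the deadline: cancel the countdown.
            (State::Monitoring(clients), false)
        }
        State::CountingDown(when) if Instant::now() >= when => {
            // Grace period elapsed with no clients: time to unmount.
            (State::Monitoring(0), true)
        }
        s => (s, false),
    }
}

fn main() {
    let (state, unmount) = step(State::Monitoring(3), 0);
    assert!(matches!(state, State::CountingDown(_)));
    assert!(!unmount);
}
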
2 changes: 1 addition & 1 deletion requirements.txt
@@ -27,7 +27,7 @@ requests_unixsocket==0.1.5
requests==2.12.5
setuptools
tablib==0.9.11
toolz
toolz==0.10.0
tzlocal
urllib3
vine==1.3.0
36 changes: 36 additions & 0 deletions sqlx-data.json
@@ -2837,6 +2837,42 @@
"nullable": []
}
},
"b90d797530fd50b0596cbdc7bce108066fc5c4e8d8feec13f22143f7e44653e9": {
"query": "SELECT id, filesystem_name, snapshot_name, snapshot_fsname FROM snapshot",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "filesystem_name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "snapshot_name",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "snapshot_fsname",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false
]
}
},
"bb7fb337cfeac9a17eefb21a498e76be70c2f395473ba42331f51cc5e6d48679": {
"query": "\n INSERT INTO chroma_core_device\n (fqdn, devices)\n VALUES ($1, $2)\n ON CONFLICT (fqdn) DO UPDATE\n SET devices = EXCLUDED.devices\n ",
"describe": {
