Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State machine #29

Merged
merged 13 commits into from
Aug 24, 2018
994 changes: 684 additions & 310 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@ assets = [
["target/release/bynar", "usr/sbin/", "755"],
["target/release/client", "usr/bin/", "755"],
["target/release/disk-manager", "usr/sbin/", "755"],
["README.md", "usr/share/doc/bynar/README", "644"],
["README.md", "usr/share/doc/bynar/README", "644"],
]

[workspace]

[dev-dependencies]
lazy_static = "~1.1"
mocktopus = "0.5.0"
rand = "~0.5"

[dependencies]
api = { path = "api" }
blkid = "~0.2"
block-utils = {git = "https://github.com/cholcombe973/block-utils"}
ceph = "~0.4"
clap = "~2"
Expand All @@ -37,7 +43,9 @@ json = "~0.11"
libatasmart = "~0.1"
libc = "~0.2"
log = "~0.4"
petgraph = "~0.4"
protobuf = "~2"
rayon = "~1.0"
reqwest = "~0.7"
rusqlite = "~0.13"
serde = "~1"
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ If you want to build Bynar:

```
$ curl https://sh.rustup.rs -sSf | sh
$ rustup override set nightly
$ cargo build --release
```
#### Dependencies:
Expand All @@ -93,6 +94,7 @@ $ cargo build --release
3. librados # ceph jewel or higher
4. libatasmart
5. openssl-dev
6. libblkid-dev

## Hard Drive Workflow
Hard drives die all the time as part of the regular cycle of things in servers. Bynar
Expand All @@ -115,6 +117,14 @@ to keep you in the loop.
The time saved here multiplies with each piece of hardware replaced and now you
can focus your time and energy on other things. It's a positive snowball effect!

## Testing

Note that root permissions are required for integration testing. The reason
is that the test functions will attempt to create loopback devices, mount them,
check their filesystems, etc., all of which requires root. The nightly compiler
is also required for testing because mocktopus makes use of features that
haven't landed in stable yet. Run: `sudo ~/.cargo/bin/cargo test -- --nocapture` to test.

## Support and Contributions

If you need support, start by checking the [issues] page.
Expand Down
155 changes: 142 additions & 13 deletions src/in_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@
extern crate rusqlite;
extern crate time;

use std::path::Path;
use std::path::{Path, PathBuf};
use std::str::FromStr;

use test_disk;

use self::time::Timespec;
use self::rusqlite::{Connection, Result};
use self::time::Timespec;

#[cfg(test)]
mod tests {
extern crate tempdir;

use self::tempdir::TempDir;
use std::path::Path;

#[test]
fn test_in_progress() {
let temp_dir = Temp::new_dir().expect("mktemp creation failed");
let mut db_file = temp_dir.to_path_buf();
db_file.push("test_db.sqlite3");
let sql_dir = TempDir::new("bynar").expect("Temp file creation failed");
let db_path = sql_dir.path().join("in_progress.sqlite3");

let conn = super::create_repair_database(&db_file).expect("sqlite3 creation failed");
let conn =
super::connect_to_repair_database(&db_path).expect("sqlite3 creation failed");
super::record_new_repair_ticket(&conn, "001", &Path::new("/dev/sda"))
.expect("Create repair ticket failed");
let result = super::is_disk_in_progress(&conn, &Path::new("/dev/sda"))
Expand All @@ -42,12 +48,17 @@ pub struct DiskRepairTicket {
pub fn connect_to_repair_database(db_path: &Path) -> Result<Connection> {
let conn = Connection::open(db_path)?;
debug!("Opening or creating repairs table if needed");
// TODO: should this be broken out into 2 tables,
// 1 for repairs and 1 for state machine?
conn.execute(
"CREATE TABLE if not exists repairs (
id INTEGER PRIMARY KEY,
ticket_id TEXT NOT NULL,
time_created TEXT NOT NULL,
disk_path TEXT NOT NULL)",
ticket_id TEXT,
time_created TEXT,
disk_path TEXT NOT NULL,
smart_passed BOOLEAN,
mount_path TEXT,
state TEXT)",
&[],
)?;
Ok(conn)
Expand Down Expand Up @@ -91,17 +102,18 @@ pub fn is_disk_in_progress(conn: &Connection, dev_path: &Path) -> Result<bool> {
"Searching for repair ticket for disk: {}",
dev_path.display()
);
let mut stmt = conn.prepare(
"SELECT id, ticket_id, time_created, disk_path FROM repairs where disk_path=?",
)?;
let mut stmt = conn
.prepare("SELECT id, ticket_id, time_created, disk_path FROM repairs where disk_path=?")?;
let in_progress = stmt.exists(&[&dev_path.to_string_lossy().into_owned()])?;
Ok(in_progress)
}

/// Gather all the outstanding repair tickets
pub fn get_outstanding_repair_tickets(conn: &Connection) -> Result<Vec<DiskRepairTicket>> {
let mut tickets: Vec<DiskRepairTicket> = Vec::new();
let mut stmt = conn.prepare("SELECT id, ticket_id, time_created, disk_path FROM repairs")?;
let mut stmt = conn.prepare(
"SELECT id, ticket_id, time_created, disk_path FROM repairs where ticket_id IS NOT NULL",
)?;
let repair_iter = stmt.query_map(&[], |row| DiskRepairTicket {
id: row.get(0),
ticket_id: row.get(1),
Expand All @@ -114,3 +126,120 @@ pub fn get_outstanding_repair_tickets(conn: &Connection) -> Result<Vec<DiskRepai
}
Ok(tickets)
}

/// Look up the recorded mount location for `dev_path` in the repairs table.
///
/// Returns the stored `mount_path` column as a `PathBuf`. Fails with a
/// rusqlite error if no row exists for this disk or the query itself fails.
pub fn get_mount_location(conn: &Connection, dev_path: &Path) -> Result<PathBuf> {
    // Fixed log text: the old message said "smart results" (copy-paste from
    // get_smart_result) although this function queries the mount path.
    debug!("Searching mount location for disk: {}", dev_path.display());
    let mut stmt = conn.prepare("SELECT mount_path FROM repairs where disk_path=?")?;
    let mount_path = stmt.query_row(&[&dev_path.to_string_lossy().into_owned()], |row| {
        let row: String = row.get(0);
        PathBuf::from(row)
    })?;
    Ok(mount_path)
}

/// Fetch the stored SMART check outcome for `dev_path` from the repairs table.
///
/// Errors if no repairs row exists for this disk or the query fails.
pub fn get_smart_result(conn: &Connection, dev_path: &Path) -> Result<bool> {
    debug!("Searching smart results for disk: {}", dev_path.display());
    let device = dev_path.to_string_lossy().into_owned();
    let mut query = conn.prepare("SELECT smart_passed FROM repairs where disk_path=?")?;
    let smart_passed: bool = query.query_row(&[&device], |row| row.get(0))?;
    Ok(smart_passed)
}

/// Fetch the saved state-machine state for `dev_path`, if any.
///
/// Returns `Ok(None)` when no repairs row exists for this disk or when the
/// row has no state recorded yet (NULL column); otherwise parses the stored
/// string back into a `test_disk::State`.
pub fn get_state(conn: &Connection, dev_path: &Path) -> Result<Option<test_disk::State>> {
    debug!("Searching state results for disk: {}", dev_path.display());
    let disk = dev_path.to_string_lossy().into_owned();
    // Prepare once and reuse the statement for both the existence probe and
    // the fetch; the old code prepared the identical statement twice.
    let mut stmt = conn.prepare("SELECT state FROM repairs where disk_path=?")?;
    if !stmt.exists(&[&disk])? {
        return Ok(None);
    }
    // Read as Option<String> so a row whose state column is still NULL
    // (e.g. created by save_mount_location) returns None instead of panicking.
    let state: Option<String> = stmt.query_row(&[&disk], |row| row.get(0))?;
    match state {
        Some(state) => {
            debug!("Found state: {}", state);
            // A parse failure means the stored value is corrupt; keep the
            // original unwrap (panic) rather than silently masking it.
            Ok(Some(test_disk::State::from_str(&state).unwrap()))
        }
        None => Ok(None),
    }
}

/// Record the mount location for `dev_path`, updating the existing repairs
/// row when one is present and inserting a fresh row otherwise.
pub fn save_mount_location(conn: &Connection, dev_path: &Path, mount_path: &Path) -> Result<()> {
    debug!(
        "Saving mount path for {}: {}",
        dev_path.display(),
        mount_path.display()
    );
    // Convert both paths once instead of at every call site below.
    let disk = dev_path.to_string_lossy().into_owned();
    let mount = mount_path.to_string_lossy().into_owned();
    // The repairs table has no UNIQUE constraint on disk_path, so emulate an
    // upsert: probe for an existing row, then UPDATE or INSERT accordingly.
    let mut stmt = conn.prepare("SELECT * FROM repairs where disk_path=?")?;
    if stmt.exists(&[&disk])? {
        // Row exists: update it in place.
        let mut stmt = conn.prepare("Update repairs set mount_path=? where disk_path=?")?;
        stmt.execute(&[&mount, &disk])?;
    } else {
        // No row yet: insert a new one.
        conn.execute(
            "INSERT INTO repairs (mount_path, disk_path)
            VALUES (?1, ?2)",
            &[&mount, &disk],
        )?;
    }

    Ok(())
}

/// Record a SMART check outcome for `dev_path`, updating the existing repairs
/// row when one is present and inserting a fresh row otherwise.
pub fn save_smart_results(conn: &Connection, dev_path: &Path, smart_passed: bool) -> Result<()> {
    debug!(
        "Saving smart results for {} passed: {}",
        dev_path.display(),
        // bool implements Display; the old .to_string() allocated needlessly.
        smart_passed
    );
    let disk = dev_path.to_string_lossy().into_owned();
    // No UNIQUE constraint on disk_path, so emulate an upsert:
    // probe for an existing row, then UPDATE or INSERT accordingly.
    let mut stmt = conn.prepare("SELECT * FROM repairs where disk_path=?")?;
    if stmt.exists(&[&disk])? {
        // Row exists: update it in place.
        let mut stmt = conn.prepare("Update repairs set smart_passed=? where disk_path=?")?;
        stmt.execute(&[&smart_passed, &disk])?;
    } else {
        // No row yet: insert a new one.
        let mut stmt =
            conn.prepare("Insert INTO repairs (smart_passed, disk_path) VALUES (?1, ?2)")?;
        stmt.execute(&[&smart_passed, &disk])?;
    }

    Ok(())
}

/// Persist the state machine's current state for `dev_path`, updating the
/// existing repairs row when one is present and inserting one otherwise.
pub fn save_state(conn: &Connection, dev_path: &Path, state: test_disk::State) -> Result<()> {
    debug!("Saving state for {}: {}", dev_path.display(), state);

    // Convert once instead of at each use below.
    let disk = dev_path.to_string_lossy().into_owned();
    let state = state.to_string();
    // No UNIQUE constraint on disk_path, so emulate an upsert:
    // probe for an existing row, then UPDATE or INSERT accordingly.
    let mut stmt = conn.prepare("SELECT * FROM repairs where disk_path=?")?;
    if stmt.exists(&[&disk])? {
        debug!("Updating state for {}", dev_path.display());
        // Row exists: update it in place.
        let mut stmt = conn.prepare("Update repairs set state=? where disk_path=?")?;
        stmt.execute(&[&state, &disk])?;
    } else {
        debug!("Inserting state for {}", dev_path.display());
        // No row yet: insert a new one.
        conn.execute(
            "INSERT INTO repairs (state, disk_path) VALUES (?1, ?2)",
            &[&state, &disk],
        )?;
    }
    Ok(())
}
44 changes: 25 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![cfg_attr(test, feature(test, proc_macro_mod))]
/// Detect dead disks in a ceph cluster
/// 1. Detect dead disk
/// 2. Report dead disk to JIRA for repairs
Expand All @@ -7,6 +8,11 @@ extern crate api;
#[macro_use]
extern crate clap;
extern crate helpers;

#[cfg(test)]
#[macro_use]
extern crate lazy_static;

#[macro_use]
extern crate log;
extern crate protobuf;
Expand All @@ -23,12 +29,13 @@ mod in_progress;
mod test_disk;

use std::fs::File;
use std::io::{Error, ErrorKind, Read};
use std::io::Result as IOResult;
use std::io::{Error, ErrorKind, Read};
use std::path::{Path, PathBuf};

use create_support_ticket::{create_support_ticket, ticket_resolved};
use self::test_disk::State;
use clap::{App, Arg};
use create_support_ticket::{create_support_ticket, ticket_resolved};
use helpers::host_information::Host;
use simplelog::{CombinedLogger, Config, TermLogger, WriteLogger};
use slack_hook::{PayloadBuilder, Slack};
Expand Down Expand Up @@ -108,21 +115,21 @@ fn check_for_failed_disks(config_dir: &str, simulate: bool) -> Result<(), String
);

info!("Checking all drives");
for result in test_disk::check_all_disks().map_err(|e| e.to_string())? {
let conn =
in_progress::connect_to_repair_database(&config_location).map_err(|e| e.to_string())?;
for result in test_disk::check_all_disks(&config_location).map_err(|e| e.to_string())? {
match result {
Ok(status) => {
info!("Disk status: {:?}", status);
Ok(state) => {
info!("Disk status: {:?}", state);
let mut dev_path = PathBuf::from("/dev");
dev_path.push(status.device.name);
dev_path.push(state.disk.name);

if status.corrupted == true && status.repaired == false {
if state.state == State::WaitingForReplacement {
description.push_str(&format!("\nDisk path: {}", dev_path.display()));
if let Some(serial) = status.device.serial_number {
if let Some(serial) = state.disk.serial_number {
description.push_str(&format!("\nDisk serial: {}", serial));
}
info!("Connecting to database to check if disk is in progress");
let conn = in_progress::connect_to_repair_database(&config_location)
.map_err(|e| e.to_string())?;
let in_progress = in_progress::is_disk_in_progress(&conn, &dev_path)
.map_err(|e| e.to_string())?;
if !simulate {
Expand Down Expand Up @@ -208,10 +215,14 @@ fn check_for_failed_disks(config_dir: &str, simulate: bool) -> Result<(), String
debug!("Device is already in the repair queue");
}
}
// Handle the ones that ended up stuck in Fail
} else if state.state == State::Fail {
error!("Disk {} ended in a Fail state", dev_path.display(),);
} else {
// The rest should be State::Good ?
}
}
Err(e) => {
//
error!("check_all_disks failed with error: {:?}", e);
return Err(format!("check_all_disks failed with error: {:?}", e));
}
Expand Down Expand Up @@ -299,20 +310,17 @@ fn main() {
.long("configdir")
.takes_value(true)
.required(false),
)
.arg(
).arg(
Arg::with_name("simulate")
.help("Log messages but take no action")
.long("simulate")
.required(false),
)
.arg(
).arg(
Arg::with_name("v")
.short("v")
.multiple(true)
.help("Sets the level of verbosity"),
)
.get_matches();
).get_matches();
let level = match matches.occurrences_of("v") {
0 => log::LevelFilter::Info, //default
1 => log::LevelFilter::Debug,
Expand All @@ -336,7 +344,6 @@ fn main() {
let simulate = matches.is_present("simulate");
let config_dir = matches.value_of("configdir").unwrap();

//TODO: Get a vault token so I can talk to disk-manager
match check_for_failed_disks(config_dir, simulate) {
Err(e) => {
error!("Check for failed disks failed with error: {}", e);
Expand All @@ -345,7 +352,6 @@ fn main() {
info!("Check for failed disks completed");
}
};
// TODO: Use token here
match add_repaired_disks(config_dir, simulate) {
Err(e) => {
error!("Add repaired disks failed with error: {}", e);
Expand Down
Loading