Skip to content

Commit

Permalink
Merge pull request #121 from IBM/scheduler-fix
Browse files (browse the repository at this point in the history)
Scheduler fix
  • Loading branch information
No9 authored Dec 28, 2022
2 parents 07a6d3d + 869c90e commit e3efed8
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions charts/core-dump-handler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ replicaCount: 1
image:
registry: quay.io
repository: icdh/core-dump-handler
tag: v8.8.0
tag: scheduler-fix
pullPolicy: Always
pullSecrets: []
request_mem: "64Mi"
Expand Down Expand Up @@ -41,8 +41,8 @@ daemonset:
suidDumpable: 2
vendor: default
# interval: 60000
# schedule: "1/60 * * * * *"
useINotify: true
schedule: "1/1 * * * * *"
# useINotify: true
deployCrioConfig: false
includeCrioExe: false
# S3 access
Expand Down
45 changes: 25 additions & 20 deletions core-dump-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::process;
use std::process::Command;
use std::time::Duration;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio_cron_scheduler::{Job, JobScheduler};

#[allow(dead_code)]
Expand Down Expand Up @@ -59,7 +58,7 @@ async fn main() -> Result<(), anyhow::Error> {

env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let host_dir = env::var("HOST_DIR").unwrap_or_else(|_| DEFAULT_BASE_DIR.to_string());
let core_dir = env::var("CORE_DIR").unwrap_or_else(|_| DEFAULT_CORE_DIR.to_string());
let core_dir_command = env::var("CORE_DIR").unwrap_or_else(|_| DEFAULT_CORE_DIR.to_string());
let suid = env::var("SUID_DUMPABLE").unwrap_or_else(|_| DEFAULT_SUID_DUMPABLE.to_string());
let deploy_crio_config = env::var("DEPLOY_CRIO_CONFIG")
.unwrap_or_else(|_| "false".to_string())
Expand Down Expand Up @@ -94,9 +93,8 @@ async fn main() -> Result<(), anyhow::Error> {
info!("Uploading {}", file);
process_file(p, &bucket).await;
} else {
let core_store = core_dir.clone();
info!("Uploading all content in {}", core_store);
run_polling_agent(core_store.as_str()).await;
info!("Uploading all content in {}", core_dir_command);
run_polling_agent().await;
}
process::exit(0);
}
Expand All @@ -119,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
format!("{}/core_pattern.bak", host_location).as_str(),
format!(
"|{}/{} -c=%c -e=%e -p=%p -s=%s -t=%t -d={} -h=%h -E=%E",
host_location, CDC_NAME, core_dir
host_location, CDC_NAME, core_dir_command
)
.as_str(),
)?;
Expand All @@ -135,8 +133,6 @@ async fn main() -> Result<(), anyhow::Error> {
&suid,
)?;

let core_location = core_dir.clone();

create_env_file(host_location)?;
// Run polling agent on startup to clean up files.

Expand All @@ -155,7 +151,7 @@ async fn main() -> Result<(), anyhow::Error> {
std::thread::sleep(Duration::from_millis(1000));
}
} else {
run_polling_agent(core_location.as_str()).await;
run_polling_agent().await;
}

if !interval.is_empty() && !schedule.is_empty() {
Expand All @@ -180,7 +176,6 @@ async fn main() -> Result<(), anyhow::Error> {
}
}

let notify_location = core_location.clone();
if !schedule.is_empty() {
info!("Schedule Initialising with: {}", schedule);
let sched = match JobScheduler::new().await {
Expand All @@ -190,12 +185,18 @@ async fn main() -> Result<(), anyhow::Error> {
panic!("Schedule Creation Failed with {}", e)
}
};
let s_job = match Job::new(schedule.as_str(), move |_uuid, _l| {
let handle = Handle::current();
let core_str = core_location.clone();
handle.spawn(async move {
run_polling_agent(&core_str).await;
});

let s_job = match Job::new_async(schedule.as_str(), move |uuid, mut l| {
Box::pin(async move {
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => {
info!("Next scheduled run {:?}", ts);
run_polling_agent().await;
}
_ => warn!("Could not get next tick for job"),
}
})
}) {
Ok(v) => v,
Err(e) => {
Expand All @@ -218,6 +219,9 @@ async fn main() -> Result<(), anyhow::Error> {
panic!("Schedule Start failed, {:#?}", e);
}
};
loop {
std::thread::sleep(Duration::from_millis(100));
}
}

if use_inotify == "true" {
Expand All @@ -231,14 +235,14 @@ async fn main() -> Result<(), anyhow::Error> {
}
};
info!("INotify Initialised...");
match inotify.add_watch(&notify_location, WatchMask::CLOSE) {
match inotify.add_watch(&core_dir_command, WatchMask::CLOSE) {
Ok(_) => {}
Err(e) => {
error!("Add watch failed: {}", e);
panic!("Add watch failed: {}", e)
}
};
info!("INotify watching : {}", notify_location);
info!("INotify watching : {}", core_dir_command);
let mut buffer = [0; 4096];
loop {
let events = match inotify.read_events_blocking(&mut buffer) {
Expand All @@ -264,7 +268,7 @@ async fn main() -> Result<(), anyhow::Error> {
Some(s) => {
let file = format!(
"{}/{}",
notify_location,
core_dir_command,
s.to_str().unwrap_or_default()
);
let p = Path::new(&file);
Expand Down Expand Up @@ -389,7 +393,8 @@ fn get_bucket() -> Result<Bucket, anyhow::Error> {
Ok(Bucket::new(&s3.bucket, s3.region, s3.credentials)?.with_path_style())
}

async fn run_polling_agent(core_location: &str) {
async fn run_polling_agent() {
let core_location = env::var("CORE_DIR").unwrap_or_else(|_| DEFAULT_CORE_DIR.to_string());
info!("Executing Agent with location : {}", core_location);

let bucket = match get_bucket() {
Expand Down
3 changes: 2 additions & 1 deletion integration/run-ibm.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#! /bin/bash

cd ../
set -a
export $(grep -v '^#' .env | xargs)

set +a
cd ./charts/core-dump-handler

helm install core-dump-handler . --create-namespace --namespace observe \
Expand Down

0 comments on commit e3efed8

Please sign in to comment.