@@ -27,7 +27,7 @@ use once_cell::sync::Lazy;
27
27
use serde_json:: Error as SerdeError ;
28
28
use std:: collections:: { HashMap , HashSet } ;
29
29
use std:: fmt:: { self , Display } ;
30
- use tokio:: sync:: oneshot:: { Receiver , Sender } ;
30
+ use tokio:: sync:: oneshot:: { self , Receiver , Sender } ;
31
31
use tokio:: sync:: RwLock ;
32
32
use tokio:: task:: JoinHandle ;
33
33
use tracing:: { trace, warn} ;
@@ -36,7 +36,7 @@ use ulid::Ulid;
36
36
pub mod alerts_utils;
37
37
pub mod target;
38
38
39
- use crate :: option :: CONFIG ;
39
+ use crate :: parseable :: PARSEABLE ;
40
40
use crate :: query:: { TableScanVisitor , QUERY_SESSION } ;
41
41
use crate :: rbac:: map:: SessionKey ;
42
42
use crate :: storage;
@@ -650,8 +650,8 @@ impl AlertConfig {
650
650
fn get_context ( & self ) -> Context {
651
651
let deployment_instance = format ! (
652
652
"{}://{}" ,
653
- CONFIG . options. get_scheme( ) ,
654
- CONFIG . options. address
653
+ PARSEABLE . options. get_scheme( ) ,
654
+ PARSEABLE . options. address
655
655
) ;
656
656
let deployment_id = storage:: StorageMetadata :: global ( ) . deployment_id ;
657
657
let deployment_mode = storage:: StorageMetadata :: global ( ) . mode . to_string ( ) ;
@@ -730,13 +730,20 @@ impl Alerts {
730
730
/// Loads alerts from disk, blocks
731
731
pub async fn load ( & self ) -> Result < ( ) , AlertError > {
732
732
let mut map = self . alerts . write ( ) . await ;
733
- let store = CONFIG . storage ( ) . get_object_store ( ) ;
733
+ let store = PARSEABLE . storage . get_object_store ( ) ;
734
734
735
735
for alert in store. get_alerts ( ) . await . unwrap_or_default ( ) {
736
- let ( handle, rx, tx) =
737
- schedule_alert_task ( alert. get_eval_frequency ( ) , alert. clone ( ) ) . await ?;
738
-
739
- self . update_task ( alert. id , handle, rx, tx) . await ;
736
+ let ( outbox_tx, outbox_rx) = oneshot:: channel :: < ( ) > ( ) ;
737
+ let ( inbox_tx, inbox_rx) = oneshot:: channel :: < ( ) > ( ) ;
738
+ let handle = schedule_alert_task (
739
+ alert. get_eval_frequency ( ) ,
740
+ alert. clone ( ) ,
741
+ inbox_rx,
742
+ outbox_tx,
743
+ ) ?;
744
+
745
+ self . update_task ( alert. id , handle, outbox_rx, inbox_tx)
746
+ . await ;
740
747
741
748
map. insert ( alert. id , alert) ;
742
749
}
@@ -785,7 +792,7 @@ impl Alerts {
785
792
new_state : AlertState ,
786
793
trigger_notif : Option < String > ,
787
794
) -> Result < ( ) , AlertError > {
788
- let store = CONFIG . storage ( ) . get_object_store ( ) ;
795
+ let store = PARSEABLE . storage . get_object_store ( ) ;
789
796
790
797
// read and modify alert
791
798
let mut alert = self . get_alert_by_id ( alert_id) . await ?;
0 commit comments