@@ -32,7 +32,7 @@ use tracing::warn;
32
32
use ulid:: Ulid ;
33
33
34
34
use crate :: {
35
- alerts:: target:: Target ,
35
+ alerts:: { alert_structs :: AlertStateEntry , target:: Target } ,
36
36
catalog:: { manifest:: Manifest , partition_path} ,
37
37
handlers:: http:: {
38
38
modal:: { Metadata , NodeMetadata , NodeType } ,
@@ -49,8 +49,8 @@ use crate::{
49
49
SETTINGS_ROOT_DIRECTORY , STREAM_METADATA_FILE_NAME , STREAM_ROOT_DIRECTORY ,
50
50
TARGETS_ROOT_DIRECTORY ,
51
51
object_storage:: {
52
- alert_json_path, filter_path , manifest_path , parseable_json_path , schema_path ,
53
- stream_json_path, to_bytes,
52
+ alert_json_path, alert_state_json_path , filter_path , manifest_path ,
53
+ parseable_json_path , schema_path , stream_json_path, to_bytes,
54
54
} ,
55
55
} ,
56
56
users:: filters:: { Filter , migrate_v1_v2} ,
@@ -115,6 +115,102 @@ impl Metastore for ObjectStoreMetastore {
115
115
. await ?)
116
116
}
117
117
118
+ /// alerts state
119
+ async fn get_alert_states ( & self ) -> Result < Vec < AlertStateEntry > , MetastoreError > {
120
+ let base_path = RelativePathBuf :: from_iter ( [ ALERTS_ROOT_DIRECTORY ] ) ;
121
+ let alert_state_bytes = self
122
+ . storage
123
+ . get_objects (
124
+ Some ( & base_path) ,
125
+ Box :: new ( |file_name| {
126
+ file_name. starts_with ( "alert_state_" ) && file_name. ends_with ( ".json" )
127
+ } ) ,
128
+ )
129
+ . await ?;
130
+
131
+ let mut alert_states = Vec :: new ( ) ;
132
+ for bytes in alert_state_bytes {
133
+ if let Ok ( entry) = serde_json:: from_slice :: < AlertStateEntry > ( & bytes) {
134
+ alert_states. push ( entry) ;
135
+ }
136
+ }
137
+ Ok ( alert_states)
138
+ }
139
+
140
+ async fn get_alert_state_entry (
141
+ & self ,
142
+ alert_id : & Ulid ,
143
+ ) -> Result < Option < AlertStateEntry > , MetastoreError > {
144
+ let path = alert_state_json_path ( * alert_id) ;
145
+ match self . storage . get_object ( & path) . await {
146
+ Ok ( bytes) => {
147
+ if let Ok ( entry) = serde_json:: from_slice :: < AlertStateEntry > ( & bytes) {
148
+ Ok ( Some ( entry) )
149
+ } else {
150
+ Ok ( None )
151
+ }
152
+ }
153
+ Err ( ObjectStorageError :: NoSuchKey ( _) ) => Ok ( None ) ,
154
+ Err ( e) => Err ( MetastoreError :: ObjectStorageError ( e) ) ,
155
+ }
156
+ }
157
+
158
+ async fn put_alert_state ( & self , obj : & dyn MetastoreObject ) -> Result < ( ) , MetastoreError > {
159
+ let id = Ulid :: from_string ( & obj. get_object_id ( ) ) . map_err ( |e| MetastoreError :: Error {
160
+ status_code : StatusCode :: BAD_REQUEST ,
161
+ message : e. to_string ( ) ,
162
+ flow : "put_alert_state" . into ( ) ,
163
+ } ) ?;
164
+ let path = alert_state_json_path ( id) ;
165
+
166
+ // Parse the new state entry from the MetastoreObject
167
+ let new_state_entry: AlertStateEntry = serde_json:: from_slice ( & to_bytes ( obj) ) ?;
168
+ let new_state = new_state_entry
169
+ . current_state ( )
170
+ . ok_or_else ( || MetastoreError :: InvalidJsonStructure {
171
+ expected : "AlertStateEntry with at least one state" . to_string ( ) ,
172
+ found : "AlertStateEntry with empty states" . to_string ( ) ,
173
+ } ) ?
174
+ . state ;
175
+
176
+ // Try to read existing file
177
+ let mut alert_entry = match self . storage . get_object ( & path) . await {
178
+ Ok ( existing_bytes) => {
179
+ if let Ok ( entry) = serde_json:: from_slice :: < AlertStateEntry > ( & existing_bytes) {
180
+ entry
181
+ } else {
182
+ // Create new entry if parsing fails or file doesn't exist
183
+ AlertStateEntry :: new ( id, new_state)
184
+ }
185
+ }
186
+ Err ( _) => {
187
+ // File doesn't exist, create new entry
188
+ AlertStateEntry :: new ( id, new_state)
189
+ }
190
+ } ;
191
+
192
+ // Update the state and only save if it actually changed
193
+ let state_changed = alert_entry. update_state ( new_state) ;
194
+
195
+ if state_changed {
196
+ let updated_bytes =
197
+ serde_json:: to_vec ( & alert_entry) . map_err ( MetastoreError :: JsonParseError ) ?;
198
+
199
+ self . storage . put_object ( & path, updated_bytes. into ( ) ) . await ?;
200
+ }
201
+
202
+ Ok ( ( ) )
203
+ }
204
+
205
+ /// Delete an alert state file
206
+ async fn delete_alert_state ( & self , obj : & dyn MetastoreObject ) -> Result < ( ) , MetastoreError > {
207
+ let path = obj. get_object_path ( ) ;
208
+ Ok ( self
209
+ . storage
210
+ . delete_object ( & RelativePathBuf :: from ( path) )
211
+ . await ?)
212
+ }
213
+
118
214
/// This function fetches all the llmconfigs from the underlying object store
119
215
async fn get_llmconfigs ( & self ) -> Result < Vec < Bytes > , MetastoreError > {
120
216
let base_path = RelativePathBuf :: from_iter ( [ SETTINGS_ROOT_DIRECTORY , "llmconfigs" ] ) ;
0 commit comments