|
17 | 17 | */ |
18 | 18 |
|
19 | 19 | use std::{ |
20 | | - collections::{BTreeMap, HashSet}, |
| 20 | + collections::HashSet, |
21 | 21 | path::Path, |
22 | 22 | sync::{ |
23 | 23 | Arc, |
@@ -50,13 +50,11 @@ use tracing::error; |
50 | 50 | use url::Url; |
51 | 51 |
|
52 | 52 | use crate::{ |
53 | | - handlers::http::users::USERS_ROOT_DIR, |
54 | 53 | metrics::{ |
55 | 54 | STORAGE_REQUEST_RESPONSE_TIME, increment_files_scanned_in_object_store_calls_by_date, |
56 | 55 | increment_object_store_calls_by_date, |
57 | 56 | }, |
58 | 57 | parseable::LogStream, |
59 | | - storage::STREAM_ROOT_DIRECTORY, |
60 | 58 | }; |
61 | 59 |
|
62 | 60 | use super::{ |
@@ -354,82 +352,6 @@ impl BlobStore { |
354 | 352 | Ok(()) |
355 | 353 | } |
356 | 354 |
|
357 | | - async fn _list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> { |
358 | | - let mut result_file_list = HashSet::new(); |
359 | | - let mut total_files_scanned = 0u64; |
360 | | - |
361 | | - let list_start = Instant::now(); |
362 | | - let resp = self.client.list_with_delimiter(None).await?; |
363 | | - let list_elapsed = list_start.elapsed().as_secs_f64(); |
364 | | - total_files_scanned += resp.objects.len() as u64; |
365 | | - STORAGE_REQUEST_RESPONSE_TIME |
366 | | - .with_label_values(&["azure_blob", "LIST", "200"]) |
367 | | - .observe(list_elapsed); |
368 | | - increment_object_store_calls_by_date( |
369 | | - "azure_blob", |
370 | | - "LIST", |
371 | | - &Utc::now().date_naive().to_string(), |
372 | | - ); |
373 | | - |
374 | | - let streams = resp |
375 | | - .common_prefixes |
376 | | - .iter() |
377 | | - .flat_map(|path| path.parts()) |
378 | | - .map(|name| name.as_ref().to_string()) |
379 | | - .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR) |
380 | | - .collect::<Vec<_>>(); |
381 | | - |
382 | | - for stream in streams { |
383 | | - let stream_path = |
384 | | - object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY)); |
385 | | - |
386 | | - // Track individual LIST operations for each stream |
387 | | - let stream_list_start = Instant::now(); |
388 | | - let resp = self.client.list_with_delimiter(Some(&stream_path)).await; |
389 | | - let stream_list_elapsed = stream_list_start.elapsed().as_secs_f64(); |
390 | | - increment_object_store_calls_by_date( |
391 | | - "azure_blob", |
392 | | - "LIST", |
393 | | - &Utc::now().date_naive().to_string(), |
394 | | - ); |
395 | | - match &resp { |
396 | | - Ok(resp) => { |
397 | | - STORAGE_REQUEST_RESPONSE_TIME |
398 | | - .with_label_values(&["azure_blob", "LIST", "200"]) |
399 | | - .observe(stream_list_elapsed); |
400 | | - |
401 | | - total_files_scanned += resp.objects.len() as u64; |
402 | | - if resp |
403 | | - .objects |
404 | | - .iter() |
405 | | - .any(|name| name.location.filename().unwrap().ends_with("stream.json")) |
406 | | - { |
407 | | - result_file_list.insert(stream); |
408 | | - } |
409 | | - } |
410 | | - Err(err) => { |
411 | | - let status_code = error_to_status_code(err); |
412 | | - STORAGE_REQUEST_RESPONSE_TIME |
413 | | - .with_label_values(&["azure_blob", "LIST", status_code]) |
414 | | - .observe(stream_list_elapsed); |
415 | | - |
416 | | - return Err(ObjectStorageError::UnhandledError(Box::new( |
417 | | - std::io::Error::other(format!("List operation failed: {}", err)), |
418 | | - ))); |
419 | | - } |
420 | | - } |
421 | | - } |
422 | | - |
423 | | - // Record total files scanned across all operations |
424 | | - increment_files_scanned_in_object_store_calls_by_date( |
425 | | - "azure_blob", |
426 | | - "LIST", |
427 | | - total_files_scanned, |
428 | | - &Utc::now().date_naive().to_string(), |
429 | | - ); |
430 | | - Ok(result_file_list) |
431 | | - } |
432 | | - |
433 | 355 | async fn _list_dates(&self, stream: &str) -> Result<Vec<String>, ObjectStorageError> { |
434 | 356 | let list_start = Instant::now(); |
435 | 357 | let resp: Result<object_store::ListResult, object_store::Error> = self |
@@ -478,98 +400,6 @@ impl BlobStore { |
478 | 400 | Ok(dates) |
479 | 401 | } |
480 | 402 |
|
481 | | - async fn _list_manifest_files( |
482 | | - &self, |
483 | | - stream: &str, |
484 | | - ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> { |
485 | | - let mut result_file_list: BTreeMap<String, Vec<String>> = BTreeMap::new(); |
486 | | - let mut total_files_scanned = 0u64; |
487 | | - |
488 | | - // Track initial LIST operation |
489 | | - let list_start = Instant::now(); |
490 | | - let resp = self |
491 | | - .client |
492 | | - .list_with_delimiter(Some(&(stream.into()))) |
493 | | - .await; |
494 | | - let list_elapsed = list_start.elapsed().as_secs_f64(); |
495 | | - increment_object_store_calls_by_date( |
496 | | - "azure_blob", |
497 | | - "LIST", |
498 | | - &Utc::now().date_naive().to_string(), |
499 | | - ); |
500 | | - let resp = match resp { |
501 | | - Ok(resp) => { |
502 | | - total_files_scanned += resp.objects.len() as u64; |
503 | | - STORAGE_REQUEST_RESPONSE_TIME |
504 | | - .with_label_values(&["azure_blob", "LIST", "200"]) |
505 | | - .observe(list_elapsed); |
506 | | - |
507 | | - resp |
508 | | - } |
509 | | - Err(err) => { |
510 | | - let status_code = error_to_status_code(&err); |
511 | | - STORAGE_REQUEST_RESPONSE_TIME |
512 | | - .with_label_values(&["azure_blob", "LIST", status_code]) |
513 | | - .observe(list_elapsed); |
514 | | - return Err(err.into()); |
515 | | - } |
516 | | - }; |
517 | | - |
518 | | - let dates = resp |
519 | | - .common_prefixes |
520 | | - .iter() |
521 | | - .flat_map(|path| path.parts()) |
522 | | - .filter(|name| name.as_ref() != stream && name.as_ref() != STREAM_ROOT_DIRECTORY) |
523 | | - .map(|name| name.as_ref().to_string()) |
524 | | - .collect::<Vec<_>>(); |
525 | | - |
526 | | - for date in dates { |
527 | | - let date_path = object_store::path::Path::from(format!("{}/{}", stream, &date)); |
528 | | - |
529 | | - // Track individual LIST operation for each date |
530 | | - let date_list_start = Instant::now(); |
531 | | - let resp = self.client.list_with_delimiter(Some(&date_path)).await; |
532 | | - let date_list_elapsed = date_list_start.elapsed().as_secs_f64(); |
533 | | - increment_object_store_calls_by_date( |
534 | | - "azure_blob", |
535 | | - "LIST", |
536 | | - &Utc::now().date_naive().to_string(), |
537 | | - ); |
538 | | - match resp { |
539 | | - Ok(resp) => { |
540 | | - STORAGE_REQUEST_RESPONSE_TIME |
541 | | - .with_label_values(&["azure_blob", "LIST", "200"]) |
542 | | - .observe(date_list_elapsed); |
543 | | - |
544 | | - total_files_scanned += resp.objects.len() as u64; |
545 | | - let manifests: Vec<String> = resp |
546 | | - .objects |
547 | | - .iter() |
548 | | - .filter(|name| name.location.filename().unwrap().ends_with("manifest.json")) |
549 | | - .map(|name| name.location.to_string()) |
550 | | - .collect(); |
551 | | - result_file_list.entry(date).or_default().extend(manifests); |
552 | | - } |
553 | | - Err(err) => { |
554 | | - let status_code = error_to_status_code(&err); |
555 | | - STORAGE_REQUEST_RESPONSE_TIME |
556 | | - .with_label_values(&["azure_blob", "LIST", status_code]) |
557 | | - .observe(date_list_elapsed); |
558 | | - return Err(err.into()); |
559 | | - } |
560 | | - } |
561 | | - } |
562 | | - |
563 | | - // Record total files scanned across all date operations |
564 | | - increment_files_scanned_in_object_store_calls_by_date( |
565 | | - "azure_blob", |
566 | | - "LIST", |
567 | | - total_files_scanned, |
568 | | - &Utc::now().date_naive().to_string(), |
569 | | - ); |
570 | | - Ok(result_file_list) |
571 | | - } |
572 | | - |
573 | 403 | async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { |
574 | 404 | let bytes = tokio::fs::read(path).await?; |
575 | 405 |
|
|
0 commit comments