Commit
feat: allow stream creation from ingestor in distributed deployments (#980)

Co-authored-by: Akshat Agarwal <hey@akshat.dev>
Anirudhxx and akshatagarwl authored Nov 12, 2024
1 parent d361b69 commit 46686dc
Showing 8 changed files with 179 additions and 11 deletions.
72 changes: 71 additions & 1 deletion server/src/handlers/http/cluster/mod.rs
@@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::get_staging_metadata;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -64,6 +65,7 @@ pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
body: Bytes,
stream_name: &str,
skip_ingestor: Option<String>,
) -> Result<(), StreamError> {
let mut reqwest_headers = http_header::HeaderMap::new();

@@ -76,7 +78,16 @@
})?;

let client = reqwest::Client::new();
-    for ingestor in ingestor_infos.iter() {
+
+    let final_ingestor_infos = match skip_ingestor {
+        None => ingestor_infos,
+        Some(skip_ingestor) => ingestor_infos
+            .into_iter()
+            .filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
+            .collect::<Vec<IngestorMetadata>>(),
+    };
+
+    for ingestor in final_ingestor_infos {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
@@ -841,3 +852,62 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

Ok(())
}

pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
let client = reqwest::Client::new();

let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
})?;
let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
let token = staging_metadata.querier_auth_token.unwrap();

if !check_liveness(&querier_endpoint).await {
log::warn!("Querier {} is not live", querier_endpoint);
return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
}

let url = format!(
"{}{}/logstream/{}?skip_ingestors={}",
querier_endpoint,
base_path_without_preceding_slash(),
stream_name,
CONFIG.parseable.ingestor_endpoint,
);

let response = client
.put(&url)
.header(header::AUTHORIZATION, &token)
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
&url,
err
);
StreamError::Network(err)
})?;

let status = response.status();

if !status.is_success() {
let response_text = response.text().await.map_err(|err| {
log::error!("Failed to read response text from querier: {}", &url);
StreamError::Network(err)
})?;

log::error!(
"Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
&url,
response_text
);

return Err(StreamError::Anyhow(anyhow::anyhow!(
"Request failed with status: {}",
status,
)));
}

Ok(())
}
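
For reference, below is a minimal, self-contained sketch of the skip_ingestor filtering that sync_streams_with_ingestors now performs. IngestorInfo and the to_url_string stand-in are simplified placeholders for Parseable's IngestorMetadata type and URL helper; the http-scheme and trailing-slash normalization is an assumption made purely for illustration.

// Simplified stand-in for Parseable's IngestorMetadata.
#[derive(Debug, Clone, PartialEq)]
struct IngestorInfo {
    domain_name: String,
}

// Assumed normalization: bare "host:port" becomes "http://host:port/".
fn to_url_string(s: String) -> String {
    if s.starts_with("http://") || s.starts_with("https://") {
        s
    } else {
        format!("http://{s}/")
    }
}

// Same shape as the filter added above: None keeps every ingestor,
// Some(addr) drops the ingestor whose domain_name matches the normalized addr.
fn filter_ingestors(infos: Vec<IngestorInfo>, skip_ingestor: Option<String>) -> Vec<IngestorInfo> {
    match skip_ingestor {
        None => infos,
        Some(skip) => infos
            .into_iter()
            .filter(|i| i.domain_name != to_url_string(skip.clone()))
            .collect(),
    }
}

fn main() {
    let infos = vec![
        IngestorInfo { domain_name: "http://ingestor-1:8000/".to_string() },
        IngestorInfo { domain_name: "http://ingestor-2:8000/".to_string() },
    ];
    // The originating ingestor passes itself via ?skip_ingestors=..., so the
    // querier does not sync the newly created stream back to it.
    let remaining = filter_ingestors(infos, Some("ingestor-1:8000".to_string()));
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].domain_name, "http://ingestor-2:8000/");
}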
16 changes: 11 additions & 5 deletions server/src/handlers/http/ingest.rs
@@ -26,6 +26,7 @@ use crate::event::{
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::http::cluster::forward_create_stream_request;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
@@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists(
if !streams.contains(&LogStream {
name: stream_name.to_owned(),
}) {
-        log::error!("Stream {} not found", stream_name);
-        return Err(PostError::Invalid(anyhow::anyhow!(
-            "Stream `{}` not found. Please create it using the Query server.",
-            stream_name
-        )));
+        match forward_create_stream_request(stream_name).await {
+            Ok(()) => log::info!("Stream {} created", stream_name),
+            Err(e) => {
+                return Err(PostError::Invalid(anyhow::anyhow!(
+                    "Unable to create stream: {} using query server. Error: {}",
+                    stream_name,
+                    e.to_string(),
+                )))
+            }
+        };
}
metadata::STREAM_INFO
.upsert_stream_info(
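
With this change, an ingestor receiving events for a stream it does not know no longer rejects them outright; it first asks the querier to create the stream, and the querier syncs it to the remaining ingestors. The sketch below illustrates the resulting behaviour from a client's point of view; the host, credentials, and stream name are made up, it assumes the reqwest and tokio crates, and the endpoint and X-P-Stream header follow Parseable's ingest API.

use reqwest::header::{AUTHORIZATION, CONTENT_TYPE};

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // "demo_stream" has not been created anywhere yet; with this commit the
    // ingestor forwards the create-stream request to the querier before ingesting.
    let resp = client
        .post("http://ingestor-1:8000/api/v1/ingest")
        .header("X-P-Stream", "demo_stream")
        .header(CONTENT_TYPE, "application/json")
        .header(AUTHORIZATION, "Basic YWRtaW46YWRtaW4=") // admin:admin, illustrative only
        .body(r#"[{"level":"info","message":"hello from a brand new stream"}]"#)
        .send()
        .await?;
    println!("ingest status: {}", resp.status());
    Ok(())
}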
2 changes: 1 addition & 1 deletion server/src/handlers/http/logstream.rs
@@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
+        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
}
Ok(())
}
20 changes: 18 additions & 2 deletions server/src/handlers/http/modal/query/querier_logstream.rs
@@ -1,9 +1,14 @@
use core::str;
use std::fs;

use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use serde::Deserialize;
use tokio::sync::Mutex;

static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

use crate::{
event,
@@ -74,11 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

-pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
+#[derive(Deserialize)]
+pub struct PutStreamQuery {
+    skip_ingestors: Option<String>,
+}
+
+pub async fn put_stream(
+    req: HttpRequest,
+    body: Bytes,
+    info: web::Query<PutStreamQuery>,
+) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

+    let _ = CREATE_STREAM_LOCK.lock().await;
     let headers = create_update_stream(&req, &body, &stream_name).await?;
-    sync_streams_with_ingestors(headers, body, &stream_name).await?;
+
+    sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;

Ok(("Log stream created", StatusCode::OK))
}
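
The new skip_ingestors value arrives as an optional query-string parameter; when it is absent, info.skip_ingestors is None and the stream is synced to every live ingestor. A small sketch of that deserialization, driving serde_urlencoded directly instead of actix-web's web::Query extractor (illustration only; the handler itself relies on the extractor):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct PutStreamQuery {
    skip_ingestors: Option<String>,
}

fn main() {
    // Query string of the kind forward_create_stream_request builds on the ingestor.
    let q: PutStreamQuery =
        serde_urlencoded::from_str("skip_ingestors=ingestor-1:8000").unwrap();
    assert_eq!(q.skip_ingestors.as_deref(), Some("ingestor-1:8000"));

    // A direct PUT /logstream/{name} from a client carries no such parameter.
    let q: PutStreamQuery = serde_urlencoded::from_str("").unwrap();
    assert!(q.skip_ingestors.is_none());
}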
4 changes: 4 additions & 0 deletions server/src/migration.rs
@@ -78,6 +78,10 @@ pub async fn run_metadata_migration(
let metadata = metadata_migration::v3_v4(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
Some("v4") => {
let metadata = metadata_migration::v4_v5(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
_ => (),
}
}
50 changes: 50 additions & 0 deletions server/src/migration/metadata_migration.rs
@@ -16,6 +16,7 @@
*
*/

use base64::Engine;
use rand::distributions::DistString;
use serde_json::{Map, Value as JsonValue};

@@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
storage_metadata
}

// maybe rename
pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
metadata.remove_entry("version");
metadata.insert("version".to_string(), JsonValue::String("v5".to_string()));

match metadata.get("server_mode") {
None => {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
}
Some(JsonValue::String(mode)) => match mode.as_str() {
"Query" => {
metadata.insert(
"querier_endpoint".to_string(),
JsonValue::String(CONFIG.parseable.address.clone()),
);
}
"All" => {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert(
"querier_endpoint".to_string(),
JsonValue::String(CONFIG.parseable.address.clone()),
);
}
_ => (),
},
_ => (),
}

metadata.insert(
"querier_auth_token".to_string(),
JsonValue::String(format!(
"Basic {}",
base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG.parseable.username, CONFIG.parseable.password
))
)),
);

storage_metadata
}
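
Roughly, the v4 → v5 migration above bumps the version, records the querier's address for Query (and All) mode, and stores a Basic auth token for it. The standalone approximation below uses hard-coded placeholder values in place of CONFIG and only handles the Query-mode branch; it is not the real function.

use base64::Engine;
use serde_json::{json, Value};

// Placeholder stand-ins for CONFIG.parseable.address / username / password.
fn v4_to_v5(mut metadata: Value, address: &str, username: &str, password: &str) -> Value {
    let map = metadata.as_object_mut().unwrap();
    map.insert("version".to_string(), json!("v5"));
    if map.get("server_mode").and_then(Value::as_str) == Some("Query") {
        map.insert("querier_endpoint".to_string(), json!(address));
    }
    map.insert(
        "querier_auth_token".to_string(),
        json!(format!(
            "Basic {}",
            base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}"))
        )),
    );
    metadata
}

fn main() {
    let v4 = json!({
        "version": "v4",
        "server_mode": "Query",
        "users": [],
        "streams": []
    });
    let v5 = v4_to_v5(v4, "localhost:8000", "admin", "admin");
    // version becomes "v5"; querier_endpoint and querier_auth_token are added.
    println!("{}", serde_json::to_string_pretty(&v5).unwrap());
}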

pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
let imp = ingestor_metadata_path(None);
let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
3 changes: 2 additions & 1 deletion server/src/storage.rs
@@ -40,7 +40,8 @@ pub use localfs::FSConfig;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::S3Config;
pub use store_metadata::{
-    put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
+    get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
+    StorageMetadata,
};

// metadata file names in a Stream prefix
23 changes: 22 additions & 1 deletion server/src/storage/store_metadata.rs
@@ -22,6 +22,7 @@ use std::{
path::PathBuf,
};

use base64::Engine;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use relative_path::RelativePathBuf;
@@ -63,10 +64,29 @@ pub struct StorageMetadata {
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
#[serde(default)]
pub default_role: Option<String>,
pub querier_endpoint: Option<String>,
pub querier_auth_token: Option<String>,
}

impl StorageMetadata {
pub fn new() -> Self {
let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode {
Mode::All | Mode::Query => {
let querier_auth_token = format!(
"Basic {}",
base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG.parseable.username, CONFIG.parseable.password
))
);
(
Some(CONFIG.parseable.address.clone()),
Some(querier_auth_token),
)
}
Mode::Ingest => (None, None),
};

Self {
version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
mode: CONFIG.storage_name.to_owned(),
@@ -78,9 +98,10 @@ impl StorageMetadata {
streams: Vec::new(),
roles: HashMap::default(),
default_role: None,
querier_endpoint,
querier_auth_token,
}
}

pub fn global() -> &'static StaticStorageMetadata {
STORAGE_METADATA
.get()
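
The querier_auth_token that StorageMetadata::new stores for Query and All mode is a standard HTTP Basic credential built from the server's username and password, and it is the value an ingestor later echoes back in the Authorization header of forward_create_stream_request. A tiny sketch with illustrative admin/admin credentials (not values read from CONFIG):

use base64::Engine;

// Same construction as StorageMetadata::new, with placeholder credentials.
fn basic_auth_token(username: &str, password: &str) -> String {
    format!(
        "Basic {}",
        base64::prelude::BASE64_STANDARD.encode(format!("{username}:{password}"))
    )
}

fn main() {
    let token = basic_auth_token("admin", "admin");
    assert_eq!(token, "Basic YWRtaW46YWRtaW4=");
    println!("{token}");
}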
