Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move axiom-rs to use tabular results for queries #62

Merged
merged 6 commits into the base branch from the source branch
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ futures-util = "0.3"
httpmock = "0.7"
structopt = "0.3"
tracing-subscriber = { version = "0.3", features = ["ansi", "env-filter"] }
cli-table = "0.4.9"

[features]
default = ["tokio", "default-tls"]
Expand Down
47 changes: 29 additions & 18 deletions examples/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axiom_rs::{
datasets::{ContentEncoding, ContentType},
Client,
};
use std::time::Duration;
use cli_table::{Cell as _, Style as _, Table as _};
use structopt::StructOpt;
use tokio::io::{stdin, AsyncReadExt};

Expand Down Expand Up @@ -37,13 +37,6 @@ enum Datasets {
},
/// Delete a dataset
Delete { name: String },
/// Trim a dataset
Trim {
name: String,

#[structopt(long)]
seconds: u64,
},
/// Ingest into a dataset from stdin.
Ingest {
name: String,
Expand Down Expand Up @@ -73,19 +66,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("{:?}", dataset);
}),
Datasets::Get { name } => println!("{:?}", client.datasets().get(&name).await?),
// Datasets::Info { name } => println!("{:?}", client.datasets().info(&name).await?),
Datasets::Update { name, description } => {
let dataset = client.datasets().update(&name, description).await?;
println!("{:?}", dataset);
}
Datasets::Delete { name } => client.datasets().delete(&name).await?,
Datasets::Trim { name, seconds } => println!(
"{:?}",
client
.datasets()
.trim(&name, Duration::from_secs(seconds))
.await?
),
Datasets::Ingest {
name,
content_type,
Expand All @@ -99,8 +84,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("{:?}", ingest_status);
}
Datasets::Query { apl } => {
let result = client.query(apl, None).await?;
println!("{:?}", result);
let result = client.query(&apl, None).await?;
for table in result.tables {
println!("{}:", table.name());

let rows_iter = table.iter();
let mut rows = Vec::with_capacity(rows_iter.size_hint().0);
for row in rows_iter {
let field_iter = row.iter();
let mut row_vec = Vec::with_capacity(field_iter.size_hint().0);
for field in field_iter {
row_vec.push(field.map_or_else(
|| "-".to_string(),
|v| serde_json::to_string(v).unwrap(),
));
}
rows.push(row_vec);
}

let mut fields = Vec::with_capacity(table.fields().len());
for field in table.fields() {
fields.push(field.name().to_string().cell().bold(true));
}

let t = rows.table().title(fields).bold(true);

let table_display = t.display().unwrap();
println!("{}", table_display);
}
}
},
Opt::Users(users) => match users {
Expand Down
75 changes: 7 additions & 68 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tracing::instrument;
use crate::{
annotations,
datasets::{
self, ContentEncoding, ContentType, IngestStatus, LegacyQuery, LegacyQueryOptions,
LegacyQueryResult, Query, QueryOptions, QueryParams, QueryResult,
self, ContentEncoding, ContentType, IngestStatus, Query, QueryOptions, QueryParams,
QueryResult,
},
error::{Error, Result},
http::{self, HeaderMap},
Expand Down Expand Up @@ -107,37 +107,14 @@ impl Client {
/// Executes the given query specified using the Axiom Processing Language (APL).
/// To learn more about APL, see the APL documentation at https://www.axiom.co/docs/apl/introduction.
#[instrument(skip(self, opts))]
pub async fn query<S, O>(&self, apl: S, opts: O) -> Result<QueryResult>
pub async fn query<S, O>(&self, apl: &S, opts: O) -> Result<QueryResult>
where
S: Into<String> + FmtDebug,
S: ToString + FmtDebug + ?Sized,
O: Into<Option<QueryOptions>>,
{
let (req, query_params) = match opts.into() {
Some(opts) => {
let req = Query {
apl: apl.into(),
start_time: opts.start_time,
end_time: opts.end_time,
cursor: opts.cursor,
include_cursor: opts.include_cursor,
};

let query_params = QueryParams {
no_cache: opts.no_cache,
save: opts.save,
format: opts.format,
};

(req, query_params)
}
None => (
Query {
apl: apl.into(),
..Default::default()
},
QueryParams::default(),
),
};
let opts: QueryOptions = opts.into().unwrap_or_default();
let query_params = QueryParams::from(&opts);
let req = Query::new(apl, opts);

let query_params = serde_qs::to_string(&query_params)?;
let path = format!("/v1/datasets/_apl?{query_params}");
Expand All @@ -157,44 +134,6 @@ impl Client {
Ok(result)
}

/// Execute the given query on the dataset identified by its id.
#[instrument(skip(self, opts))]
#[deprecated(
since = "0.6.0",
note = "The legacy query will be removed in future versions, use `apl_query` instead"
)]
pub async fn query_legacy<N, O>(
&self,
dataset_name: N,
query: LegacyQuery,
opts: O,
) -> Result<LegacyQueryResult>
where
N: Into<String> + FmtDebug,
O: Into<Option<LegacyQueryOptions>>,
{
let path = format!(
"/v1/datasets/{}/query?{}",
dataset_name.into(),
&opts
.into()
.map_or_else(|| Ok(String::new()), |opts| { serde_qs::to_string(&opts) })?
);
let res = self.http_client.post(path, &query).await?;

let saved_query_id = res
.headers()
.get("X-Axiom-History-Query-Id")
.map(|s| s.to_str())
.transpose()
.map_err(|_e| Error::InvalidQueryId)?
.map(std::string::ToString::to_string);
let mut result = res.json::<LegacyQueryResult>().await?;
result.saved_query_id = saved_query_id;

Ok(result)
}

/// Ingest events into the dataset identified by its id.
/// Restrictions for field names (JSON object keys) can be reviewed here:
/// <https://www.axiom.co/docs/usage/field-restrictions>.
Expand Down
29 changes: 2 additions & 27 deletions src/datasets/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#[allow(deprecated)]
use crate::{
datasets::model::{
Dataset, DatasetCreateRequest, DatasetUpdateRequest, Info, TrimRequest, TrimResult,
},
datasets::model::{Dataset, DatasetCreateRequest, DatasetUpdateRequest, Info},
error::{Error, Result},
http,
};
use std::{
convert::{TryFrom, TryInto},
fmt::Debug as FmtDebug,
result::Result as StdResult,
convert::TryFrom, fmt::Debug as FmtDebug, result::Result as StdResult,
time::Duration as StdDuration,
};
use tracing::instrument;
Expand Down Expand Up @@ -93,27 +89,6 @@ impl<'client> Client<'client> {
self.http_client.get("/v1/datasets").await?.json().await
}

/// Trim the dataset identified by its id to a given length.
/// The max duration given will mark the oldest timestamp an event can have.
/// Older ones will be deleted from the dataset.
/// The duration can either be a [`std::time::Duration`] or a
/// [`chrono::Duration`].
#[instrument(skip(self))]
#[allow(deprecated)]
pub async fn trim<N, D>(&self, dataset_name: N, duration: D) -> Result<TrimResult>
where
    N: Into<String> + FmtDebug,
    D: TryInto<Duration, Error = Error> + FmtDebug,
{
    // Normalize whatever duration type the caller passed into our own
    // `Duration` before building the request payload.
    let duration: Duration = duration.try_into()?;
    let request = TrimRequest::new(duration.into());
    let path = format!("/v1/datasets/{}/trim", dataset_name.into());
    let response = self.http_client.post(path, &request).await?;
    response.json().await
}

/// Update a dataset.
#[instrument(skip(self))]
pub async fn update<N, D>(&self, dataset_name: N, new_description: D) -> Result<Dataset>
Expand Down
Loading
Loading