diff --git a/Cargo.toml b/Cargo.toml index 9a9c7d5..79acaaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ futures-util = "0.3" httpmock = "0.7" structopt = "0.3" tracing-subscriber = { version = "0.3", features = ["ansi", "env-filter"] } +cli-table = "0.4.9" [features] default = ["tokio", "default-tls"] diff --git a/examples/cli.rs b/examples/cli.rs index 5ac7bfc..de8f19e 100644 --- a/examples/cli.rs +++ b/examples/cli.rs @@ -2,7 +2,7 @@ use axiom_rs::{ datasets::{ContentEncoding, ContentType}, Client, }; -use std::time::Duration; +use cli_table::{Cell as _, Style as _, Table as _}; use structopt::StructOpt; use tokio::io::{stdin, AsyncReadExt}; @@ -37,13 +37,6 @@ enum Datasets { }, /// Delete a dataset Delete { name: String }, - /// Trim a dataset - Trim { - name: String, - - #[structopt(long)] - seconds: u64, - }, /// Ingest into a dataset from stdin. Ingest { name: String, @@ -73,19 +66,11 @@ async fn main() -> Result<(), Box> { println!("{:?}", dataset); }), Datasets::Get { name } => println!("{:?}", client.datasets().get(&name).await?), - // Datasets::Info { name } => println!("{:?}", client.datasets().info(&name).await?), Datasets::Update { name, description } => { let dataset = client.datasets().update(&name, description).await?; println!("{:?}", dataset); } Datasets::Delete { name } => client.datasets().delete(&name).await?, - Datasets::Trim { name, seconds } => println!( - "{:?}", - client - .datasets() - .trim(&name, Duration::from_secs(seconds)) - .await? 
- ), Datasets::Ingest { name, content_type, @@ -99,8 +84,34 @@ async fn main() -> Result<(), Box> { println!("{:?}", ingest_status); } Datasets::Query { apl } => { - let result = client.query(apl, None).await?; - println!("{:?}", result); + let result = client.query(&apl, None).await?; + for table in result.tables { + println!("{}:", table.name()); + + let rows_iter = table.iter(); + let mut rows = Vec::with_capacity(rows_iter.size_hint().0); + for row in rows_iter { + let field_iter = row.iter(); + let mut row_vec = Vec::with_capacity(field_iter.size_hint().0); + for field in field_iter { + row_vec.push(field.map_or_else( + || "-".to_string(), + |v| serde_json::to_string(v).unwrap(), + )); + } + rows.push(row_vec); + } + + let mut fields = Vec::with_capacity(table.fields().len()); + for field in table.fields() { + fields.push(field.name().to_string().cell().bold(true)); + } + + let t = rows.table().title(fields).bold(true); + + let table_display = t.display().unwrap(); + println!("{}", table_display); + } } }, Opt::Users(users) => match users { diff --git a/src/client.rs b/src/client.rs index 50ef156..d878e5e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,8 +18,8 @@ use tracing::instrument; use crate::{ annotations, datasets::{ - self, ContentEncoding, ContentType, IngestStatus, LegacyQuery, LegacyQueryOptions, - LegacyQueryResult, Query, QueryOptions, QueryParams, QueryResult, + self, ContentEncoding, ContentType, IngestStatus, Query, QueryOptions, QueryParams, + QueryResult, }, error::{Error, Result}, http::{self, HeaderMap}, @@ -107,37 +107,14 @@ impl Client { /// Executes the given query specified using the Axiom Processing Language (APL). /// To learn more about APL, see the APL documentation at https://www.axiom.co/docs/apl/introduction. 
#[instrument(skip(self, opts))] - pub async fn query(&self, apl: S, opts: O) -> Result + pub async fn query(&self, apl: &S, opts: O) -> Result where - S: Into + FmtDebug, + S: ToString + FmtDebug + ?Sized, O: Into>, { - let (req, query_params) = match opts.into() { - Some(opts) => { - let req = Query { - apl: apl.into(), - start_time: opts.start_time, - end_time: opts.end_time, - cursor: opts.cursor, - include_cursor: opts.include_cursor, - }; - - let query_params = QueryParams { - no_cache: opts.no_cache, - save: opts.save, - format: opts.format, - }; - - (req, query_params) - } - None => ( - Query { - apl: apl.into(), - ..Default::default() - }, - QueryParams::default(), - ), - }; + let opts: QueryOptions = opts.into().unwrap_or_default(); + let query_params = QueryParams::from(&opts); + let req = Query::new(apl, opts); let query_params = serde_qs::to_string(&query_params)?; let path = format!("/v1/datasets/_apl?{query_params}"); @@ -157,44 +134,6 @@ impl Client { Ok(result) } - /// Execute the given query on the dataset identified by its id. - #[instrument(skip(self, opts))] - #[deprecated( - since = "0.6.0", - note = "The legacy query will be removed in future versions, use `apl_query` instead" - )] - pub async fn query_legacy( - &self, - dataset_name: N, - query: LegacyQuery, - opts: O, - ) -> Result - where - N: Into + FmtDebug, - O: Into>, - { - let path = format!( - "/v1/datasets/{}/query?{}", - dataset_name.into(), - &opts - .into() - .map_or_else(|| Ok(String::new()), |opts| { serde_qs::to_string(&opts) })? - ); - let res = self.http_client.post(path, &query).await?; - - let saved_query_id = res - .headers() - .get("X-Axiom-History-Query-Id") - .map(|s| s.to_str()) - .transpose() - .map_err(|_e| Error::InvalidQueryId)? - .map(std::string::ToString::to_string); - let mut result = res.json::().await?; - result.saved_query_id = saved_query_id; - - Ok(result) - } - /// Ingest events into the dataset identified by its id. 
/// Restrictions for field names (JSON object keys) can be reviewed here: /// . diff --git a/src/datasets/client.rs b/src/datasets/client.rs index 7fbc2a7..ddccffd 100644 --- a/src/datasets/client.rs +++ b/src/datasets/client.rs @@ -1,15 +1,11 @@ #[allow(deprecated)] use crate::{ - datasets::model::{ - Dataset, DatasetCreateRequest, DatasetUpdateRequest, Info, TrimRequest, TrimResult, - }, + datasets::model::{Dataset, DatasetCreateRequest, DatasetUpdateRequest, Info}, error::{Error, Result}, http, }; use std::{ - convert::{TryFrom, TryInto}, - fmt::Debug as FmtDebug, - result::Result as StdResult, + convert::TryFrom, fmt::Debug as FmtDebug, result::Result as StdResult, time::Duration as StdDuration, }; use tracing::instrument; @@ -93,27 +89,6 @@ impl<'client> Client<'client> { self.http_client.get("/v1/datasets").await?.json().await } - /// Trim the dataset identified by its id to a given length. - /// The max duration given will mark the oldest timestamp an event can have. - /// Older ones will be deleted from the dataset. - /// The duration can either be a [`std::time::Duration`] or a - /// [`chrono::Duration`]. - #[instrument(skip(self))] - #[allow(deprecated)] - pub async fn trim(&self, dataset_name: N, duration: D) -> Result - where - N: Into + FmtDebug, - D: TryInto + FmtDebug, - { - let duration = duration.try_into()?; - let req = TrimRequest::new(duration.into()); - self.http_client - .post(format!("/v1/datasets/{}/trim", dataset_name.into()), &req) - .await? - .json() - .await - } - /// Update a dataset. 
#[instrument(skip(self))] pub async fn update(&self, dataset_name: N, new_description: D) -> Result diff --git a/src/datasets/model.rs b/src/datasets/model.rs index f33be37..b0c9848 100644 --- a/src/datasets/model.rs +++ b/src/datasets/model.rs @@ -1,7 +1,6 @@ -#![allow(deprecated)] // we need this to be allowed to declare depricated code use bitflags::bitflags; use bitflags_serde_shim::impl_serde_for_bitflags; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use http::header::HeaderValue; use serde::{ de::{self, Visitor}, @@ -14,8 +13,9 @@ use std::{ ops::Add, str::FromStr, }; +use table::Field; -use crate::serde::{deserialize_null_default, empty_string_as_none}; +use crate::serde::deserialize_null_default; /// The default field the server looks for a time to use as /// ingestion time. If not present, the server will set the ingestion time by @@ -137,67 +137,24 @@ pub struct Dataset { // ignored: integrationConfigs, integrationFilters, quickQueries } -/// A field of an Axiom dataset. -#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] -pub struct Field { - /// Name is the unique name of the field. - pub name: String, - /// Description is the description of the field. - pub description: String, - /// Type is the datatype of the field. - #[serde(rename = "type")] - pub typ: String, - /// Unit is the unit of the field. - pub unit: String, - /// Hidden describes if the field is hidden or not. - pub hidden: bool, -} - /// Details of the information stored in a dataset. #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Stat { /// The unique name of the dataset. pub name: String, - /// The number of blocks of the dataset. - #[deprecated( - since = "0.8.0", - note = "This field will be removed in a future version." - )] - pub num_blocks: u64, /// The number of events of the dataset. pub num_events: u64, /// The number of fields of the dataset. pub num_fields: u32, /// The amount of data stored in the dataset. 
pub input_bytes: u64, - /// The amount of data stored in the dataset formatted in a human - /// readable format. - #[deprecated( - since = "0.8.0", - note = "This field will be removed in a future version." - )] - pub input_bytes_human: String, /// The amount of compressed data stored in the dataset. pub compressed_bytes: u64, - /// The amount of compressed data stored in the - /// dataset formatted in a human readable format. - #[deprecated( - since = "0.8.0", - note = "This field will be removed in a future version." - )] - pub compressed_bytes_human: String, /// The time of the oldest event stored in the dataset. pub min_time: Option>, /// The time of the newest event stored in the dataset. pub max_time: Option>, - /// The ID of the user who created the dataset. - #[serde(rename = "who")] - #[deprecated( - since = "0.8.0", - note = "This field will be removed in a future version." - )] - pub created_by: Option, /// The time the dataset was created at. #[serde(rename = "created")] pub created_at: DateTime, @@ -214,37 +171,6 @@ pub struct Info { pub fields: Vec, } -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub(crate) struct TrimRequest { - max_duration: String, -} - -impl TrimRequest { - pub(crate) fn new(duration: Duration) -> Self { - TrimRequest { - max_duration: format!("{}s", duration.num_seconds()), - } - } -} - -/// The result of a trim operation. -#[deprecated( - since = "0.8.0", - note = "The trim response will be removed in a future version." -)] -#[derive(Deserialize, Debug)] -pub struct TrimResult { - /// The amount of blocks deleted by the trim operation. - #[deprecated( - since = "0.4.0", - note = "This field is deprecated and will be removed in a future version." - )] - #[serde(rename = "numDeleted")] - #[allow(deprecated, warnings)] - pub blocks_deleted: u64, -} - /// Returned on event ingestion operation. 
#[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] @@ -332,6 +258,22 @@ pub struct Query { pub cursor: Option, /// Specifies whether the event that matches the cursor should be included or not pub include_cursor: bool, + /// Requests the cursor to be included in the response + pub include_cursor_field: bool, +} + +impl Query { + /// Creates a new query with the given APL and options. + pub fn new(apl: &S, opts: QueryOptions) -> Self { + Self { + apl: apl.to_string(), + start_time: opts.start_time, + end_time: opts.end_time, + cursor: opts.cursor, + include_cursor: opts.include_cursor, + include_cursor_field: opts.include_cursor_field, + } + } } // QueryParams is the part of `QueryOptions` that is added to the request url. @@ -344,8 +286,20 @@ pub(crate) struct QueryParams { pub format: AplResultFormat, } +impl From<&QueryOptions> for QueryParams { + fn from(options: &QueryOptions) -> Self { + Self { + no_cache: options.no_cache, + save: options.save, + format: options.format, + } + } +} + +// This is a configuration that just happens to have many flags. +#[allow(clippy::struct_excessive_bools)] /// The optional parameters to APL query methods. -#[derive(Debug)] +#[derive(Debug, Default, Serialize, Clone)] pub struct QueryOptions { /// The start time of the query. pub start_time: Option>, @@ -356,7 +310,6 @@ pub struct QueryOptions { /// Specifies whether the event that matches the cursor should be /// included in the result. pub include_cursor: bool, - /// Omits the query cache. pub no_cache: bool, /// Save the query on the server, if set to `true`. The ID of the saved query @@ -368,43 +321,27 @@ pub struct QueryOptions { pub save: bool, /// Format specifies the format of the APL query. Defaults to Legacy. 
pub format: AplResultFormat, -} - -impl Default for QueryOptions { - fn default() -> Self { - QueryOptions { - start_time: None, - end_time: None, - cursor: None, - include_cursor: false, - no_cache: false, - save: false, - format: AplResultFormat::Legacy, - } - } + /// Requests the cursor to be included in the response + pub include_cursor_field: bool, } /// The result format of an APL query. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Default)] #[non_exhaustive] #[serde(rename_all = "camelCase")] pub enum AplResultFormat { - /// Legacy result format - Legacy, -} - -impl Default for AplResultFormat { - fn default() -> Self { - AplResultFormat::Legacy - } + /// Tabular result format + #[default] + Tabular, } /// The kind of a query. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Default)] #[non_exhaustive] #[serde(rename_all = "camelCase")] pub enum QueryKind { /// Analytics query + #[default] Analytics, /// Streaming query Stream, @@ -412,62 +349,6 @@ pub enum QueryKind { Apl, } -impl Default for QueryKind { - fn default() -> Self { - QueryKind::Analytics - } -} - -/// A query that gets executed on a dataset. -/// If you're looking for the APL query, check out [`Query`]. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default)] -#[serde(rename_all = "camelCase")] -pub struct LegacyQuery { - /// Start time of the query. - #[serde(deserialize_with = "empty_string_as_none")] - pub start_time: Option>, - /// End time of the query. - #[serde(deserialize_with = "empty_string_as_none")] - pub end_time: Option>, - /// Resolution of the queries graph. Valid values are the queries time - /// range / 100 at maximum and / 1000 at minimum. Use zero value for - /// serve-side auto-detection. 
- #[serde(default)] - pub resolution: String, // TODO: Implement custom type to {de,}serialize to/from go string - /// Aggregations performed as part of the query. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub aggregations: Vec, - /// Filter applied on the queried results. - pub filter: Option, - /// Field names to group the query results by. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub group_by: Vec, - /// Order rules that specify the order of the query result. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub order: Vec, - /// Number of results returned from the query. - #[serde(default)] - pub limit: u32, - /// Virtual fields that can be referenced by aggregations, filters and - /// orders. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub virtual_fields: Vec, - /// Pricections for the query result. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub projections: Vec, - /// The query cursor. Should be set to the cursor returned with a previous - /// query result, if it was parital. - #[serde(default)] - pub cursor: String, - /// Return the Cursor as part of the query result. - #[serde(default)] - pub include_cursor: bool, - /// Used to get more results of a previous query. It is not valid for starred - /// queries or otherwise stored queries. - #[serde(default)] - pub continuation_token: String, -} - /// A field that is projected to the query result. #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct Projection { @@ -617,7 +498,7 @@ pub struct Aggregation { #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[non_exhaustive] #[serde(rename_all = "lowercase")] -pub enum FilterOp { +enum FilterOp { /// Logical AND And, /// Logical OR @@ -675,7 +556,7 @@ pub enum FilterOp { /// A filter is applied to a query. 
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct Filter { +struct Filter { /// The operation of the filter. pub op: FilterOp, /// The field to filter on. @@ -702,15 +583,6 @@ impl Default for Filter { } } -/// Specifies the order a queries result will be in. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] -pub struct Order { - /// Field to order on. - pub field: String, - /// If the field is ordered desending. - pub desc: bool, -} - /// A `VirtualField` is not part of a dataset and its value is derived from an /// expression. Aggregations, filters and orders can reference this field like /// any other field. @@ -722,53 +594,16 @@ pub struct VirtualField { pub expr: String, } -/// The parameters for a query. -#[derive(Serialize, Deserialize, Debug, Default)] -pub struct LegacyQueryOptions { - /// Duration of the stream - #[serde(rename = "streaming-duration")] - pub streaming_duration: Option, // TODO: Implement custom type to {de,}serialize to/from go string - /// If the query should not be cached. - #[serde(rename = "no-cache")] - pub no_cache: bool, - /// The kind to save the query wit. - #[serde(rename = "saveAsKind")] - pub save_as_kind: QueryKind, -} - +mod table; /// The query result. It embeds the APL request in the result it created. #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct QueryResult { - /// The query request. - pub request: LegacyQuery, - // NOTE: The following is copied from QueryResult. Maybe we should have a macro? /// The status of the query result. pub status: QueryStatus, - /// The datasets that were queried. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub dataset_names: Vec, - /// The events that matched the query. - #[serde(default, deserialize_with = "deserialize_null_default")] - pub matches: Vec, - /// The time series buckets. 
- pub buckets: Timeseries, - /// The ID of the query that generated this result when it was saved on the - /// server. This is only set when the query was send with the `SaveKind` - /// option specified. - #[serde(skip)] - pub saved_query_id: Option, -} -/// The legacy result of a query. -#[derive(Serialize, Deserialize, Debug)] -pub struct LegacyQueryResult { - /// The status of the query result. - pub status: QueryStatus, - /// The events that matched the query. - pub matches: Vec, - /// The time series buckets. - pub buckets: Timeseries, + /// The tables that were queried. + pub tables: Vec, /// The ID of the query that generated this result when it was saved on the /// server. This is only set when the query was send with the `SaveKind` /// option specified. diff --git a/src/datasets/model/table.rs b/src/datasets/model/table.rs new file mode 100644 index 0000000..3d6f0fb --- /dev/null +++ b/src/datasets/model/table.rs @@ -0,0 +1,390 @@ +use std::fmt::{self, Display}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::value::Value as JsonValue; + +/// Specifies the order a query's result will be in. +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Order { + /// Field to order on. + pub field: String, + /// If the field is ordered descending. + pub desc: bool, +} + +/// The datatype of the column in a table. +#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)] +#[serde(transparent)] +pub struct FieldType { + name: String, +} + +impl FieldType { + /// Returns the name of the field type.
+ #[must_use] + pub fn name(&self) -> &str { + &self.name + } +} + +impl Display for FieldType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl AsRef for FieldType { + fn as_ref(&self) -> &str { + self.name() + } +} + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct Agg { + // Name of the aggregation + name: String, + // Fields that the aggregation is applied to + #[serde(default)] + fields: Vec, + // Arguments to the aggregation + #[serde(default)] + args: Vec, +} +impl Agg { + /// Returns the name of the aggregation. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + /// Returns the fields of the aggregation. + #[must_use] + pub fn fields(&self) -> &[String] { + &self.fields + } + /// Returns the arguments of the aggregation. + #[must_use] + pub fn args(&self) -> &[JsonValue] { + &self.args + } +} +impl Display for Agg { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}({})", self.name(), self.fields().join(", ")) + } +} + +/// A field of an Axiom dataset. +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct Field { + /// Name is the unique name of the field. + name: String, + /// Type is the datatype of the field. + #[serde(rename = "type")] + typ: FieldType, + /// Aggregation details if field is an aggregate + agg: Option, +} + +impl Field { + /// Returns the name of the field. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + /// Returns the type of the field. + #[must_use] + pub fn typ(&self) -> &FieldType { + &self.typ + } + /// Returns the aggregation of the field. + #[must_use] + pub fn agg(&self) -> Option<&Agg> { + self.agg.as_ref() + } +} + +impl Display for Field { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}: {}", self.name(), self.typ()) + } +} + +/// The source dataset of a table. 
+#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Source { + name: String, +} + +impl Source { + /// Returns the name of the source. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } +} +impl Display for Source { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +/// A grouping as part of a table. +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Group { + name: String, +} + +impl Group { + /// Returns the name of the group. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } +} + +impl Display for Group { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +/// The range over which a given field is queried. +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Range { + field: String, + start: DateTime, + end: DateTime, +} + +impl Range { + /// Returns the field of the range. + #[must_use] + pub fn field(&self) -> &str { + &self.field + } + /// Returns the start of the range. + #[must_use] + pub fn start(&self) -> DateTime { + self.start + } + /// Returns the end of the range. + #[must_use] + pub fn end(&self) -> DateTime { + self.end + } +} + +impl Display for Range { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}[{}..{}]", self.field(), self.start(), self.end()) + } +} + +/// The bucketing applied to a table. +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Bucket { + field: String, + size: u64, +} + +impl Bucket { + /// Returns the field of the bucket. + #[must_use] + pub fn field(&self) -> &str { + &self.field + } + /// Returns the size of the bucket. 
+ #[must_use] + pub fn size(&self) -> u64 { + self.size + } +} + +impl Display for Bucket { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}[{}]", self.field(), self.size()) + } +} + +/// A table in the query result. +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Table { + name: String, + sources: Vec, + fields: Vec, + order: Vec, + groups: Vec, + range: Option, + buckets: Option, + columns: Vec>, +} + +impl Table { + /// Returns the name of the table. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + /// Returns the sources of the table. + #[must_use] + pub fn sources(&self) -> &[Source] { + &self.sources + } + /// Returns the fields of the table. + #[must_use] + pub fn fields(&self) -> &[Field] { + &self.fields + } + /// Returns the order of the table. + #[must_use] + pub fn order(&self) -> &[Order] { + &self.order + } + /// Returns the groups of the table. + #[must_use] + pub fn groups(&self) -> &[Group] { + &self.groups + } + /// Returns the range of the table. + #[must_use] + pub fn range(&self) -> Option<&Range> { + self.range.as_ref() + } + /// Returns the buckets of the table. + #[must_use] + pub fn buckets(&self) -> Option<&Bucket> { + self.buckets.as_ref() + } + /// Returns the columns of the table. + #[must_use] + pub fn columns(&self) -> &[Vec] { + &self.columns + } + + /// Returns the number of rows in the table (the length of the first column). + pub fn len(&self) -> usize { + self.columns.first().map(Vec::len).unwrap_or_default() + } + + pub fn get_row(&self, row: usize) -> Option { + if self.len() > row { + Some(Row { table: self, row }) + } else { + None + } + } + pub fn iter(&self) -> RowIter { + RowIter { + table: self, + row: 0, + } + } +} + +/// Iterator over the rows of a table.
+pub struct RowIter<'table> { + table: &'table Table, + row: usize, +} +impl<'table> Iterator for RowIter<'table> { + type Item = Row<'table>; + + fn next(&mut self) -> Option { + let row = self.table.get_row(self.row)?; + self.row += 1; + Some(row) + } + + fn size_hint(&self) -> (usize, Option) { + let size = self.table.len(); + (size - self.row, Some(size - self.row)) + } + + fn count(self) -> usize + where + Self: Sized, + { + self.table.len() - self.row + } + + fn last(self) -> Option + where + Self: Sized, + { + if self.table.len() > 0 { + self.table.get_row(self.table.len() - 1) + } else { + None + } + } +} + +/// A row in a table. +pub struct Row<'table> { + table: &'table Table, + row: usize, +} + +impl<'table> Row<'table> { + /// Returns the value of the row by name + pub fn get_field(&self, field: &str) -> Option<&JsonValue> { + let mut index = None; + + for (i, f) in self.table.fields.iter().enumerate() { + if f.name() == field { + index = Some(i); + break; + } + } + + self.get(index?) + } + /// Returns the value of the row. + pub fn get(&self, column: usize) -> Option<&JsonValue> { + self.table.columns.get(column).and_then(|c| c.get(self.row)) + } + /// Returns the fields of the table this row belongs to. + #[must_use] + pub fn fields(&self) -> &[Field] { + &self.table.fields + } + /// Returns an iterator over the fields of the row. + #[must_use] + pub fn iter(&self) -> FieldIter<'table> { + FieldIter { + table: self.table, + row: self.row, + index: 0, + } + } +} + +/// Iterator over the fields of a row.
+pub struct FieldIter<'table> { + table: &'table Table, + row: usize, + index: usize, +} + +impl<'table> Iterator for FieldIter<'table> { + type Item = Option<&'table JsonValue>; + + fn next(&mut self) -> Option { + if self.index >= self.table.columns.len() { + return None; + } + let value = self + .table + .columns + .get(self.index) + .and_then(|c| c.get(self.row)); + self.index += 1; + Some(value) + } +} diff --git a/src/lib.rs b/src/lib.rs index 520e4a0..3921416 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ //! //! // Query the dataset //! let query_res = client.query(r#"['my-dataset']"#, None).await?; -//! dbg!(query_res.matches); +//! dbg!(query_res); //! //! // Delete the dataset //! client.datasets().delete(dataset.name).await?; diff --git a/src/serde.rs b/src/serde.rs index c315b26..ec4ae44 100644 --- a/src/serde.rs +++ b/src/serde.rs @@ -1,4 +1,4 @@ -use serde::de::{Deserialize, Deserializer, IntoDeserializer}; +use serde::de::{Deserialize, Deserializer}; /// Set `deserialize_with` to this fn to get the default if null. /// See @@ -10,18 +10,3 @@ where let opt = Option::deserialize(deserializer)?; Ok(opt.unwrap_or_default()) } - -// Set `deserialize_with` to this fn to get the none for an empty string. 
-// Stolen from https://github.com/serde-rs/serde/issues/1425#issuecomment-462282398 -pub(crate) fn empty_string_as_none<'de, D, T>(de: D) -> Result, D::Error> -where - D: serde::Deserializer<'de>, - T: serde::Deserialize<'de>, -{ - let opt = Option::::deserialize(de)?; - let opt = opt.as_deref(); - match opt { - None | Some("") => Ok(None), - Some(s) => T::deserialize(s.into_deserializer()).map(Some), - } -} diff --git a/tests/cursor.rs b/tests/cursor.rs index 94656b2..6d4a603 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -107,30 +107,40 @@ async fn test_cursor_impl(ctx: &mut Context) { let apl_query_result = ctx .client .query( - format!("['{}'] | sort by _time desc", ctx.dataset.name), + &format!("['{}'] | sort by _time desc", ctx.dataset.name), QueryOptions { start_time: Some(start_time), end_time: Some(end_time), save: true, + include_cursor_field: true, ..Default::default() }, ) .await .unwrap(); assert!(apl_query_result.saved_query_id.is_some()); - assert_eq!(1000, apl_query_result.matches.len()); + assert_eq!(1000, apl_query_result.tables[0].len()); - let mid_row_id = &apl_query_result.matches[500].row_id; + let table = &apl_query_result.tables[0]; + let row = table.get_row(500).unwrap(); + + let mid_row_id = &row.get_field("_cursor").expect("column _cursor not found"); + + let cursor_key = if let serde_json::Value::String(mid_row_id) = mid_row_id { + mid_row_id.clone() + } else { + panic!("Expected _cursor to be a string"); + }; let apl_query_result = ctx .client .query( - format!("['{}'] | sort by _time desc", ctx.dataset.name), + &format!("['{}'] | sort by _time desc", ctx.dataset.name), QueryOptions { start_time: Some(start_time), end_time: Some(end_time), include_cursor: true, - cursor: Some(mid_row_id.to_string()), + cursor: Some(cursor_key), save: true, ..Default::default() }, @@ -138,5 +148,6 @@ async fn test_cursor_impl(ctx: &mut Context) { .await .unwrap(); assert!(apl_query_result.saved_query_id.is_some()); - assert_eq!(500, 
apl_query_result.matches.len()); + assert_eq!(1, apl_query_result.tables.len()); + assert_eq!(500, apl_query_result.tables[0].len()); } diff --git a/tests/datasets.rs b/tests/datasets.rs index 32ba79c..73ad73f 100644 --- a/tests/datasets.rs +++ b/tests/datasets.rs @@ -1,6 +1,5 @@ #![cfg(feature = "integration-tests")] use axiom_rs::{datasets::*, Client}; -use chrono::{Duration, Utc}; use futures::StreamExt; use serde_json::json; use std::{env, time::Duration as StdDuration}; @@ -43,7 +42,7 @@ impl AsyncTestContext for Context { #[test_context(Context)] #[tokio::test] async fn test_datasets(ctx: &mut Context) -> Result<(), Box> { - Ok(test_datasets_impl(ctx).await?) + test_datasets_impl(ctx).await } #[cfg(feature = "async-std")] #[test_context(Context)] @@ -79,7 +78,7 @@ async fn test_datasets_impl(ctx: &mut Context) -> Result<(), Box Result<(), Box 0); - - // Run a query and make sure we see some results. - #[allow(deprecated)] - let simple_query_result = ctx - .client - .query_legacy( - &ctx.dataset.name, - LegacyQuery { - start_time: Some(Utc::now() - Duration::minutes(1)), - end_time: Some(Utc::now()), - ..Default::default() - }, - Some(LegacyQueryOptions { - save_as_kind: QueryKind::Analytics, - ..Default::default() - }), - ) - .await?; - assert!(simple_query_result.saved_query_id.is_some()); - // assert_eq!(1, simple_query_result.status.blocks_examined); - assert_eq!(4327, simple_query_result.status.rows_examined); - assert_eq!(4327, simple_query_result.status.rows_matched); - assert_eq!(1000, simple_query_result.matches.len()); + assert!(!info.fields.is_empty()); // Run another query but using APL. let apl_query_result = ctx .client .query( - format!("['{}']", ctx.dataset.name), + &format!("['{}']", ctx.dataset.name), QueryOptions { save: true, ..Default::default() @@ -204,68 +180,8 @@ async fn test_datasets_impl(ctx: &mut Context) -> Result<(), Box