[bigquery] Support bigquery emulator (#336)

yoshidan authored Dec 30, 2024
1 parent be627ee commit 94880c5
Showing 6 changed files with 183 additions and 9 deletions.
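The centerpiece is the new `ClientConfig::new_with_emulator` constructor in `client.rs`, which points both the REST and gRPC clients at a local emulator and installs an empty token source so no credentials are needed. A minimal usage sketch (assuming the crate's published name `google-cloud-bigquery` and a bigquery-emulator instance on its default ports, HTTP 9050 and gRPC 9060, the same addresses the test added below uses):

use google_cloud_bigquery::client::{Client, ClientConfig};

#[tokio::main]
async fn main() {
    // gRPC (storage API) host first, then the HTTP base URL.
    let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");
    let client = Client::new(config).await.unwrap();
    // `client` can now be used against the emulator just like the real API.
}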
170 changes: 162 additions & 8 deletions bigquery/src/client.rs
@@ -1,16 +1,16 @@
use backon::{ExponentialBuilder, Retryable};
use core::time::Duration;
-use std::borrow::Cow;
-use std::collections::VecDeque;
-use std::marker::PhantomData;
-use std::sync::Arc;

use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::retry::RetrySetting;
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
read_session, CreateReadSessionRequest, DataFormat, ReadSession,
};
-use google_cloud_token::TokenSourceProvider;
+use google_cloud_token::{TokenSource, TokenSourceProvider};
+use std::borrow::Cow;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::Arc;

use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::http::bigquery_client::BigqueryClient;
@@ -107,7 +107,38 @@ impl Default for ChannelConfig {
}
}

#[derive(Debug)]
pub struct EmptyTokenSourceProvider {}

#[derive(Debug)]
pub struct EmptyTokenSource {}

#[async_trait::async_trait]
impl TokenSource for EmptyTokenSource {
async fn token(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
Ok("".to_string())
}
}

impl TokenSourceProvider for EmptyTokenSourceProvider {
fn token_source(&self) -> Arc<dyn TokenSource> {
Arc::new(EmptyTokenSource {})
}
}

impl ClientConfig {
pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into<Cow<'static, str>>) -> Self {
Self {
http: reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build(),
bigquery_endpoint: http_addr.into(),
token_source_provider: Box::new(EmptyTokenSourceProvider {}),
environment: Environment::Emulator(grpc_host.to_string()),
streaming_read_config: ChannelConfig::default(),
streaming_write_config: StreamingWriteConfig::default(),
debug: false,
}
}

pub fn new(
http_token_source_provider: Box<dyn TokenSourceProvider>,
grpc_token_source_provider: Box<dyn TokenSourceProvider>,
@@ -694,7 +725,7 @@ impl Client {
trace_id: "".to_string(),
schema: option.session_schema,
}),
-max_stream_count: 0,
+max_stream_count: option.max_stream_count,
preferred_min_stream_count: 0,
},
option.session_retry_setting,
@@ -712,6 +743,7 @@ pub struct ReadTableOption {
session_schema: Option<read_session::Schema>,
session_retry_setting: Option<RetrySetting>,
read_rows_retry_setting: Option<RetrySetting>,
max_stream_count: i32,
}

impl ReadTableOption {
@@ -739,16 +771,21 @@ impl ReadTableOption {
self.read_rows_retry_setting = Some(value);
self
}

pub fn with_max_stream_count(mut self, value: i32) -> Self {
self.max_stream_count = value;
self
}
}

#[cfg(test)]
mod tests {
use bigdecimal::BigDecimal;

use serial_test::serial;
use std::collections::HashMap;
use std::ops::AddAssign;
use std::time::Duration;

use time::{Date, OffsetDateTime, Time};

use google_cloud_googleapis::cloud::bigquery::storage::v1::read_session::TableReadOptions;
@@ -1237,3 +1274,120 @@ mod tests {
}
}
}

#[cfg(test)]
mod emulator_tests {
use crate::client::{Client, ClientConfig};
use crate::http::table::{Table, TableFieldSchema, TableFieldType, TableSchema};
use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
use crate::http::tabledata::list::FetchDataRequest;
use futures_util::StreamExt;

use prost::Message;

use std::time::SystemTime;

#[ignore]
#[tokio::test]
async fn test_emulator_use() {
let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");

// Create Table
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let client = Client::new(config).await.unwrap();
let mut table1 = Table::default();
table1.table_reference.dataset_id = "dataset1".to_string();
table1.table_reference.project_id = "local-project".to_string();
table1.table_reference.table_id = format!("table{now}").to_string();
table1.schema = Some(TableSchema {
fields: vec![TableFieldSchema {
name: "col_string".to_string(),
data_type: TableFieldType::String,
..Default::default()
}],
});
client.table_client.create(&table1).await.unwrap();

// Insert data
let mut req = InsertAllRequest::<serde_json::Value>::default();
req.rows.push(Row {
insert_id: None,
json: serde_json::from_str(
r#"
{"col_string": "test1"}
"#,
)
.unwrap(),
});
client
.tabledata_client
.insert(
&table1.table_reference.project_id,
&table1.table_reference.dataset_id,
&table1.table_reference.table_id,
&req,
)
.await
.unwrap();

// Streaming write
let writer = client.default_storage_writer();
let fqtn = &format!(
"projects/local-project/datasets/dataset1/tables/{}",
table1.table_reference.table_id
);
let stream = writer.create_write_stream(fqtn).await.unwrap();

let mut rows = vec![];
for j in 0..5 {
let data = crate::storage_write::stream::tests::TestData {
col_string: format!("default_{j}"),
};
let mut buf = Vec::new();
data.encode(&mut buf).unwrap();
rows.push(crate::storage_write::stream::tests::create_append_rows_request(vec![
buf.clone(),
buf.clone(),
buf,
]));
}
let mut result = stream.append_rows(rows).await.unwrap();
while let Some(res) = result.next().await {
let res = res.unwrap();
tracing::info!("append row errors = {:?}", res.row_errors.len());
}

// Read all data
let tref = &table1.table_reference;
let data = client
.tabledata_client
.read(
&tref.project_id,
&tref.dataset_id,
&tref.table_id,
&FetchDataRequest { ..Default::default() },
)
.await
.unwrap();
assert_eq!(16, data.total_rows);

/* TODO fix emulator stream
// Read all data by storage
let opt = ReadTableOption::default()
.with_session_read_options(TableReadOptions::default())
.with_max_stream_count(1);
let mut records= client
.read_table::<crate::storage::row::Row>(&tref, Some(opt))
.await
.unwrap();
let mut count = 0;
while let record = records.next().await.unwrap() {
count += 1;
}
assert_eq!(count, data.total_rows);
*/
}
}
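Also new here: `ReadTableOption::with_max_stream_count`, which forwards to `CreateReadSessionRequest.max_stream_count` (previously hard-coded to 0, i.e. server-chosen). A sketch of the intended use, mirroring the commented-out block above (assumes a `client` and `table_ref` in scope as in the tests, and that the storage iterator's `next()` yields `Result<Option<T>>` as used elsewhere in this crate's tests):

use google_cloud_bigquery::client::ReadTableOption;
use google_cloud_bigquery::storage::row::Row;

// A single stream keeps the read session simple (and the emulator happy).
let opt = ReadTableOption::default().with_max_stream_count(1);
let mut records = client.read_table::<Row>(&table_ref, Some(opt)).await.unwrap();
while let Some(row) = records.next().await.unwrap() {
    // process `row`
}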
8 changes: 8 additions & 0 deletions bigquery/src/http/mod.rs
@@ -47,6 +47,14 @@
T::from_str(&s).map_err(de::Error::custom)
}

fn into_str<T, S>(value: T, s: S) -> Result<S::Ok, S::Error>
where
T: ToString,
S: serde::Serializer,
{
s.serialize_str(&value.to_string())
}

fn from_str_vec_option<'de, T, D>(deserializer: D) -> Result<Option<Vec<T>>, D::Error>
where
T: FromStr,
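The new `into_str` helper is the serializing twin of the existing `from_str` deserializer: BigQuery's REST API transports 64-bit counters as JSON strings, so these fields round-trip through a string encoding. A minimal sketch of the pattern (hypothetical `Demo` struct; the real call sites follow in `table/mod.rs`):

#[derive(serde::Serialize, serde::Deserialize)]
struct Demo {
    #[serde(deserialize_with = "crate::http::from_str")]
    #[serde(serialize_with = "crate::http::into_str")]
    num_rows: u64,
}

// {"num_rows":"42"} <-> Demo { num_rows: 42 }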
9 changes: 9 additions & 0 deletions bigquery/src/http/table/mod.rs
@@ -676,6 +676,7 @@ pub struct Table {
/// Output only. The resource type.
pub kind: String,
/// Output only. A hash of the resource.
#[serde(default)]
pub etag: String,
/// Output only. The fully-qualified unique name of the dataset in the format projectId:datasetId.
/// The dataset name without the project name is given in the datasetId field. When creating a new dataset,
@@ -712,15 +713,21 @@ pub struct Table {
pub require_partition_filter: Option<bool>,
/// Output only. The size of this table in logical bytes, excluding any data in the streaming buffer.
#[serde(deserialize_with = "crate::http::from_str_option")]
#[serde(default)]
pub num_bytes: Option<i64>,
/// Output only. The number of logical bytes in the table that are considered "long-term storage".
#[serde(deserialize_with = "crate::http::from_str")]
#[serde(serialize_with = "crate::http::into_str")]
#[serde(default)]
pub num_long_term_bytes: i64,
/// Output only. The number of rows of data in this table, excluding any data in the streaming buffer.
#[serde(deserialize_with = "crate::http::from_str")]
#[serde(serialize_with = "crate::http::into_str")]
#[serde(default)]
pub num_rows: u64,
/// Output only. The time when this table was created, in milliseconds since the epoch.
#[serde(deserialize_with = "crate::http::from_str")]
#[serde(serialize_with = "crate::http::into_str")]
pub creation_time: i64,
/// Optional. The time when this table expires, in milliseconds since the epoch.
/// If not present, the table will persist indefinitely. Expired tables will be deleted and their storage reclaimed. The defaultTableExpirationMs property of the encapsulating dataset can be used to set a default expirationTime on newly created tables.
@@ -729,6 +736,7 @@
pub expiration_time: Option<i64>,
/// Output only. The time when this table was last modified, in milliseconds since the epoch.
#[serde(deserialize_with = "crate::http::from_str")]
#[serde(serialize_with = "crate::http::into_str")]
pub last_modified_time: u64,
/// Output only. Describes the table type. The following values are supported:
///
@@ -750,6 +758,7 @@
pub external_data_configuration: Option<ExternalDataConfiguration>,
/// Output only. The geographic location where the table resides.
/// This value is inherited from the dataset.
#[serde(default)]
pub location: String,
/// Output only. Contains information regarding this table's streaming buffer, if one is present.
/// This field will be absent if the table is not being streamed to or
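The `#[serde(default)]` additions are what make `Table` tolerant of emulator responses, which omit output-only fields (`etag`, `location`, `num_bytes`, ...) that the real API always populates. A self-contained sketch of the effect (hypothetical `Demo` struct):

use serde::Deserialize;

#[derive(Deserialize)]
struct Demo {
    #[serde(default)]
    etag: String,
}

// Without #[serde(default)] this fails with `missing field "etag"`;
// with it, the field falls back to String::default(), i.e. "".
let d: Demo = serde_json::from_str("{}").unwrap();
assert_eq!(d.etag, "");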
1 change: 1 addition & 0 deletions bigquery/src/http/tabledata/insert_all.rs
@@ -67,6 +67,7 @@ pub struct Error {
#[derive(Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Default, Debug)]
#[serde(rename_all = "camelCase")]
pub struct InsertAllResponse {
#[serde(default)]
pub kind: String,
pub insert_errors: Option<Vec<Error>>,
}
2 changes: 2 additions & 0 deletions bigquery/src/http/tabledata/list.rs
@@ -42,8 +42,10 @@ pub struct FetchDataRequest {
#[serde(rename_all = "camelCase")]
pub struct FetchDataResponse {
/// Will be set to "bigquery#tableDataList".
#[serde(default)]
pub kind: String,
/// Etag to the response.
#[serde(default)]
pub etag: String,
/// Total rows of the entire table. In order to show default value "0", we have to present it as string.
#[serde(deserialize_with = "crate::http::from_str")]
2 changes: 1 addition & 1 deletion bigquery/src/storage_write/stream/mod.rs
@@ -107,7 +107,7 @@ impl DisposableStreamDelegate {
}

#[cfg(test)]
-mod tests {
+pub(crate) mod tests {
use crate::storage_write::AppendRowsRequestBuilder;
use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto};

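Widening the test module from `mod tests` to `pub(crate) mod tests` is what lets the new `emulator_tests` in `client.rs` reuse `TestData` and `create_append_rows_request` via the `crate::storage_write::stream::tests::` paths above.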