Skip to content

Commit

Permalink
refactor(table options): backwards compat for catalog (#2775)
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 authored Mar 12, 2024
1 parent c056a0e commit 4935cc3
Show file tree
Hide file tree
Showing 29 changed files with 380 additions and 205 deletions.
6 changes: 3 additions & 3 deletions crates/datasources/src/debug/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion_ext::functions::VirtualLister;
use errors::DebugError;
use futures::Stream;
use parser::options::StatementOptions;
use protogen::metastore::types::options::{CredentialsOptions, TableOptions, TunnelOptions};
use protogen::metastore::types::options::{CredentialsOptions, TableOptionsV1, TunnelOptions};

pub use self::options::{DebugTableType, TableOptionsDebug};
use crate::DatasourceError;
Expand Down Expand Up @@ -259,7 +259,7 @@ impl crate::Datasource for DebugDatasource {
opts: &mut StatementOptions,
creds: Option<CredentialsOptions>,
tunnel_opts: Option<TunnelOptions>,
) -> Result<TableOptions, DatasourceError> {
) -> Result<TableOptionsV1, DatasourceError> {
validate_tunnel_connections(tunnel_opts.as_ref()).unwrap();

let typ: Option<DebugTableType> = match creds {
Expand All @@ -276,7 +276,7 @@ impl crate::Datasource for DebugDatasource {

async fn create_table_provider(
&self,
options: &TableOptions,
options: &TableOptionsV1,
tunnel_opts: Option<&TunnelOptions>,
) -> Result<Arc<dyn TableProvider>, DatasourceError> {
let options: TableOptionsDebug = options.extract()?;
Expand Down
9 changes: 5 additions & 4 deletions crates/datasources/src/debug/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ use serde::{Deserialize, Serialize};
use super::errors::DebugError;

/// The kind of debug table to create.
///
/// Variant names are (de)serialized in `snake_case`, e.g. `never_ending`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DebugTableType {
/// A table that will always return an error on the record batch stream.
ErrorDuringExecution,
/// A table that never stops sending record batches.
NeverEnding,
}

/// Table options for the debug datasource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableOptionsDebug {
/// Which kind of debug table these options describe.
pub table_type: DebugTableType,
}

impl ParseOptionValue<DebugTableType> for SqlOptionValue {
fn parse_opt(self) -> Result<DebugTableType, ParserError> {
Expand Down Expand Up @@ -124,10 +129,6 @@ impl DebugTableType {
}


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableOptionsDebug {
pub table_type: DebugTableType,
}
impl TableOptionsImpl for TableOptionsDebug {
// Name associated with the debug table options.
// NOTE(review): presumably this is the value stored in `TableOptionsV1::name`
// and used to select the datasource — confirm against `TableOptionsImpl`.
const NAME: &'static str = "debug";
}
Expand Down
6 changes: 3 additions & 3 deletions crates/datasources/src/lance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use parser::options::StatementOptions;
use protogen::metastore::types::options::{
CredentialsOptions,
StorageOptions,
TableOptions,
TableOptionsObjectStore,
TableOptionsV1,
TunnelOptions,
};

Expand Down Expand Up @@ -93,7 +93,7 @@ impl Datasource for LanceDatasource {
opts: &mut StatementOptions,
creds: Option<CredentialsOptions>,
_tunnel_opts: Option<TunnelOptions>,
) -> Result<TableOptions, DatasourceError> {
) -> Result<TableOptionsV1, DatasourceError> {
let location: String = opts.remove_required("location")?;
let mut storage_options = StorageOptions::try_from(opts)?;
if let Some(creds) = creds {
Expand All @@ -113,7 +113,7 @@ impl Datasource for LanceDatasource {

async fn create_table_provider(
&self,
options: &TableOptions,
options: &TableOptionsV1,
_tunnel_opts: Option<&TunnelOptions>,
) -> Result<Arc<dyn TableProvider>, DatasourceError> {
let TableOptionsObjectStore {
Expand Down
6 changes: 3 additions & 3 deletions crates/datasources/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use parser::options::StatementOptions;
use protogen::metastore::types::options::{CredentialsOptions, TableOptions, TunnelOptions};
use protogen::metastore::types::options::{CredentialsOptions, TableOptionsV1, TunnelOptions};
pub mod bigquery;
pub mod bson;
pub mod cassandra;
Expand Down Expand Up @@ -46,12 +46,12 @@ pub trait Datasource: Send + Sync {
opts: &mut StatementOptions,
creds: Option<CredentialsOptions>,
tunnel_opts: Option<TunnelOptions>,
) -> Result<TableOptions, DatasourceError>;
) -> Result<TableOptionsV1, DatasourceError>;


async fn create_table_provider(
&self,
tbl_options: &TableOptions,
tbl_options: &TableOptionsV1,
_tunnel_opts: Option<&TunnelOptions>,
) -> Result<Arc<dyn TableProvider>, DatasourceError>;
}
4 changes: 2 additions & 2 deletions crates/datasources/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use object_store::{ObjectMeta, ObjectStore};
use protogen::metastore::types::options::{
CredentialsOptions,
StorageOptions,
TableOptions,
TableOptionsGcs,
TableOptionsObjectStore,
TableOptionsS3,
TableOptionsV1,
};

use self::azure::AzureStoreAccess;
Expand Down Expand Up @@ -390,7 +390,7 @@ pub fn file_type_from_path(path: &ObjectStorePath) -> Result<FileType> {

pub fn init_session_registry<'a>(
runtime: &RuntimeEnv,
entries: impl Iterator<Item = &'a TableOptions>,
entries: impl Iterator<Item = &'a TableOptionsV1>,
) -> Result<()> {
for opts in entries {
let access: Arc<dyn ObjStoreAccess> = match opts.name.as_ref() {
Expand Down
37 changes: 37 additions & 0 deletions crates/glaredb/tests/catalog_compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
mod setup;
use crate::setup::make_cli;


/// Checks that the CLI can still load a catalog persisted in the old (v0)
/// format without panicking.
#[test]
fn test_catalog_backwards_compat() {
    let mut cli = make_cli();

    // The fixture lives two directories above the current working directory,
    // under `testdata/`.
    let cwd = std::env::current_dir().unwrap();
    let workspace_root = cwd.ancestors().nth(2).unwrap();
    let catalog_path = workspace_root.join("testdata/catalog_compat/v0");
    let catalog_arg = catalog_path.to_str().unwrap();

    cli.args(["-l", catalog_arg]).assert().success();
}


/// Checks that table options stored in the old (v0) catalog can still be read.
///
/// The v0 catalog contains a table created with the following SQL:
/// ```sql
/// CREATE EXTERNAL TABLE debug_table
/// FROM debug OPTIONS (
/// table_type 'never_ending'
/// );
/// ```
#[test]
fn test_catalog_backwards_compat_tbl_options() {
    let mut cli = make_cli();

    // The fixture lives two directories above the current working directory,
    // under `testdata/`.
    let cwd = std::env::current_dir().unwrap();
    let workspace_root = cwd.ancestors().nth(2).unwrap();
    let catalog_path = workspace_root.join("testdata/catalog_compat/v0");
    let catalog_arg = catalog_path.to_str().unwrap();

    // Querying the table forces its stored options to be read.
    let sql = "SELECT * FROM debug_table LIMIT 1";
    cli.args(["-l", catalog_arg, "-q", sql]).assert().success();
}
3 changes: 3 additions & 0 deletions crates/metastore/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use protogen::metastore::types::catalog::{
TableEntry,
TunnelEntry,
ViewEntry,
CURRENT_CATALOG_VERSION,
};
use protogen::metastore::types::options::{
DatabaseOptions,
Expand Down Expand Up @@ -183,6 +184,7 @@ impl DatabaseCatalog {
version: guard.version,
entries: guard.entries.as_ref().clone(),
deployment: guard.deployment.clone(),
catalog_version: CURRENT_CATALOG_VERSION,
}
}

Expand Down Expand Up @@ -530,6 +532,7 @@ impl State {
.into_iter()
.filter(|(_, ent)| !ent.get_meta().builtin)
.collect(),
catalog_version: CURRENT_CATALOG_VERSION,
},
extra: ExtraState {
oid_counter: self.oid_counter,
Expand Down
7 changes: 6 additions & 1 deletion crates/metastore/src/storage/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use object_store::{Error as ObjectStoreError, ObjectStore};
use pgrepr::oid::FIRST_AVAILABLE_ID;
use prost::Message;
use protogen::gen::metastore::storage;
use protogen::metastore::types::catalog::{CatalogState, DeploymentMetadata};
use protogen::metastore::types::catalog::{
CatalogState,
DeploymentMetadata,
CURRENT_CATALOG_VERSION,
};
use protogen::metastore::types::storage::{CatalogMetadata, ExtraState, PersistedCatalog};
use tracing::{debug, error};
use uuid::Uuid;
Expand Down Expand Up @@ -75,6 +79,7 @@ impl Storage {
version: 0,
entries: HashMap::new(),
deployment: DeploymentMetadata { storage_size: 0 },
catalog_version: CURRENT_CATALOG_VERSION,
},
extra: ExtraState {
oid_counter: FIRST_AVAILABLE_ID,
Expand Down
15 changes: 12 additions & 3 deletions crates/protogen/proto/metastore/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ message CatalogState {

// Metadata for the deployment.
DeploymentMetadata deployment = 3;

// next: 4

// The version of the actual catalog implementation.
// This is incremented when there are incompatible changes to the physical
// catalog layout.
// The field number is purposely set high to avoid accidental collisions.
optional uint32 catalog_version = 2047;
}

// Metadata for the deployment.
Expand Down Expand Up @@ -160,11 +165,15 @@ message TableEntry {

reserved 2; // Column fields

options.TableOptions options = 3;
// The old table options. This is deprecated and should not be used.
// It is only here for backwards compatibility.
options.TableOptionsV0 options_v0 = 3;
optional uint32 tunnel_id = 4;
SourceAccessMode access_mode = 5;
repeated options.InternalColumnDefinition columns = 6;
// next: 7
// The new table options.
options.TableOptionsV1 options = 7;
// next: 8
}

message ViewEntry {
Expand Down
4 changes: 2 additions & 2 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ message StorageOptions {
map<string, string> inner = 1;
}

message TableOptions {
message TableOptionsV1 {
string name = 1;
bytes options = 2;
}

// Table options

message TableOptionsOld {
message TableOptionsV0 {
oneof options {
TableOptionsInternal internal = 1;
TableOptionsDebug debug = 2;
Expand Down
2 changes: 1 addition & 1 deletion crates/protogen/proto/metastore/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ message CreateFunction {
message CreateExternalTable {
string schema = 1;
string name = 2;
options.TableOptions options = 3;
options.TableOptionsV1 options = 3;
bool if_not_exists = 4;
optional string tunnel = 5;
bool or_replace = 6;
Expand Down
33 changes: 30 additions & 3 deletions crates/protogen/src/metastore/types/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,33 @@ use super::options::{
CredentialsOptions,
DatabaseOptions,
InternalColumnDefinition,
TableOptions,
TableOptionsInternal,
TableOptionsV0,
TableOptionsV1,
TunnelOptions,
};
use crate::gen::common::arrow::ArrowType;
use crate::gen::metastore::catalog::{self, type_signature};
use crate::{gen, FromOptionalField, ProtoConvError};

/// The current version of the catalog IMPLEMENTATION.
/// This is incremented every time there is a breaking change to the physical representation of the catalog.
pub const CURRENT_CATALOG_VERSION: u32 = 1;

/// In-memory representation of the catalog state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatalogState {
/// The version of the catalog STATE.
/// This version corresponds to the state of the catalog and is incremented every time there is a change to the catalog.
/// It is associated with the "data" in the catalog.
pub version: u64,
/// Catalog entries keyed by a `u32` id (presumably the entry's OID — confirm).
pub entries: HashMap<u32, CatalogEntry>,
/// Metadata for the deployment.
pub deployment: DeploymentMetadata,
/// The version of the catalog IMPLEMENTATION.
/// Any new code paths should generally use [`CURRENT_CATALOG_VERSION`] unless they are specifically
/// for handling older versions of the catalog.
/// Unlike the `version` field, this is only incremented when the physical representation of the catalog changes.
/// It is associated with the "code" that represents the catalog.
pub catalog_version: u32,
}

impl TryFrom<catalog::CatalogState> for CatalogState {
Expand All @@ -47,6 +61,7 @@ impl TryFrom<catalog::CatalogState> for CatalogState {
version: value.version,
entries,
deployment,
catalog_version: value.catalog_version.unwrap_or(0),
})
}
}
Expand All @@ -65,6 +80,7 @@ impl TryFrom<CatalogState> for catalog::CatalogState {
})
.collect::<Result<_, _>>()?,
deployment: Some(value.deployment.try_into()?),
catalog_version: Some(value.catalog_version),
})
}
}
Expand Down Expand Up @@ -438,7 +454,7 @@ impl From<SchemaEntry> for catalog::SchemaEntry {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableEntry {
pub meta: EntryMeta,
pub options: TableOptions,
pub options: TableOptionsV1,
pub tunnel_id: Option<u32>,
pub access_mode: SourceAccessMode,
pub columns: Option<Vec<InternalColumnDefinition>>,
Expand All @@ -465,6 +481,14 @@ impl TryFrom<catalog::TableEntry> for TableEntry {
type Error = ProtoConvError;
fn try_from(value: catalog::TableEntry) -> Result<Self, Self::Error> {
let meta: EntryMeta = value.meta.required("meta")?;
let options_old = value.options_v0;
let options: TableOptionsV1 = match options_old {
Some(options) => {
let options_v0: TableOptionsV0 = options.try_into()?;
options_v0.into()
}
None => value.options.required("options")?,
};

let columns: Vec<crate::metastore::types::options::InternalColumnDefinition> = value
.columns
Expand All @@ -480,7 +504,7 @@ impl TryFrom<catalog::TableEntry> for TableEntry {

Ok(TableEntry {
meta,
options: value.options.required("options")?,
options,
tunnel_id: value.tunnel_id,
access_mode: value.access_mode.try_into()?,
columns,
Expand All @@ -502,6 +526,7 @@ impl From<TableEntry> for catalog::TableEntry {
options: Some(value.options.into()),
tunnel_id: value.tunnel_id,
access_mode: value.access_mode.into(),
options_v0: None,
columns,
}
}
Expand Down Expand Up @@ -882,13 +907,15 @@ mod tests {
version: 4,
entries: HashMap::new(),
deployment: None,
catalog_version: None,
};

let converted: CatalogState = state.try_into().unwrap();
let expected = CatalogState {
version: 4,
entries: HashMap::new(),
deployment: DeploymentMetadata { storage_size: 0 },
catalog_version: 0,
};

assert_eq!(expected, converted);
Expand Down
Loading

0 comments on commit 4935cc3

Please sign in to comment.