Skip to content

Commit

Permalink
Add a subcommand to introspect stored procedures (#148)
Browse files Browse the repository at this point in the history
<!-- The PR description should answer 2 (maybe 3) important questions:
-->

### What

<!-- What is this PR trying to accomplish (and why, if it's not
obvious)? -->

This PR adds a subcommand `stored-procedures` to the CLI `update`
command to introspect stored procedures separately. This is because the
stored procedures introspection doesn't include any return type and a
stored procedure without any return type is not added to the connector's
schema. After the introspection, the user needs to manually add the
return types of the stored procedures and this information should be
retained, that's why the introspection of stored procedures is decoupled
from the introspection of the `tables` , `types` etc.

### How

<!-- How is it trying to accomplish it (what are the implementation
steps)? -->
  • Loading branch information
codingkarthik authored Sep 16, 2024
1 parent 368b38f commit 6b30734
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 96 deletions.
28 changes: 25 additions & 3 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ pub struct Context<Env: Environment> {
pub environment: Env,
}

#[derive(Debug, Clone, Subcommand)]
pub enum UpdateCommand {
StoredProcedures {
#[arg(long)]
overwrite: bool,
},
}

/// The command invoked by the user.
#[derive(Debug, Clone, Subcommand)]
pub enum Command {
Expand All @@ -30,7 +38,10 @@ pub enum Command {
with_metadata: bool,
},
/// Update the configuration by introspecting the database, using the configuration options.
Update,
Update {
#[command(subcommand)]
subcommand: Option<UpdateCommand>,
},
}

/// The set of errors that can go wrong _in addition to_ generic I/O or parsing errors.
Expand All @@ -44,7 +55,7 @@ pub enum Error {
pub async fn run(command: Command, context: Context<impl Environment>) -> anyhow::Result<()> {
match command {
Command::Initialize { with_metadata } => initialize(with_metadata, context).await?,
Command::Update => update(context).await?,
Command::Update { subcommand } => update(context, subcommand).await?,
};
Ok(())
}
Expand Down Expand Up @@ -130,7 +141,10 @@ async fn initialize(with_metadata: bool, context: Context<impl Environment>) ->
/// Update the configuration in the current directory by introspecting the database.
///
/// This expects a configuration with a valid connection URI.
async fn update(context: Context<impl Environment>) -> anyhow::Result<()> {
async fn update(
context: Context<impl Environment>,
subcommand: Option<UpdateCommand>,
) -> anyhow::Result<()> {
// It is possible to change the file in the middle of introspection.
// We want to detect this scenario and retry, or fail if we are unable to.
// We do that with a few attempts.
Expand All @@ -144,10 +158,18 @@ async fn update(context: Context<impl Environment>) -> anyhow::Result<()> {
read_config_file_contents(&configuration_file_path).await?;
serde_json::from_str(&configuration_file_contents)?
};
let stored_procs_config = subcommand.clone().map(|sub_cmd| match sub_cmd {
UpdateCommand::StoredProcedures {
overwrite: r#override,
} => configuration::version1::StoredProceduresConfigurationOptions {
overwrite_existing_stored_procedures: r#override,
},
});
let output = configuration::configure(
&configuration_file_path,
&raw_configuration,
&context.environment,
stored_procs_config,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/cli/tests/update_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn test_update_configuration() -> anyhow::Result<()> {
environment,
release_version: None,
};
run(Command::Update, context).await?;
run(Command::Update { subcommand: None }, context).await?;

let configuration_file_path = dir.path().join("configuration.json");
assert!(configuration_file_path.exists());
Expand Down
4 changes: 4 additions & 0 deletions crates/configuration/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ pub enum Error {

#[error("Error creating connection pool while introspecting the database: {0}")]
ConnectionPoolError(#[from] bb8_tiberius::Error),

// error while parsing stored procedure introspection
#[error("Error parsing stored procedure introspection: {0}")]
StoredProcedureIntrospectionError(serde_json::Error),
}
60 changes: 53 additions & 7 deletions crates/configuration/src/version1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ pub struct RawConfiguration {
pub metadata: query_engine_metadata::metadata::Metadata,
}

#[derive(Debug, PartialEq, Eq, Deserialize, Serialize, JsonSchema, Clone)]
pub struct StoredProceduresConfigurationOptions {
pub overwrite_existing_stored_procedures: bool,
}

impl RawConfiguration {
pub fn empty() -> Self {
Self {
Expand Down Expand Up @@ -166,11 +171,53 @@ async fn select_first_row(
stream.into_row().await.unwrap().unwrap()
}

// get_stored_procedures fetches the stored procedures from the database and returns them as a
// vector of introspection::IntrospectStoredProcedure.
async fn configure_stored_procedures(
mssql_pool: &bb8::Pool<bb8_tiberius::ConnectionManager>,
existing_stored_procedures: StoredProcedures,
config_options: Option<StoredProceduresConfigurationOptions>,
) -> Result<StoredProcedures, Error> {
match config_options {
Some(config_options) => {
let stored_procedures_row =
select_first_row(mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;
let introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
serde_json::from_str(stored_procedures_row.get(0).unwrap())
.map_err(Error::StoredProcedureIntrospectionError)?;
let new_stored_procedures = get_stored_procedures(introspected_stored_procedures);

// traverse the new stored procedures and add them to the existing stored procedures
let mut merged_stored_procedures = existing_stored_procedures.0.clone();
for (name, stored_procedure) in new_stored_procedures.0 {
// check if the stored procedure already exists
match merged_stored_procedures.entry(name) {
std::collections::btree_map::Entry::Occupied(mut e) => {
if config_options.overwrite_existing_stored_procedures {
e.insert(stored_procedure);
} else {
// do not overwrite the existing stored procedure
continue;
}
}
std::collections::btree_map::Entry::Vacant(e) => {
e.insert(stored_procedure);
}
}
}

Ok(StoredProcedures(merged_stored_procedures))
}
None => Ok(existing_stored_procedures),
}
}

/// Construct the deployment configuration by introspecting the database.
pub async fn configure(
file_path: &std::path::Path,
configuration: &RawConfiguration,
environment: impl Environment,
stored_procedure_configuration_options: Option<StoredProceduresConfigurationOptions>,
) -> Result<RawConfiguration, Error> {
let connection_string = match &configuration.mssql_connection_string {
uri::ConnectionUri(Secret::Plain(s)) => s.to_owned(),
Expand Down Expand Up @@ -205,13 +252,12 @@ pub async fn configure(

metadata.aggregate_functions = get_aggregate_functions(&type_names);

let stored_procedures_row =
select_first_row(&mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;

let stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
serde_json::from_str(stored_procedures_row.get(0).unwrap()).unwrap();

metadata.stored_procedures = get_stored_procedures(stored_procedures);
metadata.stored_procedures = configure_stored_procedures(
&mssql_pool,
configuration.metadata.stored_procedures.clone(),
stored_procedure_configuration_options,
)
.await?;

Ok(RawConfiguration {
version: 1,
Expand Down
3 changes: 3 additions & 0 deletions crates/ndc-sqlserver/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ impl<Env: Environment + Send + Sync> connector::ConnectorSetup for SQLServerSetu
configuration::Error::ConnectionPoolError(inner) => {
connector::ParseError::Other(inner.into())
}
configuration::Error::StoredProcedureIntrospectionError(inner) => {
connector::ParseError::Other(inner.into())
}
})
}

Expand Down
7 changes: 3 additions & 4 deletions crates/ndc-sqlserver/tests/common/fresh_deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ impl FreshDeployment {
.expect("Error initializing the newly created DB");

for file_path in data_setup_file_paths.into_iter() {
let query = fs::read_to_string(file_path.clone()).expect(&format!(
"Failed to read file from {}",
file_path.to_str().unwrap()
));
let query = fs::read_to_string(file_path.clone()).unwrap_or_else(|_| {
panic!("Failed to read file from {}", file_path.to_str().unwrap())
});

new_db_connection
.simple_query(query)
Expand Down
4 changes: 2 additions & 2 deletions crates/ndc-sqlserver/tests/configuration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn configure_is_idempotent(
)]);
let file_path = PathBuf::new();

let mut actual = configuration::configure(&file_path, &args, environment)
let mut actual = configuration::configure(&file_path, &args, environment, None)
.await
.expect("configuration::configure");

Expand Down Expand Up @@ -92,7 +92,7 @@ pub async fn configure_initial_configuration_is_unchanged(
let environment = HashMap::from([(connection_uri_variable, connection_string.into())]);
let file_path = PathBuf::new();

configuration::configure(&file_path, &args, environment)
configuration::configure(&file_path, &args, environment, None)
.await
.expect("configuration::configure")
}
Expand Down
26 changes: 23 additions & 3 deletions docs/usage/stored_procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@ In this guide, we'll look at how to use stored procedures with the ndc-sqlserver

## Tracking Stored Procedures

The ndc-sqlserver connector can track stored procedures in a SQL Server database. The stored
procedures present in the database are automatically added while introspecting the database
via the `update` operation. The stored procedures will appear in the `$.metadata.storedProcedures`
The ndc-sqlserver connector can track stored procedures in a SQL Server database.

The stored procedures present in the database can be added by running the following command:

```
ddn connector plugin --connector app/sqlserver/connector.yaml -- stored-procedures
```

If you want to overwrite the existing stored procedures,

```
ddn connector plugin --connector app/sqlserver/connector.yaml -- stored-procedures --overwrite
```



After running the above command, the stored procedures will appear in the `$.metadata.storedProcedures`
key of the configuration that is generated by the `update` operation.


Expand Down Expand Up @@ -99,6 +113,12 @@ we can add a return type for it, as following:
}
```

### Marking required arguments as `nonNullable`

If your stored procedure contains a required argument, then you can mark the argument as `nonNullable`
which will enable to throw a validation error as soon as possible. For example, in the above, the `Phone` field
is a required argument, hence it is marked as `nonNullable`.

## Schema of Stored Procedures

## Schema
Expand Down
2 changes: 1 addition & 1 deletion static/chinook-stored-procedures.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"type": "int",
"description": null,
"nullable": "nullable"
}
}
},
"description": null
}
Expand Down
114 changes: 57 additions & 57 deletions static/configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -843,63 +843,6 @@
"description": null
}
},
"storedProcedures": {
"GetCustomerDetailsWithTotalPurchases": {
"name": "GetCustomerDetailsWithTotalPurchases",
"schema": "dbo",
"arguments": {
"CustomerId": {
"name": "CustomerId",
"type": "int",
"nullable": "nullable",
"isOutput": false,
"description": null
},
"Phone": {
"name": "Phone",
"type": "varchar",
"nullable": "nullable",
"isOutput": false,
"description": null
}
},
"returns": {
"CustomerId": {
"name": "CustomerId",
"type": "int",
"nullable": "nonNullable",
"description": null
},
"Phone": {
"name": "Phone",
"type": "varchar",
"nullable": "nonNullable",
"description": null
},
"TotalPurchases": {
"name": "TotalPurchases",
"type": "int",
"nullable": "nonNullable",
"description": null
}
},
"description": null
},
"ReturnOne": {
"name": "ReturnOne",
"schema": "dbo",
"arguments": {},
"returns": {
"result": {
"name": "result",
"type": "int",
"description": null,
"nullable": "nullable"
}
},
"description": null
}
},
"aggregateFunctions": {
"bigint": {
"APPROX_COUNT_DISTINCT": {
Expand Down Expand Up @@ -2763,6 +2706,63 @@
"operatorKind": "custom"
}
}
},
"storedProcedures": {
"GetCustomerDetailsWithTotalPurchases": {
"name": "GetCustomerDetailsWithTotalPurchases",
"schema": "dbo",
"arguments": {
"CustomerId": {
"name": "CustomerId",
"type": "int",
"nullable": "nullable",
"isOutput": false,
"description": null
},
"Phone": {
"name": "Phone",
"type": "varchar",
"nullable": "nullable",
"isOutput": false,
"description": null
}
},
"returns": {
"CustomerId": {
"name": "CustomerId",
"type": "int",
"nullable": "nonNullable",
"description": null
},
"Phone": {
"name": "Phone",
"type": "varchar",
"nullable": "nonNullable",
"description": null
},
"TotalPurchases": {
"name": "TotalPurchases",
"type": "int",
"nullable": "nonNullable",
"description": null
}
},
"description": null
},
"ReturnOne": {
"name": "ReturnOne",
"schema": "dbo",
"arguments": {},
"returns": {
"result": {
"name": "result",
"type": "int",
"description": null,
"nullable": "nullable"
}
},
"description": null
}
}
}
}
Loading

0 comments on commit 6b30734

Please sign in to comment.