Skip to content

Commit

Permalink
Merge pull request #93 from muzarski/user_insert_operation
Browse files Browse the repository at this point in the history
c-s: Implement `insert` operation for user profiles
  • Loading branch information
muzarski authored Sep 9, 2024
2 parents 94202dd + 018d6ec commit 62f0584
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 15 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ Commands mentioned above are very limited. They do not, for example, allow to te

To test more complex schemas, make use of user profiles (`user` command). User profiles let you define custom schemas and custom statements used to stress the database.

Users can define custom statements via a user profile YAML file. See the example YAML files under `tools/util/profiles`. The path to the profile file can be provided via the `profile=` parameter of the `user` command.

Note that the tool reserves the `insert` operation name and predefines the behaviour
of this operation. Users can execute this operation (with a given sample ratio weight)
by passing it to the `ops()` parameter along with the other operations defined in the YAML file. This operation simply generates and inserts a full row into the stressed table. It's analogous to the `write` command - the only difference is that it operates on the custom schema.

To enable the `user` mode, the tool needs to be compiled with the `user-profile` feature. This feature is enabled by default.

## Development
Expand Down
69 changes: 60 additions & 9 deletions src/bin/cql-stress-cassandra-stress/operation/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use scylla::{
transport::topology::Table, Session,
};

use anyhow::Result;
use anyhow::{Context, Result};

use crate::{
java_generate::{
distribution::{Distribution, DistributionFactory},
values::{Generator, GeneratorConfig, ValueGeneratorFactory},
},
settings::{CassandraStressSettings, OpWeight},
settings::{CassandraStressSettings, OpWeight, PREDEFINED_INSERT_OPERATION},
stats::ShardedStats,
};

Expand Down Expand Up @@ -127,6 +127,31 @@ pub struct UserOperationFactory {
}

impl UserOperationFactory {
/// Prepares the statement backing the predefined `insert` operation.
///
/// Builds `INSERT INTO <table_name> (<columns>) VALUES (?, ...)` with one bind marker
/// for every column listed in `table_metadata`, then prepares it on `session`.
///
/// Column names are emitted as double-quoted identifiers (with embedded `"` doubled),
/// because unquoted CQL identifiers are case-insensitive: a column created with a
/// case-sensitive or reserved-word name could not be referenced otherwise.
///
/// # Errors
/// Returns an error (with added context) if the session fails to prepare the statement.
async fn prepare_insert_statement(
    session: &Arc<Session>,
    table_name: &str,
    table_metadata: &Table,
) -> Result<PreparedStatement> {
    let quoted_columns = table_metadata
        .columns
        .keys()
        .map(|name| format!("\"{}\"", name.replace('"', "\"\"")))
        .collect::<Vec<_>>();

    let column_list_str = quoted_columns.join(", ");
    // One bind marker per column, in the same order as the column list.
    let column_values_str = vec!["?"; quoted_columns.len()].join(", ");

    let statement_str =
        format!("INSERT INTO {table_name} ({column_list_str}) VALUES ({column_values_str})");
    session
        .prepare(statement_str)
        .await
        .context("Failed to prepare statement for 'insert' operation.")
}

pub async fn new(
settings: Arc<CassandraStressSettings>,
session: Arc<Session>,
Expand Down Expand Up @@ -158,13 +183,39 @@ impl UserOperationFactory {
"Compound partition keys are not yet supported by the tool!"
);

let mut queries_payload = HashMap::new();
for (q_name, (q_def, weight)) in query_definitions {
queries_payload.insert(
q_name.to_owned(),
(q_def.to_prepared_statement(&session).await?, *weight),
);
}
let queries_payload = {
let mut queries_payload = HashMap::new();
for (q_name, (q_def, weight)) in query_definitions {
queries_payload.insert(
q_name.to_owned(),
(q_def.to_prepared_statement(&session).await?, *weight),
);
}
// Handle 'insert' operation separately.
if let Some(insert_weight) = &user_profile.insert_operation_weight {
let insert_statement =
Self::prepare_insert_statement(&session, &user_profile.table, &table_metadata)
.await?;
queries_payload.insert(
PREDEFINED_INSERT_OPERATION.to_owned(),
(insert_statement, *insert_weight),
);
}

println!("\n========================");
println!("Operations to be performed and their sample ratio weights:\n");
for (q_name, (statement, q_weight)) in queries_payload.iter() {
println!(
"- {}: {{ 'cql': '{}', 'weight': {} }}",
q_name,
statement.get_statement(),
q_weight
);
}
println!("========================\n");

queries_payload
};

let pk_seed_distribution = settings.population.pk_seed_distribution.create().into();
let max_operations = settings.command_params.common.operation_count;
Expand Down
4 changes: 2 additions & 2 deletions src/bin/cql-stress-cassandra-stress/settings/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use self::counter::CounterParams;
use self::mixed::print_help_mixed;
use self::mixed::MixedParams;
#[cfg(feature = "user-profile")]
pub use self::user::OpWeight;
#[cfg(feature = "user-profile")]
use self::user::UserParams;
#[cfg(feature = "user-profile")]
pub use self::user::{OpWeight, PREDEFINED_INSERT_OPERATION};
pub use help::print_help;

use super::ParsePayload;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This file contains an 'insert' query, thus it's invalid.
# This is a reserved name for an operation - the tool predefines its behaviour.
keyspace: foo
table: bar
queries:
insert:
cql: select c1 from standard1 where pkey = ?
31 changes: 29 additions & 2 deletions src/bin/cql-stress-cassandra-stress/settings/command/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl QueryDefinition {
}
}

/// Name of the operation reserved by the tool: it may be listed in the `ops` ratios,
/// but a profile yaml that defines a query under this name is rejected during parsing.
pub const PREDEFINED_INSERT_OPERATION: &str = "insert";

impl Parsable for UserProfile {
type Parsed = Self;

Expand All @@ -93,6 +95,10 @@ impl Parsable for UserProfile {
!profile.queries.is_empty(),
"'queries' map cannot be empty. Please define at least one query."
);
anyhow::ensure!(
!profile.queries.contains_key(PREDEFINED_INSERT_OPERATION),
"'{PREDEFINED_INSERT_OPERATION}' is a reserved name for the operation. See help message for user command for more details."
);
Ok(profile)
}
}
Expand All @@ -109,6 +115,7 @@ pub struct UserParams {
// this query will be sampled.
pub queries_payload: HashMap<String, (QueryDefinition, OpWeight)>,
pub clustering: Arc<dyn DistributionFactory>,
pub insert_operation_weight: Option<OpWeight>,
}

impl UserParams {
Expand Down Expand Up @@ -157,9 +164,13 @@ impl UserParams {
table_definition,
mut queries,
} = handles.profile.get().unwrap();
let queries_ratio = handles.ratio.get().unwrap();
let mut queries_ratio = handles.ratio.get().unwrap();
let clustering: Arc<dyn DistributionFactory> = handles.clustering.get().unwrap().into();

// Handle the `insert` operation separately. This operation is not defined in the yaml file.
// Its behaviour is predefined by the tool.
let insert_operation_weight = queries_ratio.remove(PREDEFINED_INSERT_OPERATION);

let queries_payload = queries_ratio
.into_iter()
.map(
Expand All @@ -184,6 +195,7 @@ impl UserParams {
table_definition,
queries_payload,
clustering,
insert_operation_weight,
})
}
}
Expand All @@ -205,7 +217,13 @@ fn prepare_parser(cmd: &str) -> (ParamsParser, CommonParamHandles, UserParamHand
"Specify the path to a yaml cql3 profile",
true,
);
let ratio = parser.simple_param("ops", None, "Specify the ratios for inserts/queries to perform; e.g. ops(insert=2,<query1>=1) will perform 2 inserts for each query1", true);
let ratio = parser.simple_param(
"ops",
None,
"Specify the ratios for inserts/queries to perform; e.g. ops(insert=2,<query1>=1) will perform 2 inserts for each query1.
'insert' is a reserved name for an operation, thus query with such name cannot be defined in a profile yaml.",
true
);
let clustering = parser.simple_param(
"clustering=",
Some("GAUSSIAN(1..10)"),
Expand Down Expand Up @@ -298,6 +316,15 @@ mod tests {
assert!(profile.is_err());
}

#[test]
fn insert_query_profile_yaml_contents_test() {
    // The profile defines a query called 'insert', which collides with the
    // operation name reserved by the tool, so parsing has to be rejected.
    let path = build_file_path("insert_query_profile.yaml");
    assert!(UserProfile::parse(&path).is_err());
}

#[test]
fn full_profile_yaml_contents_test() {
let yaml_filepath = build_file_path("full_profile.yaml");
Expand Down
4 changes: 2 additions & 2 deletions src/bin/cql-stress-cassandra-stress/settings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ mod test;
pub use command::Command;
pub use command::CommandParams;
pub use command::MixedSubcommand;
#[cfg(feature = "user-profile")]
pub use command::OpWeight;
pub use command::OperationRatio;
#[cfg(feature = "user-profile")]
pub use command::{OpWeight, PREDEFINED_INSERT_OPERATION};
pub use option::ThreadsInfo;
use regex::Regex;
use scylla::Session;
Expand Down

0 comments on commit 62f0584

Please sign in to comment.