Skip to content

Commit

Permalink
Conflict fix cte (#142)
Browse files Browse the repository at this point in the history
* [#125] Use CTEs in the updatestock procedures
* [#125] Use CTEs in geopartitioned schema manager
* Removing no null from column definitions.
* setting yb_enable_expression_pushdown at connection level

Co-authored-by: Hemant Bhanawat <hbhanawat@yugabyte.com>
Co-authored-by: Andrei Martsinchyk <amartsinchyk@yugabyte.com>
  • Loading branch information
3 people authored Sep 29, 2023
1 parent fe07ceb commit 9a36b61
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 102 deletions.
1 change: 1 addition & 0 deletions src/com/oltpbenchmark/api/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ protected final ArrayList<Pair<TransactionExecutionState, TransactionStatus>> do
startConnection = System.nanoTime();

conn = dataSource.getConnection();
conn.createStatement().execute("SET yb_enable_expression_pushdown to on");
if (next.getProcedureClass() != StockLevel.class) {
// In accordance with 2.8.2.3 of the TPCC spec, StockLevel should execute each query in its own Snapshot
// Isolation.
Expand Down
180 changes: 90 additions & 90 deletions src/com/oltpbenchmark/schema/TPCCTableSchemas.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,128 +10,128 @@
public class TPCCTableSchemas {
public static final Map<String, TableSchema> tables = Collections.unmodifiableMap(Stream.of(
new TableSchemaBuilder(TPCCConstants.TABLENAME_ORDERLINE)
.column("ol_w_id", "int NOT NULL")
.column("ol_d_id", "int NOT NULL")
.column("ol_o_id", "int NOT NULL")
.column("ol_number", "int NOT NULL")
.column("ol_i_id", "int NOT NULL")
.column("ol_w_id", "int ")
.column("ol_d_id", "int ")
.column("ol_o_id", "int ")
.column("ol_number", "int ")
.column("ol_i_id", "int ")
.column("ol_delivery_d", "timestamp NULL DEFAULT NULL")
.column("ol_amount", "decimal(6,2) NOT NULL")
.column("ol_supply_w_id", "int NOT NULL")
.column("ol_quantity", "decimal(2,0) NOT NULL")
.column("ol_dist_info", "char(24) NOT NULL")
.column("ol_amount", "decimal(6,2) ")
.column("ol_supply_w_id", "int ")
.column("ol_quantity", "decimal(2,0) ")
.column("ol_dist_info", "char(24) ")
.primaryKey("((ol_w_id,ol_d_id) HASH,ol_o_id,ol_number)")
.partitionKey("(ol_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_NEWORDER)
.column("no_w_id", "int NOT NULL")
.column("no_d_id", "int NOT NULL")
.column("no_o_id", "int NOT NULL")
.column("no_w_id", "int ")
.column("no_d_id", "int ")
.column("no_o_id", "int ")
.primaryKey("((no_w_id,no_d_id) HASH,no_o_id)")
.partitionKey("(no_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_STOCK)
.column("s_w_id", "int NOT NULL")
.column("s_i_id", "int NOT NULL")
.column("s_quantity", "decimal(4,0) NOT NULL")
.column("s_ytd", "decimal(8,2) NOT NULL")
.column("s_order_cnt", "int NOT NULL")
.column("s_remote_cnt", "int NOT NULL")
.column("s_data", "varchar(50) NOT NULL")
.column("s_dist_01", "char(24) NOT NULL")
.column("s_dist_02", "char(24) NOT NULL")
.column("s_dist_03", "char(24) NOT NULL")
.column("s_dist_04", "char(24) NOT NULL")
.column("s_dist_05", "char(24) NOT NULL")
.column("s_dist_06", "char(24) NOT NULL")
.column("s_dist_07", "char(24) NOT NULL")
.column("s_dist_08", "char(24) NOT NULL")
.column("s_dist_09", "char(24) NOT NULL")
.column("s_dist_10", "char(24) NOT NULL")
.column("s_w_id", "int ")
.column("s_i_id", "int ")
.column("s_quantity", "decimal(4,0) ")
.column("s_ytd", "decimal(8,2) ")
.column("s_order_cnt", "int ")
.column("s_remote_cnt", "int ")
.column("s_data", "varchar(50) ")
.column("s_dist_01", "char(24) ")
.column("s_dist_02", "char(24) ")
.column("s_dist_03", "char(24) ")
.column("s_dist_04", "char(24) ")
.column("s_dist_05", "char(24) ")
.column("s_dist_06", "char(24) ")
.column("s_dist_07", "char(24) ")
.column("s_dist_08", "char(24) ")
.column("s_dist_09", "char(24) ")
.column("s_dist_10", "char(24) ")
.primaryKey("(s_w_id HASH, s_i_id ASC)")
.partitionKey("(s_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_OPENORDER)
.column("o_w_id", "int NOT NULL")
.column("o_d_id", "int NOT NULL")
.column("o_id", "int NOT NULL")
.column("o_c_id", "int NOT NULL")
.column("o_w_id", "int ")
.column("o_d_id", "int ")
.column("o_id", "int ")
.column("o_c_id", "int ")
.column("o_carrier_id", "int DEFAULT NULL")
.column("o_ol_cnt", "decimal(2,0) NOT NULL")
.column("o_all_local", "decimal(1,0) NOT NULL")
.column("o_entry_d", "timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP")
.column("o_ol_cnt", "decimal(2,0) ")
.column("o_all_local", "decimal(1,0) ")
.column("o_entry_d", "timestamp DEFAULT CURRENT_TIMESTAMP")
.primaryKey("((o_w_id,o_d_id) HASH,o_id)")
.partitionKey("(o_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_HISTORY)
.column("h_c_id", "int NOT NULL")
.column("h_c_d_id", "int NOT NULL")
.column("h_c_w_id", "int NOT NULL")
.column("h_d_id", "int NOT NULL")
.column("h_w_id", "int NOT NULL")
.column("h_date", "timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP")
.column("h_amount", "decimal(6,2) NOT NULL")
.column("h_data", "varchar(24) NOT NULL")
.column("h_c_id", "int ")
.column("h_c_d_id", "int ")
.column("h_c_w_id", "int ")
.column("h_d_id", "int ")
.column("h_w_id", "int ")
.column("h_date", "timestamp DEFAULT CURRENT_TIMESTAMP")
.column("h_amount", "decimal(6,2) ")
.column("h_data", "varchar(24) ")
.partitionKey("(h_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_CUSTOMER)
.column("c_w_id", "int NOT NULL")
.column("c_d_id", "int NOT NULL")
.column("c_id", "int NOT NULL")
.column("c_discount", "decimal(4,4) NOT NULL")
.column("c_credit", "char(2) NOT NULL")
.column("c_last", "varchar(16) NOT NULL")
.column("c_first", "varchar(16) NOT NULL")
.column("c_credit_lim", "decimal(12,2) NOT NULL")
.column("c_balance", "decimal(12,2) NOT NULL")
.column("c_ytd_payment", "float NOT NULL")
.column("c_payment_cnt", "int NOT NULL")
.column("c_delivery_cnt", "int NOT NULL")
.column("c_street_1", "varchar(20) NOT NULL")
.column("c_street_2", "varchar(20) NOT NULL")
.column("c_city", "varchar(20) NOT NULL")
.column("c_state", "char(2) NOT NULL")
.column("c_zip", "char(9) NOT NULL")
.column("c_phone", "char(16) NOT NULL")
.column("c_since", "timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP")
.column("c_middle", "char(2) NOT NULL")
.column("c_data", "varchar(500) NOT NULL")
.column("c_w_id", "int ")
.column("c_d_id", "int ")
.column("c_id", "int ")
.column("c_discount", "decimal(4,4) ")
.column("c_credit", "char(2) ")
.column("c_last", "varchar(16) ")
.column("c_first", "varchar(16) ")
.column("c_credit_lim", "decimal(12,2) ")
.column("c_balance", "decimal(12,2) ")
.column("c_ytd_payment", "float ")
.column("c_payment_cnt", "int ")
.column("c_delivery_cnt", "int ")
.column("c_street_1", "varchar(20) ")
.column("c_street_2", "varchar(20) ")
.column("c_city", "varchar(20) ")
.column("c_state", "char(2) ")
.column("c_zip", "char(9) ")
.column("c_phone", "char(16) ")
.column("c_since", "timestamp DEFAULT CURRENT_TIMESTAMP")
.column("c_middle", "char(2) ")
.column("c_data", "varchar(500) ")
.primaryKey("((c_w_id,c_d_id) HASH,c_id)")
.partitionKey("(c_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_DISTRICT)
.column("d_w_id", "int NOT NULL")
.column("d_id", "int NOT NULL")
.column("d_ytd", "decimal(12,2) NOT NULL")
.column("d_tax", "decimal(4,4) NOT NULL")
.column("d_next_o_id", "int NOT NULL")
.column("d_name", "varchar(10) NOT NULL")
.column("d_street_1", "varchar(20) NOT NULL")
.column("d_street_2", "varchar(20) NOT NULL")
.column("d_city", "varchar(20) NOT NULL")
.column("d_state", "char(2) NOT NULL")
.column("d_zip", "char(9) NOT NULL")
.column("d_w_id", "int ")
.column("d_id", "int ")
.column("d_ytd", "decimal(12,2) ")
.column("d_tax", "decimal(4,4) ")
.column("d_next_o_id", "int ")
.column("d_name", "varchar(10) ")
.column("d_street_1", "varchar(20) ")
.column("d_street_2", "varchar(20) ")
.column("d_city", "varchar(20) ")
.column("d_state", "char(2) ")
.column("d_zip", "char(9) ")
.primaryKey("((d_w_id,d_id) HASH)")
.partitionKey("(d_w_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_ITEM)
.column("i_id", "int NOT NULL")
.column("i_name", "varchar(24) NOT NULL")
.column("i_price", "decimal(5,2) NOT NULL")
.column("i_data", "varchar(50) NOT NULL")
.column("i_im_id", "int NOT NULL")
.column("i_id", "int ")
.column("i_name", "varchar(24) ")
.column("i_price", "decimal(5,2) ")
.column("i_data", "varchar(50) ")
.column("i_im_id", "int ")
.primaryKey("(i_id)")
.build(),
new TableSchemaBuilder(TPCCConstants.TABLENAME_WAREHOUSE)
.column("w_id", "int NOT NULL")
.column("w_ytd", "decimal(12,2) NOT NULL")
.column("w_tax", "decimal(4,4) NOT NULL")
.column("w_name", "varchar(10) NOT NULL")
.column("w_street_1", "varchar(20) NOT NULL")
.column("w_street_2", "varchar(20) NOT NULL")
.column("w_city", "varchar(20) NOT NULL")
.column("w_state", "char(2) NOT NULL")
.column("w_zip", "char(9) NOT NULL")
.column("w_id", "int ")
.column("w_ytd", "decimal(12,2) ")
.column("w_tax", "decimal(4,4) ")
.column("w_name", "varchar(10) ")
.column("w_street_1", "varchar(20) ")
.column("w_street_2", "varchar(20) ")
.column("w_city", "varchar(20) ")
.column("w_state", "char(2) ")
.column("w_zip", "char(9) ")
.primaryKey("(w_id)")
.partitionKey("(w_id)")
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void enableForeignKeyConstraints() throws SQLException {
execute("ALTER TABLE ORDER_LINE ADD CONSTRAINT OL_FKEY_S FOREIGN KEY " +
"(OL_SUPPLY_W_ID, OL_I_ID) REFERENCES STOCK (S_W_ID, S_I_ID) NOT VALID");
}

@Override
public void createSqlProcedures() throws Exception {
try (Statement st = db_connection.createStatement()) {
Expand All @@ -97,12 +97,19 @@ public void createSqlProcedures() throws Exception {
argsSb.append("wid int");
for (int i = 1; i <= 15; ++i) {
argsSb.append(String.format(", i%d int, q%d int, y%d int, r%d int", i, i, i, i));

if (i == 1) {
updateStatements.append("WITH ");
} else {
updateStatements.append(", ");
}
updateStatements.append(String.format(
"UPDATE STOCK SET S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " +
"S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d;",
i, i, i, i));
"update_cte%d AS (UPDATE STOCK SET " +
"S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " +
"S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d)",
i, i, i, i, i));
String updateStmt =
String.format("CREATE PROCEDURE updatestock%d (%s) AS '%s' LANGUAGE SQL;",
String.format("CREATE PROCEDURE updatestock%d (%s) AS '%s SELECT 1' LANGUAGE SQL;",
i, argsSb.toString(), updateStatements.toString());

st.execute(String.format("DROP PROCEDURE IF EXISTS updatestock%d", i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public GeoPartitionedSchemaManager(GeoPartitionPolicy geoPartitioningPolicy, Con
super(conn);
this.geoPartitionPolicy = geoPartitioningPolicy;
for (TableSchema t : TPCCTableSchemas.tables.values()) {
tables.put(t.name(),
t.name().equals(TPCCConstants.TABLENAME_ITEM) ? new DefaultTable(t, geoPartitioningPolicy.getTablespaceForItemTable())
tables.put(t.name(),
t.name().equals(TPCCConstants.TABLENAME_ITEM) ? new DefaultTable(t, geoPartitioningPolicy.getTablespaceForItemTable())
: new PartitionedTable(t, geoPartitionPolicy));
}
}
Expand Down Expand Up @@ -121,7 +121,7 @@ public void enableForeignKeyConstraints() throws SQLException {
"(OL_SUPPLY_W_ID, OL_I_ID) REFERENCES STOCK%d (S_W_ID, S_I_ID) NOT VALID", idx, idx, idx));
}
}

@Override
public void createSqlProcedures() throws Exception {
try (Statement st = db_connection.createStatement()) {
Expand All @@ -137,12 +137,18 @@ public void createSqlProcedures() throws Exception {
// Create functions that update the partition tables themselves.
for (int i = 1; i <= 15; ++i) {
argsSb.append(String.format(", i%d int, q%d int, y%d int, r%d int", i, i, i, i));
if (i == 1) {
updateStatements.append("WITH ");
} else {
updateStatements.append(", ");
}
updateStatements.append(String.format(
"UPDATE STOCK%d SET S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " +
"S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d;",
partition, i, i, i, i));
"update_cte%d AS (UPDATE STOCK%d SET " +
"S_QUANTITY = q%d, S_YTD = y%d, S_ORDER_CNT = S_ORDER_CNT + 1, " +
"S_REMOTE_CNT = r%d WHERE S_W_ID = wid AND S_I_ID = i%d)",
i, partition, i, i, i, i));
String updateStmt =
String.format("CREATE PROCEDURE updatestock%d_%d (%s) AS '%s' LANGUAGE SQL;",
String.format("CREATE PROCEDURE updatestock%d_%d (%s) AS '%s SELECT 1' LANGUAGE SQL;",
i, partition, argsSb.toString(), updateStatements.toString());

st.execute(String.format("DROP PROCEDURE IF EXISTS updatestock%d_%d", i, partition));
Expand Down

0 comments on commit 9a36b61

Please sign in to comment.