-
Notifications
You must be signed in to change notification settings - Fork 623
[15721] Compiled Catalog Query Access #1339
base: master
Are you sure you want to change the base?
Changes from all commits
f91e43b
8d2dc8b
7b5d5f2
644a73b
f80cead
b318853
038f7e2
dedc056
d215620
96e0ac8
90439c3
6e40ce2
d2f0a9a
ca66c5b
c718372
b410e3d
f581859
89161d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,9 @@ | |
//===----------------------------------------------------------------------===// | ||
|
||
#include "catalog/abstract_catalog.h" | ||
|
||
#include "executor/plan_executor.h" | ||
#include "codegen/buffering_consumer.h" | ||
#include "common/internal_types.h" | ||
#include "common/statement.h" | ||
|
||
#include "catalog/catalog.h" | ||
|
@@ -90,11 +92,6 @@ AbstractCatalog::AbstractCatalog(const std::string &catalog_table_ddl, | |
} | ||
} | ||
|
||
/*@brief insert tuple(reord) helper function | ||
* @param tuple tuple to be inserted | ||
* @param txn TransactionContext | ||
* @return Whether insertion is Successful | ||
*/ | ||
bool AbstractCatalog::InsertTuple(std::unique_ptr<storage::Tuple> tuple, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) | ||
|
@@ -129,12 +126,53 @@ bool AbstractCatalog::InsertTuple(std::unique_ptr<storage::Tuple> tuple, | |
return this_p_status.m_result == peloton::ResultType::SUCCESS; | ||
} | ||
|
||
/*@brief Delete a tuple using index scan | ||
* @param index_offset Offset of index for scan | ||
* @param values Values for search | ||
* @param txn TransactionContext | ||
* @return Whether deletion is Successful | ||
*/ | ||
|
||
bool AbstractCatalog::InsertTupleWithCompiledPlan(const std::vector<std::vector< | ||
std::unique_ptr<expression::AbstractExpression>>> *insert_values, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) | ||
throw CatalogException("Insert tuple requires transaction"); | ||
|
||
std::vector<std::string> columns; | ||
std::shared_ptr<planner::InsertPlan> insert_plan( | ||
new planner::InsertPlan(catalog_table_, &columns, insert_values)); | ||
|
||
// Bind the plan | ||
planner::BindingContext context; | ||
insert_plan->PerformBinding(context); | ||
|
||
// Prepare a consumer to collect the result | ||
codegen::BufferingConsumer buffer{{}, context}; | ||
|
||
|
||
bool cached; | ||
|
||
codegen::QueryParameters parameters(*insert_plan, {}); | ||
std::unique_ptr<executor::ExecutorContext> executor_context( | ||
new executor::ExecutorContext(txn, std::move(parameters))); | ||
|
||
// search for query | ||
codegen::Query *query = codegen::QueryCache::Instance().Find(insert_plan); | ||
std::unique_ptr<codegen::Query> compiled_query(nullptr); | ||
cached = (query != nullptr); | ||
|
||
// if not cached, compile the query and save it into cache | ||
executor::ExecutionResult ret; | ||
if (!cached) { | ||
compiled_query = codegen::QueryCompiler().Compile( | ||
*insert_plan, executor_context->GetParams().GetQueryParametersMap(), | ||
buffer); | ||
query = compiled_query.get(); | ||
codegen::QueryCache::Instance().Add(insert_plan, std::move(compiled_query)); | ||
} | ||
|
||
query->Execute(std::move(executor_context), buffer, | ||
[&ret](executor::ExecutionResult result) { ret = result; }); | ||
|
||
return ret.m_result == peloton::ResultType::SUCCESS; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we keep the query in our cache even if it fails? |
||
} | ||
|
||
|
||
bool AbstractCatalog::DeleteWithIndexScan( | ||
oid_t index_offset, std::vector<type::Value> values, | ||
concurrency::TransactionContext *txn) { | ||
|
@@ -178,13 +216,53 @@ bool AbstractCatalog::DeleteWithIndexScan( | |
return status; | ||
} | ||
|
||
/*@brief Index scan helper function | ||
* @param column_offsets Column ids for search (projection) | ||
* @param index_offset Offset of index for scan | ||
* @param values Values for search | ||
* @param txn TransactionContext | ||
* @return Unique pointer of vector of logical tiles | ||
*/ | ||
bool AbstractCatalog::DeleteWithCompiledSeqScan( | ||
std::vector<oid_t> column_offsets, | ||
expression::AbstractExpression *predicate, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) | ||
throw CatalogException("Delete tuple requires transaction"); | ||
|
||
std::shared_ptr<planner::DeletePlan> delete_plan{ | ||
new planner::DeletePlan(catalog_table_)}; | ||
|
||
std::unique_ptr<planner::AbstractPlan> scan{new planner::SeqScanPlan( | ||
catalog_table_, predicate, column_offsets)}; | ||
delete_plan->AddChild(std::move(scan)); | ||
|
||
// Do binding | ||
planner::BindingContext context; | ||
delete_plan->PerformBinding(context); | ||
|
||
codegen::BufferingConsumer buffer{column_offsets, context}; | ||
|
||
bool cached; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move the |
||
|
||
codegen::QueryParameters parameters(*delete_plan, {}); | ||
std::unique_ptr<executor::ExecutorContext> executor_context( | ||
new executor::ExecutorContext(txn, std::move(parameters))); | ||
|
||
// search for query | ||
codegen::Query *query = codegen::QueryCache::Instance().Find(delete_plan);; | ||
std::unique_ptr<codegen::Query> compiled_query(nullptr); | ||
cached = (query != nullptr); | ||
|
||
// if not cached, compile the query and save it into cache | ||
executor::ExecutionResult ret; | ||
if (!cached) { | ||
compiled_query = codegen::QueryCompiler().Compile( | ||
*delete_plan, executor_context->GetParams().GetQueryParametersMap(), | ||
buffer); | ||
query = compiled_query.get(); | ||
codegen::QueryCache::Instance().Add(delete_plan, std::move(compiled_query)); | ||
} | ||
|
||
query->Execute(std::move(executor_context), buffer, | ||
[&ret](executor::ExecutionResult result) { ret = result; }); | ||
|
||
return ret.m_result == peloton::ResultType::SUCCESS; | ||
} | ||
|
||
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>> | ||
AbstractCatalog::GetResultWithIndexScan( | ||
std::vector<oid_t> column_offsets, oid_t index_offset, | ||
|
@@ -226,15 +304,7 @@ AbstractCatalog::GetResultWithIndexScan( | |
return result_tiles; | ||
} | ||
|
||
/*@brief Sequential scan helper function | ||
* NOTE: try to use efficient index scan instead of sequential scan, but you | ||
* shouldn't build too many indexes on one catalog table | ||
* @param column_offsets Column ids for search (projection) | ||
* @param predicate predicate for this sequential scan query | ||
* @param txn TransactionContext | ||
* | ||
* @return Unique pointer of vector of logical tiles | ||
*/ | ||
|
||
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>> | ||
AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets, | ||
expression::AbstractExpression *predicate, | ||
|
@@ -261,15 +331,48 @@ AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets, | |
return result_tiles; | ||
} | ||
|
||
/*@brief Add index on catalog table | ||
* @param key_attrs indexed column offset(position) | ||
* @param index_oid index id(global unique) | ||
* @param index_name index name(global unique) | ||
* @param index_constraint index constraints | ||
* @return Unique pointer of vector of logical tiles | ||
* Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and | ||
* IndexCatalog should need this | ||
*/ | ||
|
||
std::vector<codegen::WrappedTuple> | ||
AbstractCatalog::GetResultWithCompiledSeqScan( | ||
std::vector<oid_t> column_offsets, | ||
expression::AbstractExpression *predicate, | ||
concurrency::TransactionContext *txn) const { | ||
if (txn == nullptr) throw CatalogException("Scan table requires transaction"); | ||
|
||
// Create sequential scan | ||
auto plan_ptr = std::make_shared<planner::SeqScanPlan>( | ||
catalog_table_, predicate, column_offsets); | ||
planner::BindingContext scan_context; | ||
plan_ptr->PerformBinding(scan_context); | ||
|
||
// Create consumer | ||
codegen::BufferingConsumer buffer{column_offsets, scan_context}; | ||
bool cached; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move the |
||
|
||
codegen::QueryParameters parameters(*plan_ptr, {}); | ||
std::unique_ptr<executor::ExecutorContext> executor_context( | ||
new executor::ExecutorContext(txn, std::move(parameters))); | ||
|
||
// search for query | ||
codegen::Query *query = codegen::QueryCache::Instance().Find(plan_ptr); | ||
std::unique_ptr<codegen::Query> compiled_query(nullptr); | ||
cached = (query != nullptr); | ||
|
||
// if not cached, compile the query and save it into cache | ||
if (!cached) { | ||
compiled_query = codegen::QueryCompiler().Compile( | ||
*plan_ptr, executor_context->GetParams().GetQueryParametersMap(), | ||
buffer); | ||
query = compiled_query.get(); | ||
codegen::QueryCache::Instance().Add(plan_ptr, std::move(compiled_query)); | ||
} | ||
|
||
query->Execute(std::move(executor_context), buffer, | ||
[](executor::ExecutionResult result) { return result; }); | ||
|
||
return buffer.GetOutputTuples(); | ||
} | ||
|
||
void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs, | ||
oid_t index_oid, const std::string &index_name, | ||
IndexConstraintType index_constraint) { | ||
|
@@ -298,13 +401,77 @@ void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs, | |
index_name.c_str(), (int)catalog_table_->GetOid()); | ||
} | ||
|
||
/*@brief Update specific columns using index scan | ||
* @param update_columns Columns to be updated | ||
* @param update_values Values to be updated | ||
* @param scan_values Value to be scaned (used in index scan) | ||
* @param index_offset Offset of index for scan | ||
* @return true if successfully executes | ||
*/ | ||
bool AbstractCatalog::UpdateWithCompiledSeqScan( | ||
std::vector<oid_t> update_columns, std::vector<type::Value> update_values, | ||
std::vector<oid_t> column_offsets, expression::AbstractExpression *predicate, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) throw CatalogException("Scan table requires transaction"); | ||
// Construct update executor | ||
TargetList target_list; | ||
DirectMapList direct_map_list; | ||
|
||
size_t column_count = catalog_table_->GetSchema()->GetColumnCount(); | ||
for (size_t col_itr = 0; col_itr < column_count; col_itr++) { | ||
// Skip any column for update | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what this comment means here. Could you please help me understand what this means There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is only looking for the columns in the tuple which need to be updated. For more detailed explanation you can turn to @mengranwo |
||
if (std::find(std::begin(update_columns), std::end(update_columns), | ||
col_itr) == std::end(update_columns)) { | ||
direct_map_list.emplace_back(col_itr, std::make_pair(0, col_itr)); | ||
} | ||
} | ||
|
||
PELOTON_ASSERT(update_columns.size() == update_values.size()); | ||
for (size_t i = 0; i < update_values.size(); i++) { | ||
planner::DerivedAttribute update_attribute{ | ||
new expression::ConstantValueExpression(update_values[i])}; | ||
target_list.emplace_back(update_columns[i], update_attribute); | ||
} | ||
|
||
std::unique_ptr<const planner::ProjectInfo> project_info( | ||
new planner::ProjectInfo(std::move(target_list), | ||
std::move(direct_map_list))); | ||
|
||
std::shared_ptr<planner::UpdatePlan> update_plan{ | ||
new planner::UpdatePlan(catalog_table_, std::move(project_info)) | ||
}; | ||
|
||
std::unique_ptr<planner::AbstractPlan> scan{new planner::SeqScanPlan( | ||
catalog_table_, predicate, column_offsets)}; | ||
update_plan->AddChild(std::move(scan)); | ||
|
||
// Do binding | ||
planner::BindingContext context; | ||
update_plan->PerformBinding(context); | ||
|
||
codegen::BufferingConsumer buffer{column_offsets, context}; | ||
|
||
bool cached; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move the |
||
|
||
codegen::QueryParameters parameters(*update_plan, {}); | ||
std::unique_ptr<executor::ExecutorContext> executor_context( | ||
new executor::ExecutorContext(txn, std::move(parameters))); | ||
|
||
// search for query | ||
codegen::Query *query = codegen::QueryCache::Instance().Find(update_plan);; | ||
std::unique_ptr<codegen::Query> compiled_query(nullptr); | ||
cached = (query != nullptr); | ||
|
||
// if not cached, compile the query and save it into cache | ||
executor::ExecutionResult ret; | ||
if (!cached) { | ||
compiled_query = codegen::QueryCompiler().Compile( | ||
*update_plan, executor_context->GetParams().GetQueryParametersMap(), | ||
buffer); | ||
query = compiled_query.get(); | ||
codegen::QueryCache::Instance().Add(update_plan, std::move(compiled_query)); | ||
} | ||
|
||
query->Execute(std::move(executor_context), buffer, | ||
[&ret](executor::ExecutionResult result) { ret = result; }); | ||
|
||
return ret.m_result == peloton::ResultType::SUCCESS; | ||
} | ||
|
||
|
||
bool AbstractCatalog::UpdateWithIndexScan( | ||
std::vector<oid_t> update_columns, std::vector<type::Value> update_values, | ||
std::vector<type::Value> scan_values, oid_t index_offset, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be better to define
cached
below where it's first assigned to a value.