Skip to content

Commit

Permalink
Fix sparse
Browse files Browse the repository at this point in the history
  • Loading branch information
loneylee committed Dec 5, 2023
1 parent f31d163 commit 2432284
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@
}
},
"baseSchema": {
"names": ["s_suppkey", "s_nationkey"],
"names": ["s_suppkey", "s_nationkey", "s_name", "s_address", "s_phone", "s_acctbal", "s_comment"],
"struct": {
"types": [{
"i64": {
Expand All @@ -256,6 +256,26 @@
"i64": {
"nullability": "NULLABILITY_REQUIRED"
}
}, {
"string": {
"nullability": "NULLABILITY_REQUIRED"
}
}, {
"string": {
"nullability": "NULLABILITY_REQUIRED"
}
}, {
"string": {
"nullability": "NULLABILITY_REQUIRED"
}
}, {
"fp64": {
"nullability": "NULLABILITY_REQUIRED"
}
}, {
"string": {
"nullability": "NULLABILITY_REQUIRED"
}
}]
}
},
Expand Down Expand Up @@ -408,7 +428,7 @@
}
},
"baseSchema": {
"names": ["n_nationkey", "n_regionkey"],
"names": ["n_nationkey", "n_regionkey", "n_name"],
"struct": {
"types": [{
"i64": {
Expand All @@ -418,6 +438,10 @@
"i64": {
"nullability": "NULLABILITY_REQUIRED"
}
}, {
"string": {
"nullability": "NULLABILITY_REQUIRED"
}
}]
}
},
Expand Down
8 changes: 4 additions & 4 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/
#include <filesystem>
#include <memory>
#include <fstream>
#include <iostream>
#include <memory>
#include <optional>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
Expand All @@ -31,8 +31,8 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
Expand Down Expand Up @@ -68,10 +68,10 @@
#include <regex>
#include "CHUtil.h"

#include <Storages/StorageMergeTreeFactory.h>
#include <Common/MergeTreeTool.h>
#include <Parser/TypeParser.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Poco/JSON/Parser.h>
#include <Common/MergeTreeTool.h>

namespace DB
{
Expand Down
25 changes: 11 additions & 14 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,23 @@ MergeTreeRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & re
}
auto names_and_types_list = header.getNamesAndTypesList();
auto storage_factory = StorageMergeTreeFactory::instance();
// auto metadata = buildMetaData(names_and_types_list, context);


auto storage = storage_factory.getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table),
ColumnsDescription(),
[&]() -> CustomStorageMergeTreePtr
{
// auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
// StorageID(merge_tree_table.database, merge_tree_table.table),
// merge_tree_table.relative_path,
// *metadata,
// false,
// global_context,
// "",
// MergeTreeData::MergingParams(),
// buildMergeTreeSettings());
// custom_storage_merge_tree->loadDataParts(false, std::nullopt);
// return custom_storage_merge_tree;
return nullptr;
auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.relative_path,
*buildMetaData(names_and_types_list, context),
false,
global_context,
"",
MergeTreeData::MergingParams(),
buildMergeTreeSettings());
custom_storage_merge_tree->loadDataParts(false, std::nullopt);
return custom_storage_merge_tree;
});
auto metadata = storage->getInMemoryMetadataPtr();
query_context.metadata = metadata;
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,15 @@ void ColumnsBuffer::appendSelective(
accumulated_columns.reserve(source.columns());
for (size_t i = 0; i < source.columns(); i++)
{
auto column = source.getColumns()[i]->convertToFullColumnIfConst()->cloneEmpty();
auto column = source.getColumns()[i]->convertToFullColumnIfConst()->convertToFullColumnIfSparse()->cloneEmpty();
column->reserve(prefer_buffer_size);
accumulated_columns.emplace_back(std::move(column));
}
}
if (!accumulated_columns[column_idx]->onlyNull())
{
accumulated_columns[column_idx]->insertRangeSelective(
*source.getByPosition(column_idx).column->convertToFullColumnIfConst(), selector, from, length);
*source.getByPosition(column_idx).column->convertToFullColumnIfConst()->convertToFullColumnIfSparse(), selector, from, length);
}
else
{
Expand Down
49 changes: 25 additions & 24 deletions cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,34 +41,35 @@ void StorageMergeTreeFactory::loadStorage(StorageID id, ColumnsDescription colum
}

CustomStorageMergeTreePtr
StorageMergeTreeFactory::getStorage(StorageID id, ColumnsDescription columns, std::function<CustomStorageMergeTreePtr()> creator)
StorageMergeTreeFactory::getStorage(StorageID id, ColumnsDescription /*columns*/, std::function<CustomStorageMergeTreePtr()> creator)
{
auto table_name = id.database_name + "." + id.table_name;
// std::lock_guard lock(storage_map_mutex);

if (!storage_map.contains(table_name))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} metadata is not loaded.", table_name);
// if (storage_map.contains(table_name))
// {
// std::set<std::string> existed_columns = storage_columns_map.at(table_name);
// for (const auto & column : columns)
// {
// if (!existed_columns.contains(column.name))
// {
// storage_map.erase(table_name);
// storage_columns_map.erase(table_name);
// }
// }
// }
// if (!storage_map.contains(table_name))
// {
// storage_map.emplace(table_name, creator());
// storage_columns_map.emplace(table_name, std::set<std::string>());
// for (const auto & column : storage_map.at(table_name)->getInMemoryMetadataPtr()->columns)
// {
// storage_columns_map.at(table_name).emplace(column.name);
// }
// }
LOG_WARNING(&Poco::Logger::get("StorageMergeTreeFactory"), "Table {} metadata is not load on session start.", table_name);

std::lock_guard lock(storage_map_mutex);

if (storage_map.contains(table_name))
{
std::set<std::string> existed_columns = storage_columns_map.at(table_name);
for (const auto & column : storage_map.at(table_name)->getInMemoryMetadata().getColumns())
{
if (!existed_columns.contains(column.name))
{
storage_map.erase(table_name);
storage_columns_map.erase(table_name);
}
}
}
if (!storage_map.contains(table_name))
{
storage_map.emplace(table_name, creator());
storage_columns_map.emplace(table_name, std::set<std::string>());
for (const auto & column : storage_map.at(table_name)->getInMemoryMetadataPtr()->columns)
storage_columns_map.at(table_name).emplace(column.name);
}
}
return storage_map.at(table_name);
}
Expand Down

0 comments on commit 2432284

Please sign in to comment.