[DS-110] Cache individual expressions instead of all expressions in projector #7

Status: Open
Wants to merge 43 commits into base: master

Commits (43)
919f174
Add SetLlvmObjectcache function
augustoasilva Jun 21, 2021
f2da60d
Add the GandivaObjectCache class
augustoasilva Jun 21, 2021
5b5d127
Create new builder to cache the object code on memory and set an uniq…
augustoasilva Jun 21, 2021
c05d351
Add funcs to read the cached object code and a safely evict to free m…
augustoasilva Jun 21, 2021
ec74782
Change Make() func to cache the object code instead of module
augustoasilva Jun 21, 2021
8daf0d2
Remove unused debug logs
augustoasilva Jun 21, 2021
df835e2
Change Filter::Make() func to cache object code instead of module
augustoasilva Jun 21, 2021
0a5ce03
Comment test env var configuration
augustoasilva Jun 21, 2021
4f6a7e4
Add individual caching for expressions instead of all expressions on …
augustoasilva Jun 21, 2021
6fd426e
Add SetLlvmObjectcache function
augustoasilva Jun 21, 2021
f3f48e7
Add the GandivaObjectCache class
augustoasilva Jun 21, 2021
148be9a
Create new builder to cache the object code on memory and set an uniq…
augustoasilva Jun 21, 2021
337b6ef
Fix conflicts from master
augustoasilva Aug 30, 2021
06827e4
Fix conflicts from master
augustoasilva Aug 30, 2021
24a5a10
Remove unused debug logs
augustoasilva Jun 21, 2021
c559bf6
Fix conflicts from master on filter
augustoasilva Aug 30, 2021
ff01008
Comment test env var configuration
augustoasilva Jun 21, 2021
b54e3f1
Remove unecessary comments
augustoasilva Jun 24, 2021
7522bc8
Remove more unnecessary comments
augustoasilva Jun 24, 2021
c12b7c3
Fix conflicts from master on cache and projector
augustoasilva Aug 30, 2021
9c29e8d
Add comment to GandivaObjectCache class
augustoasilva Jun 24, 2021
780a164
Fix conflicts from master on obj_cache and filter
augustoasilva Aug 30, 2021
604acbf
Fix conflicts from master on filter
augustoasilva Aug 30, 2021
b1c5fa0
Update base_cache_key.h as per reviewed
augustoasilva Jun 25, 2021
b998c89
Fix conflicts from master on filter and projector
augustoasilva Aug 30, 2021
730582b
Update engine.h and engine.cc as per reviewed
augustoasilva Jun 25, 2021
3101f04
Fix conflicts from master on filter projector and cache
augustoasilva Aug 30, 2021
76491f3
Update obj-cache to new cache policy
augustoasilva Sep 1, 2021
ae09512
Fix base cache key build errors
augustoasilva Sep 2, 2021
86a08e9
Remove boost::any from dependencies
augustoasilva Sep 2, 2021
f75cf07
Remove unused imports and functions from base cache key
augustoasilva Sep 2, 2021
7126b65
Fix linting errors manually
augustoasilva Sep 2, 2021
6242b31
Fix linting errors manually on gandiva_object_cache.h
augustoasilva Sep 2, 2021
548d3e7
Fix nullptr usage on gandiva_object_cache.h
augustoasilva Sep 2, 2021
8324575
Change conda env cpp to limit boost-cpp max version
augustoasilva Sep 2, 2021
dba8d70
Fix finalize module returning ok when execution engine has error
augustoasilva Sep 8, 2021
c57f98a
Remove static used_cache_size var from projector and filter
augustoasilva Sep 8, 2021
170f381
Change back the stoul to atoi
augustoasilva Sep 8, 2021
17d5e3c
Fix linx error
augustoasilva Sep 8, 2021
553e86c
Remove unnused imports on base cache key
augustoasilva Sep 8, 2021
3fa6594
Undo conda_env_cpp.txt boost-cpp lib modification
augustoasilva Sep 8, 2021
75650ae
Merge branch 'feature/cache-object-code-on-memory-and-enable-memory-t…
augustoasilva Sep 9, 2021
00ffbd8
Fix formatting errors
augustoasilva Sep 9, 2021
92 changes: 92 additions & 0 deletions cpp/src/gandiva/base_cache_key.h
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <arrow/util/hash_util.h>
#include <stddef.h>

#include "gandiva/expression.h"
#include "gandiva/filter.h"
#include "gandiva/projector.h"

namespace gandiva {

class BaseCacheKey {
public:
BaseCacheKey(Expression& expr, std::string type) : type_(type) {
static const int32_t kSeedValue = 4;
std::string expr_as_string = expr.ToString();
size_t result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, expr_as_string);
hash_code_ = result_hash;
}

BaseCacheKey(ProjectorCacheKey& key, std::string type) : type_(type) {
static const int32_t kSeedValue = 4;
size_t key_hash = key.Hash();
size_t result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, key_hash);
hash_code_ = result_hash;
schema_ = key.schema();
}

BaseCacheKey(FilterCacheKey& key, std::string type) : type_(type) {
static const size_t kSeedValue = 4;
size_t key_hash = key.Hash();
size_t result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, key_hash);
hash_code_ = result_hash;
schema_ = key.schema();
}

BaseCacheKey(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<Expression> expr,
std::string type)
: type_(type) {
static const int32_t kSeedValue = 4;
size_t result_hash = kSeedValue;
std::string schema_string = schema->ToString();
std::string expr_string = expr->ToString();
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, schema_string);
arrow::internal::hash_combine(result_hash, expr_string);
hash_code_ = result_hash;
}

size_t Hash() const { return hash_code_; }

std::string Type() const { return type_; }

bool operator==(const BaseCacheKey& other) const {
if (hash_code_ != other.hash_code_) {
return false;
}
return true;
}

bool operator!=(const BaseCacheKey& other) const { return !(*this == other); }

private:
uint64_t hash_code_;
std::string type_;
SchemaPtr schema_;
};

} // namespace gandiva
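
Review note on `BaseCacheKey::operator==`: as written, two keys compare equal whenever their hash codes match, so a hash collision between two different expressions would make the cache return object code compiled for the wrong expression. A minimal sketch of one way to guard against that is below. It assumes the key can afford to keep the strings it hashed; `SketchCacheKey` and `Combine` are hypothetical names, and `std::hash` with a Boost-style combine stands in for `arrow::internal::hash_combine` so the example is self-contained.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for BaseCacheKey; not part of this PR.
class SketchCacheKey {
 public:
  SketchCacheKey(std::string type, std::string expr)
      : type_(std::move(type)), expr_(std::move(expr)) {
    // Same seed value as the PR; the combine function itself is an assumption.
    std::size_t seed = 4;
    Combine(seed, std::hash<std::string>{}(type_));
    Combine(seed, std::hash<std::string>{}(expr_));
    hash_code_ = seed;
  }

  std::size_t Hash() const { return hash_code_; }

  bool operator==(const SketchCacheKey& other) const {
    // Cheap rejection on the hash first, then confirm with the real contents.
    return hash_code_ == other.hash_code_ && type_ == other.type_ &&
           expr_ == other.expr_;
  }

  bool operator!=(const SketchCacheKey& other) const { return !(*this == other); }

 private:
  static void Combine(std::size_t& seed, std::size_t value) {
    seed ^= value + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  }

  std::size_t hash_code_;
  std::string type_;
  std::string expr_;
};
```

The trade-off is memory: each key now carries the expression string, which may or may not be acceptable for the cache sizes targeted here.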
10 changes: 6 additions & 4 deletions cpp/src/gandiva/cache.cc
@@ -20,13 +20,14 @@

namespace gandiva {

static const int DEFAULT_CACHE_SIZE = 500;
static const size_t DEFAULT_CACHE_SIZE = 128 * 1024 * 1024; // 128 MiB

int GetCapacity() {
int capacity;
size_t GetCapacity() {
size_t capacity;
const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE");
if (env_cache_size != nullptr) {
capacity = std::atoi(env_cache_size);

if (capacity <= 0) {
ARROW_LOG(WARNING) << "Invalid cache size provided. Using default cache size: "
<< DEFAULT_CACHE_SIZE;
@@ -35,11 +36,12 @@ int GetCapacity() {
} else {
capacity = DEFAULT_CACHE_SIZE;
}

return capacity;
}

void LogCacheSize(size_t capacity) {
ARROW_LOG(INFO) << "Creating gandiva cache with capacity: " << capacity;
ARROW_LOG(INFO) << "Creating gandiva cache with capacity of " << capacity << " bytes";
}

} // namespace gandiva
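
Review note on `GetCapacity()`: once `capacity` is a `size_t`, the check `capacity <= 0` can only be true for zero, and a negative `GANDIVA_CACHE_SIZE` parsed by `std::atoi` converts to a very large unsigned value instead of falling back to the default. The sketch below shows one possible stricter parse using `std::strtoull`. `ParseCapacity` is a hypothetical helper, not something this PR defines, and rejecting inputs with a leading sign or whitespace is an assumption about the desired behavior.

```cpp
#include <cctype>
#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <limits>

// Hypothetical helper: parse a byte count, returning `fallback` on any
// input that is missing, non-numeric, zero, or out of range.
inline std::size_t ParseCapacity(const char* text, std::size_t fallback) {
  // Require the first character to be a digit; this rejects null, empty
  // strings, leading whitespace, and explicit signs such as "-5".
  if (text == nullptr || !std::isdigit(static_cast<unsigned char>(*text))) {
    return fallback;
  }
  errno = 0;
  char* end = nullptr;
  const unsigned long long value = std::strtoull(text, &end, 10);
  // Reject overflow, trailing garbage such as "12abc", and zero.
  if (errno == ERANGE || *end != '\0' || value == 0) {
    return fallback;
  }
  if (value > std::numeric_limits<std::size_t>::max()) {
    return fallback;
  }
  return static_cast<std::size_t>(value);
}
```

With a helper like this, `GetCapacity()` could pass `std::getenv("GANDIVA_CACHE_SIZE")` and `DEFAULT_CACHE_SIZE` straight through and log the warning when the result equals the fallback for a non-null input.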
26 changes: 25 additions & 1 deletion cpp/src/gandiva/cache.h
@@ -27,7 +27,7 @@
namespace gandiva {

GANDIVA_EXPORT
int GetCapacity();
size_t GetCapacity();

GANDIVA_EXPORT
void LogCacheSize(size_t capacity);
@@ -39,6 +39,10 @@ class Cache {

Cache() : Cache(GetCapacity()) {}

::std::shared_ptr<Cache> create(size_t capacity) {
return ::std::make_shared<Cache>(cache_(capacity));
}

ValueType GetModule(KeyType cache_key) {
arrow::util::optional<ValueCacheObject<ValueType>> result;
mtx_.lock();
@@ -47,12 +51,32 @@ class Cache {
return result != arrow::util::nullopt ? (*result).module : nullptr;
}

ValueType GetObjectCode(KeyType cache_key) {
arrow::util::optional<ValueCacheObject<ValueType>> result;
mtx_.lock();
result = cache_.GetObjectCode(cache_key);
mtx_.unlock();
return result != arrow::util::nullopt ? (*result).module : nullptr;
}

void PutModule(KeyType cache_key, ValueCacheObject<ValueType> valueCacheObject) {
mtx_.lock();
cache_.insert(cache_key, valueCacheObject);
mtx_.unlock();
}

void PutObjectCode(KeyType& cache_key, ValueCacheObject<ValueType> object_code) {
mtx_.lock();
cache_.InsertObjectCode(cache_key, object_code);
mtx_.unlock();
}

::std::shared_ptr<Cache> CreateSharedCachePtr() { return Cache::create(); }

std::string ToString() { return cache_.ToString(); }

size_t GetCacheSize() { return cache_.GetCacheSize(); }

private:
GreedyDualSizeCache<KeyType, ValueType> cache_;
std::mutex mtx_;
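
Review note on the locking in `Cache`: the accessors pair `mtx_.lock()` with `mtx_.unlock()` by hand, so an exception thrown by the underlying cache call would leave the mutex locked. A possible alternative is scoped locking with `std::lock_guard`, sketched below. `SketchCache` is a hypothetical, simplified stand-in backed by `std::unordered_map` with no eviction policy; it is not the `GreedyDualSizeCache` used in this PR.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical, simplified stand-in for gandiva::Cache (no eviction).
template <class KeyType, class ValueType>
class SketchCache {
 public:
  // Returns nullptr on a miss.
  std::shared_ptr<ValueType> Get(const KeyType& key) {
    std::lock_guard<std::mutex> lock(mtx_);  // released on every exit path
    auto it = entries_.find(key);
    if (it == entries_.end()) {
      return nullptr;
    }
    return it->second;
  }

  void Put(const KeyType& key, std::shared_ptr<ValueType> value) {
    std::lock_guard<std::mutex> lock(mtx_);
    entries_[key] = std::move(value);
  }

  std::size_t Size() {
    std::lock_guard<std::mutex> lock(mtx_);
    return entries_.size();
  }

 private:
  std::unordered_map<KeyType, std::shared_ptr<ValueType>> entries_;
  std::mutex mtx_;
};
```

The same pattern would apply to `GetModule`, `GetObjectCode`, `PutModule`, and `PutObjectCode` without changing their signatures.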
8 changes: 5 additions & 3 deletions cpp/src/gandiva/engine.cc
@@ -167,7 +167,6 @@ Status Engine::Make(const std::shared_ptr<Configuration>& conf,
return Status::CodeGenError("Could not instantiate llvm::ExecutionEngine: ",
builder_error);
}

std::unique_ptr<Engine> engine{
new Engine(conf, std::move(ctx), std::move(exec_engine), module_ptr)};
ARROW_RETURN_NOT_OK(engine->Init());
@@ -303,11 +302,14 @@ Status Engine::FinalizeModule() {

ARROW_RETURN_IF(llvm::verifyModule(*module_, &llvm::errs()),
Status::CodeGenError("Module verification failed after optimizer"));

// do the compilation
if (execution_engine_->hasError()) {
ARROW_LOG(WARNING) << "[ERROR]: " << execution_engine_->getErrorMessage();
module_finalized_ = false;
return Status::ExecutionError(execution_engine_->getErrorMessage());
}
execution_engine_->finalizeObject();
module_finalized_ = true;

return Status::OK();
}

15 changes: 13 additions & 2 deletions cpp/src/gandiva/engine.h
@@ -22,10 +22,10 @@
#include <string>
#include <vector>

#include "arrow/util/macros.h"

#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "gandiva/configuration.h"
#include "gandiva/gandiva_object_cache.h"
#include "gandiva/llvm_includes.h"
#include "gandiva/llvm_types.h"
#include "gandiva/visibility.h"
@@ -54,6 +54,17 @@ class GANDIVA_EXPORT Engine {
functions_to_compile_.push_back(fname);
}

/// Set BaseObjectCache.
template <class KeyType>
Status SetLLVMObjectCache(GandivaObjectCache<KeyType>& object_cache) {
execution_engine_->setObjectCache(&object_cache);
if (execution_engine_->hasError()) {
return Status::ExecutionError(
"[CACHE-LOG][ERROR]: Can not set custom llvm object cache");
}
return Status::OK();
}

/// Optimise and compile the module.
Status FinalizeModule();

60 changes: 46 additions & 14 deletions cpp/src/gandiva/filter.cc
@@ -24,6 +24,7 @@

#include "arrow/util/hash_util.h"

#include "gandiva/base_cache_key.h"
#include "gandiva/bitmap_accumulator.h"
#include "gandiva/cache.h"
#include "gandiva/condition.h"
@@ -42,7 +43,7 @@ FilterCacheKey::FilterCacheKey(SchemaPtr schema,
expression_as_string_ = expression.ToString();
UpdateUniqifier(expression_as_string_);
arrow::internal::hash_combine(result, expression_as_string_);
arrow::internal::hash_combine(result, configuration);
arrow::internal::hash_combine(result, configuration->Hash());
arrow::internal::hash_combine(result, schema_->ToString());
arrow::internal::hash_combine(result, uniqifier_);
hash_code_ = result;
@@ -102,14 +103,31 @@ Status Filter::Make(SchemaPtr schema, ConditionPtr condition,
ARROW_RETURN_IF(configuration == nullptr,
Status::Invalid("Configuration cannot be null"));

static Cache<FilterCacheKey, std::shared_ptr<Filter>> cache;
FilterCacheKey cache_key(schema, configuration, *(condition.get()));
auto cachedFilter = cache.GetModule(cache_key);
if (cachedFilter != nullptr) {
*filter = cachedFilter;
return Status::OK();
std::shared_ptr<Cache<BaseCacheKey, std::shared_ptr<llvm::MemoryBuffer>>> shared_cache =
LLVMGenerator::GetCache();

Condition conditionToKey = *(condition.get());

FilterCacheKey filter_key(schema, configuration, conditionToKey);
BaseCacheKey cache_key(filter_key, "filter");
std::unique_ptr<BaseCacheKey> base_cache_key =
std::make_unique<BaseCacheKey>(cache_key);
std::shared_ptr<BaseCacheKey> shared_base_cache_key = std::move(base_cache_key);

bool llvm_flag = false;

std::shared_ptr<llvm::MemoryBuffer> prev_cached_obj;
prev_cached_obj = shared_cache->GetObjectCode(*shared_base_cache_key);

// Verify if previous filter obj code was cached
if (prev_cached_obj != nullptr) {
ARROW_LOG(DEBUG)
<< "[DEBUG][CACHE-LOG][INFO]: Filter object code WAS already cached!";
llvm_flag = true;
}

GandivaObjectCache<BaseCacheKey> obj_cache(shared_cache, shared_base_cache_key);

// Build LLVM generator, and generate code for the specified expression
std::unique_ptr<LLVMGenerator> llvm_gen;
ARROW_RETURN_NOT_OK(LLVMGenerator::Make(configuration, &llvm_gen));
@@ -120,17 +138,27 @@ Status Filter::Make(SchemaPtr schema, ConditionPtr condition,
ARROW_RETURN_NOT_OK(expr_validator.Validate(condition));

// Start measuring build time
auto begin = std::chrono::high_resolution_clock::now();
ARROW_RETURN_NOT_OK(llvm_gen->Build({condition}, SelectionVector::Mode::MODE_NONE));
// auto begin = std::chrono::high_resolution_clock::now();
// ARROW_RETURN_NOT_OK(llvm_gen->Build({condition}, SelectionVector::Mode::MODE_NONE));
// Stop measuring time and calculate the elapsed time
auto end = std::chrono::high_resolution_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count();
// auto end = std::chrono::high_resolution_clock::now();
// auto elapsed =
// std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count();
ARROW_RETURN_NOT_OK(
llvm_gen->Build({condition}, SelectionVector::Mode::MODE_NONE,
obj_cache)); // to use when caching only the obj code

// Instantiate the filter with the completely built llvm generator
*filter = std::make_shared<Filter>(std::move(llvm_gen), schema, configuration);
ValueCacheObject<std::shared_ptr<Filter>> value_cache(*filter, elapsed);
cache.PutModule(cache_key, value_cache);
// ValueCacheObject<std::shared_ptr<Filter>> value_cache(*filter, elapsed);
// cache.PutModule(cache_key, value_cache);
//
filter->get()->SetCompiledFromCache(
llvm_flag); // to use when caching only the obj code

ARROW_LOG(DEBUG)
<< "[DEBUG][CACHE-LOG][INFO]: " +
shared_cache->ToString(); // to use when caching only the obj code

return Status::OK();
}
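
Review note on the lookup in `Filter::Make`: the key is built with `std::make_unique` and then moved into a `std::shared_ptr`, and the hit flag is set in a separate `if`. The flow could probably be condensed as sketched below, creating the shared key directly and deriving the flag from the lookup. All names here (`SketchKey`, `SketchObject`, `SketchStore`, `LookupResult`, `LookupObjectCode`) are hypothetical stand-ins; the PR itself uses `BaseCacheKey`, `llvm::MemoryBuffer`, and the shared `Cache` from `LLVMGenerator::GetCache()`.

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for BaseCacheKey, llvm::MemoryBuffer and the cache.
using SketchKey = std::string;
using SketchObject = std::string;
using SketchStore = std::unordered_map<SketchKey, std::shared_ptr<SketchObject>>;

struct LookupResult {
  std::shared_ptr<SketchKey> key;  // shared with the object cache wrapper
  bool was_cached;                 // feeds SetCompiledFromCache()
};

inline LookupResult LookupObjectCode(const SketchStore& store,
                                     const SketchKey& raw_key) {
  // Build the shared key in one step instead of make_unique + move.
  auto key = std::make_shared<SketchKey>(raw_key);
  const bool was_cached = store.find(*key) != store.end();
  return LookupResult{std::move(key), was_cached};
}
```

Because `Projector::Make` appears to follow the same steps, a shared helper along these lines might also remove duplication between the two call sites.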
@@ -168,4 +196,8 @@ Status Filter::Evaluate(const arrow::RecordBatch& batch,

std::string Filter::DumpIR() { return llvm_generator_->DumpIR(); }

void Filter::SetCompiledFromCache(bool flag) { compiled_from_cache_ = flag; }

bool Filter::GetCompiledFromCache() { return compiled_from_cache_; }

} // namespace gandiva
4 changes: 4 additions & 0 deletions cpp/src/gandiva/filter.h
@@ -103,10 +103,14 @@ class GANDIVA_EXPORT Filter {

std::string DumpIR();

void SetCompiledFromCache(bool flag);
bool GetCompiledFromCache();

private:
std::unique_ptr<LLVMGenerator> llvm_generator_;
SchemaPtr schema_;
std::shared_ptr<Configuration> configuration_;
bool compiled_from_cache_;
};

} // namespace gandiva