diff --git a/cpp/src/gandiva/base_cache_key.h b/cpp/src/gandiva/base_cache_key.h new file mode 100644 index 0000000000000..f3eeee2923680 --- /dev/null +++ b/cpp/src/gandiva/base_cache_key.h @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "gandiva/expression.h" +#include "gandiva/filter.h" +#include "gandiva/projector.h" + +namespace gandiva { + +class BaseCacheKey { + public: + BaseCacheKey(Expression& expr, std::string type) : type_(type) { + static const int32_t kSeedValue = 4; + std::string expr_as_string = expr.ToString(); + size_t result_hash = kSeedValue; + arrow::internal::hash_combine(result_hash, type); + arrow::internal::hash_combine(result_hash, expr_as_string); + hash_code_ = result_hash; + } + + BaseCacheKey(ProjectorCacheKey& key, std::string type) : type_(type) { + static const int32_t kSeedValue = 4; + size_t key_hash = key.Hash(); + size_t result_hash = kSeedValue; + arrow::internal::hash_combine(result_hash, type); + arrow::internal::hash_combine(result_hash, key_hash); + hash_code_ = result_hash; + schema_ = key.schema(); + } + + BaseCacheKey(FilterCacheKey& key, std::string type) : type_(type) { + static const size_t kSeedValue = 4; + size_t key_hash = key.Hash(); + size_t result_hash = kSeedValue; + arrow::internal::hash_combine(result_hash, type); + arrow::internal::hash_combine(result_hash, key_hash); + hash_code_ = result_hash; + schema_ = key.schema(); + } + + BaseCacheKey(std::shared_ptr schema, std::shared_ptr expr, + std::string type) + : type_(type) { + static const int32_t kSeedValue = 4; + size_t result_hash = kSeedValue; + std::string schema_string = schema->ToString(); + std::string expr_string = expr->ToString(); + arrow::internal::hash_combine(result_hash, type); + arrow::internal::hash_combine(result_hash, schema_string); + arrow::internal::hash_combine(result_hash, expr_string); + hash_code_ = result_hash; + } + + size_t Hash() const { return hash_code_; } + + std::string Type() const { return type_; } + + bool operator==(const BaseCacheKey& other) const { + if (hash_code_ != other.hash_code_) { + return false; + } + return true; + } + + bool operator!=(const BaseCacheKey& other) const { return !(*this == other); } + + private: + uint64_t hash_code_; + std::string type_; + SchemaPtr schema_; +}; + +} // namespace gandiva diff --git a/cpp/src/gandiva/cache.cc b/cpp/src/gandiva/cache.cc index d823a676bc2f7..7546c83c1d0ee 100644 --- a/cpp/src/gandiva/cache.cc +++ b/cpp/src/gandiva/cache.cc @@ -20,13 +20,14 @@ namespace gandiva { -static const int DEFAULT_CACHE_SIZE = 500; +static const size_t DEFAULT_CACHE_SIZE = 128 * 1024 * 1024; // 256 MiB -int GetCapacity() { - int capacity; +size_t GetCapacity() { + size_t capacity; const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE"); if (env_cache_size != nullptr) { capacity = std::atoi(env_cache_size); + if (capacity <= 0) { ARROW_LOG(WARNING) << "Invalid cache size provided. Using default cache size: " << DEFAULT_CACHE_SIZE; @@ -35,11 +36,12 @@ int GetCapacity() { } else { capacity = DEFAULT_CACHE_SIZE; } + return capacity; } void LogCacheSize(size_t capacity) { - ARROW_LOG(INFO) << "Creating gandiva cache with capacity: " << capacity; + ARROW_LOG(INFO) << "Creating gandiva cache with capacity of " << capacity << " bytes"; } } // namespace gandiva diff --git a/cpp/src/gandiva/cache.h b/cpp/src/gandiva/cache.h index 8d0f75ce36a06..1d5f086a5a54a 100644 --- a/cpp/src/gandiva/cache.h +++ b/cpp/src/gandiva/cache.h @@ -27,7 +27,7 @@ namespace gandiva { GANDIVA_EXPORT -int GetCapacity(); +size_t GetCapacity(); GANDIVA_EXPORT void LogCacheSize(size_t capacity); @@ -39,6 +39,10 @@ class Cache { Cache() : Cache(GetCapacity()) {} + ::std::shared_ptr create(size_t capacity) { + return ::std::make_shared(cache_(capacity)); + } + ValueType GetModule(KeyType cache_key) { arrow::util::optional> result; mtx_.lock(); @@ -47,12 +51,32 @@ class Cache { return result != arrow::util::nullopt ? (*result).module : nullptr; } + ValueType GetObjectCode(KeyType cache_key) { + arrow::util::optional> result; + mtx_.lock(); + result = cache_.GetObjectCode(cache_key); + mtx_.unlock(); + return result != arrow::util::nullopt ? (*result).module : nullptr; + } + void PutModule(KeyType cache_key, ValueCacheObject valueCacheObject) { mtx_.lock(); cache_.insert(cache_key, valueCacheObject); mtx_.unlock(); } + void PutObjectCode(KeyType& cache_key, ValueCacheObject object_code) { + mtx_.lock(); + cache_.InsertObjectCode(cache_key, object_code); + mtx_.unlock(); + } + + ::std::shared_ptr CreateSharedCachePtr() { return Cache::create(); } + + std::string ToString() { return cache_.ToString(); } + + size_t GetCacheSize() { return cache_.GetCacheSize(); } + private: GreedyDualSizeCache cache_; std::mutex mtx_; diff --git a/cpp/src/gandiva/engine.cc b/cpp/src/gandiva/engine.cc index f0b768f5f43cd..21e7a82170d7b 100644 --- a/cpp/src/gandiva/engine.cc +++ b/cpp/src/gandiva/engine.cc @@ -167,7 +167,6 @@ Status Engine::Make(const std::shared_ptr& conf, return Status::CodeGenError("Could not instantiate llvm::ExecutionEngine: ", builder_error); } - std::unique_ptr engine{ new Engine(conf, std::move(ctx), std::move(exec_engine), module_ptr)}; ARROW_RETURN_NOT_OK(engine->Init()); @@ -303,11 +302,14 @@ Status Engine::FinalizeModule() { ARROW_RETURN_IF(llvm::verifyModule(*module_, &llvm::errs()), Status::CodeGenError("Module verification failed after optimizer")); - // do the compilation + if (execution_engine_->hasError()) { + ARROW_LOG(WARNING) << "[ERROR]: " << execution_engine_->getErrorMessage(); + module_finalized_ = false; + return Status::ExecutionError(execution_engine_->getErrorMessage()); + } execution_engine_->finalizeObject(); module_finalized_ = true; - return Status::OK(); } diff --git a/cpp/src/gandiva/engine.h b/cpp/src/gandiva/engine.h index d26b8aa0ea96c..4a3ba76a75232 100644 --- a/cpp/src/gandiva/engine.h +++ b/cpp/src/gandiva/engine.h @@ -22,10 +22,10 @@ #include #include -#include "arrow/util/macros.h" - #include "arrow/util/logging.h" +#include "arrow/util/macros.h" #include "gandiva/configuration.h" +#include "gandiva/gandiva_object_cache.h" #include "gandiva/llvm_includes.h" #include "gandiva/llvm_types.h" #include "gandiva/visibility.h" @@ -54,6 +54,17 @@ class GANDIVA_EXPORT Engine { functions_to_compile_.push_back(fname); } + /// Set BaseObjectCache. + template + Status SetLLVMObjectCache(GandivaObjectCache& object_cache) { + execution_engine_->setObjectCache(&object_cache); + if (execution_engine_->hasError()) { + return Status::ExecutionError( + "[CACHE-LOG][ERROR]: Can not set custom llvm object cache"); + } + return Status::OK(); + } + /// Optimise and compile the module. Status FinalizeModule(); diff --git a/cpp/src/gandiva/filter.cc b/cpp/src/gandiva/filter.cc index 875cc5447f43d..aeec550d4005a 100644 --- a/cpp/src/gandiva/filter.cc +++ b/cpp/src/gandiva/filter.cc @@ -24,6 +24,7 @@ #include "arrow/util/hash_util.h" +#include "gandiva/base_cache_key.h" #include "gandiva/bitmap_accumulator.h" #include "gandiva/cache.h" #include "gandiva/condition.h" @@ -42,7 +43,7 @@ FilterCacheKey::FilterCacheKey(SchemaPtr schema, expression_as_string_ = expression.ToString(); UpdateUniqifier(expression_as_string_); arrow::internal::hash_combine(result, expression_as_string_); - arrow::internal::hash_combine(result, configuration); + arrow::internal::hash_combine(result, configuration->Hash()); arrow::internal::hash_combine(result, schema_->ToString()); arrow::internal::hash_combine(result, uniqifier_); hash_code_ = result; @@ -102,14 +103,31 @@ Status Filter::Make(SchemaPtr schema, ConditionPtr condition, ARROW_RETURN_IF(configuration == nullptr, Status::Invalid("Configuration cannot be null")); - static Cache> cache; - FilterCacheKey cache_key(schema, configuration, *(condition.get())); - auto cachedFilter = cache.GetModule(cache_key); - if (cachedFilter != nullptr) { - *filter = cachedFilter; - return Status::OK(); + std::shared_ptr>> shared_cache = + LLVMGenerator::GetCache(); + + Condition conditionToKey = *(condition.get()); + + FilterCacheKey filter_key(schema, configuration, conditionToKey); + BaseCacheKey cache_key(filter_key, "filter"); + std::unique_ptr base_cache_key = + std::make_unique(cache_key); + std::shared_ptr shared_base_cache_key = std::move(base_cache_key); + + bool llvm_flag = false; + + std::shared_ptr prev_cached_obj; + prev_cached_obj = shared_cache->GetObjectCode(*shared_base_cache_key); + + // Verify if previous filter obj code was cached + if (prev_cached_obj != nullptr) { + ARROW_LOG(DEBUG) + << "[DEBUG][CACHE-LOG][INFO]: Filter object code WAS already cached!"; + llvm_flag = true; } + GandivaObjectCache obj_cache(shared_cache, shared_base_cache_key); + // Build LLVM generator, and generate code for the specified expression std::unique_ptr llvm_gen; ARROW_RETURN_NOT_OK(LLVMGenerator::Make(configuration, &llvm_gen)); @@ -120,17 +138,27 @@ Status Filter::Make(SchemaPtr schema, ConditionPtr condition, ARROW_RETURN_NOT_OK(expr_validator.Validate(condition)); // Start measuring build time - auto begin = std::chrono::high_resolution_clock::now(); - ARROW_RETURN_NOT_OK(llvm_gen->Build({condition}, SelectionVector::Mode::MODE_NONE)); + // auto begin = std::chrono::high_resolution_clock::now(); + // ARROW_RETURN_NOT_OK(llvm_gen->Build({condition}, SelectionVector::Mode::MODE_NONE)); // Stop measuring time and calculate the elapsed time - auto end = std::chrono::high_resolution_clock::now(); - auto elapsed = - std::chrono::duration_cast(end - begin).count(); + // auto end = std::chrono::high_resolution_clock::now(); + // auto elapsed = + // std::chrono::duration_cast(end - begin).count(); + ARROW_RETURN_NOT_OK( + llvm_gen->Build({condition}, SelectionVector::Mode::MODE_NONE, + obj_cache)); // to use when caching only the obj code // Instantiate the filter with the completely built llvm generator *filter = std::make_shared(std::move(llvm_gen), schema, configuration); - ValueCacheObject> value_cache(*filter, elapsed); - cache.PutModule(cache_key, value_cache); + // ValueCacheObject> value_cache(*filter, elapsed); + // cache.PutModule(cache_key, value_cache); + // + filter->get()->SetCompiledFromCache( + llvm_flag); // to use when caching only the obj code + + ARROW_LOG(DEBUG) + << "[DEBUG][CACHE-LOG][INFO]: " + + shared_cache->ToString(); // to use when caching only the obj code return Status::OK(); } @@ -168,4 +196,8 @@ Status Filter::Evaluate(const arrow::RecordBatch& batch, std::string Filter::DumpIR() { return llvm_generator_->DumpIR(); } +void Filter::SetCompiledFromCache(bool flag) { compiled_from_cache_ = flag; } + +bool Filter::GetCompiledFromCache() { return compiled_from_cache_; } + } // namespace gandiva diff --git a/cpp/src/gandiva/filter.h b/cpp/src/gandiva/filter.h index 70ccd7cf0ceee..4b98f9ccbfc0e 100644 --- a/cpp/src/gandiva/filter.h +++ b/cpp/src/gandiva/filter.h @@ -103,10 +103,14 @@ class GANDIVA_EXPORT Filter { std::string DumpIR(); + void SetCompiledFromCache(bool flag); + bool GetCompiledFromCache(); + private: std::unique_ptr llvm_generator_; SchemaPtr schema_; std::shared_ptr configuration_; + bool compiled_from_cache_; }; } // namespace gandiva diff --git a/cpp/src/gandiva/gandiva_object_cache.h b/cpp/src/gandiva/gandiva_object_cache.h new file mode 100644 index 0000000000000..44463152f709e --- /dev/null +++ b/cpp/src/gandiva/gandiva_object_cache.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "gandiva/cache.h" +#include "llvm/ExecutionEngine/ObjectCache.h" +#include "llvm/IR/Module.h" + +namespace gandiva { +/// Class that enables the LLVM to use a custom rule to deal with the object code. +template +class GandivaObjectCache : public llvm::ObjectCache { + public: + GandivaObjectCache( + std::shared_ptr>>& cache, + std::shared_ptr& key) { + cache_ = cache; + cache_key_ = key; + } + + ~GandivaObjectCache() {} + + void notifyObjectCompiled(const llvm::Module* M, llvm::MemoryBufferRef Obj) { + std::unique_ptr obj_buffer = + llvm::MemoryBuffer::getMemBufferCopy(Obj.getBuffer(), Obj.getBufferIdentifier()); + std::shared_ptr obj_code = std::move(obj_buffer); + ValueCacheObject> value_cache( + obj_code, elapsed_, obj_code->getBufferSize()); + cache_->PutObjectCode(*cache_key_.get(), value_cache); + } + + std::unique_ptr getObject(const llvm::Module* M) { + std::shared_ptr cached_obj = + cache_->GetObjectCode(*cache_key_.get()); + auto null = std::nullptr_t(); + if (cached_obj != null) { + std::unique_ptr cached_buffer = cached_obj->getMemBufferCopy( + cached_obj->getBuffer(), cached_obj->getBufferIdentifier()); + return cached_buffer; + } + return null; + } + + void AddElapsedTime(size_t elapsed) { elapsed_ = elapsed; } + + private: + std::shared_ptr cache_key_; + std::shared_ptr>> cache_; + size_t elapsed_; +}; +} // namespace gandiva diff --git a/cpp/src/gandiva/greedy_dual_size_cache.h b/cpp/src/gandiva/greedy_dual_size_cache.h index cb5c38e075c4b..7180bef0dd815 100644 --- a/cpp/src/gandiva/greedy_dual_size_cache.h +++ b/cpp/src/gandiva/greedy_dual_size_cache.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include #include @@ -31,9 +33,11 @@ namespace gandiva { template class ValueCacheObject { public: - ValueCacheObject(ValueType module, uint64_t cost) : module(module), cost(cost) {} + ValueCacheObject(ValueType module, uint64_t cost, size_t size) + : module(module), cost(cost), size(size) {} ValueType module; uint64_t cost; + size_t size; bool operator<(const ValueCacheObject& other) const { return cost < other.cost; } }; @@ -110,6 +114,25 @@ class GreedyDualSizeCache { } } + void InsertObjectCode(const Key& key, const ValueCacheObject& value) { + typename map_type::iterator i = map_.find(key); + // check if element is not in the cache to add it + if (i == map_.end()) { + // insert item into the cache, but first check if it is full, to evict an item + // if it is necessary + if (size() >= capacity_) { + evict(); + } + + // insert the new item + auto item = + priority_set_.insert(PriorityItem(value.cost + inflation_, value.cost, key)); + // save on map the value and the priority item iterator position + map_.emplace(key, std::make_pair(value, item.first)); + cache_size_ += value.size; + } + } + arrow::util::optional> get(const Key& key) { // lookup value in the cache typename map_type::iterator value_for_key = map_.find(key); @@ -128,11 +151,37 @@ class GreedyDualSizeCache { return value_for_key->second.first; } + arrow::util::optional> GetObjectCode(const Key& key) { + // lookup value in the cache + typename map_type::iterator value_for_key = map_.find(key); + if (value_for_key == map_.end()) { + // value not in cache + return arrow::util::nullopt; + } + PriorityItem item = *value_for_key->second.second; + // if the value was found on the cache, update its cost (original + inflation) + if (item.actual_priority != item.original_priority + inflation_) { + priority_set_.erase(value_for_key->second.second); + auto iter = priority_set_.insert(PriorityItem( + item.original_priority + inflation_, item.original_priority, item.cache_key)); + value_for_key->second.second = iter.first; + } + return value_for_key->second.first; + } + void clear() { map_.clear(); priority_set_.clear(); } + size_t GetCacheSize() { return cache_size_; } + + std::string ToString() { + size_t cache_map_length = map_.size(); + return "Cache has " + std::to_string(cache_map_length) + " items," + + " with total size of " + std::to_string(cache_size_) + " bytes."; + } + private: void evict() { // TODO: inflation overflow is unlikely to happen but needs to be handled @@ -146,9 +195,25 @@ class GreedyDualSizeCache { priority_set_.erase(i); } + void evictObject() { + // TODO: inflation overflow is unlikely to happen but needs to be handled + // for correctness. + // evict item from the beginning of the set. This set is ordered from the + // lower priority value to the higher priority value. + typename std::set::iterator i = priority_set_.begin(); + // update the inflation cost related to the evicted item + inflation_ = (*i).actual_priority; + size_t size_to_decrease = map_.find((*i).cache_key)->second.first.size; + cache_size_ -= size_to_decrease; + map_.erase((*i).cache_key); + priority_set_.erase(i); + } + map_type map_; std::set priority_set_; uint64_t inflation_; size_t capacity_; + size_t cache_size_ = 0; + llvm::SmallString<128> cache_dir_; }; } // namespace gandiva diff --git a/cpp/src/gandiva/greedy_dual_size_cache_test.cc b/cpp/src/gandiva/greedy_dual_size_cache_test.cc index 3c72eef7092aa..11c5e275d29b3 100644 --- a/cpp/src/gandiva/greedy_dual_size_cache_test.cc +++ b/cpp/src/gandiva/greedy_dual_size_cache_test.cc @@ -46,11 +46,11 @@ class TestGreedyDualSizeCache : public ::testing::Test { TEST_F(TestGreedyDualSizeCache, TestEvict) { // check if the cache is evicting the items with low priority on cache - cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("1", 1)); - cache_.insert(GreedyDualSizeCacheKey(2), ValueCacheObject("2", 10)); - cache_.insert(GreedyDualSizeCacheKey(3), ValueCacheObject("3", 20)); - cache_.insert(GreedyDualSizeCacheKey(4), ValueCacheObject("4", 15)); - cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("5", 1)); + cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("1", 1, 1)); + cache_.insert(GreedyDualSizeCacheKey(2), ValueCacheObject("2", 10, 10)); + cache_.insert(GreedyDualSizeCacheKey(3), ValueCacheObject("3", 20, 20)); + cache_.insert(GreedyDualSizeCacheKey(4), ValueCacheObject("4", 15, 15)); + cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("5", 1, 1)); ASSERT_EQ(2, cache_.size()); // we check initially the values that won't be on the cache, since the get operation // may affect the entity costs, which is not the purpose of this test @@ -62,9 +62,9 @@ TEST_F(TestGreedyDualSizeCache, TestEvict) { TEST_F(TestGreedyDualSizeCache, TestGreedyDualSizeBehavior) { // insert 1 and 3 evicting 2 (this eviction will increase the inflation cost by 20) - cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("1", 40)); - cache_.insert(GreedyDualSizeCacheKey(2), ValueCacheObject("2", 20)); - cache_.insert(GreedyDualSizeCacheKey(3), ValueCacheObject("3", 30)); + cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("1", 40, 40)); + cache_.insert(GreedyDualSizeCacheKey(2), ValueCacheObject("2", 20, 20)); + cache_.insert(GreedyDualSizeCacheKey(3), ValueCacheObject("3", 30, 30)); // when accessing key 3, its actual cost will be increased by the inflation, so in the // next eviction, the key 1 will be evicted, since the key 1 actual cost (original(40)) @@ -72,13 +72,13 @@ TEST_F(TestGreedyDualSizeCache, TestGreedyDualSizeBehavior) { ASSERT_EQ(cache_.get(GreedyDualSizeCacheKey(3))->module, "3"); // try to insert key 2 and expect the eviction of key 1 - cache_.insert(GreedyDualSizeCacheKey(2), ValueCacheObject("2", 20)); + cache_.insert(GreedyDualSizeCacheKey(2), ValueCacheObject("2", 20, 20)); ASSERT_EQ(cache_.get(GreedyDualSizeCacheKey(1)), arrow::util::nullopt); // when accessing key 2, its original cost should be increased by inflation, so when // inserting the key 1 again, now the key 3 should be evicted ASSERT_EQ(cache_.get(GreedyDualSizeCacheKey(2))->module, "2"); - cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("1", 20)); + cache_.insert(GreedyDualSizeCacheKey(1), ValueCacheObject("1", 20, 20)); ASSERT_EQ(cache_.get(GreedyDualSizeCacheKey(1))->module, "1"); ASSERT_EQ(cache_.get(GreedyDualSizeCacheKey(2))->module, "2"); diff --git a/cpp/src/gandiva/llvm_generator.cc b/cpp/src/gandiva/llvm_generator.cc index d84a0374e6b6a..f2b2ad4c888cf 100644 --- a/cpp/src/gandiva/llvm_generator.cc +++ b/cpp/src/gandiva/llvm_generator.cc @@ -29,6 +29,7 @@ #include "gandiva/dex.h" #include "gandiva/expr_decomposer.h" #include "gandiva/expression.h" +#include "gandiva/gandiva_object_cache.h" #include "gandiva/lvalue.h" namespace gandiva { @@ -50,6 +51,23 @@ Status LLVMGenerator::Make(std::shared_ptr config, return Status::OK(); } +std::shared_ptr>> +LLVMGenerator::GetCache() { + static Cache> cache; + // static std::unique_ptr>> + // cache_unique = std::make_unique>>(); + + // static std::shared_ptr>> + // shared_cache = std::move(cache_unique); + + static std::shared_ptr>> + shared_cache = + std::make_shared>>(); + + return shared_cache; +} + Status LLVMGenerator::Add(const ExpressionPtr expr, const FieldDescriptorPtr output) { int idx = static_cast(compiled_exprs_.size()); // decompose the expression to separate out value and validities. diff --git a/cpp/src/gandiva/llvm_generator.h b/cpp/src/gandiva/llvm_generator.h index ff6d846024cb9..3ac9b77357cc5 100644 --- a/cpp/src/gandiva/llvm_generator.h +++ b/cpp/src/gandiva/llvm_generator.h @@ -23,15 +23,17 @@ #include #include "arrow/util/macros.h" - #include "gandiva/annotator.h" +#include "gandiva/base_cache_key.h" #include "gandiva/compiled_expr.h" #include "gandiva/configuration.h" #include "gandiva/dex_visitor.h" #include "gandiva/engine.h" #include "gandiva/execution_context.h" +#include "gandiva/expr_decomposer.h" #include "gandiva/function_registry.h" #include "gandiva/gandiva_aliases.h" +#include "gandiva/gandiva_object_cache.h" #include "gandiva/llvm_types.h" #include "gandiva/lvalue.h" #include "gandiva/selection_vector.h" @@ -49,10 +51,48 @@ class GANDIVA_EXPORT LLVMGenerator { static Status Make(std::shared_ptr config, std::unique_ptr* llvm_generator); + static std::shared_ptr>> + GetCache(); + /// \brief Build the code for the expression trees for default mode. Each /// element in the vector represents an expression tree Status Build(const ExpressionVector& exprs, SelectionVector::Mode mode); + /// \brief Build the code for the expression trees for default mode with a LLVM + /// ObjectCache. Each element in the vector represents an expression tree + template + Status Build(const ExpressionVector& exprs, SelectionVector::Mode mode, + GandivaObjectCache& obj_cache) { + selection_vector_mode_ = mode; + + // Start measuring code gen time + auto begin = std::chrono::high_resolution_clock::now(); + for (auto& expr : exprs) { + auto output = annotator_.AddOutputFieldDescriptor(expr->result()); + ARROW_RETURN_NOT_OK(Add(expr, output)); + } + + // Stop measuring time, calculate the elapsed time and pass it to object cache + auto end = std::chrono::high_resolution_clock::now(); + size_t elapsed = + std::chrono::duration_cast(end - begin).count(); + obj_cache.AddElapsedTime(elapsed); + + ARROW_RETURN_NOT_OK(engine_->SetLLVMObjectCache(obj_cache)); + + // Compile and inject into the process' memory the generated function. + ARROW_RETURN_NOT_OK(engine_->FinalizeModule()); + + // setup the jit functions for each expression. + for (auto& compiled_expr : compiled_exprs_) { + auto ir_fn = compiled_expr->GetIRFunction(mode); + auto jit_fn = reinterpret_cast(engine_->CompiledFunction(ir_fn)); + compiled_expr->SetJITFunction(selection_vector_mode_, jit_fn); + } + + return Status::OK(); + } + /// \brief Build the code for the expression trees for default mode. Each /// element in the vector represents an expression tree Status Build(const ExpressionVector& exprs) { @@ -240,7 +280,7 @@ class GANDIVA_EXPORT LLVMGenerator { void AddTrace(const std::string& msg, llvm::Value* value = NULLPTR); std::unique_ptr engine_; - std::vector> compiled_exprs_; + std::vector> compiled_exprs_; FunctionRegistry function_registry_; Annotator annotator_; SelectionVector::Mode selection_vector_mode_; diff --git a/cpp/src/gandiva/projector.cc b/cpp/src/gandiva/projector.cc index ff167538f9c1c..cdbdcdd4fa8ba 100644 --- a/cpp/src/gandiva/projector.cc +++ b/cpp/src/gandiva/projector.cc @@ -24,102 +24,88 @@ #include "arrow/util/hash_util.h" #include "arrow/util/logging.h" - +#include "gandiva/base_cache_key.h" #include "gandiva/cache.h" #include "gandiva/expr_validator.h" +#include "gandiva/gandiva_object_cache.h" #include "gandiva/llvm_generator.h" namespace gandiva { -class ProjectorCacheKey { - public: - ProjectorCacheKey(SchemaPtr schema, std::shared_ptr configuration, - ExpressionVector expression_vector, SelectionVector::Mode mode) - : schema_(schema), configuration_(configuration), mode_(mode), uniqifier_(0) { - static const int kSeedValue = 4; - size_t result = kSeedValue; - for (auto& expr : expression_vector) { - std::string expr_as_string = expr->ToString(); - expressions_as_strings_.push_back(expr_as_string); - arrow::internal::hash_combine(result, expr_as_string); - UpdateUniqifier(expr_as_string); - } - arrow::internal::hash_combine(result, static_cast(mode)); - arrow::internal::hash_combine(result, configuration->Hash()); - arrow::internal::hash_combine(result, schema_->ToString()); - arrow::internal::hash_combine(result, uniqifier_); - hash_code_ = result; +ProjectorCacheKey::ProjectorCacheKey(SchemaPtr schema, + std::shared_ptr configuration, + ExpressionVector expression_vector, + SelectionVector::Mode mode) + : schema_(schema), configuration_(configuration), mode_(mode), uniqifier_(0) { + static const int kSeedValue = 4; + size_t result = kSeedValue; + for (auto& expr : expression_vector) { + std::string expr_as_string = expr->ToString(); + expressions_as_strings_.push_back(expr_as_string); + arrow::internal::hash_combine(result, expr_as_string); + UpdateUniqifier(expr_as_string); } + arrow::internal::hash_combine(result, static_cast(mode)); + arrow::internal::hash_combine(result, configuration->Hash()); + arrow::internal::hash_combine(result, schema_->ToString()); + arrow::internal::hash_combine(result, uniqifier_); + hash_code_ = result; +} - std::size_t Hash() const { return hash_code_; } - - bool operator==(const ProjectorCacheKey& other) const { - // arrow schema does not overload equality operators. - if (!(schema_->Equals(*other.schema().get(), true))) { - return false; - } - - if (*configuration_ != *other.configuration_) { - return false; - } - - if (expressions_as_strings_ != other.expressions_as_strings_) { - return false; - } +bool ProjectorCacheKey::operator==(const ProjectorCacheKey& other) const { + // arrow schema does not overload equality operators. + if (!(schema_->Equals(*other.schema().get(), true))) { + return false; + } - if (mode_ != other.mode_) { - return false; - } + if (*configuration_ != *other.configuration_) { + return false; + } - if (uniqifier_ != other.uniqifier_) { - return false; - } - return true; + if (expressions_as_strings_ != other.expressions_as_strings_) { + return false; } - bool operator!=(const ProjectorCacheKey& other) const { return !(*this == other); } + if (mode_ != other.mode_) { + return false; + } - SchemaPtr schema() const { return schema_; } + if (uniqifier_ != other.uniqifier_) { + return false; + } + return true; +} - std::string ToString() const { - std::stringstream ss; - // indent, window, indent_size, null_rep and skip new lines. - arrow::PrettyPrintOptions options{0, 10, 2, "null", true}; - DCHECK_OK(PrettyPrint(*schema_.get(), options, &ss)); - - ss << "Expressions: ["; - bool first = true; - for (auto& expr : expressions_as_strings_) { - if (first) { - first = false; - } else { - ss << ", "; - } - - ss << expr; +std::string ProjectorCacheKey::ToString() const { + std::stringstream ss; + // indent, window, indent_size, null_rep and skip new lines. + arrow::PrettyPrintOptions options{0, 10, 2, "null", true}; + DCHECK_OK(PrettyPrint(*schema_.get(), options, &ss)); + + ss << "Expressions: ["; + bool first = true; + for (auto& expr : expressions_as_strings_) { + if (first) { + first = false; + } else { + ss << ", "; } - ss << "]"; - return ss.str(); + + ss << expr; } + ss << "]"; + return ss.str(); +} - private: - void UpdateUniqifier(const std::string& expr) { - if (uniqifier_ == 0) { - // caching of expressions with re2 patterns causes lock contention. So, use - // multiple instances to reduce contention. - if (expr.find(" like(") != std::string::npos) { - uniqifier_ = std::hash()(std::this_thread::get_id()) % 16; - } +void ProjectorCacheKey::UpdateUniqifier(const std::string& expr) { + if (uniqifier_ == 0) { + // caching of expressions with re2 patterns causes lock contention. So, use + // multiple instances to reduce contention. + if (expr.find(" like(") != std::string::npos) { + uniqifier_ = std::hash()(std::this_thread::get_id()) % 16; } } - - const SchemaPtr schema_; - const std::shared_ptr configuration_; - SelectionVector::Mode mode_; - std::vector expressions_as_strings_; - size_t hash_code_; - uint32_t uniqifier_; -}; +} Projector::Projector(std::unique_ptr llvm_generator, SchemaPtr schema, const FieldVector& output_fields, @@ -153,15 +139,29 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, ARROW_RETURN_IF(configuration == nullptr, Status::Invalid("Configuration cannot be null")); - // see if equivalent projector was already built - static Cache> cache; - ProjectorCacheKey cache_key(schema, configuration, exprs, selection_vector_mode); - std::shared_ptr cached_projector = cache.GetModule(cache_key); - if (cached_projector != nullptr) { - *projector = cached_projector; - return Status::OK(); + std::shared_ptr>> shared_cache = + LLVMGenerator::GetCache(); + + ProjectorCacheKey projector_key(schema, configuration, exprs, selection_vector_mode); + BaseCacheKey cache_key(projector_key, "projector"); + std::unique_ptr base_cache_key = + std::make_unique(cache_key); + std::shared_ptr shared_base_cache_key = std::move(base_cache_key); + + bool llvm_flag = false; + + std::shared_ptr prev_cached_obj; + prev_cached_obj = shared_cache->GetObjectCode(*shared_base_cache_key); + + // Verify if previous projector obj code was cached + if (prev_cached_obj != nullptr) { + ARROW_LOG(DEBUG) + << "[DEBUG][CACHE-LOG][INFO]: Projector object code WAS already cached"; + llvm_flag = true; } + GandivaObjectCache obj_cache(shared_cache, shared_base_cache_key); + // Build LLVM generator, and generate code for the specified expressions std::unique_ptr llvm_gen; ARROW_RETURN_NOT_OK(LLVMGenerator::Make(configuration, &llvm_gen)); @@ -174,13 +174,15 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, ARROW_RETURN_NOT_OK(expr_validator.Validate(expr)); } - // Start measuring build time - auto begin = std::chrono::high_resolution_clock::now(); - ARROW_RETURN_NOT_OK(llvm_gen->Build(exprs, selection_vector_mode)); - // Stop measuring time and calculate the elapsed time - auto end = std::chrono::high_resolution_clock::now(); - auto elapsed = - std::chrono::duration_cast(end - begin).count(); + // // Start measuring build time + // auto begin = std::chrono::high_resolution_clock::now(); + // ARROW_RETURN_NOT_OK(llvm_gen->Build(exprs, selection_vector_mode)); + // // Stop measuring time and calculate the elapsed time + // auto end = std::chrono::high_resolution_clock::now(); + // auto elapsed = + // std::chrono::duration_cast(end - begin).count(); + ARROW_RETURN_NOT_OK(llvm_gen->Build( + exprs, selection_vector_mode, obj_cache)); // to use when caching only the obj code // save the output field types. Used for validation at Evaluate() time. std::vector output_fields; @@ -192,8 +194,12 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, // Instantiate the projector with the completely built llvm generator *projector = std::shared_ptr( new Projector(std::move(llvm_gen), schema, output_fields, configuration)); - ValueCacheObject> value_cache(*projector, elapsed); - cache.PutModule(cache_key, value_cache); + // ValueCacheObject> value_cache(*projector, elapsed); + // shared_cache->PutModule(cache_key, value_cache); + projector->get()->SetCompiledFromCache(llvm_flag); + ARROW_LOG(DEBUG) + << "[DEBUG][CACHE-LOG][INFO]: " + + shared_cache->ToString(); // to use when caching only the obj code return Status::OK(); } @@ -366,4 +372,8 @@ Status Projector::ValidateArrayDataCapacity(const arrow::ArrayData& array_data, std::string Projector::DumpIR() { return llvm_generator_->DumpIR(); } +void Projector::SetCompiledFromCache(bool flag) { compiled_from_cache_ = flag; } + +bool Projector::GetCompiledFromCache() { return compiled_from_cache_; } + } // namespace gandiva diff --git a/cpp/src/gandiva/projector.h b/cpp/src/gandiva/projector.h index 20b36c9d883cd..64bd21829a516 100644 --- a/cpp/src/gandiva/projector.h +++ b/cpp/src/gandiva/projector.h @@ -34,6 +34,32 @@ namespace gandiva { class LLVMGenerator; +class ProjectorCacheKey { + public: + ProjectorCacheKey(SchemaPtr schema, std::shared_ptr configuration, + ExpressionVector expression_vector, SelectionVector::Mode mode); + + std::size_t Hash() const { return hash_code_; } + + bool operator==(const ProjectorCacheKey& other) const; + + bool operator!=(const ProjectorCacheKey& other) const { return !(*this == other); } + + SchemaPtr schema() const { return schema_; } + + std::string ToString() const; + + private: + void UpdateUniqifier(const std::string& expr); + + const SchemaPtr schema_; + const std::shared_ptr configuration_; + SelectionVector::Mode mode_; + std::vector expressions_as_strings_; + size_t hash_code_; + uint32_t uniqifier_; +}; + /// \brief projection using expressions. /// /// A projector is built for a specific schema and vector of expressions. @@ -119,6 +145,9 @@ class GANDIVA_EXPORT Projector { std::string DumpIR(); + void SetCompiledFromCache(bool flag); + bool GetCompiledFromCache(); + private: Projector(std::unique_ptr llvm_generator, SchemaPtr schema, const FieldVector& output_fields, std::shared_ptr); @@ -138,6 +167,7 @@ class GANDIVA_EXPORT Projector { SchemaPtr schema_; FieldVector output_fields_; std::shared_ptr configuration_; + bool compiled_from_cache_; }; } // namespace gandiva diff --git a/cpp/src/gandiva/tests/filter_test.cc b/cpp/src/gandiva/tests/filter_test.cc index d4433f11eb1a8..4fbbea012eeff 100644 --- a/cpp/src/gandiva/tests/filter_test.cc +++ b/cpp/src/gandiva/tests/filter_test.cc @@ -29,7 +29,11 @@ using arrow::int32; class TestFilter : public ::testing::Test { public: - void SetUp() { pool_ = arrow::default_memory_pool(); } + void SetUp() { + pool_ = arrow::default_memory_pool(); + // Setup arrow log severity threshold to debug level. + arrow::util::ArrowLog::StartArrowLog("", arrow::util::ArrowLogLevel::ARROW_DEBUG); + } protected: arrow::MemoryPool* pool_; @@ -55,12 +59,13 @@ TEST_F(TestFilter, TestFilterCache) { std::shared_ptr filter; auto status = Filter::Make(schema, condition, configuration, &filter); EXPECT_TRUE(status.ok()); + EXPECT_FALSE(filter->GetCompiledFromCache()); // same schema and condition, should return the same filter as above. std::shared_ptr cached_filter; status = Filter::Make(schema, condition, configuration, &cached_filter); EXPECT_TRUE(status.ok()); - EXPECT_TRUE(cached_filter.get() == filter.get()); + EXPECT_TRUE(cached_filter->GetCompiledFromCache()); // schema is different should return a new filter. auto field2 = field("f2", int32()); @@ -69,7 +74,7 @@ TEST_F(TestFilter, TestFilterCache) { status = Filter::Make(different_schema, condition, configuration, &should_be_new_filter); EXPECT_TRUE(status.ok()); - EXPECT_TRUE(cached_filter.get() != should_be_new_filter.get()); + EXPECT_FALSE(should_be_new_filter->GetCompiledFromCache()); // condition is different, should return a new filter. auto greater_than_10 = TreeExprBuilder::MakeFunction( @@ -78,7 +83,7 @@ TEST_F(TestFilter, TestFilterCache) { std::shared_ptr should_be_new_filter1; status = Filter::Make(schema, new_condition, configuration, &should_be_new_filter1); EXPECT_TRUE(status.ok()); - EXPECT_TRUE(cached_filter.get() != should_be_new_filter1.get()); + EXPECT_FALSE(should_be_new_filter->GetCompiledFromCache()); } TEST_F(TestFilter, TestSimple) { diff --git a/cpp/src/gandiva/tests/projector_test.cc b/cpp/src/gandiva/tests/projector_test.cc index 12020777309a4..cc7eabbb037e6 100644 --- a/cpp/src/gandiva/tests/projector_test.cc +++ b/cpp/src/gandiva/tests/projector_test.cc @@ -39,7 +39,11 @@ using arrow::int32; class TestProjector : public ::testing::Test { public: - void SetUp() { pool_ = arrow::default_memory_pool(); } + void SetUp() { + pool_ = arrow::default_memory_pool(); + // Setup arrow log severity threshold to debug level. + arrow::util::ArrowLog::StartArrowLog("", arrow::util::ArrowLogLevel::ARROW_DEBUG); + } protected: arrow::MemoryPool* pool_; @@ -65,14 +69,16 @@ TEST_F(TestProjector, TestProjectCache) { std::shared_ptr projector; auto status = Projector::Make(schema, {sum_expr, sub_expr}, configuration, &projector); ASSERT_OK(status); + EXPECT_FALSE(projector->GetCompiledFromCache()); // everything is same, should return the same projector. auto schema_same = arrow::schema({field0, field1}); std::shared_ptr cached_projector; status = Projector::Make(schema_same, {sum_expr, sub_expr}, configuration, &cached_projector); + ASSERT_OK(status); - EXPECT_EQ(cached_projector, projector); + EXPECT_TRUE(cached_projector->GetCompiledFromCache()); // schema is different should return a new projector. auto field2 = field("f2", int32()); @@ -81,19 +87,19 @@ TEST_F(TestProjector, TestProjectCache) { status = Projector::Make(different_schema, {sum_expr, sub_expr}, configuration, &should_be_new_projector); ASSERT_OK(status); - EXPECT_NE(cached_projector, should_be_new_projector); + EXPECT_FALSE(should_be_new_projector->GetCompiledFromCache()); // expression list is different should return a new projector. std::shared_ptr should_be_new_projector1; status = Projector::Make(schema, {sum_expr}, configuration, &should_be_new_projector1); ASSERT_OK(status); - EXPECT_NE(cached_projector, should_be_new_projector1); + EXPECT_FALSE(should_be_new_projector1->GetCompiledFromCache()); // another instance of the same configuration, should return the same projector. status = Projector::Make(schema, {sum_expr, sub_expr}, TestConfiguration(), &cached_projector); ASSERT_OK(status); - EXPECT_EQ(cached_projector, projector); + EXPECT_TRUE(cached_projector->GetCompiledFromCache()); } TEST_F(TestProjector, TestProjectCacheFieldNames) { @@ -196,13 +202,15 @@ TEST_F(TestProjector, TestProjectCacheDecimalCast) { auto expr0 = TreeExprBuilder::MakeExpression("castDECIMAL", {field_float64}, res_31_13); std::shared_ptr projector0; ASSERT_OK(Projector::Make(schema, {expr0}, TestConfiguration(), &projector0)); + EXPECT_FALSE(projector0->GetCompiledFromCache()); // if the output scale is different, the cache can't be used. auto res_31_14 = field("result", arrow::decimal(31, 14)); auto expr1 = TreeExprBuilder::MakeExpression("castDECIMAL", {field_float64}, res_31_14); std::shared_ptr projector1; ASSERT_OK(Projector::Make(schema, {expr1}, TestConfiguration(), &projector1)); - EXPECT_NE(projector0.get(), projector1.get()); + // EXPECT_NE(projector0.get(), projector1.get()); -> old expect. + EXPECT_FALSE(projector1->GetCompiledFromCache()); // if the output scale/precision are same, should get a cache hit. auto res_31_13_alt = field("result", arrow::decimal(31, 13)); @@ -210,7 +218,7 @@ TEST_F(TestProjector, TestProjectCacheDecimalCast) { TreeExprBuilder::MakeExpression("castDECIMAL", {field_float64}, res_31_13_alt); std::shared_ptr projector2; ASSERT_OK(Projector::Make(schema, {expr2}, TestConfiguration(), &projector2)); - EXPECT_EQ(projector0.get(), projector2.get()); + EXPECT_TRUE(projector2->GetCompiledFromCache()); } TEST_F(TestProjector, TestIntSumSub) {