Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DS-146] Enabling caching to disk #8

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
919f174
Add SetLlvmObjectcache function
augustoasilva Jun 21, 2021
f2da60d
Add the GandivaObjectCache class
augustoasilva Jun 21, 2021
5b5d127
Create new builder to cache the object code on memory and set an uniq…
augustoasilva Jun 21, 2021
c05d351
Add funcs to read the cached object code and a safely evict to free m…
augustoasilva Jun 21, 2021
ec74782
Change Make() func to cache the object code instead of module
augustoasilva Jun 21, 2021
8daf0d2
Remove unused debug logs
augustoasilva Jun 21, 2021
df835e2
Change Filter::Make() func to cache object code instead of module
augustoasilva Jun 21, 2021
0a5ce03
Comment test env var configuration
augustoasilva Jun 21, 2021
dd2daab
Enable caching on disk of the object code
augustoasilva Jun 22, 2021
1cf9797
Add gandiva cache file protobuf schema
augustoasilva Jun 22, 2021
45f8c29
Add gandiva cache file support for flatbuffers
augustoasilva Jun 23, 2021
4218669
Add gandiva cache file initial support for protobuf
augustoasilva Jun 23, 2021
8484be9
Add gandiva types flatbuffer schema
augustoasilva Jun 23, 2021
da812ab
Update gandiva cache file flatbuffer schema
augustoasilva Jun 24, 2021
cde9831
Add schema and expression to the cache's file to prevent hash collisi…
augustoasilva Jun 24, 2021
1e2d4ef
Fix broken lru_cache tests
augustoasilva Jun 24, 2021
c900ac7
Add to lru_cache test the object code eviction and on disk caching t…
augustoasilva Jun 24, 2021
ac85888
Add to lru_cache test the object code reinsertion test case
augustoasilva Jun 24, 2021
0e5638f
Add protobuf support with optional flatbuffer support also and format…
augustoasilva Jun 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions cpp/src/gandiva/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,32 +121,74 @@ if(NOT APPLE AND NOT MSVC_TOOLCHAIN)
"${GANDIVA_SHARED_LINK_FLAGS} ${GANDIVA_VERSION_SCRIPT_FLAGS}")
endif()

# protobuf
#
# Generates the C++ sources for proto/gandiva_cache_file.proto into the current
# binary directory and exposes them through:
#   PROTO_SRCS / PROTO_HDRS  - generated .pb.cc / .pb.h paths
#   PROTO_OUTPUT_FILES       - both generated files
#   gandiva_cache_proto      - custom target that drives the generation
include(FindProtobuf)
find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIR})

if(MSVC)
  add_definitions(-DPROTOBUF_USE_DLLS)
endif()

# Resolve the schema relative to this CMakeLists.txt (cpp/src/gandiva) instead of
# CMAKE_SOURCE_DIR, so the path stays valid when this project is not the
# top-level CMake project.
set(GANDIVA_PROTO_DIR "${CMAKE_CURRENT_SOURCE_DIR}/proto")
get_filename_component(ABS_GANDIVA_PROTO_CACHE
                       "${GANDIVA_PROTO_DIR}/gandiva_cache_file.proto" ABSOLUTE)

set(PROTO_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(PROTO_SRCS "${PROTO_OUTPUT_DIR}/gandiva_cache_file.pb.cc")
set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/gandiva_cache_file.pb.h")
set(PROTO_OUTPUT_FILES ${PROTO_SRCS} ${PROTO_HDRS})

# The files do not exist at configure time; mark them as generated.
set_source_files_properties(${PROTO_OUTPUT_FILES} PROPERTIES GENERATED TRUE)

add_custom_command(OUTPUT ${PROTO_OUTPUT_FILES}
                   COMMAND ${ARROW_PROTOBUF_PROTOC}
                           --proto_path
                           ${GANDIVA_PROTO_DIR}
                           --cpp_out
                           ${PROTO_OUTPUT_DIR}
                           ${GANDIVA_PROTO_DIR}/gandiva_cache_file.proto
                   DEPENDS ${ABS_GANDIVA_PROTO_CACHE} ${ARROW_PROTOBUF_LIBPROTOBUF}
                   COMMENT "Running PROTO compiler on gandiva_cache_file.proto"
                   VERBATIM)

add_custom_target(gandiva_cache_proto ALL DEPENDS ${PROTO_OUTPUT_FILES})


# Declare the gandiva library. Besides SRC_FILES, the protobuf-generated source
# (PROTO_SRCS) is listed under SOURCES, gandiva_cache_proto is listed under
# DEPENDENCIES, PROTOBUF_INCLUDE_DIR is added to the extra include directories,
# and ARROW_PROTOBUF_LIBPROTOBUF is appended to both the shared-private and the
# static link library lists.
# NOTE(review): under STATIC_LINK_LIBS, ${GANDIVA_STATIC_LINK_LIBS} appears twice,
# the first time followed by a closing parenthesis - this looks like a leftover
# pre-change line next to its replacement; confirm only one of them is intended.
add_arrow_lib(gandiva
CMAKE_PACKAGE_NAME
Gandiva
PKG_CONFIG_NAME
gandiva
SOURCES
${SRC_FILES}
${PROTO_SRCS}
PRECOMPILED_HEADERS
"$<$<COMPILE_LANGUAGE:CXX>:gandiva/pch.h>"
OUTPUTS
GANDIVA_LIBRARIES
DEPENDENCIES
arrow_dependencies
precompiled
gandiva_cache_proto
EXTRA_INCLUDES
$<TARGET_PROPERTY:LLVM::LLVM_INTERFACE,INTERFACE_INCLUDE_DIRECTORIES>
${GANDIVA_OPENSSL_INCLUDE_DIR}
${UTF8PROC_INCLUDE_DIR}
${PROTOBUF_INCLUDE_DIR}
SHARED_LINK_FLAGS
${GANDIVA_SHARED_LINK_FLAGS}
SHARED_LINK_LIBS
arrow_shared
SHARED_PRIVATE_LINK_LIBS
${GANDIVA_SHARED_PRIVATE_LINK_LIBS}
${ARROW_PROTOBUF_LIBPROTOBUF}
STATIC_LINK_LIBS
${GANDIVA_STATIC_LINK_LIBS})
${GANDIVA_STATIC_LINK_LIBS}
${ARROW_PROTOBUF_LIBPROTOBUF})

foreach(LIB_TARGET ${GANDIVA_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE GANDIVA_EXPORTING)
Expand All @@ -157,8 +199,6 @@ if(ARROW_BUILD_STATIC AND WIN32)
target_compile_definitions(gandiva_static PUBLIC GANDIVA_STATIC)
endif()

add_dependencies(gandiva ${GANDIVA_LIBRARIES})

arrow_install_all_headers("gandiva")

set(GANDIVA_STATIC_TEST_LINK_LIBS gandiva_static ${ARROW_TEST_LINK_LIBS})
Expand Down
213 changes: 213 additions & 0 deletions cpp/src/gandiva/base_cache_key.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_BASE_CACHE_KEY_H
#define ARROW_BASE_CACHE_KEY_H

#include <arrow/util/hash_util.h>
#include <stddef.h>

#include <boost/any.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/variant.hpp>
#include <sstream>

#include "gandiva/expression.h"
#include "gandiva/filter.h"
#include "gandiva/projector.h"

namespace gandiva {

class BaseCacheKey {
public:
BaseCacheKey(Expression& expr, std::string type) : type_(type) {
static const int kSeedValue = 4;
std::string expr_as_string = expr.ToString();
size_t result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, expr_as_string);
hash_code_ = result_hash;

// Generate the same UUID based on the hash_code
boost::uuids::name_generator_sha1 gen(boost::uuids::ns::oid());
uuid_ = gen(std::to_string(result_hash));
};

BaseCacheKey(ProjectorCacheKey& key, std::string type, std::vector<ExpressionPtr> exprs)
: type_(type) {
// static const int kSeedValue = 4;
// size_t key_hash = key.Hash();
// size_t result_hash = kSeedValue;
// arrow::internal::hash_combine(result_hash, type);
// arrow::internal::hash_combine(result_hash, key_hash);
// hash_code_ = result_hash;
hash_code_ = key.Hash();
key_ = key;
key_ = key;

schema_string_ = key.schema()->ToString(true);

for (auto& expr : exprs) {
exprs_string_.push_back(expr->ToString());
}

// Generate the same UUID based on the hash_code
boost::uuids::name_generator_sha1 gen(boost::uuids::ns::oid());
uuid_ = gen(std::to_string(hash_code_));
};

BaseCacheKey(FilterCacheKey& key, std::string type, ConditionPtr expr) : type_(type) {
// static const int kSeedValue = 4;
// size_t key_hash = key.Hash();
// size_t result_hash = kSeedValue;
// arrow::internal::hash_combine(result_hash, type);
// arrow::internal::hash_combine(result_hash, key_hash);
// hash_code_ = result_hash;
hash_code_ = key.Hash();
key_ = key;

schema_string_ = key.schema()->ToString(true);

exprs_string_.push_back(expr->ToString());

// Generate the same UUID based on the hash_code
boost::uuids::name_generator_sha1 gen(boost::uuids::ns::oid());
uuid_ = gen(std::to_string(hash_code_));
};

BaseCacheKey(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<Expression> expr,
std::string type)
: type_(type) {
static const int kSeedValue = 4;
unsigned long int result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, schema->ToString());
arrow::internal::hash_combine(result_hash, expr->ToString());
hash_code_ = result_hash;

// Generate the same UUID based on the hash_code
boost::uuids::name_generator_sha1 gen(boost::uuids::ns::oid());
uuid_ = gen(std::to_string(result_hash));
};

/// Constructor used only for tests
BaseCacheKey(std::string type, std::string value) : type_(type) {
static const int kSeedValue = 4;

size_t result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, value);

exprs_string_.push_back(value);
schema_string_ = value;

hash_code_ = result_hash;

// Generate the same UUID based on the hash_code
boost::uuids::name_generator_sha1 gen(boost::uuids::ns::oid());
uuid_ = gen(std::to_string(result_hash));
};

/// Constructor used only for tests involving caching objects code
BaseCacheKey(std::string type, SchemaPtr schema, std::vector<ExpressionPtr> exprs)
: type_(type) {
static const int kSeedValue = 4;

size_t result_hash = kSeedValue;
arrow::internal::hash_combine(result_hash, type);
arrow::internal::hash_combine(result_hash, schema->ToString());

for (auto& expr : exprs) {
auto expr_string = expr->ToString();
arrow::internal::hash_combine(result_hash, expr_string);
exprs_string_.push_back(expr_string);
}

hash_code_ = result_hash;
schema_string_ = schema->ToString();

// Generate the same UUID based on the hash_code
boost::uuids::name_generator_sha1 gen(boost::uuids::ns::oid());
uuid_ = gen(std::to_string(result_hash));
};

size_t Hash() const { return hash_code_; }

boost::uuids::uuid Uuid() const { return uuid_; }

std::string Type() const { return type_; }

std::string getUuidString() const {
std::string uuid_string = "";
std::stringstream ss;
ss << uuid_;
return ss.str();
}

std::string getSchemaString() const { return schema_string_; }

std::vector<std::string> getExprsString() const { return exprs_string_; }

bool checkCacheFile(const std::string& schema,
const std::vector<std::string>& exprs) const {
if (schema_string_ != schema) {
return false;
}

if (exprs_string_ != exprs) {
return false;
}
return true;
}

bool operator==(const BaseCacheKey& other) const {
if (hash_code_ != other.hash_code_) {
return false;
}

if (uuid_ != other.uuid_) {
return false;
}

if (exprs_string_ != other.exprs_string_) {
return false;
}

if (schema_string_ != other.schema_string_) {
return false;
}

return true;
};

bool operator!=(const BaseCacheKey& other) const { return !(*this == other); }

private:
uint64_t hash_code_;
std::string type_;
boost::uuids::uuid uuid_;
boost::any key_ = nullptr;
std::vector<std::string> exprs_string_;
std::string schema_string_;
};

} // namespace gandiva

#endif // ARROW_BASE_CACHE_KEY_H
65 changes: 59 additions & 6 deletions cpp/src/gandiva/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@

namespace gandiva {

static const int DEFAULT_CACHE_SIZE = 500;
// static const int DEFAULT_CACHE_SIZE = 500;  // old cache capacity: a list of 500 items
// Default in-memory cache capacity, in bytes (256 MiB).
static const size_t DEFAULT_CACHE_SIZE = 256 * 1024 * 1024;
// Default on-disk cache capacity, in bytes (1 GiB).
static const size_t DEFAULT_DISK_CACHE_SIZE = 1ULL * 1024 * 1024 * 1024;
// Default reserved disk size, in bytes (10 GiB).
static const size_t DEFAULT_RESERVED_SIZE = 10ULL * 1024 * 1024 * 1024;

int GetCapacity() {
int capacity;
size_t GetCapacity() {
size_t capacity;
const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE");
if (env_cache_size != nullptr) {
capacity = std::atoi(env_cache_size);
capacity = std::stoul(env_cache_size);

if (capacity <= 0) {
ARROW_LOG(WARNING) << "Invalid cache size provided. Using default cache size: "
<< DEFAULT_CACHE_SIZE;
Expand All @@ -35,11 +41,58 @@ int GetCapacity() {
} else {
capacity = DEFAULT_CACHE_SIZE;
}

return capacity;
}

/// Returns the disk cache capacity in bytes.
///
/// Reads the GANDIVA_DISK_CAPACITY_SIZE environment variable. When it is unset
/// the default (DEFAULT_DISK_CACHE_SIZE) is returned. When it is set but is not
/// a usable positive number (non-numeric, out of range, negative or zero), a
/// warning is logged and the default is returned instead.
size_t GetDiskCapacity() {
  const char* env_disk_cache_size = std::getenv("GANDIVA_DISK_CAPACITY_SIZE");
  if (env_disk_cache_size == nullptr) {
    return DEFAULT_DISK_CACHE_SIZE;
  }

  const std::string text(env_disk_cache_size);
  size_t capacity = 0;  // 0 doubles as the "invalid value" marker below

  // std::stoul accepts a leading '-' and wraps the result into a huge unsigned
  // value, so negative input has to be rejected before parsing.
  if (text.find('-') == std::string::npos) {
    try {
      capacity = std::stoul(text);
    } catch (...) {
      // std::stoul throws std::invalid_argument / std::out_of_range on bad
      // input; a malformed environment variable must not abort the process.
      capacity = 0;
    }
  }

  if (capacity == 0) {
    ARROW_LOG(WARNING)
        << "Invalid disk cache size provided. Using default disk cache size: "
        << DEFAULT_DISK_CACHE_SIZE;
    capacity = DEFAULT_DISK_CACHE_SIZE;
  }

  return capacity;
}

void LogCacheSize(size_t capacity) {
ARROW_LOG(INFO) << "Creating gandiva cache with capacity: " << capacity;
/// Returns the reserved disk size in bytes.
///
/// Reads the GANDIVA_DISK_RESERVED_SIZE environment variable. When it is unset
/// the default (DEFAULT_RESERVED_SIZE) is returned. When it is set but is not a
/// usable positive number (non-numeric, out of range, negative or zero), a
/// warning is logged and the default is returned instead.
size_t GetReserved() {
  const char* env_reserved_size = std::getenv("GANDIVA_DISK_RESERVED_SIZE");
  if (env_reserved_size == nullptr) {
    return DEFAULT_RESERVED_SIZE;
  }

  const std::string text(env_reserved_size);
  size_t reserved = 0;  // 0 doubles as the "invalid value" marker below

  // std::stoul accepts a leading '-' and wraps the result into a huge unsigned
  // value, so negative input has to be rejected before parsing.
  if (text.find('-') == std::string::npos) {
    try {
      reserved = std::stoul(text);
    } catch (...) {
      // std::stoul throws std::invalid_argument / std::out_of_range on bad
      // input; a malformed environment variable must not abort the process.
      reserved = 0;
    }
  }

  if (reserved == 0) {
    ARROW_LOG(WARNING)
        << "Invalid reserved disk size provided. Using default reserved disk size: "
        << DEFAULT_RESERVED_SIZE;
    reserved = DEFAULT_RESERVED_SIZE;
  }

  return reserved;
}

/*void LogCacheSize(size_t capacity) {
ARROW_LOG(DEBUG) << "Creating gandiva cache with capacity of " << capacity << " bytes";
}*/

/// Emit one DEBUG log record for each of the three cache limits (memory
/// capacity, disk space, reserved disk space), each expressed in bytes.
void LogCacheSizeSafely(size_t capacity, size_t disk_capacity, size_t reserved) {
  // Every record has the same shape: "<prefix><value> bytes".
  const auto log_in_bytes = [](const char* prefix, size_t amount) {
    ARROW_LOG(DEBUG) << prefix << amount << " bytes";
  };

  log_in_bytes("Creating gandiva cache with memory capacity of ", capacity);
  log_in_bytes("Creating gandiva cache with disk space of ", disk_capacity);
  log_in_bytes("Creating gandiva cache with reserved disk space of ", reserved);
}

} // namespace gandiva
Loading