Skip to content

Commit

Permalink
feat: integrate to cgo (#153)
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <ted.xu@zilliz.com>
  • Loading branch information
tedxu authored Nov 11, 2024
1 parent fcfc4a9 commit 96d4408
Show file tree
Hide file tree
Showing 16 changed files with 375 additions and 94 deletions.
25 changes: 23 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: 1.19
go-version: 1.21

- name: Install dependencies
run: cd go && go mod download
Expand All @@ -35,6 +35,27 @@ jobs:
-v /tmp/data:/data \
-v /tmp/config:/root/.minio \
minio/minio server /data
- name: Install dependencies
uses: aminya/setup-cpp@v1
with:
conan: 1.61.0
cmake: true

- name: setup conan
run:
conan remote add default-conan-local https://milvus01.jfrog.io/artifactory/api/conan/default-conan-local --insert
&& conan remote list

- name: conan package cache
uses: actions/cache@v3
with:
path: ~/.conan
key: conan-${{ hashFiles('./cpp/conanfile.py') }}
restore-keys: conan-

- name: Build c++
run: cd cpp && make

- name: Run tests
run: cd go && go test -v ./...
run: cd go && make && make test
13 changes: 10 additions & 3 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ option(WITH_BENCHMARK "Build with micro benchmark." ON)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR})
include(GNUInstallDirs)


if (WITH_OPENDAL)
add_compile_definitions(MILVUS_OPENDAL)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
include(libopendal)
endif()

find_package(Azure REQUIRED)
#find_package(Azure REQUIRED)
find_package(Boost REQUIRED)
find_package(Arrow REQUIRED)
find_package(protobuf REQUIRED)
find_package(glog REQUIRED)
find_package(AWSSDK REQUIRED)
#find_package(AWSSDK REQUIRED)

file(GLOB_RECURSE SRC_FILES src/*.cpp src/*.cc)
add_library(milvus-storage ${SRC_FILES})
Expand Down Expand Up @@ -49,4 +52,8 @@ endif()

if (WITH_BENCHMARK)
add_subdirectory(benchmark)
endif()
endif()

configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/milvus-storage.pc.in "${CMAKE_CURRENT_BINARY_DIR}/milvus-storage.pc" @ONLY)
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/milvus-storage.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/libmilvus-storage.dylib" DESTINATION "${CMAKE_INSTALL_LIBDIR}")
5 changes: 3 additions & 2 deletions cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ endif

build:
mkdir -p build && cd build && \
conan install .. --build=missing --update && \
conan build ..
conan install .. --build=missing -u && \
conan build .. && \
conan install .. --install-folder . -g make

debug:
mkdir -p build && cd build && \
Expand Down
9 changes: 5 additions & 4 deletions cpp/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@


class StorageConan(ConanFile):
name = "storage"
name = "milvus-storage"
description = "empty"
topics = ("vector", "cloud", "ann")
url = "https://github.com/milvus-io/milvus-storage"
homepage = "https://github.com/milvus-io/milvus-storage"
license = "Apache-2.0"
version = "0.1.0"

settings = "os", "arch", "compiler", "build_type"
options = {
Expand Down Expand Up @@ -48,11 +49,12 @@ class StorageConan(ConanFile):
"arrow:with_jemalloc": True,
"boost:without_test": True,
}

exports_sources = (
"src/*",
"include/*",
"thirdparty/*",
"test/*",
"benchmark/*",
"CMakeLists.txt",
"*.cmake",
"conanfile.py",
Expand Down Expand Up @@ -156,8 +158,7 @@ def build(self):
def package(self):
cmake = CMake(self)
cmake.install()
files.rmdir(self, os.path.join(self.package_folder, "lib", "cmake"))
files.rmdir(self, os.path.join(self.package_folder, "lib", "pkgconfig"))
self.copy("*_c.h")

def package_info(self):
self.cpp_info.set_property("cmake_file_name", "storage")
Expand Down
6 changes: 6 additions & 0 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ using RowOffsetMinHeap =

class PackedRecordBatchReader : public arrow::RecordBatchReader {
public:
// Test only
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
Expand Down
32 changes: 32 additions & 0 deletions cpp/include/milvus-storage/packed/reader_c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef __cplusplus
extern "C" {
#endif

#include <arrow/c/abi.h>

typedef void* CReader;
typedef void* CStatus;
typedef void* CRecordBatch;
typedef void* CFileSystem;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);

#ifdef __cplusplus
}
#endif
9 changes: 9 additions & 0 deletions cpp/src/milvus-storage.pc.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@

Name: Milvus Storage
Description: Milvus Storage
Version: @PROJECT_VERSION@

Libs: -L${libdir} -lstorage
Cflags: -I${includedir}
31 changes: 27 additions & 4 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@

namespace milvus_storage {

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size)
: PackedRecordBatchReader(
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
Expand All @@ -39,21 +46,38 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
auto cols = std::set(needed_columns);
if (cols.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
cols.emplace(i);
}
}
auto offsets = std::vector<ColumnOffset>(column_offsets);
if (column_offsets.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
offsets.emplace_back(0, i);
}
}
std::set<int> needed_paths;
for (int i : needed_columns) {
needed_column_offsets_.push_back(column_offsets[i]);
needed_paths.emplace(column_offsets[i].path_index);
for (int i : cols) {
needed_column_offsets_.push_back(offsets[i]);
needed_paths.emplace(offsets[i].path_index);
}
for (auto i : needed_paths) {
auto result = MakeArrowFileReader(fs, paths[i]);
if (!result.ok()) {
LOG_STORAGE_ERROR_ << "Error making file reader " << i << ":" << result.status().ToString();
throw std::runtime_error(result.status().ToString());
}
file_readers_.emplace_back(std::move(result.value()));
}

for (int i = 0; i < file_readers_.size(); ++i) {
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
if (!metadata.ok()) {
LOG_STORAGE_ERROR_ << "metadata not found in file " << i;
throw std::runtime_error(metadata.status().ToString());
}
row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
}
Expand Down Expand Up @@ -149,7 +173,6 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
}
buffer_available_ -= plan_buffer_size;
row_limit_ = sorted_offsets.top().second;

return arrow::Status::OK();
}

Expand Down
47 changes: 47 additions & 0 deletions cpp/src/packed/reader_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "packed/reader_c.h"
#include "common/log.h"
#include "packed/reader.h"
#include "filesystem/fs.h"
#include "common/config.h"

#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/status.h>
#include <iostream>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto r = factory->BuildFileSystem(conf, &truePath);
if (!r.ok()) {
LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
return -2;
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_ERROR_ << "read export done";
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
return static_cast<int>(status.code());
}
return 0;
}
5 changes: 2 additions & 3 deletions cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
#include <string>
#include <iostream>

using namespace std;

namespace milvus_storage {

class PackedTestBase : public ::testing::Test {
Expand Down Expand Up @@ -76,6 +74,7 @@ class PackedTestBase : public ::testing::Test {

void TearDown() override {
if (fs_ != nullptr) {
// FIXME: delete is not working
fs_->DeleteDir(file_path_);
}
}
Expand Down Expand Up @@ -201,7 +200,7 @@ class PackedTestBase : public ::testing::Test {

std::vector<int32_t> int32_values;
std::vector<int64_t> int64_values;
std::vector<basic_string<char>> str_values;
std::vector<std::basic_string<char>> str_values;
};

} // namespace milvus_storage
19 changes: 18 additions & 1 deletion go/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
include ../cpp/build/conanbuildinfo.mak

MILVUS_STORAGE_ROOT = $(abspath $(CURDIR)/..)
MILVUS_STORAGE_INCLUDE_DIR = $(abspath $(MILVUS_STORAGE_ROOT)/cpp/include)
MILVUS_STORAGE_LD_DIR = $(abspath $(MILVUS_STORAGE_ROOT)/cpp/build/Release)

CFLAGS += $(CONAN_CFLAGS)
CXXFLAGS += $(CONAN_CXXFLAGS)
INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(MILVUS_STORAGE_INCLUDE_DIR)
CPPFLAGS = $(addprefix -I, $(INCLUDE_DIRS))
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR))

.EXPORT_ALL_VARIABLES:
.PHONY: proto
.PHONY: build

build:
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" go build ./...

test:
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR) -lmilvus-storage" go test -timeout 30s ./...
proto:
mkdir -p proto/manifest_proto
mkdir -p proto/schema_proto
Expand Down
Loading

0 comments on commit 96d4408

Please sign in to comment.