Skip to content

Commit

Permalink
support disk hdfs
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Mar 7, 2024
1 parent c88c7e7 commit 8cd8493
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 21 deletions.
62 changes: 62 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "GlutenDiskHDFS.h"
#include <ranges>
#if USE_HDFS

namespace local_engine
{
using namespace DB;

/// Creates the directory `path`: first through the base DiskObjectStorage
/// implementation, then on the HDFS filesystem via libhdfs.
/// The return code of the libhdfs call is not inspected.
void GlutenDiskHDFS::createDirectory(const String & path)
{
    DiskObjectStorage::createDirectory(path);

    auto * fs_handle = hdfs_object_storage->getHDFSFS();
    hdfsCreateDirectory(fs_handle, path.c_str());
}

/// Returns the serialized object key that the disk's object storage
/// generates for `path`.
String GlutenDiskHDFS::path2AbsPath(const String & path)
{
    const auto object_key = getObjectStorage()->generateObjectKeyForPath(path);
    return object_key.serialize();
}

void GlutenDiskHDFS::createDirectories(const String & path)
{
DiskObjectStorage::createDirectories(path);
auto* hdfs = hdfs_object_storage->getHDFSFS();
fs::path p = path;
std::vector<std::string> paths_created;
while (hdfsExists(hdfs, p.c_str()) < 0)
{
paths_created.push_back(p);
if (!p.has_parent_path())
break;
p = p.parent_path();
}
for (const auto & path_to_create : paths_created | std::views::reverse)
hdfsCreateDirectory(hdfs, path_to_create.c_str());
}

/// Removes the directory `path`: first through the base DiskObjectStorage
/// implementation, then on HDFS with the libhdfs `recursive` argument set to 1.
/// The return code of the libhdfs call is not inspected.
void GlutenDiskHDFS::removeDirectory(const String & path)
{
    DiskObjectStorage::removeDirectory(path);

    constexpr int recursive = 1;
    auto * fs_handle = hdfs_object_storage->getHDFSFS();
    hdfsDelete(fs_handle, path.c_str(), recursive);
}


}
#endif
58 changes: 58 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <config.h>

#include <Disks/ObjectStorages/DiskObjectStorage.h>
#if USE_HDFS
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>

namespace local_engine
{
/// A DiskObjectStorage whose directory operations (create / create recursively /
/// remove) are additionally applied to the HDFS filesystem through the handle
/// exposed by GlutenHDFSObjectStorage.
class GlutenDiskHDFS : public DB::DiskObjectStorage
{
public:
    /// All arguments are forwarded to DB::DiskObjectStorage.
    /// `object_storage_` is expected to point to a GlutenHDFSObjectStorage; this is
    /// checked with chassert only. The HDFS working directory is set to "/".
    GlutenDiskHDFS(
        const String & name_,
        const String & object_key_prefix_,
        DB::MetadataStoragePtr metadata_storage_,
        DB::ObjectStoragePtr object_storage_,
        const Poco::Util::AbstractConfiguration & config,
        const String & config_prefix)
        : DB::DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
    {
        /// Cast once and reuse the result for both the check and the member.
        hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
        chassert(hdfs_object_storage != nullptr);
        hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
    }

    void createDirectory(const String & path) override;

    void createDirectories(const String & path) override;

    void removeDirectory(const String & path) override;

private:
    /// Returns the serialized object key generated for `path`.
    String path2AbsPath(const String & path);

    /// Non-owning view of the object storage passed to the constructor.
    local_engine::GlutenHDFSObjectStorage * hdfs_object_storage = nullptr;
};
}
/// The namespace is opened inside the USE_HDFS guard, so it must be closed
/// before the guard ends; otherwise a build without HDFS sees a stray '}'.
#endif

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "GlutenHDFSObjectStorage.h"

#if USE_HDFS
namespace local_engine
{
/// Builds an absolute object key by prefixing `path` with `hdfs_root_path`.
DB::ObjectStorageKey GlutenHDFSObjectStorage::generateObjectKeyForPath(const std::string & path) const
{
    auto absolute_path = hdfs_root_path + path;
    return DB::ObjectStorageKey::createAsAbsolute(std::move(absolute_path));
}
}
#endif

48 changes: 48 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include "config.h"

#if USE_HDFS
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#endif

namespace local_engine
{

#if USE_HDFS
/// HDFSObjectStorage variant used by Gluten. It overrides object key generation
/// and exposes the underlying libhdfs filesystem handle.
class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
{
public:
    /// Forwards all arguments to the DB::HDFSObjectStorage constructor.
    GlutenHDFSObjectStorage(
        const String & hdfs_root_path_, SettingsPtr settings_, const Poco::Util::AbstractConfiguration & config_)
        : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_)
    {
    }

    /// Returns the raw handle held by `hdfs_fs`; ownership is not transferred.
    hdfsFS getHDFSFS() const { return hdfs_fs.get(); }

    DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
};
#endif

}


Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
#endif

#if USE_HDFS
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
#endif

#include <Interpreters/Context.h>
#include <Common/Macros.h>

Expand Down Expand Up @@ -102,5 +106,32 @@ void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
});
}

#endif

#if USE_HDFS
/// Registers the "hdfs_gluten" object storage type with `factory`.
/// The creator reads `<config_prefix>.endpoint` (after macro expansion), validates
/// it, and builds a GlutenHDFSObjectStorage from it.
void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
{
    auto create_storage = [](const std::string & /* name */,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             const ContextPtr & context,
                             bool /* skip_access_check */) -> ObjectStoragePtr
    {
        const auto endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
        checkHDFSURL(endpoint);
        if (endpoint.back() != '/')
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", endpoint);

        /// Settings come from the disk configuration, with defaults, plus the
        /// replication value taken from the context settings.
        const auto min_bytes_for_seek = config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024);
        const auto objects_chunk_size_to_delete = config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000);
        std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
            min_bytes_for_seek, objects_chunk_size_to_delete, context->getSettingsRef().hdfs_replication);

        return std::make_unique<GlutenHDFSObjectStorage>(endpoint, std::move(settings), config);
    };

    factory.registerObjectStorageType("hdfs_gluten", create_storage);
}
#endif
}
67 changes: 46 additions & 21 deletions cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,74 @@
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
#include <Disks/ObjectStorages/ObjectStorageFactory.h>

#if USE_HDFS
#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
#endif

#include "registerGlutenDisks.h"

namespace local_engine
{
using namespace DB;
#if USE_AWS_S3
void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory);
void registerGlutenS3ObjectStorage(DB::ObjectStorageFactory & factory);
#endif

#if USE_HDFS
void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
#endif

void registerGlutenDisks(bool global_skip_access_check)
{
auto & factory = DiskFactory::instance();

auto & factory = DB::DiskFactory::instance();
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const DisksMap & /* map */,
bool, bool) -> DiskPtr
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DB::ContextPtr context,
const DB::DisksMap & /* map */,
bool,
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
auto object_storage = ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
auto metadata_storage = MetadataStorageFactory::instance().create(
name, config, config_prefix, object_storage, "local");
auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");

DiskObjectStoragePtr disk = std::make_shared<DiskObjectStorage>(
name,
object_storage->getCommonKeyPrefix(),
std::move(metadata_storage),
std::move(object_storage),
config,
config_prefix);
DB::DiskObjectStoragePtr disk = std::make_shared<DB::DiskObjectStorage>(
name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);

disk->startup(context, skip_access_check);
return disk;
};

auto & object_factory = DB::ObjectStorageFactory::instance();
#if USE_AWS_S3
auto & object_factory = ObjectStorageFactory::instance();
registerGlutenS3ObjectStorage(object_factory);
factory.registerDiskType("s3_gluten", creator); /// For compatibility
#endif

#if USE_HDFS
/// Disk creator for "hdfs_gluten": builds the object and metadata storages from
/// the configuration, wraps them in a GlutenDiskHDFS and starts the disk up.
auto hdfs_creator = [global_skip_access_check](
                        const String & name,
                        const Poco::Util::AbstractConfiguration & config,
                        const String & config_prefix,
                        DB::ContextPtr context,
                        const DB::DisksMap & /* map */,
                        bool,
                        bool) -> DB::DiskPtr
{
    /// The global flag wins; the per-disk setting is only consulted when it is off.
    bool skip_access_check = global_skip_access_check;
    if (!skip_access_check)
        skip_access_check = config.getBool(config_prefix + ".skip_access_check", false);

    auto & object_storage_factory = DB::ObjectStorageFactory::instance();
    auto object_storage = object_storage_factory.create(name, config, config_prefix, context, skip_access_check);

    auto & metadata_storage_factory = DB::MetadataStorageFactory::instance();
    auto metadata_storage = metadata_storage_factory.create(name, config, config_prefix, object_storage, "local");

    DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskHDFS>(
        name,
        object_storage->getCommonKeyPrefix(),
        std::move(metadata_storage),
        std::move(object_storage),
        config,
        config_prefix);

    disk->startup(context, skip_access_check);
    return disk;
};

registerGlutenHDFSObjectStorage(object_factory);
factory.registerDiskType("hdfs_gluten", hdfs_creator); /// For compatibility
#endif
}
}

0 comments on commit 8cd8493

Please sign in to comment.