Conversation

@AscendTransport
Contributor

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello @AscendTransport, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly advances the integration and optimization for Ascend NPU devices within the system. It introduces a more efficient memory copy strategy by leveraging Ascend-specific APIs and refactors the core transfer engine for better resource sharing and pooling. The changes also include new batch data transfer functionalities tailored for Ascend, alongside a comprehensive overhaul of the underlying transport layer to support aggregated operations, aiming to boost overall performance and scalability on NPU hardware.

Highlights

  • Ascend NPU Integration: Introduced specialized batch put and get operations (batch_put_from_ascend, batch_get_into_ascend) for Ascend NPUs, enhancing data transfer capabilities for these devices.
  • Memory Copy Optimization: Refactored memory copy operations within the MemcpyWorkerPool to utilize Ascend-specific aclrtMemcpy functions, replacing generic std::memcpy for improved performance on NPU devices (see the sketch after this list).
  • Transfer Engine Pooling: Modified the TransferEngine to use std::shared_ptr and introduced global pooling mechanisms (g_transfer_engine, g_separate_pool) to allow for shared and multiplexed use of the transfer engine across different components.
  • Aggregated Transport Logic: Implemented new aggregated transport mechanisms within the HCCL transport layer, including dedicated threads for initiator and target roles, to optimize batch data transfers by grouping smaller operations.
  • Build System and Configuration: Updated CMake configurations to enable USE_ASCEND by default and allow flexible specification of the Ascend toolkit path via environment variables, streamlining the build process for Ascend-enabled environments.
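
As a rough illustration of the memcpy change (a minimal sketch, not the PR's actual MemcpyWorkerPool code: the copy_buffer helper and its boolean flags are hypothetical, while aclrtMemcpy and the ACL_MEMCPY_* kinds are the documented CANN ACL API):

```cpp
#include <acl/acl.h>  // CANN ACL runtime: aclrtMemcpy, aclrtMemcpyKind
#include <cstddef>
#include <cstring>

// Hypothetical helper: route the copy through aclrtMemcpy whenever NPU memory
// is involved, falling back to std::memcpy for plain host-to-host copies.
bool copy_buffer(void *dst, const void *src, size_t len,
                 bool dst_on_npu, bool src_on_npu) {
    if (!dst_on_npu && !src_on_npu) {
        std::memcpy(dst, src, len);
        return true;
    }
    aclrtMemcpyKind kind =
        dst_on_npu ? (src_on_npu ? ACL_MEMCPY_DEVICE_TO_DEVICE
                                 : ACL_MEMCPY_HOST_TO_DEVICE)
                   : ACL_MEMCPY_DEVICE_TO_HOST;
    // aclrtMemcpy(dst, destMax, src, count, kind) copies count bytes to dst.
    return aclrtMemcpy(dst, len, src, len, kind) == ACL_SUCCESS;
}
```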
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.
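
As a concrete illustration (the contents below are invented; only the .gemini/ location and the style-guide mechanism come from the paragraph above), a custom review style guide might look like:

```markdown
<!-- .gemini/styleguide.md: free-form guidance for the reviewer to follow -->
# Review style guide
- Flag LOG(ERROR) calls used for non-error diagnostics; suggest LOG(INFO) or VLOG.
- Prefer getter methods over public data members in C++ classes.
```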

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.

Contributor

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces significant changes to support Ascend NPUs and implements a pooling mechanism for the TransferEngine. The changes are extensive, touching the build system, core store and transfer engine logic, Python bindings, and adding new transport layers for Ascend. While the overall direction seems correct, there are several issues that need to be addressed. I've found a critical bug in the RPC service logic, some design concerns regarding encapsulation and use of global variables, and several inconsistencies and leftover debug code. Please review my comments for details.

Comment on lines 325 to 335
    for (size_t i = 0; i < keys.size(); ++i) {
        slice_len.reserve(keys.size());
        all_slice_len = 0;
        for (size_t j = 0; j < slice_lengths[i].size(); ++j) {
            all_slice_len += slice_lengths[i][j];
        }
        slice_len.emplace_back(all_slice_len);
        // LOG(ERROR) << "master_server put start, len:" << slice_lengths[i].size();
        results.emplace_back(
-           master_service_.PutStart(keys[i], slice_lengths[i], config));
+           master_service_.PutStart(keys[i], slice_len, config));
    }

critical

There's a bug in the BatchPutStart implementation. The slice_len vector is not cleared within the loop, causing it to accumulate total sizes from previous keys. For the i-th key, master_service_.PutStart is called with a slice_len vector containing total sizes for keys 0 to i, instead of just for key i. This will likely lead to incorrect behavior or errors in the master service.

Additionally, slice_len.reserve(keys.size()); is called inside the loop, which is inefficient. It should be moved outside or removed if only one element is ever needed.

    for (size_t i = 0; i < keys.size(); ++i) {
        uint64_t all_slice_len = 0;
        for (size_t j = 0; j < slice_lengths[i].size(); ++j) {
            all_slice_len += slice_lengths[i][j];
        }
        // LOG(ERROR) << "master_server put start, len:" << slice_lengths[i].size();
        results.emplace_back(
            master_service_.PutStart(keys[i], {all_slice_len}, config));
    }

Comment on lines +1309 to +1313
    if (replica.is_memory_replica() == false) {
        key_slices.emplace_back(Slice{buffers[j], sizes[j]});
    } else {
        key_slices.emplace_back(Slice{buffers[j], sizes[j]});
    }

medium

The if/else block here has identical bodies. This is redundant and can be simplified to a single statement.

            key_slices.emplace_back(Slice{buffers[j], sizes[j]});

Comment on lines +1240 to +1245
    const size_t num_keys = 1;
    std::vector<tl::expected<int64_t, ErrorCode>> results;
    results.reserve(num_keys);

    if (num_keys == 0) {
        return results;

medium

The code const size_t num_keys = 1; followed by if (num_keys == 0) is dead code since num_keys is a compile-time constant and the condition will always be false. This should be removed to improve code clarity.

    const size_t num_keys = 1;
    std::vector<tl::expected<int64_t, ErrorCode>> results;
    results.reserve(num_keys);

        this->local_hostname = local_hostname;
    }

    LOG(ERROR) << "setup_internal local_hostname:" << this->local_hostname;

medium

This log message uses LOG(ERROR) for what appears to be a debug/informational message. Using ERROR level for non-error conditions can clutter logs and make it harder to find real errors. Please consider changing this to LOG(INFO) or VLOG(1).

Suggested change
-   LOG(ERROR) << "setup_internal local_hostname:" << this->local_hostname;
+   LOG(INFO) << "setup_internal local_hostname:" << this->local_hostname;

Comment on lines +1454 to +1455
LOG(ERROR) << "batch put keys size:" << keys.size() << ", ordered_batched_slices size:" << ordered_batched_slices.size()
<< ", slice size len:" << slices.size();

medium

This LOG(ERROR) message appears to be for debugging purposes. Please consider removing it or changing it to a lower severity level like LOG(INFO) or VLOG to avoid cluttering error logs.

    LOG(INFO) << "batch put keys size:" << keys.size() << ", ordered_batched_slices size:" << ordered_batched_slices.size()
    << ", slice size len:" << slices.size();

Comment on lines +1017 to +1029
    // auto start = std::chrono::high_resolution_clock::now();

    auto internal_results = batch_get_into_internal_ascend(key, buffers, sizes);
    std::vector<int> results;
    results.reserve(internal_results.size());

    for (const auto &result : internal_results) {
        results.push_back(to_py_ret(result));
    }
    // auto stop = std::chrono::high_resolution_clock::now();
    // auto duration_call =
    //     std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
    // LOG(INFO) << "key: " << key << ", batch_get_into_ascend: " << duration_call.count() << "us";

medium

This function contains commented-out code for performance measurement. This should be removed before merging to keep the codebase clean.

Suggested change
-   // auto start = std::chrono::high_resolution_clock::now();
    auto internal_results = batch_get_into_internal_ascend(key, buffers, sizes);
    std::vector<int> results;
    results.reserve(internal_results.size());
    for (const auto &result : internal_results) {
        results.push_back(to_py_ret(result));
    }
-   // auto stop = std::chrono::high_resolution_clock::now();
-   // auto duration_call =
-   //     std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
-   // LOG(INFO) << "key: " << key << ", batch_get_into_ascend: " << duration_call.count() << "us";
    return results;


    std::vector<Transport *> listTransports();

    std::map<std::string, std::shared_ptr<Transport>> transport_map_;

medium

The transport_map_ member has been made public. This breaks encapsulation. If other classes need to access the transports, consider providing a getter method that returns a const reference or a specific transport by name, rather than exposing the entire map.
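
A minimal sketch of such a getter (illustrative only; the method name is hypothetical):

```cpp
#include <map>
#include <memory>
#include <string>

class Transport;  // forward declaration, as in the real headers

class TransferEngine {
   public:
    // Hypothetical accessor: look up a transport by protocol name without
    // exposing the whole map for mutation.
    std::shared_ptr<Transport> getTransport(const std::string &proto) const {
        auto it = transport_map_.find(proto);
        return it == transport_map_.end() ? nullptr : it->second;
    }

   private:
    std::map<std::string, std::shared_ptr<Transport>> transport_map_;
};
```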

        return local_topology_;
    }

    std::string local_server_name_;

medium

The local_server_name_ member has been made public. This breaks encapsulation. If other components need this information, it would be better to provide a getter method.
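
For example, a read-only getter (name hypothetical) would keep the member private:

```cpp
    // Hypothetical getter: const access to the server name; member stays private.
    const std::string &getLocalServerName() const { return local_server_name_; }
```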

Comment on lines +255 to +256
extern __attribute__ ((visibility ("default"))) std::shared_ptr<TransferEngine> g_transfer_engine;
extern __attribute__ ((visibility ("default"))) bool g_separate_pool;

medium

The introduction of global variables g_transfer_engine and g_separate_pool for pooling is a design concern. Global state can make the code harder to reason about, test, and maintain. It also introduces tight coupling between different parts of the system. Have you considered alternative approaches, such as dependency injection or a singleton pattern with controlled access, to manage the shared TransferEngine instance?
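
One possible shape for such controlled access (a sketch only; it assumes a TransferEngine can be constructed at first use, which may not match the real initialization flow):

```cpp
// Hypothetical function-local-static accessor: initialization is thread-safe
// under C++11 "magic statics", and all users go through one entry point
// instead of two mutable globals.
std::shared_ptr<TransferEngine> &sharedTransferEngine() {
    static std::shared_ptr<TransferEngine> engine =
        std::make_shared<TransferEngine>();
    return engine;
}
```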

LOG(WARNING) << "Transport " << proto << " already installed";
return transport;
}
LOG(WARNING) << "Transport not used";

medium

This log message appears to be a leftover debug statement. Please remove it before merging.

@AscendTransport
Contributor Author

Conflict resolution in progress
