Skip to content

Commit

Permalink
Implemented rpc logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mkatanbaf committed Apr 13, 2022
1 parent f841b63 commit 3935e6b
Show file tree
Hide file tree
Showing 11 changed files with 1,342 additions and 263 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ list(APPEND COMPILER_SRCS "src/target/datatype/myfloat/myfloat.cc")
tvm_file_glob(GLOB RUNTIME_SRCS
src/runtime/*.cc
src/runtime/vm/*.cc
src/runtime/minrpc/*.cc
)

if(BUILD_FOR_HEXAGON)
Expand Down
12 changes: 9 additions & 3 deletions python/tvm/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def request_and_run(self, key, func, priority=1, session_timeout=0, max_retry=2)
)


def connect(url, port, key="", session_timeout=0, session_constructor_args=None):
def connect(url, port, key="", session_timeout=0, session_constructor_args=None, **kwargs):
"""Connect to RPC Server
Parameters
Expand All @@ -483,6 +483,9 @@ def connect(url, port, key="", session_timeout=0, session_constructor_args=None)
The first element of the list is always a string specifying the name of
the session constructor, the following args are the positional args to that function.
**kwargs: optional
enable_logging flag to enable/disable logging. Logging is disabled by default.
Returns
-------
sess : RPCSession
Expand All @@ -505,7 +508,7 @@ def connect(url, port, key="", session_timeout=0, session_constructor_args=None)
client_via_proxy = rpc.connect(
proxy_server_url, proxy_server_port, proxy_server_key,
session_constructor_args=[
"rpc.Connect", internal_url, internal_port, internal_key])
"rpc.Connect", internal_url, internal_port, internal_key, enable_logging])
"""
try:
Expand All @@ -514,7 +517,10 @@ def connect(url, port, key="", session_timeout=0, session_constructor_args=None)
session_constructor_args = session_constructor_args if session_constructor_args else []
if not isinstance(session_constructor_args, (list, tuple)):
raise TypeError("Expect the session constructor to be a list or tuple")
sess = _ffi_api.Connect(url, port, key, *session_constructor_args)
enable_logging = False
if kwargs is not None and "enable_logging" in kwargs:
enable_logging = kwargs["enable_logging"]
sess = _ffi_api.Connect(url, port, key, enable_logging, *session_constructor_args)
except NameError:
raise RuntimeError("Please compile with USE_RPC=1")
return RPCSession(sess)
Expand Down
64 changes: 64 additions & 0 deletions src/runtime/minrpc/minrpc_intrfc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef TVM_RUNTIME_MINRPC_MINRPC_INTRFC_H_
#define TVM_RUNTIME_MINRPC_MINRPC_INTRFC_H_

#include <tvm/runtime/c_runtime_api.h>

#include "rpc_reference.h"

namespace tvm {
namespace runtime {

/*!
* \brief Return interface used in ExecInterface to generate and sent the responses.
*/
class ReturnInterface {
public:
virtual ~ReturnInterface() {}
virtual void ReturnVoid() = 0;
virtual void ReturnHandle(void* handle) = 0;
virtual void ReturnException(const char* msg) = 0;
virtual void ReturnPackedSeq(const TVMValue* arg_values, const int* type_codes, int num_args) = 0;
virtual void ReturnCopyAck(uint64_t* num_bytes, uint8_t** data_ptr) = 0;
virtual void ReturnLastTVMError() = 0;
virtual void ThrowError(RPCServerStatus code, RPCCode info = RPCCode::kNone) = 0;
};

/*!
* \brief Execute interface used in MinRPCServer to process different received commands
*/
class ExecInterface {
public:
virtual ~ExecInterface() {}
virtual void ExecInitServer(int* _num_args) = 0;
virtual void ExecNormalCallFunc(uint64_t* _call_handle, TVMValue** _values, int** _tcodes,
int* _num_args) = 0;
virtual void ExecCopyFromRemote(DLTensor** _arr, uint64_t* _num_bytes, uint8_t** _data_ptr) = 0;
virtual int ExecCopyToRemote(DLTensor** _arr, uint64_t* _num_bytes, uint8_t** _data_ptr) = 0;
virtual void ExecSyscallFunc(RPCCode* _code, TVMValue** _values, int** _tcodes,
int* _num_args) = 0;
virtual void ThrowError(RPCServerStatus code, RPCCode info = RPCCode::kNone) = 0;
virtual ReturnInterface* getReturnInterface() = 0;
};

} // namespace runtime
} // namespace tvm
#endif // TVM_RUNTIME_MINRPC_MINRPC_INTRFC_H_
252 changes: 252 additions & 0 deletions src/runtime/minrpc/minrpc_logger.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "minrpc_logger.h"

#include <string.h>
#include <time.h>
#include <tvm/runtime/c_runtime_api.h>
#include <tvm/runtime/logging.h>

#include <functional>
#include <iostream>
#include <sstream>
#include <unordered_map>

#include "minrpc_intrfc.h"
#include "rpc_reference.h"

namespace tvm {
namespace runtime {

std::stringstream Logger::LogTime() {
char buf[100];
time_t t;
struct tm *timeptr, result, temp;
std::stringstream ss;

t = time(NULL);
timeptr = localtime_r(&t, &temp);
strftime(buf, sizeof(buf), "%a %m/%d/%Y %r", timeptr);

if (strptime(buf, "%a %m/%d/%Y %r", &result) == NULL) {
ss << "TIME UNKNOWN, ";
} else {
ss << result.tm_hour << ":" << result.tm_min << ":" << result.tm_sec << ", ";
}
return ss;
}

void Logger::OutputLog() {
LOG(INFO) << os_.str();
this->os_.str(std::string());
}

MinRPCReturnsWithLog::MinRPCReturnsWithLog(ReturnInterface* next) : next_(next), logger_() {}

MinRPCReturnsWithLog::~MinRPCReturnsWithLog() {}

void MinRPCReturnsWithLog::ReturnVoid() {
next_->ReturnVoid();
logger->LogString("-> ReturnVoid");
logger->OutputLog();
}

void MinRPCReturnsWithLog::ReturnHandle(void* handle) {
next_->ReturnHandle(handle);
if (this->code == RPCCode::kGetGlobalFunc) {
this->registerHandleName(handle);
}
logger->LogVal<void*>("-> ReturnHandle: ", handle);
logger->OutputLog();
}

void MinRPCReturnsWithLog::ReturnException(const char* msg) {
next_->ReturnException(msg);
logger->LogString("-> Exception: ");
logger->LogString(msg);
logger->OutputLog();
}

void MinRPCReturnsWithLog::ReturnPackedSeq(const TVMValue* arg_values, const int* type_codes,
int num_args) {
next_->ReturnPackedSeq(arg_values, type_codes, num_args);
processValues(&arg_values, &type_codes, &num_args);
logger->OutputLog();
}

void MinRPCReturnsWithLog::ReturnCopyAck(uint64_t* num_bytes, uint8_t** data_ptr) {
next_->ReturnCopyAck(num_bytes, data_ptr);
logger->LogVal<uint64_t>("-> CopyAck: ", *num_bytes);
logger->LogVal<void*>(", ", reinterpret_cast<void*>(*data_ptr));
logger->OutputLog();
}

void MinRPCReturnsWithLog::ReturnLastTVMError() {
const char* err = TVMGetLastError();
this->ReturnException(err);
}

void MinRPCReturnsWithLog::ThrowError(RPCServerStatus code, RPCCode info = RPCCode::kNone) {
next_->ThrowError(code, info);
logger->LogString("-> ERROR");
logger->OutputLog();
}

void MinRPCReturnsWithLog::processValues(const TVMValue** values, const int** tcodes,
int* num_args) {
if (*tcodes != nullptr) {
logger->LogString("-> [");
for (int i = 0; i < *num_args; ++i) {
logger->LogTVMValue((*tcodes)[i], (*values)[i]);

if ((*tcodes)[i] == kTVMOpaqueHandle) {
this->registerHandleName((*values)[i].v_handle);
}
}
logger->LogString("]");
}
}

void MinRPCReturnsWithLog::resetCurrHandleName(RPCCode code) {
this->code = code;
logger->LogString(RPCCodeToString(code));
logger->LogString(", ");
this->CurrHandleName.clear();
}

void MinRPCReturnsWithLog::updateCurrHandleName(const char* name) {
if (this->CurrHandleName.length() != 0) {
this->CurrHandleName.append("::");
}
this->CurrHandleName.append(name);
}

void MinRPCReturnsWithLog::getHandleName(void* handle) {
if (array_tracker.find(handle) != array_tracker.end()) {
CurrHandleName.append(array_tracker[handle]);
logger->LogHandleName(CurrHandleName);
}
}

void MinRPCReturnsWithLog::releaseHandleName(void* handle) {
if (array_tracker.find(handle) != array_tracker.end()) {
logger->LogHandleName(array_tracker[handle]);
array_tracker.erase(handle);
}
}

Logger* MinRPCReturnsWithLog::getLogger() { return this->logger; }

void MinRPCReturnsWithLog::registerHandleName(void* handle) {
const std::string newString = CurrHandleName;
array_tracker[handle] = newString;
}

MinRPCExecuteWithLog::MinRPCExecuteWithLog(ExecInterface* next) : next_(next) {
this->ret_ = reinterpret_cast<MinRPCReturnsWithLog*>(next_->getReturnInterface());
this->logger = ret_->getLogger();
}

MinRPCExecuteWithLog::~MinRPCExecuteWithLog() {}

void MinRPCExecuteWithLog::ExecInitServer(int* _num_args) {
setRPCCode(RPCCode::kInitServer);
logger->LogString("Init Server");
next_->ExecInitServer(_num_args);
}

void MinRPCExecuteWithLog::ExecNormalCallFunc(uint64_t* call_handle, TVMValue** values,
int** tcodes, int* num_args) {
setRPCCode(RPCCode::kCallFunc);
logger->LogVal<void*>("call_handle: ", reinterpret_cast<void*>(*call_handle));
ret_->getHandleName(reinterpret_cast<void*>(*call_handle));
if (*num_args > 0) {
logger->LogString(", ");
}
this->processValues(values, tcodes, num_args);
next_->ExecNormalCallFunc(call_handle, values, tcodes, num_args);
}

void MinRPCExecuteWithLog::ExecCopyFromRemote(DLTensor** arr, uint64_t* num_bytes,
uint8_t** temp_data) {
setRPCCode(RPCCode::kCopyFromRemote);
logger->LogVal<void*>("data_handle: ", reinterpret_cast<void*>((*arr)->data));
logger->LogDLDevice(", DLDevice(type,id):", &(*arr)->device);
logger->LogVal<int64_t>(", ndim: ", (*arr)->ndim);
logger->LogDLData(", DLDataType(code,bits,lane): ", &(*arr)->dtype);
logger->LogVal<uint64_t>(", num_bytes:", *num_bytes);
next_->ExecCopyFromRemote(arr, num_bytes, temp_data);
}

int MinRPCExecuteWithLog::ExecCopyToRemote(DLTensor** arr, uint64_t* _num_bytes,
uint8_t** _data_ptr) {
setRPCCode(RPCCode::kCopyToRemote);
logger->LogVal<void*>("data_handle: ", reinterpret_cast<void*>((*arr)->data));
logger->LogDLDevice(", DLDevice(type,id):", &(*arr)->device);
logger->LogVal<int64_t>(", ndim: ", (*arr)->ndim);
logger->LogDLData(", DLDataType(code,bits,lane): ", &(*arr)->dtype);
logger->LogVal<uint64_t>(", byte_offset: ", (*arr)->byte_offset);
return next_->ExecCopyToRemote(arr, _num_bytes, _data_ptr);
}

void MinRPCExecuteWithLog::ExecSyscallFunc(RPCCode* code, TVMValue** values, int** tcodes,
int* num_args) {
setRPCCode(*code);
if ((*code) == RPCCode::kFreeHandle) {
if (((*num_args) == 2) && ((*tcodes)[0] == kTVMOpaqueHandle) && ((*tcodes)[1] == kDLInt)) {
logger->LogVal<void*>("handle: ", reinterpret_cast<void*>((*values)[0].v_handle));
if ((*values)[1].v_int64 == kTVMModuleHandle ||
(*values)[1].v_int64 == kTVMPackedFuncHandle) {
ret_->releaseHandleName(reinterpret_cast<void*>((*values)[0].v_handle));
}
}
} else {
this->processValues(values, tcodes, num_args);
}
next_->ExecSyscallFunc(code, values, tcodes, num_args);
}

void MinRPCExecuteWithLog::ThrowError(RPCServerStatus code, RPCCode info = RPCCode::kNone) {
logger->LogString("-> Error\n");
next_->ThrowError(code, info);
}

ReturnInterface* MinRPCExecuteWithLog::getReturnInterface() { return next_->getReturnInterface(); }

void MinRPCExecuteWithLog::processValues(TVMValue** values, int** tcodes, int* num_args) {
if (*tcodes != nullptr) {
logger->LogString("[");
for (int i = 0; i < *num_args; ++i) {
logger->LogTVMValue((*tcodes)[i], (*values)[i]);

if ((*tcodes)[i] == kTVMStr) {
if (strlen((*values)[i].v_str) > 0) {
ret_->updateCurrHandleName((*values)[i].v_str);
}
}
}
logger->LogString("]");
}
}

void MinRPCExecuteWithLog::setRPCCode(RPCCode code) { ret_->resetCurrHandleName(code); }

} // namespace runtime
} // namespace tvm
Loading

0 comments on commit 3935e6b

Please sign in to comment.