Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [python client] Better Python garbage collection management for C++-owned objects #16535

Merged
merged 20 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0c1c485
Better Python garbage collection management for C++-owned objects
zbentley Jul 12, 2022
b4d41f2
Remove debug prints
zbentley Jul 12, 2022
ce2b970
Disable logThreads during pulsar logger invocations; allow captive ob…
zbentley Jul 12, 2022
d3f261f
Don't GIL-protect unnecessary sections
zbentley Jul 12, 2022
4576f84
Reduce unnecessary GIL-protection when logging things below the curre…
zbentley Jul 12, 2022
3a3e433
Add integration test
zbentley Jul 12, 2022
b5f2319
Merge branch 'apache:master' into better_python_cpp_object_gc
zbentley Jul 17, 2022
df0db37
Restore lines to pass licence check
zbentley Jul 17, 2022
02564a5
clang-format
zbentley Jul 28, 2022
252fcf2
reformat with docker-format
zbentley Aug 18, 2022
fb577fd
add newlines
zbentley Aug 18, 2022
65bca8c
Switch more logging to pure python
zbentley Sep 2, 2022
2af92a8
Merge remote-tracking branch 'origin/master' into better_python_cpp_o…
zbentley Sep 2, 2022
a927a3f
Merge branch 'master' into better_python_cpp_object_gc
BewareMyPower Sep 6, 2022
c285861
Update pulsar-client-cpp/python/pulsar_test.py
zbentley Sep 6, 2022
7f53d06
Update pulsar-client-cpp/python/pulsar_test.py
zbentley Sep 6, 2022
6f62f5c
Update pulsar-client-cpp/python/pulsar_test.py
zbentley Sep 6, 2022
04cba0b
Update pulsar-client-cpp/python/src/utils.h
zbentley Sep 6, 2022
6f2ed13
Merge branch 'master' into better_python_cpp_object_gc
BewareMyPower Sep 7, 2022
88370c6
Merge branch 'apache:master' into better_python_cpp_object_gc
zbentley Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pulsar-client-cpp/lib/checksum/crc32c_arm.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
#define crc32c_u16(crc, v) __crc32ch(crc, v)
#define crc32c_u32(crc, v) __crc32cw(crc, v)
#define crc32c_u64(crc, v) __crc32cd(crc, v)
#define PREF4X64L1(buffer, PREF_OFFSET, ITR) \
zbentley marked this conversation as resolved.
Show resolved Hide resolved
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [ c ] "I"((PREF_OFFSET) + ((ITR) + 0) * 64)); \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [ c ] "I"((PREF_OFFSET) + ((ITR) + 1) * 64)); \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [ c ] "I"((PREF_OFFSET) + ((ITR) + 2) * 64)); \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [ c ] "I"((PREF_OFFSET) + ((ITR) + 3) * 64));
#define PREF4X64L1(buffer, PREF_OFFSET, ITR) \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [c] "I"((PREF_OFFSET) + ((ITR) + 0) * 64)); \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [c] "I"((PREF_OFFSET) + ((ITR) + 1) * 64)); \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [c] "I"((PREF_OFFSET) + ((ITR) + 2) * 64)); \
__asm__("PRFM PLDL1KEEP, [%x[v],%[c]]" ::[v] "r"(buffer), [c] "I"((PREF_OFFSET) + ((ITR) + 3) * 64));

#define PREF1KL1(buffer, PREF_OFFSET) \
PREF4X64L1(buffer, (PREF_OFFSET), 0) \
Expand Down
31 changes: 31 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#


import threading
import logging
from unittest import TestCase, main
import time
import os
Expand Down Expand Up @@ -1238,6 +1240,35 @@ def test_json_schema_encode(self):
second_encode = schema.encode(record)
self.assertEqual(first_encode, second_encode)

def test_logger_thread_leaks(self):
def _do_connect(close):
logger = logging.getLogger(str(threading.currentThread().ident))
zbentley marked this conversation as resolved.
Show resolved Hide resolved
logger.setLevel(logging.INFO)
client = pulsar.Client(
service_url="pulsar://localhost:6650",
io_threads=4,
message_listener_threads=4,
operation_timeout_seconds=1,
log_conf_file_path=None,
authentication=None,
logger=logger,
)
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
if close:
client.close()

for should_close in (True, False):
assert threading.active_count() == 1, "Explicit close: {}; baseline is 1 thread".format(should_close)
_do_connect(should_close)
assert threading.active_count() == 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close)
zbentley marked this conversation as resolved.
Show resolved Hide resolved
threads = []
for _ in range(10):
threads.append(threading.Thread(target=_do_connect))
zbentley marked this conversation as resolved.
Show resolved Hide resolved
threads[-1].start()
for thread in threads:
thread.join()
assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)

def _check_value_error(self, fun):
with self.assertRaises(ValueError):
fun()
Expand Down
70 changes: 36 additions & 34 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,70 +93,76 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
return conf;
}

class LoggerWrapper : public Logger {
PyObject* const _pyLogger;
class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
const int _pythonLogLevel;
const std::unique_ptr<Logger> _fallbackLogger;

py::object _loggerModule;
static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }

public:
LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
: _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
Py_XINCREF(_pyLogger);
: CaptivePythonObjectMixin(pyLogger),
_pythonLogLevel(pythonLogLevel),
_fallbackLogger(fallbackLogger) {
PyGILState_STATE state = PyGILState_Ensure();
_loggerModule = py::import("logging");
PyGILState_Release(state);
}

LoggerWrapper(const LoggerWrapper&) = delete;
LoggerWrapper(LoggerWrapper&&) noexcept = delete;
LoggerWrapper& operator=(const LoggerWrapper&) = delete;
LoggerWrapper& operator=(LoggerWrapper&&) = delete;

virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }

bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }

void log(Level level, int line, const std::string& message) {
switch (level) {
case Logger::LEVEL_DEBUG:
log(level, line, "debug", message);
break;
case Logger::LEVEL_INFO:
log(level, line, "info", message);
break;
case Logger::LEVEL_WARN:
log(level, line, "warning", message);
break;
case Logger::LEVEL_ERROR:
log(level, line, "error", message);
break;
}
}

private:
void log(Level level, int line, const char* method, const std::string& message) {
if (!Py_IsInitialized()) {
// Python logger is unavailable - fallback to console logger
_fallbackLogger->log(level, line, message);
} else {
PyGILState_STATE state = PyGILState_Ensure();

py::object oldLogThreads = _loggerModule.attr("logThreads");
_loggerModule.attr("logThreads") = py::object(false);
try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
break;
}

py::call_method<void>(_captive, method, message.c_str());
_loggerModule.attr("logThreads") = oldLogThreads;
} catch (const py::error_already_set& e) {
_fallbackLogger->log(level, line, message);
_loggerModule.attr("logThreads") = oldLogThreads;
}

PyGILState_Release(state);
}
}
};

class LoggerWrapperFactory : public LoggerFactory {
class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
PyObject* _pyLogger;
Optional<int> _pythonLogLevel{Optional<int>::empty()};

void initializePythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
int level = py::call_method<int>(_captive, "getEffectiveLevel");
_pythonLogLevel = Optional<int>::of(level);
} catch (const py::error_already_set& e) {
// Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
Expand All @@ -167,18 +173,14 @@ class LoggerWrapperFactory : public LoggerFactory {
}

public:
LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger) {
initializePythonLogLevel();
}

virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }

Logger* getLogger(const std::string& fileName) {
const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
if (_pythonLogLevel.is_present()) {
return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
return new LoggerWrapper(_captive, _pythonLogLevel.value(), fallbackLogger);
} else {
return fallbackLogger;
}
Expand Down Expand Up @@ -315,4 +317,4 @@ void export_config() {
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>());
}
}
22 changes: 22 additions & 0 deletions pulsar-client-cpp/python/src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,25 @@ struct CryptoKeyReaderWrapper {
CryptoKeyReaderWrapper();
CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
};

class CaptivePythonObjectMixin {
protected:
PyObject* _captive;

CaptivePythonObjectMixin(PyObject* captive) {
_captive = captive;
PyGILState_STATE state = PyGILState_Ensure();
Py_XINCREF(_captive);
PyGILState_Release(state);
}

CaptivePythonObjectMixin(py::object captive) : CaptivePythonObjectMixin(captive.ptr()) {}
zbentley marked this conversation as resolved.
Show resolved Hide resolved

~CaptivePythonObjectMixin() {
if (Py_IsInitialized()) {
PyGILState_STATE state = PyGILState_Ensure();
Py_XDECREF(_captive);
PyGILState_Release(state);
}
}
};
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved