Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [python client] Better Python garbage collection management for C++-owned objects #16535

Merged
merged 20 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0c1c485
Better Python garbage collection management for C++-owned objects
zbentley Jul 12, 2022
b4d41f2
Remove debug prints
zbentley Jul 12, 2022
ce2b970
Disable logThreads during pulsar logger invocations; allow captive ob…
zbentley Jul 12, 2022
d3f261f
Don't GIL-protect unnecessary sections
zbentley Jul 12, 2022
4576f84
Reduce unnecessary GIL-protection when logging things below the curre…
zbentley Jul 12, 2022
3a3e433
Add integration test
zbentley Jul 12, 2022
b5f2319
Merge branch 'apache:master' into better_python_cpp_object_gc
zbentley Jul 17, 2022
df0db37
Restore lines to pass licence check
zbentley Jul 17, 2022
02564a5
clang-format
zbentley Jul 28, 2022
252fcf2
reformat with docker-format
zbentley Aug 18, 2022
fb577fd
add newlines
zbentley Aug 18, 2022
65bca8c
Switch more logging to pure python
zbentley Sep 2, 2022
2af92a8
Merge remote-tracking branch 'origin/master' into better_python_cpp_o…
zbentley Sep 2, 2022
a927a3f
Merge branch 'master' into better_python_cpp_object_gc
BewareMyPower Sep 6, 2022
c285861
Update pulsar-client-cpp/python/pulsar_test.py
zbentley Sep 6, 2022
7f53d06
Update pulsar-client-cpp/python/pulsar_test.py
zbentley Sep 6, 2022
6f62f5c
Update pulsar-client-cpp/python/pulsar_test.py
zbentley Sep 6, 2022
04cba0b
Update pulsar-client-cpp/python/src/utils.h
zbentley Sep 6, 2022
6f2ed13
Merge branch 'master' into better_python_cpp_object_gc
BewareMyPower Sep 7, 2022
88370c6
Merge branch 'apache:master' into better_python_cpp_object_gc
zbentley Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ target/

# Python
*.pyc
.python-version

# Perf tools
*.hgrm
Expand Down
13 changes: 11 additions & 2 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,7 @@ def __init__(self, service_url,
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
conf.set_logger(self._prepare_logger(logger) if logger else None)
if listener_name:
conf.listener_name(listener_name)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
Expand All @@ -476,6 +475,16 @@ def __init__(self, service_url,
self._client = _pulsar.Client(service_url, conf)
self._consumers = []

@staticmethod
def _prepare_logger(logger):
import logging
def log(level, message):
old_threads = logging.logThreads
logging.logThreads = False
logger.log(logging.getLevelName(level), message)
logging.logThreads = old_threads
return log

def create_producer(self, topic,
producer_name=None,
schema=schema.BytesSchema(),
Expand Down
31 changes: 31 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#


import threading
import logging
from unittest import TestCase, main
import time
import os
Expand Down Expand Up @@ -1249,6 +1251,35 @@ def test_json_schema_encode(self):
second_encode = schema.encode(record)
self.assertEqual(first_encode, second_encode)

def test_logger_thread_leaks(self):
def _do_connect(close):
logger = logging.getLogger(str(threading.current_thread().ident))
logger.setLevel(logging.INFO)
client = pulsar.Client(
service_url="pulsar://localhost:6650",
io_threads=4,
message_listener_threads=4,
operation_timeout_seconds=1,
log_conf_file_path=None,
authentication=None,
logger=logger,
)
client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
if close:
client.close()

for should_close in (True, False):
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
_do_connect(should_close)
self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
threads = []
for _ in range(10):
threads.append(threading.Thread(target=_do_connect, args=(should_close)))
threads[-1].start()
for thread in threads:
thread.join()
assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)

def test_chunking(self):
client = Client(self.serviceUrl)
data_size = 10 * 1024 * 1024
Expand Down
65 changes: 19 additions & 46 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,94 +93,67 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu
return conf;
}

class LoggerWrapper : public Logger {
PyObject* const _pyLogger;
const int _pythonLogLevel;
class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
const std::unique_ptr<Logger> _fallbackLogger;

static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); }

public:
LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger)
: _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) {
Py_XINCREF(_pyLogger);
}
LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
: CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {}

LoggerWrapper(const LoggerWrapper&) = delete;
LoggerWrapper(LoggerWrapper&&) noexcept = delete;
LoggerWrapper& operator=(const LoggerWrapper&) = delete;
LoggerWrapper& operator=(LoggerWrapper&&) = delete;

virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); }

bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; }
bool isEnabled(Level level) {
return true; // Python loggers are always enabled; they decide internally whether or not to log.
}

void log(Level level, int line, const std::string& message) {
if (!Py_IsInitialized()) {
// Python logger is unavailable - fallback to console logger
_fallbackLogger->log(level, line, message);
} else {
PyGILState_STATE state = PyGILState_Ensure();

PyObject *type, *value, *traceback;
PyErr_Fetch(&type, &value, &traceback);
try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
py::call<void>(_captive, "DEBUG", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
py::call<void>(_captive, "INFO", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
py::call<void>(_captive, "WARNING", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
py::call<void>(_captive, "ERROR", message.c_str());
break;
}

} catch (const py::error_already_set& e) {
PyErr_Print();
_fallbackLogger->log(level, line, message);
}

PyErr_Restore(type, value, traceback);
PyGILState_Release(state);
}
}
};

class LoggerWrapperFactory : public LoggerFactory {
class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
PyObject* _pyLogger;
Optional<int> _pythonLogLevel{Optional<int>::empty()};

void initializePythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
int level = py::call_method<int>(_pyLogger, "getEffectiveLevel");
_pythonLogLevel = Optional<int>::of(level);
} catch (const py::error_already_set& e) {
// Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger
_pythonLogLevel = Optional<int>::empty();
}

PyGILState_Release(state);
}

public:
LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
initializePythonLogLevel();
}

virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); }
LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {}

Logger* getLogger(const std::string& fileName) {
const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
if (_pythonLogLevel.is_present()) {
return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger);
} else {
if (_captive == py::object().ptr()) {
return fallbackLogger;
} else {
return new LoggerWrapper(_captive, fallbackLogger);
}
}
};
Expand Down
20 changes: 20 additions & 0 deletions pulsar-client-cpp/python/src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,23 @@ struct CryptoKeyReaderWrapper {
CryptoKeyReaderWrapper();
CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
};

class CaptivePythonObjectMixin {
protected:
PyObject* _captive;

CaptivePythonObjectMixin(PyObject* captive) {
_captive = captive;
PyGILState_STATE state = PyGILState_Ensure();
Py_XINCREF(_captive);
PyGILState_Release(state);
}

~CaptivePythonObjectMixin() {
if (Py_IsInitialized()) {
PyGILState_STATE state = PyGILState_Ensure();
Py_XDECREF(_captive);
PyGILState_Release(state);
}
}
};