-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
test_iostream.cpp
118 lines (90 loc) · 3.3 KB
/
test_iostream.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
tests/test_iostream.cpp -- Usage of scoped_output_redirect
Copyright (c) 2017 Henry F. Schreiner
All rights reserved. Use of this source code is governed by a
BSD-style license that can be found in the LICENSE file.
*/
#if defined(_MSC_VER) && _MSC_VER < 1910 // VS 2015's MSVC
# pragma warning(disable: 4702) // unreachable code in system header (xatomic.h(382))
#endif
#include <pybind11/iostream.h>
#include "pybind11_tests.h"
#include <atomic>
#include <iostream>
#include <thread>
// Writes `msg` to std::cout; when `flush` is true the stream is flushed
// afterwards, otherwise the text is left in whatever buffer std::cout uses.
void noisy_function(std::string msg, bool flush) {
    std::cout << msg;
    if (!flush) {
        return;
    }
    std::cout.flush();
}
// Writes `msg` to std::cout and then `emsg` to std::cerr.
// Neither stream is flushed explicitly here.
void noisy_funct_dual(std::string msg, std::string emsg) {
    std::ostream &out = std::cout;
    std::ostream &err = std::cerr;
    // Keep the order: standard output first, standard error second.
    out << msg;
    err << emsg;
}
// Object that manages a C++ worker thread.
// The worker simply writes "x" to std::cout (flushing every time) until it is
// told to stop. A redirect is installed at some point while the worker runs, to
// test the safety of scoped_estream_redirect.
struct TestThread {
    // Starts the worker thread immediately; it loops until stop() is called.
    TestThread() : t_{nullptr}, stop_{false} {
        auto thread_f = [this] {
            while (!stop_) {
                std::cout << "x" << std::flush;
                // Short pause between writes so the loop does not spin flat out.
                std::this_thread::sleep_for(std::chrono::microseconds(50));
            }
        };
        t_ = new std::thread(std::move(thread_f));
    }

    // The worker lambda captures `this`, so the object must never be copied.
    // (Copying was already impossible because of the std::atomic member; the
    // deleted declarations only make that explicit.)
    TestThread(const TestThread &) = delete;
    TestThread &operator=(const TestThread &) = delete;

    // Shuts the worker down before releasing it. Destroying a std::thread that
    // is still joinable calls std::terminate, so a TestThread that is dropped
    // without an explicit stop()/join() must stop and join here first.
    ~TestThread() {
        stop_ = true;
        if (t_ != nullptr && t_->joinable()) {
            // The GIL is released while waiting, exactly as join() does.
            // NOTE(review): assumes the destructor runs with the GIL held (the
            // case for instances owned by Python) -- confirm.
            py::gil_scoped_release gil_lock;
            t_->join();
        }
        delete t_;
    }

    // Asks the worker loop to exit; does not wait for it.
    void stop() { stop_ = true; }

    // Waits for the worker thread to finish, with the GIL released meanwhile.
    void join() {
        py::gil_scoped_release gil_lock;
        t_->join();
    }

    // Blocks the calling thread for 50 ms with the GIL released.
    void sleep() {
        py::gil_scoped_release gil_lock;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    std::thread * t_;          // owning pointer to the worker thread
    std::atomic<bool> stop_;   // set to true to request that the worker exits
};
// Bindings of the `iostream` test submodule: functions that write to
// std::cout / std::cerr with and without a pybind11 stream redirect in place,
// plus the TestThread helper class.
TEST_SUBMODULE(iostream, m) {
    // Redirect helper provided by <pybind11/iostream.h>.
    add_ostream_redirect(m);

    // --- std::cout with a redirect constructed inside the function ---------

    // Default-constructed redirect.
    m.def("captured_output_default", [](std::string msg) {
        py::scoped_ostream_redirect guard;
        std::cout << msg << std::flush;
    });

    // Redirect with the stream and the Python target spelled out explicitly.
    m.def("captured_output", [](std::string msg) {
        py::scoped_ostream_redirect guard(std::cout,
                                          py::module_::import("sys").attr("stdout"));
        std::cout << msg << std::flush;
    });

    // Redirect supplied through a call guard instead of a local object.
    m.def("guard_output",
          &noisy_function,
          py::call_guard<py::scoped_ostream_redirect>(),
          py::arg("msg"),
          py::arg("flush") = true);

    // --- std::cerr with an explicit redirect to sys.stderr -----------------

    m.def("captured_err", [](std::string msg) {
        py::scoped_ostream_redirect guard(std::cerr,
                                          py::module_::import("sys").attr("stderr"));
        std::cerr << msg << std::flush;
    });

    // --- functions bound without any redirect ------------------------------

    m.def("noisy_function",
          &noisy_function,
          py::arg("msg"),
          py::arg("flush") = true);

    // Both redirect types applied together as call guards.
    m.def("dual_guard",
          &noisy_funct_dual,
          py::call_guard<py::scoped_ostream_redirect, py::scoped_estream_redirect>(),
          py::arg("msg"),
          py::arg("emsg"));

    m.def("raw_output", [](std::string msg) {
        std::cout << msg << std::flush;
    });

    m.def("raw_err", [](std::string msg) {
        std::cerr << msg << std::flush;
    });

    // --- both streams redirected inside one function -----------------------

    m.def("captured_dual", [](std::string msg, std::string emsg) {
        const auto sys = py::module_::import("sys");
        py::scoped_ostream_redirect out_guard(std::cout, sys.attr("stdout"));
        py::scoped_ostream_redirect err_guard(std::cerr, sys.attr("stderr"));
        std::cout << msg << std::flush;
        std::cerr << emsg << std::flush;
    });

    // --- worker-thread helper ----------------------------------------------

    py::class_<TestThread> thread_cls(m, "TestThread");
    thread_cls.def(py::init<>());
    thread_cls.def("stop", &TestThread::stop);
    thread_cls.def("join", &TestThread::join);
    thread_cls.def("sleep", &TestThread::sleep);
}