From 14f2a62957aac66c251cbcf56e50a29c1363a209 Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Tue, 6 Dec 2022 22:10:51 -0500 Subject: [PATCH 1/9] Add functions to interrupt processing in a specific thread - Add Interrupt::requestForCurrentThread (C API: GEOS_interruptThread) to request interruption of the current thread only. - Add Interrupt::registerThreadCallback (C API: GEOS_interruptRegisterThreadCallback) to register an interruption callback for the current thread only. --- capi/geos_c.cpp | 12 +++ capi/geos_c.h.in | 25 ++++- include/geos/util/Interrupt.h | 30 +++++- src/util/Interrupt.cpp | 29 +++++- tests/unit/capi/GEOSInterruptTest.cpp | 46 +++++++++ tests/unit/util/InterruptTest.cpp | 137 ++++++++++++++++++++++++++ 6 files changed, 270 insertions(+), 9 deletions(-) create mode 100644 tests/unit/util/InterruptTest.cpp diff --git a/capi/geos_c.cpp b/capi/geos_c.cpp index d8cbdd9f43..3833758d9b 100644 --- a/capi/geos_c.cpp +++ b/capi/geos_c.cpp @@ -116,12 +116,24 @@ extern "C" { return geos::util::Interrupt::registerCallback(cb); } + GEOSInterruptThreadCallback* + GEOS_interruptRegisterThreadCallback(GEOSInterruptThreadCallback* cb, void* data) + { + return geos::util::Interrupt::registerThreadCallback(cb, data); + } + void GEOS_interruptRequest() { geos::util::Interrupt::request(); } + void + GEOS_interruptThread() + { + geos::util::Interrupt::requestForCurrentThread(); + } + void GEOS_interruptCancel() { diff --git a/capi/geos_c.h.in b/capi/geos_c.h.in index 6a9d0fd1fa..b7f3c38a8c 100644 --- a/capi/geos_c.h.in +++ b/capi/geos_c.h.in @@ -305,8 +305,10 @@ typedef int (*GEOSTransformXYCallback)( */ typedef void (GEOSInterruptCallback)(void); +typedef void (GEOSInterruptThreadCallback)(void*); + /** -* Register a function to be called when processing is interrupted. +* Register a function to be called when processing is interrupted on any thread. * \param cb Callback function to invoke * \return the previously configured callback * \see GEOSInterruptCallback @@ -315,10 +317,29 @@ extern GEOSInterruptCallback GEOS_DLL *GEOS_interruptRegisterCallback( GEOSInterruptCallback* cb); /** -* Request safe interruption of operations +* Register a function to be called when processing is interrupted on the current thread. +* +* \param cb Callback function to invoke +* \param context pointer to a context object that will be passed to `cb` +* \return the previously configured callback +* \see GEOSInterruptCallback +*/ +extern GEOSInterruptThreadCallback GEOS_DLL *GEOS_interruptRegisterThreadCallback( + GEOSInterruptThreadCallback* cb, void* context); + +/** +* Request safe interruption of operations. The next thread to check for an +* interrupt will be interrupted. To request interruption of a specific thread, +* instead call `GEOS_interruptThread` from a callback executed by that thread. */ extern void GEOS_DLL GEOS_interruptRequest(void); +/** +* Request safe interruption of operations in the current thread. This function +* should be called from a callback registered by `GEOS_interruptRegisterThreadCallback`. +*/ +extern void GEOS_DLL GEOS_interruptThread(void); + /** * Cancel a pending interruption request */ diff --git a/include/geos/util/Interrupt.h b/include/geos/util/Interrupt.h index e52386d410..f4c355cc13 100644 --- a/include/geos/util/Interrupt.h +++ b/include/geos/util/Interrupt.h @@ -27,15 +27,25 @@ class GEOS_DLL Interrupt { public: typedef void (Callback)(void); + typedef void (ThreadCallback)(void*); /** * Request interruption of operations * * Operations will be terminated by a GEOSInterrupt - * exception at first occasion. + * exception at first occasion, by the first thread + * to check for an interrupt request. */ static void request(); + /** + * Request interruption of operations in the current thread + * + * Operations in the current thread will be terminated by + * a GEOSInterrupt at first occasion. + */ + static void requestForCurrentThread(); + /** Cancel a pending interruption request */ static void cancel(); @@ -43,17 +53,29 @@ class GEOS_DLL Interrupt { static bool check(); /** \brief - * Register a callback that will be invoked + * Register a callback that will be invoked by all threads * before checking for interruption requests. * * NOTE that interruption request checking may happen - * frequently so any callback would better be quick. + * frequently so the callback should execute quickly. * * The callback can be used to call Interrupt::request() - * + * or Interrupt::requestForCurrentThread(). */ static Callback* registerCallback(Callback* cb); + /** \brief + * Register a callback that will be invoked the current thread + * before checking for interruption requests. + * + * NOTE that interruption request checking may happen + * frequently so the callback shoudl execute quickly. + * + * The callback can be used to call Interrupt::request() + * or Interrupt::requestForCurrentThread(). + */ + static ThreadCallback* registerThreadCallback(ThreadCallback* cb, void* data); + /** * Invoke the callback, if any. Process pending interruption, if any. * diff --git a/src/util/Interrupt.cpp b/src/util/Interrupt.cpp index 0bc988221b..409c131c63 100644 --- a/src/util/Interrupt.cpp +++ b/src/util/Interrupt.cpp @@ -18,8 +18,12 @@ namespace { /* Could these be portably stored in thread-specific space ? */ bool requested = false; +thread_local bool requested_for_thread = false; geos::util::Interrupt::Callback* callback = nullptr; +thread_local geos::util::Interrupt::ThreadCallback* callback_thread = nullptr; +thread_local void* callback_thread_data = nullptr; + } namespace geos { @@ -37,16 +41,23 @@ Interrupt::request() requested = true; } +void +Interrupt::requestForCurrentThread() +{ + requested_for_thread = true; +} + void Interrupt::cancel() { requested = false; + requested_for_thread = false; } bool Interrupt::check() { - return requested; + return requested || requested_for_thread; } Interrupt::Callback* @@ -57,14 +68,25 @@ Interrupt::registerCallback(Interrupt::Callback* cb) return prev; } +Interrupt::ThreadCallback* +Interrupt::registerThreadCallback(ThreadCallback* cb, void* data) +{ + ThreadCallback* prev = callback_thread; + callback_thread = cb; + callback_thread_data = data; + return prev; +} + void Interrupt::process() { if(callback) { (*callback)(); } - if(requested) { - requested = false; + if(callback_thread) { + (*callback_thread)(callback_thread_data); + } + if(check()) { interrupt(); } } @@ -74,6 +96,7 @@ void Interrupt::interrupt() { requested = false; + requested_for_thread = false; throw InterruptedException(); } diff --git a/tests/unit/capi/GEOSInterruptTest.cpp b/tests/unit/capi/GEOSInterruptTest.cpp index 0bd0301143..a230713103 100644 --- a/tests/unit/capi/GEOSInterruptTest.cpp +++ b/tests/unit/capi/GEOSInterruptTest.cpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace tut { // @@ -18,6 +19,7 @@ namespace tut { // Common data used in test cases. struct test_capiinterrupt_data { static int numcalls; + static int maxcalls; static GEOSInterruptCallback* nextcb; static void @@ -56,9 +58,18 @@ struct test_capiinterrupt_data { } } + static void + interruptAfterMaxCalls(void* data) + { + if (++*static_cast(data) >= maxcalls) { + GEOS_interruptThread(); + } + } + }; int test_capiinterrupt_data::numcalls = 0; +int test_capiinterrupt_data::maxcalls = 0; GEOSInterruptCallback* test_capiinterrupt_data::nextcb = nullptr; typedef test_group group; @@ -221,5 +232,40 @@ void object::test<5> } +// Test callback is thread-local +template<> +template<> +void object::test<6> +() +{ + maxcalls = 3; + int calls_1 = 0; + int calls_2 = 0; + + initGEOS(notice, notice); + + auto buffer = [](GEOSInterruptThreadCallback* cb, void* data) { + GEOSGeometry* geom1 = GEOSGeomFromWKT("LINESTRING (0 0, 1 0)"); + + GEOS_interruptRegisterThreadCallback(cb, data); + + GEOSGeometry* geom2 = GEOSBuffer(geom1, 1, 8); + GEOSGeom_destroy(geom2); + GEOSGeom_destroy(geom1); + }; + + std::thread t1(buffer, interruptAfterMaxCalls, &calls_1); + std::thread t2(buffer, interruptAfterMaxCalls, &calls_2); + + t1.join(); + t2.join(); + + ensure_equals(calls_1, maxcalls); + ensure_equals(calls_2, maxcalls); + + finishGEOS(); +} + + } // namespace tut diff --git a/tests/unit/util/InterruptTest.cpp b/tests/unit/util/InterruptTest.cpp new file mode 100644 index 0000000000..ceb3147764 --- /dev/null +++ b/tests/unit/util/InterruptTest.cpp @@ -0,0 +1,137 @@ +// tut +#include +// geos +#include +// std +#include +#include +#include + +using geos::util::Interrupt; + +namespace tut { +// +// Test Group +// + +// Common data used in test cases. +struct test_interrupt_data { + static void workForever() { + try { + std::cerr << "Started " << std::this_thread::get_id() << "." << std::endl; + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + GEOS_CHECK_FOR_INTERRUPTS(); + } + } catch (const std::exception&) { + std::cerr << "Interrupted " << std::this_thread::get_id() << "." << std::endl; + return; + } + } + + static void interruptNow() { + Interrupt::request(); + } + + static std::map* toInterrupt; + + static void interruptIfRequested() { + if (toInterrupt == nullptr) { + return; + } + + auto it = toInterrupt->find(std::this_thread::get_id()); + if (it != toInterrupt->end() && it->second) { + it->second = false; + Interrupt::request(); + } + } +}; + +std::map* test_interrupt_data::toInterrupt = nullptr; + +typedef test_group group; +typedef group::object object; + +group test_interrupt_group("geos::util::Interrupt"); + +// +// Test Cases +// + + +// Interrupt worker thread via global request from from main thead +template<> +template<> +void object::test<1> +() +{ + std::thread t(workForever); + Interrupt::request(); + + t.join(); +} + +// Interrupt worker thread via global requset from worker thread using a callback +template<> +template<> +void object::test<2> +() +{ + Interrupt::registerCallback(interruptIfRequested); + + std::thread t1(workForever); + std::thread t2(workForever); + + std::map shouldInterrupt; + shouldInterrupt[t1.get_id()] = false; + shouldInterrupt[t2.get_id()] = false; + toInterrupt = &shouldInterrupt; + + shouldInterrupt[t2.get_id()] = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + shouldInterrupt[t1.get_id()] = true; + + t1.join(); + t2.join(); +} + +// Register separate callbacks for each thread. Each callback will +// request interruption of itself only. +template<> +template<> +void object::test<3> +() +{ + bool interrupt1 = false; + int numCalls2 = 0; + + auto cb1 = ([](void* data) { + if (*static_cast(data)) { + Interrupt::requestForCurrentThread(); + } + }); + + auto cb2 = ([](void* data) { + if (++*static_cast(data) > 5) { + Interrupt::requestForCurrentThread(); + } + }); + + + std::thread t1([&cb1, &interrupt1]() { + Interrupt::registerThreadCallback(cb1, &interrupt1); + }); + + std::thread t2([&cb2, &numCalls2]() { + Interrupt::registerThreadCallback(cb2, &numCalls2); + }); + + t2.join(); + + interrupt1 = true; + t1.join(); +} + +} // namespace tut + From 26a87eab59e695cc95a520404f692a452ab2964f Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Thu, 8 Dec 2022 20:24:46 -0500 Subject: [PATCH 2/9] Interrupt doc clarifications --- capi/geos_c.h.in | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/capi/geos_c.h.in b/capi/geos_c.h.in index b7f3c38a8c..f76c94718f 100644 --- a/capi/geos_c.h.in +++ b/capi/geos_c.h.in @@ -297,7 +297,7 @@ typedef int (*GEOSTransformXYCallback)( /** * Callback function for use in interruption. The callback will be invoked _before_ checking for -* interruption, so can be used to request it. +* an interruption request and can be used to request interruption. * * \see GEOS_interruptRegisterCallback * \see GEOS_interruptRequest @@ -308,20 +308,23 @@ typedef void (GEOSInterruptCallback)(void); typedef void (GEOSInterruptThreadCallback)(void*); /** -* Register a function to be called when processing is interrupted on any thread. +* Register a function to be called when a possible interruption point is reached +* on any thread. The function may be used to request interruption. +* * \param cb Callback function to invoke -* \return the previously configured callback +* \return the previously registered callback, or NULL * \see GEOSInterruptCallback */ extern GEOSInterruptCallback GEOS_DLL *GEOS_interruptRegisterCallback( GEOSInterruptCallback* cb); /** -* Register a function to be called when processing is interrupted on the current thread. +* Register a function to be called when a possible interruption point is reached +* on the current thread. The function may be used to request interruption. * * \param cb Callback function to invoke * \param context pointer to a context object that will be passed to `cb` -* \return the previously configured callback +* \return the previously registered callback, or NULL * \see GEOSInterruptCallback */ extern GEOSInterruptThreadCallback GEOS_DLL *GEOS_interruptRegisterThreadCallback( @@ -330,13 +333,14 @@ extern GEOSInterruptThreadCallback GEOS_DLL *GEOS_interruptRegisterThreadCallbac /** * Request safe interruption of operations. The next thread to check for an * interrupt will be interrupted. To request interruption of a specific thread, -* instead call `GEOS_interruptThread` from a callback executed by that thread. +* instead call GEOS_interruptThread() from a callback executed by that thread. */ extern void GEOS_DLL GEOS_interruptRequest(void); /** * Request safe interruption of operations in the current thread. This function -* should be called from a callback registered by `GEOS_interruptRegisterThreadCallback`. +* should be called from a callback registered by GEOS_interruptRegisterThreadCallback() +* or GEOS_interruptRegisterCallback(). */ extern void GEOS_DLL GEOS_interruptThread(void); From cd372424ef6456f5ad3bae58af21ee6e4b4fe19a Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Thu, 8 Dec 2022 21:11:48 -0500 Subject: [PATCH 3/9] Add default 30s timeout to unit tests --- tests/unit/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index f29a52cf77..697c5fa9bb 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -44,6 +44,7 @@ foreach(_testfile ${_testfiles}) string(CONCAT _testname "geos::" ${_testname}) endif() add_test(NAME unit-${_cmake_testname} COMMAND test_geos_unit ${_testname}) + set_tests_properties(unit-${_cmake_testname} PROPERTIES TIMEOUT 30) endforeach() # Run all the unit tests in one go, for faster memory checking From a941096457e3865da2722a1dbfeeb6a1d462bb05 Mon Sep 17 00:00:00 2001 From: Dan Baston Date: Thu, 8 Dec 2022 21:20:40 -0500 Subject: [PATCH 4/9] Update include/geos/util/Interrupt.h Co-authored-by: Even Rouault --- include/geos/util/Interrupt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/geos/util/Interrupt.h b/include/geos/util/Interrupt.h index f4c355cc13..d2d20cd54e 100644 --- a/include/geos/util/Interrupt.h +++ b/include/geos/util/Interrupt.h @@ -69,7 +69,7 @@ class GEOS_DLL Interrupt { * before checking for interruption requests. * * NOTE that interruption request checking may happen - * frequently so the callback shoudl execute quickly. + * frequently so the callback should execute quickly. * * The callback can be used to call Interrupt::request() * or Interrupt::requestForCurrentThread(). From 6be3f0379df04c14beada961d60db63d38a0055d Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Thu, 8 Dec 2022 21:29:38 -0500 Subject: [PATCH 5/9] Avoid potential lost interrupt request in test --- tests/unit/util/InterruptTest.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/unit/util/InterruptTest.cpp b/tests/unit/util/InterruptTest.cpp index ceb3147764..226d74ab97 100644 --- a/tests/unit/util/InterruptTest.cpp +++ b/tests/unit/util/InterruptTest.cpp @@ -72,7 +72,7 @@ void object::test<1> t.join(); } -// Interrupt worker thread via global requset from worker thread using a callback +// Interrupt worker thread via global request from worker thread using a callback template<> template<> void object::test<2> @@ -89,11 +89,10 @@ void object::test<2> toInterrupt = &shouldInterrupt; shouldInterrupt[t2.get_id()] = true; - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - shouldInterrupt[t1.get_id()] = true; + t2.join(); + shouldInterrupt[t1.get_id()] = true; t1.join(); - t2.join(); } // Register separate callbacks for each thread. Each callback will From 6b2bce89ed0a1d8928f899b504c10f5894cca0e7 Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Fri, 9 Dec 2022 08:28:03 -0500 Subject: [PATCH 6/9] Add more comments --- src/util/Interrupt.cpp | 4 +++- tests/unit/util/InterruptTest.cpp | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/util/Interrupt.cpp b/src/util/Interrupt.cpp index 409c131c63..ea3d1691de 100644 --- a/src/util/Interrupt.cpp +++ b/src/util/Interrupt.cpp @@ -16,10 +16,12 @@ #include // for inheritance namespace { -/* Could these be portably stored in thread-specific space ? */ + +// Callback and request status for interruption of any single thread bool requested = false; thread_local bool requested_for_thread = false; +// Callback and request status for interruption of a the current thread geos::util::Interrupt::Callback* callback = nullptr; thread_local geos::util::Interrupt::ThreadCallback* callback_thread = nullptr; thread_local void* callback_thread_data = nullptr; diff --git a/tests/unit/util/InterruptTest.cpp b/tests/unit/util/InterruptTest.cpp index 226d74ab97..01925c6c62 100644 --- a/tests/unit/util/InterruptTest.cpp +++ b/tests/unit/util/InterruptTest.cpp @@ -89,6 +89,10 @@ void object::test<2> toInterrupt = &shouldInterrupt; shouldInterrupt[t2.get_id()] = true; + + // We need to wait until t2 has actually been interrupted + // before we interrupt t1. Otherwise, t2 may cancel our + // request for t1's interrupt. t2.join(); shouldInterrupt[t1.get_id()] = true; From 70f891c43101fdc63a31a360835622cf61e1ceb9 Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Fri, 9 Dec 2022 08:59:59 -0500 Subject: [PATCH 7/9] Add more comments --- tests/unit/util/InterruptTest.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/unit/util/InterruptTest.cpp b/tests/unit/util/InterruptTest.cpp index 01925c6c62..11a0ece577 100644 --- a/tests/unit/util/InterruptTest.cpp +++ b/tests/unit/util/InterruptTest.cpp @@ -83,6 +83,10 @@ void object::test<2> std::thread t1(workForever); std::thread t2(workForever); + // Create map and add entries before exposing it to the interrupt + // callback that will be acessed from multiple threads. It's OK + // for multiple threads to modify entries in the map but not for + // multiple threads to create entries. std::map shouldInterrupt; shouldInterrupt[t1.get_id()] = false; shouldInterrupt[t2.get_id()] = false; @@ -92,7 +96,11 @@ void object::test<2> // We need to wait until t2 has actually been interrupted // before we interrupt t1. Otherwise, t2 may cancel our - // request for t1's interrupt. + // request for t1's interrupt. Alternatively, we could + // implement `interruptIfRequested` to repeatedly call + // Interrupt::request() to avoid the lost request. Or just + // use Interrupt::requestForThread() which would also + // avoid this possibility. t2.join(); shouldInterrupt[t1.get_id()] = true; From dbd126394163cad6b17912becf27a8b898160fc7 Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Fri, 9 Dec 2022 13:16:32 -0500 Subject: [PATCH 8/9] Fix hanging interrupt test --- tests/unit/util/InterruptTest.cpp | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/tests/unit/util/InterruptTest.cpp b/tests/unit/util/InterruptTest.cpp index 11a0ece577..5b5586a4af 100644 --- a/tests/unit/util/InterruptTest.cpp +++ b/tests/unit/util/InterruptTest.cpp @@ -43,7 +43,7 @@ struct test_interrupt_data { auto it = toInterrupt->find(std::this_thread::get_id()); if (it != toInterrupt->end() && it->second) { it->second = false; - Interrupt::request(); + Interrupt::requestForCurrentThread(); } } }; @@ -72,7 +72,7 @@ void object::test<1> t.join(); } -// Interrupt worker thread via global request from worker thread using a callback +// Interrupt worker thread via thread-specific request from worker thread using a callback template<> template<> void object::test<2> @@ -93,14 +93,6 @@ void object::test<2> toInterrupt = &shouldInterrupt; shouldInterrupt[t2.get_id()] = true; - - // We need to wait until t2 has actually been interrupted - // before we interrupt t1. Otherwise, t2 may cancel our - // request for t1's interrupt. Alternatively, we could - // implement `interruptIfRequested` to repeatedly call - // Interrupt::request() to avoid the lost request. Or just - // use Interrupt::requestForThread() which would also - // avoid this possibility. t2.join(); shouldInterrupt[t1.get_id()] = true; From 5081f27843b2015e8767c025cb1f5f06f0adeac4 Mon Sep 17 00:00:00 2001 From: Daniel Baston Date: Mon, 19 Dec 2022 18:21:27 -0500 Subject: [PATCH 9/9] Doc clarification --- capi/geos_c.h.in | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/capi/geos_c.h.in b/capi/geos_c.h.in index f76c94718f..9ecf55886c 100644 --- a/capi/geos_c.h.in +++ b/capi/geos_c.h.in @@ -296,8 +296,8 @@ typedef int (*GEOSTransformXYCallback)( /* ========== Interruption ========== */ /** -* Callback function for use in interruption. The callback will be invoked _before_ checking for -* an interruption request and can be used to request interruption. +* Callback function for use in interruption. The callback will be invoked at each +* possible interruption point and can be used to request interruption. * * \see GEOS_interruptRegisterCallback * \see GEOS_interruptRequest @@ -305,6 +305,13 @@ typedef int (*GEOSTransformXYCallback)( */ typedef void (GEOSInterruptCallback)(void); +/** +* Callback function for use in interruption. The callback will be invoked at each +* possible interruption point and can be used to request interruption. +* +* \see GEOS_interruptRegisterThreadCallback +* \see GEOS_interruptThread +*/ typedef void (GEOSInterruptThreadCallback)(void*); /**