Skip to content

Commit

Permalink
Add functions to interrupt processing in a specific thread
Browse files Browse the repository at this point in the history
- Add Interrupt::requestForCurrentThread (C API: GEOS_interruptThread) to
  request interruption of the current thread only.

- Add Interrupt::registerThreadCallback (C API:
  GEOS_interruptRegisterThreadCallback) to register an interruption callback
  for the current thread only.
  • Loading branch information
dbaston committed Dec 9, 2022
1 parent 9af6077 commit 14f2a62
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 9 deletions.
12 changes: 12 additions & 0 deletions capi/geos_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,24 @@ extern "C" {
return geos::util::Interrupt::registerCallback(cb);
}

GEOSInterruptThreadCallback*
GEOS_interruptRegisterThreadCallback(GEOSInterruptThreadCallback* cb, void* data)
{
return geos::util::Interrupt::registerThreadCallback(cb, data);
}

void
GEOS_interruptRequest()
{
geos::util::Interrupt::request();
}

void
GEOS_interruptThread()
{
geos::util::Interrupt::requestForCurrentThread();
}

void
GEOS_interruptCancel()
{
Expand Down
25 changes: 23 additions & 2 deletions capi/geos_c.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,10 @@ typedef int (*GEOSTransformXYCallback)(
*/
typedef void (GEOSInterruptCallback)(void);

typedef void (GEOSInterruptThreadCallback)(void*);

/**
* Register a function to be called when processing is interrupted.
* Register a function to be called when processing is interrupted on any thread.
* \param cb Callback function to invoke
* \return the previously configured callback
* \see GEOSInterruptCallback
Expand All @@ -315,10 +317,29 @@ extern GEOSInterruptCallback GEOS_DLL *GEOS_interruptRegisterCallback(
GEOSInterruptCallback* cb);

/**
* Request safe interruption of operations
* Register a function to be called when processing is interrupted on the current thread.
*
* \param cb Callback function to invoke
* \param context pointer to a context object that will be passed to `cb`
* \return the previously configured callback
* \see GEOSInterruptCallback
*/
extern GEOSInterruptThreadCallback GEOS_DLL *GEOS_interruptRegisterThreadCallback(
GEOSInterruptThreadCallback* cb, void* context);

/**
* Request safe interruption of operations. The next thread to check for an
* interrupt will be interrupted. To request interruption of a specific thread,
* instead call `GEOS_interruptThread` from a callback executed by that thread.
*/
extern void GEOS_DLL GEOS_interruptRequest(void);

/**
* Request safe interruption of operations in the current thread. This function
* should be called from a callback registered by `GEOS_interruptRegisterThreadCallback`.
*/
extern void GEOS_DLL GEOS_interruptThread(void);

/**
* Cancel a pending interruption request
*/
Expand Down
30 changes: 26 additions & 4 deletions include/geos/util/Interrupt.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,55 @@ class GEOS_DLL Interrupt {
public:

typedef void (Callback)(void);
typedef void (ThreadCallback)(void*);

/**
* Request interruption of operations
*
* Operations will be terminated by a GEOSInterrupt
* exception at first occasion.
* exception at first occasion, by the first thread
* to check for an interrupt request.
*/
static void request();

/**
* Request interruption of operations in the current thread
*
* Operations in the current thread will be terminated by
* a GEOSInterrupt at first occasion.
*/
static void requestForCurrentThread();

/** Cancel a pending interruption request */
static void cancel();

/** Check if an interruption request is pending */
static bool check();

/** \brief
* Register a callback that will be invoked
* Register a callback that will be invoked by all threads
* before checking for interruption requests.
*
* NOTE that interruption request checking may happen
* frequently so any callback would better be quick.
* frequently so the callback should execute quickly.
*
* The callback can be used to call Interrupt::request()
*
* or Interrupt::requestForCurrentThread().
*/
static Callback* registerCallback(Callback* cb);

/** \brief
* Register a callback that will be invoked the current thread
* before checking for interruption requests.
*
* NOTE that interruption request checking may happen
* frequently so the callback shoudl execute quickly.
*
* The callback can be used to call Interrupt::request()
* or Interrupt::requestForCurrentThread().
*/
static ThreadCallback* registerThreadCallback(ThreadCallback* cb, void* data);

/**
* Invoke the callback, if any. Process pending interruption, if any.
*
Expand Down
29 changes: 26 additions & 3 deletions src/util/Interrupt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
namespace {
/* Could these be portably stored in thread-specific space ? */
bool requested = false;
thread_local bool requested_for_thread = false;

geos::util::Interrupt::Callback* callback = nullptr;
thread_local geos::util::Interrupt::ThreadCallback* callback_thread = nullptr;
thread_local void* callback_thread_data = nullptr;

}

namespace geos {
Expand All @@ -37,16 +41,23 @@ Interrupt::request()
requested = true;
}

void
Interrupt::requestForCurrentThread()
{
requested_for_thread = true;
}

void
Interrupt::cancel()
{
requested = false;
requested_for_thread = false;
}

bool
Interrupt::check()
{
return requested;
return requested || requested_for_thread;
}

Interrupt::Callback*
Expand All @@ -57,14 +68,25 @@ Interrupt::registerCallback(Interrupt::Callback* cb)
return prev;
}

Interrupt::ThreadCallback*
Interrupt::registerThreadCallback(ThreadCallback* cb, void* data)
{
ThreadCallback* prev = callback_thread;
callback_thread = cb;
callback_thread_data = data;
return prev;
}

void
Interrupt::process()
{
if(callback) {
(*callback)();
}
if(requested) {
requested = false;
if(callback_thread) {
(*callback_thread)(callback_thread_data);
}
if(check()) {
interrupt();
}
}
Expand All @@ -74,6 +96,7 @@ void
Interrupt::interrupt()
{
requested = false;
requested_for_thread = false;
throw InterruptedException();
}

Expand Down
46 changes: 46 additions & 0 deletions tests/unit/capi/GEOSInterruptTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <thread>

namespace tut {
//
Expand All @@ -18,6 +19,7 @@ namespace tut {
// Common data used in test cases.
struct test_capiinterrupt_data {
static int numcalls;
static int maxcalls;
static GEOSInterruptCallback* nextcb;

static void
Expand Down Expand Up @@ -56,9 +58,18 @@ struct test_capiinterrupt_data {
}
}

static void
interruptAfterMaxCalls(void* data)
{
if (++*static_cast<int*>(data) >= maxcalls) {
GEOS_interruptThread();
}
}

};

int test_capiinterrupt_data::numcalls = 0;
int test_capiinterrupt_data::maxcalls = 0;
GEOSInterruptCallback* test_capiinterrupt_data::nextcb = nullptr;

typedef test_group<test_capiinterrupt_data> group;
Expand Down Expand Up @@ -221,5 +232,40 @@ void object::test<5>
}


// Test callback is thread-local
template<>
template<>
void object::test<6>
()
{
maxcalls = 3;
int calls_1 = 0;
int calls_2 = 0;

initGEOS(notice, notice);

auto buffer = [](GEOSInterruptThreadCallback* cb, void* data) {
GEOSGeometry* geom1 = GEOSGeomFromWKT("LINESTRING (0 0, 1 0)");

GEOS_interruptRegisterThreadCallback(cb, data);

GEOSGeometry* geom2 = GEOSBuffer(geom1, 1, 8);
GEOSGeom_destroy(geom2);
GEOSGeom_destroy(geom1);
};

std::thread t1(buffer, interruptAfterMaxCalls, &calls_1);
std::thread t2(buffer, interruptAfterMaxCalls, &calls_2);

t1.join();
t2.join();

ensure_equals(calls_1, maxcalls);
ensure_equals(calls_2, maxcalls);

finishGEOS();
}


} // namespace tut

Loading

0 comments on commit 14f2a62

Please sign in to comment.