Skip to content

Commit

Permalink
win, pipe: avoid synchronous pipe deadlocks
Browse files Browse the repository at this point in the history
Add a thread that will interrupt uv_pipe_zero_readdile_thread_proc every two
and half second. This allows other processes to access the pipe without
deadlocking

Ref: nodejs/node#10836
  • Loading branch information
bzoz committed Sep 18, 2017
1 parent eaf25ae commit a8df782
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 5 deletions.
12 changes: 8 additions & 4 deletions common.gypi
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
'variables': {
'target_arch%': 'ia32', # set v8's target architecture
'host_arch%': 'ia32', # set v8's host architecture
'uv_library%': 'static_library', # allow override to 'shared_library' for DLL/.so builds
'msvs_multi_core_compile': '0', # we do enable multicore compiles, but not using the V8 way
'target_arch%': 'ia32', # set v8's target architecture
'host_arch%': 'ia32', # set v8's host architecture
'uv_library%': 'static_library', # allow override to 'shared_library' for DLL/.so builds
'uv_enable_interrupter%': 'false', # enable pipe read interrupter (see [PR link])
'msvs_multi_core_compile': '0', # we do enable multicore compiles, but not using the V8 way
},

'target_defaults': {
Expand Down Expand Up @@ -208,6 +209,9 @@
# pull in V8's postmortem metadata
'ldflags': [ '-Wl,-z,allextract' ]
}],
['uv_enable_interrupter=="true"', {
'defines': ['UV_USE_PIPE_INTERRUPTER']
}],
],
},
}
3 changes: 3 additions & 0 deletions gyp_uv.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def run_gyp(args):
if not any(a.startswith('-Duv_library=') for a in args):
args.append('-Duv_library=static_library')

if not any(a.startswith('-Duv_enable_interrupter=') for a in args):
args.append('-Duv_enable_interrupter=false')

# Some platforms (OpenBSD for example) don't have multiprocessing.synchronize
# so gyp must be run with --no-parallel
if not gyp_parallel_support:
Expand Down
111 changes: 111 additions & 0 deletions src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ struct uv__ipc_queue_item_s {
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

#ifdef UV_USE_PIPE_INTERRUPTER
/* To prevent deadlocks we will interrupt synchronous pipe read every 2.5 s. */
#define PIPE_ZERO_READ_INTERRUPT_INTERVAL 2500
#endif

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

Expand Down Expand Up @@ -76,6 +81,15 @@ typedef struct {
uv__ipc_socket_info_ex socket_info_ex;
} uv_ipc_frame_uv_stream;

#ifdef UV_USE_PIPE_INTERRUPTER
/* Parameter for uv__pipe_readfile_interrupter */
typedef struct {
volatile HANDLE thread;
HANDLE thread_mutex;
HANDLE completed_event;
} uv__readfile_interrupter_t;
#endif

static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
Expand Down Expand Up @@ -942,6 +956,89 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
}


#ifdef UV_USE_PIPE_INTERRUPTER
static DWORD WINAPI uv__readfile_interrupter_thread(void* param) {
uv__readfile_interrupter_t* interrupter;
int r;

interrupter = param;
for (;;) {
r = WaitForSingleObject(interrupter->completed_event,
PIPE_ZERO_READ_INTERRUPT_INTERVAL);
if (r == WAIT_TIMEOUT) {
/* ReadFile thread is active - interrupt ReadFile to give other */
/* processes a chance to act upon pipe */
WaitForSingleObject(interrupter->thread_mutex, INFINITE);
if (interrupter->thread != NULL) {
pCancelSynchronousIo(interrupter->thread);
SwitchToThread();
}
ReleaseMutex(interrupter->thread_mutex);
} else if (r == WAIT_OBJECT_0) {
/* ReadFile thread has terminated - clean up the handles and exit */
CloseHandle(interrupter->completed_event);
CloseHandle(interrupter->thread_mutex);
uv__free(interrupter);
return 0;
}
}
}


static uv__readfile_interrupter_t* uv__start_readfile_interrupter(void) {
HANDLE thread_handle;
uv__readfile_interrupter_t* interrupter;

interrupter = uv__malloc(sizeof(*interrupter));
if (interrupter == NULL)
return NULL;

if (!DuplicateHandle(GetCurrentProcess(),
GetCurrentThread(),
GetCurrentProcess(),
&thread_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS))
goto failed_duplicate;

interrupter->thread = thread_handle;
interrupter->thread_mutex = CreateMutex(NULL, FALSE, NULL);
if (interrupter->thread_mutex == NULL)
goto failed_mutex;

interrupter->completed_event = CreateEvent(NULL, FALSE, FALSE, NULL);
if (interrupter->completed_event == NULL)
goto failed_event;

if (!QueueUserWorkItem(uv__readfile_interrupter_thread,
interrupter,
WT_EXECUTEINLONGTHREAD))
goto failed_queue;

return interrupter;

failed_queue:
CloseHandle(interrupter->completed_event);
failed_event:
CloseHandle(interrupter->thread_mutex);
failed_mutex:
CloseHandle(interrupter->thread);
failed_duplicate:
uv__free(interrupter);
return NULL;
}

static void uv__stop_readfile_interrupter(uv__readfile_interrupter_t* interrupter) {
WaitForSingleObject(interrupter->thread_mutex, INFINITE);
CloseHandle(interrupter->thread);
interrupter->thread = NULL;
ReleaseMutex(interrupter->thread_mutex);
SetEvent(interrupter->completed_event);
}
#endif


static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
int result;
DWORD bytes;
Expand All @@ -951,6 +1048,10 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
HANDLE hThread = NULL;
DWORD err;
uv_mutex_t *m = &handle->pipe.conn.readfile_mutex;
#ifdef UV_USE_PIPE_INTERRUPTER
uv__readfile_interrupter_t* interrupter;
interrupter = NULL;
#endif

assert(req != NULL);
assert(req->type == UV_READ);
Expand All @@ -966,6 +1067,9 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
hThread = NULL;
}
uv_mutex_unlock(m);
#ifdef UV_USE_PIPE_INTERRUPTER
interrupter = uv__start_readfile_interrupter();
#endif
}
restart_readfile:
if (handle->flags & UV_HANDLE_READING) {
Expand All @@ -985,6 +1089,10 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
uv_mutex_lock(m);
handle->pipe.conn.readfile_thread = hThread;
uv_mutex_unlock(m);
#ifdef UV_USE_PIPE_INTERRUPTER
/* Give some time for other processes to wake before restarting. */
Sleep(1);
#endif
goto restart_readfile;
} else {
result = 1; /* successfully stopped reading */
Expand All @@ -1010,6 +1118,9 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
}

POST_COMPLETION_FOR_REQ(loop, req);
#ifdef UV_USE_PIPE_INTERRUPTER
uv__stop_readfile_interrupter(interrupter);
#endif
return 0;
}

Expand Down
6 changes: 6 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ TEST_DECLARE (pipe_ref4)
TEST_DECLARE (pipe_close_stdout_read_stdin)
#endif
TEST_DECLARE (pipe_set_non_blocking)
#if defined(_WIN32) && defined(UV_USE_PIPE_INTERRUPTER)
TEST_DECLARE (pipe_open_read_pipe)
#endif
TEST_DECLARE (process_ref)
TEST_DECLARE (has_ref)
TEST_DECLARE (active)
Expand Down Expand Up @@ -437,6 +440,9 @@ TASK_LIST_START
TEST_ENTRY (pipe_close_stdout_read_stdin)
#endif
TEST_ENTRY (pipe_set_non_blocking)
#if defined(_WIN32) && defined(UV_USE_PIPE_INTERRUPTER)
TEST_ENTRY_CUSTOM (pipe_open_read_pipe, 0, 0, 10000)
#endif
TEST_ENTRY (tty)
#ifdef _WIN32
TEST_ENTRY (tty_raw)
Expand Down
84 changes: 84 additions & 0 deletions test/test-pipe-open-read-pipe.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "task.h"

#if !defined(_WIN32)
TEST_IMPL(pipe_open_read_pipe) {
RETURN_SKIP("Test only for Windows.");
}
#elif !defined(UV_USE_PIPE_INTERRUPTER)
TEST_IMPL(pipe_open_read_pipe) {
RETURN_SKIP("Test only avaiable with UV_USE_PIPE_INTERRUPTER enabled.");
}
#else

void alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}

void read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
}

void pipe_read_thread_proc(void* arg) {
uv_pipe_t* pipe;
pipe = arg;
uv_read_start((uv_stream_t*) pipe, alloc_cb, read_cb);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
FATAL("loop should not exit");
}

TEST_IMPL(pipe_open_read_pipe) {
int r, pipe_fd;
uv_thread_t pipe_read_thread;
uv_pipe_t uv_pipe, uv_reopen_pipe;
uv_loop_t test_loop;
HANDLE stdin_read_pipe, stdin_write_pipe;
SECURITY_ATTRIBUTES sa_attr;

sa_attr.nLength = sizeof(sa_attr);
sa_attr.bInheritHandle = TRUE;
sa_attr.lpSecurityDescriptor = NULL;
r = CreatePipe(&stdin_read_pipe, &stdin_write_pipe, &sa_attr, 0);
ASSERT(r != 0);

r = uv_pipe_init(uv_default_loop(), &uv_pipe, 0);
ASSERT(r == 0);
pipe_fd = _open_osfhandle((intptr_t) stdin_read_pipe, 0);
r = uv_pipe_open(&uv_pipe, pipe_fd);
ASSERT(r == 0);

r = uv_thread_create(&pipe_read_thread, pipe_read_thread_proc, &uv_pipe);
ASSERT(r == 0);

/* Give uv_run some time to start */
uv_sleep(250);
/* Try to access the pipe again, in different loop */
r = uv_loop_init(&test_loop);
ASSERT(r == 0);
r = uv_pipe_init(&test_loop, &uv_reopen_pipe, 0);
ASSERT(r == 0);
r = uv_pipe_open(&uv_reopen_pipe, pipe_fd);
return TEST_OK;
}
#endif
1 change: 1 addition & 0 deletions uv.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@
'test/test-pipe-server-close.c',
'test/test-pipe-close-stdout-read-stdin.c',
'test/test-pipe-set-non-blocking.c',
'test/test-pipe-open-read-pipe.c',
'test/test-platform-output.c',
'test/test-poll.c',
'test/test-poll-close.c',
Expand Down
4 changes: 3 additions & 1 deletion vcbuild.bat
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ set run=
set vs_toolset=x86
set msbuild_platform=WIN32
set library=static_library
set enable_interrupter=false

:next-arg
if "%1"=="" goto args-done
Expand All @@ -38,6 +39,7 @@ if /i "%1"=="ia32" set target_arch=ia32&set msbuild_platform=WIN32&set v
if /i "%1"=="x64" set target_arch=x64&set msbuild_platform=x64&set vs_toolset=x64&goto arg-ok
if /i "%1"=="shared" set library=shared_library&goto arg-ok
if /i "%1"=="static" set library=static_library&goto arg-ok
if /i "%1"=="enable-interrupter" set enable_interrupter=true&goto arg-ok
:arg-ok
shift
goto next-arg
Expand Down Expand Up @@ -138,7 +140,7 @@ exit /b 1

:have_gyp
if not defined PYTHON set PYTHON=python
"%PYTHON%" gyp_uv.py -Dtarget_arch=%target_arch% -Duv_library=%library%
"%PYTHON%" gyp_uv.py -Dtarget_arch=%target_arch% -Duv_library=%library% -Duv_enable_interrupter=%enable_interrupter%
if errorlevel 1 goto create-msvs-files-failed
if not exist uv.sln goto create-msvs-files-failed
echo Project files generated.
Expand Down

0 comments on commit a8df782

Please sign in to comment.