From 347f8c1c328a09da11dca41f2dea757014c93a76 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 9 Mar 2024 19:23:14 +0000 Subject: [PATCH 1/4] Changes to support parameterized queries --- src/backend/tcop/postgres.c | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 27e597d2c4..4665f45570 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -351,15 +351,17 @@ interactive_getc(void) #ifdef EMSCRIPTEN EM_ASYNC_JS(char *, await_query, (), { // Await a query from JS land - var event = new Module.Event("waiting"); - Module.eventTarget.dispatchEvent(event); + Module.eventTarget.dispatchEvent(new Module.Event("waiting")); var query = await new Promise((resolve, reject) => { Module.eventTarget.addEventListener("query", (e) => { resolve(e.detail); }, {once: true}); }); - var cstring_ptr = allocateUTF8(query); - return cstring_ptr; + // `query` is a Uint8Array containing the query in pg wire format + var bytes = query.length; + var ptr = _malloc(bytes); + Module.HEAPU8.set(query, ptr); + return ptr; }); #endif @@ -367,15 +369,18 @@ static int EmscriptenBackend(StringInfo inBuf) { char *query = await_query(); - char qtype = *query; // First character is qtype - int qlen = strlen(query); + char qtype = *query; // First byte is qtype + + int32 msgLen = *((int32 *)(query + 1)); // Next 4 bytes are message length + msgLen = pg_ntoh32(msgLen); + int dataLen = msgLen - 4; // The rest of the message is the data resetStringInfo(inBuf); - if (qlen > 1) - { - appendBinaryStringInfoNT(inBuf, query + 1, qlen - 1); - appendStringInfoChar(inBuf, (char) '\0'); - } + if (dataLen > 0) + { + // Append the data to the buffer + appendBinaryStringInfo(inBuf, query + 5, dataLen); + } free(query); From 3c14839c6abe8a36e28f24804e3c451bbd11b2be Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 12 Mar 2024 11:12:55 +0000 Subject: [PATCH 2/4] WIP Support for pg-protocol output --- src/backend/libpq/pqcomm.c | 100 ++++++++++++++++++++++++++++++++++++ src/backend/tcop/postgres.c | 4 +- 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 89a5f901aa..2e3f302584 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -83,6 +83,10 @@ #include "utils/guc.h" #include "utils/memutils.h" +#ifdef EMSCRIPTEN +#include +#endif + /* * Cope with the various platform-specific ways to spell TCP keepalive socket * options. This doesn't cover Windows, which as usual does its own thing. @@ -149,11 +153,28 @@ static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); +static void emscripten_comm_reset(void); +static int emscripten_flush(void); +static int emscripten_flush_if_writable(void); +static bool emscripten_is_send_pending(void); +static int emscripten_putmessage(char msgtype, const char *s, size_t len); +static void emscripten_putmessage_noblock(char msgtype, const char *s, size_t len); + #ifdef HAVE_UNIX_SOCKETS static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); #endif /* HAVE_UNIX_SOCKETS */ +#ifdef EMSCRIPTEN +static const PQcommMethods PqCommSocketMethods = { + emscripten_comm_reset, + emscripten_flush, + emscripten_flush_if_writable, + emscripten_is_send_pending, + emscripten_putmessage, + emscripten_putmessage_noblock +}; +#else static const PQcommMethods PqCommSocketMethods = { socket_comm_reset, socket_flush, @@ -162,12 +183,91 @@ static const PQcommMethods PqCommSocketMethods = { socket_putmessage, socket_putmessage_noblock }; +#endif const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; +/* -------------------------------- + * Emscripten implementation + * -------------------------------- + */ + +#ifdef EMSCRIPTEN +EM_JS(void, emscripten_dispatch_result, (char *res, int len), { + // Dispatch the result to JS land + if (!Module.eventTarget) return; + var heapBytes = new Uint8Array(Module.HEAPU8.buffer, res, len); + var resultBytes = new Uint8Array(heapBytes); + Module.eventTarget.dispatchEvent(new Module.Event("result", { + detail: resultBytes + })); +}); +#endif + +static void emscripten_comm_reset(void) { + printf("emscripten_comm_reset"); +} + +static int emscripten_flush(void) { + printf("emscripten_flush"); + return 0; +} + +static int emscripten_flush_if_writable(void) { + printf("emscripten_flush_if_writable"); + return 0; +} + +static bool emscripten_is_send_pending(void) { + printf("emscripten_is_send_pending"); + return false; +} + +static int emscripten_putmessage(char msgtype, const char *s, size_t len) { + StringInfoData buf; + uint32 n32; + + Assert(msgtype != 0); + + if (PqCommBusy) + return 0; + PqCommBusy = true; + + // Initialize StringInfoData buffer + initStringInfo(&buf); + + // Append msgtype + appendStringInfoChar(&buf, msgtype); + + // Calculate message length (len + 4) and convert to network byte order + n32 = pg_hton32((uint32) (len + 4)); + // Append length + appendBinaryStringInfo(&buf, (char *) &n32, 4); + + // Append actual message + appendBinaryStringInfo(&buf, s, len); + + // Dispatch the result + emscripten_dispatch_result(buf.data, buf.len); + + // Free StringInfoData buffer memory + pfree(buf.data); + + PqCommBusy = false; + return 0; + +fail: + PqCommBusy = false; + return EOF; +} + +static void emscripten_putmessage_noblock(char msgtype, const char *s, size_t len) { + emscripten_putmessage(msgtype, s, len); +} + /* -------------------------------- * pq_init - initialize libpq at backend startup * -------------------------------- diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 4665f45570..e9b695a34e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -93,7 +93,7 @@ const char *debug_query_string; /* client-supplied query string */ /* Note: whereToSendOutput is initialized for the bootstrap/standalone case */ -CommandDest whereToSendOutput = DestDebugJson; +CommandDest whereToSendOutput = DestRemote; /* flag for logging end of session */ bool Log_disconnections = false; @@ -529,7 +529,7 @@ ReadCommand(StringInfo inBuf) int result; if (whereToSendOutput == DestRemote) - result = SocketBackend(inBuf); + result = EmscriptenBackend(inBuf); else if (whereToSendOutput == DestDebugJson) result = EmscriptenBackend(inBuf); else From e5e987b34f8ab93a04bf30d0000f041b9c25d9ed Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 12 Mar 2024 13:06:05 +0000 Subject: [PATCH 3/4] Buffer output --- src/backend/libpq/pqcomm.c | 70 ++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 2e3f302584..44dc8f0ee4 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -195,6 +195,10 @@ WaitEventSet *FeBeWaitSet; * -------------------------------- */ +static StringInfoData emscripten_buffer; +static bool emscripten_buffer_is_initialized = false; +static bool emscripten_buffer_busy = false; + #ifdef EMSCRIPTEN EM_JS(void, emscripten_dispatch_result, (char *res, int len), { // Dispatch the result to JS land @@ -207,61 +211,59 @@ EM_JS(void, emscripten_dispatch_result, (char *res, int len), { }); #endif +static void emscripten_init_buffer(void) { + if (!emscripten_buffer_is_initialized) { + initStringInfo(&emscripten_buffer); + emscripten_buffer_is_initialized = true; + } +} + static void emscripten_comm_reset(void) { - printf("emscripten_comm_reset"); + if (emscripten_buffer_is_initialized) { + resetStringInfo(&emscripten_buffer); + } else { + emscripten_init_buffer(); + } } static int emscripten_flush(void) { - printf("emscripten_flush"); + if (emscripten_buffer.len > 0) { + emscripten_dispatch_result(emscripten_buffer.data, emscripten_buffer.len); + resetStringInfo(&emscripten_buffer); + } return 0; } static int emscripten_flush_if_writable(void) { - printf("emscripten_flush_if_writable"); + return emscripten_flush(); return 0; } static bool emscripten_is_send_pending(void) { - printf("emscripten_is_send_pending"); - return false; + return emscripten_buffer.len > 0; } static int emscripten_putmessage(char msgtype, const char *s, size_t len) { - StringInfoData buf; - uint32 n32; - - Assert(msgtype != 0); - - if (PqCommBusy) - return 0; - PqCommBusy = true; - - // Initialize StringInfoData buffer - initStringInfo(&buf); - - // Append msgtype - appendStringInfoChar(&buf, msgtype); - - // Calculate message length (len + 4) and convert to network byte order - n32 = pg_hton32((uint32) (len + 4)); - // Append length - appendBinaryStringInfo(&buf, (char *) &n32, 4); + if (emscripten_buffer_busy) + return 0; + emscripten_buffer_busy = true; - // Append actual message - appendBinaryStringInfo(&buf, s, len); + emscripten_init_buffer(); - // Dispatch the result - emscripten_dispatch_result(buf.data, buf.len); + uint32 n32; + Assert(msgtype != 0); - // Free StringInfoData buffer memory - pfree(buf.data); + appendStringInfoChar(&emscripten_buffer, msgtype); + n32 = pg_hton32((uint32) (len + 4)); + appendBinaryStringInfo(&emscripten_buffer, (char *) &n32, 4); + appendBinaryStringInfo(&emscripten_buffer, s, len); - PqCommBusy = false; - return 0; + emscripten_buffer_busy = false; + return 0; fail: - PqCommBusy = false; - return EOF; + emscripten_buffer_busy = false; + return EOF; } static void emscripten_putmessage_noblock(char msgtype, const char *s, size_t len) { From fb0ddfbc8326907972018783060fe423174da264 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 16 Mar 2024 12:53:27 +0000 Subject: [PATCH 4/4] Flush output buffer on each message --- src/backend/libpq/pqcomm.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 44dc8f0ee4..724bd68598 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -259,6 +259,10 @@ static int emscripten_putmessage(char msgtype, const char *s, size_t len) { appendBinaryStringInfo(&emscripten_buffer, s, len); emscripten_buffer_busy = false; + + // Flush buffer on every message + emscripten_flush(); + return 0; fail: