diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 89a5f901aa0..724bd685987 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -83,6 +83,10 @@ #include "utils/guc.h" #include "utils/memutils.h" +#ifdef EMSCRIPTEN +#include +#endif + /* * Cope with the various platform-specific ways to spell TCP keepalive socket * options. This doesn't cover Windows, which as usual does its own thing. @@ -149,11 +153,28 @@ static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); +static void emscripten_comm_reset(void); +static int emscripten_flush(void); +static int emscripten_flush_if_writable(void); +static bool emscripten_is_send_pending(void); +static int emscripten_putmessage(char msgtype, const char *s, size_t len); +static void emscripten_putmessage_noblock(char msgtype, const char *s, size_t len); + #ifdef HAVE_UNIX_SOCKETS static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); #endif /* HAVE_UNIX_SOCKETS */ +#ifdef EMSCRIPTEN +static const PQcommMethods PqCommSocketMethods = { + emscripten_comm_reset, + emscripten_flush, + emscripten_flush_if_writable, + emscripten_is_send_pending, + emscripten_putmessage, + emscripten_putmessage_noblock +}; +#else static const PQcommMethods PqCommSocketMethods = { socket_comm_reset, socket_flush, @@ -162,12 +183,97 @@ static const PQcommMethods PqCommSocketMethods = { socket_putmessage, socket_putmessage_noblock }; +#endif const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; +/* -------------------------------- + * Emscripten implementation + * -------------------------------- + */ + +static StringInfoData emscripten_buffer; +static bool emscripten_buffer_is_initialized = false; +static bool emscripten_buffer_busy = false; + +#ifdef EMSCRIPTEN +EM_JS(void, emscripten_dispatch_result, (char *res, int len), { + // Dispatch the result to JS land + if (!Module.eventTarget) return; + var heapBytes = new Uint8Array(Module.HEAPU8.buffer, res, len); + var resultBytes = new Uint8Array(heapBytes); + Module.eventTarget.dispatchEvent(new Module.Event("result", { + detail: resultBytes + })); +}); +#endif + +static void emscripten_init_buffer(void) { + if (!emscripten_buffer_is_initialized) { + initStringInfo(&emscripten_buffer); + emscripten_buffer_is_initialized = true; + } +} + +static void emscripten_comm_reset(void) { + if (emscripten_buffer_is_initialized) { + resetStringInfo(&emscripten_buffer); + } else { + emscripten_init_buffer(); + } +} + +static int emscripten_flush(void) { + if (emscripten_buffer.len > 0) { + emscripten_dispatch_result(emscripten_buffer.data, emscripten_buffer.len); + resetStringInfo(&emscripten_buffer); + } + return 0; +} + +static int emscripten_flush_if_writable(void) { + return emscripten_flush(); + return 0; +} + +static bool emscripten_is_send_pending(void) { + return emscripten_buffer.len > 0; +} + +static int emscripten_putmessage(char msgtype, const char *s, size_t len) { + if (emscripten_buffer_busy) + return 0; + emscripten_buffer_busy = true; + + emscripten_init_buffer(); + + uint32 n32; + Assert(msgtype != 0); + + appendStringInfoChar(&emscripten_buffer, msgtype); + n32 = pg_hton32((uint32) (len + 4)); + appendBinaryStringInfo(&emscripten_buffer, (char *) &n32, 4); + appendBinaryStringInfo(&emscripten_buffer, s, len); + + emscripten_buffer_busy = false; + + // Flush buffer on every message + emscripten_flush(); + + return 0; + +fail: + emscripten_buffer_busy = false; + return EOF; +} + +static void emscripten_putmessage_noblock(char msgtype, const char *s, size_t len) { + emscripten_putmessage(msgtype, s, len); +} + /* -------------------------------- * pq_init - initialize libpq at backend startup * -------------------------------- diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 27e597d2c41..e9b695a34ea 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -93,7 +93,7 @@ const char *debug_query_string; /* client-supplied query string */ /* Note: whereToSendOutput is initialized for the bootstrap/standalone case */ -CommandDest whereToSendOutput = DestDebugJson; +CommandDest whereToSendOutput = DestRemote; /* flag for logging end of session */ bool Log_disconnections = false; @@ -351,15 +351,17 @@ interactive_getc(void) #ifdef EMSCRIPTEN EM_ASYNC_JS(char *, await_query, (), { // Await a query from JS land - var event = new Module.Event("waiting"); - Module.eventTarget.dispatchEvent(event); + Module.eventTarget.dispatchEvent(new Module.Event("waiting")); var query = await new Promise((resolve, reject) => { Module.eventTarget.addEventListener("query", (e) => { resolve(e.detail); }, {once: true}); }); - var cstring_ptr = allocateUTF8(query); - return cstring_ptr; + // `query` is a Uint8Array containing the query in pg wire format + var bytes = query.length; + var ptr = _malloc(bytes); + Module.HEAPU8.set(query, ptr); + return ptr; }); #endif @@ -367,15 +369,18 @@ static int EmscriptenBackend(StringInfo inBuf) { char *query = await_query(); - char qtype = *query; // First character is qtype - int qlen = strlen(query); + char qtype = *query; // First byte is qtype + + int32 msgLen = *((int32 *)(query + 1)); // Next 4 bytes are message length + msgLen = pg_ntoh32(msgLen); + int dataLen = msgLen - 4; // The rest of the message is the data resetStringInfo(inBuf); - if (qlen > 1) - { - appendBinaryStringInfoNT(inBuf, query + 1, qlen - 1); - appendStringInfoChar(inBuf, (char) '\0'); - } + if (dataLen > 0) + { + // Append the data to the buffer + appendBinaryStringInfo(inBuf, query + 5, dataLen); + } free(query); @@ -524,7 +529,7 @@ ReadCommand(StringInfo inBuf) int result; if (whereToSendOutput == DestRemote) - result = SocketBackend(inBuf); + result = EmscriptenBackend(inBuf); else if (whereToSendOutput == DestDebugJson) result = EmscriptenBackend(inBuf); else