|
18 | 18 |
|
19 | 19 | #include "ts_lua_util.h" |
20 | 20 |
|
| 21 | +static int ts_lua_client_handler(TSCont contp, ts_lua_http_transform_ctx *transform_ctx, TSEvent event, int n); |
21 | 22 | static int ts_lua_transform_handler(TSCont contp, ts_lua_http_transform_ctx *transform_ctx, TSEvent event, int n); |
22 | 23 |
|
| 24 | +int |
| 25 | +ts_lua_client_entry(TSCont contp, TSEvent ev, void *edata) |
| 26 | +{ |
| 27 | + int n, event; |
| 28 | + TSVIO input_vio; |
| 29 | + ts_lua_http_transform_ctx *transform_ctx; |
| 30 | + |
| 31 | + event = (int)ev; |
| 32 | + transform_ctx = (ts_lua_http_transform_ctx *)TSContDataGet(contp); |
| 33 | + |
| 34 | + n = 0; |
| 35 | + |
| 36 | + switch (event) { |
| 37 | + case TS_EVENT_ERROR: |
| 38 | + input_vio = TSVConnWriteVIOGet(contp); |
| 39 | + TSContCall(TSVIOContGet(input_vio), TS_EVENT_ERROR, input_vio); |
| 40 | + break; |
| 41 | + |
| 42 | + case TS_EVENT_VCONN_WRITE_COMPLETE: |
| 43 | + TSDebug(TS_LUA_DEBUG_TAG, "[%s] received TS_EVENT_VCONN_WRITE_COMPLETE", __FUNCTION__); |
| 44 | + break; |
| 45 | + |
| 46 | + case TS_LUA_EVENT_COROUTINE_CONT: |
| 47 | + n = (intptr_t)edata; |
| 48 | + /* FALL THROUGH */ |
| 49 | + case TS_EVENT_VCONN_WRITE_READY: |
| 50 | + default: |
| 51 | + ts_lua_client_handler(contp, transform_ctx, event, n); |
| 52 | + break; |
| 53 | + } |
| 54 | + |
| 55 | + return 0; |
| 56 | +} |
| 57 | + |
| 58 | +static int |
| 59 | +ts_lua_client_handler(TSCont contp, ts_lua_http_transform_ctx *transform_ctx, TSEvent event, int n) |
| 60 | +{ |
| 61 | + TSVIO input_vio; |
| 62 | + TSIOBufferReader input_reader; |
| 63 | + TSIOBufferBlock blk; |
| 64 | + int64_t toread, towrite, blk_len, upstream_done, input_avail, input_wm_bytes; |
| 65 | + const char *start; |
| 66 | + int ret, eos, rc, top, empty_input; |
| 67 | + ts_lua_coroutine *crt; |
| 68 | + ts_lua_cont_info *ci; |
| 69 | + |
| 70 | + lua_State *L; |
| 71 | + TSMutex mtxp; |
| 72 | + |
| 73 | + ci = &transform_ctx->cinfo; |
| 74 | + crt = &ci->routine; |
| 75 | + |
| 76 | + mtxp = crt->mctx->mutexp; |
| 77 | + L = crt->lua; |
| 78 | + |
| 79 | + input_vio = TSVConnWriteVIOGet(contp); |
| 80 | + |
| 81 | + empty_input = 0; |
| 82 | + if (!TSVIOBufferGet(input_vio)) { |
| 83 | + TSDebug(TS_LUA_DEBUG_TAG, "[%s] no input VIO and output VIO", __FUNCTION__); |
| 84 | + empty_input = 1; |
| 85 | + } else { // input VIO exists |
| 86 | + input_wm_bytes = TSIOBufferWaterMarkGet(TSVIOBufferGet(input_vio)); |
| 87 | + if (transform_ctx->upstream_watermark_bytes >= 0 && transform_ctx->upstream_watermark_bytes != input_wm_bytes) { |
| 88 | + TSDebug(TS_LUA_DEBUG_TAG, "[%s] Setting input_vio watermark to %" PRId64 " bytes", __FUNCTION__, |
| 89 | + transform_ctx->upstream_watermark_bytes); |
| 90 | + TSIOBufferWaterMarkSet(TSVIOBufferGet(input_vio), transform_ctx->upstream_watermark_bytes); |
| 91 | + } |
| 92 | + } |
| 93 | + |
| 94 | + if (empty_input == 0) { |
| 95 | + input_reader = TSVIOReaderGet(input_vio); |
| 96 | + } |
| 97 | + |
| 98 | + if (!transform_ctx->output.buffer) { |
| 99 | + transform_ctx->output.buffer = TSIOBufferCreate(); |
| 100 | + transform_ctx->output.reader = TSIOBufferReaderAlloc(transform_ctx->output.buffer); |
| 101 | + |
| 102 | + transform_ctx->reserved.buffer = TSIOBufferCreate(); |
| 103 | + transform_ctx->reserved.reader = TSIOBufferReaderAlloc(transform_ctx->reserved.buffer); |
| 104 | + |
| 105 | + if (empty_input == 0) { |
| 106 | + transform_ctx->upstream_bytes = TSVIONBytesGet(input_vio); |
| 107 | + } else { |
| 108 | + transform_ctx->upstream_bytes = 0; |
| 109 | + } |
| 110 | + |
| 111 | + transform_ctx->downstream_bytes = INT64_MAX; |
| 112 | + } |
| 113 | + |
| 114 | + if (empty_input == 0) { |
| 115 | + input_avail = TSIOBufferReaderAvail(input_reader); |
| 116 | + upstream_done = TSVIONDoneGet(input_vio); |
| 117 | + toread = TSVIONTodoGet(input_vio); |
| 118 | + |
| 119 | + if (toread <= input_avail) { // upstream finished |
| 120 | + eos = 1; |
| 121 | + } else { |
| 122 | + eos = 0; |
| 123 | + } |
| 124 | + } else { |
| 125 | + input_avail = 0; |
| 126 | + upstream_done = 0; |
| 127 | + toread = 0; |
| 128 | + eos = 1; |
| 129 | + } |
| 130 | + |
| 131 | + if (input_avail > 0) { |
| 132 | + // move to the reserved.buffer |
| 133 | + TSIOBufferCopy(transform_ctx->reserved.buffer, input_reader, input_avail, 0); |
| 134 | + |
| 135 | + // reset input |
| 136 | + TSIOBufferReaderConsume(input_reader, input_avail); |
| 137 | + TSVIONDoneSet(input_vio, upstream_done + input_avail); |
| 138 | + } |
| 139 | + |
| 140 | + if (empty_input == 0) { |
| 141 | + towrite = TSIOBufferReaderAvail(transform_ctx->reserved.reader); |
| 142 | + } else { |
| 143 | + towrite = 0; |
| 144 | + } |
| 145 | + |
| 146 | + TSMutexLock(mtxp); |
| 147 | + ts_lua_set_cont_info(L, ci); |
| 148 | + |
| 149 | + do { |
| 150 | + if (event == TS_LUA_EVENT_COROUTINE_CONT) { |
| 151 | + event = 0; |
| 152 | + goto launch; |
| 153 | + } else { |
| 154 | + n = 2; |
| 155 | + } |
| 156 | + |
| 157 | + if (towrite == 0 && empty_input == 0) { |
| 158 | + break; |
| 159 | + } |
| 160 | + |
| 161 | + // additional condition for client |
| 162 | + if (towrite == 0) { |
| 163 | + break; |
| 164 | + } |
| 165 | + |
| 166 | + if (empty_input == 0) { |
| 167 | + blk = TSIOBufferReaderStart(transform_ctx->reserved.reader); |
| 168 | + start = TSIOBufferBlockReadStart(blk, transform_ctx->reserved.reader, &blk_len); |
| 169 | + |
| 170 | + lua_pushlightuserdata(L, transform_ctx); |
| 171 | + lua_rawget(L, LUA_GLOBALSINDEX); /* push function */ |
| 172 | + |
| 173 | + if (towrite > blk_len) { |
| 174 | + lua_pushlstring(L, start, (size_t)blk_len); |
| 175 | + towrite -= blk_len; |
| 176 | + TSIOBufferReaderConsume(transform_ctx->reserved.reader, blk_len); |
| 177 | + } else { |
| 178 | + lua_pushlstring(L, start, (size_t)towrite); |
| 179 | + TSIOBufferReaderConsume(transform_ctx->reserved.reader, towrite); |
| 180 | + towrite = 0; |
| 181 | + } |
| 182 | + |
| 183 | + if (!towrite && eos) { |
| 184 | + lua_pushinteger(L, 1); /* second param, data finished */ |
| 185 | + } else { |
| 186 | + lua_pushinteger(L, 0); /* second param, data not finish */ |
| 187 | + } |
| 188 | + } else { |
| 189 | + lua_pushlightuserdata(L, transform_ctx); |
| 190 | + lua_rawget(L, LUA_GLOBALSINDEX); /* push function */ |
| 191 | + |
| 192 | + lua_pushlstring(L, "", 0); |
| 193 | + lua_pushinteger(L, 1); /* second param, data finished */ |
| 194 | + } |
| 195 | + |
| 196 | + launch: |
| 197 | + rc = lua_resume(L, n); |
| 198 | + top = lua_gettop(L); |
| 199 | + |
| 200 | + switch (rc) { |
| 201 | + case LUA_YIELD: // coroutine yield |
| 202 | + TSMutexUnlock(mtxp); |
| 203 | + return 0; |
| 204 | + |
| 205 | + case 0: // coroutine success |
| 206 | + ret = 0; |
| 207 | + break; |
| 208 | + |
| 209 | + default: // coroutine failed |
| 210 | + TSError("[ts_lua][%s] lua_resume failed: %s", __FUNCTION__, lua_tostring(L, -1)); |
| 211 | + ret = 1; |
| 212 | + break; |
| 213 | + } |
| 214 | + |
| 215 | + lua_pop(L, top); |
| 216 | + |
| 217 | + if (ret || (eos && !towrite)) { // EOS |
| 218 | + eos = 1; |
| 219 | + break; |
| 220 | + } |
| 221 | + |
| 222 | + } while (towrite > 0); |
| 223 | + |
| 224 | + TSMutexUnlock(mtxp); |
| 225 | + |
| 226 | + if (toread > input_avail) { // upstream not finished. |
| 227 | + if (eos) { |
| 228 | + if (empty_input == 0) { |
| 229 | + TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_EOS, input_vio); |
| 230 | + } |
| 231 | + } else { |
| 232 | + if (empty_input == 0) { |
| 233 | + TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_READY, input_vio); |
| 234 | + } |
| 235 | + } |
| 236 | + } else { // upstream is finished. |
| 237 | + if (empty_input == 0) { |
| 238 | + TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE, input_vio); |
| 239 | + } |
| 240 | + } |
| 241 | + |
| 242 | + return 0; |
| 243 | +} |
| 244 | + |
23 | 245 | int |
24 | 246 | ts_lua_transform_entry(TSCont contp, TSEvent ev, void *edata) |
25 | 247 | { |
|
0 commit comments