Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subrequest streaming API #291

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3f39535
Added an optional parameter for ngx.req.set_header and ngx.req.clear_…
Apr 24, 2013
886e27b
Merged with 0.8.1
May 8, 2013
1c5e5b9
Changed the replace_underscores parameter to a table of parameters (o…
May 8, 2013
f2e849a
Added an options table to clean_header as well.
May 8, 2013
e1281a2
Merge branch 'master' of https://github.com/chaoslawful/lua-nginx-module
Sep 29, 2013
d4d2988
Removed unneeded variable.
Sep 29, 2013
c111582
Added ngx.location.capture_stream, which enables a single subrequest …
Sep 30, 2013
b8d4209
Added all necessary safe guards to protect the subrequest streaming f…
Oct 3, 2013
e699fcc
Merge branch 'master' of https://github.com/chaoslawful/lua-nginx-module
Oct 3, 2013
e9e1dec
Reverted unnecessary changes.
Oct 3, 2013
24cb685
Reverted unnecessary changes.
Oct 3, 2013
2064ed9
Reverted unnecessary changes.
Oct 3, 2013
2112982
Made a correction in the safe guard of the capture streaming.
Oct 13, 2013
974112f
Merge remote-tracking branch 'original-repository/master'
Dec 10, 2013
c23fb51
Added another criteria for a subrequest ending (this may happen only …
Dec 10, 2013
48fdf3d
Merge branch 'master' of https://github.com/chaoslawful/lua-nginx-module
Dec 18, 2013
d0789e8
Merge branch master into subrequest-streaming
Dec 18, 2013
2165d50
Added lmcf initialization.
Dec 18, 2013
305f0a5
Merge branch 'master' of https://github.com/chaoslawful/lua-nginx-module
Dec 24, 2013
c449008
Merge branch 'master' into subrequest-streaming
Dec 24, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/ngx_http_lua_capturefilter.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include "ngx_http_lua_util.h"
#include "ngx_http_lua_exception.h"
#include "ngx_http_lua_subrequest.h"

#include "ngx_http_lua_contentby.h"

ngx_http_output_header_filter_pt ngx_http_lua_next_header_filter;
ngx_http_output_body_filter_pt ngx_http_lua_next_body_filter;
Expand Down Expand Up @@ -105,6 +105,15 @@ ngx_http_lua_capture_header_filter(ngx_http_request_t *r)
}


#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
static ngx_int_t
_should_store_chain_link(ngx_chain_t *in)
{
return ((in != NULL) && ((in->buf->pos != in->buf->last)));
}
#endif


static ngx_int_t
ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
Expand Down Expand Up @@ -163,6 +172,40 @@ ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
ctx->seen_last_for_subreq = 1;
}

#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
if (pr_ctx->async_capture) {
/* In order to wake the parent up, we should call post and not discard
the buffer */
pr_ctx->current_subrequest = r; /* Required for wake up (?) */
pr_ctx->current_subrequest_ctx = ctx; /* Required for the buffer */

/* XXX: In some cases, pr_ctx->current_subrequest_buffer is being
cleaned by Nginx and buf gets the value 0x1... */
if (((pr_ctx->current_subrequest_buffer == NULL)
|| (pr_ctx->current_subrequest_buffer->buf == (void *) 1))
&& (_should_store_chain_link(in))) {
pr_ctx->current_subrequest_buffer = in;
}

/* TODO: Is this line needed? */
r->parent->write_event_handler = ngx_http_lua_content_wev_handler;

if (!eof) {
pr_ctx->wakeup_subrequest = 1;
/* On EOF, the post subrequest callback is called, and it handles
the setting of the resume handler. The parent request would be
woken up anyway by Nginx.
*/
if (ngx_http_post_request(r->parent, NULL) != NGX_OK) {
return NGX_ERROR;
}
return NGX_OK;
} else {
pr_ctx->wakeup_subrequest = 0;
}
}
#endif

ngx_http_lua_discard_bufs(r->pool, in);

return NGX_OK;
Expand Down
18 changes: 18 additions & 0 deletions src/ngx_http_lua_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,19 @@ typedef struct ngx_http_lua_ctx_s {

ngx_int_t exit_code;

#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
ngx_int_t async_capture;
ngx_http_request_t *current_subrequest;
struct ngx_http_lua_ctx_s *current_subrequest_ctx;
ngx_chain_t *current_subrequest_buffer;
ngx_int_t returned_headers;
#endif

ngx_http_lua_co_ctx_t *calling_coctx; /* co ctx for the caller to location.capture */

ngx_http_lua_co_ctx_t *req_body_reader_co_ctx; /* co ctx for the coroutine
reading the request
body */
ngx_http_lua_co_ctx_t *downstream_co_ctx; /* co ctx for the coroutine
reading the request body */

Expand Down Expand Up @@ -390,6 +403,11 @@ typedef struct ngx_http_lua_ctx_s {
unsigned headers_set:1; /* whether the user has set custom
response headers */

#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
unsigned wakeup_subrequest:1;
unsigned subrequest_yield:1;
#endif

unsigned entered_rewrite_phase:1;
unsigned entered_access_phase:1;
unsigned entered_content_phase:1;
Expand Down
58 changes: 57 additions & 1 deletion src/ngx_http_lua_contentby.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

static void ngx_http_lua_content_phase_post_read(ngx_http_request_t *r);

#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
static ngx_int_t _is_chain_valid(ngx_chain_t * cl);
static ngx_int_t _is_last_chain_link(ngx_chain_t * cl);
static ngx_int_t _post_request_if_not_posted(ngx_http_request_t *r,
ngx_http_posted_request_t *pr);
#endif

ngx_int_t
ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r)
Expand Down Expand Up @@ -115,17 +121,67 @@ ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r)
}


#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
static ngx_int_t
_post_request_if_not_posted(ngx_http_request_t *r, ngx_http_posted_request_t *pr)
{
ngx_http_posted_request_t *p;

/* Search request in the posted requests list, so that it would not be posted twice. */
for (p = r->main->posted_requests; p; p = p->next) {
if (p->request == r) {
return NGX_OK;
}
}

return ngx_http_post_request(r, pr);
}

static ngx_int_t
_is_chain_valid(ngx_chain_t * cl)
{
/* For some reason, sometimes when cl->buf is cleaned, 1 is assigned to it. */
return ((cl != NULL) && (cl->buf != NULL) && (cl->buf != (void *) 1));
}
static ngx_int_t
_is_last_chain_link(ngx_chain_t * cl)
{
/* last_in_chain is for subrequests. */
return cl->buf->last_in_chain || cl->buf->last_buf;
}
#endif

void
ngx_http_lua_content_wev_handler(ngx_http_request_t *r)
{
ngx_http_lua_ctx_t *ctx;
ngx_int_t rc;

ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return;
}

(void) ctx->resume_handler(r);
rc = ctx->resume_handler(r);

if (rc == NGX_DONE) {
return;
}

#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING
if (ctx->current_subrequest && ctx->wakeup_subrequest) {
/* Make sure that the subrequest continues */
if (_post_request_if_not_posted(ctx->current_subrequest, NULL) != NGX_OK) {
ngx_http_lua_finalize_request(r, NGX_ERROR);
}
/* Don't try to discard the last buffer, as it will cause a NULL dereference... */
if (_is_chain_valid(ctx->current_subrequest_buffer) && (!_is_last_chain_link(ctx->current_subrequest_buffer))) {
ngx_http_lua_discard_bufs(ctx->current_subrequest->pool, ctx->current_subrequest_buffer);
}
ctx->current_subrequest_buffer = NULL;
ctx->wakeup_subrequest = 0;
}
#endif
}


Expand Down
Loading