diff --git a/config b/config index 84f717218f..c7c750e549 100644 --- a/config +++ b/config @@ -219,6 +219,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/src/ngx_http_lua_initby.c \ $ngx_addon_dir/src/ngx_http_lua_socket_udp.c \ $ngx_addon_dir/src/ngx_http_lua_req_method.c \ + $ngx_addon_dir/src/ngx_http_lua_req_keepalive.c \ $ngx_addon_dir/src/ngx_http_lua_phase.c \ $ngx_addon_dir/src/ngx_http_lua_uthread.c \ $ngx_addon_dir/src/ngx_http_lua_timer.c \ @@ -269,6 +270,7 @@ NGX_ADDON_DEPS="$NGX_ADDON_DEPS \ $ngx_addon_dir/src/ngx_http_lua_initby.h \ $ngx_addon_dir/src/ngx_http_lua_socket_udp.h \ $ngx_addon_dir/src/ngx_http_lua_req_method.h \ + $ngx_addon_dir/src/ngx_http_lua_req_keepalive.h \ $ngx_addon_dir/src/ngx_http_lua_phase.h \ $ngx_addon_dir/src/ngx_http_lua_probe.h \ $ngx_addon_dir/src/ngx_http_lua_uthread.h \ diff --git a/src/ngx_http_lua_capturefilter.c b/src/ngx_http_lua_capturefilter.c index 9a945ff80d..01243e2c6e 100644 --- a/src/ngx_http_lua_capturefilter.c +++ b/src/ngx_http_lua_capturefilter.c @@ -16,7 +16,7 @@ #include "ngx_http_lua_util.h" #include "ngx_http_lua_exception.h" #include "ngx_http_lua_subrequest.h" - +#include "ngx_http_lua_contentby.h" ngx_http_output_header_filter_pt ngx_http_lua_next_header_filter; ngx_http_output_body_filter_pt ngx_http_lua_next_body_filter; @@ -104,6 +104,15 @@ ngx_http_lua_capture_header_filter(ngx_http_request_t *r) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t +_should_store_chain_link(ngx_chain_t *in) +{ + return ((in != NULL) && ((in->buf->pos != in->buf->last))); +} +#endif + + static ngx_int_t ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in) { @@ -162,6 +171,40 @@ ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in) ctx->seen_last_for_subreq = 1; } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (pr_ctx->async_capture) { + /* In order to wake the parent up, we should call post and not discard + the buffer */ + pr_ctx->current_subrequest = r; /* Required for wake up (?) */ + pr_ctx->current_subrequest_ctx = ctx; /* Required for the buffer */ + + /* XXX: In some cases, pr_ctx->current_subrequest_buffer is being + cleaned by Nginx and buf gets the value 0x1... */ + if (((pr_ctx->current_subrequest_buffer == NULL) + || (pr_ctx->current_subrequest_buffer->buf == (void *) 1)) + && (_should_store_chain_link(in))) { + pr_ctx->current_subrequest_buffer = in; + } + + /* TODO: Is this line needed? */ + r->parent->write_event_handler = ngx_http_lua_content_wev_handler; + + if (!eof) { + pr_ctx->wakeup_subrequest = 1; + /* On EOF, the post subrequest callback is called, and it handles + the setting of the resume handler. The parent request would be + woken up anyway by Nginx. + */ + if (ngx_http_post_request(r->parent, NULL) != NGX_OK) { + return NGX_ERROR; + } + return NGX_OK; + } else { + pr_ctx->wakeup_subrequest = 0; + } + } +#endif + ngx_http_lua_discard_bufs(r->pool, in); return NGX_OK; diff --git a/src/ngx_http_lua_common.h b/src/ngx_http_lua_common.h index b03d7bb994..208cc4367d 100644 --- a/src/ngx_http_lua_common.h +++ b/src/ngx_http_lua_common.h @@ -201,6 +201,16 @@ typedef struct { ngx_flag_t transform_underscores_in_resp_headers; ngx_flag_t log_socket_errors; ngx_flag_t check_client_abort; + + ngx_flag_t enforce_content_type; + ngx_flag_t correct_location_header; + +#if (NGX_HTTP_SSL) + ngx_ssl_t *ssl; + ngx_flag_t ssl_verify; + ngx_uint_t ssl_verify_depth; + ngx_str_t ssl_trusted_certificate; +#endif } ngx_http_lua_loc_conf_t; @@ -330,6 +340,19 @@ typedef struct ngx_http_lua_ctx_s { ngx_int_t exit_code; +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + ngx_int_t async_capture; + ngx_http_request_t *current_subrequest; + struct ngx_http_lua_ctx_s *current_subrequest_ctx; + ngx_chain_t *current_subrequest_buffer; + ngx_int_t returned_headers; +#endif + + ngx_http_lua_co_ctx_t *calling_coctx; /* co ctx for the caller to location.capture */ + + ngx_http_lua_co_ctx_t *req_body_reader_co_ctx; /* co ctx for the coroutine + reading the request + body */ ngx_http_lua_co_ctx_t *downstream_co_ctx; /* co ctx for the coroutine reading the request body */ @@ -371,6 +394,11 @@ typedef struct ngx_http_lua_ctx_s { unsigned headers_set:1; /* whether the user has set custom response headers */ +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + unsigned wakeup_subrequest:1; + unsigned subrequest_yield:1; +#endif + unsigned entered_rewrite_phase:1; unsigned entered_access_phase:1; unsigned entered_content_phase:1; diff --git a/src/ngx_http_lua_contentby.c b/src/ngx_http_lua_contentby.c index c635ab72cb..1168040ed5 100644 --- a/src/ngx_http_lua_contentby.c +++ b/src/ngx_http_lua_contentby.c @@ -20,6 +20,12 @@ static void ngx_http_lua_content_phase_post_read(ngx_http_request_t *r); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t _is_chain_valid(ngx_chain_t * cl); +static ngx_int_t _is_last_chain_link(ngx_chain_t * cl); +static ngx_int_t _post_request_if_not_posted(ngx_http_request_t *r, + ngx_http_posted_request_t *pr); +#endif ngx_int_t ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r) @@ -115,17 +121,67 @@ ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t +_post_request_if_not_posted(ngx_http_request_t *r, ngx_http_posted_request_t *pr) +{ + ngx_http_posted_request_t *p; + + /* Search request in the posted requests list, so that it would not be posted twice. */ + for (p = r->main->posted_requests; p; p = p->next) { + if (p->request == r) { + return NGX_OK; + } + } + + return ngx_http_post_request(r, pr); +} + +static ngx_int_t +_is_chain_valid(ngx_chain_t * cl) +{ + /* For some reason, sometimes when cl->buf is cleaned, 1 is assigned to it. */ + return ((cl != NULL) && (cl->buf != NULL) && (cl->buf != (void *) 1)); +} +static ngx_int_t +_is_last_chain_link(ngx_chain_t * cl) +{ + /* last_in_chain is for subrequests. */ + return cl->buf->last_in_chain || cl->buf->last_buf; +} +#endif + void ngx_http_lua_content_wev_handler(ngx_http_request_t *r) { ngx_http_lua_ctx_t *ctx; + ngx_int_t rc; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return; } - (void) ctx->resume_handler(r); + rc = ctx->resume_handler(r); + + if (rc == NGX_DONE) { + return; + } + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (ctx->current_subrequest && ctx->wakeup_subrequest) { + /* Make sure that the subrequest continues */ + if (_post_request_if_not_posted(ctx->current_subrequest, NULL) != NGX_OK) { + ngx_http_lua_finalize_request(r, NGX_ERROR); + } + /* Don't try to discard the last buffer, as it will cause a NULL dereference... */ + if (_is_chain_valid(ctx->current_subrequest_buffer) && (!_is_last_chain_link(ctx->current_subrequest_buffer))) { + ngx_http_lua_discard_bufs(ctx->current_subrequest->pool, ctx->current_subrequest_buffer); + } + ctx->current_subrequest_buffer = NULL; + ctx->wakeup_subrequest = 0; + } +#endif } diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index 3afbd7ba76..fda5390c10 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -485,7 +485,7 @@ ngx_http_lua_ngx_header_set(lua_State *L) } } - if (!ctx->headers_set) { + if (!ctx->headers_set && llcf->enforce_content_type) { rc = ngx_http_set_content_type(r); if (rc != NGX_OK) { return luaL_error(L, @@ -531,6 +531,7 @@ ngx_http_lua_ngx_header_set(lua_State *L) } } + ctx->headers_set = 1; return 0; } @@ -555,6 +556,7 @@ ngx_http_lua_ngx_header_set(lua_State *L) key.data, (int) rc); } + ctx->headers_set = 1; return 0; } @@ -562,12 +564,20 @@ ngx_http_lua_ngx_header_set(lua_State *L) static int ngx_http_lua_ngx_req_header_clear(lua_State *L) { - if (lua_gettop(L) != 1) { - return luaL_error(L, "expecting one arguments, but seen %d", + ngx_uint_t n; + n = lua_gettop(L); + if ((n != 1) && (n != 2)) { + return luaL_error(L, "expecting one or two arguments, but seen %d", lua_gettop(L)); } - lua_pushnil(L); + if (n == 2) { + lua_pushnil(L); + /* Top element is now 3, replace it with element 3 */ + lua_insert(L, 2); + } else { + lua_pushnil(L); + } return ngx_http_lua_ngx_req_header_set_helper(L); } @@ -576,7 +586,7 @@ ngx_http_lua_ngx_req_header_clear(lua_State *L) static int ngx_http_lua_ngx_req_header_set(lua_State *L) { - if (lua_gettop(L) != 2) { + if ((lua_gettop(L) != 2) && (lua_gettop(L) != 3)) { return luaL_error(L, "expecting two arguments, but seen %d", lua_gettop(L)); } @@ -614,9 +624,10 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) #if 0 /* replace "_" with "-" */ - for (i = 0; i < len; i++) { - if (p[i] == '_') { - p[i] = '-'; + for (i = 0; i < len; i++) { + if (p[i] == '_') { + p[i] = '-'; + } } } #endif diff --git a/src/ngx_http_lua_headers_out.c b/src/ngx_http_lua_headers_out.c index 2ee10d1ffb..4f0daa1795 100644 --- a/src/ngx_http_lua_headers_out.c +++ b/src/ngx_http_lua_headers_out.c @@ -434,6 +434,9 @@ ngx_http_lua_set_output_header(ngx_http_request_t *r, ngx_str_t key, ngx_http_lua_header_val_t hv; ngx_http_lua_set_header_t *handlers = ngx_http_lua_set_handlers; ngx_uint_t i; + ngx_http_lua_loc_conf_t *llcf; + + llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); dd("set header value: %.*s", (int) value.len, value.data); @@ -455,6 +458,17 @@ ngx_http_lua_set_output_header(ngx_http_request_t *r, ngx_str_t key, continue; } + if (!llcf->correct_location_header + && ngx_strncasecmp(hv.key.data, + (u_char *) "Location", + sizeof("Location")) + == 0) { + /* XXX The best way to get the index of the last of the structure */ + i = (sizeof(ngx_http_lua_set_handlers) / + sizeof(ngx_http_lua_set_handlers[0])) - 1; + break; + } + dd("Matched handler: %s %s", handlers[i].name.data, hv.key.data); hv.offset = handlers[i].offset; diff --git a/src/ngx_http_lua_module.c b/src/ngx_http_lua_module.c index 163f6d64e2..bf2eab212f 100644 --- a/src/ngx_http_lua_module.c +++ b/src/ngx_http_lua_module.c @@ -34,6 +34,8 @@ static char *ngx_http_lua_init_main_conf(ngx_conf_t *cf, void *conf); static void *ngx_http_lua_create_loc_conf(ngx_conf_t *cf); static char *ngx_http_lua_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); +static ngx_int_t ngx_http_lua_set_ssl(ngx_conf_t *cf, + ngx_http_lua_loc_conf_t *plcf); static char *ngx_http_lua_init_vm(ngx_conf_t *cf, ngx_http_lua_main_conf_t *lmcf); static void ngx_http_lua_cleanup_vm(void *data); @@ -343,6 +345,47 @@ static ngx_command_t ngx_http_lua_cmds[] = { offsetof(ngx_http_lua_loc_conf_t, check_client_abort), NULL }, + { ngx_string("lua_enforce_content_type"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_lua_loc_conf_t, enforce_content_type), + NULL }, + + { ngx_string("lua_correct_location_header"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_lua_loc_conf_t, correct_location_header), + NULL }, + +#if (NGX_HTTP_SSL) + + { ngx_string("lua_ssl_verify"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_lua_loc_conf_t, ssl_verify), + NULL }, + + { ngx_string("lua_ssl_verify_depth"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_lua_loc_conf_t, ssl_verify_depth), + NULL }, + + { ngx_string("lua_ssl_trusted_certificate"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_lua_loc_conf_t, ssl_trusted_certificate), + NULL }, + +#endif + ngx_null_command }; @@ -610,12 +653,16 @@ ngx_http_lua_create_loc_conf(ngx_conf_t *cf) * conf->body_filter_src = {{ 0, NULL }, NULL, NULL, NULL}; * conf->body_filter_src_key = NULL * conf->body_filter_handler = NULL; + * + * conf->ssl_trusted_certificate = NULL; */ conf->force_read_body = NGX_CONF_UNSET; conf->enable_code_cache = NGX_CONF_UNSET; conf->http10_buffering = NGX_CONF_UNSET; conf->check_client_abort = NGX_CONF_UNSET; + conf->enforce_content_type = NGX_CONF_UNSET; + conf->correct_location_header = NGX_CONF_UNSET; conf->keepalive_timeout = NGX_CONF_UNSET_MSEC; conf->connect_timeout = NGX_CONF_UNSET_MSEC; @@ -628,7 +675,13 @@ ngx_http_lua_create_loc_conf(ngx_conf_t *cf) conf->transform_underscores_in_resp_headers = NGX_CONF_UNSET; conf->log_socket_errors = NGX_CONF_UNSET; - +#if (NGX_HTTP_SSL) + + conf->ssl_verify = NGX_CONF_UNSET; + conf->ssl_verify_depth = NGX_CONF_UNSET_UINT; + +#endif + return conf; } @@ -679,6 +732,8 @@ ngx_http_lua_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->enable_code_cache, prev->enable_code_cache, 1); ngx_conf_merge_value(conf->http10_buffering, prev->http10_buffering, 1); ngx_conf_merge_value(conf->check_client_abort, prev->check_client_abort, 0); + ngx_conf_merge_value(conf->enforce_content_type, prev->enforce_content_type, 1); + ngx_conf_merge_value(conf->correct_location_header, prev->correct_location_header, 1); ngx_conf_merge_msec_value(conf->keepalive_timeout, prev->keepalive_timeout, 60000); @@ -706,10 +761,80 @@ ngx_http_lua_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->log_socket_errors, prev->log_socket_errors, 1); +#if (NGX_HTTP_SSL) + + if (ngx_http_lua_set_ssl(cf, conf) != NGX_OK) { + return NGX_CONF_ERROR; + } + + ngx_conf_merge_value(conf->ssl_verify, + prev->ssl_verify, 0); + ngx_conf_merge_uint_value(conf->ssl_verify_depth, + prev->ssl_verify_depth, 1); + ngx_conf_merge_str_value(conf->ssl_trusted_certificate, + prev->ssl_trusted_certificate, ""); + + if (conf->ssl_verify) { + if (conf->ssl_trusted_certificate.len == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "no \"lua_ssl_trusted_certificate\" is " + " defined for the \"lua_ssl_verify\" " + "directive"); + + return NGX_CONF_ERROR; + } + } + + if (ngx_ssl_trusted_certificate(cf, conf->ssl, + &conf->ssl_trusted_certificate, + conf->ssl_verify_depth) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + +#endif + return NGX_CONF_OK; } +#if (NGX_HTTP_SSL) + +static ngx_int_t +ngx_http_lua_set_ssl(ngx_conf_t *cf, ngx_http_lua_loc_conf_t *plcf) +{ + ngx_pool_cleanup_t *cln; + + plcf->ssl = ngx_pcalloc(cf->pool, sizeof(ngx_ssl_t)); + if (plcf->ssl == NULL) { + return NGX_ERROR; + } + + plcf->ssl->log = cf->log; + + if (ngx_ssl_create(plcf->ssl, + NGX_SSL_SSLv2|NGX_SSL_SSLv3|NGX_SSL_TLSv1 + |NGX_SSL_TLSv1_1|NGX_SSL_TLSv1_2, + NULL) + != NGX_OK) + { + return NGX_ERROR; + } + + cln = ngx_pool_cleanup_add(cf->pool, 0); + if (cln == NULL) { + return NGX_ERROR; + } + + cln->handler = ngx_ssl_cleanup_ctx; + cln->data = plcf->ssl; + + return NGX_OK; +} + +#endif + static void ngx_http_lua_cleanup_vm(void *data) { diff --git a/src/ngx_http_lua_req_keepalive.c b/src/ngx_http_lua_req_keepalive.c new file mode 100644 index 0000000000..73aa510cbb --- /dev/null +++ b/src/ngx_http_lua_req_keepalive.c @@ -0,0 +1,79 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif + + +#include "ddebug.h" +#include "ngx_http_lua_req_keepalive.h" +#include "ngx_http_lua_util.h" + + +static int ngx_http_lua_ngx_req_get_keepalive(lua_State *L); +static int ngx_http_lua_ngx_req_set_keepalive(lua_State *L); + + +void +ngx_http_lua_inject_req_keepalive_api(lua_State *L) +{ + lua_pushcfunction(L, ngx_http_lua_ngx_req_get_keepalive); + lua_setfield(L, -2, "get_keepalive"); + + lua_pushcfunction(L, ngx_http_lua_ngx_req_set_keepalive); + lua_setfield(L, -2, "set_keepalive"); +} + + +static int +ngx_http_lua_ngx_req_get_keepalive(lua_State *L) +{ + int n; + ngx_http_request_t *r; + + n = lua_gettop(L); + if (n != 0) { + return luaL_error(L, "no arguments expected but got %d", n); + } + + r = ngx_http_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "request object not found"); + } + + ngx_http_lua_check_fake_request(L, r); + + lua_pushboolean(L, r->keepalive); + return 1; +} + + +static int +ngx_http_lua_ngx_req_set_keepalive(lua_State *L) +{ + int n; + ngx_http_request_t *r; + int keepalive; + + n = lua_gettop(L); + if (n != 1) { + return luaL_error(L, "only one argument expected but got %d", n); + } + + keepalive = lua_toboolean(L, 1); + + r = ngx_http_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "request object not found"); + } + + ngx_http_lua_check_fake_request(L, r); + + r->keepalive = keepalive; + + return 1; +} diff --git a/src/ngx_http_lua_req_keepalive.h b/src/ngx_http_lua_req_keepalive.h new file mode 100644 index 0000000000..9c8da71c55 --- /dev/null +++ b/src/ngx_http_lua_req_keepalive.h @@ -0,0 +1,19 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef _NGX_HTTP_LUA_KEEPALIVE_H_INCLUDED_ +#define _NGX_HTTP_LUA_KEEPALIVE_H_INCLUDED_ + + +#include "ngx_http_lua_common.h" + + +void ngx_http_lua_inject_req_keepalive_api(lua_State *L); + + +#endif /* _NGX_HTTP_LUA_KEEPALIVE_H_INCLUDED_ */ + +/* vi:set ft=c ts=4 sw=4 et fdm=marker: */ diff --git a/src/ngx_http_lua_socket_tcp.c b/src/ngx_http_lua_socket_tcp.c index d27edbb60a..52a16e4425 100644 --- a/src/ngx_http_lua_socket_tcp.c +++ b/src/ngx_http_lua_socket_tcp.c @@ -28,8 +28,11 @@ static int ngx_http_lua_socket_tcp_connect(lua_State *L); static int ngx_http_lua_socket_tcp_receive(lua_State *L); static int ngx_http_lua_socket_tcp_send(lua_State *L); static int ngx_http_lua_socket_tcp_close(lua_State *L); +static int ngx_http_lua_socket_tcp_fake_close(lua_State *L); static int ngx_http_lua_socket_tcp_setoption(lua_State *L); static int ngx_http_lua_socket_tcp_settimeout(lua_State *L); +static int ngx_http_lua_socket_tcp_set_receive_timeout(lua_State *L); +static int ngx_http_lua_socket_tcp_set_send_timeout(lua_State *L); static void ngx_http_lua_socket_tcp_handler(ngx_event_t *ev); static ngx_int_t ngx_http_lua_socket_tcp_get_peer(ngx_peer_connection_t *pc, void *data); @@ -49,8 +52,16 @@ static ngx_int_t ngx_http_lua_socket_test_connect(ngx_http_request_t *r, ngx_connection_t *c); static void ngx_http_lua_socket_handle_error(ngx_http_request_t *r, ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type); +static void ngx_http_lua_socket_handle_read_error(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type); +static void ngx_http_lua_socket_handle_write_error(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type); static void ngx_http_lua_socket_handle_success(ngx_http_request_t *r, ngx_http_lua_socket_tcp_upstream_t *u); +static void ngx_http_lua_socket_handle_read_success(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); +static void ngx_http_lua_socket_handle_write_success(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); static int ngx_http_lua_socket_tcp_send_retval_handler(ngx_http_request_t *r, ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); static int ngx_http_lua_socket_tcp_connect_retval_handler(ngx_http_request_t *r, @@ -72,6 +83,7 @@ static int ngx_http_lua_socket_error_retval_handler(ngx_http_request_t *r, static ngx_int_t ngx_http_lua_socket_read_all(void *data, ssize_t bytes); static ngx_int_t ngx_http_lua_socket_read_until(void *data, ssize_t bytes); static ngx_int_t ngx_http_lua_socket_read_chunk(void *data, ssize_t bytes); +static ngx_int_t ngx_http_lua_socket_read_exact_chunk(void *data, ssize_t bytes); static int ngx_http_lua_socket_tcp_receiveuntil(lua_State *L); static int ngx_http_lua_socket_receiveuntil_iterator(lua_State *L); static ngx_int_t ngx_http_lua_socket_compile_pattern(u_char *data, size_t len, @@ -106,6 +118,16 @@ static void ngx_http_lua_tcp_resolve_cleanup(void *data); static void ngx_http_lua_tcp_socket_cleanup(void *data); +#if (NGX_HTTP_SSL) + +static int ngx_http_lua_socket_ssl_handshake_ended(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); +static void ngx_http_lua_socket_ssl_handshake(ngx_connection_t *c); +static ngx_int_t ngx_http_lua_socket_ssl_init_connection(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); + +#endif + enum { SOCKET_CTX_INDEX = 1, SOCKET_TIMEOUT_INDEX = 2, @@ -160,6 +182,12 @@ ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout); lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ + lua_pushcfunction(L, ngx_http_lua_socket_tcp_set_receive_timeout); + lua_setfield(L, -2, "set_receive_timeout"); /* ngx socket mt */ + + lua_pushcfunction(L, ngx_http_lua_socket_tcp_fake_close); + lua_setfield(L, -2, "fake_close"); + lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); @@ -182,6 +210,15 @@ ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout); lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ + lua_pushcfunction(L, ngx_http_lua_socket_tcp_set_receive_timeout); + lua_setfield(L, -2, "set_receive_timeout"); /* ngx socket mt */ + + lua_pushcfunction(L, ngx_http_lua_socket_tcp_set_send_timeout); + lua_setfield(L, -2, "set_send_timeout"); /* ngx socket mt */ + + lua_pushcfunction(L, ngx_http_lua_socket_tcp_fake_close); + lua_setfield(L, -2, "fake_close"); + lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); @@ -213,6 +250,12 @@ ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout); lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ + lua_pushcfunction(L, ngx_http_lua_socket_tcp_set_receive_timeout); + lua_setfield(L, -2, "set_receive_timeout"); /* ngx socket mt */ + + lua_pushcfunction(L, ngx_http_lua_socket_tcp_set_send_timeout); + lua_setfield(L, -2, "set_send_timeout"); /* ngx socket mt */ + lua_pushcfunction(L, ngx_http_lua_socket_tcp_getreusedtimes); lua_setfield(L, -2, "getreusedtimes"); @@ -294,6 +337,10 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) const char *msg; ngx_http_lua_co_ctx_t *coctx; +#if (NGX_HTTP_SSL) + ngx_int_t ssl = 0; +#endif + ngx_http_lua_socket_tcp_upstream_t *u; n = lua_gettop(L); @@ -355,7 +402,6 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) break; case LUA_TNIL: - lua_pop(L, 2); break; default: @@ -365,6 +411,24 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) break; } +#if (NGX_HTTP_SSL) + + lua_getfield(L, n, "ssl"); + + if (lua_type(L, -1) != LUA_TBOOLEAN) { + msg = lua_pushfstring(L, "bad \"ssl\" option type: %s", + luaL_typename(L, -1)); + return luaL_argerror(L, n, msg); + } + + ssl = lua_toboolean(L, -1); + lua_pop(L, 1); + +#endif + if (!custom_pool) { + lua_pop(L, 2); + } + n--; } @@ -400,6 +464,8 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) u = lua_touserdata(L, -1); lua_pop(L, 1); + llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); + if (u) { if (u->waiting) { lua_pushnil(L); @@ -439,12 +505,18 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) ngx_memzero(u, sizeof(ngx_http_lua_socket_tcp_upstream_t)); +#if (NGX_HTTP_SSL) + + if (llcf->ssl) { + u->ssl = ssl; + } + +#endif + coctx = ctx->cur_co_ctx; u->request = r; /* set the controlling request */ - llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); - u->conf = llcf; pc = &u->peer; @@ -561,7 +633,8 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) rctx->timeout = clcf->resolver_timeout; u->resolved->ctx = rctx; - u->co_ctx = ctx->cur_co_ctx; + /* For connect and resolve, use co_ctx_read. */ + u->co_ctx_read = ctx->cur_co_ctx; coctx->data = u; @@ -640,9 +713,9 @@ ngx_http_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx) return; } - lctx->cur_co_ctx = u->co_ctx; + lctx->cur_co_ctx = u->co_ctx_read; - u->co_ctx->cleanup = NULL; + u->co_ctx_read->cleanup = NULL; L = lctx->cur_co_ctx->co; @@ -766,6 +839,104 @@ ngx_http_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx) } +#if (NGX_HTTP_SSL) +static ngx_int_t +ngx_http_lua_socket_ssl_init_connection(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) +{ + ngx_connection_t *c; + ngx_int_t rc; + + c = u->peer.connection; + + if (ngx_ssl_create_connection(u->conf->ssl, c, + NGX_SSL_BUFFER|NGX_SSL_CLIENT) + != NGX_OK) + { + return luaL_error(L, "error creating ssl session"); + } + /* TODO: Add SSL session reuse here. */ + + rc = ngx_ssl_handshake(c); + + if (rc == NGX_AGAIN) { + c->ssl->handler = ngx_http_lua_socket_ssl_handshake; + u->prepare_retvals = ngx_http_lua_socket_ssl_handshake_ended; + return NGX_AGAIN; + } + + return ngx_http_lua_socket_ssl_handshake_ended(r, u, L); +} + + +static void +ngx_http_lua_socket_ssl_handshake(ngx_connection_t *c) +{ + ngx_http_request_t *r; + ngx_http_lua_socket_tcp_upstream_t *u; + ngx_http_lua_ctx_t *ctx; + + u = c->data; + r = u->request; + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + + if (ctx == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "ngx_lua ctx not found"); + return; + } + + c->write->handler = ngx_http_lua_socket_tcp_handler; + c->read->handler = ngx_http_lua_socket_tcp_handler; + + ctx->resume_handler = ngx_http_lua_socket_tcp_resume; + r->write_event_handler(r); + ngx_http_run_posted_requests(r->connection); +} + + +static int +ngx_http_lua_socket_ssl_handshake_ended(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) +{ + ngx_connection_t *c; + ngx_http_lua_loc_conf_t *llcf; + X509 *cert; + + llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); + + c = u->peer.connection; + + if (!c->ssl->handshaked) { + lua_pushnil(L); + lua_pushliteral(L, "SSL handshake failed"); + return 2; + } + + if (llcf->ssl_verify) { + if (SSL_get_verify_result(c->ssl->connection) != X509_V_OK) { + lua_pushnil(L); + lua_pushliteral(L, "SSL certificate verfication failed"); + return 2; + } + + cert = SSL_get_peer_certificate(c->ssl->connection); + if (cert == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "SSL server did not return certificate"); + return 2; + } + + X509_free(cert); + } + + c->log->action = "SSL connection transaction"; + lua_pushinteger(L, 1); + return 1; +} +#endif + + static int ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) @@ -898,13 +1069,21 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); lua_pushnil(L); - lua_pushliteral(L, "failed to handle write event"); + lua_pushliteral(L, "failed to handle read event"); return 2; } u->read_event_handler = ngx_http_lua_socket_dummy_handler; u->write_event_handler = ngx_http_lua_socket_dummy_handler; +#if (NGX_HTTP_SSL) + + if (u->ssl) { + return ngx_http_lua_socket_ssl_init_connection(r, u, L); + } + +#endif + lua_pushinteger(L, 1); return 1; } @@ -922,7 +1101,8 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, r->write_event_handler = ngx_http_core_run_phases; } - u->co_ctx = ctx->cur_co_ctx; + /* For connect and resolve, use co_ctx_read. */ + u->co_ctx_read = ctx->cur_co_ctx; u->waiting = 1; u->prepare_retvals = ngx_http_lua_socket_tcp_connect_retval_handler; @@ -950,8 +1130,12 @@ ngx_http_lua_socket_error_retval_handler(ngx_http_request_t *r, ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket error retval handler"); - if (u->co_ctx) { - u->co_ctx->cleanup = NULL; + if (u->co_ctx_read) { + u->co_ctx_read->cleanup = NULL; + } + + if (u->co_ctx_write) { + u->co_ctx_write->cleanup = NULL; } ft_type = u->ft_type; @@ -1014,6 +1198,14 @@ ngx_http_lua_socket_tcp_connect_retval_handler(ngx_http_request_t *r, return ngx_http_lua_socket_error_retval_handler(r, u, L); } +#if (NGX_HTTP_SSL) + + if (u->ssl) { + return ngx_http_lua_socket_ssl_init_connection(r, u, L); + } + +#endif + lua_pushinteger(L, 1); return 1; } @@ -1033,10 +1225,11 @@ ngx_http_lua_socket_tcp_receive(lua_State *L) int typ; ngx_http_lua_loc_conf_t *llcf; ngx_http_lua_co_ctx_t *coctx; + int bsd_read; n = lua_gettop(L); - if (n != 1 && n != 2) { - return luaL_error(L, "expecting 1 or 2 arguments " + if (n != 1 && n != 2 && n != 3) { + return luaL_error(L, "expecting 1, 2 or 3 arguments " "(including the object), but got %d", n); } @@ -1070,9 +1263,9 @@ ngx_http_lua_socket_tcp_receive(lua_State *L) return 2; } - if (u->waiting) { + if (u->waiting_read) { lua_pushnil(L); - lua_pushliteral(L, "socket busy"); + lua_pushliteral(L, "socket already busy reading"); return 2; } @@ -1129,7 +1322,37 @@ ngx_http_lua_socket_tcp_receive(lua_State *L) } #endif - u->input_filter = ngx_http_lua_socket_read_chunk; + bsd_read = 0; + + /* Check if the options table is given. */ + if (n == 3) { + luaL_checktype(L, 3, LUA_TTABLE); + lua_getfield(L, 3, "bsd_read"); + + switch (lua_type(L, -1)) { + case LUA_TNIL: + /* use default value - false */ + break; + + case LUA_TBOOLEAN: + bsd_read = lua_toboolean(L, -1); + break; + + default: + return luaL_error(L, "bad \"bsd_read\" option value type: %s", + luaL_typename(L, -1)); + + } + + lua_pop(L, 1); + } + + if (bsd_read) { + u->input_filter = ngx_http_lua_socket_read_exact_chunk; + } else { + u->input_filter = ngx_http_lua_socket_read_chunk; + } + u->length = (size_t) bytes; u->rest = u->length; @@ -1172,7 +1395,7 @@ ngx_http_lua_socket_tcp_receive(lua_State *L) r->read_event_handler = ngx_http_lua_req_socket_rev_handler; } - u->waiting = 0; + u->waiting_read = 0; rc = ngx_http_lua_socket_tcp_read(r, u); @@ -1194,7 +1417,6 @@ ngx_http_lua_socket_tcp_receive(lua_State *L) /* rc == NGX_AGAIN */ u->read_event_handler = ngx_http_lua_socket_read_handler; - u->write_event_handler = ngx_http_lua_socket_dummy_handler; ctx->cur_co_ctx->cleanup = ngx_http_lua_tcp_socket_cleanup; @@ -1205,8 +1427,8 @@ ngx_http_lua_socket_tcp_receive(lua_State *L) r->write_event_handler = ngx_http_core_run_phases; } - u->co_ctx = ctx->cur_co_ctx; - u->waiting = 1; + u->co_ctx_read = ctx->cur_co_ctx; + u->waiting_read = 1; u->prepare_retvals = ngx_http_lua_socket_tcp_receive_retval_handler; coctx = ctx->cur_co_ctx; @@ -1264,6 +1486,48 @@ ngx_http_lua_socket_read_chunk(void *data, ssize_t bytes) } +static ngx_int_t +ngx_http_lua_socket_read_exact_chunk(void *data, ssize_t bytes) +{ + ngx_http_lua_socket_tcp_upstream_t *u = data; + + ngx_buf_t *b; +#if (NGX_DEBUG) + ngx_http_request_t *r; + + r = u->request; +#endif + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket read max chunk %z", bytes); + + if (bytes == 0) { + u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_CLOSED; + return NGX_ERROR; + } + + b = &u->buffer; + + if (bytes >= (ssize_t) u->rest) { + + u->buf_in->buf->last += u->rest; + b->pos += u->rest; + u->rest = 0; + + return NGX_OK; + } + + /* bytes < u->rest */ + + u->buf_in->buf->last += bytes; + b->pos += bytes; + + u->rest = 0; + + return NGX_OK; +} + + static ngx_int_t ngx_http_lua_socket_read_all(void *data, ssize_t bytes) { @@ -1384,7 +1648,7 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, rev = c->read; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, - "lua tcp socket read data: waiting: %d", (int) u->waiting); + "lua tcp socket read data: waiting: %d", (int) u->waiting_read); b = &u->buffer; read = 0; @@ -1401,7 +1665,7 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket receive done: wait:%d, eof:%d, " - "uri:\"%V?%V\"", (int) u->waiting, (int) u->eof, + "uri:\"%V?%V\"", (int) u->waiting_read, (int) u->eof, &r->uri, &r->args); if (u->body_downstream @@ -1419,12 +1683,12 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, } if (rc == NGX_HTTP_CLIENT_CLOSED_REQUEST) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); } else { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); } return NGX_ERROR; @@ -1433,23 +1697,23 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, #if 1 if (ngx_handle_read_event(rev, 0) != NGX_OK) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } #endif success: - ngx_http_lua_socket_handle_success(r, u); + ngx_http_lua_socket_handle_read_success(r, u); return NGX_OK; } if (rc == NGX_ERROR) { dd("input filter error: ft_type:%d waiting:%d", - (int) u->ft_type, (int) u->waiting); + (int) u->ft_type, (int) u->waiting_read); - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } @@ -1472,8 +1736,8 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, if (size == 0) { rc = ngx_http_lua_socket_add_input_buffer(r, u); if (rc == NGX_ERROR) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_NOMEM); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_NOMEM); return NGX_ERROR; } @@ -1482,25 +1746,7 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, size = (size_t) (b->end - b->last); } - if (u->raw_downstream) { - preread = r->header_in->last - r->header_in->pos; - - if (preread) { - - if ((off_t) size > preread) { - size = (size_t) preread; - } - - ngx_http_lua_probe_req_socket_consume_preread(r, - r->header_in->pos, - size); - - b->last = ngx_copy(b->last, r->header_in->pos, size); - r->header_in->pos += size; - continue; - } - - } else if (u->body_downstream) { + if (u->body_downstream || u->raw_downstream) { if (r->request_body->rest == 0) { @@ -1542,7 +1788,7 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, r->header_in->pos += size; r->request_length += size; - if (r->request_body->rest) { + if (r->request_body->rest >= 0) { r->request_body->rest -= size; } @@ -1589,7 +1835,7 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, if (llcf->check_client_abort) { - ngx_http_lua_socket_handle_error(r, u, + ngx_http_lua_socket_handle_read_error(r, u, NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); return NGX_ERROR; } @@ -1597,7 +1843,7 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, /* llcf->check_client_abort == 0 */ if (u->body_downstream && r->request_body->rest) { - ngx_http_lua_socket_handle_error(r, u, + ngx_http_lua_socket_handle_read_error(r, u, NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); return NGX_ERROR; } @@ -1613,8 +1859,8 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, if (n == NGX_ERROR) { u->socket_errno = ngx_socket_errno; - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } @@ -1628,8 +1874,8 @@ ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, #if 1 if (ngx_handle_read_event(rev, 0) != NGX_OK) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } #endif @@ -1695,9 +1941,9 @@ ngx_http_lua_socket_tcp_send(lua_State *L) return 2; } - if (u->waiting) { + if (u->waiting_write) { lua_pushnil(L); - lua_pushliteral(L, "socket busy"); + lua_pushliteral(L, "socket already busy writing"); return 2; } @@ -1774,7 +2020,7 @@ ngx_http_lua_socket_tcp_send(lua_State *L) /* mimic ngx_http_upstream_init_request here */ #if 1 - u->waiting = 0; + u->waiting_write = 0; #endif ngx_http_lua_probe_socket_tcp_send_start(r, u, b->pos, len); @@ -1804,8 +2050,8 @@ ngx_http_lua_socket_tcp_send(lua_State *L) r->write_event_handler = ngx_http_core_run_phases; } - u->co_ctx = ctx->cur_co_ctx; - u->waiting = 1; + u->co_ctx_write = ctx->cur_co_ctx; + u->waiting_write = 1; u->prepare_retvals = ngx_http_lua_socket_tcp_send_retval_handler; dd("setting data to %p", u); @@ -1943,7 +2189,7 @@ ngx_http_lua_socket_tcp_close(lua_State *L) return 2; } - if (u->waiting) { + if ((u->waiting_read) || (u->waiting_write) || (u->waiting)) { lua_pushnil(L); lua_pushliteral(L, "socket busy"); return 2; @@ -1961,6 +2207,42 @@ ngx_http_lua_socket_tcp_close(lua_State *L) return 1; } +static int +ngx_http_lua_socket_tcp_fake_close(lua_State *L) +{ + ngx_http_request_t *r; + ngx_http_lua_socket_tcp_upstream_t *u; + ngx_http_lua_ctx_t *ctx; + ngx_http_lua_co_ctx_t *coctx; + + if (lua_gettop(L) != 1) { + return luaL_error(L, "expecting 1 argument " + "(including the object) but seen %d", lua_gettop(L)); + } + + r = ngx_http_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "no request found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + luaL_checktype(L, 1, LUA_TTABLE); + + lua_rawgeti(L, 1, SOCKET_CTX_INDEX); + u = lua_touserdata(L, -1); + lua_pop(L, 1); + + u->fake_eof = 1; + coctx = ctx->cur_co_ctx; + ngx_http_lua_socket_read_handler(r, u); + ctx->cur_co_ctx = coctx; + + return 0; +} static int ngx_http_lua_socket_tcp_setoption(lua_State *L) @@ -2010,6 +2292,76 @@ ngx_http_lua_socket_tcp_settimeout(lua_State *L) } +static int +ngx_http_lua_socket_tcp_set_receive_timeout(lua_State *L) +{ + int n; + ngx_int_t timeout; + + ngx_http_lua_socket_tcp_upstream_t *u; + + n = lua_gettop(L); + + if (n != 2) { + return luaL_error(L, "ngx.socket set_receive_timeout: expecting at least 2 " + "arguments (including the object) but seen %d", + lua_gettop(L)); + } + + timeout = (ngx_int_t) lua_tonumber(L, 2); + + lua_rawseti(L, 1, SOCKET_TIMEOUT_INDEX); + + lua_rawgeti(L, 1, SOCKET_CTX_INDEX); + u = lua_touserdata(L, -1); + + if (u) { + if (timeout > 0) { + u->read_timeout = (ngx_msec_t) timeout; + } else { + u->read_timeout = u->conf->read_timeout; + } + } + + return 0; +} + + +static int +ngx_http_lua_socket_tcp_set_send_timeout(lua_State *L) +{ + int n; + ngx_int_t timeout; + + ngx_http_lua_socket_tcp_upstream_t *u; + + n = lua_gettop(L); + + if (n != 2) { + return luaL_error(L, "ngx.socket set_send_timeout: expecting at least 2 " + "arguments (including the object) but seen %d", + lua_gettop(L)); + } + + timeout = (ngx_int_t) lua_tonumber(L, 2); + + lua_rawseti(L, 1, SOCKET_TIMEOUT_INDEX); + + lua_rawgeti(L, 1, SOCKET_CTX_INDEX); + u = lua_touserdata(L, -1); + + if (u) { + if (timeout > 0) { + u->send_timeout = (ngx_msec_t) timeout; + } else { + u->send_timeout = u->conf->send_timeout; + } + } + + return 0; +} + + static void ngx_http_lua_socket_tcp_handler(ngx_event_t *ev) { @@ -2062,6 +2414,11 @@ ngx_http_lua_socket_read_handler(ngx_http_request_t *r, ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket read handler"); + if (u->fake_eof) { + ngx_http_lua_socket_handle_read_error(r, u, NGX_HTTP_LUA_SOCKET_FT_CLOSED); + return; + } + if (c->read->timedout) { c->read->timedout = 0; @@ -2072,7 +2429,7 @@ ngx_http_lua_socket_read_handler(ngx_http_request_t *r, "lua tcp socket read timed out"); } - ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + ngx_http_lua_socket_handle_read_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); return; } @@ -2108,7 +2465,7 @@ ngx_http_lua_socket_send_handler(ngx_http_request_t *r, "lua tcp socket write timed out"); } - ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + ngx_http_lua_socket_handle_write_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); return; } @@ -2136,8 +2493,8 @@ ngx_http_lua_socket_send(ngx_http_request_t *r, ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } @@ -2170,12 +2527,12 @@ ngx_http_lua_socket_send(ngx_http_request_t *r, u->write_event_handler = ngx_http_lua_socket_dummy_handler; if (ngx_handle_write_event(c->write, 0) != NGX_OK) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } - ngx_http_lua_socket_handle_success(r, u); + ngx_http_lua_socket_handle_write_success(r, u); return NGX_OK; } @@ -2189,7 +2546,7 @@ ngx_http_lua_socket_send(ngx_http_request_t *r, if (n == NGX_ERROR) { u->socket_errno = ngx_socket_errno; - ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_write_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } @@ -2200,13 +2557,12 @@ ngx_http_lua_socket_send(ngx_http_request_t *r, } u->write_event_handler = ngx_http_lua_socket_send_handler; - u->read_event_handler = ngx_http_lua_socket_dummy_handler; ngx_add_timer(c->write, u->send_timeout); if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { - ngx_http_lua_socket_handle_error(r, u, - NGX_HTTP_LUA_SOCKET_FT_ERROR); + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); return NGX_ERROR; } @@ -2225,8 +2581,8 @@ ngx_http_lua_socket_handle_success(ngx_http_request_t *r, u->write_event_handler = ngx_http_lua_socket_dummy_handler; #endif - if (u->co_ctx) { - u->co_ctx->cleanup = NULL; + if (u->co_ctx_read) { + u->co_ctx_read->cleanup = NULL; } #if 0 @@ -2244,7 +2600,85 @@ ngx_http_lua_socket_handle_success(ngx_http_request_t *r, } ctx->resume_handler = ngx_http_lua_socket_tcp_resume; - ctx->cur_co_ctx = u->co_ctx; + ctx->cur_co_ctx = u->co_ctx_read; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket waking up the current request"); + + r->write_event_handler(r); + } +} + + +static void +ngx_http_lua_socket_handle_read_success(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + ngx_http_lua_ctx_t *ctx; + +#if 1 + u->read_event_handler = ngx_http_lua_socket_dummy_handler; +#endif + + if (u->co_ctx_read) { + u->co_ctx_read->cleanup = NULL; + } + +#if 0 + if (u->eof) { + ngx_http_lua_socket_tcp_finalize(r, u); + } +#endif + + if (u->waiting_read) { + u->waiting_read = 0; + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return; + } + + ctx->resume_handler = ngx_http_lua_socket_tcp_resume; + ctx->cur_co_ctx = u->co_ctx_read; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket waking up the current request"); + + r->write_event_handler(r); + } +} + + +static void +ngx_http_lua_socket_handle_write_success(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + ngx_http_lua_ctx_t *ctx; + +#if 1 + u->write_event_handler = ngx_http_lua_socket_dummy_handler; +#endif + + if (u->co_ctx_write) { + u->co_ctx_write->cleanup = NULL; + } + +#if 0 + if (u->eof) { + ngx_http_lua_socket_tcp_finalize(r, u); + } +#endif + + if (u->waiting_write) { + u->waiting_write = 0; + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return; + } + + ctx->resume_handler = ngx_http_lua_socket_tcp_resume; + ctx->cur_co_ctx = u->co_ctx_write; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket waking up the current request"); @@ -2269,8 +2703,8 @@ ngx_http_lua_socket_handle_error(ngx_http_request_t *r, ngx_http_lua_socket_tcp_finalize(r, u); #endif - if (u->co_ctx) { - u->co_ctx->cleanup = NULL; + if (u->co_ctx_read) { + u->co_ctx_read->cleanup = NULL; } u->read_event_handler = ngx_http_lua_socket_dummy_handler; @@ -2285,7 +2719,87 @@ ngx_http_lua_socket_handle_error(ngx_http_request_t *r, } ctx->resume_handler = ngx_http_lua_socket_tcp_resume; - ctx->cur_co_ctx = u->co_ctx; + ctx->cur_co_ctx = u->co_ctx_read; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket waking up the current request"); + + r->write_event_handler(r); + } +} + + +static void +ngx_http_lua_socket_handle_read_error(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type) +{ + ngx_http_lua_ctx_t *ctx; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket handle error"); + + u->ft_type |= ft_type; + +#if 0 + ngx_http_lua_socket_tcp_finalize(r, u); +#endif + + if (u->co_ctx_read) { + u->co_ctx_read->cleanup = NULL; + } + + u->read_event_handler = ngx_http_lua_socket_dummy_handler; + + if (u->waiting_read) { + u->waiting_read = 0; + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return; + } + + ctx->resume_handler = ngx_http_lua_socket_tcp_resume; + ctx->cur_co_ctx = u->co_ctx_read; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket waking up the current request"); + + r->write_event_handler(r); + } +} + + +static void +ngx_http_lua_socket_handle_write_error(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type) +{ + ngx_http_lua_ctx_t *ctx; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "lua tcp socket handle error"); + + u->ft_type |= ft_type; + +#if 0 + ngx_http_lua_socket_tcp_finalize(r, u); +#endif + + if (u->co_ctx_write) { + u->co_ctx_write->cleanup = NULL; + } + + u->write_event_handler = ngx_http_lua_socket_dummy_handler; + + if (u->waiting_write) { + u->waiting_write = 0; + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return; + } + + ctx->resume_handler = ngx_http_lua_socket_tcp_resume; + ctx->cur_co_ctx = u->co_ctx_write; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket waking up the current request"); @@ -2429,6 +2943,15 @@ ngx_http_lua_socket_tcp_finalize(ngx_http_request_t *r, ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua close socket connection"); +#if (NGX_HTTP_SSL) + + if (u->peer.connection->ssl) { + u->peer.connection->ssl->no_wait_shutdown = 1; + (void) ngx_ssl_shutdown(u->peer.connection); + } + +#endif + ngx_close_connection(u->peer.connection); u->peer.connection = NULL; @@ -2656,7 +3179,7 @@ ngx_http_lua_socket_receiveuntil_iterator(lua_State *L) return 2; } - if (u->waiting) { + if (u->waiting_read) { lua_pushnil(L); lua_pushliteral(L, "socket busy"); return 2; @@ -2718,7 +3241,7 @@ ngx_http_lua_socket_receiveuntil_iterator(lua_State *L) r->read_event_handler = ngx_http_lua_req_socket_rev_handler; } - u->waiting = 0; + u->waiting_read = 0; rc = ngx_http_lua_socket_tcp_read(r, u); @@ -2751,8 +3274,8 @@ ngx_http_lua_socket_receiveuntil_iterator(lua_State *L) r->write_event_handler = ngx_http_core_run_phases; } - u->co_ctx = ctx->cur_co_ctx; - u->waiting = 1; + u->co_ctx_read = ctx->cur_co_ctx; + u->waiting_read = 1; u->prepare_retvals = ngx_http_lua_socket_tcp_receive_retval_handler; coctx = ctx->cur_co_ctx; @@ -3150,9 +3673,14 @@ ngx_http_lua_req_socket(lua_State *L) return 2; #else if (!r->request_body) { - lua_pushnil(L); - lua_pushliteral(L, "requesty body not read yet"); - return 2; + rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); + if (rb == NULL) { + return luaL_error(L, "out of memory"); + } + + rb->rest = r->headers_in.content_length_n; + + r->request_body = rb; } if (c->buffered) { @@ -3231,8 +3759,8 @@ ngx_http_lua_req_socket(lua_State *L) } dd("req content length: %d", (int) r->headers_in.content_length_n); - - if (r->headers_in.content_length_n <= 0) { + + if (r->headers_in.content_length_n == 0) { lua_pushnil(L); lua_pushliteral(L, "no body"); return 2; @@ -3449,7 +3977,7 @@ static int ngx_http_lua_socket_tcp_setkeepalive(lua_State *L) return 2; } - if (u->waiting) { + if ((u->waiting_read) || (u->waiting_write) || (u->waiting)) { lua_pushnil(L); lua_pushliteral(L, "socket busy"); return 2; diff --git a/src/ngx_http_lua_socket_tcp.h b/src/ngx_http_lua_socket_tcp.h index f977ef7133..518941986f 100644 --- a/src/ngx_http_lua_socket_tcp.h +++ b/src/ngx_http_lua_socket_tcp.h @@ -81,13 +81,20 @@ struct ngx_http_lua_socket_tcp_upstream_s { size_t request_len; ngx_chain_t *request_bufs; - ngx_http_lua_co_ctx_t *co_ctx; + ngx_http_lua_co_ctx_t *co_ctx_read; + ngx_http_lua_co_ctx_t *co_ctx_write; ngx_uint_t reused; unsigned no_close:1; unsigned waiting:1; + unsigned waiting_read:1; + unsigned waiting_write:1; unsigned eof:1; + unsigned fake_eof:1; +#if (NGX_HTTP_SSL) + unsigned ssl:1; +#endif unsigned body_downstream:1; unsigned raw_downstream:1; }; diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index bb10fff4b0..5c368a167d 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -50,6 +50,23 @@ ngx_str_t ngx_http_lua_patch_method = ngx_str_t ngx_http_lua_trace_method = ngx_http_lua_method_name("TRACE"); +ngx_str_t * ngx_http_lua_ordered_methods[] = { + &ngx_http_lua_get_method, + &ngx_http_lua_head_method, + &ngx_http_lua_post_method, + &ngx_http_lua_put_method, + &ngx_http_lua_delete_method, + &ngx_http_lua_mkcol_method, + &ngx_http_lua_copy_method, + &ngx_http_lua_move_method, + &ngx_http_lua_options_method, + &ngx_http_lua_propfind_method, + &ngx_http_lua_proppatch_method, + &ngx_http_lua_lock_method, + &ngx_http_lua_unlock_method, + &ngx_http_lua_patch_method, + &ngx_http_lua_trace_method, +}; static ngx_str_t ngx_http_lua_content_length_header_key = ngx_string("Content-Length"); @@ -58,7 +75,7 @@ static ngx_str_t ngx_http_lua_content_length_header_key = static ngx_int_t ngx_http_lua_set_content_length_header(ngx_http_request_t *r, off_t len); static ngx_int_t ngx_http_lua_adjust_subrequest(ngx_http_request_t *sr, - ngx_uint_t method, int forward_body, + ngx_uint_t method, ngx_str_t *method_name, int forward_body, ngx_http_request_body_t *body, unsigned vars_action, ngx_array_t *extra_vars); static int ngx_http_lua_ngx_location_capture(lua_State *L); @@ -70,13 +87,18 @@ static ngx_int_t ngx_http_lua_subrequest_add_extra_vars(ngx_http_request_t *r, static ngx_int_t ngx_http_lua_subrequest(ngx_http_request_t *r, ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr, ngx_http_post_subrequest_t *ps, ngx_uint_t flags); -static ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); static void ngx_http_lua_handle_subreq_responses(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx); static void ngx_http_lua_cancel_subreq(ngx_http_request_t *r); static ngx_int_t ngx_http_post_request_to_head(ngx_http_request_t *r); static ngx_int_t ngx_http_lua_copy_in_file_request_body(ngx_http_request_t *r); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int ngx_http_lua_ngx_location_capture_stream(lua_State *L); +static int ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L); +static ngx_int_t _prepare_subrequest_body_chunk(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx, u_char ** dest_buffer, unsigned long * length); +#endif /* ngx.location.capture is just a thin wrapper around * ngx.location.capture_multi */ @@ -108,6 +130,51 @@ ngx_http_lua_ngx_location_capture(lua_State *L) return ngx_http_lua_ngx_location_capture_multi(L); } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int +ngx_http_lua_ngx_location_capture_stream(lua_State *L) +{ + int n; + ngx_http_request_t *r; + ngx_http_lua_ctx_t *ctx; + + r = ngx_http_lua_get_req(L); + + if (r == NULL) { + return luaL_error(L, "no request object found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + n = lua_gettop(L); + + if (n != 1 && n != 2) { + return luaL_error(L, "expecting one or two arguments"); + } + + ctx->async_capture = 1; + ctx->calling_coctx = ctx->cur_co_ctx; + + lua_createtable(L, n, 0); /* uri opts? table */ + lua_insert(L, 1); /* table uri opts? */ + if (n == 1) { /* table uri */ + lua_rawseti(L, 1, 1); /* table */ + + } else { /* table uri opts */ + lua_rawseti(L, 1, 2); /* table uri */ + lua_rawseti(L, 1, 1); /* table */ + } + + lua_createtable(L, 1, 0); /* table table' */ + lua_insert(L, 1); /* table' table */ + lua_rawseti(L, 1, 1); /* table' */ + + return ngx_http_lua_ngx_location_capture_multi(L); +} +#endif static int ngx_http_lua_ngx_location_capture_multi(lua_State *L) @@ -136,6 +203,11 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) unsigned vars_action; ngx_uint_t nsubreqs; ngx_uint_t index; + ngx_uint_t method_index; + ngx_uint_t methods_number; + ngx_str_t *ngx_method_name = NULL; + u_char *lua_method_name; + size_t method_name_length; size_t sr_statuses_len; size_t sr_headers_len; size_t sr_bodies_len; @@ -386,15 +458,62 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) type = lua_type(L, -1); - if (type == LUA_TNIL) { - method = NGX_HTTP_GET; + switch (type) { + case LUA_TNIL: + method = NGX_HTTP_GET; + break; + case LUA_TNUMBER: + method = (ngx_uint_t) lua_tonumber(L, -1); + break; + case LUA_TSTRING: + lua_method_name = (u_char *) lua_tolstring(L, -1, + &method_name_length); + if (method_name_length == 0) { + return luaL_error(L, "Bad http request method"); + } + + methods_number = sizeof(ngx_http_lua_ordered_methods) / + sizeof(ngx_http_lua_ordered_methods[0]); + for (method_index = 0; method_index < methods_number; + method_index++) { + if (ngx_strncasecmp( + ngx_http_lua_ordered_methods[method_index]->data, + lua_method_name, + method_name_length) + == 0) { + break; + } + } - } else { - if (type != LUA_TNUMBER) { + if (method_index == methods_number) { + /* unknown method */ + method = NGX_HTTP_UNKNOWN; + ngx_method_name = ngx_palloc(r->pool, + sizeof(ngx_str_t)); + if (ngx_method_name == NULL) { + return luaL_error(L, "out of memory"); + } + ngx_method_name->data = ngx_palloc(r->pool, + method_name_length); + if (ngx_method_name->data == NULL) { + return luaL_error(L, "out of memory"); + } + ngx_memcpy(ngx_method_name->data, + lua_method_name, + method_name_length); + ngx_memcpy(ngx_method_name->data + method_name_length, + " ", + 1); + ngx_method_name->len = method_name_length; + } else { + /* the method is a bit field, the first value is + of NGX_HTTP_GET */ + method = NGX_HTTP_GET << method_index; + ngx_method_name = NULL; + } + break; + default: return luaL_error(L, "Bad http request method"); - } - - method = (ngx_uint_t) lua_tonumber(L, -1); } lua_pop(L, 1); @@ -577,7 +696,8 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) ngx_http_set_ctx(sr, sr_ctx, ngx_http_lua_module); - rc = ngx_http_lua_adjust_subrequest(sr, method, always_forward_body, + rc = ngx_http_lua_adjust_subrequest(sr, method, ngx_method_name, + always_forward_body, body, vars_action, extra_vars); if (rc != NGX_OK) { @@ -607,14 +727,354 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) ctx->no_abort = 1; +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + /* Different resume handler for async requests. */ + if (ctx->async_capture) { + ctx->resume_handler = ngx_http_lua_ngx_capture_buffer_handler; + } +#endif + return lua_yield(L, 0); } +static void +_create_headers_table(lua_State *L, ngx_http_request_t *request) +{ + ngx_http_headers_out_t *sr_headers; + ngx_table_elt_t *header; + ngx_list_part_t *part; + ngx_uint_t i; + u_char buf[sizeof("Mon, 28 Sep 1970 06:00:00 GMT") - 1]; + + lua_newtable(L); + + sr_headers = &(request->headers_out); + + dd("saving subrequest response headers"); + + part = &sr_headers->headers.part; + header = part->elts; + + for (i = 0; /* void */; i++) { + + if (i >= part->nelts) { + if (part->next == NULL) { + break; + } + + part = part->next; + header = part->elts; + i = 0; + } + + dd("checking sr header %.*s", (int) header[i].key.len, + header[i].key.data); + +#if 1 + if (header[i].hash == 0) { + continue; + } +#endif + + header[i].hash = 0; + + dd("pushing sr header %.*s", (int) header[i].key.len, + header[i].key.data); + + lua_pushlstring(L, (char *) header[i].key.data, + header[i].key.len); /* header key */ + lua_pushvalue(L, -1); /* stack: table key key */ + + /* check if header already exists */ + lua_rawget(L, -3); /* stack: table key value */ + + if (lua_isnil(L, -1)) { + lua_pop(L, 1); /* stack: table key */ + + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key value */ + + lua_rawset(L, -3); /* stack: table */ + + } else { + + if (!lua_istable(L, -1)) { /* already inserted one value */ + lua_createtable(L, 4, 0); + /* stack: table key value table */ + + lua_insert(L, -2); /* stack: table key table value */ + lua_rawseti(L, -2, 1); /* stack: table key table */ + + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key table value */ + + lua_rawseti(L, -2, lua_objlen(L, -2) + 1); + /* stack: table key table */ + + lua_rawset(L, -3); /* stack: table */ + + } else { + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key table value */ + + lua_rawseti(L, -2, lua_objlen(L, -2) + 1); + /* stack: table key table */ + + lua_pop(L, 2); /* stack: table */ + } + } + } + + if (sr_headers->content_type.len) { + lua_pushliteral(L, "Content-Type"); /* header key */ + lua_pushlstring(L, (char *) sr_headers->content_type.data, + sr_headers->content_type.len); /* head key value */ + lua_rawset(L, -3); /* head */ + } + + if (sr_headers->content_length == NULL + && sr_headers->content_length_n >= 0) + { + lua_pushliteral(L, "Content-Length"); /* header key */ + + lua_pushnumber(L, sr_headers->content_length_n); + /* head key value */ + + lua_rawset(L, -3); /* head */ + } + + /* to work-around an issue in ngx_http_static_module + * (github issue #41) */ + if (sr_headers->location && sr_headers->location->value.len) { + lua_pushliteral(L, "Location"); /* header key */ + lua_pushlstring(L, (char *) sr_headers->location->value.data, + sr_headers->location->value.len); + /* head key value */ + lua_rawset(L, -3); /* head */ + } + + if (sr_headers->last_modified_time != -1) { + if (sr_headers->status != NGX_HTTP_OK + && sr_headers->status != NGX_HTTP_PARTIAL_CONTENT + && sr_headers->status != NGX_HTTP_NOT_MODIFIED + && sr_headers->status != NGX_HTTP_NO_CONTENT) + { + sr_headers->last_modified_time = -1; + sr_headers->last_modified = NULL; + } + } + + if (sr_headers->last_modified == NULL + && sr_headers->last_modified_time != -1) + { + (void) ngx_http_time(buf, sr_headers->last_modified_time); + + lua_pushliteral(L, "Last-Modified"); /* header key */ + lua_pushlstring(L, (char *) buf, sizeof(buf)); /* head key value */ + lua_rawset(L, -3); /* head */ + } +} + + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int +ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L) +{ + ngx_http_request_t *r = NULL; + ngx_http_lua_ctx_t *ctx = NULL; + unsigned long buffer_length = 0; + u_char *current_buffer = NULL; + + r = ngx_http_lua_get_req(L); + + if (r == NULL) { + return luaL_error(L, "no request object found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + if ((!ctx->async_capture) || (ctx->current_subrequest_ctx == NULL)) { + return luaL_error(L, "no capture streaming currently"); + } + + /* If there are chunks currently, we should return them immediately. Otherwise, wait.. */ + if (ctx->current_subrequest_ctx->body) { + if (_prepare_subrequest_body_chunk(r, ctx, ¤t_buffer, &buffer_length) != NGX_OK) { + return luaL_error(L, "memory allocation failure"); + } + ctx->subrequest_yield = 0; + lua_pushlstring(L, (char *) current_buffer, buffer_length); + /* Free the buffer that was copied to Lua */ + ngx_pfree(r->pool, current_buffer); + return 1; + } + + /* If the body is NULL and the post subrequest handler was called, this means there are no more buffers. */ + if (ctx->current_subrequest_ctx->run_post_subrequest) { + /* TODO: Tight coupling between the ctx and the request object. */ + ctx->current_subrequest_ctx = NULL; + ctx->current_subrequest = NULL; + ctx->resume_handler = ngx_http_lua_wev_handler; + return 0; + } + + ctx->resume_handler = ngx_http_lua_ngx_capture_buffer_handler; + + ctx->subrequest_yield = 1; + + return lua_yield(L, 0); +} + + +static unsigned long _get_chain_size(ngx_chain_t *in) +{ + ngx_chain_t *current_chain_link = NULL; + unsigned long chain_link_size = 0; + + for (current_chain_link = in, chain_link_size = 0; current_chain_link != NULL ;current_chain_link = current_chain_link->next) { + chain_link_size += current_chain_link->buf->last - current_chain_link->buf->pos; + } + + return chain_link_size; +} + + +static ngx_int_t _prepare_subrequest_body_chunk(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx, u_char ** dest_buffer, unsigned long * length) +{ + ngx_chain_t *cl; + unsigned long buffer_length = 0; + u_char *current_buffer; + + /* Prepare the buffer for the callback */ + buffer_length = _get_chain_size(ctx->current_subrequest_ctx->body); + current_buffer = ngx_palloc(r->pool, buffer_length); + if (current_buffer == NULL) { + return NGX_ERROR; + } + + *length = buffer_length; + *dest_buffer = current_buffer; + + /* TODO: Support the limitation of the buffer size */ + for (cl = ctx->current_subrequest_ctx->body; cl; cl = cl->next) { + current_buffer = ngx_copy(current_buffer, cl->buf->pos, cl->buf->last - cl->buf->pos); + /* TODO: Should we acutally call ngx_pfree? According to the post subrequest callback, we shouldn't. */ + cl->buf->last = cl->buf->pos; + ctx->current_subrequest_ctx->body = NULL; + ctx->current_subrequest_ctx->last_body = &(ctx->current_subrequest_ctx->body); + } + + return NGX_OK; +} + +ngx_int_t +ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r) +{ + ngx_http_lua_ctx_t *ctx; + unsigned long buffer_length = 0; + u_char *current_buffer; + ngx_http_lua_co_ctx_t *current_co_ctx; + lua_State * co; + ngx_int_t rc; + int returned_values; + ngx_http_lua_main_conf_t *lmcf; + + lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + goto error_handling; + } + + if ((!ctx->current_subrequest_ctx) || (!ctx->current_subrequest)) { + return NGX_AGAIN; + } + + current_co_ctx = ctx->cur_co_ctx; + ctx->cur_co_ctx = ctx->calling_coctx; + co = ctx->cur_co_ctx->co; + + if (_prepare_subrequest_body_chunk(r, ctx, ¤t_buffer, &buffer_length) != NGX_OK) { + goto error_handling; + } + + if (buffer_length == 0) { + ctx->current_subrequest_buffer = NULL; + } + + /* Headers are to be returned only once. */ + if (!ctx->returned_headers) { + _create_headers_table(co, ctx->current_subrequest); + lua_pushinteger(co, ctx->current_subrequest->headers_out.status); + ctx->returned_headers = 1; + returned_values = 3; + } else { + returned_values = 1; + } + + lua_pushlstring(co, (char *) current_buffer, buffer_length); + /* Free the buffer that was copied to Lua */ + ngx_pfree(r->pool, current_buffer); + + ctx->subrequest_yield = 0; + rc = ngx_http_lua_run_thread(co, r, ctx, returned_values); + + ctx->cur_co_ctx = current_co_ctx; + + if (rc == NGX_AGAIN) { + /* NGX_AGAIN can be returned if: + - Any built-in Lua function in the current thread needs waiting. + - The function get_subrequest_buffer needs to wait for the subrequest. + + If a Lua function needs waiting, we'll remove our resume handler. + If we need waiting, we shall keep it. + Anyway, if the subrequest isn't over, we should try to wake it up. + */ + if (!ctx->current_subrequest_ctx || !ctx->current_subrequest_ctx->seen_last_for_subreq) { + ctx->wakeup_subrequest = 1; + } else { + ctx->wakeup_subrequest = 0; + } + + if (ctx->subrequest_yield) { + ctx->subrequest_yield = 0; + } else { + ctx->resume_handler = ngx_http_lua_wev_handler; + } + + return NGX_AGAIN; + } + + if (rc == NGX_DONE) { + ngx_http_lua_finalize_request(r, NGX_DONE); + return ngx_http_lua_run_posted_threads(r->connection, lmcf->lua, r, ctx); + } + + if (ctx->entered_content_phase) { + ngx_http_lua_finalize_request(r, rc); + return NGX_DONE; + } + + return rc; + + error_handling: + ngx_http_lua_finalize_request(r, NGX_ERROR); + return NGX_ERROR; +} +#endif + static ngx_int_t ngx_http_lua_adjust_subrequest(ngx_http_request_t *sr, ngx_uint_t method, - int always_forward_body, ngx_http_request_body_t *body, - unsigned vars_action, ngx_array_t *extra_vars) + ngx_str_t *method_name, int always_forward_body, + ngx_http_request_body_t *body, unsigned vars_action, + ngx_array_t *extra_vars) { ngx_http_request_t *r; ngx_int_t rc; @@ -665,6 +1125,13 @@ ngx_http_lua_adjust_subrequest(ngx_http_request_t *sr, ngx_uint_t method, sr->method = method; switch (method) { + case NGX_HTTP_UNKNOWN: + if (method_name == NULL) { + return NGX_ERROR; + } + sr->method_name = *method_name; + break; + case NGX_HTTP_GET: sr->method_name = ngx_http_lua_get_method; break; @@ -975,7 +1442,13 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) dd("all subrequests are done"); pr_ctx->no_abort = 0; - pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + if (!pr_ctx->async_capture) { + pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + } else { + /* XXX: Make sure that the parent request has the correct context. */ + pr_ctx->current_subrequest = r; + pr_ctx->current_subrequest_ctx = ctx; + } pr_ctx->cur_co_ctx = pr_coctx; } @@ -1033,45 +1506,47 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) body_str->len = len; - if (len == 0) { - body_str->data = NULL; + /* If we're in capture streaming mode, we want to keep the buffer, and to free it from the resume handler's context. */ + if (!pr_ctx->async_capture) { + if (len == 0) { + body_str->data = NULL; - } else { - p = ngx_palloc(r->pool, len); - if (p == NULL) { - return NGX_ERROR; - } - - body_str->data = p; + } else { + p = ngx_palloc(r->pool, len); + if (p == NULL) { + return NGX_ERROR; + } - /* copy from and then free the data buffers */ + body_str->data = p; - for (cl = ctx->body; cl; cl = cl->next) { - p = ngx_copy(p, cl->buf->pos, cl->buf->last - cl->buf->pos); + /* copy from and then free the data buffers */ - cl->buf->last = cl->buf->pos; + for (cl = ctx->body; cl; cl = cl->next) { + p = ngx_copy(p, cl->buf->pos, cl->buf->last - cl->buf->pos); + cl->buf->last = cl->buf->pos; #if 0 - dd("free body chain link buf ASAP"); - ngx_pfree(r->pool, cl->buf->start); + dd("free body chain link buf ASAP"); + ngx_pfree(r->pool, cl->buf->start); #endif + } } - } - if (ctx->body) { + if (ctx->body) { #if defined(nginx_version) && nginx_version >= 1001004 - ngx_chain_update_chains(r->pool, + ngx_chain_update_chains(r->pool, #else - ngx_chain_update_chains( + ngx_chain_update_chains( #endif - &pr_ctx->free_bufs, &pr_ctx->busy_bufs, - &ctx->body, - (ngx_buf_tag_t) &ngx_http_lua_module); + &pr_ctx->free_bufs, &pr_ctx->busy_bufs, + &ctx->body, + (ngx_buf_tag_t) &ngx_http_lua_module); - dd("free bufs: %p", pr_ctx->free_bufs); + dd("free bufs: %p", pr_ctx->free_bufs); + } } - + ngx_http_post_request_to_head(pr); if (r != r->connection->data) { @@ -1417,6 +1892,15 @@ ngx_http_lua_inject_subrequest_api(lua_State *L) lua_pushcfunction(L, ngx_http_lua_ngx_location_capture); lua_setfield(L, -2, "capture"); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + lua_pushcfunction(L, ngx_http_lua_ngx_location_capture_stream); + lua_setfield(L, -2, "capture_stream"); + + /* TODO: Call this 'get_subrequest_body_chunk' */ + lua_pushcfunction(L, ngx_http_lua_ngx_location_get_subrequest_buffer); + lua_setfield(L, -2, "get_subrequest_buffer"); +#endif + lua_pushcfunction(L, ngx_http_lua_ngx_location_capture_multi); lua_setfield(L, -2, "capture_multi"); @@ -1544,7 +2028,7 @@ ngx_http_lua_subrequest(ngx_http_request_t *r, } -static ngx_int_t +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r) { ngx_int_t rc; @@ -1552,6 +2036,7 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) ngx_http_lua_ctx_t *ctx; ngx_http_lua_co_ctx_t *coctx; ngx_http_lua_main_conf_t *lmcf; + int returned_values; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { @@ -1569,7 +2054,20 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) dd("nsubreqs: %d", (int) coctx->nsubreqs); - ngx_http_lua_handle_subreq_responses(r, ctx); + if (ctx->async_capture) { + if (!ctx->returned_headers) { + _create_headers_table(coctx->co, ctx->current_subrequest); + lua_pushinteger(coctx->co, ctx->current_subrequest->headers_out.status); + ctx->returned_headers = 1; + returned_values = 3; + } else { + returned_values = 1; + } + lua_pushnil(coctx->co); + } else { + ngx_http_lua_handle_subreq_responses(r, ctx); + returned_values = coctx->nsubreqs; + } dd("free sr_statues/headers/bodies memory ASAP"); @@ -1584,7 +2082,7 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) c = r->connection; - rc = ngx_http_lua_run_thread(lmcf->lua, r, ctx, coctx->nsubreqs); + rc = ngx_http_lua_run_thread(lmcf->lua, r, ctx, returned_values); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua run thread returned %d", rc); diff --git a/src/ngx_http_lua_subrequest.h b/src/ngx_http_lua_subrequest.h index aad4236d84..ba7c3b3fc6 100644 --- a/src/ngx_http_lua_subrequest.h +++ b/src/ngx_http_lua_subrequest.h @@ -40,6 +40,8 @@ typedef struct ngx_http_lua_post_subrequest_data_s { } ngx_http_lua_post_subrequest_data_t; +ngx_int_t ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r); +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); #endif /* _NGX_HTTP_LUA_SUBREQUEST_H_INCLUDED_ */ diff --git a/src/ngx_http_lua_uthread.c b/src/ngx_http_lua_uthread.c index a93b84c553..72bbfaa243 100644 --- a/src/ngx_http_lua_uthread.c +++ b/src/ngx_http_lua_uthread.c @@ -24,7 +24,8 @@ static int ngx_http_lua_uthread_spawn(lua_State *L); static int ngx_http_lua_uthread_wait(lua_State *L); - +static int ngx_http_lua_uthread_kill(lua_State *L); +static int ngx_http_lua_uthread_kill_sleeping_thread(lua_State *L); void ngx_http_lua_inject_uthread_api(ngx_log_t *log, lua_State *L) @@ -38,6 +39,12 @@ ngx_http_lua_inject_uthread_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_http_lua_uthread_wait); lua_setfield(L, -2, "wait"); + lua_pushcfunction(L, ngx_http_lua_uthread_kill); + lua_setfield(L, -2, "kill"); + + lua_pushcfunction(L, ngx_http_lua_uthread_kill_sleeping_thread); + lua_setfield(L, -2, "kill_sleeping"); + lua_setfield(L, -2, "thread"); } @@ -181,4 +188,136 @@ ngx_http_lua_uthread_wait(lua_State *L) return lua_yield(L, 0); } + +static int +ngx_http_lua_uthread_kill(lua_State *L) +{ + int i, nargs; + lua_State *sub_co; + ngx_http_request_t *r; + ngx_http_lua_ctx_t *ctx; + ngx_http_lua_co_ctx_t *coctx, *sub_coctx; + + r = ngx_http_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "no request found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no request ctx found"); + } + + ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE + | NGX_HTTP_LUA_CONTEXT_ACCESS + | NGX_HTTP_LUA_CONTEXT_CONTENT + | NGX_HTTP_LUA_CONTEXT_TIMER); + + coctx = ctx->cur_co_ctx; + + nargs = lua_gettop(L); + + for (i = 1; i <= nargs; i++) { + sub_co = lua_tothread(L, i); + + luaL_argcheck(L, sub_co, i, "lua thread expected"); + + sub_coctx = ngx_http_lua_get_co_ctx(sub_co, ctx); + if (sub_coctx == NULL) { + return luaL_error(L, "no co ctx found for the ngx.thread " + "instance given"); + } + + if (!sub_coctx->is_uthread) { + return luaL_error(L, "attempt to kill a coroutine that is " + "not a user thread"); + } + + if (sub_coctx->parent_co_ctx != coctx) { + return luaL_error(L, "only the parent coroutine can kill the " + "thread"); + } + + ngx_http_lua_del_thread(r, L, ctx, sub_coctx); + ctx->uthreads--; + } + + return 0; +} + + +static int +ngx_http_lua_uthread_kill_sleeping_thread(lua_State *L) +{ + int nargs; + lua_State *sub_co; + ngx_http_request_t *r; + ngx_http_lua_ctx_t *ctx; + ngx_http_lua_co_ctx_t *coctx, *sub_coctx; + int wait_for_non_sleeping = 1; + + r = ngx_http_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "no request found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no request ctx found"); + } + + ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE + | NGX_HTTP_LUA_CONTEXT_ACCESS + | NGX_HTTP_LUA_CONTEXT_CONTENT + | NGX_HTTP_LUA_CONTEXT_TIMER); + + coctx = ctx->cur_co_ctx; + + nargs = lua_gettop(L); + + if ((nargs != 2) && (nargs != 1)) { + return luaL_error(L, "one or two arguments expected"); + } + + sub_co = lua_tothread(L, 1); + luaL_argcheck(L, sub_co, 1, "lua thread expected"); + + if (nargs == 2) { + wait_for_non_sleeping = lua_toboolean(L, 2); + } else { + wait_for_non_sleeping = 1; + } + + sub_coctx = ngx_http_lua_get_co_ctx(sub_co, ctx); + if (sub_coctx == NULL) { + return luaL_error(L, "no co ctx found for the ngx.thread " + "instance given"); + } + + if (!sub_coctx->is_uthread) { + return luaL_error(L, "attempt to wait on a coroutine that is " + "not a user thread"); + } + + if (sub_coctx->parent_co_ctx != coctx) { + return luaL_error(L, "only the parent coroutine can wait on the " + "thread"); + } + + /* If the process is not sleeping - wait for it. + If the process is sleeping - delete it. + */ + if (sub_coctx->sleep.timer_set) { + ngx_http_lua_del_thread(r, L, ctx, sub_coctx); + ctx->uthreads--; + return 0; + } else { + if (wait_for_non_sleeping) { + ngx_http_lua_probe_user_thread_wait(L, sub_coctx->co); + sub_coctx->waited_by_parent = 1; + return lua_yield(L, 0); + } + } + return 0; +} /* vi:set ft=c ts=4 sw=4 et fdm=marker: */ diff --git a/src/ngx_http_lua_util.c b/src/ngx_http_lua_util.c index 481f95a135..7d98294ff0 100644 --- a/src/ngx_http_lua_util.c +++ b/src/ngx_http_lua_util.c @@ -32,6 +32,7 @@ #include "ngx_http_lua_misc.h" #include "ngx_http_lua_consts.h" #include "ngx_http_lua_req_method.h" +#include "ngx_http_lua_req_keepalive.h" #include "ngx_http_lua_shdict.h" #include "ngx_http_lua_coroutine.h" #include "ngx_http_lua_socket_tcp.h" @@ -334,6 +335,10 @@ ngx_http_lua_del_thread(ngx_http_request_t *r, lua_State *L, coctx->co_ref = LUA_NOREF; coctx->co_status = NGX_HTTP_LUA_CO_DEAD; + if (coctx->sleep.timer_set) { + ngx_del_timer(&coctx->sleep); + } + lua_pop(L, 1); } @@ -477,14 +482,19 @@ ngx_int_t ngx_http_lua_send_header_if_needed(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx) { - ngx_int_t rc; + ngx_http_lua_loc_conf_t *llcf; + ngx_int_t rc; + + llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); if (!r->header_sent) { if (r->headers_out.status == 0) { r->headers_out.status = NGX_HTTP_OK; } - if (!ctx->headers_set && ngx_http_set_content_type(r) != NGX_OK) { + if (!ctx->headers_set + && llcf->enforce_content_type + && ngx_http_set_content_type(r) != NGX_OK) { return NGX_ERROR; } @@ -2128,6 +2138,7 @@ ngx_http_lua_inject_req_api(ngx_log_t *log, lua_State *L) ngx_http_lua_inject_req_body_api(L); ngx_http_lua_inject_req_socket_api(L); ngx_http_lua_inject_req_method_api(L); + ngx_http_lua_inject_req_keepalive_api(L); ngx_http_lua_inject_req_time_api(L); lua_setfield(L, -2, "req");