Skip to content

Commit

Permalink
增加 Message.end 用于中止 chain 事务
Browse files Browse the repository at this point in the history
  • Loading branch information
xicilion committed Mar 11, 2017
1 parent 232657c commit b61b9d9
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 12 deletions.
2 changes: 2 additions & 0 deletions fibjs/include/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class HttpRequest: public HttpRequest_base
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
virtual result_t get_length(int64_t &retVal);
virtual result_t end();
virtual result_t isEnded(bool& retVal);
virtual result_t clear();
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);
Expand Down
2 changes: 2 additions & 0 deletions fibjs/include/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class HttpResponse: public HttpResponse_base
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
virtual result_t get_length(int64_t &retVal);
virtual result_t end();
virtual result_t isEnded(bool& retVal);
virtual result_t clear();
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);
Expand Down
5 changes: 4 additions & 1 deletion fibjs/include/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Message: public Message_base
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
virtual result_t get_length(int64_t &retVal);
virtual result_t end();
virtual result_t isEnded(bool& retVal);
virtual result_t clear();
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);
Expand All @@ -40,7 +42,7 @@ class Message: public Message_base
virtual result_t set_lastError(exlib::string newVal);

public:
Message(bool bRep = false) : m_bRep(bRep)
Message(bool bRep = false) : m_bRep(bRep), m_end(false)
{
}

Expand All @@ -60,6 +62,7 @@ class Message: public Message_base
obj_ptr<SeekableStream_base> m_body;
exlib::string m_lastError;
bool m_bRep;
bool m_end;
};

} /* namespace fibjs */
Expand Down
2 changes: 2 additions & 0 deletions fibjs/include/WebSocketMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class WebSocketMessage: public WebSocketMessage_base
virtual result_t readAll(obj_ptr<Buffer_base> &retVal, AsyncEvent *ac);
virtual result_t write(Buffer_base *data, AsyncEvent *ac);
virtual result_t get_length(int64_t &retVal);
virtual result_t end();
virtual result_t isEnded(bool& retVal);
virtual result_t clear();
virtual result_t sendTo(Stream_base *stm, AsyncEvent *ac);
virtual result_t readFrom(Stream_base *stm, AsyncEvent *ac);
Expand Down
32 changes: 32 additions & 0 deletions fibjs/include/ifs/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class Message_base : public object_base
virtual result_t readAll(obj_ptr<Buffer_base>& retVal, AsyncEvent* ac) = 0;
virtual result_t write(Buffer_base* data, AsyncEvent* ac) = 0;
virtual result_t get_length(int64_t& retVal) = 0;
virtual result_t end() = 0;
virtual result_t isEnded(bool& retVal) = 0;
virtual result_t clear() = 0;
virtual result_t sendTo(Stream_base* stm, AsyncEvent* ac) = 0;
virtual result_t readFrom(Stream_base* stm, AsyncEvent* ac) = 0;
Expand All @@ -67,6 +69,8 @@ class Message_base : public object_base
static void s_readAll(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_write(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_get_length(v8::Local<v8::String> property, const v8::PropertyCallbackInfo<v8::Value> &args);
static void s_end(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_isEnded(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_clear(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_sendTo(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_readFrom(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down Expand Up @@ -99,6 +103,8 @@ namespace fibjs
{"read", s_read, false},
{"readAll", s_readAll, false},
{"write", s_write, false},
{"end", s_end, false},
{"isEnded", s_isEnded, false},
{"clear", s_clear, false},
{"sendTo", s_sendTo, false},
{"readFrom", s_readFrom, false}
Expand Down Expand Up @@ -306,6 +312,32 @@ namespace fibjs
METHOD_RETURN();
}

inline void Message_base::s_end(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_INSTANCE(Message_base);
METHOD_ENTER();

METHOD_OVER(0, 0);

hr = pInst->end();

METHOD_VOID();
}

inline void Message_base::s_isEnded(const v8::FunctionCallbackInfo<v8::Value>& args)
{
bool vr;

METHOD_INSTANCE(Message_base);
METHOD_ENTER();

METHOD_OVER(0, 0);

hr = pInst->isEnded(vr);

METHOD_RETURN();
}

inline void Message_base::s_clear(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_INSTANCE(Message_base);
Expand Down
8 changes: 8 additions & 0 deletions fibjs/include/ifs/Message.idl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ interface Message : object
/*! @brief 消息数据部分的长度 */
readonly Long length;

/*! @brief 设置当前消息处理结束,Chain 处理器不再继续后面的事务 */
end();

/*! @brief 查询当前消息是否结束
@return 结束则返回 true
*/
Boolean isEnded();

/*! @brief 清除消息的内容 */
clear();

Expand Down
10 changes: 10 additions & 0 deletions fibjs/src/http/HttpRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ result_t HttpRequest::set_lastError(exlib::string newVal)
return m_message->set_lastError(newVal);
}

result_t HttpRequest::end()
{
return m_message->end();
}

result_t HttpRequest::isEnded(bool& retVal)
{
return m_message->isEnded(retVal);
}

result_t HttpRequest::clear()
{
m_message = new HttpMessage();
Expand Down
10 changes: 10 additions & 0 deletions fibjs/src/http/HttpResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ result_t HttpResponse::set_lastError(exlib::string newVal)
return m_message->set_lastError(newVal);
}

result_t HttpResponse::end()
{
return m_message->end();
}

result_t HttpResponse::isEnded(bool& retVal)
{
return m_message->isEnded(retVal);
}

result_t HttpResponse::clear()
{
m_message = new HttpMessage(true);
Expand Down
8 changes: 7 additions & 1 deletion fibjs/src/mq/Chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ result_t Chain::invoke(object_base *v, obj_ptr<Handler_base> &retVal,
{
int32_t i;

m_msg = Message_base::getInstance(m_v);

m_array.resize(pThis->m_array.size());
for (i = 0; i < (int32_t) pThis->m_array.size(); i ++)
m_array[i] = pThis->m_array[i];
Expand All @@ -51,8 +53,11 @@ result_t Chain::invoke(object_base *v, obj_ptr<Handler_base> &retVal,
static int32_t invoke(AsyncState *pState, int32_t n)
{
asyncInvoke *pThis = (asyncInvoke *) pState;
bool end = false;

if (pThis->m_pos == (int32_t) pThis->m_array.size())
if (pThis->m_msg)
pThis->m_msg->isEnded(end);
if (end || (pThis->m_pos == (int32_t) pThis->m_array.size()))
return pThis->done(CALL_RETURN_NULL);

pThis->m_pos++;
Expand All @@ -63,6 +68,7 @@ result_t Chain::invoke(object_base *v, obj_ptr<Handler_base> &retVal,
private:
std::vector<obj_ptr<Handler_base> > m_array;
obj_ptr<object_base> m_v;
obj_ptr<Message_base> m_msg;
int32_t m_pos;
};

Expand Down
16 changes: 16 additions & 0 deletions fibjs/src/mq/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,24 @@ result_t Message_base::_new(obj_ptr<Message_base> &retVal, v8::Local<v8::Object>
return 0;
}

result_t Message::end()
{
m_end = true;
return 0;
}

result_t Message::isEnded(bool& retVal)
{
retVal = m_end;
if (!m_end && m_response)
m_response->isEnded(retVal);

return 0;
}

result_t Message::clear()
{
m_end = false;
m_params.Release();
m_result.clear();
m_value.clear();
Expand Down
10 changes: 10 additions & 0 deletions fibjs/src/websocket/WebSocketMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ result_t WebSocketMessage::set_lastError(exlib::string newVal)
return m_message->set_lastError(newVal);
}

result_t WebSocketMessage::end()
{
return m_message->end();
}

result_t WebSocketMessage::isEnded(bool& retVal)
{
return m_message->isEnded(retVal);
}

result_t WebSocketMessage::clear()
{
m_message = new Message(m_bRep);
Expand Down
36 changes: 26 additions & 10 deletions test/mq_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,15 @@ describe("mq", () => {
})

describe("chain handler", () => {
it("chain invoke",
() => {
var chain = new mq.Chain([hdlr1, hdlr2,
mq.jsHandler(hdlr3)
]);
it("chain invoke", () => {
var chain = new mq.Chain([hdlr1, hdlr2,
mq.jsHandler(hdlr3)
]);

n = 0;
chain.invoke(v);
assert.equal(7, n);
});
n = 0;
chain.invoke(v);
assert.equal(7, n);
});

it("params", () => {
function chain_params(v, p1, p2) {
Expand All @@ -169,7 +168,6 @@ describe("mq", () => {

it("Message", () => {
var handler = new mq.Chain([

(v) => {
return {};
},
Expand Down Expand Up @@ -198,6 +196,24 @@ describe("mq", () => {
mq.invoke(handler, req);
});

it("end chain", () => {
var handler = new mq.Chain([
(v) => {
return 1;
},
(v) => {
v.end();
},
(v) => {
return 3;
}
]);

var req = new mq.Message();
mq.invoke(handler, req);
assert.equal(1, req.result);
});

it("memory leak", () => {
var svr = new net.TcpServer(8888, () => {});
ss.push(svr.socket);
Expand Down

0 comments on commit b61b9d9

Please sign in to comment.