diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart index cd71ced68f43..d05c51a702f0 100644 --- a/runtime/lib/isolate_patch.dart +++ b/runtime/lib/isolate_patch.dart @@ -371,11 +371,13 @@ patch class Isolate { _sendOOB(controlPort, msg); } - /* patch */ void addOnExitListener(SendPort responsePort) { - var msg = new List(3) + /* patch */ void addOnExitListener(SendPort responsePort, + {Object response}) { + var msg = new List(4) ..[0] = 0 // Make room for OOB message type. ..[1] = _ADD_EXIT - ..[2] = responsePort; + ..[2] = responsePort + ..[3] = response; _sendOOB(controlPort, msg); } @@ -396,7 +398,7 @@ patch class Isolate { _sendOOB(controlPort, msg); } - /* patch */ void kill([int priority = BEFORE_NEXT_EVENT]) { + /* patch */ void kill({int priority: BEFORE_NEXT_EVENT}) { var msg = new List(4) ..[0] = 0 // Make room for OOB message type. ..[1] = _KILL @@ -405,12 +407,14 @@ patch class Isolate { _sendOOB(controlPort, msg); } - /* patch */ void ping(SendPort responsePort, [int pingType = IMMEDIATE]) { - var msg = new List(4) + /* patch */ void ping(SendPort responsePort, {Object response, + int priority: IMMEDIATE}) { + var msg = new List(5) ..[0] = 0 // Make room for OOM message type. ..[1] = _PING ..[2] = responsePort - ..[3] = pingType; + ..[3] = priority + ..[4] = response; _sendOOB(controlPort, msg); } diff --git a/runtime/vm/isolate.cc b/runtime/vm/isolate.cc index abc53430c65a..cb5da450520e 100644 --- a/runtime/vm/isolate.cc +++ b/runtime/vm/isolate.cc @@ -202,18 +202,22 @@ bool IsolateMessageHandler::HandleLibMessage(const Array& message) { break; } case kPingMsg: { - // [ OOB, kPingMsg, responsePort, priority ] - if (message.Length() != 4) return true; + // [ OOB, kPingMsg, responsePort, priority, response ] + if (message.Length() != 5) return true; const Object& obj2 = Object::Handle(I, message.At(2)); if (!obj2.IsSendPort()) return true; const SendPort& send_port = SendPort::Cast(obj2); const Object& obj3 = Object::Handle(I, message.At(3)); if (!obj3.IsSmi()) return true; const intptr_t priority = Smi::Cast(obj3).Value(); + const Object& obj4 = Object::Handle(I, message.At(4)); + if (!obj4.IsInstance() && !obj4.IsNull()) return true; + const Instance& response = + obj4.IsNull() ? Instance::null_instance() : Instance::Cast(obj4); if (priority == kImmediateAction) { uint8_t* data = NULL; intptr_t len = 0; - SerializeObject(Object::null_instance(), &data, &len, false); + SerializeObject(response, &data, &len, false); PortMap::PostMessage(new Message(send_port.Id(), data, len, Message::kNormalPriority)); @@ -268,21 +272,31 @@ bool IsolateMessageHandler::HandleLibMessage(const Array& message) { case kAddErrorMsg: case kDelErrorMsg: { // [ OOB, msg, listener port ] - if (message.Length() != 3) return true; + if (message.Length() < 3) return true; const Object& obj = Object::Handle(I, message.At(2)); if (!obj.IsSendPort()) return true; const SendPort& listener = SendPort::Cast(obj); switch (msg_type) { - case kAddExitMsg: - I->AddExitListener(listener); + case kAddExitMsg: { + if (message.Length() != 4) return true; + // [ OOB, msg, listener port, response object ] + const Object& response = Object::Handle(I, message.At(3)); + if (!response.IsInstance() && !response.IsNull()) return true; + I->AddExitListener(listener, + response.IsNull() ? Instance::null_instance() + : Instance::Cast(response)); break; + } case kDelExitMsg: + if (message.Length() != 3) return true; I->RemoveExitListener(listener); break; case kAddErrorMsg: + if (message.Length() != 3) return true; I->AddErrorListener(listener); break; case kDelErrorMsg: + if (message.Length() != 3) return true; I->RemoveErrorListener(listener); break; default: @@ -1002,21 +1016,23 @@ bool Isolate::RemoveResumeCapability(const Capability& capability) { // TODO(iposva): Remove duplicated code and start using some hash based // structure instead of these linear lookups. -void Isolate::AddExitListener(const SendPort& listener) { +void Isolate::AddExitListener(const SendPort& listener, + const Instance& response) { // Ensure a limit for the number of listeners remembered. - static const intptr_t kMaxListeners = kSmiMax / (6 * kWordSize); + static const intptr_t kMaxListeners = kSmiMax / (12 * kWordSize); const GrowableObjectArray& listeners = GrowableObjectArray::Handle( this, object_store()->exit_listeners()); SendPort& current = SendPort::Handle(this); intptr_t insertion_index = -1; - for (intptr_t i = 0; i < listeners.Length(); i++) { + for (intptr_t i = 0; i < listeners.Length(); i += 2) { current ^= listeners.At(i); if (current.IsNull()) { if (insertion_index < 0) { insertion_index = i; } } else if (current.Id() == listener.Id()) { + listeners.SetAt(i + 1, response); return; } } @@ -1028,8 +1044,10 @@ void Isolate::AddExitListener(const SendPort& listener) { return; } listeners.Add(listener); + listeners.Add(response); } else { listeners.SetAt(insertion_index, listener); + listeners.SetAt(insertion_index + 1, response); } } @@ -1038,12 +1056,13 @@ void Isolate::RemoveExitListener(const SendPort& listener) { const GrowableObjectArray& listeners = GrowableObjectArray::Handle( this, object_store()->exit_listeners()); SendPort& current = SendPort::Handle(this); - for (intptr_t i = 0; i < listeners.Length(); i++) { + for (intptr_t i = 0; i < listeners.Length(); i += 2) { current ^= listeners.At(i); if (!current.IsNull() && (current.Id() == listener.Id())) { // Remove the matching listener from the list. current = SendPort::null(); listeners.SetAt(i, current); + listeners.SetAt(i + 1, Object::null_instance()); return; } } @@ -1056,13 +1075,15 @@ void Isolate::NotifyExitListeners() { if (listeners.IsNull()) return; SendPort& listener = SendPort::Handle(this); - for (intptr_t i = 0; i < listeners.Length(); i++) { + Instance& response = Instance::Handle(this); + for (intptr_t i = 0; i < listeners.Length(); i += 2) { listener ^= listeners.At(i); if (!listener.IsNull()) { Dart_Port port_id = listener.Id(); uint8_t* data = NULL; intptr_t len = 0; - SerializeObject(Object::null_instance(), &data, &len, false); + response ^= listeners.At(i + 1); + SerializeObject(response, &data, &len, false); Message* msg = new Message(port_id, data, len, Message::kNormalPriority); PortMap::PostMessage(msg); } diff --git a/runtime/vm/isolate.h b/runtime/vm/isolate.h index beafd9637cd9..9396217d1eec 100644 --- a/runtime/vm/isolate.h +++ b/runtime/vm/isolate.h @@ -421,7 +421,7 @@ class Isolate : public BaseIsolate { bool AddResumeCapability(const Capability& capability); bool RemoveResumeCapability(const Capability& capability); - void AddExitListener(const SendPort& listener); + void AddExitListener(const SendPort& listener, const Instance& response); void RemoveExitListener(const SendPort& listener); void NotifyExitListeners(); diff --git a/sdk/lib/_internal/compiler/js_lib/isolate_helper.dart b/sdk/lib/_internal/compiler/js_lib/isolate_helper.dart index 9d36be556b0a..6f3e3611ded9 100644 --- a/sdk/lib/_internal/compiler/js_lib/isolate_helper.dart +++ b/sdk/lib/_internal/compiler/js_lib/isolate_helper.dart @@ -311,7 +311,7 @@ class _IsolateContext implements IsolateContext { List<_IsolateEvent> delayedEvents = []; Set pauseTokens = new Set(); - // Container with the "on exit" handler send-ports. + // Container with the "on exit" handler send-ports and responses. var doneHandlers; /** @@ -355,14 +355,12 @@ class _IsolateContext implements IsolateContext { _updateGlobalState(); } - void addDoneListener(SendPort responsePort) { + void addDoneListener(SendPort responsePort, Object response) { if (doneHandlers == null) { - doneHandlers = []; + // TODO(lrn): Use map optimized for few keys. + doneHandlers = new HashMap(); } - // If necessary, we can switch doneHandlers to a Set if it gets larger. - // That is not expected to happen in practice. - if (doneHandlers.contains(responsePort)) return; - doneHandlers.add(responsePort); + doneHandlers[responsePort] = response; } void removeDoneListener(SendPort responsePort) { @@ -375,18 +373,14 @@ class _IsolateContext implements IsolateContext { this.errorsAreFatal = errorsAreFatal; } - void handlePing(SendPort responsePort, int pingType) { + void handlePing(SendPort responsePort, int pingType, Object response) { if (pingType == Isolate.IMMEDIATE || (pingType == Isolate.BEFORE_NEXT_EVENT && !_isExecutingEvent)) { - responsePort.send(null); - return; - } - void respond() { responsePort.send(null); } - if (pingType == Isolate.AS_EVENT) { - _globalState.topEventLoop.enqueue(this, respond, "ping"); + responsePort.send(response); return; } + void respond() { responsePort.send(response); } assert(pingType == Isolate.BEFORE_NEXT_EVENT); if (_scheduledControlEvents == null) { _scheduledControlEvents = new Queue(); @@ -402,10 +396,6 @@ class _IsolateContext implements IsolateContext { kill(); return; } - if (priority == Isolate.AS_EVENT) { - _globalState.topEventLoop.enqueue(this, kill, "kill"); - return; - } assert(priority == Isolate.BEFORE_NEXT_EVENT); if (_scheduledControlEvents == null) { _scheduledControlEvents = new Queue(); @@ -499,7 +489,7 @@ class _IsolateContext implements IsolateContext { removePause(message[1]); break; case 'add-ondone': - addDoneListener(message[1]); + addDoneListener(message[1], message[2]); break; case 'remove-ondone': removeDoneListener(message[1]); @@ -508,7 +498,7 @@ class _IsolateContext implements IsolateContext { setErrorsFatal(message[1], message[2]); break; case "ping": - handlePing(message[1], message[2]); + handlePing(message[1], message[2], message[3]); break; case "kill": handleKill(message[1], message[2]); @@ -574,9 +564,7 @@ class _IsolateContext implements IsolateContext { _globalState.isolates.remove(id); // indicate this isolate is not active errorPorts.clear(); if (doneHandlers != null) { - for (SendPort port in doneHandlers) { - port.send(null); - } + doneHandlers.forEach((port, response) { port.send(response); }); doneHandlers = null; } } diff --git a/sdk/lib/_internal/compiler/js_lib/isolate_patch.dart b/sdk/lib/_internal/compiler/js_lib/isolate_patch.dart index b612cced5694..b01318c713da 100644 --- a/sdk/lib/_internal/compiler/js_lib/isolate_patch.dart +++ b/sdk/lib/_internal/compiler/js_lib/isolate_patch.dart @@ -76,12 +76,13 @@ class Isolate { } @patch - void addOnExitListener(SendPort responsePort) { + void addOnExitListener(SendPort responsePort, {Object response}) { // TODO(lrn): Can we have an internal method that checks if the receiving // isolate of a SendPort is still alive? - var message = new List(2) + var message = new List(3) ..[0] = "add-ondone" - ..[1] = responsePort; + ..[1] = responsePort + ..[2] = response; controlPort.send(message); } @@ -103,16 +104,18 @@ class Isolate { } @patch - void kill([int priority = BEFORE_NEXT_EVENT]) { + void kill({int priority: BEFORE_NEXT_EVENT}) { controlPort.send(["kill", terminateCapability, priority]); } @patch - void ping(SendPort responsePort, [int pingType = IMMEDIATE]) { - var message = new List(3) + void ping(SendPort responsePort, {Object response, + int priority: IMMEDIATE}) { + var message = new List(4) ..[0] = "ping" ..[1] = responsePort - ..[2] = pingType; + ..[2] = priority + ..[3] = response; controlPort.send(message); } diff --git a/sdk/lib/isolate/isolate.dart b/sdk/lib/isolate/isolate.dart index b91ec267a06c..cbea21b5afc5 100644 --- a/sdk/lib/isolate/isolate.dart +++ b/sdk/lib/isolate/isolate.dart @@ -63,8 +63,6 @@ class Isolate { static const int IMMEDIATE = 0; /** Argument to `ping` and `kill`: Ask for action before the next event. */ static const int BEFORE_NEXT_EVENT = 1; - /** Argument to `ping` and `kill`: Ask for action after normal events. */ - static const int AS_EVENT = 2; /** * Control port used to send control messages to the isolate. @@ -259,20 +257,26 @@ class Isolate { external void resume(Capability resumeCapability); /** - * Asks the isolate to send a message on [responsePort] when it terminates. + * Asks the isolate to send [response] on [responsePort] when it terminates. * * WARNING: This method is experimental and not handled on every platform yet. * - * The isolate will send a `null` message on [responsePort] as the last + * The isolate will send a `response` message on `responsePort` as the last * thing before it terminates. It will run no further code after the message * has been sent. * + * Adding the same port more than once will only cause it to receive one + * message, using the last response value that was added. + * * If the isolate is already dead, no message will be sent. + * If `response` cannot be sent to the isolate, then the request is ignored. + * It is recommended to only use simple values that can be sent to all + * isolates, like `null`, booleans, numbers or strings. */ /* TODO(lrn): Can we do better? Can the system recognize this message and * send a reply if the receiving isolate is dead? */ - external void addOnExitListener(SendPort responsePort); + external void addOnExitListener(SendPort responsePort, {Object response}); /** * Stop listening on exit messages from the isolate. @@ -308,8 +312,7 @@ class Isolate { * The isolate is requested to terminate itself. * The [priority] argument specifies when this must happen. * - * The [priority] must be one of [IMMEDIATE], [BEFORE_NEXT_EVENT] or - * [AS_EVENT]. + * The [priority] must be one of [IMMEDIATE] or [BEFORE_NEXT_EVENT]. * The shutdown is performed at different times depending on the priority: * * * `IMMEDIATE`: The the isolate shuts down as soon as possible. @@ -323,44 +326,35 @@ class Isolate { * control returns to the event loop of the receiving isolate, * after the current event, and any already scheduled control events, * are completed. - * * `AS_EVENT`: The shutdown does not happen until all prevously sent - * non-control messages from the current isolate to the receiving isolate - * have been processed. - * The kill operation effectively puts the shutdown into the normal event - * queue after previously sent messages, and it is affected by any control - * messages that affect normal events, including `pause`. - * This can be used to wait for a another event to be processed. */ - external void kill([int priority = BEFORE_NEXT_EVENT]); + external void kill({int priority: BEFORE_NEXT_EVENT}); /** - * Request that the isolate send a response on the [responsePort]. + * Request that the isolate send [response] on the [responsePort]. * * WARNING: This method is experimental and not handled on every platform yet. * - * If the isolate is alive, it will eventually send a `null` response on - * the response port. + * If the isolate is alive, it will eventually send `response` + * (defaulting to `null`) on the response port. * - * The [pingType] must be one of [IMMEDIATE], [BEFORE_NEXT_EVENT] or - * [AS_EVENT]. + * The [priority] must be one of [IMMEDIATE] or [BEFORE_NEXT_EVENT]. * The response is sent at different times depending on the ping type: * * * `IMMEDIATE`: The the isolate responds as soon as it receives the * control message. This is after any previous control message - * from the same isolate has been received. + * from the same isolate has been received, but may be during + * execution of another event. * * `BEFORE_NEXT_EVENT`: The response is scheduled for the next time * control returns to the event loop of the receiving isolate, * after the current event, and any already scheduled control events, * are completed. - * * `AS_EVENT`: The response is not sent until all prevously sent - * non-control messages from the current isolate to the receiving isolate - * have been processed. - * The ping effectively puts the response into the normal event queue - * after previously sent messages, and it is affected by any control - * messages that affect normal events, including `pause`. - * This can be used to wait for a another event to be processed. + * + * If `response` cannot be sent to the isolate, then the request is ignored. + * It is recommended to only use simple values that can be sent to all + * isolates, like `null`, booleans, numbers or strings. */ - external void ping(SendPort responsePort, [int pingType = IMMEDIATE]); + external void ping(SendPort responsePort, {Object response, + int priority: IMMEDIATE}); /** * Requests that uncaught errors of the isolate are sent back to [port]. diff --git a/tests/isolate/kill2_test.dart b/tests/isolate/kill2_test.dart index 911cd698e998..4fb317190f30 100644 --- a/tests/isolate/kill2_test.dart +++ b/tests/isolate/kill2_test.dart @@ -26,7 +26,7 @@ void main() { reply.handler = (v) { result.add(v); if (v == 2) { - isolate.kill(Isolate.BEFORE_NEXT_EVENT); + isolate.kill(priority: Isolate.BEFORE_NEXT_EVENT); } echoPort.send(v - 1); }; diff --git a/tests/isolate/kill3_test.dart b/tests/isolate/kill3_test.dart deleted file mode 100644 index 006f7aee9b4d..000000000000 --- a/tests/isolate/kill3_test.dart +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import "dart:isolate"; -import "dart:async"; -import "package:expect/expect.dart"; -import "package:async_helper/async_helper.dart"; - -isomain1(replyPort) { - RawReceivePort port = new RawReceivePort(); - port.handler = (v) { - replyPort.send(v); - if (v == 0) port.close(); - }; - replyPort.send(port.sendPort); -} - -void main() { - asyncStart(); - var completer = new Completer(); // Completed by first reply from isolate. - RawReceivePort reply = new RawReceivePort(completer.complete); - Isolate.spawn(isomain1, reply.sendPort).then((Isolate isolate) { - List result = []; - completer.future.then((echoPort) { - reply.handler = (v) { - result.add(v); - echoPort.send(v - 1); - if (v == 2) { - isolate.kill(Isolate.AS_EVENT); - } - }; - RawReceivePort exitSignal; - exitSignal = new RawReceivePort((_) { - Expect.listEquals([4, 3, 2, 1], result); - exitSignal.close(); - reply.close(); - asyncEnd(); - }); - isolate.addOnExitListener(exitSignal.sendPort); - echoPort.send(4); - }); - }); -} diff --git a/tests/isolate/kill_self_test.dart b/tests/isolate/kill_self_test.dart index 932dca6fb38d..cf2a81d4bae8 100644 --- a/tests/isolate/kill_self_test.dart +++ b/tests/isolate/kill_self_test.dart @@ -19,7 +19,7 @@ isomain1(replyPort) { firstEvent = false; var isolate = new Isolate(controlPort, terminateCapability: killCapability); - isolate.kill(Isolate.IMMEDIATE); + isolate.kill(priority: Isolate.IMMEDIATE); }; replyPort.send(port.sendPort); } diff --git a/tests/isolate/kill_test.dart b/tests/isolate/kill_test.dart index d0da5abc6c29..0a0ba85d2def 100644 --- a/tests/isolate/kill_test.dart +++ b/tests/isolate/kill_test.dart @@ -26,7 +26,7 @@ void main() { reply.handler = (v) { result.add(v); if (v == 2) { - isolate.kill(Isolate.IMMEDIATE); + isolate.kill(priority: Isolate.IMMEDIATE); } echoPort.send(v - 1); }; diff --git a/tests/isolate/ondone_test.dart b/tests/isolate/ondone_test.dart index af990ad31e8e..ec0007cab06d 100644 --- a/tests/isolate/ondone_test.dart +++ b/tests/isolate/ondone_test.dart @@ -23,6 +23,7 @@ void isomain(SendPort replyPort) { void main() { testExit(); testCancelExit(); + testOverrideResponse(); } void testExit() { @@ -31,14 +32,15 @@ void testExit() { var completer = new Completer(); // Completed by first reply from isolate. RawReceivePort reply = new RawReceivePort(completer.complete); RawReceivePort onExitPort; - onExitPort = new RawReceivePort((_) { + onExitPort = new RawReceivePort((v) { + if (v != "RESPONSE") throw "WRONG RESPONSE: $v"; reply.close(); onExitPort.close(); if (!mayComplete) throw "COMPLETED EARLY"; asyncEnd(); }); Isolate.spawn(isomain, reply.sendPort).then((Isolate isolate) { - isolate.addOnExitListener(onExitPort.sendPort); + isolate.addOnExitListener(onExitPort.sendPort, response: "RESPONSE"); return completer.future; }).then((echoPort) { int counter = 4; @@ -92,3 +94,33 @@ void testCancelExit() { }); }); } + +void testOverrideResponse() { + bool mayComplete = false; + asyncStart(); + var completer = new Completer(); // Completed by first reply from isolate. + RawReceivePort reply = new RawReceivePort(completer.complete); + RawReceivePort onExitPort; + onExitPort = new RawReceivePort((v) { + if (v != "RESPONSE2") throw "WRONG RESPONSE: $v"; + reply.close(); + onExitPort.close(); + if (!mayComplete) throw "COMPLETED EARLY"; + asyncEnd(); + }); + Isolate.spawn(isomain, reply.sendPort).then((Isolate isolate) { + isolate.addOnExitListener(onExitPort.sendPort, response: "RESPONSE"); + isolate.addOnExitListener(onExitPort.sendPort, response: "RESPONSE2"); + return completer.future; + }).then((echoPort) { + int counter = 4; + reply.handler = (v) { + if (v != counter) throw "WRONG REPLY"; + if (v == 0) throw "REPLY INSTEAD OF SHUTDOWN"; + counter--; + mayComplete = (counter == 0); + echoPort.send(counter); + }; + echoPort.send(counter); + }); +} diff --git a/tests/isolate/ping_pause_test.dart b/tests/isolate/ping_pause_test.dart index ec47f7afb612..1515e982b489 100644 --- a/tests/isolate/ping_pause_test.dart +++ b/tests/isolate/ping_pause_test.dart @@ -41,7 +41,7 @@ void main() { isolate.resume(resume); pingPort.close(); }; - isolate.ping(pingPort.sendPort, Isolate.BEFORE_NEXT_EVENT); + isolate.ping(pingPort.sendPort, priority: Isolate.BEFORE_NEXT_EVENT); echoPort.send(2); echoPort.send(1); }); diff --git a/tests/isolate/ping_test.dart b/tests/isolate/ping_test.dart index 48b1ec5b9553..ee726127e973 100644 --- a/tests/isolate/ping_test.dart +++ b/tests/isolate/ping_test.dart @@ -26,37 +26,34 @@ void main(){ reply.handler = (v) { result.add(v); if (v == 0) { - Expect.listEquals(["alive", "control", "event"], + Expect.listEquals(["alive", "control"], result.where((x) => x is String).toList(), "control events"); - Expect.listEquals([4, 3, 2, 1, 0], + Expect.listEquals([3, 2, 1, 0], result.where((x) => x is int).toList(), "data events"); - Expect.isTrue(result.indexOf("alive") < result.indexOf(3), - "alive index < 3"); - Expect.isTrue(result.indexOf("control") < result.indexOf(2), - "control index < 2"); - int eventIndex = result.indexOf("event"); - Expect.isTrue(eventIndex > result.indexOf(2), "event index > 2"); - Expect.isTrue(eventIndex < result.indexOf(1), "event index < 1"); + Expect.isTrue(result.indexOf("alive") < result.indexOf(2), + "alive index < 2"); + Expect.isTrue(result.indexOf("control") < result.indexOf(1), + "control index < 1"); reply.close(); asyncEnd(); } }; - SendPort createPingPort(message) { - var pingPort = new RawReceivePort(); - pingPort.handler = (_) { - result.add(message); - pingPort.close(); - }; - return pingPort.sendPort; + var pingPort = new RawReceivePort(); + int pingCount = 0; + pingPort.handler = (response) { + result.add(response); + pingCount++; + if (pingCount == 2) pingPort.close(); + }; + ping(message, priority) { + isolate.ping(pingPort.sendPort, response: message, priority: priority); } - echoPort.send(4); - isolate.ping(createPingPort("alive"), Isolate.IMMEDIATE); echoPort.send(3); - isolate.ping(createPingPort("control"), Isolate.BEFORE_NEXT_EVENT); + ping("alive", Isolate.IMMEDIATE); echoPort.send(2); - isolate.ping(createPingPort("event"), Isolate.AS_EVENT); + ping("control", Isolate.BEFORE_NEXT_EVENT); echoPort.send(1); echoPort.send(0); });