Skip to content

Commit

Permalink
Update Isolate API.
Browse files Browse the repository at this point in the history
Remove AS_EVENT as priority of ping/kill. It wasn't very usable or predictable.
Make priority consistently named and a named parameter.
Add response object to ping/addExitListener so it doesn't have to send null.
This is useful for cases where you want to use the same receive port for
multiple purposes.

R=sgjesse@google.com

Review URL: https://codereview.chromium.org//1074223002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@45092 260f80e4-7a28-3924-810f-c04153c831b5
  • Loading branch information
lrhn committed Apr 13, 2015
1 parent 9557f01 commit 1b208bd
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 149 deletions.
18 changes: 11 additions & 7 deletions runtime/lib/isolate_patch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,13 @@ patch class Isolate {
_sendOOB(controlPort, msg);
}

/* patch */ void addOnExitListener(SendPort responsePort) {
var msg = new List(3)
/* patch */ void addOnExitListener(SendPort responsePort,
{Object response}) {
var msg = new List(4)
..[0] = 0 // Make room for OOB message type.
..[1] = _ADD_EXIT
..[2] = responsePort;
..[2] = responsePort
..[3] = response;
_sendOOB(controlPort, msg);
}

Expand All @@ -396,7 +398,7 @@ patch class Isolate {
_sendOOB(controlPort, msg);
}

/* patch */ void kill([int priority = BEFORE_NEXT_EVENT]) {
/* patch */ void kill({int priority: BEFORE_NEXT_EVENT}) {
var msg = new List(4)
..[0] = 0 // Make room for OOB message type.
..[1] = _KILL
Expand All @@ -405,12 +407,14 @@ patch class Isolate {
_sendOOB(controlPort, msg);
}

/* patch */ void ping(SendPort responsePort, [int pingType = IMMEDIATE]) {
var msg = new List(4)
/* patch */ void ping(SendPort responsePort, {Object response,
int priority: IMMEDIATE}) {
var msg = new List(5)
..[0] = 0 // Make room for OOM message type.
..[1] = _PING
..[2] = responsePort
..[3] = pingType;
..[3] = priority
..[4] = response;
_sendOOB(controlPort, msg);
}

Expand Down
45 changes: 33 additions & 12 deletions runtime/vm/isolate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,22 @@ bool IsolateMessageHandler::HandleLibMessage(const Array& message) {
break;
}
case kPingMsg: {
// [ OOB, kPingMsg, responsePort, priority ]
if (message.Length() != 4) return true;
// [ OOB, kPingMsg, responsePort, priority, response ]
if (message.Length() != 5) return true;
const Object& obj2 = Object::Handle(I, message.At(2));
if (!obj2.IsSendPort()) return true;
const SendPort& send_port = SendPort::Cast(obj2);
const Object& obj3 = Object::Handle(I, message.At(3));
if (!obj3.IsSmi()) return true;
const intptr_t priority = Smi::Cast(obj3).Value();
const Object& obj4 = Object::Handle(I, message.At(4));
if (!obj4.IsInstance() && !obj4.IsNull()) return true;
const Instance& response =
obj4.IsNull() ? Instance::null_instance() : Instance::Cast(obj4);
if (priority == kImmediateAction) {
uint8_t* data = NULL;
intptr_t len = 0;
SerializeObject(Object::null_instance(), &data, &len, false);
SerializeObject(response, &data, &len, false);
PortMap::PostMessage(new Message(send_port.Id(),
data, len,
Message::kNormalPriority));
Expand Down Expand Up @@ -268,21 +272,31 @@ bool IsolateMessageHandler::HandleLibMessage(const Array& message) {
case kAddErrorMsg:
case kDelErrorMsg: {
// [ OOB, msg, listener port ]
if (message.Length() != 3) return true;
if (message.Length() < 3) return true;
const Object& obj = Object::Handle(I, message.At(2));
if (!obj.IsSendPort()) return true;
const SendPort& listener = SendPort::Cast(obj);
switch (msg_type) {
case kAddExitMsg:
I->AddExitListener(listener);
case kAddExitMsg: {
if (message.Length() != 4) return true;
// [ OOB, msg, listener port, response object ]
const Object& response = Object::Handle(I, message.At(3));
if (!response.IsInstance() && !response.IsNull()) return true;
I->AddExitListener(listener,
response.IsNull() ? Instance::null_instance()
: Instance::Cast(response));
break;
}
case kDelExitMsg:
if (message.Length() != 3) return true;
I->RemoveExitListener(listener);
break;
case kAddErrorMsg:
if (message.Length() != 3) return true;
I->AddErrorListener(listener);
break;
case kDelErrorMsg:
if (message.Length() != 3) return true;
I->RemoveErrorListener(listener);
break;
default:
Expand Down Expand Up @@ -1002,21 +1016,23 @@ bool Isolate::RemoveResumeCapability(const Capability& capability) {

// TODO(iposva): Remove duplicated code and start using some hash based
// structure instead of these linear lookups.
void Isolate::AddExitListener(const SendPort& listener) {
void Isolate::AddExitListener(const SendPort& listener,
const Instance& response) {
// Ensure a limit for the number of listeners remembered.
static const intptr_t kMaxListeners = kSmiMax / (6 * kWordSize);
static const intptr_t kMaxListeners = kSmiMax / (12 * kWordSize);

const GrowableObjectArray& listeners = GrowableObjectArray::Handle(
this, object_store()->exit_listeners());
SendPort& current = SendPort::Handle(this);
intptr_t insertion_index = -1;
for (intptr_t i = 0; i < listeners.Length(); i++) {
for (intptr_t i = 0; i < listeners.Length(); i += 2) {
current ^= listeners.At(i);
if (current.IsNull()) {
if (insertion_index < 0) {
insertion_index = i;
}
} else if (current.Id() == listener.Id()) {
listeners.SetAt(i + 1, response);
return;
}
}
Expand All @@ -1028,8 +1044,10 @@ void Isolate::AddExitListener(const SendPort& listener) {
return;
}
listeners.Add(listener);
listeners.Add(response);
} else {
listeners.SetAt(insertion_index, listener);
listeners.SetAt(insertion_index + 1, response);
}
}

Expand All @@ -1038,12 +1056,13 @@ void Isolate::RemoveExitListener(const SendPort& listener) {
const GrowableObjectArray& listeners = GrowableObjectArray::Handle(
this, object_store()->exit_listeners());
SendPort& current = SendPort::Handle(this);
for (intptr_t i = 0; i < listeners.Length(); i++) {
for (intptr_t i = 0; i < listeners.Length(); i += 2) {
current ^= listeners.At(i);
if (!current.IsNull() && (current.Id() == listener.Id())) {
// Remove the matching listener from the list.
current = SendPort::null();
listeners.SetAt(i, current);
listeners.SetAt(i + 1, Object::null_instance());
return;
}
}
Expand All @@ -1056,13 +1075,15 @@ void Isolate::NotifyExitListeners() {
if (listeners.IsNull()) return;

SendPort& listener = SendPort::Handle(this);
for (intptr_t i = 0; i < listeners.Length(); i++) {
Instance& response = Instance::Handle(this);
for (intptr_t i = 0; i < listeners.Length(); i += 2) {
listener ^= listeners.At(i);
if (!listener.IsNull()) {
Dart_Port port_id = listener.Id();
uint8_t* data = NULL;
intptr_t len = 0;
SerializeObject(Object::null_instance(), &data, &len, false);
response ^= listeners.At(i + 1);
SerializeObject(response, &data, &len, false);
Message* msg = new Message(port_id, data, len, Message::kNormalPriority);
PortMap::PostMessage(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/vm/isolate.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ class Isolate : public BaseIsolate {
bool AddResumeCapability(const Capability& capability);
bool RemoveResumeCapability(const Capability& capability);

void AddExitListener(const SendPort& listener);
void AddExitListener(const SendPort& listener, const Instance& response);
void RemoveExitListener(const SendPort& listener);
void NotifyExitListeners();

Expand Down
34 changes: 11 additions & 23 deletions sdk/lib/_internal/compiler/js_lib/isolate_helper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class _IsolateContext implements IsolateContext {
List<_IsolateEvent> delayedEvents = [];
Set<Capability> pauseTokens = new Set();

// Container with the "on exit" handler send-ports.
// Container with the "on exit" handler send-ports and responses.
var doneHandlers;

/**
Expand Down Expand Up @@ -355,14 +355,12 @@ class _IsolateContext implements IsolateContext {
_updateGlobalState();
}

void addDoneListener(SendPort responsePort) {
void addDoneListener(SendPort responsePort, Object response) {
if (doneHandlers == null) {
doneHandlers = [];
// TODO(lrn): Use map optimized for few keys.
doneHandlers = new HashMap();
}
// If necessary, we can switch doneHandlers to a Set if it gets larger.
// That is not expected to happen in practice.
if (doneHandlers.contains(responsePort)) return;
doneHandlers.add(responsePort);
doneHandlers[responsePort] = response;
}

void removeDoneListener(SendPort responsePort) {
Expand All @@ -375,18 +373,14 @@ class _IsolateContext implements IsolateContext {
this.errorsAreFatal = errorsAreFatal;
}

void handlePing(SendPort responsePort, int pingType) {
void handlePing(SendPort responsePort, int pingType, Object response) {
if (pingType == Isolate.IMMEDIATE ||
(pingType == Isolate.BEFORE_NEXT_EVENT &&
!_isExecutingEvent)) {
responsePort.send(null);
return;
}
void respond() { responsePort.send(null); }
if (pingType == Isolate.AS_EVENT) {
_globalState.topEventLoop.enqueue(this, respond, "ping");
responsePort.send(response);
return;
}
void respond() { responsePort.send(response); }
assert(pingType == Isolate.BEFORE_NEXT_EVENT);
if (_scheduledControlEvents == null) {
_scheduledControlEvents = new Queue();
Expand All @@ -402,10 +396,6 @@ class _IsolateContext implements IsolateContext {
kill();
return;
}
if (priority == Isolate.AS_EVENT) {
_globalState.topEventLoop.enqueue(this, kill, "kill");
return;
}
assert(priority == Isolate.BEFORE_NEXT_EVENT);
if (_scheduledControlEvents == null) {
_scheduledControlEvents = new Queue();
Expand Down Expand Up @@ -499,7 +489,7 @@ class _IsolateContext implements IsolateContext {
removePause(message[1]);
break;
case 'add-ondone':
addDoneListener(message[1]);
addDoneListener(message[1], message[2]);
break;
case 'remove-ondone':
removeDoneListener(message[1]);
Expand All @@ -508,7 +498,7 @@ class _IsolateContext implements IsolateContext {
setErrorsFatal(message[1], message[2]);
break;
case "ping":
handlePing(message[1], message[2]);
handlePing(message[1], message[2], message[3]);
break;
case "kill":
handleKill(message[1], message[2]);
Expand Down Expand Up @@ -574,9 +564,7 @@ class _IsolateContext implements IsolateContext {
_globalState.isolates.remove(id); // indicate this isolate is not active
errorPorts.clear();
if (doneHandlers != null) {
for (SendPort port in doneHandlers) {
port.send(null);
}
doneHandlers.forEach((port, response) { port.send(response); });
doneHandlers = null;
}
}
Expand Down
17 changes: 10 additions & 7 deletions sdk/lib/_internal/compiler/js_lib/isolate_patch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ class Isolate {
}

@patch
void addOnExitListener(SendPort responsePort) {
void addOnExitListener(SendPort responsePort, {Object response}) {
// TODO(lrn): Can we have an internal method that checks if the receiving
// isolate of a SendPort is still alive?
var message = new List(2)
var message = new List(3)
..[0] = "add-ondone"
..[1] = responsePort;
..[1] = responsePort
..[2] = response;
controlPort.send(message);
}

Expand All @@ -103,16 +104,18 @@ class Isolate {
}

@patch
void kill([int priority = BEFORE_NEXT_EVENT]) {
void kill({int priority: BEFORE_NEXT_EVENT}) {
controlPort.send(["kill", terminateCapability, priority]);
}

@patch
void ping(SendPort responsePort, [int pingType = IMMEDIATE]) {
var message = new List(3)
void ping(SendPort responsePort, {Object response,
int priority: IMMEDIATE}) {
var message = new List(4)
..[0] = "ping"
..[1] = responsePort
..[2] = pingType;
..[2] = priority
..[3] = response;
controlPort.send(message);
}

Expand Down
Loading

0 comments on commit 1b208bd

Please sign in to comment.