Skip to content

Commit

Permalink
Deflake socket reads on Fuchsia
Browse files Browse the repository at this point in the history
Handles a case where a socket may be signalled with 0 available bytes to read,
where we would never resubscribe to the socket for reading.

Change-Id: If9662873ac862bee749ccc7f0ce4b4370b639680
Bug: https://fuchsia.atlassian.net/browse/DX-710
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/97783
Commit-Queue: Zach Anderson <zra@google.com>
Auto-Submit: Ross Wang <rosswang@google.com>
Reviewed-by: Zach Anderson <zra@google.com>
  • Loading branch information
AsturaPhoenix authored and commit-bot@chromium.org committed Mar 26, 2019
1 parent 45c367d commit 0e48504
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
28 changes: 22 additions & 6 deletions runtime/bin/eventhandler_fuchsia.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "platform/utils.h"

// The EventHandler for Fuchsia uses its "ports v2" API:
// https://fuchsia.googlesource.com/zircon/+/HEAD/docs/syscalls/port_create.md
// https://fuchsia.googlesource.com/fuchsia/+/HEAD/zircon/docs/syscalls/port_create.md
// This API does not have epoll()-like edge triggering (EPOLLET). Since clients
// of the EventHandler expect edge-triggered notifications, we must simulate it.
// When a packet from zx_port_wait() indicates that a signal is asserted for a
Expand All @@ -45,7 +45,7 @@
// 4. Some time later the Dart thread actually does a write().
// 5. After writing, the Dart thread resubscribes to write events.
//
// We use he same procedure for ZX_SOCKET_READABLE, and read()/accept().
// We use the same procedure for ZX_SOCKET_READABLE, and read()/accept().

// define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
// define EVENTHANDLER_LOG_INFO to get log messages for both information and
Expand Down Expand Up @@ -267,7 +267,23 @@ intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_);
event_mask = event_mask & ~(1 << kInEvent);
}
if ((event_mask & (1 << kInEvent)) != 0) {
// We may get In events without available bytes, so we must make sure there
// are actually bytes, or we will never resubscribe (due to a short-circuit
// on the Dart side).
//
// This happens due to how packets get enqueued on the port with all signals
// asserted at that time. Sometimes we enqueue a packet due to
// zx_object_wait_async e.g. for POLLOUT (writability) while the socket is
// readable and while we have a Read queued up on the Dart side. This packet
// will also have POLLIN (readable) asserted. We may then perform the Read
// and drain the socket before our zx_port_wait is serviced, at which point
// when we process the packet for POLLOUT with its stale POLLIN (readable)
// signal, the socket is no longer actually readable.
//
// As a detail, negative available bytes (errors) are handled specially; see
// IOHandle::AvailableBytes for more information.
if ((event_mask & (1 << kInEvent)) != 0 &&
FDUtils::AvailableBytes(fd_) != 0) {
LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n",
fd_);
read_events_enabled_ = false;
Expand Down Expand Up @@ -468,15 +484,15 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {

void EventHandlerImplementation::HandlePacket(zx_port_packet_t* pkt) {
LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type);
LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status);
LOG_INFO("HandlePacket: Got event packet: type=%x\n", pkt->type);
LOG_INFO("HandlePacket: Got event packet: status=%d\n", pkt->status);
if (pkt->type == ZX_PKT_TYPE_USER) {
ASSERT(pkt->key == kInterruptPacketKey);
InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
HandleInterrupt(msg);
return;
}
LOG_INFO("HandlePacket: Got event packet: observed = %lx\n",
LOG_INFO("HandlePacket: Got event packet: observed = %x\n",
pkt->signal.observed);
LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);

Expand Down
9 changes: 5 additions & 4 deletions runtime/bin/socket_fuchsia.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "bin/eventhandler.h"
#include "bin/fdutils.h"
#include "bin/log.h"
#include "platform/signal_blocker.h"

// #define SOCKET_LOG_INFO 1
Expand All @@ -19,6 +20,7 @@
// define SOCKET_LOG_ERROR to get log messages only for errors.
// define SOCKET_LOG_INFO to get log messages for both information and errors.
#if defined(SOCKET_LOG_INFO) || defined(SOCKET_LOG_ERROR)

#define LOG_ERR(msg, ...) \
{ \
int err = errno; \
Expand Down Expand Up @@ -204,7 +206,7 @@ intptr_t ServerSocket::Accept(intptr_t fd) {
intptr_t socket;
struct sockaddr clientaddr;
socklen_t addrlen = sizeof(clientaddr);
LOG_INFO("ServerSocket::Accept: calling accept(%ld)\n", listen_fd);
LOG_INFO("ServerSocket::Accept: calling accept(%ld)\n", fd);
socket = listen_handle->Accept(&clientaddr, &addrlen);
if (socket == -1) {
if (IsTemporaryAcceptError(errno)) {
Expand All @@ -214,12 +216,11 @@ intptr_t ServerSocket::Accept(intptr_t fd) {
ASSERT(kTemporaryFailure != -1);
socket = kTemporaryFailure;
} else {
LOG_ERR("ServerSocket::Accept: accept(%ld) failed\n", listen_fd);
LOG_ERR("ServerSocket::Accept: accept(%ld) failed\n", fd);
}
} else {
IOHandle* io_handle = new IOHandle(socket);
LOG_INFO("ServerSocket::Accept: accept(%ld) -> socket %ld\n", listen_fd,
socket);
LOG_INFO("ServerSocket::Accept: accept(%ld) -> socket %ld\n", fd, socket);
if (!FDUtils::SetCloseOnExec(socket)) {
LOG_ERR("FDUtils::SetCloseOnExec(%ld) failed\n", socket);
FDUtils::SaveErrorAndClose(socket);
Expand Down

0 comments on commit 0e48504

Please sign in to comment.