Skip to content

Commit

Permalink
Bug fix #9: util/event_loop_apple_cf.cpp: Defer completion of connect…
Browse files Browse the repository at this point in the history
… operation even if it fails immediatly during EventHandler::async_connect()
  • Loading branch information
kspangsege committed May 23, 2016
1 parent a686172 commit 1b64a85
Showing 1 changed file with 124 additions and 72 deletions.
196 changes: 124 additions & 72 deletions src/realm/util/event_loop_apple_cf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ ReleaseGuard<CFStringRef> make_cf_string(std::string str)
static_assert(std::is_same<UInt8, char>::value || std::is_same<UInt8, unsigned char>::value,
"Unexpected byte type");
const UInt8* bytes = reinterpret_cast<const UInt8*>(str.data());
Boolean isExternalRepresentation = FALSE;
Boolean is_external_representation = FALSE;
CFStringRef str_2 = CFStringCreateWithBytes(kCFAllocatorDefault, bytes, str.size(),
kCFStringEncodingUTF8, isExternalRepresentation);
kCFStringEncodingUTF8, is_external_representation);
if (!str_2)
throw std::bad_alloc();
return ReleaseGuard<CFStringRef>(str_2);
Expand Down Expand Up @@ -555,19 +555,19 @@ class SocketImpl: public Socket {
REALM_ASSERT(!m_is_connected);
REALM_ASSERT(!m_read_oper->in_progress() && !m_write_oper->in_progress());
if (m_connect_oper->is_incomplete())
on_connect_complete();
complete_connect();
m_connect_oper->cancel();
discard_streams();
return;
}
if (m_read_oper->in_progress()) {
if (m_read_oper->is_incomplete())
on_read_complete();
complete_read();
m_read_oper->cancel();
}
if (m_write_oper->in_progress()) {
if (m_write_oper->is_incomplete())
on_write_complete();
complete_write();
m_write_oper->cancel();
}
}
Expand Down Expand Up @@ -644,41 +644,87 @@ class SocketImpl: public Socket {
REALM_ASSERT(!m_read_stream && !m_write_stream);
REALM_ASSERT(!m_read_oper->in_progress() && !m_write_oper->in_progress());

ReleaseGuard<CFStringRef> host_2 = make_cf_string(std::move(host)); // Throws
UInt32 port_2 = UInt32(port);
CFReadStreamRef read_stream;
CFWriteStreamRef write_stream;
CFStreamCreatePairWithSocketToHost(kCFAllocatorDefault, host_2.get(), port_2,
&read_stream, &write_stream);
ReleaseGuard<CFReadStreamRef> read_stream_2(read_stream);
ReleaseGuard<CFWriteStreamRef> write_stream_2(write_stream);
if (!read_stream_2 || !write_stream_2)
ReleaseGuard<CFReadStreamRef> read_stream;
ReleaseGuard<CFWriteStreamRef> write_stream;
{
ReleaseGuard<CFStringRef> host_2 = make_cf_string(std::move(host)); // Throws
UInt32 port_2 = UInt32(port);
CFReadStreamRef read_stream_2;
CFWriteStreamRef write_stream_2;
CFStreamCreatePairWithSocketToHost(kCFAllocatorDefault, host_2.get(), port_2,
&read_stream_2, &write_stream_2);
read_stream.reset(read_stream_2);
write_stream.reset(write_stream_2);
}
if (!read_stream || !write_stream)
throw std::bad_alloc();

set_security_level(read_stream_2.get(), write_stream_2.get(), security); // Throws
set_io_callbacks(read_stream_2.get(), write_stream_2.get()); // Throws

Boolean success_1 = CFReadStreamOpen(read_stream_2.get());
Boolean success_2 = CFWriteStreamOpen(write_stream_2.get());
if (!success_1 || !success_2) {
if (success_1)
CFReadStreamClose(read_stream_2.get());
if (success_2)
CFWriteStreamClose(write_stream_2.get());
throw std::runtime_error("Failed to open socket streams");
set_security_level(read_stream.get(), write_stream.get(), security); // Throws
set_io_callbacks(read_stream.get(), write_stream.get()); // Throws

std::error_code ec;
{
Boolean success_1 = CFReadStreamOpen(read_stream.get());
Boolean success_2 = CFWriteStreamOpen(write_stream.get());
if (!success_1 || !success_2) {
auto seh = [&]() noexcept {
if (success_1)
CFReadStreamClose(read_stream.get());
if (success_2)
CFWriteStreamClose(write_stream.get());
read_stream.reset();
write_stream.reset();
};
auto seg = util::make_scope_exit(seh);
CFStreamStatus status_1 = CFReadStreamGetStatus(read_stream.get());
CFStreamStatus status_2 = CFWriteStreamGetStatus(write_stream.get());
if (success_1) {
REALM_ASSERT(status_1 != kCFStreamStatusNotOpen &&
status_1 != kCFStreamStatusError);
}
else {
REALM_ASSERT(status_1 == kCFStreamStatusNotOpen ||
status_1 == kCFStreamStatusError);
if (status_1 != kCFStreamStatusError)
throw std::runtime_error("Failed to open read stream");
}
if (success_2) {
REALM_ASSERT(status_2 != kCFStreamStatusNotOpen &&
status_2 != kCFStreamStatusError);
}
else {
REALM_ASSERT(status_2 == kCFStreamStatusNotOpen ||
status_2 == kCFStreamStatusError);
if (status_2 != kCFStreamStatusError)
throw std::runtime_error("Failed to open write stream");
}
if (!success_1) {
ec = get_error(read_stream.get()); // Throws
}
else {
ec = get_error(write_stream.get()); // Throws
}
if (!ec)
ec = error::unknown;
}
}

m_read_stream = std::move(read_stream_2);
m_write_stream = std::move(write_stream_2);
m_read_stream = std::move(read_stream);
m_write_stream = std::move(write_stream);
m_connect_oper->initiate(std::move(handler));
++m_event_loop.num_operations_in_progress;

if (m_cf_run_loop) {
bool is_complete = bool(ec);
if (is_complete) {
bool not_yet_attached_to_cf_run_loop = true;
complete_connect(ec, not_yet_attached_to_cf_run_loop);
}
else if (m_cf_run_loop) {
CFReadStreamScheduleWithRunLoop(m_read_stream.get(), m_cf_run_loop,
kCFRunLoopDefaultMode);
}

// Discard previopusly buffered input
// Discard previously buffered input
m_read_buffer_begin = m_read_buffer.get();
m_read_buffer_end = m_read_buffer_begin;
}
Expand All @@ -702,7 +748,7 @@ class SocketImpl: public Socket {
bool is_complete = process_buffered_input(ec);
if (is_complete) {
bool not_yet_attached_to_cf_run_loop = true;
on_read_complete(ec, not_yet_attached_to_cf_run_loop);
complete_read(ec, not_yet_attached_to_cf_run_loop);
}
else if (m_cf_run_loop) {
CFReadStreamScheduleWithRunLoop(m_read_stream.get(), m_cf_run_loop,
Expand All @@ -727,7 +773,7 @@ class SocketImpl: public Socket {
bool is_complete = (m_write_curr == m_write_end);
if (is_complete) {
bool not_yet_attached_to_cf_run_loop = true;
on_write_complete(ec, not_yet_attached_to_cf_run_loop);
complete_write(ec, not_yet_attached_to_cf_run_loop);
}
else if (m_cf_run_loop) {
CFWriteStreamScheduleWithRunLoop(m_write_stream.get(), m_cf_run_loop,
Expand Down Expand Up @@ -800,7 +846,7 @@ class SocketImpl: public Socket {
switch (event_type) {
case kCFStreamEventOpenCompleted: {
m_is_connected = true;
on_connect_complete();
complete_connect();
m_event_loop.process_completed_operations(); // Throws
return;
}
Expand All @@ -811,7 +857,7 @@ class SocketImpl: public Socket {
if (REALM_UNLIKELY(n == 0)) {
// Read error
REALM_ASSERT(ec);
on_read_complete(ec);
complete_read(ec);
m_event_loop.process_completed_operations(); // Throws
return;
}
Expand All @@ -820,7 +866,7 @@ class SocketImpl: public Socket {
m_read_buffer_end = m_read_buffer_begin + n;
bool is_complete = process_buffered_input(ec);
if (is_complete) {
on_read_complete(ec);
complete_read(ec);
m_event_loop.process_completed_operations(); // Throws
}
return;
Expand All @@ -831,20 +877,19 @@ class SocketImpl: public Socket {
// why it happens so rarelygiven that we do simulate read errors
// by closing the connection on the remote side)
REALM_ASSERT(m_connect_oper->is_incomplete() != m_read_oper->is_incomplete());
bool is_write_error = false;
std::error_code ec = get_error(is_write_error); // Throws
std::error_code ec = get_error(m_read_stream.get()); // Throws
if (m_connect_oper->is_incomplete()) {
on_connect_complete(ec);
complete_connect(ec);
}
else {
on_read_complete(ec);
complete_read(ec);
}
m_event_loop.process_completed_operations(); // Throws
return;
}
case kCFStreamEventEndEncountered: {
// FIXME: It seems this this event never happens. Why is that?
on_read_complete(network::end_of_input);
complete_read(network::end_of_input);
m_event_loop.process_completed_operations(); // Throws
return;
}
Expand All @@ -863,7 +908,7 @@ class SocketImpl: public Socket {
if (REALM_UNLIKELY(n == 0)) {
// Write error
REALM_ASSERT(ec);
on_write_complete(ec);
complete_write(ec);
m_event_loop.process_completed_operations(); // Throws
return;
}
Expand All @@ -872,23 +917,22 @@ class SocketImpl: public Socket {
m_write_curr += n;
bool is_complete = (m_write_curr == m_write_end);
if (is_complete) {
on_write_complete(ec);
complete_write(ec);
m_event_loop.process_completed_operations(); // Throws
}
return;
}
case kCFStreamEventErrorOccurred: {
// FIXME: It seems this this event never happens. Why is that?
REALM_ASSERT(m_write_oper->is_incomplete());
bool is_write_error = true;
std::error_code ec = get_error(is_write_error); // Throws
on_write_complete(ec);
std::error_code ec = get_error(m_write_stream.get()); // Throws
complete_write(ec);
m_event_loop.process_completed_operations(); // Throws
return;
}
case kCFStreamEventEndEncountered: {
// FIXME: It seems this this event never happens. Why is that?
on_write_complete(error::connection_reset);
complete_write(error::connection_reset);
m_event_loop.process_completed_operations(); // Throws
return;
}
Expand All @@ -913,8 +957,7 @@ class SocketImpl: public Socket {
return 0;
}
REALM_ASSERT(n == -1);
bool is_write_error = false;
ec = get_error(is_write_error); // Throws
ec = get_error(m_read_stream.get()); // Throws
return 0;
}

Expand All @@ -930,8 +973,7 @@ class SocketImpl: public Socket {
if (REALM_LIKELY(n > 0))
return n;
REALM_ASSERT(n == -1);
bool is_write_error = true;
ec = get_error(is_write_error); // Throws
ec = get_error(m_write_stream.get()); // Throws
return 0;
}

Expand Down Expand Up @@ -959,20 +1001,21 @@ class SocketImpl: public Socket {
return true; // Complete
}

void on_connect_complete(std::error_code ec = std::error_code()) noexcept
void complete_connect(std::error_code ec = std::error_code(),
bool not_yet_attached_to_cf_run_loop = false) noexcept
{
m_connect_oper->complete(ec);
bind_ptr<ConnectOper> oper = m_connect_oper;
m_event_loop.add_completed_operation(std::move(oper));
--m_event_loop.num_operations_in_progress;
if (m_cf_run_loop) {
if (!not_yet_attached_to_cf_run_loop && m_cf_run_loop) {
CFReadStreamUnscheduleFromRunLoop(m_read_stream.get(), m_cf_run_loop,
kCFRunLoopDefaultMode);
}
}

void on_read_complete(std::error_code ec = std::error_code(),
bool not_yet_attached_to_cf_run_loop = false) noexcept
void complete_read(std::error_code ec = std::error_code(),
bool not_yet_attached_to_cf_run_loop = false) noexcept
{
size_t n = size_t(m_read_curr - m_read_begin);
m_read_oper->complete(ec, n);
Expand All @@ -985,8 +1028,8 @@ class SocketImpl: public Socket {
}
}

void on_write_complete(std::error_code ec = std::error_code(),
bool not_yet_attached_to_cf_run_loop = false) noexcept
void complete_write(std::error_code ec = std::error_code(),
bool not_yet_attached_to_cf_run_loop = false) noexcept
{
size_t n = size_t(m_write_curr - m_write_begin);
m_write_oper->complete(ec, n);
Expand All @@ -1008,28 +1051,37 @@ class SocketImpl: public Socket {
m_write_stream.reset();
}

std::error_code get_error(bool is_write_error)
static std::error_code get_error(CFReadStreamRef read_stream)
{
REALM_ASSERT(CFReadStreamGetStatus(read_stream) == kCFStreamStatusError);
ReleaseGuard<CFErrorRef> error;
if (is_write_error) {
REALM_ASSERT(CFWriteStreamGetStatus(m_write_stream.get()) == kCFStreamStatusError);
error.reset(CFWriteStreamCopyError(m_write_stream.get()));
}
else {
REALM_ASSERT(CFReadStreamGetStatus(m_read_stream.get()) == kCFStreamStatusError);
error.reset(CFReadStreamCopyError(m_read_stream.get()));
}
error.reset(CFReadStreamCopyError(read_stream));
if (!error)
throw std::bad_alloc();
return translate_error(error.get()); // Throws
}

static std::error_code get_error(CFWriteStreamRef write_stream)
{
REALM_ASSERT(CFWriteStreamGetStatus(write_stream) == kCFStreamStatusError);
ReleaseGuard<CFErrorRef> error;
error.reset(CFWriteStreamCopyError(write_stream));
if (!error)
throw std::bad_alloc();
CFStringRef domain = CFErrorGetDomain(error.get());
return translate_error(error.get()); // Throws
}

static std::error_code translate_error(CFErrorRef error)
{
CFStringRef domain = CFErrorGetDomain(error);
if (CFStringCompare(domain, kCFErrorDomainPOSIX, 0) == kCFCompareEqualTo) {
CFIndex code = CFErrorGetCode(error.get());
CFIndex code = CFErrorGetCode(error);
return make_basic_system_error_code(int(code));
}
else if (CFStringCompare(domain, kCFErrorDomainCFNetwork, 0) == kCFCompareEqualTo) {
CFIndex code = CFErrorGetCode(error.get());
CFIndex code = CFErrorGetCode(error);
if (code == 2) {
ReleaseGuard<CFDictionaryRef> user_info(CFErrorCopyUserInfo(error.get()));
ReleaseGuard<CFDictionaryRef> user_info(CFErrorCopyUserInfo(error));
if (!user_info)
throw std::bad_alloc();
const void* value =
Expand Down Expand Up @@ -1058,7 +1110,7 @@ class SocketImpl: public Socket {
return error::unknown;
}

std::error_code translate_addrinfo_error(int err) noexcept
static std::error_code translate_addrinfo_error(int err) noexcept
{
switch (err) {
case EAI_AGAIN:
Expand Down Expand Up @@ -1143,7 +1195,7 @@ class DeadlineTimerImpl: public DeadlineTimer {
{
if (m_wait_oper->in_progress()) {
if (m_wait_oper->is_incomplete())
on_wait_complete();
complete_wait();
m_wait_oper->cancel();
}
}
Expand Down Expand Up @@ -1189,11 +1241,11 @@ class DeadlineTimerImpl: public DeadlineTimer {
void wait_callback(CFRunLoopTimerRef cf_timer)
{
REALM_ASSERT(cf_timer == m_cf_timer.get());
on_wait_complete(); // Throws
complete_wait(); // Throws
m_event_loop.process_completed_operations(); // Throws
}

void on_wait_complete(std::error_code ec = std::error_code()) noexcept
void complete_wait(std::error_code ec = std::error_code()) noexcept
{
m_wait_oper->complete(ec);
bind_ptr<WaitOper> oper = m_wait_oper;
Expand Down

0 comments on commit 1b64a85

Please sign in to comment.