Skip to content

Commit

Permalink
Fix crash for web socket in some race conditions (facebook#22439)
Browse files Browse the repository at this point in the history
Summary:
Fixes facebook#21086.
Fixes facebook#6117.

This PR fixes a crash caused by a race condition when `webSocket` deallocated and `NSStream` delegate callback, because `NSStream`'s delegate callback be called on `RCTSR_networkRunLoop`.

This PR mainly changes:

* Remove unnecessary `nil` operation in `dealloc` method.
* Add a new method `_scheduleCleanUp` to schedule `webSocket` cleanup also on `RCTSR_networkRunLoop`.
* In `stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode` delegate method, add a `wself` to make safe further.
Pull Request resolved: facebook#22439

Differential Revision: D13564247

Pulled By: cpojer

fbshipit-source-id: 675c1b2805aa45c54d7708d796f5843ef7ea34e2
  • Loading branch information
zhongwuzw authored and marcin-spotio committed Feb 13, 2019
1 parent de90192 commit 8269e88
Showing 1 changed file with 133 additions and 96 deletions.
229 changes: 133 additions & 96 deletions Libraries/WebSocket/RCTSRWebSocket.m
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ @implementation RCTSRWebSocket
int _closeCode;

BOOL _isPumping;

BOOL _cleanupScheduled;

NSMutableSet<NSArray *> *_scheduledRunloops;

Expand Down Expand Up @@ -324,17 +326,11 @@ - (void)dealloc

[_inputStream close];
[_outputStream close];

_workQueue = NULL;


if (_receivedHTTPHeaders) {
CFRelease(_receivedHTTPHeaders);
_receivedHTTPHeaders = NULL;
}

if (_delegateDispatchQueue) {
_delegateDispatchQueue = NULL;
}
}

#ifndef NDEBUG
Expand Down Expand Up @@ -626,11 +622,11 @@ - (void)_failWithError:(NSError *)error;
}];

self.readyState = RCTSR_CLOSED;
self->_selfRetain = nil;


RCTSRLog(@"Failing with error %@", error.localizedDescription);

[self _disconnect];
[self _scheduleCleanup];
}
});
}
Expand Down Expand Up @@ -1036,12 +1032,7 @@ - (void)_pumpWriting;
!_sentClose) {
_sentClose = YES;

[_outputStream close];
[_inputStream close];

for (NSArray *runLoop in [_scheduledRunloops copy]) {
[self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]];
}
[self _scheduleCleanup];

if (!_failed) {
[self _performDelegateBlock:^{
Expand All @@ -1050,8 +1041,6 @@ - (void)_pumpWriting;
}
}];
}

_selfRetain = nil;
}
}

Expand Down Expand Up @@ -1345,94 +1334,142 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
}
}
}

assert(_workQueue != NULL);

// _workQueue cannot be NULL
if (!_workQueue) {
return;
}
__weak typeof(self) weakSelf = self;
dispatch_async(_workQueue, ^{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= RCTSR_CLOSING) {
return;
}
assert(self->_readBuffer);

if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) {
[self didConnect];
}
[self _pumpWriting];
[self _pumpScanner];
break;
}

case NSStreamEventErrorOccurred: {
RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]);
// TODO: specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
self->_readBuffer.length = 0;
break;
typeof(self) strongSelf = weakSelf;
if (!strongSelf) {
return;
}
[strongSelf safeHandleEvent:eventCode stream:aStream];
});
}

- (void)safeHandleEvent:(NSStreamEvent)eventCode stream:(NSStream *)aStream
{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= RCTSR_CLOSING) {
return;
}

case NSStreamEventEndEncountered: {
[self _pumpScanner];
RCTSRLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
dispatch_async(self->_workQueue, ^{
if (self.readyState != RCTSR_CLOSED) {
self.readyState = RCTSR_CLOSED;
self->_selfRetain = nil;
}

if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
});
}

break;
assert(self->_readBuffer);

if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) {
[self didConnect];
}

case NSStreamEventHasBytesAvailable: {
RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];

while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];

if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
[self _pumpWriting];
[self _pumpScanner];
break;
}

case NSStreamEventErrorOccurred: {
RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]);
// TODO: specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
self->_readBuffer.length = 0;
break;

}

case NSStreamEventEndEncountered: {
[self _pumpScanner];
RCTSRLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
dispatch_async(self->_workQueue, ^{
if (self.readyState != RCTSR_CLOSED) {
self.readyState = RCTSR_CLOSED;
[self _scheduleCleanup];
}

if (bytes_read != bufferSize) {
break;

if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
};
[self _pumpScanner];
break;
});
}

break;
}

case NSStreamEventHasBytesAvailable: {
RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];

while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];

if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
}

if (bytes_read != bufferSize) {
break;
}
};
[self _pumpScanner];
break;
}

case NSStreamEventHasSpaceAvailable: {
RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}

default:
RCTSRLog(@"(default) %@", aStream);
break;
}
}

case NSStreamEventHasSpaceAvailable: {
RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}
- (void)_scheduleCleanup
{
if (_cleanupScheduled) {
return;
}

_cleanupScheduled = YES;

// Cleanup NSStream's delegate in the same RunLoop used by the streams themselves:
// This way we'll prevent race conditions between handleEvent and SRWebsocket's dealloc
NSTimer *timer = [NSTimer timerWithTimeInterval:(0.0f) target:self selector:@selector(_cleanupSelfReference:) userInfo:nil repeats:NO];
[[NSRunLoop RCTSR_networkRunLoop] addTimer:timer forMode:NSDefaultRunLoopMode];
}

default:
RCTSRLog(@"(default) %@", aStream);
break;
}
- (void)_cleanupSelfReference:(NSTimer *)timer
{
// Remove the streams, right now, from the networkRunLoop
[_inputStream close];
[_outputStream close];

// Unschedule from RunLoop
for (NSArray *runLoop in [_scheduledRunloops copy]) {
[self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]];
}

// Nuke NSStream's delegate
_inputStream.delegate = nil;
_outputStream.delegate = nil;

// Cleanup selfRetain in the same GCD queue as usual
dispatch_async(_workQueue, ^{
self->_selfRetain = nil;
});
}

Expand Down

0 comments on commit 8269e88

Please sign in to comment.