Skip to content

Commit

Permalink
Fix ConcurrentModificationException in Datafeed loop finos#475
Browse files Browse the repository at this point in the history
A ConcurrentModificationException can happen in the Datafeed (DF) loop upon startup.
The access to the listeners collection in the DF loop is not protected
and accessed by multiple threads.

Because of the for loop we need to synchronize on the list itself.
  • Loading branch information
symphony-youri committed Mar 17, 2021
1 parent ba4ad60 commit 5e74aa2
Show file tree
Hide file tree
Showing 3 changed files with 430 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ abstract class AbstractDatafeedLoop implements DatafeedLoop {

protected final AuthSession authSession;
protected final BdkConfig bdkConfig;
protected final List<RealTimeEventListener> listeners;
protected final RetryWithRecoveryBuilder retryWithRecoveryBuilder;
protected DatafeedApi datafeedApi;
protected ApiClient apiClient;

// access needs to be thread safe (DF loop is usually running on its own thread)
private final List<RealTimeEventListener> listeners;

public AbstractDatafeedLoop(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) {
this.datafeedApi = datafeedApi;
this.listeners = new ArrayList<>();
Expand All @@ -55,15 +57,19 @@ public AbstractDatafeedLoop(DatafeedApi datafeedApi, AuthSession authSession, Bd
*/
@Override
public void subscribe(RealTimeEventListener listener) {
listeners.add(listener);
synchronized (listeners) {
listeners.add(listener);
}
}

/**
* {@inheritDoc}
*/
@Override
public void unsubscribe(RealTimeEventListener listener) {
listeners.remove(listener);
synchronized (listeners) {
listeners.remove(listener);
}
}

/**
Expand All @@ -79,9 +85,11 @@ protected void handleV4EventList(List<V4Event> events) {

try {
RealTimeEventType eventType = RealTimeEventType.valueOf(event.getType());
for (RealTimeEventListener listener : listeners) {
if (listener.isAcceptingEvent(event, bdkConfig.getBot().getUsername())) {
eventType.dispatch(listener, event);
synchronized (listeners) {
for (RealTimeEventListener listener : listeners) {
if (listener.isAcceptingEvent(event, bdkConfig.getBot().getUsername())) {
eventType.dispatch(listener, event);
}
}
}
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void start() throws ApiException, AuthUnauthorizedException {
} catch (AuthUnauthorizedException | ApiException | NestedRetryException exception) {
throw exception;
} catch (Throwable throwable) {
log.error(networkIssueMessageError(throwable,datafeedApi.getApiClient().getBasePath()) + "\n" + throwable);
log.error(networkIssueMessageError(throwable, datafeedApi.getApiClient().getBasePath()) + "\n" + throwable);
}
}

Expand Down
Loading

0 comments on commit 5e74aa2

Please sign in to comment.