Skip to content

Commit

Permalink
#42: Create software release 1.2.0. Monitor cancel no longer closes d…
Browse files Browse the repository at this point in the history
…isruptor.
  • Loading branch information
simondelabici committed Jul 26, 2018
1 parent 7700335 commit e619521
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/main/java/org/epics/ca/impl/requests/MonitorRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class MonitorRequest<T> implements Monitor<T>, NotifyResponseRequest
/**
* Reference to an object which will push out notifications to the Consumer.
*/
private MonitorNotificationService<T> notifier;
private MonitorNotificationService<T> monitorNotificationService;

/**
* Reference to an object which will consume monitor update events.
Expand All @@ -70,15 +70,15 @@ public class MonitorRequest<T> implements Monitor<T>, NotifyResponseRequest
* @param transport the transport.
* @param typeSupport the object which will provide type support.
* @param mask the mask.
* @param notifier the monitor notification service.
* @param monitorNotificationService the monitor notification service.
* @param consumer the consumer to be informed of monitor update events.
*/
public MonitorRequest( ChannelImpl<?> channel, Transport transport, TypeSupport<T> typeSupport, int mask, MonitorNotificationService<T> notifier, Consumer<? super T> consumer )
public MonitorRequest( ChannelImpl<?> channel, Transport transport, TypeSupport<T> typeSupport, int mask, MonitorNotificationService<T> monitorNotificationService, Consumer<? super T> consumer )
{
this.channel = Validate.notNull( channel );
this.typeSupport = Validate.notNull(typeSupport );
this.typeSupport = Validate.notNull( typeSupport );
this.mask = mask;
this.notifier = Validate.notNull( notifier );
this.monitorNotificationService = Validate.notNull( monitorNotificationService );
this.consumer = Validate.notNull( consumer );

context = transport.getContext ();
Expand All @@ -103,13 +103,13 @@ public void response( int status, short dataType, int dataCount, ByteBuffer data
if ( caStatus == Status.NORMAL )
{
// Publish the new value to the consumer.
final boolean overrun = ! notifier.publish( dataPayloadBuffer, typeSupport, dataCount );
final boolean overrun = ! monitorNotificationService.publish( dataPayloadBuffer, typeSupport, dataCount );
if ( overrun )
{
bufferOverrunWarningCount++;
if ( bufferOverrunWarningCount < 3 )
{
logger.log(Level.WARNING, "Buffer Overrun: the monitor notifier service implementation discarded the oldest data in the notification buffer.");
logger.log(Level.WARNING, "Buffer Overrun: the monitor notification service implementation discarded the oldest data in the notification buffer.");
}
else if ( bufferOverrunWarningCount == 3 )
{
Expand All @@ -128,13 +128,17 @@ else if ( bufferOverrunWarningCount == 3 )
public void cancel()
{
// unregister response request
context.unregisterResponseRequest (this);
channel.unregisterResponseRequest (this);
context.unregisterResponseRequest( this );
channel.unregisterResponseRequest( this );

// Tell the monitor notifier that we are finished handling events for
// this consumer.
notifier.dispose();
// THE FOLLOWING IS FROM THE ORIGINAL IMPLEMENTATION:
// NOTE: this does not wait until all events in the ring buffer are processed
// but we do not want to block by calling shutdown()
// disruptor.halt();

// TODO: decide whether this implementation is appropriate.
// TODO: Where slow consumers are still processing the call below can block !
// monitorNotificationService.close();
}

public void resubscribe( Transport transport )
Expand Down Expand Up @@ -173,7 +177,7 @@ else if ( status == Status.DISCONN )
// in the buffer the event was quietly dropped. Since this feature
// was not documented and this behaviour is somewhat unintuitive
// for the moment the feature has been dropped.
// notifier.publish( null );
// monitorNotificationService.publish( null );
}
else
{
Expand All @@ -184,7 +188,6 @@ else if ( status == Status.DISCONN )
@Override
public void close()
{

if ( closed.getAndSet (true) )
return;

Expand All @@ -201,15 +204,15 @@ public void close()

try
{
Messages.cancelSubscriptionMessage (
transport, typeSupport.getDataType (), dataCount,
channel.getSID (), ioid);
Messages.cancelSubscriptionMessage ( transport, typeSupport.getDataType (), dataCount, channel.getSID (), ioid );
transport.flush ();
}
catch ( Throwable th )
{
logger.log( Level.FINER, "Failed to send 'cancel subscription' message.", th);
}

monitorNotificationService.close();
}


Expand Down

0 comments on commit e619521

Please sign in to comment.