Skip to content

Commit

Permalink
Provide a public factory for BiTransportObserver (#1135)
Browse files Browse the repository at this point in the history
Motivation:

Users who need to report events to two `TransportObserver`s simultaneously
on the server-side or who need their own variant of
`TransportObserverConnectionFactoryFilter` will find it useful.

Modifications:

- Move `BiTransportObserver` from `client-api` to `transport-api`;
- Convert passes first and second observers into safe observers;
- Add public `TransportObservers.biTransportObserver` factory;

Result:

Users can easily report events to two `TransportObserver`s.
  • Loading branch information
idelpivnitskiy authored Aug 25, 2020
1 parent 63e95ee commit 11f911d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.transport.api.TransportObservers.combine;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -71,7 +72,7 @@ public Single<C> newConnection(final ResolvedAddress resolvedAddress,
return failed(t);
}
return delegate().newConnection(resolvedAddress, originalObserver == null ? newObserver :
newObserver == null ? originalObserver : new BiTransportObserver(originalObserver, newObserver));
newObserver == null ? originalObserver : combine(originalObserver, newObserver));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api;
package io.servicetalk.transport.api;

import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ConnectionObserver.DataObserver;
import io.servicetalk.transport.api.ConnectionObserver.MultiplexedObserver;
import io.servicetalk.transport.api.ConnectionObserver.ReadObserver;
import io.servicetalk.transport.api.ConnectionObserver.SecurityHandshakeObserver;
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
import io.servicetalk.transport.api.ConnectionObserver.WriteObserver;
import io.servicetalk.transport.api.TransportObserver;

import javax.net.ssl.SSLSession;

import static java.util.Objects.requireNonNull;
import static io.servicetalk.transport.api.TransportObservers.asSafeObserver;

/**
* Combines two {@link TransportObserver}s into a single {@link TransportObserver}.
*/
final class BiTransportObserver implements TransportObserver {

private final TransportObserver first;
private final TransportObserver second;

/**
* Creates a new instance.
*
* @param first the {@link TransportObserver} that will receive events first
* @param second the {@link TransportObserver} that will receive events second
*/
BiTransportObserver(final TransportObserver first, final TransportObserver second) {
this.first = requireNonNull(first);
this.second = requireNonNull(second);
this.first = asSafeObserver(first);
this.second = asSafeObserver(second);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.transport.api;

import static java.util.Objects.requireNonNull;

/**
* A factory to create different {@link TransportObserver}s.
*/
Expand All @@ -36,6 +38,31 @@ public static TransportObserver asSafeObserver(final TransportObserver observer)
if (observer instanceof CatchAllTransportObserver) {
return observer;
}
if (observer instanceof BiTransportObserver) {
// BiTransportObserver is always safe
return observer;
}
return new CatchAllTransportObserver(observer);
}

/**
* Combines multiple {@link TransportObserver}s into a single {@link TransportObserver}.
*
* @param other {@link TransportObserver}s to combine
* @return a {@link TransportObserver} that delegates all invocations to the provided {@link TransportObserver}s
*/
public static TransportObserver combine(final TransportObserver... other) {
switch (other.length) {
case 0:
throw new IllegalArgumentException("At least one TransportObserver is required to combine");
case 1:
return requireNonNull(other[0]);
default:
BiTransportObserver bi = new BiTransportObserver(other[0], other[1]);
for (int i = 2; i < other.length; i++) {
bi = new BiTransportObserver(bi, other[i]);
}
return bi;
}
}
}

0 comments on commit 11f911d

Please sign in to comment.