Skip to content

Commit

Permalink
Avoid exposing webrtc library specifics in WebRTCConnection
Browse files Browse the repository at this point in the history
This avoids putting `PeerConnectionObserver` and `RTCDataChannelObserver` in the API of `WebRTCConnection`. Not doing
this will require subtle dependency requirements once in a separate project.
  • Loading branch information
kwvanderlinde committed Apr 7, 2024
1 parent cff9ff4 commit 41d5956
Showing 1 changed file with 177 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

public class WebRTCConnection extends AbstractConnection
implements Connection, PeerConnectionObserver, RTCDataChannelObserver {
public class WebRTCConnection extends AbstractConnection implements Connection {
public interface Listener {
void onLoginError();
}

private static final Logger log = LogManager.getLogger(WebRTCConnection.class);

private final PeerConnectionObserver peerConnectionObserver = new PeerConnectionObserverImpl();
private final RTCDataChannelObserver rtcDataChannelObserver = new RTCDataChannelObserverImpl();
private final PeerConnectionFactory factory = new PeerConnectionFactory();
private final String serverName;
private final String id;
Expand Down Expand Up @@ -68,7 +69,7 @@ public WebRTCConnection(OfferMessageDto message, WebRTCServer webRTCServer) {
this.signalingClient = server.getSignalingClient();
init();

peerConnection = factory.createPeerConnection(rtcConfig, this);
peerConnection = factory.createPeerConnection(rtcConfig, peerConnectionObserver);
peerConnection.setRemoteDescription(
message.offer,
new SetSessionDescriptionObserver() {
Expand Down Expand Up @@ -284,11 +285,11 @@ private void onLogin(LoginMessageDto message) {
return;
}

peerConnection = factory.createPeerConnection(rtcConfig, this);
peerConnection = factory.createPeerConnection(rtcConfig, peerConnectionObserver);

var initDict = new RTCDataChannelInit();
localDataChannel = peerConnection.createDataChannel("myDataChannel", initDict);
localDataChannel.registerObserver(this);
localDataChannel.registerObserver(rtcDataChannelObserver);

var offerOptions = new RTCOfferOptions();
peerConnection.createOffer(
Expand Down Expand Up @@ -326,181 +327,11 @@ private String prefix() {
return isServerSide() ? "S " : "C ";
}

@Override
public void onSignalingChange(RTCSignalingState state) {
// set thread name for better logs.
Thread.currentThread().setName("WebRTCConnection.WebRTCThread_" + getId());
log.info(prefix() + "PeerConnection.onSignalingChange: " + state);
}

@Override
public void onConnectionChange(RTCPeerConnectionState state) {
log.info(prefix() + "PeerConnection.onConnectionChange " + state);
switch (state) {
case FAILED -> {
lastError = "PeerConnection failed";
peerConnection = null;
fireDisconnectAsync();
}
case CONNECTED -> {
if (hasMoreMessages()) {
synchronized (sendThread) {
sendThread.notify();
}
}
}
}
}

@Override
public void onIceConnectionChange(RTCIceConnectionState state) {
log.info(prefix() + "PeerConnection.onIceConnectionChange " + state);
}

@Override
public void onStandardizedIceConnectionChange(RTCIceConnectionState state) {
log.info(prefix() + "PeerConnection.onStandardizedIceConnectionChange " + state);
}

@Override
public void onIceConnectionReceivingChange(boolean receiving) {
log.info(prefix() + "PeerConnection.onIceConnectionReceivingChange " + receiving);
}

@Override
public void onIceGatheringChange(RTCIceGatheringState state) {
log.info(prefix() + "PeerConnection.onIceGatheringChange " + state);
}

@Override
public void onIceCandidate(RTCIceCandidate candidate) {
var msg = new CandidateMessageDto();

if (isServerSide()) {
msg.source = serverName;
msg.destination = getSource();
} else {
msg.destination = serverName;
msg.source = getSource();
}
msg.candidate = candidate;
sendSignalingMessage(gson.toJson(msg));
}

@Override
public void onIceCandidateError(RTCPeerConnectionIceErrorEvent event) {
log.debug(
prefix()
+ "PeerConnection.onIceCandidateError: code:"
+ event.getErrorCode()
+ " url: "
+ event.getUrl()
+ " address/port: "
+ event.getAddress()
+ ":"
+ event.getPort()
+ " text: "
+ event.getErrorText());
}

@Override
public void onIceCandidatesRemoved(RTCIceCandidate[] candidates) {
log.info(prefix() + "PeerConnection.onIceCandidatesRemoved");
}

@Override
public void onAddStream(MediaStream stream) {
log.info(prefix() + "PeerConnection.onAddStream");
}

@Override
public void onRemoveStream(MediaStream stream) {
log.info(prefix() + "PeerConnection.onRemoveStream");
}

@Override
public void onDataChannel(RTCDataChannel newDataChannel) {
log.info(prefix() + "PeerConnection.onDataChannel");
this.localDataChannel = newDataChannel;
localDataChannel.registerObserver(this);

if (isServerSide()) {
server.onDataChannelOpened(this);
}
}

@Override
public void onRenegotiationNeeded() {
// set thread name for better logs
Thread.currentThread().setName("WebRTCConnection.WebRTCThread_" + getId());
log.info(prefix() + "PeerConnection.onRenegotiationNeeded");
}

@Override
public void onAddTrack(RTCRtpReceiver receiver, MediaStream[] mediaStreams) {
log.info(prefix() + "PeerConnection.onTrack(multiple Streams)");
}

@Override
public void onRemoveTrack(RTCRtpReceiver receiver) {
log.info(prefix() + "PeerConnection.onRemoveTrack");
}

@Override
public void onTrack(RTCRtpTransceiver transceiver) {
log.info(prefix() + "PeerConnection.onTrack");
}

public void addIceCandidate(RTCIceCandidate candidate) {
log.info(prefix() + "PeerConnection.addIceCandidate: " + candidate.toString());
peerConnection.addIceCandidate(candidate);
}

// dataChannel
@Override
public void onBufferedAmountChange(long previousAmount) {
log.info(prefix() + "dataChannel onBufferedAmountChange " + previousAmount);
}

// dataChannel
@Override
public void onStateChange() {
var state = localDataChannel.getState();
log.info(prefix() + "localDataChannel onStateChange " + state);
switch (state) {
case OPEN -> {
// connection established we don't need the signaling server anymore
// for now disabled. We may get additional ice candidates.
if (!isServerSide() && signalingClient.isOpen()) {
signalingClient.close();
}

sendThread.start();
}
case CLOSED -> {
close();
fireDisconnectAsync();
}
}
}

// dataChannel
@Override
public void onMessage(RTCDataChannelBuffer channelBuffer) {
log.debug(
prefix() + "localDataChannel onMessage: got " + channelBuffer.data.capacity() + " bytes");

if (Thread.currentThread().getContextClassLoader() == null) {
ClassLoader cl = ClassLoader.getSystemClassLoader();
Thread.currentThread().setContextClassLoader(cl);
}

var message = readMessage(channelBuffer.data);
if (message != null) {
dispatchCompressedMessage(id, message);
}
}

private void fireDisconnectAsync() {
handleDisconnect =
new Thread(
Expand Down Expand Up @@ -606,4 +437,175 @@ public void run() {
log.debug(prefix() + " sendThread ended");
}
}

private final class PeerConnectionObserverImpl implements PeerConnectionObserver {
@Override
public void onIceCandidate(RTCIceCandidate candidate) {
var msg = new CandidateMessageDto();

if (isServerSide()) {
msg.source = serverName;
msg.destination = getSource();
} else {
msg.destination = serverName;
msg.source = getSource();
}
msg.candidate = candidate;
sendSignalingMessage(gson.toJson(msg));
}

@Override
public void onAddStream(MediaStream stream) {
log.info(prefix() + "PeerConnection.onAddStream");
}

@Override
public void onAddTrack(RTCRtpReceiver receiver, MediaStream[] mediaStreams) {
log.info(prefix() + "PeerConnection.onTrack(multiple Streams)");
}

@Override
public void onConnectionChange(RTCPeerConnectionState state) {
log.info(prefix() + "PeerConnection.onConnectionChange " + state);
switch (state) {
case FAILED -> {
lastError = "PeerConnection failed";
peerConnection = null;
fireDisconnectAsync();
}
case CONNECTED -> {
if (hasMoreMessages()) {
synchronized (sendThread) {
sendThread.notify();
}
}
}
}
}

@Override
public void onDataChannel(RTCDataChannel newDataChannel) {
log.info(prefix() + "PeerConnection.onDataChannel");
localDataChannel = newDataChannel;
localDataChannel.registerObserver(rtcDataChannelObserver);

if (isServerSide()) {
server.onDataChannelOpened(WebRTCConnection.this);
}
}

@Override
public void onIceCandidateError(RTCPeerConnectionIceErrorEvent event) {
log.debug(
prefix()
+ "PeerConnection.onIceCandidateError: code:"
+ event.getErrorCode()
+ " url: "
+ event.getUrl()
+ " address/port: "
+ event.getAddress()
+ ":"
+ event.getPort()
+ " text: "
+ event.getErrorText());
}

@Override
public void onIceCandidatesRemoved(RTCIceCandidate[] candidates) {
log.info(prefix() + "PeerConnection.onIceCandidatesRemoved");
}

@Override
public void onIceConnectionChange(RTCIceConnectionState state) {
log.info(prefix() + "PeerConnection.onIceConnectionChange " + state);
}

@Override
public void onIceConnectionReceivingChange(boolean receiving) {
log.info(prefix() + "PeerConnection.onIceConnectionReceivingChange " + receiving);
}

@Override
public void onIceGatheringChange(RTCIceGatheringState state) {
log.info(prefix() + "PeerConnection.onIceGatheringChange " + state);
}

@Override
public void onRemoveStream(MediaStream stream) {
log.info(prefix() + "PeerConnection.onRemoveStream");
}

@Override
public void onRemoveTrack(RTCRtpReceiver receiver) {
log.info(prefix() + "PeerConnection.onRemoveTrack");
}

@Override
public void onRenegotiationNeeded() {
// set thread name for better logs
Thread.currentThread().setName("WebRTCConnection.WebRTCThread_" + getId());
log.info(prefix() + "PeerConnection.onRenegotiationNeeded");
}

@Override
public void onSignalingChange(RTCSignalingState state) {
// set thread name for better logs.
Thread.currentThread().setName("WebRTCConnection.WebRTCThread_" + getId());
log.info(prefix() + "PeerConnection.onSignalingChange: " + state);
}

@Override
public void onStandardizedIceConnectionChange(RTCIceConnectionState state) {
log.info(prefix() + "PeerConnection.onStandardizedIceConnectionChange " + state);
}

@Override
public void onTrack(RTCRtpTransceiver transceiver) {
log.info(prefix() + "PeerConnection.onTrack");
}
}

private final class RTCDataChannelObserverImpl implements RTCDataChannelObserver {
@Override
public void onBufferedAmountChange(long previousAmount) {
log.info(prefix() + "dataChannel onBufferedAmountChange " + previousAmount);
}

@Override
public void onStateChange() {
var state = localDataChannel.getState();
log.info(prefix() + "localDataChannel onStateChange " + state);
switch (state) {
case OPEN -> {
// connection established we don't need the signaling server anymore
// for now disabled. We may get additional ice candidates.
if (!isServerSide() && signalingClient.isOpen()) {
signalingClient.close();
}

sendThread.start();
}
case CLOSED -> {
close();
fireDisconnectAsync();
}
}
}

@Override
public void onMessage(RTCDataChannelBuffer channelBuffer) {
log.debug(
prefix() + "localDataChannel onMessage: got " + channelBuffer.data.capacity() + " bytes");

if (Thread.currentThread().getContextClassLoader() == null) {
ClassLoader cl = ClassLoader.getSystemClassLoader();
Thread.currentThread().setContextClassLoader(cl);
}

var message = readMessage(channelBuffer.data);
if (message != null) {
dispatchCompressedMessage(id, message);
}
}
}
}

0 comments on commit 41d5956

Please sign in to comment.