Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSocketWebSocketClient is not a constructor #263

Open
dev-morph opened this issue Jul 6, 2023 · 2 comments
Open

RSocketWebSocketClient is not a constructor #263

dev-morph opened this issue Jul 6, 2023 · 2 comments
Assignees

Comments

@dev-morph
Copy link

dev-morph commented Jul 6, 2023

First of all, Thanks for your nice work!
But when I follow your guide, I got in trouble.

Expected Behavior

I followed example page code of Link
I thought I could easily connect RSocketServer.

Actual Behavior

But I got RSocketWebSocketClient is not a constructor ERROR, even though I just copy and paste guide code.
I searched it, I found I should downgrade to ^0.0.27. both of rsocket-core and rsocket-websocket-client.
and with 0.0.27 version, it works like a charm!
maybe guide is only for version 0.0.27? I am little bit confused.

Steps to Reproduce

After npm install rsocket-core rsocket-websocket-client;
it will install 1.0.0-alpha.3.
and then copy paste guide code.

@viglucci
Copy link
Member

viglucci commented Jul 7, 2023

Hey @dev-morph thanks for the heads up. We'll update the guide to help avoid this issue. With that being said, I would recommend using the latest alpha version if possible. We likely won't be supporting the 0.27 versions long term once the 1.0.0 versions land and the 1.0.0 alpha versions are not receiving large breaking changes at the moment so they should be relatively safe to begin learning, especially if this is your first introduction to RSocket-js.

The best way to learn the 1.0.0 versions is currently going to be through the examples under packages/RSocket-examples.

@saleweaver
Copy link

@dev-morph if you're still looking to get it working, here's the Websocket Client converted to Typescript

import type {
  DuplexConnection,
  Frame,
  ISubject,
  ISubscriber,
  ISubscription,
} from 'rsocket-types';
import type { Encoders } from 'rsocket-core';

import { Flowable } from 'rsocket-flowable';
import {
  deserializeFrame,
  deserializeFrameWithLength,
  printFrame,
  serializeFrame,
  serializeFrameWithLength,
  toBuffer,
} from 'rsocket-core';
/**
 * Connection status types representing the various states of the connection.
 */
export type ConnectionStatus =
  | { kind: "NOT_CONNECTED" }
  | { kind: "CONNECTING" }
  | { kind: "CONNECTED" }
  | { kind: "CLOSED" }
  | { kind: "ERROR"; error: Error };

/**
 * Constants representing each non-error connection status.
 */
export const CONNECTION_STATUS = {
  NOT_CONNECTED: { kind: "NOT_CONNECTED" } as ConnectionStatus,
  CONNECTING: { kind: "CONNECTING" } as ConnectionStatus,
  CONNECTED: { kind: "CONNECTED" } as ConnectionStatus,
  CLOSED: { kind: "CLOSED" } as ConnectionStatus,
} as const;

export type ClientOptions = {
  url: string;
  wsCreator?: (url: string) => WebSocket;
  debug?: boolean;
  lengthPrefixedFrames?: boolean;
};

/**
 * A WebSocket transport client for use in browser environments.
 */
export default class RSocketWebSocketClient implements DuplexConnection {
  private _encoders?: Encoders<any>;
  private _options: ClientOptions;
  private _receivers: Set<ISubscriber<Frame>>;
  private _senders: Set<ISubscription>;
  private _socket?: WebSocket;
  private _status: ConnectionStatus;
  private _statusSubscribers: Set<ISubject<ConnectionStatus>>;

  constructor(options: ClientOptions, encoders?: Encoders<any>) {
    this._encoders = encoders;
    this._options = options;
    this._receivers = new Set();
    this._senders = new Set();
    this._socket = undefined;
    this._status = CONNECTION_STATUS.NOT_CONNECTED;
    this._statusSubscribers = new Set();
  }

  close(): void {
    this._close();
  }

  connect(): void {
    if (this._status.kind !== 'NOT_CONNECTED') {
      throw new Error(
        'RSocketWebSocketClient: Cannot connect(), a connection is already established.',
      );
    }
    this._setConnectionStatus(CONNECTION_STATUS.CONNECTING);

    const wsCreator = this._options.wsCreator;
    const url = this._options.url;
    this._socket = wsCreator ? wsCreator(url) : new WebSocket(url);

    if (!this._socket) {
      throw new Error('RSocketWebSocketClient: Failed to create WebSocket.');
    }

    const socket = this._socket;
    socket.binaryType = 'arraybuffer';

    socket.addEventListener('close', this._handleClosed);
    socket.addEventListener('error', this._handleError);
    socket.addEventListener('open', this._handleOpened);
    socket.addEventListener('message', this._handleMessage);
  }

  connectionStatus(): Flowable<ConnectionStatus> {
    return new Flowable((subscriber) => {
      subscriber.onSubscribe({
        cancel: () => {
          this._statusSubscribers.delete(subscriber);
        },
        request: () => {
          this._statusSubscribers.add(subscriber);
          subscriber.onNext(this._status);
        },
      });
    });
  }

  receive(): Flowable<Frame> {
    return new Flowable((subject) => {
      subject.onSubscribe({
        cancel: () => {
          this._receivers.delete(subject);
        },
        request: () => {
          this._receivers.add(subject);
        },
      });
    });
  }

  sendOne(frame: Frame): void {
    this._writeFrame(frame);
  }

  send(frames: Flowable<Frame>): void {
    let subscription: ISubscription | undefined;
    frames.subscribe({
      onComplete: () => {
        if (subscription) {
          this._senders.delete(subscription);
        }
      },
      onError: (error) => {
        if (subscription) {
          this._senders.delete(subscription);
        }
        this._close(error);
      },
      onNext: (frame) => this._writeFrame(frame),
      onSubscribe: (_subscription) => {
        subscription = _subscription;
        this._senders.add(subscription);
        subscription.request(Number.MAX_SAFE_INTEGER);
      },
    });
  }

  private _close(error?: Error): void {
    if (this._status.kind === 'CLOSED' || this._status.kind === 'ERROR') {
      // already closed
      return;
    }
    const status: ConnectionStatus = error
      ? { error, kind: 'ERROR' }
      : CONNECTION_STATUS.CLOSED;
    this._setConnectionStatus(status);
    this._receivers.forEach((subscriber) => {
      if (error) {
        subscriber.onError(error);
      } else {
        subscriber.onComplete();
      }
    });
    this._receivers.clear();
    this._senders.forEach((subscription) => subscription.cancel());
    this._senders.clear();
    const socket = this._socket;
    if (socket) {
      socket.removeEventListener('close', this._handleClosed);
      socket.removeEventListener('error', this._handleError);
      socket.removeEventListener('open', this._handleOpened);
      socket.removeEventListener('message', this._handleMessage);
      socket.close();
      this._socket = undefined;
    }
  }

  private _setConnectionStatus(status: ConnectionStatus): void {
    this._status = status;
    this._statusSubscribers.forEach((subscriber) => subscriber.onNext(status));
  }

  private _handleClosed = (event: CloseEvent): void => {
    this._close(
      new Error(
        event.reason || 'RSocketWebSocketClient: Socket closed unexpectedly.',
      ),
    );
  };

  private _handleError = (event: Event): void => {
    this._close(new Error('RSocketWebSocketClient: WebSocket encountered an error.'));
  };

  private _handleOpened = (): void => {
    this._setConnectionStatus(CONNECTION_STATUS.CONNECTED);
  };

  private _handleMessage = (message: MessageEvent): void => {
    try {
      const frame = this._readFrame(message);
      this._receivers.forEach((subscriber) => subscriber.onNext(frame));
    } catch (error) {
      this._close(error instanceof Error ? error : new Error(String(error)));
    }
  };

  private _readFrame(message: MessageEvent): Frame {
    const buffer = toBuffer(message.data);
    const frame = this._options.lengthPrefixedFrames
      ? deserializeFrameWithLength(buffer, this._encoders)
      : deserializeFrame(buffer, this._encoders);


    return frame;
  }

  private _writeFrame(frame: Frame): void {
    try {

      const buffer = this._options.lengthPrefixedFrames
        ? serializeFrameWithLength(frame, this._encoders)
        : serializeFrame(frame, this._encoders);
      if (!this._socket) {
        throw new Error(
          'RSocketWebSocketClient: Cannot send frame, not connected.',
        );
      }
      this._socket.send(buffer);
    } catch (error) {
      this._close(error instanceof Error ? error : new Error(String(error)));
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants