Skip to content

Commit

Permalink
client now read from blcok of data and schedule message according to …
Browse files Browse the repository at this point in the history
…offset
  • Loading branch information
qnkhuat committed Aug 8, 2021
1 parent e780eb5 commit 2f4c2bf
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 97 deletions.
21 changes: 11 additions & 10 deletions client/src/components/StreamPreview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import PubSub from "./../lib/pubsub";
import * as base64 from "../lib/base64";
import * as util from "../lib/util";
import * as constants from "../lib/constants";
import * as message from "../lib/message";

import dayjs from "dayjs";
import customParseFormat from 'dayjs/plugin/customParseFormat';
Expand Down Expand Up @@ -41,19 +42,19 @@ const StreamPreview: FC<Props> = ({ title, wsUrl, streamerID, nViewers, startedT
ws.onmessage = (ev: MessageEvent) => {
let msg = JSON.parse(ev.data);

if (msg.Type === constants.MSG_TWRITE) {

let buffer = base64.str2ab(JSON.parse(window.atob(msg.Data)).Data);
tempMsg.pub(msg.Type, buffer);

} else if (msg.Type === constants.MSG_TWINSIZE) {

let winSizeMsg = msg.Data;
tempMsg.pub(msg.Type, winSizeMsg);
switch (msg.Type) {
case constants.MSG_TWRITEBLOCK:
let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data));
tempMsg.pub(msg.Type, blockMsg);
break;

case constants.MSG_TWINSIZE:
let winSizeMsg = msg.Data;
tempMsg.pub(msg.Type, winSizeMsg);
break;
}
}

}
tempMsg.sub("request", (msgType: string) => {

var payload = JSON.stringify({
Expand Down
116 changes: 107 additions & 9 deletions client/src/components/WSTerminal.tsx
Original file line number Diff line number Diff line change
@@ -1,43 +1,55 @@
import React from "react";
import Xterm from "./Xterm";
import * as constants from "../lib/constants";
import * as message from "../lib/message";
import * as base64 from "../lib/base64";
import * as pako from "pako";
import PubSub from "../lib/pubsub";

// TODO: add handle % and px
interface Props {
msgManager: PubSub;
width: number; // in pixel
height: number; // in pixel
className?: string;
}

interface Winsize {
Rows: number;
Cols: number;
}

// TODO: add handle % and px for size
interface Props {
msgManager: PubSub;
width: number; // in pixel
height: number; // in pixel
delay?: number;
className?: string;
}

class WSTerminal extends React.Component<Props, {}> {

static defaultProps = {
width: -1,
height: -1,
delay: 0,
className: "",
}

termRef : React.RefObject<Xterm>;
divRef: React.RefObject<HTMLDivElement>;
writeManager: WriteManager;


constructor(props: Props) {
super(props)
this.termRef = React.createRef<Xterm>();
this.divRef = React.createRef<HTMLDivElement>();
let writeCB = (buffer: Uint8Array) => {
this.termRef.current?.writeUtf8(buffer);
};

this.writeManager = new WriteManager(writeCB, this.props.delay);
}


componentDidMount() {
this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (buffer: Uint8Array) => {
this.termRef.current?.writeUtf8(buffer);
this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (block: message.TermWriteBlock) => {
this.writeManager.addBlock(block);
})

this.props.msgManager.sub(constants.MSG_TWINSIZE, (winsize: Winsize) => {
Expand Down Expand Up @@ -105,4 +117,90 @@ class WSTerminal extends React.Component<Props, {}> {

}

class WriteManager {

queue: message.TermWrite[] = [];
writeCB: (arr:Uint8Array) => void;
delay: number; // in milliseconds

constructor(writeCB: (arr: Uint8Array) => void, delay: number = 0) {
this.writeCB = writeCB;
this.delay = delay;
}

resetQueue() {
this.queue = [];
}


addQueue(q: message.TermWrite[]) {
this.queue.push(...q); // Concatnate
this.consume();
}

consume() {
if (this.queue.length == 0) {
return
} else {

// any message has offset < 0 => messages from the past with respect to render time
// concat all these messages into one buffer and render at once
let bufferArray: Uint8Array[] = [];
while (true && this.queue.length != 0) {
let msg = this.queue[0];

if (msg.Offset < 0) {
bufferArray.push()
let buffer = base64.str2ab(msg.Data)
bufferArray.push(buffer);
this.queue.shift();
} else break;
}
if ( bufferArray.length > 0) this.writeCB(base64.concatab(bufferArray));

// schedule to render upcomming messages
// TODO: are there any ways we don't have to create many settimeout liek this?
// tried sequentially call to settimeout but the performance is much worse
this.queue.forEach((msg) => {
let buffer = base64.str2ab(msg.Data);
setTimeout(() => {
this.writeCB(buffer);
}, msg.Offset);
})
this.resetQueue();
}
}

addBlock(block: message.TermWriteBlock) {
// when viewers receive this block
// it only contains the actual start-time
// we need to be able to re-compute the render time based on
// - now time
// - when does this block being created
// - the delay factor. In case of play back the delay = now - stream sesion start time
let blockDelayTime = (new Date()).getTime() - (new Date(block.StartTime)).getTime() - this.delay;

// this is a big chunk of encoding/decoding
// Since we have to : reduce message size by usign gzip and also
// every single termwrite have to be decoded, or else the rendering will screw up
// the whole block often took 9-20 milliseconds to decode a 3 seconds block of message
let data = pako.ungzip(base64.str2ab(block.Data));
let dataArrString: string[] = JSON.parse(base64.ab2str(data));

// convert from string[] to message.TermWrite[]
let dataArrTermWrite: message.TermWrite[] = [];
dataArrString.forEach((data: string, index: number) => {
let writeMsg = JSON.parse(window.atob(data));

// re-compute the offset of this message with respect to the render time
writeMsg.Offset = writeMsg.Offset - blockDelayTime;
dataArrTermWrite.push(writeMsg);
})

this.addQueue(dataArrTermWrite);

}
}


export default WSTerminal;
31 changes: 30 additions & 1 deletion client/src/lib/message.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//import * as pako from "pako";
//import * as constants from "./constants";
//import * as base64 from "./base64";
//import PubSub from "./pubsub";

export interface TermWriteBlock {
Data: string;
Offset: number;
Duration: number;
StartTime: string;
}

export interface TermWrite {
Expand All @@ -15,3 +21,26 @@ export interface ChatMsg {
Time: string;
}



// *** Message handlers ***
//export function hanldeWriteBlockMessage(msgData: string, msgManager: PubSub) {
// let blockMsg: TermWriteBlock = JSON.parse(window.atob(msgData));
// // this is a big chunk of encoding/decoding
// // Since we have to : reduce message size by usign gzip and also
// // every single termwrite have to be decoded, or else the rendering will screw up
// // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message
// let data = pako.ungzip(base64.str2ab(blockMsg.Data));
// let dataArr: string[] = JSON.parse(base64.ab2str(data));
// //let bufferArray: Uint8Array[] = [];
// dataArr.forEach((data: string) => {
// let writeMsg: TermWrite = JSON.parse(window.atob(data));
// let buffer = base64.str2ab(writeMsg.Data)
// //bufferArray.push(buffer);
// setTimeout(() => {
// msgManager.pub(, buffer);
// }, writeMsg.Offset as number);
// })
// //console.log(base64.concatab(bufferArray).length);
// //msgManager.pub(msg.Type, base64.concatab(bufferArray));
//}
1 change: 1 addition & 0 deletions client/src/lib/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Generic pubsub class
export default class PubSub {

// set to true if one topic can only have one subscriber
Expand Down
26 changes: 6 additions & 20 deletions client/src/pages/[room]/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ interface RoomInfo {
NViewers: number;
Title: string;
Status: RoomStatus;
Delay: number;
}

interface Params {
Expand Down Expand Up @@ -209,26 +210,10 @@ class Room extends React.Component<Props, State> {
switch (msg.Type) {

case constants.MSG_TWRITEBLOCK:
let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data));
// this is a big chunk of encoding/decoding
// Since we have to : reduce message size by usign gzip and also
// every single termwrite have to be decoded, or else the rendering will screw up
// the whole block often took 9-20 milliseconds to decode a 3 seconds block of message
let data = pako.ungzip(base64.str2ab(blockMsg.Data));
let dataArr: string[] = JSON.parse(base64.ab2str(data));
//let bufferArray: Uint8Array[] = [];
dataArr.forEach((data: string) => {
let writeMsg: message.TermWrite = JSON.parse(window.atob(data));
let buffer = base64.str2ab(writeMsg.Data)
//bufferArray.push(buffer);
setTimeout(() => {
console.log("Buffer : ", buffer.length, " offset ", writeMsg.Offset);
msgManager.pub(msg.Type, buffer);
}, writeMsg.Offset as number);
})
//console.log(base64.concatab(bufferArray).length);
//msgManager.pub(msg.Type, base64.concatab(bufferArray));
let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data));
msgManager.pub(msg.Type, blockMsg);
break;

case constants.MSG_TWINSIZE:

msgManager.pub(msg.Type, msg.Data);
Expand Down Expand Up @@ -329,12 +314,13 @@ class Room extends React.Component<Props, State> {

<div id="terminal-window">

{!isStreamStopped && isRoomExisted &&
{!isStreamStopped && isRoomExisted && this.state.roomInfo &&
<WSTerminal
className="bg-black"
msgManager={this.msgManager}
width={terminalSize.width}
height={terminalSize.height}
delay={this.state.roomInfo.Delay}
/>}

{isStreamStopped && isRoomExisted &&
Expand Down
11 changes: 6 additions & 5 deletions tstream/internal/cfg/cfg.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package cfg

const (
SERVER_VERSION = "1.2.0" // Used to verify compatible verion of streamer
STREAMER_VERSION = "1.2.0" // retry connect with server if websocket is broke
SERVER_STREAMER_REQUIRED_VERSION = "1.2.0" // Used to verify compatible verion of streamer
SERVER_VERSION = "1.3.0" // Used to verify compatible verion of streamer
STREAMER_VERSION = "1.3.0" // retry connect with server if websocket is broke
SERVER_STREAMER_REQUIRED_VERSION = "1.3.0" // Used to verify compatible verion of streamer

// Room
ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer
ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer
ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer
ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer
ROOM_DEFAULT_DELAY = 3000 // act as both block size and delay time of streaming

// Streamer
STREAMER_READ_BUFFER_SIZE = 1024 // streamer websocket read buffer size
Expand Down
1 change: 1 addition & 0 deletions tstream/pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type RoomInfo struct {
Title string
StreamerID string
Status RoomStatus
Delay uint64 // Viewer delay time with streamer ( in milliseconds )
}

// used for streamer to update room info
Expand Down
Loading

0 comments on commit 2f4c2bf

Please sign in to comment.