Skip to content

Commit

Permalink
Merge pull request #47 from qnkhuat/playback
Browse files Browse the repository at this point in the history
Improve stream quality and big changes in how we send data
  • Loading branch information
qnkhuat authored Aug 8, 2021
2 parents 6c94231 + 9c50e90 commit 3c2a05d
Show file tree
Hide file tree
Showing 21 changed files with 479 additions and 137 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@
- [x] In room Chat
- [x] Voice chat
- [ ] Stream playback
- [ ] Private session
- [ ] Multiple tabs support
- [ ] User management system

14 changes: 14 additions & 0 deletions client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"axios": "^0.21.1",
"dayjs": "^1.10.5",
"moment": "^2.29.1",
"pako": "^1.0.11",
"react": "^17.0.2",
"react-dom": "^17.0.2",
"react-router-dom": "^5.2.0",
Expand Down Expand Up @@ -61,6 +62,7 @@
]
},
"devDependencies": {
"@types/pako": "^1.0.2",
"@types/react-router-dom": "^5.1.7",
"@types/string-similarity": "^4.0.0",
"@types/url-join": "^4.0.0",
Expand Down
20 changes: 7 additions & 13 deletions client/src/components/Chat.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import React from "react";
import * as constants from "../lib/constants";
import * as message from "../lib/message";
import PubSub from "../lib/pubsub";
import TextField from '@material-ui/core/TextField';
import KeyboardArrowRightRoundedIcon from '@material-ui/icons/KeyboardArrowRightRounded';
Expand All @@ -19,22 +20,15 @@ interface Props {
className?: string;
}

export interface ChatMsg {
Name: string;
Content: string;
Color: string;
Time: string;
}

interface State {
msgList: ChatMsg[];
msgList: message.ChatMsg[];
inputContent: string;
userConfig: TstreamUser | null;
isWaitingUsername: boolean,
tempMsg: string,
}

const ChatSection: React.FC<ChatMsg> = ({ Name, Content, Color, Time}) => {
const ChatSection: React.FC<message.ChatMsg> = ({ Name, Content, Color, Time}) => {
return (
<>
<div className="w-full flex p-2 hover:bg-gray-900 rounded-lg">
Expand Down Expand Up @@ -71,8 +65,8 @@ class Chat extends React.Component<Props, State> {
}
}

addNewMsg(chatMsg: ChatMsg) {
let newMsgList = this.state.msgList as ChatMsg[];
addNewMsg(chatMsg: message.ChatMsg) {
let newMsgList = this.state.msgList as message.ChatMsg[];
newMsgList.push(chatMsg);
this.setState({
msgList: newMsgList,
Expand All @@ -81,11 +75,11 @@ class Chat extends React.Component<Props, State> {
}

componentDidMount() {
this.props.msgManager?.sub(constants.MSG_TCHAT_IN, (cacheChat: Array<ChatMsg>) => {
this.props.msgManager?.sub(constants.MSG_TCHAT_IN, (cacheChat: Array<message.ChatMsg>) => {
if (cacheChat === null) {
return;
}
let newMsgList = this.state.msgList as ChatMsg[];
let newMsgList = this.state.msgList as message.ChatMsg[];
for (let i = 0; i < cacheChat.length; i++) {
newMsgList.push(cacheChat[i]);
}
Expand Down
21 changes: 11 additions & 10 deletions client/src/components/StreamPreview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import PubSub from "./../lib/pubsub";
import * as base64 from "../lib/base64";
import * as util from "../lib/util";
import * as constants from "../lib/constants";
import * as message from "../lib/message";

import dayjs from "dayjs";
import customParseFormat from 'dayjs/plugin/customParseFormat';
Expand Down Expand Up @@ -41,19 +42,19 @@ const StreamPreview: FC<Props> = ({ title, wsUrl, streamerID, nViewers, startedT
ws.onmessage = (ev: MessageEvent) => {
let msg = JSON.parse(ev.data);

if (msg.Type === constants.MSG_TWRITE) {

let buffer = base64.str2ab(JSON.parse(window.atob(msg.Data)).Data);
tempMsg.pub(msg.Type, buffer);

} else if (msg.Type === constants.MSG_TWINSIZE) {

let winSizeMsg = msg.Data;
tempMsg.pub(msg.Type, winSizeMsg);
switch (msg.Type) {
case constants.MSG_TWRITEBLOCK:
let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data));
tempMsg.pub(msg.Type, blockMsg);
break;

case constants.MSG_TWINSIZE:
let winSizeMsg = msg.Data;
tempMsg.pub(msg.Type, winSizeMsg);
break;
}
}

}
tempMsg.sub("request", (msgType: string) => {

var payload = JSON.stringify({
Expand Down
118 changes: 108 additions & 10 deletions client/src/components/WSTerminal.tsx
Original file line number Diff line number Diff line change
@@ -1,43 +1,55 @@
import React from "react";
import Xterm from "./Xterm";
import * as constants from "../lib/constants";
import * as message from "../lib/message";
import * as base64 from "../lib/base64";
import * as pako from "pako";
import PubSub from "../lib/pubsub";

// TODO: add handle % and px
interface Props {
msgManager: PubSub;
width: number; // in pixel
height: number; // in pixel
className?: string;
}

interface Winsize {
Rows: number;
Cols: number;
}

// TODO: add handle % and px for size
interface Props {
msgManager: PubSub;
width: number; // in pixel
height: number; // in pixel
delay?: number;
className?: string;
}

class WSTerminal extends React.Component<Props, {}> {

static defaultProps = {
width: -1,
height: -1,
delay: 0,
className: "",
}

termRef : React.RefObject<Xterm>;
divRef: React.RefObject<HTMLDivElement>;
writeManager: WriteManager;


constructor(props: Props) {
super(props)
this.termRef = React.createRef<Xterm>();
this.divRef = React.createRef<HTMLDivElement>();
let writeCB = (buffer: Uint8Array) => {
this.termRef.current?.writeUtf8(buffer);
};

this.writeManager = new WriteManager(writeCB, this.props.delay);
}


componentDidMount() {
this.props.msgManager.sub(constants.MSG_TWRITE, (buffer: Uint8Array) => {
this.termRef.current?.writeUtf8(buffer);
this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (block: message.TermWriteBlock) => {
this.writeManager.addBlock(block);
})

this.props.msgManager.sub(constants.MSG_TWINSIZE, (winsize: Winsize) => {
Expand All @@ -60,7 +72,7 @@ class WSTerminal extends React.Component<Props, {}> {
}

componentWillUnmount() {
this.props.msgManager.unsub(constants.MSG_TWRITE);
this.props.msgManager.unsub(constants.MSG_TWRITEBLOCK);
this.props.msgManager.unsub(constants.MSG_TWINSIZE);
}

Expand Down Expand Up @@ -105,4 +117,90 @@ class WSTerminal extends React.Component<Props, {}> {

}

class WriteManager {

queue: message.TermWrite[] = [];
writeCB: (arr:Uint8Array) => void;
delay: number; // in milliseconds

constructor(writeCB: (arr: Uint8Array) => void, delay: number = 0) {
this.writeCB = writeCB;
this.delay = delay;
}

resetQueue() {
this.queue = [];
}


addQueue(q: message.TermWrite[]) {
this.queue.push(...q); // Concatnate
this.consume();
}

consume() {
if (this.queue.length == 0) {
return
} else {

// any message has offset < 0 => messages from the past with respect to render time
// concat all these messages into one buffer and render at once
let bufferArray: Uint8Array[] = [];
while (true && this.queue.length != 0) {
let msg = this.queue[0];

if (msg.Offset < 0) {
bufferArray.push()
let buffer = base64.str2ab(msg.Data)
bufferArray.push(buffer);
this.queue.shift();
} else break;
}
if ( bufferArray.length > 0) this.writeCB(base64.concatab(bufferArray));

// schedule to render upcomming messages
// TODO: are there any ways we don't have to create many settimeout liek this?
// tried sequentially call to settimeout but the performance is much worse
this.queue.forEach((msg) => {
let buffer = base64.str2ab(msg.Data);
setTimeout(() => {
this.writeCB(buffer);
}, msg.Offset);
})
this.resetQueue();
}
}

addBlock(block: message.TermWriteBlock) {
// when viewers receive this block
// it only contains the actual start-time
// we need to be able to re-compute the render time based on
// - now time
// - when does this block being created
// - the delay factor. In case of play back the delay = now - stream sesion start time
let blockDelayTime = (new Date()).getTime() - (new Date(block.StartTime)).getTime() - this.delay;

// this is a big chunk of encoding/decoding
// Since we have to : reduce message size by usign gzip and also
// every single termwrite have to be decoded, or else the rendering will screw up
// the whole block often took 9-20 milliseconds to decode a 3 seconds block of message
let data = pako.ungzip(base64.str2ab(block.Data));
let dataArrString: string[] = JSON.parse(base64.ab2str(data));

// convert from string[] to message.TermWrite[]
let dataArrTermWrite: message.TermWrite[] = [];
dataArrString.forEach((data: string, index: number) => {
let writeMsg = JSON.parse(window.atob(data));

// re-compute the offset of this message with respect to the render time
writeMsg.Offset = writeMsg.Offset - blockDelayTime;
dataArrTermWrite.push(writeMsg);
})

this.addQueue(dataArrTermWrite);

}
}


export default WSTerminal;
19 changes: 16 additions & 3 deletions client/src/lib/base64.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,20 @@ export function str2ab(input:string): Uint8Array{
return bytes;
}

// array buffer to string
export function ab2str(buf: number[]): string{
return String.fromCharCode.apply(null, buf);
export function ab2str(buf: any): string{
return new TextDecoder().decode(buf);
}

export function concatab(array: Uint8Array[]): Uint8Array {
let len = 0;
array.forEach((a) => { len += a.byteLength; });

let result = new Uint8Array(len);
let runningIndex = 0
array.forEach((a) => {
result.set(new Uint8Array(a), runningIndex);
runningIndex += a.byteLength;
});
return result;

}
1 change: 1 addition & 0 deletions client/src/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Message type
export const MSG_TWRITE = "Write";
export const MSG_TWRITEBLOCK = "WriteBlock";
export const MSG_TWINSIZE = "Winsize";
export const MSG_TROOM_INFO = "RoomInfo";
export const MSG_TCHAT = "Chat";
Expand Down
17 changes: 17 additions & 0 deletions client/src/lib/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export interface TermWriteBlock {
Data: string;
Duration: number;
StartTime: string;
}

export interface TermWrite {
Data: string;
Offset: number;
}

export interface ChatMsg {
Name: string;
Content: string;
Color: string;
Time: string;
}
1 change: 1 addition & 0 deletions client/src/lib/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Generic pubsub class
export default class PubSub {

// set to true if one topic can only have one subscriber
Expand Down
Loading

1 comment on commit 3c2a05d

@vercel
Copy link

@vercel vercel bot commented on 3c2a05d Aug 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.