From 2f4c2bfbcb28f12944b3f3d2861aa7494b00c7fe Mon Sep 17 00:00:00 2001 From: Earther Date: Sun, 8 Aug 2021 14:45:25 +0700 Subject: [PATCH] client now read from blcok of data and schedule message according to offset --- client/src/components/StreamPreview.tsx | 21 +++-- client/src/components/WSTerminal.tsx | 116 ++++++++++++++++++++++-- client/src/lib/message.ts | 31 ++++++- client/src/lib/pubsub.ts | 1 + client/src/pages/[room]/index.tsx | 26 ++---- tstream/internal/cfg/cfg.go | 11 ++- tstream/pkg/message/message.go | 1 + tstream/pkg/room/room.go | 48 ++++++---- tstream/pkg/streamer/recorder.go | 60 +++++++----- tstream/pkg/streamer/streamer.go | 26 +++--- 10 files changed, 244 insertions(+), 97 deletions(-) diff --git a/client/src/components/StreamPreview.tsx b/client/src/components/StreamPreview.tsx index c5f6199..16d6f23 100644 --- a/client/src/components/StreamPreview.tsx +++ b/client/src/components/StreamPreview.tsx @@ -4,6 +4,7 @@ import PubSub from "./../lib/pubsub"; import * as base64 from "../lib/base64"; import * as util from "../lib/util"; import * as constants from "../lib/constants"; +import * as message from "../lib/message"; import dayjs from "dayjs"; import customParseFormat from 'dayjs/plugin/customParseFormat'; @@ -41,19 +42,19 @@ const StreamPreview: FC = ({ title, wsUrl, streamerID, nViewers, startedT ws.onmessage = (ev: MessageEvent) => { let msg = JSON.parse(ev.data); - if (msg.Type === constants.MSG_TWRITE) { - - let buffer = base64.str2ab(JSON.parse(window.atob(msg.Data)).Data); - tempMsg.pub(msg.Type, buffer); - - } else if (msg.Type === constants.MSG_TWINSIZE) { - - let winSizeMsg = msg.Data; - tempMsg.pub(msg.Type, winSizeMsg); + switch (msg.Type) { + case constants.MSG_TWRITEBLOCK: + let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); + tempMsg.pub(msg.Type, blockMsg); + break; + case constants.MSG_TWINSIZE: + let winSizeMsg = msg.Data; + tempMsg.pub(msg.Type, winSizeMsg); + break; } - } + } tempMsg.sub("request", (msgType: string) => { var payload = JSON.stringify({ diff --git a/client/src/components/WSTerminal.tsx b/client/src/components/WSTerminal.tsx index 16673e6..bcdace8 100644 --- a/client/src/components/WSTerminal.tsx +++ b/client/src/components/WSTerminal.tsx @@ -1,43 +1,55 @@ import React from "react"; import Xterm from "./Xterm"; import * as constants from "../lib/constants"; +import * as message from "../lib/message"; +import * as base64 from "../lib/base64"; +import * as pako from "pako"; import PubSub from "../lib/pubsub"; -// TODO: add handle % and px -interface Props { - msgManager: PubSub; - width: number; // in pixel - height: number; // in pixel - className?: string; -} interface Winsize { Rows: number; Cols: number; } +// TODO: add handle % and px for size +interface Props { + msgManager: PubSub; + width: number; // in pixel + height: number; // in pixel + delay?: number; + className?: string; +} class WSTerminal extends React.Component { static defaultProps = { width: -1, height: -1, + delay: 0, className: "", } termRef : React.RefObject; divRef: React.RefObject; + writeManager: WriteManager; + constructor(props: Props) { super(props) this.termRef = React.createRef(); this.divRef = React.createRef(); + let writeCB = (buffer: Uint8Array) => { + this.termRef.current?.writeUtf8(buffer); + }; + + this.writeManager = new WriteManager(writeCB, this.props.delay); } componentDidMount() { - this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (buffer: Uint8Array) => { - this.termRef.current?.writeUtf8(buffer); + this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (block: message.TermWriteBlock) => { + this.writeManager.addBlock(block); }) this.props.msgManager.sub(constants.MSG_TWINSIZE, (winsize: Winsize) => { @@ -105,4 +117,90 @@ class WSTerminal extends React.Component { } +class WriteManager { + + queue: message.TermWrite[] = []; + writeCB: (arr:Uint8Array) => void; + delay: number; // in milliseconds + + constructor(writeCB: (arr: Uint8Array) => void, delay: number = 0) { + this.writeCB = writeCB; + this.delay = delay; + } + + resetQueue() { + this.queue = []; + } + + + addQueue(q: message.TermWrite[]) { + this.queue.push(...q); // Concatnate + this.consume(); + } + + consume() { + if (this.queue.length == 0) { + return + } else { + + // any message has offset < 0 => messages from the past with respect to render time + // concat all these messages into one buffer and render at once + let bufferArray: Uint8Array[] = []; + while (true && this.queue.length != 0) { + let msg = this.queue[0]; + + if (msg.Offset < 0) { + bufferArray.push() + let buffer = base64.str2ab(msg.Data) + bufferArray.push(buffer); + this.queue.shift(); + } else break; + } + if ( bufferArray.length > 0) this.writeCB(base64.concatab(bufferArray)); + + // schedule to render upcomming messages + // TODO: are there any ways we don't have to create many settimeout liek this? + // tried sequentially call to settimeout but the performance is much worse + this.queue.forEach((msg) => { + let buffer = base64.str2ab(msg.Data); + setTimeout(() => { + this.writeCB(buffer); + }, msg.Offset); + }) + this.resetQueue(); + } + } + + addBlock(block: message.TermWriteBlock) { + // when viewers receive this block + // it only contains the actual start-time + // we need to be able to re-compute the render time based on + // - now time + // - when does this block being created + // - the delay factor. In case of play back the delay = now - stream sesion start time + let blockDelayTime = (new Date()).getTime() - (new Date(block.StartTime)).getTime() - this.delay; + + // this is a big chunk of encoding/decoding + // Since we have to : reduce message size by usign gzip and also + // every single termwrite have to be decoded, or else the rendering will screw up + // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message + let data = pako.ungzip(base64.str2ab(block.Data)); + let dataArrString: string[] = JSON.parse(base64.ab2str(data)); + + // convert from string[] to message.TermWrite[] + let dataArrTermWrite: message.TermWrite[] = []; + dataArrString.forEach((data: string, index: number) => { + let writeMsg = JSON.parse(window.atob(data)); + + // re-compute the offset of this message with respect to the render time + writeMsg.Offset = writeMsg.Offset - blockDelayTime; + dataArrTermWrite.push(writeMsg); + }) + + this.addQueue(dataArrTermWrite); + + } +} + + export default WSTerminal; diff --git a/client/src/lib/message.ts b/client/src/lib/message.ts index 9e3b0b1..0095a8a 100644 --- a/client/src/lib/message.ts +++ b/client/src/lib/message.ts @@ -1,6 +1,12 @@ +//import * as pako from "pako"; +//import * as constants from "./constants"; +//import * as base64 from "./base64"; +//import PubSub from "./pubsub"; + export interface TermWriteBlock { Data: string; - Offset: number; + Duration: number; + StartTime: string; } export interface TermWrite { @@ -15,3 +21,26 @@ export interface ChatMsg { Time: string; } + + +// *** Message handlers *** +//export function hanldeWriteBlockMessage(msgData: string, msgManager: PubSub) { +// let blockMsg: TermWriteBlock = JSON.parse(window.atob(msgData)); +// // this is a big chunk of encoding/decoding +// // Since we have to : reduce message size by usign gzip and also +// // every single termwrite have to be decoded, or else the rendering will screw up +// // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message +// let data = pako.ungzip(base64.str2ab(blockMsg.Data)); +// let dataArr: string[] = JSON.parse(base64.ab2str(data)); +// //let bufferArray: Uint8Array[] = []; +// dataArr.forEach((data: string) => { +// let writeMsg: TermWrite = JSON.parse(window.atob(data)); +// let buffer = base64.str2ab(writeMsg.Data) +// //bufferArray.push(buffer); +// setTimeout(() => { +// msgManager.pub(, buffer); +// }, writeMsg.Offset as number); +// }) +// //console.log(base64.concatab(bufferArray).length); +// //msgManager.pub(msg.Type, base64.concatab(bufferArray)); +//} diff --git a/client/src/lib/pubsub.ts b/client/src/lib/pubsub.ts index db21ae2..baf80c7 100644 --- a/client/src/lib/pubsub.ts +++ b/client/src/lib/pubsub.ts @@ -1,3 +1,4 @@ +// Generic pubsub class export default class PubSub { // set to true if one topic can only have one subscriber diff --git a/client/src/pages/[room]/index.tsx b/client/src/pages/[room]/index.tsx index 94d23b4..c5ee2c2 100644 --- a/client/src/pages/[room]/index.tsx +++ b/client/src/pages/[room]/index.tsx @@ -54,6 +54,7 @@ interface RoomInfo { NViewers: number; Title: string; Status: RoomStatus; + Delay: number; } interface Params { @@ -209,26 +210,10 @@ class Room extends React.Component { switch (msg.Type) { case constants.MSG_TWRITEBLOCK: - let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); - // this is a big chunk of encoding/decoding - // Since we have to : reduce message size by usign gzip and also - // every single termwrite have to be decoded, or else the rendering will screw up - // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message - let data = pako.ungzip(base64.str2ab(blockMsg.Data)); - let dataArr: string[] = JSON.parse(base64.ab2str(data)); - //let bufferArray: Uint8Array[] = []; - dataArr.forEach((data: string) => { - let writeMsg: message.TermWrite = JSON.parse(window.atob(data)); - let buffer = base64.str2ab(writeMsg.Data) - //bufferArray.push(buffer); - setTimeout(() => { - console.log("Buffer : ", buffer.length, " offset ", writeMsg.Offset); - msgManager.pub(msg.Type, buffer); - }, writeMsg.Offset as number); - }) - //console.log(base64.concatab(bufferArray).length); - //msgManager.pub(msg.Type, base64.concatab(bufferArray)); + let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); + msgManager.pub(msg.Type, blockMsg); break; + case constants.MSG_TWINSIZE: msgManager.pub(msg.Type, msg.Data); @@ -329,12 +314,13 @@ class Room extends React.Component {
- {!isStreamStopped && isRoomExisted && + {!isStreamStopped && isRoomExisted && this.state.roomInfo && } {isStreamStopped && isRoomExisted && diff --git a/tstream/internal/cfg/cfg.go b/tstream/internal/cfg/cfg.go index 04a6797..f906c86 100644 --- a/tstream/internal/cfg/cfg.go +++ b/tstream/internal/cfg/cfg.go @@ -1,13 +1,14 @@ package cfg const ( - SERVER_VERSION = "1.2.0" // Used to verify compatible verion of streamer - STREAMER_VERSION = "1.2.0" // retry connect with server if websocket is broke - SERVER_STREAMER_REQUIRED_VERSION = "1.2.0" // Used to verify compatible verion of streamer + SERVER_VERSION = "1.3.0" // Used to verify compatible verion of streamer + STREAMER_VERSION = "1.3.0" // retry connect with server if websocket is broke + SERVER_STREAMER_REQUIRED_VERSION = "1.3.0" // Used to verify compatible verion of streamer // Room - ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer - ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer + ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer + ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer + ROOM_DEFAULT_DELAY = 3000 // act as both block size and delay time of streaming // Streamer STREAMER_READ_BUFFER_SIZE = 1024 // streamer websocket read buffer size diff --git a/tstream/pkg/message/message.go b/tstream/pkg/message/message.go index 6901309..341710e 100644 --- a/tstream/pkg/message/message.go +++ b/tstream/pkg/message/message.go @@ -104,6 +104,7 @@ type RoomInfo struct { Title string StreamerID string Status RoomStatus + Delay uint64 // Viewer delay time with streamer ( in milliseconds ) } // used for streamer to update room info diff --git a/tstream/pkg/room/room.go b/tstream/pkg/room/room.go index 80f118b..7bc761a 100644 --- a/tstream/pkg/room/room.go +++ b/tstream/pkg/room/room.go @@ -18,21 +18,30 @@ import ( var emptyByteArray []byte type Room struct { - lock sync.Mutex - streamer *websocket.Conn - clients map[string]*Client // Chats + viewrer connection - accViewers uint64 // accumulated viewers - name string // also is streamerID - id uint64 // Id in DB - title string + lock sync.Mutex + + streamer *websocket.Conn + sfu *SFU + clients map[string]*Client // Chats + viewrer connection + + msgBuffer []message.Wrapper + cacheChat []message.Chat + + // config + delay uint64 // Viewer delay time with streamer ( in milliseconds ) + + // states lastWinsize message.Winsize startedTime time.Time lastActiveTime time.Time - msgBuffer []message.Wrapper - cacheChat []message.Chat - status message.RoomStatus - secret string // used to verify streamer - sfu *SFU + accViewers uint64 // accumulated viewers + + // room info + id uint64 // Id in DB + name string // also is streamerID + title string + secret string // used to verify streamer + status message.RoomStatus } func New(name, title, secret string) *Room { @@ -41,16 +50,17 @@ func New(name, title, secret string) *Room { var cacheChat []message.Chat return &Room{ name: name, - accViewers: 0, - clients: clients, - lastActiveTime: time.Now(), - startedTime: time.Now(), - msgBuffer: buffer, - status: message.RStreaming, title: title, secret: secret, + clients: clients, + accViewers: 0, + msgBuffer: buffer, cacheChat: cacheChat, sfu: NewSFU(), + lastActiveTime: time.Now(), + startedTime: time.Now(), + status: message.RStreaming, + delay: 3000, } } @@ -352,7 +362,6 @@ func (r *Room) ReadAndHandleClientMessage(ID string) { } func (r *Room) Broadcast(msg message.Wrapper, roles []message.CRole, IDExclude []string) { - log.Printf("Broadcasting message: %s", msg.Type) // TODO : make this run concurrently for id, client := range r.clients { @@ -409,6 +418,7 @@ func (r *Room) PrepareRoomInfo() message.RoomInfo { StreamerID: r.name, Status: r.status, AccNViewers: r.accViewers, + Delay: r.delay, } } diff --git a/tstream/pkg/streamer/recorder.go b/tstream/pkg/streamer/recorder.go index 22ab9f3..3983629 100644 --- a/tstream/pkg/streamer/recorder.go +++ b/tstream/pkg/streamer/recorder.go @@ -25,46 +25,60 @@ type Recorder struct { // Channel to send message to out chan<- message.Wrapper - // duration of each termwriteblock + // duration of each termwriteblock to send blockDuration time.Duration + // delay of stream + delay time.Duration + currentBlock *Block } -func NewRecorder(blockDuration time.Duration, out chan<- message.Wrapper) *Recorder { - currentBlock := NewBlock(blockDuration) +func NewRecorder(blockDuration time.Duration, delay time.Duration, out chan<- message.Wrapper) *Recorder { + if delay < blockDuration { + log.Printf("Block duration(%d) should smaller than delay(%d)", blockDuration, delay) + blockDuration = delay + } + currentBlock := NewBlock(blockDuration, delay) return &Recorder{ blockDuration: blockDuration, out: out, currentBlock: currentBlock, + delay: delay, } } -/*** -Note: 3 seconds of parrot generate 70Kb of raw bytes. With gzip the data is just 6k -***/ func (r *Recorder) Start() { if r.out == nil { log.Printf("No output channel for recorder") return } + // First message + time.Sleep(r.delay) + r.Send() + // Send all message in queue after each block duration for _ = range time.Tick(r.blockDuration) { - if r.currentBlock.NQueue() == 0 { - r.newBlock() - continue - } - - payload, err := r.currentBlock.Serialize() - if err != nil { - log.Printf("Failed to serialize block") - r.newBlock() - continue - } - r.out <- payload + r.Send() + } +} + +// send all message in block and create a new one +func (r *Recorder) Send() { + if r.currentBlock.NQueue() == 0 { r.newBlock() + return } + + payload, err := r.currentBlock.Serialize() + if err != nil { + log.Printf("Failed to serialize block") + r.newBlock() + return + } + r.out <- payload + r.newBlock() } func (r *Recorder) Write(data []byte) (int, error) { @@ -77,7 +91,7 @@ func (r *Recorder) Write(data []byte) (int, error) { func (r *Recorder) newBlock() { r.lock.Lock() - r.currentBlock = NewBlock(r.blockDuration) + r.currentBlock = NewBlock(r.blockDuration, r.delay) defer r.lock.Unlock() } @@ -92,16 +106,19 @@ type Block struct { // how many milliseconds of data this block contains duration time.Duration + delay time.Duration + // queue of encoded termwrite message queue [][]byte } -func NewBlock(duration time.Duration) *Block { +func NewBlock(duration time.Duration, delay time.Duration) *Block { var queue [][]byte return &Block{ duration: duration, queue: queue, startTime: time.Now(), + delay: delay, } } @@ -116,6 +133,8 @@ func (bl *Block) Serialize() (message.Wrapper, error) { } // compress with gzip + // with gzip data often compressed to 1/10 -> 1/8 its original + // Note: 3 seconds of parrot generate 70Kb of raw bytes. With gzip the data is just 6k var b bytes.Buffer gz := gzip.NewWriter(&b) if _, err := gz.Write(dataByte); err != nil { @@ -131,7 +150,6 @@ func (bl *Block) Serialize() (message.Wrapper, error) { Duration: bl.duration.Milliseconds(), Data: b.Bytes(), } - log.Printf("bytes: %d, raw: %d", len(b.Bytes()), len(dataByte)) blockByte, err := json.Marshal(blockMsg) if err != nil { diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 8424c26..5f6b947 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -32,8 +32,9 @@ type Streamer struct { recorder *Recorder Out chan message.Wrapper In chan message.Wrapper - // interval of sending message in queue - interval time.Duration + // delay of sending message in queue + delay time.Duration + blockDuration time.Duration } func New(clientAddr, serverAddr, username, title string) *Streamer { @@ -44,15 +45,16 @@ func New(clientAddr, serverAddr, username, title string) *Streamer { secret := GetSecret(CONFIG_PATH) return &Streamer{ - secret: secret, - pty: pty, - serverAddr: serverAddr, - clientAddr: clientAddr, - username: username, - title: title, - Out: out, - In: in, - interval: 3 * time.Second, + secret: secret, + pty: pty, + serverAddr: serverAddr, + clientAddr: clientAddr, + username: username, + title: title, + Out: out, + In: in, + delay: 3 * time.Second, + blockDuration: 3 * time.Second, // block size has to smaller than delay } } @@ -78,7 +80,7 @@ func (s *Streamer) Start() error { } // Init transporter - s.recorder = NewRecorder(s.interval, s.Out) + s.recorder = NewRecorder(s.blockDuration, s.delay, s.Out) go s.recorder.Start() fmt.Printf("🔥 Streaming at: %s/%s\n", s.clientAddr, s.username)