From 816ca769ca0419b3a37cc234f6c820102da89276 Mon Sep 17 00:00:00 2001 From: Earther Date: Thu, 5 Aug 2021 16:50:08 +0700 Subject: [PATCH 1/5] Simple implementation of caching instead of sending for each request --- client/src/components/Chat.tsx | 20 +++---- client/src/components/WSTerminal.tsx | 1 + client/src/lib/message.ts | 12 ++++ client/src/pages/[room]/index.tsx | 18 ++++-- tstream/pkg/message/message.go | 1 + tstream/pkg/playback/playback.go | 60 ++++++++++++++++++++ tstream/pkg/ptyMaster/ptyMaster.go | 1 - tstream/pkg/room/room.go | 7 ++- tstream/pkg/room/sfu.go | 10 ---- tstream/pkg/streamer/streamer.go | 19 +++++-- tstream/pkg/streamer/transporter.go | 83 ++++++++++++++++++++++++++++ 11 files changed, 196 insertions(+), 36 deletions(-) create mode 100644 client/src/lib/message.ts create mode 100644 tstream/pkg/playback/playback.go create mode 100644 tstream/pkg/streamer/transporter.go diff --git a/client/src/components/Chat.tsx b/client/src/components/Chat.tsx index 4ccb138..bf3258a 100644 --- a/client/src/components/Chat.tsx +++ b/client/src/components/Chat.tsx @@ -1,5 +1,6 @@ import React from "react"; import * as constants from "../lib/constants"; +import * as message from "../lib/message"; import PubSub from "../lib/pubsub"; import TextField from '@material-ui/core/TextField'; import KeyboardArrowRightRoundedIcon from '@material-ui/icons/KeyboardArrowRightRounded'; @@ -19,22 +20,15 @@ interface Props { className?: string; } -export interface ChatMsg { - Name: string; - Content: string; - Color: string; - Time: string; -} - interface State { - msgList: ChatMsg[]; + msgList: message.ChatMsg[]; inputContent: string; userConfig: TstreamUser | null; isWaitingUsername: boolean, tempMsg: string, } -const ChatSection: React.FC = ({ Name, Content, Color, Time}) => { +const ChatSection: React.FC = ({ Name, Content, Color, Time}) => { return ( <>
@@ -71,8 +65,8 @@ class Chat extends React.Component { } } - addNewMsg(chatMsg: ChatMsg) { - let newMsgList = this.state.msgList as ChatMsg[]; + addNewMsg(chatMsg: message.ChatMsg) { + let newMsgList = this.state.msgList as message.ChatMsg[]; newMsgList.push(chatMsg); this.setState({ msgList: newMsgList, @@ -81,11 +75,11 @@ class Chat extends React.Component { } componentDidMount() { - this.props.msgManager?.sub(constants.MSG_TCHAT_IN, (cacheChat: Array) => { + this.props.msgManager?.sub(constants.MSG_TCHAT_IN, (cacheChat: Array) => { if (cacheChat === null) { return; } - let newMsgList = this.state.msgList as ChatMsg[]; + let newMsgList = this.state.msgList as message.ChatMsg[]; for (let i = 0; i < cacheChat.length; i++) { newMsgList.push(cacheChat[i]); } diff --git a/client/src/components/WSTerminal.tsx b/client/src/components/WSTerminal.tsx index 6aaaa97..1309e2d 100644 --- a/client/src/components/WSTerminal.tsx +++ b/client/src/components/WSTerminal.tsx @@ -37,6 +37,7 @@ class WSTerminal extends React.Component { componentDidMount() { this.props.msgManager.sub(constants.MSG_TWRITE, (buffer: Uint8Array) => { + console.log("Got message ", buffer.length); this.termRef.current?.writeUtf8(buffer); }) diff --git a/client/src/lib/message.ts b/client/src/lib/message.ts new file mode 100644 index 0000000..65fd548 --- /dev/null +++ b/client/src/lib/message.ts @@ -0,0 +1,12 @@ +export interface TermWrite { + Data: string; + Time: number; +} + +export interface ChatMsg { + Name: string; + Content: string; + Color: string; + Time: string; +} + diff --git a/client/src/pages/[room]/index.tsx b/client/src/pages/[room]/index.tsx index 02a6d3d..dbf5128 100644 --- a/client/src/pages/[room]/index.tsx +++ b/client/src/pages/[room]/index.tsx @@ -4,9 +4,10 @@ import { RouteComponentProps, withRouter } from "react-router-dom"; import * as base64 from "../../lib/base64"; import * as util from "../../lib/util"; import * as constants from "../../lib/constants"; +import * as message from "../../lib/message"; import PubSub from "../../lib/pubsub"; -import Chat, { ChatMsg } from "../../components/Chat"; +import Chat from "../../components/Chat"; import Navbar from "../../components/Navbar"; import WSTerminal from "../../components/WSTerminal"; import Uptime from "../../components/Uptime"; @@ -206,8 +207,15 @@ class Room extends React.Component { if (msg.Type === constants.MSG_TWRITE) { - let buffer = base64.str2ab(JSON.parse(window.atob(msg.Data)).Data); - msgManager.pub(msg.Type, buffer); + let arr = JSON.parse(window.atob(msg.Data)); + arr.forEach((el: string, i: number) => { + let singleMsg = JSON.parse(window.atob(el)); + let buffer = base64.str2ab(singleMsg.Data); + setTimeout(() => { + msgManager.pub(msg.Type, buffer); + }, singleMsg.Time as number); + }) + } else if (msg.Type === constants.MSG_TWINSIZE) { @@ -236,8 +244,8 @@ class Room extends React.Component { }) - msgManager.sub(constants.MSG_TCHAT_OUT, (chat:ChatMsg) => { - let chatList: ChatMsg[] = [chat]; + msgManager.sub(constants.MSG_TCHAT_OUT, (chat:message.ChatMsg) => { + let chatList: message.ChatMsg[] = [chat]; let payload = JSON.stringify({ Type: constants.MSG_TCHAT, diff --git a/tstream/pkg/message/message.go b/tstream/pkg/message/message.go index 9d296c4..6129415 100644 --- a/tstream/pkg/message/message.go +++ b/tstream/pkg/message/message.go @@ -61,6 +61,7 @@ type Winsize struct { type TermWrite struct { Data []byte + Time int64 } type Chat struct { diff --git a/tstream/pkg/playback/playback.go b/tstream/pkg/playback/playback.go new file mode 100644 index 0000000..3f5bc61 --- /dev/null +++ b/tstream/pkg/playback/playback.go @@ -0,0 +1,60 @@ +package playback + +/*** +File size note: +7 Min parrot: 8.8 Mbs + +***/ +import ( + "encoding/json" + "github.com/qnkhuat/tstream/pkg/message" + "log" + "os" + "path/filepath" + "strconv" +) + +type Playback struct { + id uint64 + root string + queue []message.TermWrite + In chan message.Wrapper +} + +func New(id uint64, root string) *Playback { + in := make(chan message.Wrapper, 256) + return &Playback{ + id: id, + root: root, + In: in, + } +} + +func (p *Playback) Start() { + // Open a json writer file + // fileroot is root/id.json + path := filepath.Join(p.root, strconv.FormatUint(p.id, 10)+".json") + + f, err := os.Create(path) + if err != nil { + log.Printf("Failed to create playback file") + return + } + + log.Printf("Playback are saving to %s", path) + + defer f.Close() + // Start the playback + for { + msg, ok := <-p.In + if !ok { + log.Printf("Failed to read from channel") + continue + } + + // Write the message to the file + if err := json.NewEncoder(f).Encode(msg); err != nil { + log.Printf("Failed ot save message: %s", err) + } + } +} diff --git a/tstream/pkg/ptyMaster/ptyMaster.go b/tstream/pkg/ptyMaster/ptyMaster.go index dec84be..f0fff81 100644 --- a/tstream/pkg/ptyMaster/ptyMaster.go +++ b/tstream/pkg/ptyMaster/ptyMaster.go @@ -8,7 +8,6 @@ package ptyMaster import ( ptyDevice "github.com/creack/pty" - //"golang.org/x/term" term "golang.org/x/crypto/ssh/terminal" "log" "os" diff --git a/tstream/pkg/room/room.go b/tstream/pkg/room/room.go index 3a1c76a..47a0e54 100644 --- a/tstream/pkg/room/room.go +++ b/tstream/pkg/room/room.go @@ -9,6 +9,7 @@ import ( "github.com/gorilla/websocket" "github.com/qnkhuat/tstream/internal/cfg" "github.com/qnkhuat/tstream/pkg/message" + "github.com/qnkhuat/tstream/pkg/playback" "log" "strings" "sync" @@ -33,12 +34,14 @@ type Room struct { status message.RoomStatus secret string // used to verify streamer sfu *SFU + playback *playback.Playback } func New(name, title, secret string) *Room { clients := make(map[string]*Client) var buffer []message.Wrapper var cacheChat []message.Chat + playback := playback.New(234234324, "./") return &Room{ name: name, accViewers: 0, @@ -51,6 +54,7 @@ func New(name, title, secret string) *Room { secret: secret, cacheChat: cacheChat, sfu: NewSFU(), + playback: playback, } } @@ -110,7 +114,7 @@ func (r *Room) Streamer() *websocket.Conn { // Wait for request from streamer and broadcast those message to clients func (r *Room) Start() { - r.sfu.Start() + go r.playback.Start() go func() { for _ = range time.Tick(cfg.SERVER_CLEAN_INTERVAL * time.Second) { @@ -147,6 +151,7 @@ func (r *Room) Start() { r.addMsgBuffer(msg) r.lastActiveTime = time.Now() r.Broadcast(msg, []message.CRole{message.RViewer}, []string{}) + r.playback.In <- msg default: log.Printf("Unknown message type: %s", msgType) diff --git a/tstream/pkg/room/sfu.go b/tstream/pkg/room/sfu.go index 2b874a3..59c0020 100644 --- a/tstream/pkg/room/sfu.go +++ b/tstream/pkg/room/sfu.go @@ -40,16 +40,6 @@ func NewSFU() *SFU { } } -func (s *SFU) Start() { - // TODO: this should be requested by client, not server auto send it every 3 seconds - // request a keyframe every 3 seconds - //go func() { - // for range time.NewTicker(time.Second * 3).C { - // s.sendKeyFrame() - // } - //}() -} - // TODO : break down this method func (s *SFU) AddPeer(cl *Client) error { diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 98afea2..8875c73 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -29,14 +29,15 @@ type Streamer struct { secret string title string conn *websocket.Conn - Out chan interface{} - In chan interface{} + tr *Transporter + Out chan message.Wrapper + In chan message.Wrapper } func New(clientAddr, serverAddr, username, title string) *Streamer { pty := ptyMaster.New() - out := make(chan interface{}, 256) // buffer 256 send requests - in := make(chan interface{}, 256) // buffer 256 send requests + out := make(chan message.Wrapper, 256) // buffer 256 send requests + in := make(chan message.Wrapper, 256) // buffer 256 send requests secret := GetSecret(CONFIG_PATH) @@ -64,6 +65,7 @@ func (s *Streamer) Start() error { fmt.Printf("Press Enter to continue!") bufio.NewReader(os.Stdin).ReadString('\n') + // Init websocket connection err := s.ConnectWS() if err != nil { log.Println(err) @@ -72,6 +74,10 @@ func (s *Streamer) Start() error { return err } + // Init transporter + s.tr = NewTransporter(s.Out) + go s.tr.Start() + fmt.Printf("🔥 Streaming at: %s/%s\n", s.clientAddr, s.username) s.pty.MakeRaw() @@ -87,7 +93,8 @@ func (s *Streamer) Start() error { // Pipe command response to Pty and server go func() { - mw := io.MultiWriter(os.Stdout, s) + //mw := io.MultiWriter(os.Stdout, s, s.tr) + mw := io.MultiWriter(os.Stdout, s.tr) _, err := io.Copy(mw, s.pty.F()) if err != nil { log.Printf("Failed to send pty to mw: %s", err) @@ -253,7 +260,7 @@ func (s *Streamer) Write(data []byte) (int, error) { // the xterm will show duplciated text // Clue: marshal ensure data is encoded in UTF-8 dataByte, _ := json.Marshal(message.TermWrite{Data: data}) - payload := &message.Wrapper{ + payload := message.Wrapper{ Type: message.TWrite, Data: dataByte, } diff --git a/tstream/pkg/streamer/transporter.go b/tstream/pkg/streamer/transporter.go new file mode 100644 index 0000000..003976b --- /dev/null +++ b/tstream/pkg/streamer/transporter.go @@ -0,0 +1,83 @@ +package streamer + +import ( + //"compress/gzip" + "encoding/json" + //"fmt" + "github.com/qnkhuat/tstream/pkg/message" + "log" + //"os" + "sync" + "time" +) + +type Transporter struct { + out chan<- message.Wrapper + queue [][]byte + lock sync.Mutex + blockID uint + blockStartTime time.Time +} + +func NewTransporter(out chan<- message.Wrapper) *Transporter { + var queue [][]byte + return &Transporter{ + out: out, + queue: queue, + blockID: 0, + } +} + +func (t *Transporter) Start() { + for _ = range time.Tick(3 * time.Second) { + //if len(t.queue) == 1 { + log.Printf("sending, queue len: %d", len(t.queue)) + + //f, _ := os.Create(fmt.Sprintf("gzip_%d.gz", t.blockID)) + //defer f.Close() + //gz := gzip.NewWriter(f) + //defer gz.Close() + + //fraw, _ := os.Create(fmt.Sprintf("raw_%d", t.blockID)) + //defer fraw.Close() + + //if _, err := gz.Write(dataArray); err != nil { + // log.Fatal(err) + //} + //fraw.Write(dataArray) + + dataByte, _ := json.Marshal(t.queue) + payload := message.Wrapper{ + Type: message.TWrite, + Data: dataByte, + } + + t.out <- payload + t.newBlock() + + //} + } +} + +/*** + Note: 5 seconds of parrot generate 62Kb of raw bytes. With gzip the data is just 977B +***/ +func (t *Transporter) Write(data []byte) (int, error) { + // log.Printf("Transporter Receiving message: %d", len(data)) + + // log.Printf("data: %s, %v", string(data), time.Since(t.blockStartTime).Milliseconds()) + + t.lock.Lock() + byteData, _ := json.Marshal(message.TermWrite{Data: data, Time: time.Since(t.blockStartTime).Milliseconds()}) + t.queue = append(t.queue, byteData) + t.lock.Unlock() + //if time.Since(t.blockStartTime) > 3*time.Second { + + return len(data), nil +} + +func (t *Transporter) newBlock() { + t.blockID += 1 + t.queue = t.queue[:0] // clear it after backup + t.blockStartTime = time.Now() +} From e780eb58619a210a8c63d8d95ae35cc6969c6945 Mon Sep 17 00:00:00 2001 From: Earther Date: Sun, 8 Aug 2021 09:52:07 +0700 Subject: [PATCH 2/5] terminal are now exchaing block of message --- README.md | 2 + client/package-lock.json | 14 +++ client/package.json | 2 + client/src/components/WSTerminal.tsx | 5 +- client/src/lib/base64.ts | 19 ++- client/src/lib/constants.ts | 1 + client/src/lib/message.ts | 7 +- client/src/pages/[room]/index.tsx | 65 ++++++---- client/yarn.lock | 7 +- tstream/internal/cfg/cfg.go | 2 +- tstream/pkg/exwebsocket/exwebsocket.go | 22 ---- tstream/pkg/message/message.go | 15 ++- tstream/pkg/playback/playback.go | 60 --------- tstream/pkg/room/recorder.go | 1 + tstream/pkg/room/room.go | 11 +- tstream/pkg/streamer/recorder.go | 166 +++++++++++++++++++++++++ tstream/pkg/streamer/streamer.go | 26 ++-- tstream/pkg/streamer/transporter.go | 83 ------------- 18 files changed, 283 insertions(+), 225 deletions(-) delete mode 100644 tstream/pkg/exwebsocket/exwebsocket.go delete mode 100644 tstream/pkg/playback/playback.go create mode 100644 tstream/pkg/room/recorder.go create mode 100644 tstream/pkg/streamer/recorder.go delete mode 100644 tstream/pkg/streamer/transporter.go diff --git a/README.md b/README.md index a8ddc4a..4cdd505 100644 --- a/README.md +++ b/README.md @@ -31,4 +31,6 @@ - [x] In room Chat - [x] Voice chat - [ ] Stream playback +- [ ] Private session - [ ] User management system + diff --git a/client/package-lock.json b/client/package-lock.json index fb48e47..bc01de8 100644 --- a/client/package-lock.json +++ b/client/package-lock.json @@ -28,6 +28,7 @@ "axios": "^0.21.1", "dayjs": "^1.10.5", "moment": "^2.29.1", + "pako": "^1.0.11", "react": "^17.0.2", "react-dom": "^17.0.2", "react-router-dom": "^5.2.0", @@ -41,6 +42,7 @@ "xterm-addon-fit": "^0.5.0" }, "devDependencies": { + "@types/pako": "^1.0.2", "@types/react-router-dom": "^5.1.7", "@types/string-similarity": "^4.0.0", "@types/url-join": "^4.0.0", @@ -3776,6 +3778,12 @@ "resolved": "https://registry.npmjs.org/@types/normalize-package-data/-/normalize-package-data-2.4.1.tgz", "integrity": "sha512-Gj7cI7z+98M282Tqmp2K5EIsoouUEzbBJhQQzDE3jSIRk6r9gsz0oUokqIUR4u1R3dMHo0pDHM7sNOHyhulypw==" }, + "node_modules/@types/pako": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@types/pako/-/pako-1.0.2.tgz", + "integrity": "sha512-8UJl2MjkqqS6ncpLZqRZ5LmGiFBkbYxocD4e4jmBqGvfRG1RS23gKsBQbdtV9O9GvRyjFTiRHRByjSlKCLlmZw==", + "dev": true + }, "node_modules/@types/parse-json": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@types/parse-json/-/parse-json-4.0.0.tgz", @@ -25055,6 +25063,12 @@ "resolved": "https://registry.npmjs.org/@types/normalize-package-data/-/normalize-package-data-2.4.1.tgz", "integrity": "sha512-Gj7cI7z+98M282Tqmp2K5EIsoouUEzbBJhQQzDE3jSIRk6r9gsz0oUokqIUR4u1R3dMHo0pDHM7sNOHyhulypw==" }, + "@types/pako": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@types/pako/-/pako-1.0.2.tgz", + "integrity": "sha512-8UJl2MjkqqS6ncpLZqRZ5LmGiFBkbYxocD4e4jmBqGvfRG1RS23gKsBQbdtV9O9GvRyjFTiRHRByjSlKCLlmZw==", + "dev": true + }, "@types/parse-json": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@types/parse-json/-/parse-json-4.0.0.tgz", diff --git a/client/package.json b/client/package.json index e6376ef..4e039b2 100644 --- a/client/package.json +++ b/client/package.json @@ -24,6 +24,7 @@ "axios": "^0.21.1", "dayjs": "^1.10.5", "moment": "^2.29.1", + "pako": "^1.0.11", "react": "^17.0.2", "react-dom": "^17.0.2", "react-router-dom": "^5.2.0", @@ -61,6 +62,7 @@ ] }, "devDependencies": { + "@types/pako": "^1.0.2", "@types/react-router-dom": "^5.1.7", "@types/string-similarity": "^4.0.0", "@types/url-join": "^4.0.0", diff --git a/client/src/components/WSTerminal.tsx b/client/src/components/WSTerminal.tsx index 1309e2d..16673e6 100644 --- a/client/src/components/WSTerminal.tsx +++ b/client/src/components/WSTerminal.tsx @@ -36,8 +36,7 @@ class WSTerminal extends React.Component { componentDidMount() { - this.props.msgManager.sub(constants.MSG_TWRITE, (buffer: Uint8Array) => { - console.log("Got message ", buffer.length); + this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (buffer: Uint8Array) => { this.termRef.current?.writeUtf8(buffer); }) @@ -61,7 +60,7 @@ class WSTerminal extends React.Component { } componentWillUnmount() { - this.props.msgManager.unsub(constants.MSG_TWRITE); + this.props.msgManager.unsub(constants.MSG_TWRITEBLOCK); this.props.msgManager.unsub(constants.MSG_TWINSIZE); } diff --git a/client/src/lib/base64.ts b/client/src/lib/base64.ts index 867d0eb..52cdcb1 100644 --- a/client/src/lib/base64.ts +++ b/client/src/lib/base64.ts @@ -9,7 +9,20 @@ export function str2ab(input:string): Uint8Array{ return bytes; } -// array buffer to string -export function ab2str(buf: number[]): string{ - return String.fromCharCode.apply(null, buf); +export function ab2str(buf: any): string{ + return new TextDecoder().decode(buf); +} + +export function concatab(array: Uint8Array[]): Uint8Array { + let len = 0; + array.forEach((a) => { len += a.byteLength; }); + + let result = new Uint8Array(len); + let runningIndex = 0 + array.forEach((a) => { + result.set(new Uint8Array(a), runningIndex); + runningIndex += a.byteLength; + }); + return result; + } diff --git a/client/src/lib/constants.ts b/client/src/lib/constants.ts index 881598e..7e030a0 100644 --- a/client/src/lib/constants.ts +++ b/client/src/lib/constants.ts @@ -1,5 +1,6 @@ // Message type export const MSG_TWRITE = "Write"; +export const MSG_TWRITEBLOCK = "WriteBlock"; export const MSG_TWINSIZE = "Winsize"; export const MSG_TROOM_INFO = "RoomInfo"; export const MSG_TCHAT = "Chat"; diff --git a/client/src/lib/message.ts b/client/src/lib/message.ts index 65fd548..9e3b0b1 100644 --- a/client/src/lib/message.ts +++ b/client/src/lib/message.ts @@ -1,6 +1,11 @@ +export interface TermWriteBlock { + Data: string; + Offset: number; +} + export interface TermWrite { Data: string; - Time: number; + Offset: number; } export interface ChatMsg { diff --git a/client/src/pages/[room]/index.tsx b/client/src/pages/[room]/index.tsx index dbf5128..94d23b4 100644 --- a/client/src/pages/[room]/index.tsx +++ b/client/src/pages/[room]/index.tsx @@ -5,6 +5,7 @@ import * as base64 from "../../lib/base64"; import * as util from "../../lib/util"; import * as constants from "../../lib/constants"; import * as message from "../../lib/message"; +import * as pako from "pako"; import PubSub from "../../lib/pubsub"; import Chat from "../../components/Chat"; @@ -205,29 +206,47 @@ class Room extends React.Component { ws.onmessage = (ev: MessageEvent) => { let msg = JSON.parse(ev.data); - if (msg.Type === constants.MSG_TWRITE) { - - let arr = JSON.parse(window.atob(msg.Data)); - arr.forEach((el: string, i: number) => { - let singleMsg = JSON.parse(window.atob(el)); - let buffer = base64.str2ab(singleMsg.Data); - setTimeout(() => { - msgManager.pub(msg.Type, buffer); - }, singleMsg.Time as number); - }) - - - } else if (msg.Type === constants.MSG_TWINSIZE) { - - msgManager.pub(msg.Type, msg.Data); - - } else if (msg.Type === constants.MSG_TCHAT) { - - msgManager.pub(constants.MSG_TCHAT_IN, msg.Data); - - } else if (msg.Type === constants.MSG_TROOM_INFO) { - - this.setState({roomInfo: msg.Data}); + switch (msg.Type) { + + case constants.MSG_TWRITEBLOCK: + let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); + // this is a big chunk of encoding/decoding + // Since we have to : reduce message size by usign gzip and also + // every single termwrite have to be decoded, or else the rendering will screw up + // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message + let data = pako.ungzip(base64.str2ab(blockMsg.Data)); + let dataArr: string[] = JSON.parse(base64.ab2str(data)); + //let bufferArray: Uint8Array[] = []; + dataArr.forEach((data: string) => { + let writeMsg: message.TermWrite = JSON.parse(window.atob(data)); + let buffer = base64.str2ab(writeMsg.Data) + //bufferArray.push(buffer); + setTimeout(() => { + console.log("Buffer : ", buffer.length, " offset ", writeMsg.Offset); + msgManager.pub(msg.Type, buffer); + }, writeMsg.Offset as number); + }) + //console.log(base64.concatab(bufferArray).length); + //msgManager.pub(msg.Type, base64.concatab(bufferArray)); + break; + case constants.MSG_TWINSIZE: + + msgManager.pub(msg.Type, msg.Data); + break; + + case constants.MSG_TCHAT: + + msgManager.pub(constants.MSG_TCHAT_IN, msg.Data); + break; + + case constants.MSG_TROOM_INFO: + + this.setState({roomInfo: msg.Data}); + break; + + default: + + console.error("Unhandled message: ", msg.Type) } } diff --git a/client/yarn.lock b/client/yarn.lock index 3b37487..c62fb5c 100644 --- a/client/yarn.lock +++ b/client/yarn.lock @@ -2129,6 +2129,11 @@ "resolved" "https://registry.npmjs.org/@types/normalize-package-data/-/normalize-package-data-2.4.1.tgz" "version" "2.4.1" +"@types/pako@^1.0.2": + "integrity" "sha512-8UJl2MjkqqS6ncpLZqRZ5LmGiFBkbYxocD4e4jmBqGvfRG1RS23gKsBQbdtV9O9GvRyjFTiRHRByjSlKCLlmZw==" + "resolved" "https://registry.npmjs.org/@types/pako/-/pako-1.0.2.tgz" + "version" "1.0.2" + "@types/parse-json@^4.0.0": "integrity" "sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA==" "resolved" "https://registry.npmjs.org/@types/parse-json/-/parse-json-4.0.0.tgz" @@ -8657,7 +8662,7 @@ "resolved" "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz" "version" "2.2.0" -"pako@~1.0.5": +"pako@^1.0.11", "pako@~1.0.5": "integrity" "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==" "resolved" "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz" "version" "1.0.11" diff --git a/tstream/internal/cfg/cfg.go b/tstream/internal/cfg/cfg.go index d4f2f2d..04a6797 100644 --- a/tstream/internal/cfg/cfg.go +++ b/tstream/internal/cfg/cfg.go @@ -6,7 +6,7 @@ const ( SERVER_STREAMER_REQUIRED_VERSION = "1.2.0" // Used to verify compatible verion of streamer // Room - ROOM_BUFFER_SIZE = 20 // number of recent broadcast message to buffer + ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer // Streamer diff --git a/tstream/pkg/exwebsocket/exwebsocket.go b/tstream/pkg/exwebsocket/exwebsocket.go deleted file mode 100644 index d5f7e15..0000000 --- a/tstream/pkg/exwebsocket/exwebsocket.go +++ /dev/null @@ -1,22 +0,0 @@ -package exwebsocket - -import ( - "github.com/gorilla/websocket" - "sync" -) - -type Conn struct { - *websocket.Conn - mu sync.Mutex -} - -func New(conn *websocket.Conn) *Conn { - return &Conn{Conn: conn} -} - -func (ws *Conn) SafeWriteMessage(msgType int, data []byte) error { - ws.mu.Lock() - err := ws.WriteMessage(msgType, data) - ws.mu.Unlock() - return err -} diff --git a/tstream/pkg/message/message.go b/tstream/pkg/message/message.go index 6129415..6901309 100644 --- a/tstream/pkg/message/message.go +++ b/tstream/pkg/message/message.go @@ -17,6 +17,7 @@ type MType string const ( TWrite MType = "Write" + TWriteBlock MType = "WriteBlock" TChat MType = "Chat" TClose MType = "Close" TError MType = "Error" @@ -59,9 +60,21 @@ type Winsize struct { Cols uint16 } +type TermWriteBlock struct { + StartTime time.Time + + // how many milliseconds of data this block contains + Duration int64 + + // gzipped array of TermWrite + Data []byte +} + type TermWrite struct { Data []byte - Time int64 + + // Time Offset is the offset of this message with its parent block's start time + Offset int64 // milliseconds } type Chat struct { diff --git a/tstream/pkg/playback/playback.go b/tstream/pkg/playback/playback.go deleted file mode 100644 index 3f5bc61..0000000 --- a/tstream/pkg/playback/playback.go +++ /dev/null @@ -1,60 +0,0 @@ -package playback - -/*** -File size note: -7 Min parrot: 8.8 Mbs - -***/ -import ( - "encoding/json" - "github.com/qnkhuat/tstream/pkg/message" - "log" - "os" - "path/filepath" - "strconv" -) - -type Playback struct { - id uint64 - root string - queue []message.TermWrite - In chan message.Wrapper -} - -func New(id uint64, root string) *Playback { - in := make(chan message.Wrapper, 256) - return &Playback{ - id: id, - root: root, - In: in, - } -} - -func (p *Playback) Start() { - // Open a json writer file - // fileroot is root/id.json - path := filepath.Join(p.root, strconv.FormatUint(p.id, 10)+".json") - - f, err := os.Create(path) - if err != nil { - log.Printf("Failed to create playback file") - return - } - - log.Printf("Playback are saving to %s", path) - - defer f.Close() - // Start the playback - for { - msg, ok := <-p.In - if !ok { - log.Printf("Failed to read from channel") - continue - } - - // Write the message to the file - if err := json.NewEncoder(f).Encode(msg); err != nil { - log.Printf("Failed ot save message: %s", err) - } - } -} diff --git a/tstream/pkg/room/recorder.go b/tstream/pkg/room/recorder.go new file mode 100644 index 0000000..0191265 --- /dev/null +++ b/tstream/pkg/room/recorder.go @@ -0,0 +1 @@ +package room diff --git a/tstream/pkg/room/room.go b/tstream/pkg/room/room.go index 47a0e54..80f118b 100644 --- a/tstream/pkg/room/room.go +++ b/tstream/pkg/room/room.go @@ -9,7 +9,6 @@ import ( "github.com/gorilla/websocket" "github.com/qnkhuat/tstream/internal/cfg" "github.com/qnkhuat/tstream/pkg/message" - "github.com/qnkhuat/tstream/pkg/playback" "log" "strings" "sync" @@ -34,14 +33,12 @@ type Room struct { status message.RoomStatus secret string // used to verify streamer sfu *SFU - playback *playback.Playback } func New(name, title, secret string) *Room { clients := make(map[string]*Client) var buffer []message.Wrapper var cacheChat []message.Chat - playback := playback.New(234234324, "./") return &Room{ name: name, accViewers: 0, @@ -54,7 +51,6 @@ func New(name, title, secret string) *Room { secret: secret, cacheChat: cacheChat, sfu: NewSFU(), - playback: playback, } } @@ -114,7 +110,6 @@ func (r *Room) Streamer() *websocket.Conn { // Wait for request from streamer and broadcast those message to clients func (r *Room) Start() { - go r.playback.Start() go func() { for _ = range time.Tick(cfg.SERVER_CLEAN_INTERVAL * time.Second) { @@ -147,11 +142,10 @@ func (r *Room) Start() { log.Printf("Failed to decode winsize message: %s", err) } - case message.TWrite: + case message.TWriteBlock: r.addMsgBuffer(msg) r.lastActiveTime = time.Now() r.Broadcast(msg, []message.CRole{message.RViewer}, []string{}) - r.playback.In <- msg default: log.Printf("Unknown message type: %s", msgType) @@ -358,7 +352,9 @@ func (r *Room) ReadAndHandleClientMessage(ID string) { } func (r *Room) Broadcast(msg message.Wrapper, roles []message.CRole, IDExclude []string) { + log.Printf("Broadcasting message: %s", msg.Type) + // TODO : make this run concurrently for id, client := range r.clients { // Check if client is in the list of roles to broadcast found := false @@ -373,7 +369,6 @@ func (r *Room) Broadcast(msg message.Wrapper, roles []message.CRole, IDExclude [ continue } - // TODO: make this for loop run in parallel var isExcluded bool = false for _, idExclude := range IDExclude { if id == idExclude { diff --git a/tstream/pkg/streamer/recorder.go b/tstream/pkg/streamer/recorder.go new file mode 100644 index 0000000..22ab9f3 --- /dev/null +++ b/tstream/pkg/streamer/recorder.go @@ -0,0 +1,166 @@ +/*** +Recorder service for streamer. +It receives package from pty and manage when to send to server +Each message is a TermBlock. Inside termblock is multiple TermWrite message during a time interval +***/ +package streamer + +import ( + "bytes" + "compress/gzip" + //"encoding/base64" + "encoding/json" + "github.com/qnkhuat/tstream/pkg/message" + "log" + "sync" + "time" +) + +type Recorder struct { + lock sync.Mutex + + // A queue to store message + queue [][]byte + + // Channel to send message to + out chan<- message.Wrapper + + // duration of each termwriteblock + blockDuration time.Duration + + currentBlock *Block +} + +func NewRecorder(blockDuration time.Duration, out chan<- message.Wrapper) *Recorder { + currentBlock := NewBlock(blockDuration) + return &Recorder{ + blockDuration: blockDuration, + out: out, + currentBlock: currentBlock, + } +} + +/*** +Note: 3 seconds of parrot generate 70Kb of raw bytes. With gzip the data is just 6k +***/ +func (r *Recorder) Start() { + if r.out == nil { + log.Printf("No output channel for recorder") + return + } + + // Send all message in queue after each block duration + for _ = range time.Tick(r.blockDuration) { + if r.currentBlock.NQueue() == 0 { + r.newBlock() + continue + } + + payload, err := r.currentBlock.Serialize() + if err != nil { + log.Printf("Failed to serialize block") + r.newBlock() + continue + } + r.out <- payload + r.newBlock() + } +} + +func (r *Recorder) Write(data []byte) (int, error) { + r.lock.Lock() + r.currentBlock.AddMessage(data) + r.lock.Unlock() + return len(data), nil + +} + +func (r *Recorder) newBlock() { + r.lock.Lock() + r.currentBlock = NewBlock(r.blockDuration) + defer r.lock.Unlock() +} + +/*** Block ***/ +type Block struct { + lock sync.Mutex + + // Each data block will have its own start time + // Any message in queue will be offset to this startime + startTime time.Time + + // how many milliseconds of data this block contains + duration time.Duration + + // queue of encoded termwrite message + queue [][]byte +} + +func NewBlock(duration time.Duration) *Block { + var queue [][]byte + return &Block{ + duration: duration, + queue: queue, + startTime: time.Now(), + } +} + +func (bl *Block) Serialize() (message.Wrapper, error) { + var msg message.Wrapper + + // Serialize message queue + dataByte, err := json.Marshal(bl.queue) + if err != nil { + log.Printf("Failed to marshal message: %s", err) + return msg, err + } + + // compress with gzip + var b bytes.Buffer + gz := gzip.NewWriter(&b) + if _, err := gz.Write(dataByte); err != nil { + log.Printf("Failed to gzip: %s", err) + gz.Close() + return msg, err + } + gz.Close() + + // construct return message + blockMsg := message.TermWriteBlock{ + StartTime: bl.startTime, + Duration: bl.duration.Milliseconds(), + Data: b.Bytes(), + } + log.Printf("bytes: %d, raw: %d", len(b.Bytes()), len(dataByte)) + + blockByte, err := json.Marshal(blockMsg) + if err != nil { + log.Printf("Failed to encode termwrite block message") + return msg, err + } + + msg = message.Wrapper{ + Type: message.TWriteBlock, + Data: blockByte, + } + + return msg, nil +} + +func (bl *Block) AddMessage(data []byte) { + bl.lock.Lock() + + // have to marshal any single termwrite message + // or else the rendering will screw up + byteData, _ := json.Marshal(message.TermWrite{ + Data: data, + Offset: time.Since(bl.startTime).Milliseconds(), + }) + bl.queue = append(bl.queue, byteData) + + bl.lock.Unlock() +} + +func (bl *Block) NQueue() int { + return len(bl.queue) +} diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 8875c73..8424c26 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -29,9 +29,11 @@ type Streamer struct { secret string title string conn *websocket.Conn - tr *Transporter + recorder *Recorder Out chan message.Wrapper In chan message.Wrapper + // interval of sending message in queue + interval time.Duration } func New(clientAddr, serverAddr, username, title string) *Streamer { @@ -50,6 +52,7 @@ func New(clientAddr, serverAddr, username, title string) *Streamer { title: title, Out: out, In: in, + interval: 3 * time.Second, } } @@ -75,8 +78,8 @@ func (s *Streamer) Start() error { } // Init transporter - s.tr = NewTransporter(s.Out) - go s.tr.Start() + s.recorder = NewRecorder(s.interval, s.Out) + go s.recorder.Start() fmt.Printf("🔥 Streaming at: %s/%s\n", s.clientAddr, s.username) @@ -94,7 +97,7 @@ func (s *Streamer) Start() error { // Pipe command response to Pty and server go func() { //mw := io.MultiWriter(os.Stdout, s, s.tr) - mw := io.MultiWriter(os.Stdout, s.tr) + mw := io.MultiWriter(os.Stdout, s.recorder) _, err := io.Copy(mw, s.pty.F()) if err != nil { log.Printf("Failed to send pty to mw: %s", err) @@ -254,21 +257,6 @@ func (s *Streamer) Stop(msg string) { fmt.Println(msg) } -// Default behavior of Write is to send Write message -func (s *Streamer) Write(data []byte) (int, error) { - // TODO: find out why if we don't encode this - // the xterm will show duplciated text - // Clue: marshal ensure data is encoded in UTF-8 - dataByte, _ := json.Marshal(message.TermWrite{Data: data}) - payload := message.Wrapper{ - Type: message.TWrite, - Data: dataByte, - } - - s.Out <- payload - return len(data), nil -} - func (s *Streamer) Winsize(rows, cols uint16) { payload := message.Wrapper{ Type: message.TWinsize, diff --git a/tstream/pkg/streamer/transporter.go b/tstream/pkg/streamer/transporter.go deleted file mode 100644 index 003976b..0000000 --- a/tstream/pkg/streamer/transporter.go +++ /dev/null @@ -1,83 +0,0 @@ -package streamer - -import ( - //"compress/gzip" - "encoding/json" - //"fmt" - "github.com/qnkhuat/tstream/pkg/message" - "log" - //"os" - "sync" - "time" -) - -type Transporter struct { - out chan<- message.Wrapper - queue [][]byte - lock sync.Mutex - blockID uint - blockStartTime time.Time -} - -func NewTransporter(out chan<- message.Wrapper) *Transporter { - var queue [][]byte - return &Transporter{ - out: out, - queue: queue, - blockID: 0, - } -} - -func (t *Transporter) Start() { - for _ = range time.Tick(3 * time.Second) { - //if len(t.queue) == 1 { - log.Printf("sending, queue len: %d", len(t.queue)) - - //f, _ := os.Create(fmt.Sprintf("gzip_%d.gz", t.blockID)) - //defer f.Close() - //gz := gzip.NewWriter(f) - //defer gz.Close() - - //fraw, _ := os.Create(fmt.Sprintf("raw_%d", t.blockID)) - //defer fraw.Close() - - //if _, err := gz.Write(dataArray); err != nil { - // log.Fatal(err) - //} - //fraw.Write(dataArray) - - dataByte, _ := json.Marshal(t.queue) - payload := message.Wrapper{ - Type: message.TWrite, - Data: dataByte, - } - - t.out <- payload - t.newBlock() - - //} - } -} - -/*** - Note: 5 seconds of parrot generate 62Kb of raw bytes. With gzip the data is just 977B -***/ -func (t *Transporter) Write(data []byte) (int, error) { - // log.Printf("Transporter Receiving message: %d", len(data)) - - // log.Printf("data: %s, %v", string(data), time.Since(t.blockStartTime).Milliseconds()) - - t.lock.Lock() - byteData, _ := json.Marshal(message.TermWrite{Data: data, Time: time.Since(t.blockStartTime).Milliseconds()}) - t.queue = append(t.queue, byteData) - t.lock.Unlock() - //if time.Since(t.blockStartTime) > 3*time.Second { - - return len(data), nil -} - -func (t *Transporter) newBlock() { - t.blockID += 1 - t.queue = t.queue[:0] // clear it after backup - t.blockStartTime = time.Now() -} From 2f4c2bfbcb28f12944b3f3d2861aa7494b00c7fe Mon Sep 17 00:00:00 2001 From: Earther Date: Sun, 8 Aug 2021 14:45:25 +0700 Subject: [PATCH 3/5] client now read from blcok of data and schedule message according to offset --- client/src/components/StreamPreview.tsx | 21 +++-- client/src/components/WSTerminal.tsx | 116 ++++++++++++++++++++++-- client/src/lib/message.ts | 31 ++++++- client/src/lib/pubsub.ts | 1 + client/src/pages/[room]/index.tsx | 26 ++---- tstream/internal/cfg/cfg.go | 11 ++- tstream/pkg/message/message.go | 1 + tstream/pkg/room/room.go | 48 ++++++---- tstream/pkg/streamer/recorder.go | 60 +++++++----- tstream/pkg/streamer/streamer.go | 26 +++--- 10 files changed, 244 insertions(+), 97 deletions(-) diff --git a/client/src/components/StreamPreview.tsx b/client/src/components/StreamPreview.tsx index c5f6199..16d6f23 100644 --- a/client/src/components/StreamPreview.tsx +++ b/client/src/components/StreamPreview.tsx @@ -4,6 +4,7 @@ import PubSub from "./../lib/pubsub"; import * as base64 from "../lib/base64"; import * as util from "../lib/util"; import * as constants from "../lib/constants"; +import * as message from "../lib/message"; import dayjs from "dayjs"; import customParseFormat from 'dayjs/plugin/customParseFormat'; @@ -41,19 +42,19 @@ const StreamPreview: FC = ({ title, wsUrl, streamerID, nViewers, startedT ws.onmessage = (ev: MessageEvent) => { let msg = JSON.parse(ev.data); - if (msg.Type === constants.MSG_TWRITE) { - - let buffer = base64.str2ab(JSON.parse(window.atob(msg.Data)).Data); - tempMsg.pub(msg.Type, buffer); - - } else if (msg.Type === constants.MSG_TWINSIZE) { - - let winSizeMsg = msg.Data; - tempMsg.pub(msg.Type, winSizeMsg); + switch (msg.Type) { + case constants.MSG_TWRITEBLOCK: + let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); + tempMsg.pub(msg.Type, blockMsg); + break; + case constants.MSG_TWINSIZE: + let winSizeMsg = msg.Data; + tempMsg.pub(msg.Type, winSizeMsg); + break; } - } + } tempMsg.sub("request", (msgType: string) => { var payload = JSON.stringify({ diff --git a/client/src/components/WSTerminal.tsx b/client/src/components/WSTerminal.tsx index 16673e6..bcdace8 100644 --- a/client/src/components/WSTerminal.tsx +++ b/client/src/components/WSTerminal.tsx @@ -1,43 +1,55 @@ import React from "react"; import Xterm from "./Xterm"; import * as constants from "../lib/constants"; +import * as message from "../lib/message"; +import * as base64 from "../lib/base64"; +import * as pako from "pako"; import PubSub from "../lib/pubsub"; -// TODO: add handle % and px -interface Props { - msgManager: PubSub; - width: number; // in pixel - height: number; // in pixel - className?: string; -} interface Winsize { Rows: number; Cols: number; } +// TODO: add handle % and px for size +interface Props { + msgManager: PubSub; + width: number; // in pixel + height: number; // in pixel + delay?: number; + className?: string; +} class WSTerminal extends React.Component { static defaultProps = { width: -1, height: -1, + delay: 0, className: "", } termRef : React.RefObject; divRef: React.RefObject; + writeManager: WriteManager; + constructor(props: Props) { super(props) this.termRef = React.createRef(); this.divRef = React.createRef(); + let writeCB = (buffer: Uint8Array) => { + this.termRef.current?.writeUtf8(buffer); + }; + + this.writeManager = new WriteManager(writeCB, this.props.delay); } componentDidMount() { - this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (buffer: Uint8Array) => { - this.termRef.current?.writeUtf8(buffer); + this.props.msgManager.sub(constants.MSG_TWRITEBLOCK, (block: message.TermWriteBlock) => { + this.writeManager.addBlock(block); }) this.props.msgManager.sub(constants.MSG_TWINSIZE, (winsize: Winsize) => { @@ -105,4 +117,90 @@ class WSTerminal extends React.Component { } +class WriteManager { + + queue: message.TermWrite[] = []; + writeCB: (arr:Uint8Array) => void; + delay: number; // in milliseconds + + constructor(writeCB: (arr: Uint8Array) => void, delay: number = 0) { + this.writeCB = writeCB; + this.delay = delay; + } + + resetQueue() { + this.queue = []; + } + + + addQueue(q: message.TermWrite[]) { + this.queue.push(...q); // Concatnate + this.consume(); + } + + consume() { + if (this.queue.length == 0) { + return + } else { + + // any message has offset < 0 => messages from the past with respect to render time + // concat all these messages into one buffer and render at once + let bufferArray: Uint8Array[] = []; + while (true && this.queue.length != 0) { + let msg = this.queue[0]; + + if (msg.Offset < 0) { + bufferArray.push() + let buffer = base64.str2ab(msg.Data) + bufferArray.push(buffer); + this.queue.shift(); + } else break; + } + if ( bufferArray.length > 0) this.writeCB(base64.concatab(bufferArray)); + + // schedule to render upcomming messages + // TODO: are there any ways we don't have to create many settimeout liek this? + // tried sequentially call to settimeout but the performance is much worse + this.queue.forEach((msg) => { + let buffer = base64.str2ab(msg.Data); + setTimeout(() => { + this.writeCB(buffer); + }, msg.Offset); + }) + this.resetQueue(); + } + } + + addBlock(block: message.TermWriteBlock) { + // when viewers receive this block + // it only contains the actual start-time + // we need to be able to re-compute the render time based on + // - now time + // - when does this block being created + // - the delay factor. In case of play back the delay = now - stream sesion start time + let blockDelayTime = (new Date()).getTime() - (new Date(block.StartTime)).getTime() - this.delay; + + // this is a big chunk of encoding/decoding + // Since we have to : reduce message size by usign gzip and also + // every single termwrite have to be decoded, or else the rendering will screw up + // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message + let data = pako.ungzip(base64.str2ab(block.Data)); + let dataArrString: string[] = JSON.parse(base64.ab2str(data)); + + // convert from string[] to message.TermWrite[] + let dataArrTermWrite: message.TermWrite[] = []; + dataArrString.forEach((data: string, index: number) => { + let writeMsg = JSON.parse(window.atob(data)); + + // re-compute the offset of this message with respect to the render time + writeMsg.Offset = writeMsg.Offset - blockDelayTime; + dataArrTermWrite.push(writeMsg); + }) + + this.addQueue(dataArrTermWrite); + + } +} + + export default WSTerminal; diff --git a/client/src/lib/message.ts b/client/src/lib/message.ts index 9e3b0b1..0095a8a 100644 --- a/client/src/lib/message.ts +++ b/client/src/lib/message.ts @@ -1,6 +1,12 @@ +//import * as pako from "pako"; +//import * as constants from "./constants"; +//import * as base64 from "./base64"; +//import PubSub from "./pubsub"; + export interface TermWriteBlock { Data: string; - Offset: number; + Duration: number; + StartTime: string; } export interface TermWrite { @@ -15,3 +21,26 @@ export interface ChatMsg { Time: string; } + + +// *** Message handlers *** +//export function hanldeWriteBlockMessage(msgData: string, msgManager: PubSub) { +// let blockMsg: TermWriteBlock = JSON.parse(window.atob(msgData)); +// // this is a big chunk of encoding/decoding +// // Since we have to : reduce message size by usign gzip and also +// // every single termwrite have to be decoded, or else the rendering will screw up +// // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message +// let data = pako.ungzip(base64.str2ab(blockMsg.Data)); +// let dataArr: string[] = JSON.parse(base64.ab2str(data)); +// //let bufferArray: Uint8Array[] = []; +// dataArr.forEach((data: string) => { +// let writeMsg: TermWrite = JSON.parse(window.atob(data)); +// let buffer = base64.str2ab(writeMsg.Data) +// //bufferArray.push(buffer); +// setTimeout(() => { +// msgManager.pub(, buffer); +// }, writeMsg.Offset as number); +// }) +// //console.log(base64.concatab(bufferArray).length); +// //msgManager.pub(msg.Type, base64.concatab(bufferArray)); +//} diff --git a/client/src/lib/pubsub.ts b/client/src/lib/pubsub.ts index db21ae2..baf80c7 100644 --- a/client/src/lib/pubsub.ts +++ b/client/src/lib/pubsub.ts @@ -1,3 +1,4 @@ +// Generic pubsub class export default class PubSub { // set to true if one topic can only have one subscriber diff --git a/client/src/pages/[room]/index.tsx b/client/src/pages/[room]/index.tsx index 94d23b4..c5ee2c2 100644 --- a/client/src/pages/[room]/index.tsx +++ b/client/src/pages/[room]/index.tsx @@ -54,6 +54,7 @@ interface RoomInfo { NViewers: number; Title: string; Status: RoomStatus; + Delay: number; } interface Params { @@ -209,26 +210,10 @@ class Room extends React.Component { switch (msg.Type) { case constants.MSG_TWRITEBLOCK: - let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); - // this is a big chunk of encoding/decoding - // Since we have to : reduce message size by usign gzip and also - // every single termwrite have to be decoded, or else the rendering will screw up - // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message - let data = pako.ungzip(base64.str2ab(blockMsg.Data)); - let dataArr: string[] = JSON.parse(base64.ab2str(data)); - //let bufferArray: Uint8Array[] = []; - dataArr.forEach((data: string) => { - let writeMsg: message.TermWrite = JSON.parse(window.atob(data)); - let buffer = base64.str2ab(writeMsg.Data) - //bufferArray.push(buffer); - setTimeout(() => { - console.log("Buffer : ", buffer.length, " offset ", writeMsg.Offset); - msgManager.pub(msg.Type, buffer); - }, writeMsg.Offset as number); - }) - //console.log(base64.concatab(bufferArray).length); - //msgManager.pub(msg.Type, base64.concatab(bufferArray)); + let blockMsg: message.TermWriteBlock = JSON.parse(window.atob(msg.Data)); + msgManager.pub(msg.Type, blockMsg); break; + case constants.MSG_TWINSIZE: msgManager.pub(msg.Type, msg.Data); @@ -329,12 +314,13 @@ class Room extends React.Component {
- {!isStreamStopped && isRoomExisted && + {!isStreamStopped && isRoomExisted && this.state.roomInfo && } {isStreamStopped && isRoomExisted && diff --git a/tstream/internal/cfg/cfg.go b/tstream/internal/cfg/cfg.go index 04a6797..f906c86 100644 --- a/tstream/internal/cfg/cfg.go +++ b/tstream/internal/cfg/cfg.go @@ -1,13 +1,14 @@ package cfg const ( - SERVER_VERSION = "1.2.0" // Used to verify compatible verion of streamer - STREAMER_VERSION = "1.2.0" // retry connect with server if websocket is broke - SERVER_STREAMER_REQUIRED_VERSION = "1.2.0" // Used to verify compatible verion of streamer + SERVER_VERSION = "1.3.0" // Used to verify compatible verion of streamer + STREAMER_VERSION = "1.3.0" // retry connect with server if websocket is broke + SERVER_STREAMER_REQUIRED_VERSION = "1.3.0" // Used to verify compatible verion of streamer // Room - ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer - ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer + ROOM_BUFFER_SIZE = 3 // number of recent broadcast message to buffer + ROOM_CACHE_MSG_SIZE = 25 // number of recent chat messages to buffer + ROOM_DEFAULT_DELAY = 3000 // act as both block size and delay time of streaming // Streamer STREAMER_READ_BUFFER_SIZE = 1024 // streamer websocket read buffer size diff --git a/tstream/pkg/message/message.go b/tstream/pkg/message/message.go index 6901309..341710e 100644 --- a/tstream/pkg/message/message.go +++ b/tstream/pkg/message/message.go @@ -104,6 +104,7 @@ type RoomInfo struct { Title string StreamerID string Status RoomStatus + Delay uint64 // Viewer delay time with streamer ( in milliseconds ) } // used for streamer to update room info diff --git a/tstream/pkg/room/room.go b/tstream/pkg/room/room.go index 80f118b..7bc761a 100644 --- a/tstream/pkg/room/room.go +++ b/tstream/pkg/room/room.go @@ -18,21 +18,30 @@ import ( var emptyByteArray []byte type Room struct { - lock sync.Mutex - streamer *websocket.Conn - clients map[string]*Client // Chats + viewrer connection - accViewers uint64 // accumulated viewers - name string // also is streamerID - id uint64 // Id in DB - title string + lock sync.Mutex + + streamer *websocket.Conn + sfu *SFU + clients map[string]*Client // Chats + viewrer connection + + msgBuffer []message.Wrapper + cacheChat []message.Chat + + // config + delay uint64 // Viewer delay time with streamer ( in milliseconds ) + + // states lastWinsize message.Winsize startedTime time.Time lastActiveTime time.Time - msgBuffer []message.Wrapper - cacheChat []message.Chat - status message.RoomStatus - secret string // used to verify streamer - sfu *SFU + accViewers uint64 // accumulated viewers + + // room info + id uint64 // Id in DB + name string // also is streamerID + title string + secret string // used to verify streamer + status message.RoomStatus } func New(name, title, secret string) *Room { @@ -41,16 +50,17 @@ func New(name, title, secret string) *Room { var cacheChat []message.Chat return &Room{ name: name, - accViewers: 0, - clients: clients, - lastActiveTime: time.Now(), - startedTime: time.Now(), - msgBuffer: buffer, - status: message.RStreaming, title: title, secret: secret, + clients: clients, + accViewers: 0, + msgBuffer: buffer, cacheChat: cacheChat, sfu: NewSFU(), + lastActiveTime: time.Now(), + startedTime: time.Now(), + status: message.RStreaming, + delay: 3000, } } @@ -352,7 +362,6 @@ func (r *Room) ReadAndHandleClientMessage(ID string) { } func (r *Room) Broadcast(msg message.Wrapper, roles []message.CRole, IDExclude []string) { - log.Printf("Broadcasting message: %s", msg.Type) // TODO : make this run concurrently for id, client := range r.clients { @@ -409,6 +418,7 @@ func (r *Room) PrepareRoomInfo() message.RoomInfo { StreamerID: r.name, Status: r.status, AccNViewers: r.accViewers, + Delay: r.delay, } } diff --git a/tstream/pkg/streamer/recorder.go b/tstream/pkg/streamer/recorder.go index 22ab9f3..3983629 100644 --- a/tstream/pkg/streamer/recorder.go +++ b/tstream/pkg/streamer/recorder.go @@ -25,46 +25,60 @@ type Recorder struct { // Channel to send message to out chan<- message.Wrapper - // duration of each termwriteblock + // duration of each termwriteblock to send blockDuration time.Duration + // delay of stream + delay time.Duration + currentBlock *Block } -func NewRecorder(blockDuration time.Duration, out chan<- message.Wrapper) *Recorder { - currentBlock := NewBlock(blockDuration) +func NewRecorder(blockDuration time.Duration, delay time.Duration, out chan<- message.Wrapper) *Recorder { + if delay < blockDuration { + log.Printf("Block duration(%d) should smaller than delay(%d)", blockDuration, delay) + blockDuration = delay + } + currentBlock := NewBlock(blockDuration, delay) return &Recorder{ blockDuration: blockDuration, out: out, currentBlock: currentBlock, + delay: delay, } } -/*** -Note: 3 seconds of parrot generate 70Kb of raw bytes. With gzip the data is just 6k -***/ func (r *Recorder) Start() { if r.out == nil { log.Printf("No output channel for recorder") return } + // First message + time.Sleep(r.delay) + r.Send() + // Send all message in queue after each block duration for _ = range time.Tick(r.blockDuration) { - if r.currentBlock.NQueue() == 0 { - r.newBlock() - continue - } - - payload, err := r.currentBlock.Serialize() - if err != nil { - log.Printf("Failed to serialize block") - r.newBlock() - continue - } - r.out <- payload + r.Send() + } +} + +// send all message in block and create a new one +func (r *Recorder) Send() { + if r.currentBlock.NQueue() == 0 { r.newBlock() + return } + + payload, err := r.currentBlock.Serialize() + if err != nil { + log.Printf("Failed to serialize block") + r.newBlock() + return + } + r.out <- payload + r.newBlock() } func (r *Recorder) Write(data []byte) (int, error) { @@ -77,7 +91,7 @@ func (r *Recorder) Write(data []byte) (int, error) { func (r *Recorder) newBlock() { r.lock.Lock() - r.currentBlock = NewBlock(r.blockDuration) + r.currentBlock = NewBlock(r.blockDuration, r.delay) defer r.lock.Unlock() } @@ -92,16 +106,19 @@ type Block struct { // how many milliseconds of data this block contains duration time.Duration + delay time.Duration + // queue of encoded termwrite message queue [][]byte } -func NewBlock(duration time.Duration) *Block { +func NewBlock(duration time.Duration, delay time.Duration) *Block { var queue [][]byte return &Block{ duration: duration, queue: queue, startTime: time.Now(), + delay: delay, } } @@ -116,6 +133,8 @@ func (bl *Block) Serialize() (message.Wrapper, error) { } // compress with gzip + // with gzip data often compressed to 1/10 -> 1/8 its original + // Note: 3 seconds of parrot generate 70Kb of raw bytes. With gzip the data is just 6k var b bytes.Buffer gz := gzip.NewWriter(&b) if _, err := gz.Write(dataByte); err != nil { @@ -131,7 +150,6 @@ func (bl *Block) Serialize() (message.Wrapper, error) { Duration: bl.duration.Milliseconds(), Data: b.Bytes(), } - log.Printf("bytes: %d, raw: %d", len(b.Bytes()), len(dataByte)) blockByte, err := json.Marshal(blockMsg) if err != nil { diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 8424c26..5f6b947 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -32,8 +32,9 @@ type Streamer struct { recorder *Recorder Out chan message.Wrapper In chan message.Wrapper - // interval of sending message in queue - interval time.Duration + // delay of sending message in queue + delay time.Duration + blockDuration time.Duration } func New(clientAddr, serverAddr, username, title string) *Streamer { @@ -44,15 +45,16 @@ func New(clientAddr, serverAddr, username, title string) *Streamer { secret := GetSecret(CONFIG_PATH) return &Streamer{ - secret: secret, - pty: pty, - serverAddr: serverAddr, - clientAddr: clientAddr, - username: username, - title: title, - Out: out, - In: in, - interval: 3 * time.Second, + secret: secret, + pty: pty, + serverAddr: serverAddr, + clientAddr: clientAddr, + username: username, + title: title, + Out: out, + In: in, + delay: 3 * time.Second, + blockDuration: 3 * time.Second, // block size has to smaller than delay } } @@ -78,7 +80,7 @@ func (s *Streamer) Start() error { } // Init transporter - s.recorder = NewRecorder(s.interval, s.Out) + s.recorder = NewRecorder(s.blockDuration, s.delay, s.Out) go s.recorder.Start() fmt.Printf("🔥 Streaming at: %s/%s\n", s.clientAddr, s.username) From 794dddb97367c01a685401d764be8a1df07aac0f Mon Sep 17 00:00:00 2001 From: Earther Date: Sun, 8 Aug 2021 15:38:44 +0700 Subject: [PATCH 4/5] fix bug timing in message offset time --- tstream/pkg/streamer/recorder.go | 14 ++++++++++---- tstream/pkg/streamer/streamer.go | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tstream/pkg/streamer/recorder.go b/tstream/pkg/streamer/recorder.go index 3983629..089ecaf 100644 --- a/tstream/pkg/streamer/recorder.go +++ b/tstream/pkg/streamer/recorder.go @@ -35,7 +35,10 @@ type Recorder struct { } func NewRecorder(blockDuration time.Duration, delay time.Duration, out chan<- message.Wrapper) *Recorder { + if delay < blockDuration { + // delay should be larger than blockduraiton about .5 seconds for transmission time + // this will ensure a smooth stream log.Printf("Block duration(%d) should smaller than delay(%d)", blockDuration, delay) blockDuration = delay } @@ -56,11 +59,11 @@ func (r *Recorder) Start() { // First message time.Sleep(r.delay) - r.Send() + go r.Send() // Send all message in queue after each block duration for _ = range time.Tick(r.blockDuration) { - r.Send() + go r.Send() } } @@ -171,8 +174,11 @@ func (bl *Block) AddMessage(data []byte) { // have to marshal any single termwrite message // or else the rendering will screw up byteData, _ := json.Marshal(message.TermWrite{ - Data: data, - Offset: time.Since(bl.startTime).Milliseconds(), + Data: data, + // offset of a single message is + // the different between now and block start time + // plus the (delay - duration) + Offset: time.Since(bl.startTime).Milliseconds() + bl.delay.Milliseconds() - bl.duration.Milliseconds(), }) bl.queue = append(bl.queue, byteData) diff --git a/tstream/pkg/streamer/streamer.go b/tstream/pkg/streamer/streamer.go index 5f6b947..e2082f4 100644 --- a/tstream/pkg/streamer/streamer.go +++ b/tstream/pkg/streamer/streamer.go @@ -53,7 +53,7 @@ func New(clientAddr, serverAddr, username, title string) *Streamer { title: title, Out: out, In: in, - delay: 3 * time.Second, + delay: 4 * time.Second, blockDuration: 3 * time.Second, // block size has to smaller than delay } } From cb2c7617bad92832a4552247950f537134113d82 Mon Sep 17 00:00:00 2001 From: Earther Date: Sun, 8 Aug 2021 15:59:20 +0700 Subject: [PATCH 5/5] remove unused code --- client/src/lib/message.ts | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/client/src/lib/message.ts b/client/src/lib/message.ts index 0095a8a..2472e88 100644 --- a/client/src/lib/message.ts +++ b/client/src/lib/message.ts @@ -1,8 +1,3 @@ -//import * as pako from "pako"; -//import * as constants from "./constants"; -//import * as base64 from "./base64"; -//import PubSub from "./pubsub"; - export interface TermWriteBlock { Data: string; Duration: number; @@ -20,27 +15,3 @@ export interface ChatMsg { Color: string; Time: string; } - - - -// *** Message handlers *** -//export function hanldeWriteBlockMessage(msgData: string, msgManager: PubSub) { -// let blockMsg: TermWriteBlock = JSON.parse(window.atob(msgData)); -// // this is a big chunk of encoding/decoding -// // Since we have to : reduce message size by usign gzip and also -// // every single termwrite have to be decoded, or else the rendering will screw up -// // the whole block often took 9-20 milliseconds to decode a 3 seconds block of message -// let data = pako.ungzip(base64.str2ab(blockMsg.Data)); -// let dataArr: string[] = JSON.parse(base64.ab2str(data)); -// //let bufferArray: Uint8Array[] = []; -// dataArr.forEach((data: string) => { -// let writeMsg: TermWrite = JSON.parse(window.atob(data)); -// let buffer = base64.str2ab(writeMsg.Data) -// //bufferArray.push(buffer); -// setTimeout(() => { -// msgManager.pub(, buffer); -// }, writeMsg.Offset as number); -// }) -// //console.log(base64.concatab(bufferArray).length); -// //msgManager.pub(msg.Type, base64.concatab(bufferArray)); -//}