Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question\Websocket] How to send binary data with neffos(websocket)? #10

Closed
StartAt24 opened this issue Aug 22, 2019 · 13 comments
Closed

Comments

@StartAt24
Copy link

#387 I have read this issue. But cannot found, websocket.Config

Then i found that, Iris websocket library is now merged with the neffos real-time framework
But i cann't find a correct way to send binary data with neffos.

Is there any config for neffos to send binary instead of string?

@StartAt24 StartAt24 changed the title [question] [Question\Websocket] How to send binary data with neffos(websocket)? Aug 22, 2019
@majidbigdeli
Copy link

Please set SetBinary = true in neffos.Message

neffos.Message Struct

type Message struct {
	wait string

	// The Namespace that this message sent to/received from.
	Namespace string
	// The Room that this message sent to/received from.
	Room string
	// The Event that this message sent to/received from.
	Event string
	// The actual body of the incoming/outcoming data.
	Body []byte
	// The Err contains any message's error, if any.
	// Note that server-side and client-side connections can return an error instead of a message from each event callbacks,
	// except the clients's force Disconnect which its local event doesn't matter when disconnected manually.
	Err error

	// if true then `Err` is filled by the error message and
	// the last segment of incoming/outcoming serialized message is the error message instead of the body.
	isError bool
	isNoOp  bool

	isInvalid bool

	// the CONN ID, filled automatically if `Server#Broadcast` first parameter of sender connection's ID is not empty,
	// not exposed to the subscribers (rest of the clients).
	// This is the ID across neffos servers when scale.
	from string
	// When sent by the same connection of the current running server instance.
	// This field is serialized/deserialized but it's clean on sending or receiving from a client
	// and it's only used on StackExchange feature.
	// It's serialized as the first parameter, instead of wait signal, if incoming starts with 0x.
	FromExplicit string // the exact Conn's pointer in this server instance.
	// Reports whether this message is coming from a stackexchange.
	// This field is not exposed and it's not serialized at all, ~local-use only~.
	//
	// The "wait" field can determinate if this message is coming from a stackexchange using its second char,
	// This value set based on "wait" on deserialization when coming from remote side.
	// Only server-side can actually set it.
	FromStackExchange bool

	// To is the connection ID of the receiver, used only when `Server#Broadcast` is called, indeed when we only need to send a message to a single connection.
	// The Namespace, Room are still respected at all.
	//
	// However, sending messages to a group of connections is done by the `Room` field for groups inside a namespace or just `Namespace` field as usual.
	// This field is not filled on sending/receiving.
	To string

	// True when event came from local (i.e client if running client) on force disconnection,
	// i.e OnNamespaceDisconnect and OnRoomLeave when closing a conn.
	// This field is not filled on sending/receiving.
	// Err does not matter and never sent to the other side.
	IsForced bool
	// True when asking the other side and fire the respond's event (which matches the sent for connect/disconnect/join/leave),
	// i.e if a client (or server) onnection want to connect
	// to a namespace or join to a room.
	// Should be used rarely, state can be checked by `Conn#IsClient() bool`.
	// This field is not filled on sending/receiving.
	IsLocal bool

	// True when user define it for writing, only its body is written as raw native websocket message, namespace, event and all other fields are empty.
	// The receiver should accept it on the `OnNativeMessage` event.
	// This field is not filled on sending/receiving.
	IsNative bool

	// Useful rarely internally on `Conn#Write` namespace and rooms checks, i.e `Conn#DisconnectAll` and `NSConn#RemoveAll`.
	// If true then the writer's checks will not lock connectedNamespacesMutex or roomsMutex again. May be useful in the future, keep that solution.
	locked bool

	// if server or client should write using Binary message.
	// This field is not filled on sending/receiving.
	SetBinary bool
}

@StartAt24
Copy link
Author

@majidbigdeli Hi, thanks for your help.
I have set SetBinary = true . But i got this error from neffos.js in my client.
jserror

Here is my client

        var scheme = document.location.protocol == "https:" ? "wss" : "ws";
        var port = document.location.port ? ":" + document.location.port : "";
        var wsURL = scheme + "://" + document.location.hostname + port + "/stream";
        
        function handleError(reason) {
            console.log(reason);
        }
        
        function handleNamespaceConnectedConn(nsConn) {

        }
        // const username = window.prompt("Your username?");
        async function runExample() {
            // You can omit the "default" and simply define only Events, the namespace will be an empty string"",
            // however if you decide to make any changes on this example make sure the changes are reflecting inside the ../server.go file as well.
            try {
                const conn = await neffos.dial(wsURL, {
                    default: { // "default" namespace.
                        _OnNamespaceConnected: function (nsConn, msg) {
                            handleNamespaceConnectedConn(nsConn)
                        },
                        _OnNamespaceDisconnect: function (nsConn, msg) {
                        },
                        stream: function (nsConn, msg) { // "stream" event.
                            console.log(msg.Body);
                            console.log(msg)
                        }
                    }
                },{
                    headers: {
                        "X-Username": "",
                    }
                });
                // You can either wait to conenct or just conn.connect("connect")
                // and put the `handleNamespaceConnectedConn` inside `_OnNamespaceConnected` callback instead.
                // const nsConn = await conn.connect("default");
                // nsConn.emit(...); handleNamespaceConnectedConn(nsConn);
                conn.connect("default");
            } catch (err) {
                handleError(err);
            }
        }

@kataras kataras transferred this issue from kataras/iris Aug 22, 2019
@kataras
Copy link
Owner

kataras commented Aug 30, 2019

Hello @StartAt24, can you provide us a reproducible code?

@majidbigdeli
Copy link

majidbigdeli commented Aug 30, 2019

Hello @kataras I think this issue Related To Neffos.js.

in neffos.js function splitN

function splitN(s: string, sep: string, limit: number): Array<string> {
    if (limit == 0) return [s];
    var arr = s.split(sep, limit);
    if (arr.length == limit) {
        let curr = arr.join(sep) + sep;
        arr.push(s.substr(curr.length));
        return arr;
    } else {
        return [s];
    }
}

you use s.split(sep, limit);

When We set SetBinary = true
s is binary not string then you get split is not function .

@StartAt24
Copy link
Author

@majidbigdeli
Yes, i know that.
So can i say that Neffos.js donn't support SetBinary = true ?

Here is my current solution:
I've changed to neffos.Message{SetBinary: true, IsNative: true, Body:png}. And use native websocket api instead of Neffos.js .
Everything works fine. But on the other side, i cannot use some advance features of websocket module like namespace room.

@StartAt24
Copy link
Author

@kataras sure. Here is my test code.

Server:

package main

import (
	"github.com/kataras/iris"
	"fmt"
	"github.com/kataras/iris/websocket"
	"github.com/kataras/neffos"
	"time"
)

// 全局变量
var page = struct {
    Title string
}{"Collector"}

func main(){
	app := iris.New()

	ws_server := startWebSocketServer(app)

	go pub_thread(ws_server)

	app.RegisterView(iris.HTML("./static", ".html"))
	app.Get("/" ,func(ctx iris.Context){
		ctx.ViewData("Page", page)
		ctx.View("index.html")
	})

	app.HandleDir("/", "./static")	
    app.Run(iris.Addr(":5000"), iris.WithoutPathCorrection)
}

var serverEvents = websocket.Namespaces{
	"default": websocket.Events{
		websocket.OnNamespaceConnected: func(nsConn *websocket.NSConn, msg websocket.Message) error {
			// with `websocket.GetContext` you can retrieve the Iris' `Context`.
			ctx := websocket.GetContext(nsConn.Conn)

			fmt.Printf("[%s] connected to namespace [%s] with IP [%s]\n",
				nsConn, msg.Namespace,
				ctx.RemoteAddr())
			return nil
		},
		websocket.OnNamespaceDisconnect: func(nsConn *websocket.NSConn, msg websocket.Message) error {
			fmt.Printf("[%s] disconnected from namespace [%s]\n", nsConn, msg.Namespace)
			return nil
		},
		"stream": func(nsConn *websocket.NSConn, msg websocket.Message) error {
			// room.String() returns -> NSConn.String() returns -> Conn.String() returns -> Conn.ID()
			fmt.Printf("[%s] sent: %s", nsConn, string(msg.Body))

			// Write message back to the client message owner with:
			// nsConn.Emit("chat", msg)
			// Write message to all except this client with:
			nsConn.Conn.Server().Broadcast(nsConn, msg)
			return nil
		},
	},
}

func startWebSocketServer(app *iris.Application) *neffos.Server{
	server := websocket.New(websocket.DefaultGorillaUpgrader, serverEvents)
	server.OnConnect = func(c *websocket.Conn) error {
		fmt.Printf("[%s] connected to the server.\n", c)
		
		return nil
	}

	server.OnDisconnect = func(c *websocket.Conn){
		fmt.Printf("[%s] disconnected from the server.", c)
	}

	fmt.Printf("Listening on: %d\nPress CTRL/CMD+C to interrupt.\n", 5000)

	idGen := func(ctx iris.Context) string {
		if username := ctx.GetHeader("X-Username"); username != "" {
			return username
		}

		return websocket.DefaultIDGenerator(ctx)
	}

	app.Get("/stream", websocket.Handler(server, idGen))

	return server
}

func pub_thread(serve *neffos.Server){
	// for png:= range in{
	// 	serve.Broadcast(nil, neffos.Message{SetBinary: true,  Body:png, Namespace: "default"})
	// }
	png:= [...] byte{'h','o','l','l','e'}
	slice := png[:]
	for{
		serve.Broadcast(nil, neffos.Message{SetBinary: true,  Body:slice, Namespace: "default"})
		time.Sleep(1*time.Second)
	}
}

web:

<html>
    <button> useless button</button>

    <script src="https://cdn.jsdelivr.net/npm/neffos.js@latest/dist/neffos.js"></script>
    <script>
    
        var scheme = document.location.protocol == "https:" ? "wss" : "ws";
        var port = document.location.port ? ":" + document.location.port : "";
        var wsURL = scheme + "://" + document.location.hostname + port + "/stream";
        
        function handleError(reason) {
            console.log(reason);
        }
        
        function handleNamespaceConnectedConn(nsConn) {

        }
        // const username = window.prompt("Your username?");
        async function runExample() {
            // You can omit the "default" and simply define only Events, the namespace will be an empty string"",
            // however if you decide to make any changes on this example make sure the changes are reflecting inside the ../server.go file as well.
            try {
                const conn = await neffos.dial(wsURL, {
                    default: { // "default" namespace.
                        _OnNamespaceConnected: function (nsConn, msg) {
                            handleNamespaceConnectedConn(nsConn)
                        },
                        _OnNamespaceDisconnect: function (nsConn, msg) {
                        },
                        stream: function (nsConn, msg) { // "stream" event.
                            console.log(msg.Body);
                            console.log(msg)
                        }
                    }
                },{
                    headers: {
                        "X-Username": "",
                    }
                });
                // You can either wait to conenct or just conn.connect("connect")
                // and put the `handleNamespaceConnectedConn` inside `_OnNamespaceConnected` callback instead.
                // const nsConn = await conn.connect("default");
                // nsConn.emit(...); handleNamespaceConnectedConn(nsConn);
                conn.connect("default");
            } catch (err) {
                handleError(err);
            }
        }

        runExample()
    </script>
</html>

Then you will got is from console

Uncaught TypeError: s.split is not a function
    at splitN (neffos.ts:249)
    at deserializeMessage (neffos.ts:273)
    at Conn.handleMessage (neffos.ts:1053)
    at Conn.handle (neffos.ts:1020)
    at WebSocket.<anonymous> (neffos.ts:791)

@kataras
Copy link
Owner

kataras commented Sep 19, 2019

Hello @StartAt24 @majidbigdeli, try to use the latest neffos.js e.g.

<script src="//cdn.jsdelivr.net/npm/neffos.js@latest/dist/neffos.min.js"></script>

Results based on your example (just added the Event: "stream" in order to be logged to the browser's console):

neffos_issue_10

It's done by parsing the neffos part as utf-8 and let the body as array buffer (uint8) for maximum performance.

@majidbigdeli
Copy link

@kataras ThankYou .
I Read your Commits for this issue .
good solution.
Thank You Again .

@StartAt24
Copy link
Author

@kataras @majidbigdeli thanks for your work!
I still have some problem with my project.
Like, i have 17 messages to send at the same time. But only got 7 messages in the client.
I have set the server.SyncBroadcaster = true. But it does not work.
Here is my code

var serverEvents = websocket.Namespaces{
	"default": websocket.Events{
		websocket.OnNamespaceConnected: func(nsConn *websocket.NSConn, msg websocket.Message) error {
			// with `websocket.GetContext` you can retrieve the Iris' `Context`.
			ctx := websocket.GetContext(nsConn.Conn)

			fmt.Printf("[%s] connected to namespace [%s] with IP [%s]\n",
				nsConn, msg.Namespace,
				ctx.RemoteAddr())

			return nil
		},
		websocket.OnNamespaceDisconnect: func(nsConn *websocket.NSConn, msg websocket.Message) error {
			fmt.Printf("[%s] disconnected from namespace [%s]\n", nsConn, msg.Namespace)
			return nil
		},
	"stream": func(nsConn *websocket.NSConn, msg websocket.Message) error {
		// room.String() returns -> NSConn.String() returns -> Conn.String() returns -> Conn.ID()
		fmt.Printf("[%s] sent: %s", nsConn, string(msg.Body))

		// Write message back to the client message owner with:
		// nsConn.Emit("chat", msg)
		// Write message to all except this client with:
		// nsConn.Conn.Server().Broadcast(nsConn, msg)
		return nil
	},
	},
}

func startWebSocketServer(app *iris.Application) *neffos.Server{
	server := websocket.New(websocket.DefaultGorillaUpgrader, serverEvents)
	server.SyncBroadcaster = true

	server.OnConnect = func(c *websocket.Conn) error {
		fmt.Printf("[%s] connected to the server.\n", c)
		return nil
	}

	server.OnDisconnect = func(c *websocket.Conn){
		fmt.Printf("[%s] disconnected from the server.", c)
	}

	fmt.Printf("Listening on: %d\nPress CTRL/CMD+C to interrupt.", 5000)

	idGen := func(ctx iris.Context) string {
		if username := ctx.GetHeader("X-Username"); username != "" {
			return username
		}

		return websocket.DefaultIDGenerator(ctx)
	}

	app.Get("/stream", websocket.Handler(server, idGen))

	return server
}

Also i have write a test program. But in that program, everything works fine. So i have compared them. There is no diff.
Any thing wrong with my code?

@StartAt24
Copy link
Author

I have debug that. messages have all been send to the client.
But in the client only part of the messages, can trigger the callback.

So the bug must be in neffos.min.js. I have no time to debug it.
Can you give me some suggestions.
@majidbigdeli @kataras

@StartAt24
Copy link
Author

@kataras @majidbigdeli Sorry to bother you. But i have found the bug.
It is in neffos.js.

I'm using protobuf with neffos.js. The data that protobuf make counld contain ;. So that my message will be drop in neffos.js in here.

  var isArrayBuffer = data instanceof ArrayBuffer;
    var dts;

    console.log(isArrayBuffer)

    if (isArrayBuffer) {
        var arr = new Uint8Array(data);
        var sepCount = 1;
        var lastSepIndex = 0;
        for (var i = 0; i < arr.length; i++) {
            if (arr[i] == messageSeparatorCharCode) { // sep char.
                sepCount++;
                lastSepIndex = i;
            }
        }
       // Drop here!!!!!!!!!!!!
        if (sepCount != validMessageSepCount) {
            msg.isInvalid = true;
            console.log("return at validMessageSepCount")
            return msg;
        }
        dts = splitN(textDecoder.decode(arr.slice(0, lastSepIndex)), messageSeparator, validMessageSepCount - 2);
        dts.push(data.slice(lastSepIndex + 1, data.length));
        msg.SetBinary = true;
    }
    else {
        dts = splitN(data, messageSeparator, validMessageSepCount - 1);
    }

I thank it is a bug. Cannot just split message by ;.

@StartAt24
Copy link
Author

StartAt24 commented Sep 27, 2019

@kataras @majidbigdeli here is the reproducible code.
Server

package main

import (
	"github.com/kataras/iris"
	"fmt"
	"github.com/kataras/iris/websocket"
	"github.com/kataras/neffos"
	"time"
)

// 全局变量
var page = struct {
    Title string
}{"Collector"}

func main(){
	app := iris.New()

	ws_server := startWebSocketServer(app)

	go pub_thread(ws_server)

	app.RegisterView(iris.HTML("./static", ".html"))
	app.Get("/" ,func(ctx iris.Context){
		ctx.ViewData("Page", page)
		ctx.View("index.html")
	})

	app.HandleDir("/", "./static")	
    app.Run(iris.Addr(":5000"), iris.WithoutPathCorrection)
}

var serverEvents = websocket.Namespaces{
	"default": websocket.Events{
		websocket.OnNamespaceConnected: func(nsConn *websocket.NSConn, msg websocket.Message) error {
			// with `websocket.GetContext` you can retrieve the Iris' `Context`.
			ctx := websocket.GetContext(nsConn.Conn)

			fmt.Printf("[%s] connected to namespace [%s] with IP [%s]\n",
				nsConn, msg.Namespace,
				ctx.RemoteAddr())
			return nil
		},
		websocket.OnNamespaceDisconnect: func(nsConn *websocket.NSConn, msg websocket.Message) error {
			fmt.Printf("[%s] disconnected from namespace [%s]\n", nsConn, msg.Namespace)
			return nil
		},
		"stream": func(nsConn *websocket.NSConn, msg websocket.Message) error {
			// room.String() returns -> NSConn.String() returns -> Conn.String() returns -> Conn.ID()
			fmt.Printf("[%s] sent: %s", nsConn, string(msg.Body))

			// Write message back to the client message owner with:
			// nsConn.Emit("chat", msg)
			// Write message to all except this client with:
			nsConn.Conn.Server().Broadcast(nsConn, msg)
			return nil
		},
	},
}

func startWebSocketServer(app *iris.Application) *neffos.Server{
	server := websocket.New(websocket.DefaultGorillaUpgrader, serverEvents)
	server.OnConnect = func(c *websocket.Conn) error {
		fmt.Printf("[%s] connected to the server.\n", c)
		
		return nil
	}

	server.OnDisconnect = func(c *websocket.Conn){
		fmt.Printf("[%s] disconnected from the server.", c)
	}

	fmt.Printf("Listening on: %d\nPress CTRL/CMD+C to interrupt.\n", 5000)

	idGen := func(ctx iris.Context) string {
		if username := ctx.GetHeader("X-Username"); username != "" {
			return username
		}

		return websocket.DefaultIDGenerator(ctx)
	}

	app.Get("/stream", websocket.Handler(server, idGen))

	return server
}

func pub_thread(serve *neffos.Server){
	png:= [...] byte{';',';',';',';',';'}
	slice := png[:]
	for{
		serve.Broadcast(nil, neffos.Message{SetBinary: true,  Body:slice, Namespace: "default"})
		time.Sleep(1*time.Second)
	}
}

Client

<html>
    <button> useless button</button>

    <script src="https://cdn.jsdelivr.net/npm/neffos.js@latest/dist/neffos.js"></script>
    <script>
    
        var scheme = document.location.protocol == "https:" ? "wss" : "ws";
        var port = document.location.port ? ":" + document.location.port : "";
        var wsURL = scheme + "://" + document.location.hostname + port + "/stream";
        
        function handleError(reason) {
            console.log(reason);
        }
        
        function handleNamespaceConnectedConn(nsConn) {

        }
        // const username = window.prompt("Your username?");
        async function runExample() {
            // You can omit the "default" and simply define only Events, the namespace will be an empty string"",
            // however if you decide to make any changes on this example make sure the changes are reflecting inside the ../server.go file as well.
            try {
                const conn = await neffos.dial(wsURL, {
                    default: { // "default" namespace.
                        _OnNamespaceConnected: function (nsConn, msg) {
                            handleNamespaceConnectedConn(nsConn)
                        },
                        _OnNamespaceDisconnect: function (nsConn, msg) {
                        },
                        stream: function (nsConn, msg) { // "stream" event.
                            console.log(msg.Body);
                            console.log(msg)
                        }
                    }
                },{
                    headers: {
                        "X-Username": "",
                    }
                });
                // You can either wait to conenct or just conn.connect("connect")
                // and put the `handleNamespaceConnectedConn` inside `_OnNamespaceConnected` callback instead.
                // const nsConn = await conn.connect("default");
                // nsConn.emit(...); handleNamespaceConnectedConn(nsConn);
                conn.connect("default");
            } catch (err) {
                handleError(err);
            }
        }

        runExample()
    </script>
</html>

@kataras
Copy link
Owner

kataras commented Dec 16, 2019

For everybody else, continue at: #20

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants