Skip to content

Commit

Permalink
Report RTMP client connection error (#42)
Browse files Browse the repository at this point in the history
* progress

* tweak

* allow disconnect event

* Revert "allow disconnect event"

This reverts commit cf1f143.

* add swig file

* update cxx file

* Review changes

* Return value

* tweak

* Revert

* tweak

* Add unit tests

* minor type info change
  • Loading branch information
harryz2000 authored Jun 20, 2024
1 parent 1400231 commit 9065d8d
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 57 deletions.
16 changes: 10 additions & 6 deletions lib/ClientConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const OutgoingStreamBridge = require("./OutgoingStreamBridge.js")
/**
* @typedef {Object} ClientConnectionEvents
* @property {(self: ClientConnection) => void} connected
* @property {(self: ClientConnection) => void} disconnected
* @property {(self: ClientConnection, errorCode: Number) => void} disconnected
* @property {(self: ClientConnection) => void} stopped
* @property {(self: ClientConnection, name: string, cmd: any[]) => void} cmd
*/
Expand Down Expand Up @@ -66,27 +66,30 @@ class ClientConnection extends Emitter
this.emit("cmd", this, name, cmd);
}
};

this.ondisconnected = () =>
this.ondisconnected = (/** @type {Number} */errorCode) =>
{
//Check if already stopped
if (this.stopped)
//Do nothing
return;

//Emit event
this.emit("disconnected", this);
this.emit("disconnected", this, errorCode);
//Stop us
this.stop();
};
}

/**
* @returns {Number}
*/
connect(
/** @type {string} */ server,
/** @type {number} */ port,
/** @type {string} */ app)
{
this.connection.Connect(server, port, app);
return this.connection.Connect(server, port, app);
}


Expand All @@ -96,6 +99,7 @@ class ClientConnection extends Emitter
const [,streamId] = await new Promise((resolve,reject) => this.connection.CreateStream({resolve,reject}));
//Send publish cmd for stream
this.connection.Publish(streamId, name);

//Create new outgoingstream
const outgoingStream = new OutgoingStreamBridge(streamId, this.connection);

Expand Down
15 changes: 15 additions & 0 deletions lib/ErrorCode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

const RtmpClientConnectionErrorCode = Object.freeze({
NoError: 0,
Generic : 1,
FailedToResolveURL : 2,
GetSockOptError : 3,
FailedToConnectSocket : 4,
ConnectCommandFailed : 5,
FailedToParseData : 6,
PeerClosed : 7,
ReadError : 8,
PollError : 9
});

module.exports = RtmpClientConnectionErrorCode;
1 change: 1 addition & 0 deletions lib/RTMPServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const ClientConnection = require("./ClientConnection.js");
const RTMPServer = {};
RTMPServer.NetStream = require("./Status.js").NetStream;
RTMPServer.NetConnection = require("./Status.js").NetConnection;
RTMPServer.NetConnectionErrorCode = require("./ErrorCode.js");

//INitialize Stuff
Native.RTMPServerModule.Initialize();
Expand Down
34 changes: 29 additions & 5 deletions src/RTMPClientConnectionImpl.i
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public:
persistent = std::make_shared<Persistent<v8::Object>>(object);
}

void Connect(const char* server,int port, const char* app)
RTMPClientConnection::ErrorCode Connect(const char* server,int port, const char* app)
{
RTMPClientConnection::Connect(server, port, app, this);
return RTMPClientConnection::Connect(server, port, app, this);
}

void CreateStream(v8::Local<v8::Object> promise)
Expand All @@ -36,6 +36,7 @@ public:
RTMPClientConnection::SendCommand(id, L"publish", nullptr, new AMFString(parser.GetWChar()));
}


void onConnected(RTMPClientConnection* conn) override
{
Log("-RTMPClientConnectionImpl::onConnected()\n");
Expand All @@ -48,15 +49,18 @@ public:
});
}

void onDisconnected(RTMPClientConnection* conn) override
void onDisconnected(RTMPClientConnection* conn, ErrorCode code) override
{
Log("-RTMPClientConnectionImpl::onDisconnected()\n");

//Run function on main node thread
RTMPServerModule::Async([=,cloned=persistent](){
Nan::HandleScope scope;
//Call object method with arguments
MakeCallback(cloned, "ondisconnected");
v8::Local<v8::Value> argv[1];
argv[0] = Nan::New<v8::Int32>(static_cast<int32_t>(code));

MakeCallback(cloned, "ondisconnected", 1, argv);
});
}

Expand Down Expand Up @@ -126,13 +130,33 @@ private:

%}

%nodefaultctor RTMPClientConnection;
class RTMPClientConnection
{
public:
enum class ErrorCode
{
NoError = 0,
Generic = 1,
FailedToResolveURL = 2,
GetSockOptError = 3,
FailedToConnectSocket = 4,
ConnectCommandFailed = 5,
FailedToParseData = 6,
PeerClosed = 7,
ReadError = 8,
PollError = 9
};
};


%nodefaultctor RTMPClientConnectionImpl;
class RTMPClientConnectionImpl :
public RTMPMediaStreamListener
{
public:
RTMPClientConnectionImpl(v8::Local<v8::Object> object);
void Connect(const char* server,int port, const char* app);
RTMPClientConnection::ErrorCode Connect(const char* server,int port, const char* app);
void CreateStream(v8::Local<v8::Object> object);
void Publish(DWORD streamId,v8::Local<v8::Object> url);
void DeleteStream(DWORD streamId, v8::Local<v8::Object> object);
Expand Down
2 changes: 1 addition & 1 deletion src/rtmp-server.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ export class RTMPClientConnectionImpl extends RTMPMediaStreamListener {

constructor(object: any);

Connect(server: string, port: number, app: string): void;
Connect(server: string, port: number, app: string): number;

CreateStream(object: any): void;

Expand Down
Loading

0 comments on commit 9065d8d

Please sign in to comment.