Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventsource custom headers #1242

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ size_t AsyncEventSourceMessage::send(AsyncClient *client) {
if(client->canSend())
client->send();
_sent += sent;
return sent;
return sent;
}

// Client
Expand All @@ -159,7 +159,7 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
_lastId = 0;
if(request->hasHeader("Last-Event-ID"))
_lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str());

_client->setRxTimeout(0);
_client->onError(NULL, NULL);
_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
Expand Down Expand Up @@ -253,6 +253,7 @@ AsyncEventSource::AsyncEventSource(const String& url)
: _url(url)
, _clients(LinkedList<AsyncEventSourceClient *>([](AsyncEventSourceClient *c){ delete c; }))
, _connectcb(NULL)
, _responsecb(NULL)
{}

AsyncEventSource::~AsyncEventSource(){
Expand All @@ -262,6 +263,9 @@ AsyncEventSource::~AsyncEventSource(){
void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
_connectcb = cb;
}
void AsyncEventSource::onResponse(ArOnResponseFunction cb){
_responsecb = cb;
}

void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
/*char * temp = (char *)malloc(2054);
Expand All @@ -276,7 +280,7 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
client->write((const char *)temp, 2053);
free(temp);
}*/

_clients.add(client);
if(_connectcb)
_connectcb(client);
Expand All @@ -297,10 +301,10 @@ void AsyncEventSource::close(){
size_t AsyncEventSource::avgPacketsWaiting() const {
if(_clients.isEmpty())
return 0;

size_t aql=0;
uint32_t nConnectedClients=0;

for(const auto &c: _clients){
if(c->connected()) {
aql+=c->packetsWaiting();
Expand Down Expand Up @@ -339,18 +343,20 @@ bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str()))
return request->requestAuthentication();
request->send(new AsyncEventSourceResponse(this));
request->send(new AsyncEventSourceResponse(this, _responsecb));
}

// Response

AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){
AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server, ArOnResponseFunction responsecb){
_server = server;
_code = 200;
_contentType = "text/event-stream";
_sendContentLength = false;
addHeader("Cache-Control", "no-cache");
addHeader("Connection","keep-alive");
if (responsecb)
responsecb(this);
}

void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){
Expand Down
11 changes: 7 additions & 4 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ class AsyncEventSource;
class AsyncEventSourceResponse;
class AsyncEventSourceClient;
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction;
typedef std::function<void(AsyncEventSourceResponse *)> ArOnResponseFunction;

class AsyncEventSourceMessage {
private:
uint8_t * _data;
uint8_t * _data;
size_t _len;
size_t _sent;
//size_t _ack;
size_t _acked;
size_t _acked;
public:
AsyncEventSourceMessage(const char * data, size_t len);
~AsyncEventSourceMessage();
Expand Down Expand Up @@ -90,7 +91,7 @@ class AsyncEventSourceClient {

//system callbacks (do not call)
void _onAck(size_t len, uint32_t time);
void _onPoll();
void _onPoll();
void _onTimeout(uint32_t time);
void _onDisconnect();
};
Expand All @@ -100,13 +101,15 @@ class AsyncEventSource: public AsyncWebHandler {
String _url;
LinkedList<AsyncEventSourceClient *> _clients;
ArEventHandlerFunction _connectcb;
ArOnResponseFunction _responsecb;
public:
AsyncEventSource(const String& url);
~AsyncEventSource();

const char * url() const { return _url.c_str(); }
void close();
void onConnect(ArEventHandlerFunction cb);
void onResponse(ArOnResponseFunction cb);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
size_t count() const; //number clinets connected
size_t avgPacketsWaiting() const;
Expand All @@ -123,7 +126,7 @@ class AsyncEventSourceResponse: public AsyncWebServerResponse {
String _content;
AsyncEventSource *_server;
public:
AsyncEventSourceResponse(AsyncEventSource *server);
AsyncEventSourceResponse(AsyncEventSource *server, ArOnResponseFunction);
void _respond(AsyncWebServerRequest *request);
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time);
bool _sourceValid() const { return true; }
Expand Down