forked from jojojames/PhxSocketCPP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
PhxChannel.cpp
138 lines (115 loc) · 3.91 KB
/
PhxChannel.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include "PhxChannel.h"
#include <algorithm>
#include <utility>
#include "PhxPush.h"
#include "PhxSocket.h"
// Constructs a channel for `topic` on the given socket.
//
// The channel starts in the CLOSED state and has not been joined yet.
// Nothing is registered with the socket here; that happens in bootstrap().
//
// socket: socket this channel communicates through (shared ownership).
// topic:  topic name this channel is bound to.
// params: parameters later used as the payload of the join push.
PhxChannel::PhxChannel(std::shared_ptr<PhxSocket> socket,
    const std::string& topic,
    std::map<std::string, std::string> params) {
    this->state = ChannelState::CLOSED;
    this->joinedOnce = false;
    this->topic = topic;
    // `params` and `socket` are by-value (sink) parameters: move them into
    // the members instead of copying the map / bumping the refcount again.
    this->params = std::move(params);
    this->socket = std::move(socket);
}
void PhxChannel::bootstrap() {
// NOTE: This can't be done in the constructor.
// So we bootstrap the connection here.
this->socket->addChannel(this->shared_from_this());
this->socket->onOpen([this]() { this->rejoin(); });
this->socket->onClose([this](const std::string& event) {
this->state = ChannelState::CLOSED;
this->socket->removeChannel(this->shared_from_this());
});
this->socket->onError([this](
const std::string& error) { this->state = ChannelState::ERRORED; });
std::shared_ptr<PhxPush> n = std::make_shared<PhxPush>(
this->shared_from_this(), "phx_join", this->params);
this->joinPush = std::move(n);
this->joinPush->onReceive("ok",
[this](nlohmann::json message) { this->state = ChannelState::JOINED; });
this->onEvent("phx_reply", [this](nlohmann::json message, int64_t ref) {
this->triggerEvent(this->replyEventName(ref), message, ref);
});
}
// Joins the channel by sending the join push and returns that push so the
// caller can attach receive handlers to it.
//
// NOTE: a repeated call is not rejected; it sends the join push again.
// TODO: decide whether a second join() should be reported as an error.
std::shared_ptr<PhxPush> PhxChannel::join() {
    // Remember that a join was requested so rejoin() is allowed to act.
    this->joinedOnce = true;
    this->sendJoin();
    return this->joinPush;
}
// Re-sends the join push, but only if join() was called before and the
// channel is neither currently joining nor already joined.
void PhxChannel::rejoin() {
    if (!this->joinedOnce) {
        return;
    }
    const bool joiningOrJoined = this->state == ChannelState::JOINING
        || this->state == ChannelState::JOINED;
    if (joiningOrJoined) {
        return;
    }
    this->sendJoin();
}
void PhxChannel::sendJoin() {
this->state = ChannelState::JOINING;
this->joinPush->setPayload(this->params);
this->joinPush->send();
}
void PhxChannel::leave() {
this->state = ChannelState::CLOSED;
nlohmann::json payload;
this->pushEvent("phx_leave", payload)
->onReceive("ok", [this](nlohmann::json message) {
this->triggerEvent("phx_close", "leave", -1);
});
}
// Registers `callback` to run whenever a "phx_close" event is triggered on
// this channel. Only the event message is forwarded; the ref is dropped.
void PhxChannel::onClose(OnClose callback) {
    this->onEvent("phx_close",
        [callback](nlohmann::json message, int64_t /*ref*/) {
            callback(message);
        });
}
// Registers `callback` to run whenever a "phx_error" event is triggered on
// this channel. Only the error payload is forwarded; the ref is dropped.
void PhxChannel::onError(OnError callback) {
    this->onEvent("phx_error",
        [callback](nlohmann::json error, int64_t /*ref*/) {
            callback(error);
        });
}
void PhxChannel::onEvent(const std::string& event, OnReceive callback) {
this->bindings.emplace_back(event, callback);
}
void PhxChannel::offEvent(const std::string& event) {
// Remove all Event bindings that match event.
std::vector<std::tuple<std::string, OnReceive>> v = this->bindings;
v.erase(std::remove_if(v.begin(),
v.end(),
[this, event](std::tuple<std::string, OnReceive> x) {
return std::get<0>(x) == event;
}),
v.end());
}
// Returns true when `topic` is exactly the topic this channel was created
// for.
bool PhxChannel::isMemberOfTopic(const std::string& topic) {
    const bool sameTopic = (topic == this->topic);
    return sameTopic;
}
void PhxChannel::triggerEvent(
const std::string& event, nlohmann::json message, int64_t ref) {
// Trigger OnReceive callbacks that match event.
std::vector<std::tuple<std::string, OnReceive>> v = this->bindings;
for (std::tuple<std::string, OnReceive>& it : v) {
if (std::get<0>(it) == event) {
std::get<1>(it)(message, ref);
}
}
}
// Creates a push for `event` carrying `payload` on this channel, sends it
// right away and returns it so the caller can attach receive handlers.
std::shared_ptr<PhxPush> PhxChannel::pushEvent(
    const std::string& event,
    nlohmann::json payload) {
    auto push
        = std::make_shared<PhxPush>(this->shared_from_this(), event, payload);
    push->send();
    return push;
}
std::shared_ptr<PhxSocket> PhxChannel::getSocket() {
return this->socket;
}
// Builds the event name under which the reply to the push with the given
// `ref` is dispatched: "chan_reply_" followed by the decimal ref.
std::string PhxChannel::replyEventName(int64_t ref) {
    return std::string("chan_reply_") + std::to_string(ref);
}
std::string PhxChannel::getTopic() {
return this->topic;
}