-
Notifications
You must be signed in to change notification settings - Fork 1
/
websocket.clj
236 lines (203 loc) · 8.17 KB
/
websocket.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
(ns clj-wamp-example.websocket
(:use [clojure.string :only [split]]
[clj-wamp-example.config :only [conf]])
(:require [clojure.tools.logging :as log]
[org.httpkit.server :as http-kit]
[org.httpkit.timer :as timer]
[clj-wamp.server :as wamp]))
(declare usernames add-username del-username get-username get-user-list truncate-str)
;; Topic BaseUrls
(def ws-base-url "http://clj-wamp-example")
(def rpc-base-url (str ws-base-url "/rpc#"))
(def evt-base-url (str ws-base-url "/event#"))
(defn rpc-url [path] (str rpc-base-url path))
(defn evt-url [path] (str evt-base-url path))
;; RPC functions
(def calc-state (atom {}))
(defn- calc-op
"calculates the previously received value and operation
with the current value, and returns the result"
[sess-id op v]
(let [{last-val :last-val last-op :last-op} (@calc-state sess-id)
new-val (if last-op (last-op last-val v) v)]
(swap! calc-state assoc sess-id {:last-val new-val :last-op op})
new-val))
(defn rpc-calc
"a simple calculator via rpc"
[op v]
(let [sess-id wamp/*call-sess-id*
v (Double. v)]
(log/debug "Websocket RPC [" sess-id "] Calc: " op v)
(try
(case op
"C" (do (swap! calc-state dissoc sess-id) 0)
"+" (calc-op sess-id + v)
"-" (calc-op sess-id - v)
"/" (calc-op sess-id / v)
"*" (calc-op sess-id * v)
"=" (let [v (calc-op sess-id nil v)]
(swap! calc-state dissoc sess-id) v)
; default:
{:error {:uri "http://clj-wamp-example/error#calc"
:message "Invalid Operation"}})
; clear the calc-state on any exception and rethrow
(catch Exception e
(swap! calc-state dissoc sess-id)
(throw e)))))
;; PubSub Subscription Handlers
(defn ws-chat-subscribe?
"Should the client be allowed to subscribe to the 'chat' topic?"
[sess-id topic]
(log/debug "Websocket subscribe? [" sess-id "] " topic)
true) ; return false to deny subscription to this topic
(defn ws-on-subscribe
"After successfully subscribing to any topic"
[sess-id topic]
(log/debug "Websocket subscribed [" sess-id "] " topic)
(when (= topic (evt-url "chat"))
(let [username (get-username sess-id)]
(wamp/send-event! topic
{:type "user-joined"
:clientId sess-id
:username username})
; The following events are sent only to the client who is subscribing (4th param true)
(wamp/emit-event! topic
{:type "user-list"
:users (get-user-list topic)}
sess-id)
(wamp/emit-event! topic
{:type "message"
:clientId 0
:username "clj-wamp"
:message (str "Hello, *" username "*, welcome to clj-wamp chat! "
"To change your username, click the orange name in the
\"Users\" list on the right.")}
sess-id)
; TODO: it would be better to cancel timers upon disconnect/unsubscribe
(timer/schedule-task 3000
(wamp/emit-event! topic
{:type "message"
:clientId 0
:username "clj-wamp"
:message "Type a message in the input below and hit enter to start chatting."}
sess-id))
(timer/schedule-task 6000
(wamp/emit-event! topic
{:type "message"
:clientId 0
:username "clj-wamp"
:message "To hide system messages (join, leave, etc.),
click the button in the top right. Enjoy!"}
sess-id)))))
(defn ws-on-unsubscribe
"After unsubscribing from any topic, notify other users and
clean up the stored username"
[sess-id topic]
(log/debug "Websocket unsubscribed [" sess-id "] " topic)
(when (= topic (evt-url "chat"))
(wamp/send-event! topic {:type "user-left"
:clientId sess-id
:username (get-username sess-id)})
(del-username sess-id)))
;; PubSub Event Message Handlers
(defn on-chat-message
"When receiving a 'chat message', validate/filter the message
and broker with additional info (username, clientId)"
[sess-id topic event exclude eligible]
; Ignore empty messages
(when (> (count (event "message")) 0)
(let [message (event "message")
event (assoc event
; Truncate message
"message" (truncate-str message 140)
; Include username
"username" (get-username sess-id)
; Include client id
"clientId" sess-id)]
; return rewritten params for publish
[sess-id topic event exclude eligible])))
(defn on-chat-username
"When receiving a username changed event, validate username
and broker with additional info (oldUsername, clientId)"
[sess-id topic event exclude eligible]
; Ignore empty names
(when (> (count (event "newUsername")) 0)
(let [newUsername (truncate-str (event "newUsername") 20)
oldUsername (get-username sess-id)
event (assoc event
; Truncate message
"newUsername" newUsername
; Include old username
"oldUsername" oldUsername
; Include client id
"clientId" sess-id)]
; Ignore if same username
(when (not= newUsername oldUsername)
(add-username sess-id newUsername)
; return rewritten params for publish
[sess-id topic event exclude eligible]))))
(defn ws-on-chat-publish
"handles all publish events that are received in the 'chat' topic"
[sess-id topic event exclude eligible]
(log/debug "Websocket publish [" sess-id "] " topic event)
(when-let [event-type (event "type")]
(case event-type
"message" (on-chat-message sess-id topic event exclude eligible)
"username" (on-chat-username sess-id topic event exclude eligible)
false))) ; deny publish of unknown types
;; Main http-kit/WAMP WebSocket handler
(defn ws-on-open [sess-id]
(log/info "New websocket client connected [" sess-id "] Num:" (count @usernames)))
(defn ws-on-close [sess-id status]
(swap! calc-state dissoc sess-id) ; clean up old state
(log/info "Websocket client disconnected [" sess-id "] " status))
(defn- auth-secret [sess-id auth-key extra]
"Returns the auth key's secret (ie. password), typically retrieved from a database."
"secret-password")
(defn- auth-permissions
"Returns the permissions for a client session by auth key."
[sess-id auth-key]
{:rpc {(rpc-url "echo") true
(rpc-url "throw") true
(rpc-url "calc") true
(rpc-url "not-found") true
(rpc-url "ping") false}
:subscribe {(evt-url "chat") true}
:publish {(evt-url "chat") true}})
(defn wamp-handler
"Returns a http-kit websocket handler with wamp subprotocol"
[req]
(wamp/with-channel-validation req channel (:ws-origins-re (conf))
(wamp/http-kit-handler channel
{:on-open ws-on-open
:on-close ws-on-close
:on-call {(rpc-url "echo") identity
(rpc-url "throw") (fn [] (throw (Exception. "An exception")))
(rpc-url "calc") rpc-calc
(rpc-url "ping") (fn [] "pong")}
:on-subscribe {(evt-url "chat") ws-chat-subscribe?
:on-after ws-on-subscribe}
:on-publish {(evt-url "chat") ws-on-chat-publish}
:on-unsubscribe ws-on-unsubscribe
:on-auth {:secret auth-secret
:permissions auth-permissions}})))
;; Utilities
; { :sess-id-1 "username", :sess-id-2 "username" }
(def usernames (atom {}))
(defn add-username
[sess-id username]
(swap! usernames assoc sess-id username))
(defn del-username
[sess-id]
(swap! usernames dissoc sess-id))
(defn get-username [sess-id]
(if-let [username (@usernames sess-id)]
username
(let [new-username (str "guest-" (last (split sess-id #"-")))]
(add-username sess-id new-username)
new-username)))
(defn get-user-list [topic]
(map (fn [id] {:clientId id, :username (get-username id)})
(wamp/get-topic-clients topic)))
(defn truncate-str [s, n]
(apply str (take n s)))