-
Notifications
You must be signed in to change notification settings - Fork 2
/
clorb-conn.lisp
422 lines (319 loc) · 13.5 KB
/
clorb-conn.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
;;; clorb-conn.lisp --- Connection Layer
(in-package :clorb)
;; Counter incremented by NEXT-REQUEST-ID to produce request ids.  It is a
;; single global, so the sequence is shared by every connection.
(defvar *request-id-seq* 0)
;;(defvar *default-trace-connection* nil)
;;; Managing connections
;; defun create-connection (orb host port)
;; defun make-associated-connection (orb desc)
;; defun connection-working-p (conn)
;; defun connection-destroy (conn)
;; defun connection-shutdown (conn)
;;; Sending
;; defun connection-send-buffer (conn buffer)
;;; Client interface
;; defmethod next-request-id ((conn Connection))
;;- defun connection-add-client-request (conn request)
;;- defun connection-remove-client-request (conn request)
;; defun connection-send-request (conn buffer req)
;;; IIOP interface
;; defun connection-init-read (conn continue-p n callback)
;; defun connection-receive-reply (conn request-id buffer status ..)
;; defun connection-receive-locate-reply (conn request-id buffer status)
;; -defun find-waiting-client-request (conn request-id)
;; defun connection-add-fragment (conn buffer header-size)
;; defun connection-error (conn)
;; defun connection-close (conn)
;; defun connection-read-ready (conn)
;; defmethod connection-write-ready ((conn connection))
;; defun connection-init-defragmentation (conn handler)
;;; Server interface
;; defun connection-add-server-request (conn request)
;; defun connection-remove-server-request (conn request)
;;; Gc-connections (also Managing connections ?)
;; defun gc-connections (&optional except n)
;; defun auto-gc-read-handler (q desc)
;;; From IIOP
;;new
;; (defun connection-get-buffer (conn)
;;; Sending messages
;; defun connection-reply (conn giop-version reply-type request-id status
;; defun connection-message-error (conn &optional (version giop-1-0))
;;; IIOP
;; defun get-fragment (conn)
;; defun get-fragment-last (conn)
;; defun get-response-0 (conn)
;; defun get-response-reply (conn)
;; defun get-response-locate-reply (conn &aux (buffer (read-buffer-of conn)))
;; defun setup-outgoing-connection (conn)
;;; Message Format
;; defun server-close-connection-msg (conn)
;; defun marshal-locate-request (buffer req-id profile)
;; defun marshal-request-message (buffer ..)
;;;; Connection Class
(defclass Connection (synchronized)
  (;; ORB this connection belongs to.
   (the-orb           :initarg :orb
                      :accessor the-orb)
   ;; Buffer that incoming octets are read into (see CONNECTION-INIT-READ).
   (read-buffer       :accessor read-buffer-of)
   ;; Function of one argument (the connection), called by
   ;; CONNECTION-READ-READY.
   (read-callback     :accessor connection-read-callback)
   ;; Buffer currently being written; NIL means the write slot is free.
   (write-buffer      :accessor write-buffer-of
                      :initform nil)
   ;; Client request that goes with the buffer being written, if any.
   (write-request     :accessor write-request-of
                      :initform nil)
   ;; Incremented by CONNECTION-WRITE-READY.
   (write-count       :accessor write-count-of
                      :initform 0)
   ;; I/O descriptor carrying the traffic of this connection.
   (io-descriptor     :initarg :io-descriptor
                      :accessor connection-io-descriptor
                      :initform nil)
   ;; Client requests that have been handed to CONNECTION-SEND-BUFFER and
   ;; not yet been matched with a reply.
   (client-requests   :accessor connection-client-requests
                      :initform nil)
   ;; Requests registered through CONNECTION-ADD-SERVER-REQUEST.
   (server-requests   :accessor connection-server-requests
                      :initform nil)
   ;; Set to T by CONNECTION-WRITE-READY and cleared by GC-CONNECTIONS;
   ;; a connection found with this flag already cleared is collected.
   (activity          :initarg :activity
                      :accessor activity
                      :initform t)
   ;; True for connections created with :SERVER-P true.
   (server-p          :initarg :server-p
                      :accessor server-p
                      :initform nil)
   (shutdown-status   :accessor shutdown-status
                      :initform nil
                      :documentation "State of shutdown nil, :sent, :closed")
   ;; Defragmentation support
   ;; Handler installed by CONNECTION-INIT-DEFRAGMENTATION.
   (assembled-handler :accessor assembled-handler
                      :initform nil)
   ;; Buffer holding the fragments accumulated so far, or NIL.
   (fragment-buffer   :accessor fragment-buffer
                      :initform nil)))
(defmethod print-object ((conn connection) stream)
  "Print CONN unreadably with its type and identity, followed by the
description of its I/O descriptor when it has one."
  (print-unreadable-object (conn stream :type t :identity t)
    (when-let (descriptor (connection-io-descriptor conn))
      (write-string (io-describe-descriptor descriptor) stream))))
;;;; Connection Methods
(defmethod next-request-id ((conn Connection))
  "Advance the global *REQUEST-ID-SEQ* by one and return the new value.
The counter is global, so ids are unique across connections, not per CONN."
  (setf *request-id-seq* (1+ *request-id-seq*)))
;;(defvar *desc-conn* (make-hash-table))
(defun make-associated-connection (orb desc &key server-p)
  "Create a Connection for ORB that uses the I/O descriptor DESC, associate
it with DESC via IO-DESCRIPTOR-ASSOCIATE-CONNECTION, and return it.
SERVER-P is stored in the new connection."
  (let ((connection (make-instance 'Connection
                                   :orb orb
                                   :io-descriptor desc
                                   :server-p server-p)))
    (io-descriptor-associate-connection desc connection)
    connection))
(defun connection-destroy (conn)
  "Destroy the I/O descriptor of CONN, if it has one, while holding CONN's
synchronization."
  (with-synchronization conn
    (let ((descriptor (connection-io-descriptor conn)))
      (when descriptor
        (io-descriptor-destroy descriptor t)))))
(defun create-connection (orb host port)
  "Create a descriptor, connect it to HOST and PORT, and return a new
connection for ORB associated with it.
If an ERROR is signalled on the way, it is reported through MESS, recorded
on the descriptor together with status :BROKEN, the descriptor is destroyed,
and NIL is returned."
  (let ((descriptor (io-create-descriptor)))
    (handler-case
        (progn
          (io-descriptor-connect descriptor host port)
          (make-associated-connection orb descriptor))
      (error (condition)
        (mess 4 "(connect ~S ~S): ~A" host port condition)
        (setf (io-descriptor-error descriptor) condition
              (io-descriptor-status descriptor) :broken)
        (io-descriptor-destroy descriptor)
        nil))))
(defun connection-working-p (conn)
  "Return the result of IO-DESCRIPTOR-WORKING-P for CONN's descriptor, or NIL
when CONN has no descriptor."
  (let ((descriptor (connection-io-descriptor conn)))
    (and descriptor
         (io-descriptor-working-p descriptor))))
(defun connection-init-read (conn continue-p n callback)
  "Set up the next read on CONN and install CALLBACK as its read callback.

CONTINUE-P true means keep filling the current read buffer; otherwise a
fresh work buffer is obtained from the ORB and becomes the read buffer.
N is the new fill pointer of the buffer's octet vector (an end index, not a
count of additional octets): the vector is grown to hold at least N octets,
its fill pointer is set to N, and the descriptor is handed the vector with
the old fill pointer as start and N as end.
Returns whatever IO-DESCRIPTOR-SET-READ returns."
  (setf (connection-read-callback conn) callback)
  (let ((desc (connection-io-descriptor conn)))
    (let* ((buffer (if continue-p
                       (read-buffer-of conn)
                       (get-work-buffer (the-orb conn))))
           (octets (buffer-octets buffer))
           (start (fill-pointer octets)))
      (unless continue-p
        (setf (read-buffer-of conn) buffer))
      (when (< (array-total-size octets) n)
        ;; ADJUST-ARRAY only modifies its argument in place when the array
        ;; is actually adjustable; otherwise it returns a new array and
        ;; leaves the argument unchanged.  Keep the returned array and store
        ;; it back in the buffer so the read goes to the enlarged vector.
        (let ((grown (adjust-array octets n)))
          (unless (eq grown octets)
            (setf octets grown
                  (buffer-octets buffer) grown))))
      (setf (fill-pointer octets) n)
      (io-descriptor-set-read desc octets start n))))
(defun connection-shutdown (conn)
  "Advance the shutdown of CONN by one step and return the resulting status.

  shutdown-status NIL     -> status becomes :SENT and the close-connection
                             message is sent; returns :SENT
  shutdown-status :SENT   -> nothing is done; returns :SENT
  shutdown-status :CLOSED -> the connection is destroyed; returns :DESTROYED
                             (the slot itself is not updated)

Any other status is returned unchanged.  The decision is taken while holding
CONN's synchronization; the send or destroy happens after it is released."
  (let ((result nil)
        (next-step nil))
    (with-synchronization conn
      (setf result (shutdown-status conn))
      (cond ((null result)
             (setf next-step :send
                   result :sent
                   (shutdown-status conn) :sent))
            ((eql result :closed)
             (setf next-step :destroy
                   result :destroyed))))
    (cond ((eql next-step :send)
           (connection-send-buffer conn (server-close-connection-msg conn)))
          ((eql next-step :destroy)
           (connection-destroy conn)))
    result))
(defun %add-client-request (conn request)
  "Add REQUEST to the front of CONN's list of client requests and return the
new list.  Does no locking of its own."
  (setf (connection-client-requests conn)
        (cons request (connection-client-requests conn))))
(defun connection-send-buffer (conn buffer &optional request)
  "Send the octets of BUFFER on CONN, waiting first until no other write is
in progress.  When REQUEST is supplied it is recorded as the request being
written and added to CONN's client requests."
  (flet ((writer-idle-p (c)
           (null (write-buffer-of c))))
    ;; Claim the write slot.  The check and the claim happen under the
    ;; connection's synchronization; the wait happens outside it.
    (loop
      (with-synchronization conn
        (when (writer-idle-p conn)
          (setf (write-buffer-of conn) buffer)
          (setf (write-request-of conn) request)
          (when request
            (%add-client-request conn request))
          (return)))
      (orb-condition-wait conn #'writer-idle-p conn))
    (let ((descriptor (connection-io-descriptor conn))
          (octets (buffer-octets buffer)))
      ;; NOTE(review): a true result from IO-DESCRIPTOR-SET-WRITE presumably
      ;; means the write already completed -- confirm against the I/O layer.
      (when (io-descriptor-set-write descriptor octets 0 (length octets))
        (connection-write-ready conn)))))
(defun connection-send-request (conn buffer req)
  "Send BUFFER on CONN on behalf of the client request REQ.
Same as CONNECTION-SEND-BUFFER with the request made mandatory."
  (connection-send-buffer conn
                          buffer
                          req))
(defun find-waiting-client-request (conn request-id)
  "Find the client request of CONN whose id is REQUEST-ID, remove it from
CONN's client requests and return it.  When there is no such request a
message is logged and NIL is returned."
  (with-synchronization conn
    (let* ((pending (connection-client-requests conn))
           (match (find request-id pending :key #'request-id)))
      (cond (match
             (setf (connection-client-requests conn)
                   (delete match pending)))
            (t
             (mess 4 "Unexpected response with request id ~d" request-id)))
      match)))
(defun connection-receive-reply (conn request-id buffer status service-context)
  "Deliver a reply to the waiting client request with id REQUEST-ID by calling
REQUEST-REPLY with STATUS, BUFFER and SERVICE-CONTEXT.  Returns NIL without
delivering anything when no such request is waiting."
  (when-let (waiting (find-waiting-client-request conn request-id))
    (request-reply waiting status buffer service-context)))
(defun connection-receive-locate-reply (conn request-id buffer status)
  "Deliver a locate reply to the waiting client request with id REQUEST-ID by
calling REQUEST-LOCATE-REPLY with STATUS and BUFFER.  Returns NIL without
delivering anything when no such request is waiting."
  (when-let (waiting (find-waiting-client-request conn request-id))
    (request-locate-reply waiting status buffer)))
(defun connection-add-server-request (conn request)
  "Add REQUEST to the front of CONN's server requests, under CONN's
synchronization."
  (with-synchronization conn
    (setf (connection-server-requests conn)
          (cons request (connection-server-requests conn)))))
(defun connection-remove-server-request (conn request)
  "Remove REQUEST from CONN's server requests, under CONN's synchronization."
  (with-synchronization conn
    (let ((remaining (delete request (connection-server-requests conn))))
      (setf (connection-server-requests conn) remaining))))
(defun connection-add-fragment (conn buffer header-size)
  "Make BUFFER the fragment buffer of CONN and return it.
When CONN already holds a fragment buffer, the octets of BUFFER are first
replaced by the previously accumulated octets followed by BUFFER's own
octets from index HEADER-SIZE onward."
  (let ((previous (fragment-buffer conn)))
    (when previous
      (let ((payload (subseq (buffer-octets buffer) header-size)))
        (setf (buffer-octets buffer)
              (concatenate 'octets (buffer-octets previous) payload))))
    (setf (fragment-buffer conn) buffer)))
;;;; Cleaning and Garbage-Collecting Connections
(defun gc-connections (&optional except n)
  "Sweep the descriptors of *IO-SYSTEM* and collect idle connections.

A descriptor is skipped when it has no connection, is a member of EXCEPT, is
a shortcut descriptor, or its connection has a write in progress or any
outstanding client or server requests.  Of the remaining connections, one
whose activity flag is set only gets the flag cleared; one whose flag is
already clear is shut down (server side) or has its descriptor destroyed
(client side).  When N is a number the sweep stops after N connections have
been collected."
  (dolist (desc (io-descriptions-of *io-system*))
    (let ((conn (io-descriptor-connection desc)))
      (when (and conn
                 (not (member desc except))
                 (not (io-descriptor-shortcut-p desc))
                 (not (write-buffer-of conn))
                 (not (connection-client-requests conn))
                 (not (connection-server-requests conn)))
        (cond ((activity conn)
               ;; Active since the previous sweep: give it one more round.
               (setf (activity conn) nil))
              (t
               (if (server-p conn)
                   (connection-shutdown conn)
                   (io-descriptor-destroy desc t))
               (when (and (numberp n)
                          (< (decf n) 1))
                 (return))))))))
(defun auto-gc-read-handler (q desc)
  "Max-processes handler for the read queue Q.
Tries to collect one connection other than the one on DESC, tries once more
if Q is still at or above its process limit, and then returns T regardless,
so a new process is allowed anyway.  Errors during collection are ignored."
  (flet ((collect-one ()
           (ignore-errors
            (gc-connections (list desc) 1))))
    (collect-one)
    (when (>= (process-count q) (max-processes q))
      (collect-one))
    t))
;;(setf (max-handler (read-queue *io-system*)) 'auto-gc-read-handler)
;; Install AUTO-GC-READ-HANDLER (by name) as the value of
;; *IO-MT-READ-QUEUE-GARB*.
;; NOTE(review): presumably the multi-threaded I/O layer calls this when its
;; read queue reaches its process limit -- confirm against the I/O system.
(setq *io-mt-read-queue-garb* 'auto-gc-read-handler)
;;;; Connection events
(defun connection-error (conn)
  "Called when there is an I/O error on CONN.
Every outstanding client request gets a CORBA:COMM_FAILURE exception reply,
and the list of client requests is emptied."
  (with-synchronization conn
    (mapc (lambda (pending)
            (request-reply-exception
             pending :error
             (system-exception 'CORBA:COMM_FAILURE)))
          (connection-client-requests conn))
    (setf (connection-client-requests conn) nil)))
(defun connection-close (conn)
  "Called on receipt of a connection close message.
Closes CONN's I/O descriptor, replies to every outstanding client request
with a CORBA:TRANSIENT exception, and empties the list of client requests."
  (with-synchronization conn
    (io-descriptor-close (connection-io-descriptor conn))
    ;; The server should not have started on any of the outstanding
    ;; requests, hence the :completed_no completion status.
    (mapc (lambda (pending)
            (request-reply-exception
             pending :error
             (system-exception 'CORBA:TRANSIENT 3 :completed_no)))
          (connection-client-requests conn))
    (setf (connection-client-requests conn) nil)))
(defun connection-read-ready (conn)
  "Call the read callback currently installed on CONN, passing it CONN."
  (let ((callback (connection-read-callback conn)))
    (funcall callback conn)))
(defmethod connection-write-ready ((conn connection))
  "Record that the pending write on CONN has finished.
Marks the connection active, counts the write, frees the write slot and
notifies waiters on CONN.  When a shutdown message had been sent
(shutdown-status :SENT) the descriptor is shut down and the status becomes
:CLOSED."
  (with-synchronization conn
    (setf (activity conn) t)
    (incf (write-count-of conn))
    (setf (write-buffer-of conn) nil)
    (setf (write-request-of conn) nil)
    (when (eql :sent (shutdown-status conn))
      (io-descriptor-shutdown (connection-io-descriptor conn))
      (setf (shutdown-status conn) :closed))
    (synch-notify conn)))
(defun connection-no-write (conn)
  "Handle a write on CONN that could not be carried out.
When a request was being written and CONN still has client requests, the
request is told via REQUEST-NO-WRITE and removed from the client requests.
The write slot is then freed and waiters on CONN are notified."
  (with-synchronization conn
    (let ((req (write-request-of conn)))
      ;; If (connection-client-requests conn) is nil then there must have
      ;; been a read error, and the request has already been dealt with.
      (when (and req
                 (connection-client-requests conn))
        (request-no-write req)
        (deletef req (connection-client-requests conn))))
    (setf (write-buffer-of conn) nil)
    (setf (write-request-of conn) nil)
    (synch-notify conn)))
(defun connection-init-defragmentation (conn handler)
  "Start assembling a fragmented message on CONN, with HANDLER as the
assembled-handler.  Returns T on success.
If an assembled-handler is already installed this is a fragment overrun:
it is logged, CONNECTION-ERROR is called on CONN and NIL is returned."
  (if (assembled-handler conn)
      (progn
        (mess 5 "Fragment overrun")
        (connection-error conn)
        nil)
      (progn
        (setf (assembled-handler conn) handler
              (fragment-buffer conn) nil)
        t)))
;;;; Event loop (orb-wait)
(defvar *new-connection-callback*
  (lambda (descriptor)
    (io-descriptor-destroy descriptor))
  "Function of one argument, called by PROCESS-EVENT with the descriptor of
each :NEW event.  The default destroys the descriptor.")
(defvar *running-orb* t
"Will be set to true in the process that is running the ORB server part.
If this is true, orb-wait will check server streams also.
Can be set to true globally for single-process / development.")
(defun orb-wait (wait-func &rest wait-args)
  "Wait until WAIT-FUNC applied to WAIT-ARGS returns true.
When *RUNNING-ORB* is true the wait is a loop that calls ORB-WORK on
*THE-ORB* between checks; otherwise it is delegated to PROCESS-WAIT."
  (cond (*running-orb*
         (loop
           (when (apply wait-func wait-args)
             (return))
           (orb-work *the-orb* t nil)))
        (t
         (apply #'process-wait "orb-wait" wait-func wait-args))))
(defun orb-condition-wait (obj wait-func &rest wait-args)
  "Wait for (wait-func . wait-args) to be true.
Uses the synchronization of OBJ, which is expected not to be locked when
this is called.  SYNCH-NOTIFY should be called on OBJ whenever the
condition may have changed."
  (cond (*running-orb*
         ;; Also handle server stuff while waiting.
         (loop
           (when (apply wait-func wait-args)
             (return))
           (orb-work *the-orb* t nil)))
        (t
         ;; No server stuff; just wait on the condition.
         (apply #'synch-wait-on-condition obj wait-func wait-args))))
(defun orb-run-queue (orb)
  "Pop and call the functions on ORB's work queue, one at a time, until the
queue is empty.  Returns NIL."
  (loop
    (unless (work-queue orb)
      (return))
    (funcall (pop (work-queue orb)))))
(defun process-event (event)
  "Dispatch one I/O EVENT, a list whose first element is the event type and
whose second element is the I/O descriptor concerned.

  :READ-READY  -> CONNECTION-READ-READY on the descriptor's connection
  :WRITE-READY -> clear the descriptor's write, then CONNECTION-WRITE-READY
  :NO-WRITE    -> clear the descriptor's write, then CONNECTION-NO-WRITE
  :NEW         -> call *NEW-CONNECTION-CALLBACK* with the descriptor
  :CONNECTED   -> nothing yet
  :ERROR       -> CONNECTION-ERROR, or destroy a descriptor that has no
                  connection

Connection handlers are only called when the descriptor has a connection."
  (let* ((kind (first event))
         (desc (second event))
         (conn (io-descriptor-connection desc)))
    (mess 1 "io-event: ~S ~A ~A" kind (io-descriptor-stream desc) conn)
    (case kind
      (:read-ready
       (when conn
         (connection-read-ready conn)))
      (:write-ready
       (io-descriptor-set-write desc nil 0 0)
       (when conn
         (connection-write-ready conn)))
      (:no-write
       (io-descriptor-set-write desc nil 0 0)
       (when conn
         (connection-no-write conn)))
      (:new
       (funcall *new-connection-callback* desc))
      (:connected
       ;; Not implemented yet; intended for outgoing connection setup.
       nil)
      (:error
       (mess 4 "Error: ~A" (io-descriptor-error desc))
       (cond (conn (connection-error conn))
             (t (io-descriptor-destroy desc)))))))
(defun orb-work (orb run-queue poll)
  "Do one round of ORB work.
When RUN-QUEUE is true, first run ORB's work queue.  When POLL is true, call
IO-DRIVER with it.  Then fetch events with IO-GET-EVENT and process them
until none is returned; POLL is passed to the first fetch, and T to every
fetch after an event has been processed.  Returns NIL."
  (when run-queue
    (orb-run-queue orb))
  (when poll
    (io-driver poll))
  (loop
    (let ((event (io-get-event poll)))
      (unless event
        (return))
      (process-event event)
      (setq poll t))))