-
Notifications
You must be signed in to change notification settings - Fork 18
/
ipython-services.rkt
137 lines (117 loc) · 4.43 KB
/
ipython-services.rkt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#lang racket/base
(require racket/string
racket/match
racket/contract
(for-syntax racket/base)
net/zmq
"./ipython-message.rkt"
"./ipython.rkt")
(provide call-with-services
receive-request
send-response
make-stream-port
send-exec-result
send-status
(struct-out services))
;; Record holding the four threads that serve a kernel's sockets.
;; Every field is contracted with `thread?`.  In this file the record is
;; built with `make-services` from the threads that run the `heartbeat`,
;; `shell`, `control` and `iopub` loops, in that field order.
(define-struct/contract services
;; thread running the `heartbeat` loop
([heartbeat thread?]
;; thread running the `shell` loop
[shell thread?]
;; thread running the `control` loop
[control thread?]
;; thread running the `iopub` loop; other procedures `thread-send`
;; outgoing messages to it
[iopub thread?])
#:transparent)
;; call-with-services : cfg (services? -> any) -> any
;;
;; Inside `call-with-context`, starts one thread per kernel socket
;; (heartbeat, shell, control, iopub), packs them into a `services`
;; record, applies `action` to that record and returns whatever `action`
;; returns.
;;
;;   cfg    - configuration value; only read through the `config-*`
;;            accessors (here and in `serve-socket/thread`).
;;   action - procedure of one argument, the `services` record.
;;
;; The service threads are killed whenever control leaves `action` --
;; normal return, raised exception, break, or continuation escape -- so
;; a failing `action` cannot leave socket threads running.
(define (call-with-services cfg action)
  (call-with-context
   (λ (ctx)
     ;; The calling thread is the worker that the socket threads are
     ;; handed, so that they can `thread-send` requests to it.
     (define worker (current-thread))
     ;; `port` is an accessor (e.g. `config-shell-port`) applied to `cfg`.
     (define (serve port port-type thunk)
       (serve-socket/thread ctx cfg (port cfg) port-type worker thunk))
     (define svcs
       (make-services
        (serve config-hb-port 'REP heartbeat)
        (serve config-shell-port 'ROUTER shell)
        (serve config-control-port 'ROUTER control)
        (serve config-iopub-port 'PUB iopub)))
     ;; `dynamic-wind` (rather than `begin0`) guarantees the cleanup thunk
     ;; runs on non-local exits too; it passes `action`'s results through.
     (dynamic-wind
      void
      (λ () (action svcs))
      (λ () (kill-services svcs))))))
;; receive-request : any -> (values any any)
;;
;; Takes the next two messages out of the current thread's mailbox and
;; returns them as two values: first the request, then the value to hand
;; to `send-response` as `respond-to`.  Blocks until both have arrived.
;; The `services` argument is not used.
;;
;; NOTE(review): the pair arrives as two separate mailbox messages (see
;; `shell-like`).  Both the shell and the control thread send to the same
;; worker, so their sends could interleave -- confirm whether that can
;; happen in practice.
(define (receive-request services)
  (let* ([request (thread-receive)]
         [reply-to (thread-receive)])
    (values request reply-to)))
;; send-response : any thread? any -> void?
;;
;; Places `response` in the mailbox of the thread `respond-to` (the
;; second value produced by `receive-request`).  The `services` argument
;; is not used.
(define (send-response services respond-to response)
  (thread-send respond-to
               response))
;; send-exec-result : msg services execution-count data -> void?
;;
;; Builds an `execute_result` response to `msg` whose content carries
;; `execution-count`, `data` and an empty metadata table, and hands it to
;; the iopub thread of `services`.
;; Note the argument order: `msg` comes before `services` here.
(define (send-exec-result msg services execution-count data)
  (define publisher (services-iopub services))
  (define content
    (hasheq 'execution_count execution-count
            'data data
            'metadata (hasheq)))
  (thread-send publisher
               (make-response msg content #:msg-type 'execute_result)))
;; Publishes a `status` message on the iopub thread of `services`.
;;
;;   parent-header - header the response header is derived from.
;;   status        - 'idle or 'busy; sent as a string under the
;;                   'execution_state key.
(define/contract (send-status services parent-header status)
  (services? header? (symbols 'idle 'busy) . -> . void?)
  (let* ([publisher (services-iopub services)]
         [status-header (make-response-header parent-header #:msg-type 'status)]
         [content (hasheq 'execution_state (symbol->string status))])
    (thread-send publisher (make-message status-header content))))
;; Creates an output port whose writes are published as `stream`
;; messages (responses to `orig-msg`) on the iopub thread of `services`.
;;
;;   name     - 'stdout or 'stderr; selects the port name
;;              ("pyout" / "pyerr") and the 'name field of each message
;;              ("stdout" / "stderr").
;;   orig-msg - message the stream output is a response to.
;;
;; Returns the new output port.  Closing the port does nothing.
(define/contract (make-stream-port services name orig-msg)
  (services? (symbols 'stdout 'stderr) message? . -> . output-port?)
  (define iopub (services-iopub services))
  (define-values (port-name stream-name)
    (case name
      [(stdout) (values "pyout" "stdout")]
      [(stderr) (values "pyerr" "stderr")]))
  (define (send-stream str)
    (thread-send iopub (make-response orig-msg (hasheq 'name stream-name
                                                       'text str)
                                      #:msg-type 'stream)))
  (make-output-port
   port-name
   ;; Writes never block (they only enqueue onto a thread mailbox), so the
   ;; port is always ready.  The iopub thread itself must not be used as
   ;; the event: a thread only becomes ready as an event once it has
   ;; terminated.
   always-evt
   (λ (bstr start end non-block/buffer? enable-break?)
     ;; start = end is a flush request; there is nothing buffered, so do
     ;; not publish an empty stream message for it.
     (unless (= start end)
       ;; A write may hand over bytes that are not valid UTF-8 on their
       ;; own (binary output, or a multi-byte character split between two
       ;; writes).  Decode permissively, substituting U+FFFD, instead of
       ;; raising inside the writer.
       (send-stream (bytes->string/utf-8 bstr #\uFFFD start end)))
     (- end start))
   void))
;; Heartbeat service for the IPython heartbeat protocol: every message
;; received on `socket` is sent straight back on the same socket.
;; Loops forever; the worker argument is ignored.
(define (heartbeat socket _worker)
  (define (echo-forever)
    (define ping (socket-recv! socket))
    (socket-send! socket ping)
    (echo-forever))
  (echo-forever))
;; Request/response loop shared by the shell and control services.
;; For each message read from `socket`:
;;   1. send the message to `worker`,
;;   2. send this thread to `worker` (so it knows where to reply),
;;   3. wait for a reply in this thread's mailbox,
;;   4. write the reply back to `socket`.
;; Loops forever.  `who` is accepted but not used.
;;
;; NOTE(review): steps 1 and 2 are two separate `thread-send`s; with two
;; threads running this loop against one worker, the pairs could
;; interleave in the worker's mailbox -- confirm.
(define (shell-like who socket worker)
  (define self (current-thread))
  (define (serve-one-request)
    (define request (receive-message! socket))
    (thread-send worker request)
    (thread-send worker self)
    (send-message! socket (thread-receive)))
  (let forever ()
    (serve-one-request)
    (forever)))
;; Shell service: the `shell-like` loop tagged 'shell.
(define (shell socket worker)
  (shell-like 'shell socket worker))
;; Control service: the `shell-like` loop tagged 'control.
(define (control socket worker)
  (shell-like 'control socket worker))
;; IOPub service: forwards every message that arrives in this thread's
;; mailbox to `socket`.  Loops forever; `worker` is not used.
(define (iopub socket worker)
  (define (pump)
    (send-message! socket (thread-receive))
    (pump))
  (pump))
;; Obtains a socket of `socket-type` from `ctx` via `call-with-socket`,
;; binds it to `endpoint`, then applies `action` to the bound socket.
(define (serve-socket ctx endpoint socket-type action)
  (define (bind-then-run socket)
    (socket-bind! socket endpoint)
    (action socket))
  (call-with-socket ctx socket-type bind-then-run))
;; Starts a new thread that serves one socket and returns that thread.
;; The endpoint is "<transport>://<ip>:<port>", with transport and ip
;; read from `cfg`.  Inside the thread, `action` is applied to the bound
;; socket and to `worker`.
(define (serve-socket/thread ctx cfg port socket-type worker action)
  (define endpoint
    (format "~a://~a:~a" (config-transport cfg) (config-ip cfg) port))
  (define (run-service)
    (serve-socket ctx endpoint socket-type
                  (λ (socket) (action socket worker))))
  (thread run-service))
;; Starts the heartbeat, shell, control and iopub service threads for
;; `cfg` within `ctx`, each handed `worker`, and returns them as a
;; `services` record.
(define (ipython-serve cfg ctx worker)
  ;; `port-of` is a `config-*-port` accessor applied to `cfg`.
  (define (spawn port-of socket-type handler)
    (serve-socket/thread ctx cfg (port-of cfg) socket-type worker handler))
  (make-services
   (spawn config-hb-port 'REP heartbeat)
   (spawn config-shell-port 'ROUTER shell)
   (spawn config-control-port 'ROUTER control)
   (spawn config-iopub-port 'PUB iopub)))
;; Kills the four service threads held in `services`, in the order
;; shell, control, iopub, heartbeat.
(define (kill-services services)
  (for ([get-thread (in-list (list services-shell
                                   services-control
                                   services-iopub
                                   services-heartbeat))])
    (kill-thread (get-thread services))))