-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdocument_server.go
169 lines (142 loc) · 5.29 KB
/
document_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package editor
import (
"log"
"sync"
"cms.csesoc.unsw.edu.au/editor/OT/operations"
"cms.csesoc.unsw.edu.au/pkg/cmsjson"
"github.com/google/uuid"
)
// documentServer holds the server-side view of a single document: its
// current state, the clients connected to it, and the operations recorded so
// far.
//
// TODO: change state to whatever data structure ends up being used for state
// management.
// TODO: stop the clients map from growing too large, using some compaction
// strategy or a more appropriate data structure.
type documentServer struct {
	ID uuid.UUID

	// state is the current document; stateLock is held while the state and
	// the operation history are read or updated.
	state     cmsjson.AstNode
	stateLock sync.Mutex

	// clients maps a client ID to its connection state; clientsLock is held
	// while clients are registered, removed or iterated over.
	clients     map[int]*clientState
	clientsLock sync.Mutex

	// operationHistory records every transformed operation, in arrival order.
	operationHistory []operations.Operation
}
// clientState pairs a connected clientView with the bookkeeping the
// documentServer keeps for it.
type clientState struct {
	// the embedded view makes the client's channels reachable directly
	// through the clientState
	*clientView

	// canSendOps is cleared while one of this client's operations is being
	// processed and set again once processing has finished
	canSendOps bool
}
// newDocumentServer builds a documentServer that has no document state yet
// and no connected clients.
// todo: newDocumentServer should take an initial state
func newDocumentServer() *documentServer {
	// the zero value already gives a nil state and two unlocked mutexes, so
	// the clients map is the only field that needs explicit initialisation
	server := new(documentServer)
	server.clients = make(map[int]*clientState)
	return server
}
type (
	// pipe is a closure that a clientView uses to communicate with the
	// server. The clientView's ID is wrapped inside the closure rather than
	// supplied by the caller, for security reasons.
	pipe = func(op operations.Operation)

	// alertLeaving is like a pipe, except that a client uses it to tell a
	// document that it is leaving.
	alertLeaving = func()
)
// connectClient registers a clientView with the documentServer and starts a
// dedicated worker for it.
//
// It returns two closures bound to the new client: a one-way pipe the
// clientView uses to send operations to the documentServer, and an
// alertLeaving signal it uses to announce that it is leaving.
// TODO: synchronise this properly
func (s *documentServer) connectClient(c *clientView) (pipe, alertLeaving) {
	// register this clientView
	s.clientsLock.Lock()
	// len(s.clients) on its own is not a safe ID: once any client has
	// disconnected, the count can equal the ID of a client that is still
	// connected, and registering under it would silently replace that client.
	// Probe upwards from the count until a free ID is found.
	clientID := len(s.clients)
	for {
		if _, taken := s.clients[clientID]; !taken {
			break
		}
		clientID++
	}
	s.clients[clientID] = &clientState{
		clientView: c,
		canSendOps: true,
	}
	s.clientsLock.Unlock()

	// we need to create a new worker for this clientView too
	workerHandle := make(chan func())
	killHandle := make(chan empty)
	go createAndStartWorker(workerHandle, killHandle)

	// finally build a comm pipe for this clientView
	return s.buildClientPipe(clientID, workerHandle, killHandle), s.buildAlertLeavingSignal(clientID, killHandle)
}
// disconnectClient removes a client from a document server and then asks the
// document server factory to close this server.
//
// It panics if no client is registered under clientID.
func (s *documentServer) disconnectClient(clientID int) {
	s.clientsLock.Lock()
	_, connected := s.clients[clientID]
	// deleting a missing key is a no-op, so this is safe either way
	delete(s.clients, clientID)
	s.clientsLock.Unlock()

	// Panic only after the lock has been released: if the panic is recovered
	// further up the stack, a still-held clientsLock would block every later
	// connect, disconnect and broadcast on this server.
	if !connected {
		panic("Trying to disconnect non-existent client")
	}

	// if we have no more connected clients it may be time to terminate ourselves
	// NOTE(review): the call below is unconditional — presumably the factory
	// checks whether any clients remain; confirm against closeDocumentServer.
	GetDocumentServerFactoryInstance().closeDocumentServer(s.ID)
}
// buildClientPipe returns the "pipe" for the clientView registered under
// clientID: a closure containing everything the clientView needs to
// communicate with the documentServer. When the clientView wishes to send
// data to the documentServer it simply calls the pipe with the operation.
//
// Calling the pipe while a previous operation from the same client is still
// being processed terminates that client. Otherwise the operation is handed
// to the client's worker, which transforms it against the server history,
// applies it to the document, forwards it to every other connected client
// and finally acknowledges it to the sender. An operation that cannot be
// applied is logged and dropped, and its sender is told to terminate.
//
// workerWorkHandle is the channel work is pushed to the client's worker on;
// workerKillHandle is the channel used to stop that worker.
func (s *documentServer) buildClientPipe(clientID int, workerWorkHandle chan func(), workerKillHandle chan empty) func(operations.Operation) {
	// applyToState transforms op against the operation history and applies
	// the result to the document, all under stateLock. The transformed
	// operation is recorded in the history only once it is known to apply,
	// so a rejected operation never influences later transformations.
	applyToState := func(op operations.Operation) (operations.Operation, error) {
		s.stateLock.Lock()
		defer s.stateLock.Unlock()

		transformed := s.transformOperation(op)
		if !transformed.IsNoOp {
			// the transformed operation is the one that accounts for the
			// history, so it (not the raw incoming op) must be applied
			newState, err := transformed.ApplyTo(s.state)
			if err != nil {
				return transformed, err
			}
			s.state = newState
		}
		s.operationHistory = append(s.operationHistory, transformed)
		return transformed, nil
	}

	// broadcastToOthers pushes op to all connected clients except this one;
	// if we send it to this clientView then we may deadlock the server and
	// clientView.
	broadcastToOthers := func(op operations.Operation) {
		s.clientsLock.Lock()
		defer s.clientsLock.Unlock()

		for id, connectedClient := range s.clients {
			if id == clientID {
				continue
			}
			connectedClient.sendOp <- op
		}
	}

	return func(op operations.Operation) {
		// the clients map is shared with connectClient and disconnectClient,
		// so it may only be read while holding clientsLock
		s.clientsLock.Lock()
		client, connected := s.clients[clientID]
		s.clientsLock.Unlock()
		if !connected {
			// the client has already been removed from this server, there is
			// nobody to process the operation for
			return
		}
		view := client.clientView

		if !client.canSendOps {
			// terminate this clientView
			// this is the only thing we can do in order to enforce
			// consistency across all clients
			s.disconnectClient(clientID)
			go func() { client.sendTerminateSignal <- empty{} }()
			go func() { workerKillHandle <- empty{} }()
			return
		}

		// to deal with this incoming operation we need to push
		// data to the worker assigned to this clientView
		workerWorkHandle <- func() {
			client.canSendOps = false
			applied := false
			defer func() {
				client.canSendOps = true
				// only an operation that was actually applied is acknowledged
				if applied {
					view.sendAcknowledgement <- empty{}
				}
			}()

			transformed, err := applyToState(op)
			if err != nil {
				// A single bad operation must not take the whole process
				// down: log it and tell only the offending client to
				// terminate. The signal is sent from its own goroutine, as in
				// the canSendOps branch above, so the worker never blocks on
				// a client that has stopped listening.
				log.Print(err)
				go func() { client.sendTerminateSignal <- empty{} }()
				return
			}
			applied = true

			broadcastToOthers(transformed)
		}
	}
}
// transformOperation transforms an incoming client operation against the
// history of applied server operations and returns the transformed result.
//
// incomingOp.AcknowledgedServerOps is the index into the history at which
// transformation starts. The value arrives with the operation itself, so it
// is clamped to the bounds of the history instead of being trusted: a
// negative index transforms against the whole history, and an index past the
// end transforms against nothing. Without the clamp an out-of-range value
// would make the slice expression panic.
func (s *documentServer) transformOperation(incomingOp operations.Operation) operations.Operation {
	start := int(incomingOp.AcknowledgedServerOps)
	if start < 0 {
		start = 0
	}
	if start > len(s.operationHistory) {
		start = len(s.operationHistory)
	}

	for _, appliedOp := range s.operationHistory[start:] {
		_, incomingOp = operations.TransformPipeline(incomingOp, appliedOp)
	}
	return incomingOp
}
// buildAlertLeavingSignal builds the leaving signal for a client view: the
// closure the client view calls when it wants to tell the document server
// that it is leaving. The closure captures clientID and workerKillHandle
// (go doesn't have currying, so this is done by hand).
func (s *documentServer) buildAlertLeavingSignal(clientID int, workerKillHandle chan empty) func() {
	signalLeaving := func() {
		// stop the client's worker first, then remove the client from the server
		workerKillHandle <- empty{}
		s.disconnectClient(clientID)
	}
	return signalLeaving
}