-
Notifications
You must be signed in to change notification settings - Fork 502
/
Copy pathstellar_core_runner.go
247 lines (200 loc) · 6.42 KB
/
stellar_core_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package ledgerbackend
import (
"context"
"fmt"
"io"
"math/rand"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/support/log"
)
type stellarCoreRunnerInterface interface {
catchup(from, to uint32) error
runFrom(from uint32, hash string) error
getMetaPipe() (<-chan metaResult, bool)
context() context.Context
getProcessExitError() (error, bool)
close() error
}
type stellarCoreRunnerMode int
const (
_ stellarCoreRunnerMode = iota // unset
stellarCoreRunnerModeOnline
stellarCoreRunnerModeOffline
)
// stellarCoreRunner uses a named pipe ( https://en.wikipedia.org/wiki/Named_pipe ) to stream ledgers directly
// from Stellar Core
type pipe struct {
// stellarCoreRunner will be reading ledgers emitted by Stellar Core from the pipe.
// After the Stellar Core process exits, stellarCoreRunner should eventually close the reader.
Reader io.ReadCloser
// stellarCoreRunner is responsible for closing the named pipe file after the Stellar Core process exits.
// However, only the Stellar Core process will be writing to the pipe. stellarCoreRunner should not
// write anything to the named pipe file which is why the type of File is io.Closer.
File io.Closer
}
type executionState struct {
cmd cmdI
workingDir workingDir
ledgerBuffer *bufferedLedgerMetaReader
pipe pipe
wg sync.WaitGroup
processExitedLock sync.RWMutex
processExited bool
processExitError error
log *log.Entry
}
type stellarCoreRunner struct {
executablePath string
ctx context.Context
cancel context.CancelFunc
systemCaller systemCaller
stateLock sync.Mutex
state *executionState
closeOnce sync.Once
storagePath string
toml *CaptiveCoreToml
useDB bool
captiveCoreNewDBCounter prometheus.Counter
log *log.Entry
}
func createRandomHexString(n int) string {
hex := []rune("abcdef1234567890")
b := make([]rune, n)
for i := range b {
b[i] = hex[rand.Intn(len(hex))]
}
return string(b)
}
func newStellarCoreRunner(config CaptiveCoreConfig, captiveCoreNewDBCounter prometheus.Counter) *stellarCoreRunner {
ctx, cancel := context.WithCancel(config.Context)
runner := &stellarCoreRunner{
executablePath: config.BinaryPath,
ctx: ctx,
cancel: cancel,
storagePath: config.StoragePath,
useDB: config.UseDB,
log: config.Log,
toml: config.Toml,
captiveCoreNewDBCounter: captiveCoreNewDBCounter,
systemCaller: realSystemCaller{},
}
return runner
}
// context returns the context.Context instance associated with the running captive core instance
func (r *stellarCoreRunner) context() context.Context {
return r.ctx
}
// runFrom executes the run command with a starting ledger on the captive core subprocess
func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return r.startMetaStream(newRunFromStream(r, from, hash, r.captiveCoreNewDBCounter))
}
// catchup executes the catchup command on the captive core subprocess
func (r *stellarCoreRunner) catchup(from, to uint32) error {
return r.startMetaStream(newCatchupStream(r, from, to))
}
type metaStream interface {
getWorkingDir() workingDir
start(ctx context.Context) (cmdI, pipe, error)
}
func (r *stellarCoreRunner) startMetaStream(stream metaStream) error {
r.stateLock.Lock()
defer r.stateLock.Unlock()
// check if we have already been closed
if r.ctx.Err() != nil {
return r.ctx.Err()
}
if r.state != nil {
return fmt.Errorf("runner already started")
}
state := &executionState{
workingDir: stream.getWorkingDir(),
log: r.log,
}
cmd, p, err := stream.start(r.ctx)
if err != nil {
state.workingDir.cleanup(nil)
return err
}
state.cmd = cmd
state.pipe = p
state.ledgerBuffer = newBufferedLedgerMetaReader(state.pipe.Reader)
go state.ledgerBuffer.start()
if binaryWatcher, err := newFileWatcher(r); err != nil {
r.log.Warnf("could not create captive core binary watcher: %v", err)
} else {
go binaryWatcher.loop()
}
state.wg.Add(1)
go state.handleExit()
r.state = state
return nil
}
func (r *stellarCoreRunner) getExecutionState() *executionState {
r.stateLock.Lock()
defer r.stateLock.Unlock()
return r.state
}
func (state *executionState) handleExit() {
defer state.wg.Done()
waitErr := state.cmd.Wait()
// By closing the pipe file we will send an EOF to the pipe reader used by ledgerBuffer.
if err := state.pipe.File.Close(); err != nil {
state.log.WithError(err).Warn("could not close captive core write pipe")
}
state.processExitedLock.Lock()
defer state.processExitedLock.Unlock()
state.processExited = true
state.processExitError = waitErr
}
func (state *executionState) getProcessExitError() (error, bool) {
state.processExitedLock.RLock()
defer state.processExitedLock.RUnlock()
return state.processExitError, state.processExited
}
func (state *executionState) cleanup() error {
// wait for the stellar core process to terminate
state.wg.Wait()
// drain meta pipe channel to make sure the ledger buffer goroutine exits
for range state.ledgerBuffer.getChannel() {
}
// now it's safe to close the pipe reader
// because the ledger buffer is no longer reading from it
if err := state.pipe.Reader.Close(); err != nil {
state.log.WithError(err).Warn("could not close captive core read pipe")
}
processExitError, _ := state.getProcessExitError()
return state.workingDir.cleanup(processExitError)
}
// getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess
func (r *stellarCoreRunner) getMetaPipe() (<-chan metaResult, bool) {
state := r.getExecutionState()
if state == nil {
return nil, false
}
return state.ledgerBuffer.getChannel(), true
}
// getProcessExitError returns an exit error (can be nil) of the process and a bool indicating
// if the process has exited yet
// getProcessExitError is thread safe
func (r *stellarCoreRunner) getProcessExitError() (error, bool) {
state := r.getExecutionState()
if state == nil {
return nil, false
}
return state.getProcessExitError()
}
// close kills the captive core process if it is still running and performs
// the necessary cleanup on the resources associated with the captive core process
// close is both thread safe and idempotent
func (r *stellarCoreRunner) close() error {
var closeError error
r.closeOnce.Do(func() {
r.cancel()
state := r.getExecutionState()
if state != nil {
closeError = state.cleanup()
}
})
return closeError
}