Skip to content

Commit 29b2da1

Browse files
committed
parallelize device backups
1 parent 3164461 commit 29b2da1

File tree

5 files changed

+50
-52
lines changed

5 files changed

+50
-52
lines changed

cmd/backup.go

+18-10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/spf13/cobra"
1414
"log"
1515
"net"
16+
"sync"
1617
)
1718

1819
func init() {
@@ -40,8 +41,7 @@ func initAuthProviderPool(providersCfg map[string]auth_providers.AuthProviderCon
4041
cfg := providerCfg.(*auth_providers.StaticAuthProviderConfig)
4142
static_provider := auth.NewStaticProvider()
4243
for path, auth := range cfg.Auths {
43-
err = static_provider.AddAuth(path, auth.Username, auth.Password, auth.Attributes)
44-
if err != nil {
44+
if err = static_provider.AddAuth(path, auth.Username, auth.Password, auth.Attributes); err != nil {
4545
return nil, errors.Errorf("Unable to add Auth(%s) to StaticAuthProvider(%s): %s", path, providerName, err)
4646
}
4747
}
@@ -59,8 +59,7 @@ func initAuthProviderPool(providersCfg map[string]auth_providers.AuthProviderCon
5959
default:
6060
return nil, errors.Errorf("Unsupported AuthProvider type (%s)", providerCfg.Type())
6161
}
62-
err = pool.RegisterProvider(providerName, provider)
63-
if err != nil {
62+
if err = pool.RegisterProvider(providerName, provider); err != nil {
6463
return nil, errors.Errorf("Unable to register provider '%s': %s", providerName, err)
6564
}
6665
}
@@ -100,28 +99,37 @@ func backupMain(cmd *cobra.Command, args []string) {
10099
log.Fatalln("Error initializing device classes:", err)
101100
}
102101

103-
devices, err := devices.LoadDevices(cfg.DeviceGroups, deviceClasses, authProviderPool)
102+
_devices, err := devices.LoadDevices(cfg.DeviceGroups, deviceClasses, authProviderPool)
104103
if err != nil {
105104
log.Fatalln("Error initializing devices:", err)
106105
}
107106

108-
deviceList := filterDevices(devices, deviceFilter)
107+
deviceList := filterDevices(_devices, deviceFilter)
109108
if len(deviceList) == 0 {
110109
log.Fatalln("No devices mached the given filter")
111110
}
112111

113112
tftpReceiver := device_processor.NewTFTPReceiver(hostIP)
114113
tftpReceiver.Run()
115114

115+
var wg sync.WaitGroup
116+
116117
for _, device := range deviceList {
117118
p := device_processor.NewDeviceProcessor(device, authProviderPool, cfg.Preferences.BackupDir)
118119

119-
err := p.Process(tftpReceiver)
120-
if err != nil {
121-
log.Printf("Device Processing Error '%s': %s", device.Name, err)
122-
}
120+
wg.Add(1)
121+
122+
go func(d *devices.Device) {
123+
defer wg.Done()
124+
125+
if err := p.Process(tftpReceiver); err != nil {
126+
log.Printf("Device Processing Error '%s': %s", d.Name, err)
127+
}
128+
}(device)
123129
}
124130

131+
wg.Wait()
132+
125133
tftpReceiver.Stop()
126134

127135
}

device_processor/device_processor.go

+25-39
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (t *DeviceProcessor) connect() (*ssh.Client, error) {
6161

6262
client, err := ssh.Dial("tcp", t.device.Address, sshClientConfig)
6363
if err != nil {
64-
return nil, errors.Errorf("Failed to connect to Device(%s): %s", t.device.Name, err)
64+
return nil, err
6565
}
6666

6767
return client, nil
@@ -90,13 +90,11 @@ func (t *DeviceProcessor) startShell(client *ssh.Client) (*ssh.Session, io.Write
9090
ssh.TTY_OP_OSPEED: 38400, // output speed = 14.4kbaud
9191
}
9292

93-
err = session.RequestPty("xterm", 0, 200, modes)
94-
if err != nil {
93+
if err = session.RequestPty("xterm", 0, 200, modes); err != nil {
9594
return nil, nil, nil, errors.Errorf("Request for pty failed: %s", err)
9695
}
9796

98-
err = session.Shell()
99-
if err != nil {
97+
if err = session.Shell(); err != nil {
10098
return nil, nil, nil, errors.Errorf("Request for shell failed: %s", err)
10199
}
102100

@@ -149,8 +147,7 @@ func (t *DeviceProcessor) initVM(stdIn io.WriteCloser, stdOut io.Reader, ctx vmC
149147
if err = vm.Set("_ctxRaw", ctxRaw); err != nil {
150148
return nil, errors.Errorf("Failed to set _ctxRaw variable: %s", err)
151149
}
152-
_, err = vm.Run(`ctx = JSON.parse(_ctxRaw);`)
153-
if err != nil {
150+
if _, err = vm.Run(`ctx = JSON.parse(_ctxRaw);`); err != nil {
154151
return nil, errors.Errorf("Failed to unserialize context variable: %s", err)
155152
}
156153

@@ -159,20 +156,18 @@ func (t *DeviceProcessor) initVM(stdIn io.WriteCloser, stdOut io.Reader, ctx vmC
159156

160157
func (t *DeviceProcessor) saveFile(backupTarget *devices.DeviceClassTarget, file *ReceivedFile) error {
161158

162-
dstPath := path.Join(t.configDir, t.device.Name, fmt.Sprintf("%s.conf", backupTarget.Name))
163-
164-
dirPath, _ := path.Split(dstPath)
159+
dirPath := path.Join(t.configDir, t.device.Name)
165160

166161
// Create the parent directory structure if needed
167162
if _, err := os.Stat(dirPath); os.IsNotExist(err) {
168-
err = os.MkdirAll(dirPath, os.ModePerm)
169-
if err != nil {
170-
return errors.Errorf("Parent directory '%s' doesn't exist and an error occurred while trying to create it: %s", dirPath, err)
163+
if err := os.MkdirAll(dirPath, os.ModePerm); err != nil {
164+
return errors.Errorf("Unable to create directory '%s': %s", dirPath, err)
171165
}
172166
}
173167

174-
err := ioutil.WriteFile(dstPath, file.Data.Bytes(), 0644)
175-
if err != nil {
168+
dstPath := path.Join(dirPath, fmt.Sprintf("%s.conf", backupTarget.Name))
169+
170+
if err := ioutil.WriteFile(dstPath, file.Data.Bytes(), 0644); err != nil {
176171
return errors.Errorf("Unable to write to file '%s': %s", dstPath, err)
177172
}
178173

@@ -183,8 +178,7 @@ func (t *DeviceProcessor) Process(reciever *TFTPReceiver) error {
183178
for target_name, _ := range t.device.Class.Targets {
184179
log.Printf("Processing backup target '%s':'%s'", t.device.Name, target_name)
185180

186-
err := t.ProcessTarget(target_name, reciever)
187-
if err != nil {
181+
if err := t.ProcessTarget(target_name, reciever); err != nil {
188182
return errors.Errorf("target %s: %s", target_name, err)
189183
}
190184
}
@@ -230,8 +224,7 @@ func (t *DeviceProcessor) ProcessTarget(target_name string, reciever *TFTPReceiv
230224
return errors.Errorf("Failed to init JavaScript VM: %s", err)
231225
}
232226

233-
_, err = vm.Run(backupTarget.Macro)
234-
if err != nil {
227+
if _, err := vm.Run(backupTarget.Macro); err != nil {
235228
return errors.Errorf("JavaScript VM Runtime Error: %s", err)
236229
}
237230

@@ -240,7 +233,7 @@ func (t *DeviceProcessor) ProcessTarget(target_name string, reciever *TFTPReceiv
240233
// Wait for a maximum of 60 seconds for the file on the receive channel
241234
select {
242235
case err = <-reciever.GetErrorChannel():
243-
log.Printf("TFTP Receiver error: %s\n", err)
236+
log.Fatalln("TFTP Receiver error:", err)
244237
case recvdFile = <-recvChan:
245238
case <-time.After(60 * time.Second):
246239
return errors.Errorf("Timed out waiting to receive file over TFTP")
@@ -251,36 +244,32 @@ func (t *DeviceProcessor) ProcessTarget(target_name string, reciever *TFTPReceiv
251244
return err
252245
}
253246

247+
log.Printf("Completed backup target: '%s':'%s'\n", t.device.Name, backupTarget.Name)
248+
254249
return nil
255250
}
256251

257252
func ottoExpect(vm *otto.Otto, expect *gexpect.ExpectIO) error {
258253

259-
var err error
260-
261-
err = vm.Set("dbgDump", func(call otto.FunctionCall) otto.Value {
254+
if err := vm.Set("dbgDump", func(call otto.FunctionCall) otto.Value {
262255

263256
v, _ := call.Argument(0).Export()
264257
fmt.Printf(">>> dbgDump >>>:\n%v<<< dbgDump <<<\n", spew.Sdump(v))
265258

266259
return otto.Value{}
267-
})
268-
if err != nil {
260+
}); err != nil {
269261
return err
270262
}
271263

272-
err = vm.Set("dbgLog", func(call otto.FunctionCall) otto.Value {
273-
264+
if err := vm.Set("dbgLog", func(call otto.FunctionCall) otto.Value {
274265
fmt.Printf("dbgLog: %s\n", call.Argument(0).String())
275-
276266
return otto.Value{}
277-
})
278-
if err != nil {
267+
}); err != nil {
279268
return err
280269
}
281270

282271
// function expect(val string) string {}
283-
err = vm.Set("expect", func(call otto.FunctionCall) otto.Value {
272+
if err := vm.Set("expect", func(call otto.FunctionCall) otto.Value {
284273

285274
// TODO: Make timeout configurable
286275
err := expect.ExpectTimeout(call.Argument(0).String(), 15*time.Second)
@@ -290,12 +279,11 @@ func ottoExpect(vm *otto.Otto, expect *gexpect.ExpectIO) error {
290279
}
291280

292281
return otto.Value{}
293-
})
294-
if err != nil {
282+
}); err != nil {
295283
return err
296284
}
297285

298-
err = vm.Set("readLine", func(call otto.FunctionCall) otto.Value {
286+
if err := vm.Set("readLine", func(call otto.FunctionCall) otto.Value {
299287

300288
line, err := expect.ReadLine()
301289
if err != nil {
@@ -308,21 +296,19 @@ func ottoExpect(vm *otto.Otto, expect *gexpect.ExpectIO) error {
308296
}
309297

310298
return v
311-
})
312-
if err != nil {
299+
}); err != nil {
313300
return err
314301
}
315302

316-
err = vm.Set("sendLine", func(call otto.FunctionCall) otto.Value {
303+
if err := vm.Set("sendLine", func(call otto.FunctionCall) otto.Value {
317304

318305
err := expect.SendLine(call.Argument(0).String())
319306
if err != nil {
320307
panic(vm.MakeCustomError("ExpectError", err.Error()))
321308
}
322309

323310
return otto.Value{}
324-
})
325-
if err != nil {
311+
}); err != nil {
326312
return err
327313
}
328314

device_processor/tftp_receiver.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (r *TFTPReceiver) Run() {
5454
r.server = tftp.NewServer(nil, r.tftpRecvHandler)
5555
r.server.SetTimeout(5 * time.Second)
5656

57-
log.Print("Starting TFTP Server...")
57+
log.Println("Starting TFTP Server...")
5858

5959
go func(server *tftp.Server, errChannel chan error) {
6060
err := server.ListenAndServe(":69") // blocks until s.Shutdown() is called
@@ -69,7 +69,7 @@ func (r *TFTPReceiver) Stop() {
6969
}
7070

7171
func (r *TFTPReceiver) tftpRecvHandler(filename string, wt io.WriterTo) error {
72-
log.Print("Recieving File on TFTP Server...")
72+
//log.Println("Recieving File on TFTP Server...")
7373

7474
var destChan chan ReceivedFile
7575
found := false
@@ -104,6 +104,7 @@ func (r *TFTPReceiver) tftpRecvHandler(filename string, wt io.WriterTo) error {
104104
Data: buf,
105105
}
106106

107-
log.Printf("TFTP: %d bytes received\n", n)
107+
_ = n
108+
//log.Printf("TFTP: %d bytes received\n", n)
108109
return nil
109110
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/hashicorp/go-multierror v1.0.0
88
github.com/hashicorp/go-uuid v1.0.1
99
github.com/hashicorp/hcl v1.0.0
10+
github.com/inconshreveable/mousetrap v1.0.0 // indirect
1011
github.com/mattn/go-colorable v0.1.0 // indirect
1112
github.com/mattn/go-isatty v0.0.4 // indirect
1213
github.com/mitchellh/cli v1.0.0

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
2222
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
2323
github.com/howeyc/gopass v0.0.0-20170109162249-bf9dde6d0d2c h1:kQWxfPIHVLbgLzphqk3QUflDy9QdksZR4ygR807bpy0=
2424
github.com/howeyc/gopass v0.0.0-20170109162249-bf9dde6d0d2c/go.mod h1:lADxMC39cJJqL93Duh1xhAs4I2Zs8mKS89XWXFGp9cs=
25+
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
26+
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
2527
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
2628
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
2729
github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk=

0 commit comments

Comments
 (0)