Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[amtool] - Add a new silence import command #1082

Merged
merged 2 commits into from
Dec 7, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 30 additions & 27 deletions cli/silence_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/alertmanager/types"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"sync"
)

var importFlags *flag.FlagSet
Expand All @@ -35,8 +36,8 @@ func init() {
importFlags = importCmd.Flags()
}

func addSilenceWorker(silences <-chan *types.Silence, errs chan<- error) {
for s := range silences {
func addSilenceWorker(silencec <-chan *types.Silence, errc chan<- error) {
for s := range silencec {
silenceId, err := addSilence(s)
sid := s.ID
if err != nil && err.Error() == "[bad_data] not found" {
Expand All @@ -50,7 +51,7 @@ func addSilenceWorker(silences <-chan *types.Silence, errs chan<- error) {
} else {
fmt.Println(silenceId)
}
errs <- err
errc <- err
}
}

Expand All @@ -60,6 +61,11 @@ func bulkImport(cmd *cobra.Command, args []string) error {
return err
}

workers, err := importFlags.GetInt("worker")
if err != nil {
return err
}

input := os.Stdin
if len(args) == 1 {
input, err = os.Open(args[0])
Copy link
Contributor

@josedonizetti josedonizetti Nov 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to close the opened fd. defer input.Close()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be added after the err is checked below

Expand All @@ -70,23 +76,32 @@ func bulkImport(cmd *cobra.Command, args []string) error {
}

dec := json.NewDecoder(input)

// read open square bracket
_, err = dec.Token()
if err != nil {
return errors.Wrap(err, "couldn't unmarshal input data, is it JSON?")
}

silences := make(chan *types.Silence, 100)
errs := make(chan error, 100)
workers, err := importFlags.GetInt("worker")
if err != nil {
return err
}
silencec := make(chan *types.Silence, 100)
errc := make(chan error, 100)
var wg sync.WaitGroup
for w := 0; w < workers; w++ {
go addSilenceWorker(silences, errs)
go func() {
wg.Add(1)
addSilenceWorker(silencec, errc)
wg.Done()
}()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three lines can be updated to help prevent a deadlock:

var wg sync.WaitGroup
for w := 0; w < workers; w++ {
	go func(w int) {
		wg.Add(1)
		addSilenceWorker(silc, errc)
		wg.Done()
	}(w)
}

go func() {
	for err := range errc {
		if err != nil {
			errCount++
		}
	}
}()

Check out down below where we wg.Wait(), which indicates that all the silence workers are finished, and then we can close(errc).


errCount := 0
go func() {
for err := range errc {
if err != nil {
errCount++
}
}
}()

count := 0
for dec.More() {
var s types.Silence
Expand All @@ -100,28 +115,16 @@ func bulkImport(cmd *cobra.Command, args []string) error {
s.ID = ""
}

silences <- &s
silencec <- &s
count++
}
close(silences)

// read closing bracket
_, err = dec.Token()
if err != nil {
return errors.Wrap(err, "invalid JSON")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this as I think it doesn't really make sense to throw an error over JSON format here, the silences are added already at this point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would actually prefer to keep it in. Even though all the silences have been added, a user should still be informed if they are attempting to use invalid json.

}

errCount := 0
for i := 0; i < count; i++ {
err = <-errs
if err != nil {
errCount++
}
}
close(silencec)
wg.Wait()
close(errc)

if errCount > 0 {
return fmt.Errorf("couldn't import %v out of %v silences", errCount, count)
}

return nil
}