Skip to content

Commit

Permalink
aviold event duplicates (#45)
Browse files Browse the repository at this point in the history
* aviold event duplicates

* safer use of flag vars
  • Loading branch information
apesternikov authored Aug 19, 2024
1 parent cd3b4c6 commit 0f0562b
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions testing/it_sidecar/it_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (i *arrayFlags) Set(value string) error {
}

var (
namespace = flag.String("namespace", os.Getenv("NAMESPACE"), "kubernetes namespace")
timeout = flag.Duration("timeout", time.Second*30, "execution timeout")
namespace string
timeout time.Duration
pfconfig = portForwardConf{services: make(map[string][]uint16)}
kubeconfig string
waitForApps arrayFlags
Expand All @@ -76,6 +76,8 @@ var (
)

func init() {
flag.StringVar(&namespace, "namespace", os.Getenv("NAMESPACE"), "kubernetes namespace")
flag.DurationVar(&timeout, "timeout", time.Second*30, "execution timeout")
flag.Var(&pfconfig, "portforward", "set a port forward item in form of servicename:port")
flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "path to kubernetes config file")
flag.Var(&waitForApps, "waitforapp", "wait for pods with label app=<this parameter>")
Expand Down Expand Up @@ -130,7 +132,7 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) {
// listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback
func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) {

kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil)
kubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace))
eventsInformer := kubeInformerFactory.Core().V1().Events().Informer()

fn := func(obj interface{}) {
Expand All @@ -146,8 +148,7 @@ func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFai
}

handler := &cache.ResourceEventHandlerFuncs{
AddFunc: fn,
DeleteFunc: fn,
AddFunc: fn,
UpdateFunc: func(old interface{}, new interface{}) {
fn(new)
},
Expand All @@ -172,7 +173,7 @@ func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error {
},
}

kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil)
kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, namespace, nil)
podsInformer := kubeInformerFactory.Core().V1().Pods().Informer()
podsInformer.AddEventHandler(handler)
go kubeInformerFactory.Start(ctx.Done())
Expand Down Expand Up @@ -233,7 +234,7 @@ func waitForEndpoints(ctx context.Context, clientset *kubernetes.Clientset, conf
},
}

kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil)
kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, namespace, nil)
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints().Informer()
endpointsInformer.AddEventHandler(handler)
go kubeInformerFactory.Start(ctx.Done())
Expand Down Expand Up @@ -276,7 +277,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
var wg sync.WaitGroup
wg.Add(len(ports))
for _, port := range ports {
ep, err := clientset.CoreV1().Endpoints(*namespace).Get(ctx, serviceName, meta_v1.GetOptions{})
ep, err := clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, meta_v1.GetOptions{})
if err != nil {
return fmt.Errorf("error listing endpoints for service %s: %v", serviceName, err)
}
Expand Down Expand Up @@ -335,7 +336,7 @@ var ErrTermSignalReceived = errors.New("TERM signal received")
func main() {
flag.Parse()
log.SetOutput(os.Stdout)
ctx, timeoutCancel := context.WithTimeoutCause(context.Background(), *timeout, ErrTimedOut)
ctx, timeoutCancel := context.WithTimeoutCause(context.Background(), timeout, ErrTimedOut)
defer timeoutCancel()
ctx, cancel := context.WithCancelCause(ctx)
c := make(chan os.Signal, 1)
Expand Down Expand Up @@ -372,7 +373,7 @@ func main() {
clientset = kubernetes.NewForConfigOrDie(config)

go func() {
err := stern.Run(ctx, *namespace, clientset, allowErrors, disablePodLogs)
err := stern.Run(ctx, namespace, clientset, allowErrors, disablePodLogs)
if err != nil {
log.Print(err)
}
Expand Down

0 comments on commit 0f0562b

Please sign in to comment.