Skip to content

Commit

Permalink
Merge pull request #144 from wazsone/bug/fix-oom
Browse files Browse the repository at this point in the history
fix OOM issue on real clusters
  • Loading branch information
denis-tingaikin authored Mar 7, 2023
2 parents 2b2311a + 7c07297 commit 69eae3b
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ func Start(ctx context.Context, conf *Config, c kubernetes.Interface) <-chan str

var eventsCh = make(chan mapipwriter.Event, 64)

cm, err := c.CoreV1().ConfigMaps(conf.Namespace).Get(ctx, conf.FromConfigMap, v1.GetOptions{})
if err == nil {
for _, event := range translateFromConfigmap(watch.Event{
Type: watch.Added,
Object: cm,
}) {
eventsCh <- event
if conf.FromConfigMap != "" {
cm, err := c.CoreV1().ConfigMaps(conf.Namespace).Get(ctx, conf.FromConfigMap, v1.GetOptions{})
if err == nil {
for _, event := range translateFromConfigmap(watch.Event{
Type: watch.Added,
Object: cm,
}) {
eventsCh <- event
}
}
}

Expand Down Expand Up @@ -205,17 +207,26 @@ func Start(ctx context.Context, conf *Config, c kubernetes.Interface) <-chan str
}

func monitorEvents(ctx context.Context, out chan<- mapipwriter.Event, getWatchFn func() watch.Interface, translateFn func(watch.Event) []mapipwriter.Event) {
for ctx.Err() == nil {
w := getWatchFn()
w := getWatchFn()
defer func() {
if w != nil {
w.Stop()
}
}()

for ctx.Err() == nil {
if w == nil {
log.FromContext(ctx).Errorf("cant supply watcher")
time.Sleep(time.Second / 2)
w = getWatchFn()
continue
}

select {
case e, ok := <-w.ResultChan():
if !ok {
w.Stop()
w = getWatchFn()
continue
}
events := translateFn(e)
Expand Down

0 comments on commit 69eae3b

Please sign in to comment.