Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting multiple IIPs in one network? #77

Open
an0nym05 opened this issue Feb 28, 2023 · 0 comments
Open

Supporting multiple IIPs in one network? #77

an0nym05 opened this issue Feb 28, 2023 · 0 comments

Comments

@an0nym05
Copy link

I'm trying to execute the following network:

'./sample.csv' -> FILE Source(csv/reader)
'./output.csv' -> FILE Target(csv/writer)
Source DATA -> DATA Target

But for some reason the network hangs and 2nd IIP never fired:

00:48:00 INF 1 run.go:29 > file="examples/simple.fbp" reading flow file
00:48:00 DBG 1 run.go:70 > IIP="'./sample.csv' -> FILE Source" adding IIP
00:48:00 DBG 1 run.go:70 > IIP="'./output.csv' -> FILE Target" adding IIP
00:48:00 DBG 10 csv.go:15 > CSVReader.Process()
00:48:00 DBG 12 csv.go:30 > CSVWriter.Process()
00:48:03 DBG 10 csv.go:18 > file="./sample.csv" CSVReader.Process()
00:48:03 DBG 12 csv.go:41 > file="" data="demo_table" CSVWriter.Process() 
00:48:03 DBG 10 csv.go:21 > CSVReader.Process() shutting down
...

And execution keeps waiting...

Expectation is to see the log line where both file and data variables are provided.

The components code is provided below:

package components

import (
	"time"

	"github.com/phuslu/log"
)

type CSVReader struct {
	File <-chan string
	Data chan<- string
}

func (reader *CSVReader) Process() {
	log.Debug().Msg("CSVReader.Process()")
	for file := range reader.File {
		time.Sleep(time.Second * 3)
		log.Debug().Str("file", file).Msg("CSVReader.Process()")
		reader.Data <- "demo_table"
	}
	log.Debug().Msg("CSVReader.Process() shutting down")
}

type CSVWriter struct {
	File <-chan string
	Data <-chan string
}

func (writer *CSVWriter) Process() {
	log.Debug().Msg("CSVWriter.Process()")
	defer func() {
		log.Debug().Msg("CSVWriter.Process() shutting down")
	}()
	var (
		file, table string
	)
	for {
		select {
		case file = <-writer.File:
			if table != "" {
				log.Debug().Str("file", file).Str("data", table).Msg("CSVWriter.Process() ")
				file = ""
				table = ""
			}
		case table = <-writer.Data:
			if file != "" {
				log.Debug().Str("file", file).Str("data", table).Msg("CSVWriter.Process() ")
				file = ""
				table = ""
			}
		}
	}

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant