Skip to content

Commit

Permalink
Better API example (#1392)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnkerl authored Sep 10, 2023
1 parent 03eed30 commit 39fa3a1
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 98 deletions.
118 changes: 69 additions & 49 deletions docs/src/miller-as-library.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,41 @@ $ go run main1.go
## Another example use

<pre class="pre-non-highlight-non-pair">
// This is an example of using Miller as a library.
package main

import (
"bufio"
"container/list"
"errors"
"fmt"
"os"

"github.com/johnkerl/miller/pkg/bifs"
"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/input"
"github.com/johnkerl/miller/pkg/output"
"github.com/johnkerl/miller/pkg/transformers"
"github.com/johnkerl/miller/pkg/types"
)

func convert_csv_to_json(fileNames []string) error {
options := &cli.TOptions{
// Put your record-processing logic here.
func custom_record_processor(irac *types.RecordAndContext) (*types.RecordAndContext, error) {
irec := irac.Record

v := irec.Get("i")
if v == nil {
return nil, fmt.Errorf("did not find key \"i\" at filename %s record number %d",
irac.Context.FILENAME, irac.Context.FNR,
)
}
v2 := bifs.BIF_times(v, v)
irec.PutReference("i2", v2)

return irac, nil
}

// Put your various options here.
func custom_options() *cli.TOptions {
return &cli.TOptions{
ReaderOptions: cli.TReaderOptions{
InputFileFormat: "csv",
IFS: ",",
Expand All @@ -105,6 +122,14 @@ func convert_csv_to_json(fileNames []string) error {
OutputFileFormat: "json",
},
}
}

// This function you don't need to modify.
func convert_csv_to_json(
fileNames []string,
options *cli.TOptions,
record_processor func (irac *types.RecordAndContext) (*types.RecordAndContext, error),
) error {
outputStream := os.Stdout
outputIsStdout := true

Expand All @@ -120,60 +145,55 @@ func convert_csv_to_json(fileNames []string) error {
return err
}

// Set up the channels for the record-reader.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
inputErrorChannel := make(chan error, 1)
// Not needed in this example
readerDownstreamDoneChannel := make(chan bool, 1)

// Instantiate the record-writer
recordWriter, err := output.Create(&options.WriterOptions)
if err != nil {
return err
}

cat, err := transformers.NewTransformerCat(
false, // doCounters bool,
"", // counterFieldName string,
nil, // groupByFieldNames []string,
false, // doFileName bool,
false, // doFileNum bool,
)
if err != nil {
return err
}
recordTransformers := []transformers.IRecordTransformer{cat}

// Set up the reader-to-transformer and transformer-to-writer channels.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
writerChannel := make(chan *list.List, 1) // list of *types.RecordAndContext

// We're done when a fatal error is registered on input (file not found,
// etc) or when the record-writer has written all its output. We use
// channels to communicate both of these conditions.
inputErrorChannel := make(chan error, 1)
doneWritingChannel := make(chan bool, 1)
dataProcessingErrorChannel := make(chan bool, 1)

readerDownstreamDoneChannel := make(chan bool, 1)

// Start the reader, transformer, and writer. Let them run until fatal input
// error or end-of-processing happens.
bufferedOutputStream := bufio.NewWriter(outputStream)

go recordReader.Read(fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)
go transformers.ChainTransformer(readerChannel, readerDownstreamDoneChannel, recordTransformers,
writerChannel, options)
go output.ChannelWriter(writerChannel, recordWriter, &options.WriterOptions, doneWritingChannel,
dataProcessingErrorChannel, bufferedOutputStream, outputIsStdout)
// Start the record-reader.
go recordReader.Read(
fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)

// Loop through the record stream.
var retval error
done := false
for !done {
select {

case ierr := &lt;-inputErrorChannel:
retval = ierr
break
case _ = &lt;-dataProcessingErrorChannel:
retval = errors.New("exiting due to data error") // details already printed
break
case _ = &lt;-doneWritingChannel:
done = true

case iracs := &lt;-readerChannel:
// Handle the record batch
for e := iracs.Front(); e != nil; e = e.Next() {
irac := e.Value.(*types.RecordAndContext)
if irac.Record != nil {
orac, err := record_processor(irac)
if err != nil {
retval = err
done = true
break
}
recordWriter.Write(orac.Record, bufferedOutputStream, outputIsStdout)
}
if irac.OutputString != "" {
fmt.Fprintln(bufferedOutputStream, irac.OutputString)
}
if irac.EndOfStream {
done = true
}
}
break

}
}

Expand All @@ -183,7 +203,8 @@ func convert_csv_to_json(fileNames []string) error {
}

func main() {
err := convert_csv_to_json(os.Args[1:])
options := custom_options()
err := convert_csv_to_json(os.Args[1:], options, custom_record_processor)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
Expand All @@ -198,10 +219,9 @@ nadir.west.our.org,down

```
$ go build main2.go
$ ./main2 data/hostnames.csv
{"host": "apoapsis.east.our.org", "status": "up"}
{"host": "nadir.west.our.org", "status": "down"}
{"a": "pan", "b": "pan", "i": 1, "x": 0.3467901443380824, "y": 0.7268028627434533, "i2": 1}
{"a": "eks", "b": "pan", "i": 2, "x": 0.7586799647899636, "y": 0.5221511083334797, "i2": 4}
{"a": "wye", "b": "wye", "i": 3, "x": 0.20460330576630303, "y": 0.33831852551664776, "i2": 9}
{"a": "eks", "b": "wye", "i": 4, "x": 0.38139939387114097, "y": 0.13418874328430463, "i2": 16}
{"a": "wye", "b": "pan", "i": 5, "x": 0.5732889198020006, "y": 0.8636244699032729, "i2": 25}$ ./main2 data/small.csv
```



11 changes: 5 additions & 6 deletions docs/src/miller-as-library.md.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ GENMD-INCLUDE-ESCAPED(data/hostnames.csv)

```
$ go build main2.go
$ ./main2 data/hostnames.csv
{"host": "apoapsis.east.our.org", "status": "up"}
{"host": "nadir.west.our.org", "status": "down"}
{"a": "pan", "b": "pan", "i": 1, "x": 0.3467901443380824, "y": 0.7268028627434533, "i2": 1}
{"a": "eks", "b": "pan", "i": 2, "x": 0.7586799647899636, "y": 0.5221511083334797, "i2": 4}
{"a": "wye", "b": "wye", "i": 3, "x": 0.20460330576630303, "y": 0.33831852551664776, "i2": 9}
{"a": "eks", "b": "wye", "i": 4, "x": 0.38139939387114097, "y": 0.13418874328430463, "i2": 16}
{"a": "wye", "b": "pan", "i": 5, "x": 0.5732889198020006, "y": 0.8636244699032729, "i2": 25}$ ./main2 data/small.csv
```



107 changes: 64 additions & 43 deletions docs/src/miller-as-library/main2.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
// This is an example of using Miller as a library.
package main

import (
"bufio"
"container/list"
"errors"
"fmt"
"os"

"github.com/johnkerl/miller/pkg/bifs"
"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/input"
"github.com/johnkerl/miller/pkg/output"
"github.com/johnkerl/miller/pkg/transformers"
"github.com/johnkerl/miller/pkg/types"
)

func convert_csv_to_json(fileNames []string) error {
options := &cli.TOptions{
// Put your record-processing logic here.
func custom_record_processor(irac *types.RecordAndContext) (*types.RecordAndContext, error) {
irec := irac.Record

v := irec.Get("i")
if v == nil {
return nil, fmt.Errorf("did not find key \"i\" at filename %s record number %d",
irac.Context.FILENAME, irac.Context.FNR,
)
}
v2 := bifs.BIF_times(v, v)
irec.PutReference("i2", v2)

return irac, nil
}

// Put your various options here.
func custom_options() *cli.TOptions {
return &cli.TOptions{
ReaderOptions: cli.TReaderOptions{
InputFileFormat: "csv",
IFS: ",",
Expand All @@ -26,6 +43,14 @@ func convert_csv_to_json(fileNames []string) error {
OutputFileFormat: "json",
},
}
}

// This function you don't need to modify.
func convert_csv_to_json(
fileNames []string,
options *cli.TOptions,
record_processor func (irac *types.RecordAndContext) (*types.RecordAndContext, error),
) error {
outputStream := os.Stdout
outputIsStdout := true

Expand All @@ -41,60 +66,55 @@ func convert_csv_to_json(fileNames []string) error {
return err
}

// Set up the channels for the record-reader.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
inputErrorChannel := make(chan error, 1)
// Not needed in this example
readerDownstreamDoneChannel := make(chan bool, 1)

// Instantiate the record-writer
recordWriter, err := output.Create(&options.WriterOptions)
if err != nil {
return err
}

cat, err := transformers.NewTransformerCat(
false, // doCounters bool,
"", // counterFieldName string,
nil, // groupByFieldNames []string,
false, // doFileName bool,
false, // doFileNum bool,
)
if err != nil {
return err
}
recordTransformers := []transformers.IRecordTransformer{cat}

// Set up the reader-to-transformer and transformer-to-writer channels.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
writerChannel := make(chan *list.List, 1) // list of *types.RecordAndContext

// We're done when a fatal error is registered on input (file not found,
// etc) or when the record-writer has written all its output. We use
// channels to communicate both of these conditions.
inputErrorChannel := make(chan error, 1)
doneWritingChannel := make(chan bool, 1)
dataProcessingErrorChannel := make(chan bool, 1)

readerDownstreamDoneChannel := make(chan bool, 1)

// Start the reader, transformer, and writer. Let them run until fatal input
// error or end-of-processing happens.
bufferedOutputStream := bufio.NewWriter(outputStream)

go recordReader.Read(fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)
go transformers.ChainTransformer(readerChannel, readerDownstreamDoneChannel, recordTransformers,
writerChannel, options)
go output.ChannelWriter(writerChannel, recordWriter, &options.WriterOptions, doneWritingChannel,
dataProcessingErrorChannel, bufferedOutputStream, outputIsStdout)
// Start the record-reader.
go recordReader.Read(
fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)

// Loop through the record stream.
var retval error
done := false
for !done {
select {

case ierr := <-inputErrorChannel:
retval = ierr
break
case _ = <-dataProcessingErrorChannel:
retval = errors.New("exiting due to data error") // details already printed
break
case _ = <-doneWritingChannel:
done = true

case iracs := <-readerChannel:
// Handle the record batch
for e := iracs.Front(); e != nil; e = e.Next() {
irac := e.Value.(*types.RecordAndContext)
if irac.Record != nil {
orac, err := record_processor(irac)
if err != nil {
retval = err
done = true
break
}
recordWriter.Write(orac.Record, bufferedOutputStream, outputIsStdout)
}
if irac.OutputString != "" {
fmt.Fprintln(bufferedOutputStream, irac.OutputString)
}
if irac.EndOfStream {
done = true
}
}
break

}
}

Expand All @@ -104,7 +124,8 @@ func convert_csv_to_json(fileNames []string) error {
}

func main() {
err := convert_csv_to_json(os.Args[1:])
options := custom_options()
err := convert_csv_to_json(os.Args[1:], options, custom_record_processor)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
Expand Down
Loading

0 comments on commit 39fa3a1

Please sign in to comment.