Skip to content

Commit

Permalink
Merge pull request #35 from mimiro-io/feature/csv-encoder
Browse files Browse the repository at this point in the history
Feature/csv encoder
  • Loading branch information
gra-moore authored Mar 7, 2024
2 parents 9c8a4ab + 609d02f commit 9780737
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 102 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,25 @@ You can create a Markdown table from the data structure definition like this:

#### source_config

The `source_config` is a JSON Object and used to provide information about the dataset. This is intended to contain things like the name of the database table, any queries templates that are needed etc.
The `source_config` is a JSON Object and is used to provide information about the dataset. This is intended to contain things like the name of the database table, any query templates that are needed, etc. Example usage is shown below.

##### example source_config for csv-encoded data

| JSON Field | Description |
| ----------------------- | --------------------------------------------------------------- |
| encoding | Specifies what type of encoding the incoming data has |
| columns | Names and orders the different columns of the data |
| hasHeader               | Decides whether the first row is a header or an entity.         |
| columnSeparator         | Defines which character is used to separate the data in columns |

```json
"sourceConfig":{
"encoding": "csv",
"columns" : ["id", "name", "age", "worksfor"],
"hasHeader": true,
"columnSeparator": ","
}
```

#### incoming_mapping_config

Expand Down
4 changes: 4 additions & 0 deletions encoder/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func NewItemIterator(sourceConfig map[string]any, data io.ReadCloser) (ItemItera
return NewJsonItemIterator(sourceConfig, data)
}

if encoding == "csv" {
return NewCSVItemIterator(sourceConfig, data)
}

return nil, nil
}

Expand Down
228 changes: 134 additions & 94 deletions encoder/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package encoder
import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"slices"
	"strconv"
	"strings"
	"unicode/utf8"

	common_datalayer "github.com/mimiro-io/common-datalayer"
)

func NewCSVItemFactory() ItemFactory {
Expand All @@ -20,24 +19,21 @@ func (c *CSVItemFactory) NewItem() common_datalayer.Item {
return &CSVItem{data: make(map[string]any)}
}

// TODO: fix me
// differentiate between ENCODER and ENCODING with better naming
// CSVItemWriter streams items out as CSV rows to an underlying stream.
// Configuration (column order, separator, header handling) comes from the
// layer's source config; see NewCSVItemWriter.
//
// NOTE(review): this span is from a rendered diff — both `encoder` and
// `writer` fields appear here, but the constructor only assigns `writer`,
// so `encoder` looks like a leftover from the pre-change version; verify.
type CSVItemWriter struct {
	data io.WriteCloser // destination stream; closed by Close
	batchInfo *common_datalayer.BatchInfo // batch metadata; presumably controls header emission across batches — TODO confirm
	encoder *csv.Writer // NOTE(review): appears superseded by writer — likely diff residue
	writer *csv.Writer // csv encoder wrapping data; flushed in Close
	columns []string // output column names, in write order
	hasHeader bool // when true, a header row of column names is written first
	separator string // raw separator string as supplied in the source config
	encoding string // configured encoding name (expected to be "csv")
	validateFields bool // read from config; usage not visible in this view — TODO confirm
	headerWritten bool // set once the header row has been emitted, to avoid repeats
	firstItemWritten bool // set once at least one data row has been emitted
}

func NewCSVItemWriter(sourceConfig map[string]any, data io.WriteCloser, batchInfo *common_datalayer.BatchInfo) (*CSVItemWriter, error) {
enc := csv.NewWriter(data)
writer := &CSVItemWriter{data: data, encoder: enc, batchInfo: batchInfo}
writer := &CSVItemWriter{data: data, writer: enc, batchInfo: batchInfo}

// this can be implicitly creating the order of the columns
// OR we add the order field to the config to specify the order of the columns
Expand All @@ -46,108 +42,102 @@ func NewCSVItemWriter(sourceConfig map[string]any, data io.WriteCloser, batchInf
if ok {
writer.columns = columnNames.([]string)
}
columnSeparator, ok := sourceConfig["columnSeparator"]
if ok {
var err error
writer.separator = columnSeparator.(string)
writer.writer.Comma, err = stringToRune(columnSeparator.(string))
if err != nil {
return nil, errors.New("input string does not match allowed characters")
}
}

encoding, ok := sourceConfig["encoding"]
if ok {
writer.encoding = encoding.(string)
}
validateFields, ok := sourceConfig["validateFields"]
if ok {
writer.validateFields = validateFields.(bool)
}
hasHeader, ok := sourceConfig["hasHeader"]
// if no hasHeader still want to write data based on columns

if ok {
writer.hasHeader = hasHeader.(bool)
// Should this check be here?
if writer.hasHeader {
if batchInfo != nil {
if batchInfo.IsStartBatch {
// create comma separated header row from writer.columns
headerRow := strings.Join(writer.columns, writer.separator)
_, err := data.Write([]byte(headerRow)) // write the header for the csv-file
// set headerWritten to true, so we don't write the header again
writer.headerWritten = true
err := writer.writer.Write(writer.columns) // write the header for the csv-file
if err != nil {
return nil, err
}
} else {
writer.firstItemWritten = true
}
} else {
err := writer.writer.Write(writer.columns) // write the header for the csv-file
if err != nil {
return nil, err
}
}
}
}
columnSeparator, ok := sourceConfig["columnSeparator"]
if ok {
writer.separator = columnSeparator.(string)
}
encoding, ok := sourceConfig["encoding"]
if ok {
writer.encoding = encoding.(string)
}
validateFields, ok := sourceConfig["validateFields"]
if ok {
writer.validateFields = validateFields.(bool)
}

return writer, nil
}

// Close flushes any buffered CSV output and closes the underlying stream.
//
// csv.Writer.Flush does not return an error; failures during flushing are
// recorded internally and must be retrieved via Writer.Error. The original
// code dropped such errors silently — here a flush error is surfaced to the
// caller (the stream is still closed so the resource is not leaked).
func (c *CSVItemWriter) Close() error {
	c.writer.Flush()
	if err := c.writer.Error(); err != nil {
		// report the write failure; best-effort close of the stream
		c.data.Close()
		return err
	}
	return c.data.Close()
}

func (c *CSVItemWriter) Write(item common_datalayer.Item) error {
// TODO: fix me
// should this check be here?
written := 0
if c.headerWritten {
if c.firstItemWritten {
var r []string

row := make(map[string]interface{})

for _, h := range c.columns {
if _, ok := row[h]; ok {
switch v := row[h].(type) {
case float64:
r = append(r, strconv.FormatFloat(v, 'f', 0, 64))
case string:
r = append(r, v)
case bool:
r = append(r, strconv.FormatBool(v))
}
} else {
r = append(r, "")
}
}
for _, col := range r {
written += len([]byte(col))
}
err := c.Write(item)
if err != nil {
return err
}
_, err = c.data.Write([]byte(","))
if err != nil {
return err
var r []string

row := item.NativeItem().(map[string]any)
for _, h := range c.columns {
if _, ok := row[h]; ok {
switch v := row[h].(type) {
case float64:
r = append(r, strconv.FormatFloat(v, 'f', 0, 64))
case string:
r = append(r, v)
case bool:
r = append(r, strconv.FormatBool(v))
case int:
r = append(r, strconv.Itoa(v))
case int64:
r = append(r, strconv.FormatInt(v, 10))
case int32:
r = append(r, strconv.FormatInt(int64(v), 10))
case int16:
r = append(r, strconv.FormatInt(int64(v), 10))
case int8:
r = append(r, strconv.FormatInt(int64(v), 10))
case uint:
r = append(r, strconv.FormatUint(uint64(v), 10))
case uint64:
r = append(r, strconv.FormatUint(v, 10))
case uint32:
r = append(r, strconv.FormatUint(uint64(v), 10))
case uint16:
r = append(r, strconv.FormatUint(uint64(v), 10))
case uint8:
r = append(r, strconv.FormatUint(uint64(v), 10))
default:
r = append(r, "")
}
} else {
c.firstItemWritten = true
}
} else {
// write the header
headerRow := strings.Join(c.columns, c.separator)
_, err := c.data.Write([]byte(headerRow)) // write the header for the csv-file
// set headerWritten to true, so we don't write the header again
c.headerWritten = true
if err != nil {
return err
r = append(r, "")
}
}
/* if writer.hasHeader {
if batchInfo != nil {
if batchInfo.IsStartBatch {
// create comma separated header row from writer.columns
headerRow := strings.Join(writer.columns, writer.separator)
_, err := data.Write([]byte(headerRow)) // write the header for the csv-file
// set headerWritten to true, so we don't write the header again
writer.headerWritten = true
if err != nil {
return nil, err
}
}
}
}*/
for _, col := range r {
written += len([]byte(col))
}
err := c.writer.Write(r)
if err != nil {
return err
}
return nil
}

Expand All @@ -159,16 +149,23 @@ type CSVItemIterator struct {
columns []string
separator string
encoding string
ignoreColumns []string
}

func NewCSVItemIterator(sourceConfig map[string]any, data io.ReadCloser) (*CSVItemIterator, error) {
// TODO: fix me
dec := csv.NewReader(data)
reader := &CSVItemIterator{data: data, decoder: dec}

columnSeparator, ok := sourceConfig["columnSeparator"]
if ok {
reader.separator = columnSeparator.(string)
// only working with characters for now, tabs doesn't work
//TODO: add support for tabs
//reader.decoder.Comma = rune(columnSeparator.(string)[0])
err := errors.New("input string does not match allowed characters")
reader.decoder.Comma, err = stringToRune(columnSeparator.(string))
if err != nil {
return nil, err
}
}
encoding, ok := sourceConfig["encoding"]
if ok {
Expand All @@ -178,29 +175,59 @@ func NewCSVItemIterator(sourceConfig map[string]any, data io.ReadCloser) (*CSVIt
if ok {
reader.columns = columnNames.([]string)
}
// check if header is present and if so, validate amount fields
// do we want to do this here?
header, err := dec.Read()
fmt.Sprintf(strings.Join(header, reader.separator))

ignoreColumns, ok := sourceConfig["ignoreColumns"]
if ok {
reader.ignoreColumns = ignoreColumns.([]string)
}
validateFields, ok := sourceConfig["validateFields"]
if ok {
if !validateFields.(bool) {
reader.decoder.FieldsPerRecord = -1
} else {
//checks the first field and sees how many columns there is, uses that as validation going forward.
//another option is to explicity set the number of columns in the config
reader.decoder.FieldsPerRecord = len(reader.columns)
}
} else {
// default is not to validate fields
reader.decoder.FieldsPerRecord = -1
}
header, err := reader.decoder.Read()
if err != nil {
return nil, err
}
//
if len(header) > len(reader.columns) {
return nil, errors.New("header row does not match columns in source config")
}

return &CSVItemIterator{data: data, decoder: dec}, nil
return reader, nil
}

// Close releases the underlying input stream once iteration is finished.
func (c *CSVItemIterator) Close() error {
	err := c.data.Close()
	return err
}

// Read returns the next item parsed from the CSV stream, or (nil, nil)
// once the input is exhausted.
//
// Fix: the block began with a leftover `return nil, nil`, which made the
// entire record-mapping body below it unreachable dead code. That early
// return is removed so the real implementation runs. A bounds guard is
// also added so a short record (fewer fields than configured columns)
// stops mapping instead of panicking with an index-out-of-range.
//
// All values are kept as strings. Typed columns (e.g.
// columns: [{name: "age", type: "int"}]) would require extending the
// source config and the column handling in NewCSVItemIterator/Writer.
func (c *CSVItemIterator) Read() (common_datalayer.Item, error) {
	record, err := c.decoder.Read()
	if err != nil {
		if err == io.EOF {
			// end of data, not a failure
			return nil, nil
		}
		return nil, err
	}

	entityProps := make(map[string]any, len(c.columns))
	for j, key := range c.columns {
		if slices.Contains(c.ignoreColumns, key) {
			continue
		}
		if j >= len(record) {
			// record has fewer fields than configured columns; stop rather than panic
			break
		}
		entityProps[key] = record[j]
	}

	return &CSVItem{data: entityProps}, nil
}

type CSVItem struct {
Expand All @@ -226,3 +253,16 @@ func (item *CSVItem) GetPropertyNames() []string {
// NativeItem returns the raw property map backing this CSV item.
func (item *CSVItem) NativeItem() any {
	return item.data
}

// stringToRune converts a single-character separator string from the source
// config into the rune assigned to csv.Reader/Writer Comma.
//
// Generalized from the original hard-coded set {",", "\t", " "}: any string
// containing exactly one valid UTF-8 rune is accepted (e.g. ";", "|"),
// except '\r' and '\n', which encoding/csv does not allow as a separator.
// Previously-accepted inputs map to the same runes as before, and invalid
// inputs keep the same error message, so the change is backward compatible.
func stringToRune(input string) (rune, error) {
	r, size := utf8.DecodeRuneInString(input)
	if size == 0 || size != len(input) || (r == utf8.RuneError && size == 1) {
		// empty, multi-character, or invalid UTF-8 input
		return 0, errors.New("input string does not match allowed characters")
	}
	if r == '\n' || r == '\r' {
		// line terminators cannot act as a CSV field separator
		return 0, errors.New("input string does not match allowed characters")
	}
	return r, nil
}
Loading

0 comments on commit 9780737

Please sign in to comment.