Skip to content

Commit

Permalink
Embrace case-sensitive column handling, CSV to DB.
Browse files Browse the repository at this point in the history
everything seems to work
  • Loading branch information
yokofly committed Oct 24, 2024
1 parent 8d81a90 commit bd273cb
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 54 deletions.
127 changes: 84 additions & 43 deletions core/dbio/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,36 +1481,70 @@ func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns i

// if fields provided, check if exists in table
colMap := map[string]map[string]any{}
for _, rec := range colData.Records() {
colName := cast.ToString(rec["column_name"])
colMap[strings.ToLower(colName)] = rec
caseSensitive := conn.GetType().DBNameCaseSensitive()

if caseSensitive {
for _, rec := range colData.Records() {
colName := cast.ToString(rec["column_name"])
colMap[colName] = rec
}
} else {
for _, rec := range colData.Records() {
colName := cast.ToString(rec["column_name"])
colMap[strings.ToLower(colName)] = rec
}
}

var colTypes []ColumnType

// if fields provided, filter, keep order
if len(fields) > 0 {
for _, field := range fields {
rec, ok := colMap[strings.ToLower(field)]
if !ok {
err = g.Error(
"provided field '%s' not found in table %s",
strings.ToLower(field), table.FullName(),
)
return
}
if caseSensitive {
for _, field := range fields {
rec, ok := colMap[(field)]
if !ok {
err = g.Error(
"provided field '%s' not found in table %s",
(field), table.FullName(),
)
return
}

if conn.Type == dbio.TypeDbSnowflake {
rec["data_type"], rec["precision"], rec["scale"] = parseSnowflakeDataType(rec)
if conn.Type == dbio.TypeDbSnowflake {
rec["data_type"], rec["precision"], rec["scale"] = parseSnowflakeDataType(rec)
}

colTypes = append(colTypes, ColumnType{
Name: cast.ToString(rec["column_name"]),
DatabaseTypeName: cast.ToString(rec["data_type"]),
Precision: cast.ToInt(rec["precision"]),
Scale: cast.ToInt(rec["scale"]),
Sourced: true,
})
}
} else {
for _, field := range fields {
rec, ok := colMap[strings.ToLower(field)]
if !ok {
err = g.Error(
"provided field '%s' not found in table %s",
strings.ToLower(field), table.FullName(),
)
return
}

colTypes = append(colTypes, ColumnType{
Name: cast.ToString(rec["column_name"]),
DatabaseTypeName: cast.ToString(rec["data_type"]),
Precision: cast.ToInt(rec["precision"]),
Scale: cast.ToInt(rec["scale"]),
Sourced: true,
})
if conn.Type == dbio.TypeDbSnowflake {
rec["data_type"], rec["precision"], rec["scale"] = parseSnowflakeDataType(rec)
}

colTypes = append(colTypes, ColumnType{
Name: cast.ToString(rec["column_name"]),
DatabaseTypeName: cast.ToString(rec["data_type"]),
Precision: cast.ToInt(rec["precision"]),
Scale: cast.ToInt(rec["scale"]),
Sourced: true,
})
}
}
} else {
colTypes = lo.Map(colData.Records(), func(rec map[string]interface{}, i int) ColumnType {
Expand Down Expand Up @@ -2052,31 +2086,38 @@ func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns io
// ValidateColumnNames verifies that source fields are present in the target table
// It will return quoted field names as `newColNames`, the same length as `colNames`
func (conn *BaseConn) ValidateColumnNames(tgtCols iop.Columns, colNames []string, quote bool) (newCols iop.Columns, err error) {

tgtFields := map[string]string{}
for _, colName := range tgtCols.Names() {
colName = conn.Self().Unquote(colName)
if quote {
tgtFields[strings.ToLower(colName)] = conn.Self().Quote(colName)
} else {
tgtFields[strings.ToLower(colName)] = colName
}
}

mismatches := []string{}
for _, colName := range colNames {
newCol := tgtCols.GetColumn(colName)
if newCol == nil || newCol.Name == "" {
// src field is missing in tgt field
mismatches = append(mismatches, g.F("source field '%s' is missing in target table", colName))
continue
caseSensitive := conn.GetType().DBNameCaseSensitive()
if caseSensitive {
for _, colName := range colNames {
newCol := tgtCols.GetColumnWithOriginalCase(colName)
if newCol == nil || newCol.Name == "" {
// src field is missing in tgt field
mismatches = append(mismatches, g.F("source field '%s' is missing in target table", colName))
continue
}
if quote {
newCol.Name = conn.Self().Quote(newCol.Name)
} else {
newCol.Name = conn.Self().Unquote(newCol.Name)
}
newCols = append(newCols, *newCol)
}
if quote {
newCol.Name = conn.Self().Quote(newCol.Name)
} else {
newCol.Name = conn.Self().Unquote(newCol.Name)
} else {
for _, colName := range colNames {
newCol := tgtCols.GetColumn(colName)
if newCol == nil || newCol.Name == "" {
// src field is missing in tgt field
mismatches = append(mismatches, g.F("source field '%s' is missing in target table", colName))
continue
}
if quote {
newCol.Name = conn.Self().Quote(newCol.Name)
} else {
newCol.Name = conn.Self().Unquote(newCol.Name)
}
newCols = append(newCols, *newCol)
}
newCols = append(newCols, *newCol)
}

if len(mismatches) > 0 {
Expand Down
8 changes: 8 additions & 0 deletions core/dbio/dbio_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func (t Type) DBNameUpperCase() bool {
return g.In(t, TypeDbOracle, TypeDbSnowflake)
}

// DBNameCaseSensitive reports whether database identifiers for this type
// must keep their original case (i.e. no lower-casing during column matching).
// Currently enabled only for Proton (Timeplus), to address
// https://github.com/slingdata-io/sling-cli/issues/417.
// NOTE(review): ClickHouse looks like a candidate for the same treatment,
// but it is intentionally not enabled yet.
func (t Type) DBNameCaseSensitive() bool {
	switch t {
	case TypeDbProton:
		return true
	default:
		return false
	}
}

// Kind returns the kind of connection
func (t Type) Kind() Kind {
switch t {
Expand Down
14 changes: 3 additions & 11 deletions core/dbio/iop/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type CSV struct {
func CleanHeaderRow(header []string) []string {
// replace any other chars than regex expression
regexAllow := *regexp.MustCompile(`[^a-zA-Z0-9_]`)
fieldMap := map[string]string{}

transformer := transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC)
for i, field := range header {
Expand All @@ -64,16 +63,9 @@ func CleanHeaderRow(header []string) []string {
field = "col"
}

// avoid duplicates
j := 1
newField := field
for fieldMap[newField] != "" {
newField = g.F("%s%d", field, j)
j++
}

fieldMap[newField] = field
header[i] = strings.ToLower(newField)
header[i] = field
/// timeplus@yokofly, see issue https://github.com/slingdata-io/sling-cli/issues/417
/// we need to preserve the original case for proton
}

return header
Expand Down
9 changes: 9 additions & 0 deletions core/dbio/iop/datatype.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,15 @@ func (cols Columns) GetColumn(name string) *Column {
return colsMap[strings.ToLower(name)]
}

// GetColumnWithOriginalCase returns the column whose name matches name
// exactly (case-sensitive), or nil if there is no match. Unlike GetColumn,
// no lower-casing is applied; this supports databases where identifiers are
// case sensitive (see Type.DBNameCaseSensitive).
//
// The returned pointer refers to a copy of the matched column, so callers
// may mutate it (e.g. quoting the name) without affecting the receiver.
func (cols Columns) GetColumnWithOriginalCase(name string) *Column {
	// Fix: the previous implementation stored `&col` (the address of the
	// range loop variable) in a map; before Go 1.22 every entry aliased the
	// same variable, so all lookups returned the last column. A direct scan
	// also avoids building an O(n) map for a single lookup.
	for _, col := range cols {
		if col.Name == name {
			col := col // copy, preserving the original's copy-out semantics
			return &col
		}
	}
	return nil
}

func (cols Columns) Merge(newCols Columns, overwrite bool) (col2 Columns, added schemaChg, changed []schemaChg) {
added = schemaChg{Added: true}

Expand Down

0 comments on commit bd273cb

Please sign in to comment.