Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: fix parquet parser for decimal type (#1272) #1276

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added pkg/lightning/mydump/examples/test.parquet
Binary file not shown.
234 changes: 200 additions & 34 deletions pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math/big"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -191,7 +192,16 @@ func NewParquetParser(
// NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name
// though in this context, there is no difference between these two fields
columns = append(columns, strings.ToLower(c.Name))
columnMetas = append(columnMetas, c)
// transfer old ConvertedType to LogicalType
columnMeta := c
if c.ConvertedType != nil && c.LogicalType == nil {
newMeta := *c
columnMeta = &newMeta
if err := convertToLogicType(columnMeta); err != nil {
return nil, err
}
}
columnMetas = append(columnMetas, columnMeta)
}
}

Expand All @@ -203,6 +213,100 @@ func NewParquetParser(
}, nil
}

func convertToLogicType(se *parquet.SchemaElement) error {
logicalType := &parquet.LogicalType{}
switch *se.ConvertedType {
case parquet.ConvertedType_UTF8:
logicalType.STRING = &parquet.StringType{}
case parquet.ConvertedType_ENUM:
logicalType.ENUM = &parquet.EnumType{}
case parquet.ConvertedType_DECIMAL:
logicalType.DECIMAL = &parquet.DecimalType{
Scale: *se.Scale,
Precision: *se.Precision,
}
case parquet.ConvertedType_DATE:
logicalType.DATE = &parquet.DateType{}
case parquet.ConvertedType_TIME_MILLIS:
logicalType.TIME = &parquet.TimeType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MILLIS: parquet.NewMilliSeconds(),
},
}
case parquet.ConvertedType_TIME_MICROS:
logicalType.TIME = &parquet.TimeType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MICROS: parquet.NewMicroSeconds(),
},
}
case parquet.ConvertedType_TIMESTAMP_MILLIS:
logicalType.TIMESTAMP = &parquet.TimestampType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MILLIS: parquet.NewMilliSeconds(),
},
}
case parquet.ConvertedType_TIMESTAMP_MICROS:
logicalType.TIMESTAMP = &parquet.TimestampType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MICROS: parquet.NewMicroSeconds(),
},
}
case parquet.ConvertedType_UINT_8:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 8,
IsSigned: false,
}
case parquet.ConvertedType_UINT_16:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 16,
IsSigned: false,
}
case parquet.ConvertedType_UINT_32:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 32,
IsSigned: false,
}
case parquet.ConvertedType_UINT_64:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 64,
IsSigned: false,
}
case parquet.ConvertedType_INT_8:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 8,
IsSigned: true,
}
case parquet.ConvertedType_INT_16:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 16,
IsSigned: true,
}
case parquet.ConvertedType_INT_32:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 32,
IsSigned: true,
}
case parquet.ConvertedType_INT_64:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 64,
IsSigned: true,
}
case parquet.ConvertedType_JSON:
logicalType.JSON = &parquet.JsonType{}
case parquet.ConvertedType_BSON:
logicalType.BSON = &parquet.BsonType{}
// case parquet.ConvertedType_INTERVAL, parquet.ConvertedType_MAP, parquet.ConvertedType_MAP_KEY_VALUE, parquet.ConvertedType_LIST:
default:
return errors.Errorf("unsupported type: '%s'", *se.ConvertedType)
}
se.LogicalType = logicalType
return nil
}

// Pos returns the currently row number of the parquet file
func (pp *ParquetParser) Pos() (pos int64, rowID int64) {
return pp.curStart + int64(pp.curIndex), pp.lastRow.RowID
Expand Down Expand Up @@ -272,79 +376,141 @@ func (pp *ParquetParser) ReadRow() error {
pp.lastRow.Row = pp.lastRow.Row[:length]
}
for i := 0; i < length; i++ {
setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i])
if err := setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]); err != nil {
return err
}
}
return nil
}

// convert a parquet value to Datum
//
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) {
func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) error {
switch v.Kind() {
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
d.SetUint64(v.Uint())
case reflect.Int8, reflect.Int16:
d.SetInt64(v.Int())
case reflect.Int32, reflect.Int64:
setDatumByInt(d, v.Int(), meta)
return setDatumByInt(d, v.Int(), meta)
case reflect.String:
d.SetString(v.String(), "")
setDatumByString(d, v.String(), meta)
case reflect.Float32, reflect.Float64:
d.SetFloat64(v.Float())
case reflect.Ptr:
if v.IsNil() {
d.SetNull()
} else {
setDatumValue(d, v.Elem(), meta)
return setDatumValue(d, v.Elem(), meta)
}
default:
log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
log.L().Error("unknown value", zap.Stringer("kind", v.Kind()),
zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface()))
return errors.Errorf("unknown value: %v", v)
}
return nil
}

func setDatumByString(d *types.Datum, v string, meta *parquet.SchemaElement) {
if meta.LogicalType != nil && meta.LogicalType.DECIMAL != nil {
v = binaryToDecimalStr([]byte(v), int(meta.LogicalType.DECIMAL.Scale))
}
d.SetString(v, "")
}

func binaryToDecimalStr(rawBytes []byte, scale int) string {
negative := rawBytes[0] > 127
if negative {
for i := 0; i < len(rawBytes); i++ {
rawBytes[i] = ^rawBytes[i]
}
for i := len(rawBytes) - 1; i >= 0; i-- {
rawBytes[i] += 1
if rawBytes[i] != 0 {
break
}
}
}

intValue := big.NewInt(0)
intValue = intValue.SetBytes(rawBytes)
val := fmt.Sprintf("%0*d", scale, intValue)
dotIndex := len(val) - scale
var res strings.Builder
if negative {
res.WriteByte('-')
}
if dotIndex == 0 {
res.WriteByte('0')
} else {
res.WriteString(val[:dotIndex])
}
if scale > 0 {
res.WriteByte('.')
res.WriteString(val[dotIndex:])
}
return res.String()
}

// when the value type is int32/int64, convert to value to target logical type in tidb
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
if meta.ConvertedType == nil {
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error {
if meta.ConvertedType == nil && meta.LogicalType == nil {
d.SetInt64(v)
return
return nil
}
switch *meta.ConvertedType {
// decimal
case parquet.ConvertedType_DECIMAL:
minLen := *meta.Scale + 1

logicalType := meta.LogicalType
switch {
case logicalType.DECIMAL != nil:
if logicalType.DECIMAL.Scale == 0 {
d.SetInt64(v)
return nil
}
minLen := logicalType.DECIMAL.Scale + 1
if v < 0 {
minLen++
}
val := fmt.Sprintf("%0*d", minLen, v)
dotIndex := len(val) - int(*meta.Scale)
d.SetString(val[:dotIndex]+"."+val[dotIndex:], "")
case parquet.ConvertedType_DATE:
case logicalType.DATE != nil:
dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
d.SetString(dateStr, "")
// convert all timestamp types (datetime/timestamp) to string
case parquet.ConvertedType_TIMESTAMP_MICROS:
dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
case parquet.ConvertedType_TIMESTAMP_MILLIS:
dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
// covert time types to string
case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS:
if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS {
v /= 1e3
}
millis := v % 1e3
v /= 1e3
sec := v % 60
v /= 60
min := v % 60
v /= 60
d.SetString(fmt.Sprintf("%d:%d:%d.%3d", v, min, sec, millis), "")
case logicalType.TIMESTAMP != nil:
// convert all timestamp types (datetime/timestamp) to string
timeStr := formatTime(v, logicalType.TIMESTAMP.Unit, "2006-01-02 15:04:05.999999",
"2006-01-02 15:04:05.999999Z", logicalType.TIMESTAMP.IsAdjustedToUTC)
d.SetString(timeStr, "")
case logicalType.TIME != nil:
// convert all timestamp types (datetime/timestamp) to string
timeStr := formatTime(v, logicalType.TIME.Unit, "15:04:05.999999", "15:04:05.999999Z",
logicalType.TIME.IsAdjustedToUTC)
d.SetString(timeStr, "")
default:
d.SetInt64(v)
}
return nil
}

func formatTime(v int64, units *parquet.TimeUnit, format, utcFormat string, utc bool) string {
var sec, nsec int64
if units.MICROS != nil {
sec = v / 1e6
nsec = (v % 1e6) * 1e3
} else if units.MILLIS != nil {
sec = v / 1e3
nsec = (v % 1e3) * 1e6
} else {
// nano
sec = v / 1e9
nsec = v % 1e9
}
t := time.Unix(sec, nsec).UTC()
if utc {
return t.Format(utcFormat)
}
return t.Format(format)
}

func (pp *ParquetParser) LastRow() Row {
Expand Down
Loading