Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lightning: fix parquet parser for decimal type (#1272) (#1330)
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Jul 14, 2021
1 parent f2fcbfa commit b82128c
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 60 deletions.
Binary file added pkg/lightning/mydump/examples/test.parquet
Binary file not shown.
234 changes: 200 additions & 34 deletions pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math/big"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -191,7 +192,16 @@ func NewParquetParser(
// NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name
// though in this context, there is no difference between these two fields
columns = append(columns, strings.ToLower(c.Name))
columnMetas = append(columnMetas, c)
// transfer old ConvertedType to LogicalType
columnMeta := c
if c.ConvertedType != nil && c.LogicalType == nil {
newMeta := *c
columnMeta = &newMeta
if err := convertToLogicType(columnMeta); err != nil {
return nil, err
}
}
columnMetas = append(columnMetas, columnMeta)
}
}

Expand All @@ -203,6 +213,100 @@ func NewParquetParser(
}, nil
}

func convertToLogicType(se *parquet.SchemaElement) error {
logicalType := &parquet.LogicalType{}
switch *se.ConvertedType {
case parquet.ConvertedType_UTF8:
logicalType.STRING = &parquet.StringType{}
case parquet.ConvertedType_ENUM:
logicalType.ENUM = &parquet.EnumType{}
case parquet.ConvertedType_DECIMAL:
logicalType.DECIMAL = &parquet.DecimalType{
Scale: *se.Scale,
Precision: *se.Precision,
}
case parquet.ConvertedType_DATE:
logicalType.DATE = &parquet.DateType{}
case parquet.ConvertedType_TIME_MILLIS:
logicalType.TIME = &parquet.TimeType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MILLIS: parquet.NewMilliSeconds(),
},
}
case parquet.ConvertedType_TIME_MICROS:
logicalType.TIME = &parquet.TimeType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MICROS: parquet.NewMicroSeconds(),
},
}
case parquet.ConvertedType_TIMESTAMP_MILLIS:
logicalType.TIMESTAMP = &parquet.TimestampType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MILLIS: parquet.NewMilliSeconds(),
},
}
case parquet.ConvertedType_TIMESTAMP_MICROS:
logicalType.TIMESTAMP = &parquet.TimestampType{
IsAdjustedToUTC: true,
Unit: &parquet.TimeUnit{
MICROS: parquet.NewMicroSeconds(),
},
}
case parquet.ConvertedType_UINT_8:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 8,
IsSigned: false,
}
case parquet.ConvertedType_UINT_16:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 16,
IsSigned: false,
}
case parquet.ConvertedType_UINT_32:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 32,
IsSigned: false,
}
case parquet.ConvertedType_UINT_64:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 64,
IsSigned: false,
}
case parquet.ConvertedType_INT_8:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 8,
IsSigned: true,
}
case parquet.ConvertedType_INT_16:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 16,
IsSigned: true,
}
case parquet.ConvertedType_INT_32:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 32,
IsSigned: true,
}
case parquet.ConvertedType_INT_64:
logicalType.INTEGER = &parquet.IntType{
BitWidth: 64,
IsSigned: true,
}
case parquet.ConvertedType_JSON:
logicalType.JSON = &parquet.JsonType{}
case parquet.ConvertedType_BSON:
logicalType.BSON = &parquet.BsonType{}
// case parquet.ConvertedType_INTERVAL, parquet.ConvertedType_MAP, parquet.ConvertedType_MAP_KEY_VALUE, parquet.ConvertedType_LIST:
default:
return errors.Errorf("unsupported type: '%s'", *se.ConvertedType)
}
se.LogicalType = logicalType
return nil
}

// Pos returns the currently row number of the parquet file
func (pp *ParquetParser) Pos() (pos int64, rowID int64) {
return pp.curStart + int64(pp.curIndex), pp.lastRow.RowID
Expand Down Expand Up @@ -272,79 +376,141 @@ func (pp *ParquetParser) ReadRow() error {
pp.lastRow.Row = pp.lastRow.Row[:length]
}
for i := 0; i < length; i++ {
setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i])
if err := setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]); err != nil {
return err
}
}
return nil
}

// convert a parquet value to Datum
//
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) {
func setDatumValue(d *types.Datum, v reflect.Value, meta *parquet.SchemaElement) error {
switch v.Kind() {
case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
d.SetUint64(v.Uint())
case reflect.Int8, reflect.Int16:
d.SetInt64(v.Int())
case reflect.Int32, reflect.Int64:
setDatumByInt(d, v.Int(), meta)
return setDatumByInt(d, v.Int(), meta)
case reflect.String:
d.SetString(v.String(), "")
setDatumByString(d, v.String(), meta)
case reflect.Float32, reflect.Float64:
d.SetFloat64(v.Float())
case reflect.Ptr:
if v.IsNil() {
d.SetNull()
} else {
setDatumValue(d, v.Elem(), meta)
return setDatumValue(d, v.Elem(), meta)
}
default:
log.L().Fatal("unknown value", zap.Stringer("kind", v.Kind()),
log.L().Error("unknown value", zap.Stringer("kind", v.Kind()),
zap.String("type", v.Type().Name()), zap.Reflect("value", v.Interface()))
return errors.Errorf("unknown value: %v", v)
}
return nil
}

func setDatumByString(d *types.Datum, v string, meta *parquet.SchemaElement) {
if meta.LogicalType != nil && meta.LogicalType.DECIMAL != nil {
v = binaryToDecimalStr([]byte(v), int(meta.LogicalType.DECIMAL.Scale))
}
d.SetString(v, "")
}

func binaryToDecimalStr(rawBytes []byte, scale int) string {
negative := rawBytes[0] > 127
if negative {
for i := 0; i < len(rawBytes); i++ {
rawBytes[i] = ^rawBytes[i]
}
for i := len(rawBytes) - 1; i >= 0; i-- {
rawBytes[i] += 1
if rawBytes[i] != 0 {
break
}
}
}

intValue := big.NewInt(0)
intValue = intValue.SetBytes(rawBytes)
val := fmt.Sprintf("%0*d", scale, intValue)
dotIndex := len(val) - scale
var res strings.Builder
if negative {
res.WriteByte('-')
}
if dotIndex == 0 {
res.WriteByte('0')
} else {
res.WriteString(val[:dotIndex])
}
if scale > 0 {
res.WriteByte('.')
res.WriteString(val[dotIndex:])
}
return res.String()
}

// when the value type is int32/int64, convert to value to target logical type in tidb
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) {
if meta.ConvertedType == nil {
func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error {
if meta.ConvertedType == nil && meta.LogicalType == nil {
d.SetInt64(v)
return
return nil
}
switch *meta.ConvertedType {
// decimal
case parquet.ConvertedType_DECIMAL:
minLen := *meta.Scale + 1

logicalType := meta.LogicalType
switch {
case logicalType.DECIMAL != nil:
if logicalType.DECIMAL.Scale == 0 {
d.SetInt64(v)
return nil
}
minLen := logicalType.DECIMAL.Scale + 1
if v < 0 {
minLen++
}
val := fmt.Sprintf("%0*d", minLen, v)
dotIndex := len(val) - int(*meta.Scale)
d.SetString(val[:dotIndex]+"."+val[dotIndex:], "")
case parquet.ConvertedType_DATE:
case logicalType.DATE != nil:
dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
d.SetString(dateStr, "")
// convert all timestamp types (datetime/timestamp) to string
case parquet.ConvertedType_TIMESTAMP_MICROS:
dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
case parquet.ConvertedType_TIMESTAMP_MILLIS:
dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999")
d.SetString(dateStr, "")
// covert time types to string
case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS:
if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS {
v /= 1e3
}
millis := v % 1e3
v /= 1e3
sec := v % 60
v /= 60
min := v % 60
v /= 60
d.SetString(fmt.Sprintf("%d:%d:%d.%3d", v, min, sec, millis), "")
case logicalType.TIMESTAMP != nil:
// convert all timestamp types (datetime/timestamp) to string
timeStr := formatTime(v, logicalType.TIMESTAMP.Unit, "2006-01-02 15:04:05.999999",
"2006-01-02 15:04:05.999999Z", logicalType.TIMESTAMP.IsAdjustedToUTC)
d.SetString(timeStr, "")
case logicalType.TIME != nil:
// convert all timestamp types (datetime/timestamp) to string
timeStr := formatTime(v, logicalType.TIME.Unit, "15:04:05.999999", "15:04:05.999999Z",
logicalType.TIME.IsAdjustedToUTC)
d.SetString(timeStr, "")
default:
d.SetInt64(v)
}
return nil
}

func formatTime(v int64, units *parquet.TimeUnit, format, utcFormat string, utc bool) string {
var sec, nsec int64
if units.MICROS != nil {
sec = v / 1e6
nsec = (v % 1e6) * 1e3
} else if units.MILLIS != nil {
sec = v / 1e3
nsec = (v % 1e3) * 1e6
} else {
// nano
sec = v / 1e9
nsec = v % 1e9
}
t := time.Unix(sec, nsec).UTC()
if utc {
return t.Format(utcFormat)
}
return t.Format(format)
}

func (pp *ParquetParser) LastRow() Row {
Expand Down
Loading

0 comments on commit b82128c

Please sign in to comment.