From 763381f8a72bb838bb66d1af3437d3d3aac4cee3 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 25 Jun 2021 16:11:24 +0800 Subject: [PATCH] lightning: fix parquet parser for decimal type (#1272) (#1277) --- pkg/lightning/mydump/examples/test.parquet | Bin 0 -> 2686 bytes pkg/lightning/mydump/parquet_parser.go | 234 +++++++++++++++++--- pkg/lightning/mydump/parquet_parser_test.go | 102 ++++++--- 3 files changed, 276 insertions(+), 60 deletions(-) create mode 100644 pkg/lightning/mydump/examples/test.parquet diff --git a/pkg/lightning/mydump/examples/test.parquet b/pkg/lightning/mydump/examples/test.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3e5c4897897fc0530e9e9368c6dddd7100bb879f GIT binary patch literal 2686 zcmbtWZ%k8H6u<9LXkl-a81LKb(q&B-nxduY-?u^tF!y0*5J#59ZCRAQ_O;m2@~16| zG6w9AlDS1(Nb~~;esIa6iA2m|2#X41qS*jVT!tZWn#@hM4|Q&0+@w4AwXBb|7`io0 zdha>s_dCCH?>#52ec@FrMmUIHGhqf6d03_8RCy@z;Z#&|#Uf({D>$D$ph|?~cs*{n z`-^(93mJ^xGRNN>a%}4Jzke0ISKG_*5Q7z1hg~)BqgVk0!2Xv$c0Y|yzfuOD&<+)JA59z%+K<69`=o z{Ww?-PLRvtsv6L|qQDb=x4XIRtEuLy`!_mK+u4o*h6E$L0hgzW)VOua>KF^E$|N;mg>6&gB)|?pXEwM9h8Q zUMH&DIQ#BuxwZ;pkn4vtFRmdM!fm*hN*8@P|6!l&*wy|RBJUyX zG7vQrs5$(`$ia(u4EN&kc*94NEu)NR+5d@oZP}hnuTyfUBtW77ZUJ# zc+*8RM+d~jLa|E-S`k)5f6^S;*oq23q!F+rOtebd!sRXTDloIWwZpa$gExVvMaME) z4U>W3oswY-iQpsyFF{<0Wi(n%zd@zN3XM~R@-##fRp{3D+W)KB6 z8@{6wlnS7fs*^~+rr3Qn_<%Ki$dFN(ZbcdsnQXuc)Z_=q=X11JozyFxmPG!LV#g@( zmCHtpY`N<6k;^`ktA*=ws)%ejc8Rx#V!x*hM{>*mHA*rFi{;7THW@sYC&6XYAp?90 z?je=kE`v|0B)F{MGr)%mQpW<1K!VRw?9Y^8sDQvT_q)9O9!AdKlo-Zo_B3tiVF<%o znpe&@s+RNd5?>e1&Qpd_HQ{?))5&<)SvAYRL*TfkaJ^Y*7NCpFQbU<{`NEMvQzMsa zGds*Y;)>HEA}v8P^j=G=FKP}q?KX#eQ9%@<0(V={u8;0%_UX&?k!ZNp8`Yci{(vti zM)YNGwd)&&5OD=WFek%;O^x-iYitb$1y2yX0P~sj%Z&Q6_KptIKS 127 + if negative { + for i := 0; i < len(rawBytes); i++ { + rawBytes[i] = ^rawBytes[i] + } + for i := len(rawBytes) - 1; i >= 0; i-- { + rawBytes[i] += 1 + if rawBytes[i] != 0 { + break + } + } + } + + intValue := big.NewInt(0) + intValue = intValue.SetBytes(rawBytes) + val := fmt.Sprintf("%0*d", scale, intValue) + dotIndex := len(val) - scale + var res strings.Builder + if negative { + res.WriteByte('-') + } + if dotIndex == 0 { + res.WriteByte('0') + } else { + res.WriteString(val[:dotIndex]) + } + if scale > 0 { + res.WriteByte('.') + res.WriteString(val[dotIndex:]) + } + return res.String() } // when the value type is int32/int64, convert to value to target logical type in tidb -func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) { - if meta.ConvertedType == nil { +func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error { + if meta.ConvertedType == nil && meta.LogicalType == nil { d.SetInt64(v) - return + return nil } - switch *meta.ConvertedType { - // decimal - case parquet.ConvertedType_DECIMAL: - minLen := *meta.Scale + 1 + + logicalType := meta.LogicalType + switch { + case logicalType.DECIMAL != nil: + if logicalType.DECIMAL.Scale == 0 { + d.SetInt64(v) + return nil + } + minLen := logicalType.DECIMAL.Scale + 1 if v < 0 { minLen++ } val := fmt.Sprintf("%0*d", minLen, v) dotIndex := len(val) - int(*meta.Scale) d.SetString(val[:dotIndex]+"."+val[dotIndex:], "") - case parquet.ConvertedType_DATE: + case logicalType.DATE != nil: dateStr := time.Unix(v*86400, 0).Format("2006-01-02") d.SetString(dateStr, "") - // convert all timestamp types (datetime/timestamp) to string - case parquet.ConvertedType_TIMESTAMP_MICROS: - dateStr := time.Unix(v/1e6, (v%1e6)*1e3).Format("2006-01-02 15:04:05.999") - d.SetString(dateStr, "") - case parquet.ConvertedType_TIMESTAMP_MILLIS: - dateStr := time.Unix(v/1e3, (v%1e3)*1e6).Format("2006-01-02 15:04:05.999") - d.SetString(dateStr, "") - // covert time types to string - case parquet.ConvertedType_TIME_MILLIS, parquet.ConvertedType_TIME_MICROS: - if *meta.ConvertedType == parquet.ConvertedType_TIME_MICROS { - v /= 1e3 - } - millis := v % 1e3 - v /= 1e3 - sec := v % 60 - v /= 60 - min := v % 60 - v /= 60 - d.SetString(fmt.Sprintf("%d:%d:%d.%3d", v, min, sec, millis), "") + case logicalType.TIMESTAMP != nil: + // convert all timestamp types (datetime/timestamp) to string + timeStr := formatTime(v, logicalType.TIMESTAMP.Unit, "2006-01-02 15:04:05.999999", + "2006-01-02 15:04:05.999999Z", logicalType.TIMESTAMP.IsAdjustedToUTC) + d.SetString(timeStr, "") + case logicalType.TIME != nil: + // convert all timestamp types (datetime/timestamp) to string + timeStr := formatTime(v, logicalType.TIME.Unit, "15:04:05.999999", "15:04:05.999999Z", + logicalType.TIME.IsAdjustedToUTC) + d.SetString(timeStr, "") default: d.SetInt64(v) } + return nil +} + +func formatTime(v int64, units *parquet.TimeUnit, format, utcFormat string, utc bool) string { + var sec, nsec int64 + if units.MICROS != nil { + sec = v / 1e6 + nsec = (v % 1e6) * 1e3 + } else if units.MILLIS != nil { + sec = v / 1e3 + nsec = (v % 1e3) * 1e6 + } else { + // nano + sec = v / 1e9 + nsec = v % 1e9 + } + t := time.Unix(sec, nsec).UTC() + if utc { + return t.Format(utcFormat) + } + return t.Format(format) } func (pp *ParquetParser) LastRow() Row { diff --git a/pkg/lightning/mydump/parquet_parser_test.go b/pkg/lightning/mydump/parquet_parser_test.go index d86136a65..b6f280b9c 100644 --- a/pkg/lightning/mydump/parquet_parser_test.go +++ b/pkg/lightning/mydump/parquet_parser_test.go @@ -97,12 +97,10 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { TimestampMillis int64 `parquet:"name=timestampmillis, type=TIMESTAMP_MILLIS"` TimestampMicros int64 `parquet:"name=timestampmicros, type=TIMESTAMP_MICROS"` - Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` - Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` - Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` - Decimal4 string `parquet:"name=decimal4, type=DECIMAL, scale=2, precision=10, basetype=FIXED_LEN_BYTE_ARRAY, length=12"` - Decimal5 string `parquet:"name=decimal5, type=DECIMAL, scale=2, precision=20, basetype=BYTE_ARRAY"` - Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=2, precision=9, basetype=INT32"` + Decimal2 int32 `parquet:"name=decimal2, type=DECIMAL, scale=4, precision=4, basetype=INT32"` + Decimal3 int64 `parquet:"name=decimal3, type=DECIMAL, scale=2, precision=18, basetype=INT64"` + Decimal6 int32 `parquet:"name=decimal6, type=DECIMAL, scale=4, precision=4, basetype=INT32"` } dir := c.MkDir() @@ -118,15 +116,13 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { v := &Test{ Date: 18564, // 2020-10-29 TimeMillis: 62775123, // 17:26:15.123 (note all time are in UTC+8!) - TimeMicros: 62775123000, // 17:26:15.123 - TimestampMillis: 1603963672356, // 2020-10-29T17:27:52.356 - TimestampMicros: 1603963672356956, // 2020-10-29T17:27:52.356956 + TimeMicros: 62775123456, // 17:26:15.123 + TimestampMillis: 1603963672356, // 2020-10-29T09:27:52.356Z + TimestampMicros: 1603963672356956, // 2020-10-29T09:27:52.356956Z Decimal1: -12345678, // -123456.78 Decimal2: 456, // 0.0456 Decimal3: 123456789012345678, // 1234567890123456.78 - Decimal4: "-12345678.09", - Decimal5: "-1234567890123456.78", - Decimal6: -1, // -0.0001 + Decimal6: -1, // -0.0001 } c.Assert(writer.Write(v), IsNil) c.Assert(writer.WriteStop(), IsNil) @@ -140,22 +136,19 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { c.Assert(err, IsNil) defer reader.Close() - c.Assert(len(reader.columns), Equals, 11) + c.Assert(len(reader.columns), Equals, 9) c.Assert(reader.ReadRow(), IsNil) - c.Assert(reader.lastRow.Row, DeepEquals, []types.Datum{ - types.NewCollationStringDatum("2020-10-29", "", 0), - types.NewCollationStringDatum("17:26:15.123", "", 0), - types.NewCollationStringDatum("17:26:15.123", "", 0), - types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0), - types.NewCollationStringDatum("2020-10-29 17:27:52.356", "", 0), - types.NewCollationStringDatum("-123456.78", "", 0), - types.NewCollationStringDatum("0.0456", "", 0), - types.NewCollationStringDatum("1234567890123456.78", "", 0), - types.NewCollationStringDatum("-12345678.09", "", 0), - types.NewCollationStringDatum("-1234567890123456.78", "", 0), - types.NewCollationStringDatum("-0.0001", "", 0), - }) + rowValue := []string{ + "2020-10-29", "17:26:15.123Z", "17:26:15.123456Z", "2020-10-29 09:27:52.356Z", "2020-10-29 09:27:52.356956Z", + "-123456.78", "0.0456", "1234567890123456.78", "-0.0001", + } + row := reader.lastRow.Row + c.Assert(len(rowValue), Equals, len(row)) + for i := 0; i < len(row); i++ { + c.Assert(row[i].Kind(), Equals, types.KindString) + c.Assert(rowValue[i], Equals, row[i].GetString()) + } type TestDecimal struct { Decimal1 int32 `parquet:"name=decimal1, type=DECIMAL, scale=3, precision=5, basetype=INT32"` @@ -215,3 +208,60 @@ func (s testParquetParserSuite) TestParquetVariousTypes(c *C) { } } } + +func (s testParquetParserSuite) TestParquetAurora(c *C) { + store, err := storage.NewLocalStorage("examples") + c.Assert(err, IsNil) + + fileName := "test.parquet" + r, err := store.Open(context.TODO(), fileName) + c.Assert(err, IsNil) + parser, err := NewParquetParser(context.TODO(), store, r, fileName) + c.Assert(err, IsNil) + + c.Assert(parser.Columns(), DeepEquals, []string{"id", "val1", "val2", "d1", "d2", "d3", "d4", "d5", "d6"}) + + expectedRes := [][]interface{}{ + {int64(1), int64(1), "0", int64(123), "1.23", "0.00000001", "1234567890", "123", "1.23000000"}, + { + int64(2), int64(123456), "0", int64(123456), "9999.99", "0.12345678", "99999999999999999999", + "999999999999999999999999999999999999", "99999999999999999999.99999999", + }, + { + int64(3), int64(123456), "0", int64(-123456), "-9999.99", "-0.12340000", "-99999999999999999999", + "-999999999999999999999999999999999999", "-99999999999999999999.99999999", + }, + { + int64(4), int64(1), "0", int64(123), "1.23", "0.00000001", "1234567890", "123", "1.23000000", + }, + { + int64(5), int64(123456), "0", int64(123456), "9999.99", "0.12345678", "12345678901234567890", + "123456789012345678901234567890123456", "99999999999999999999.99999999", + }, + { + int64(6), int64(123456), "0", int64(-123456), "-9999.99", "-0.12340000", + "-12345678901234567890", "-123456789012345678901234567890123456", + "-99999999999999999999.99999999", + }, + } + + for i := 0; i < len(expectedRes); i++ { + err = parser.ReadRow() + c.Assert(err, IsNil) + expectedValues := expectedRes[i] + row := parser.LastRow().Row + c.Assert(len(expectedValues), Equals, len(row)) + for j := 0; j < len(row); j++ { + switch v := expectedValues[j].(type) { + case int64: + c.Assert(v, Equals, row[j].GetInt64()) + case string: + c.Assert(v, Equals, row[j].GetString()) + default: + c.Error("unexpected value: ", expectedValues[j]) + } + } + } + + c.Assert(parser.ReadRow(), Equals, io.EOF) +}