Skip to content

Commit

Permalink
Added geometry and gemometry array data type support
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Aug 24, 2023
1 parent 1eea4b4 commit fbee5d3
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 22 deletions.
31 changes: 29 additions & 2 deletions internal/typemanager/builtin_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package typemanager

import (
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"github.com/samber/lo"
"github.com/twpayne/go-geom/encoding/wkb"
"math"
"math/big"
"net"
Expand Down Expand Up @@ -96,12 +99,36 @@ func reflectiveArrayConverter(
}
}

func geometry2struct(
_ uint32, value any,
) (any, error) {

if v, ok := value.(pgtypes.Geometry); ok {
if !v.Valid {
return nil, nil
}

b, err := wkb.Marshal(v.Geometry, binary.BigEndian)
if err != nil {
return nil, err
}

val := base64.StdEncoding.EncodeToString(b)
srid := v.Geometry.SRID()

return map[string]any{
"wkb": val,
"srid": srid,
}, nil
}
return nil, errIllegalValue
}

func enum2string(
_ uint32, value any,
) (any, error) {

switch v := value.(type) {
case string:
if v, ok := value.(string); ok {
return v, nil
}
return nil, errIllegalValue
Expand Down
27 changes: 18 additions & 9 deletions internal/typemanager/coretypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ var coreTypeMap = map[uint32]typeRegistration{
schemaType: schema.ARRAY,
oidElement: pgtype.BoxOID,
converter: arrayConverter[[]string](pgtype.BoxOID, box2string),
codecFactory: func(typeMap *pgtype.Map) pgtype.Codec {
codecFactory: func(typeMap *pgtype.Map, typ pgtypes.PgType) pgtype.Codec {
if pt, present := typeMap.TypeForOID(pgtype.BoxOID); present {
return &pgtypes.BoxArrayCodec{
PgxArrayCodec: &pgtype.ArrayCodec{ElementType: pt},
Expand Down Expand Up @@ -436,16 +436,25 @@ var coreTypeMap = map[uint32]typeRegistration{

var optimizedTypes = map[string]typeRegistration{
"geometry": {
schemaType: schema.STRING,
codec: pgtypes.GeometryCodec{},
converter: func(oid uint32, value any) (any, error) {
return value, nil
},
schemaType: schema.STRING,
codec: pgtypes.GeometryCodec{},
converter: geometry2struct,
schemaBuilder: schema.Geography(),
},
/*"_geometry": {
"_geometry": {
schemaType: schema.ARRAY,
isArray: true,
},*/
codecFactory: func(typeMap *pgtype.Map, typ pgtypes.PgType) pgtype.Codec {
if pt, present := typeMap.TypeForOID(typ.OidElement()); present {
return &pgtypes.GeometryArrayCodec{
PgxArrayCodec: &pgtype.ArrayCodec{ElementType: pt},
}
}
return nil
},
converterFactory: func(typeMap *pgtype.Map, typ pgtypes.PgType) pgtypes.TypeConverter {
return arrayConverter[[]map[string]any](typ.OidElement(), geometry2struct)
},
},
"ltree": {
schemaType: schema.STRING,
schemaBuilder: schema.Ltree(),
Expand Down
36 changes: 25 additions & 11 deletions internal/typemanager/typemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ var (
mapType = reflect.TypeOf(map[string]any{})
)

type ttypeFactory func(typeMap *pgtype.Map, typ pgtypes.PgType) *pgtype.Type
type typeFactory func(typeMap *pgtype.Map, typ pgtypes.PgType) *pgtype.Type

type codecFactory func(typeMap *pgtype.Map) pgtype.Codec
type codecFactory func(typeMap *pgtype.Map, typ pgtypes.PgType) pgtype.Codec

type converterFactory func(typeMap *pgtype.Map, typ pgtypes.PgType) pgtypes.TypeConverter

type typeRegistration struct {
schemaType schema.Type
Expand All @@ -55,8 +57,9 @@ type typeRegistration struct {
oidElement uint32
converter pgtypes.TypeConverter
codec pgtype.Codec
converterFactory converterFactory
codecFactory codecFactory
typeFactory ttypeFactory
typeFactory typeFactory
overrideExistingCodec bool
}

Expand Down Expand Up @@ -174,6 +177,13 @@ func (tm *typeManager) ResolveTypeConverter(
return registration.converter, nil
}
if registration, present := tm.optimizedConverterCache.Get(oid); present {
if registration.converterFactory != nil {
typ, err := tm.ResolveDataType(oid)
if err != nil {
return nil, err
}
return registration.converterFactory(tm.typeMap, typ), nil
}
return registration.converter, nil
}
if registration, present := tm.dynamicConverterCache.Get(oid); present {
Expand Down Expand Up @@ -480,13 +490,14 @@ func (tm *typeManager) registerType(
}

optimizedConverterSetter(typ.Oid(), typeRegistration{
schemaType: registration.schemaType,
schemaBuilder: registration.schemaBuilder,
isArray: registration.isArray,
oidElement: typ.OidElement(),
converter: converter,
codec: registration.codec,
codecFactory: registration.codecFactory,
schemaType: registration.schemaType,
schemaBuilder: registration.schemaBuilder,
isArray: registration.isArray,
oidElement: typ.OidElement(),
converter: converter,
converterFactory: registration.converterFactory,
codec: registration.codec,
codecFactory: registration.codecFactory,
})

if err := tm.registerTypeInTypeMap(typ, registration); err != nil {
Expand Down Expand Up @@ -542,7 +553,7 @@ func (tm *typeManager) registerTypeInTypeMap(
if registration.codec != nil || registration.codecFactory != nil {
codec := registration.codec
if registration.codecFactory != nil {
codec = registration.codecFactory(tm.typeMap)
codec = registration.codecFactory(tm.typeMap, typ)
}

if codec == nil {
Expand Down Expand Up @@ -587,5 +598,8 @@ func (tm *typeManager) resolveOptimizedTypeConverter(
}
return lazyConverter.convert
}
if registration.converterFactory != nil {
return registration.converterFactory(tm.typeMap, typ)
}
return registration.converter
}
119 changes: 119 additions & 0 deletions spi/pgtypes/geometryarray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pgtypes

import (
"database/sql/driver"
"github.com/jackc/pgx/v5/pgtype"
"strings"
)

// GeometryArrayCodec is a wrapper codec for geometry[] which isn't handled gracefully in
// the pgx base source code (at least in PG < 14 without binary wire protocol
// support) due to the way the text protocol sends back the value of a geometry item.
type GeometryArrayCodec struct {
PgxArrayCodec *pgtype.ArrayCodec
}

func (c *GeometryArrayCodec) FormatSupported(
format int16,
) bool {

if format == pgtype.TextFormatCode {
return true
}
return c.PgxArrayCodec.FormatSupported(format)
}

func (c *GeometryArrayCodec) PreferredFormat() int16 {
return c.PgxArrayCodec.PreferredFormat()
}

func (c *GeometryArrayCodec) PlanEncode(
m *pgtype.Map, oid uint32, format int16, value any,
) pgtype.EncodePlan {

return c.PgxArrayCodec.PlanEncode(m, oid, format, value)
}

func (c *GeometryArrayCodec) PlanScan(
m *pgtype.Map, oid uint32, format int16, target any,
) pgtype.ScanPlan {

if format == pgtype.TextFormatCode {
return &scanPlanGeometryArrayTextCodec{
typeMap: m,
oid: oid,
}
}

return c.PgxArrayCodec.PlanScan(m, oid, format, target)
}

func (c *GeometryArrayCodec) DecodeDatabaseSQLValue(
m *pgtype.Map, oid uint32, format int16, src []byte,
) (driver.Value, error) {

return c.PgxArrayCodec.DecodeDatabaseSQLValue(m, oid, format, src)
}

func (c *GeometryArrayCodec) DecodeValue(
m *pgtype.Map, oid uint32, format int16, src []byte,
) (any, error) {

if src == nil {
return nil, nil
}

if format == pgtype.TextFormatCode {
var slice []Geometry
err := m.PlanScan(oid, format, &slice).Scan(src, &slice)
return slice, err
}

return c.PgxArrayCodec.DecodeValue(m, oid, format, src)
}

type scanPlanGeometryArrayTextCodec struct {
geometryCodec GeometryCodec
typeMap *pgtype.Map
oid uint32
}

func (spbac *scanPlanGeometryArrayTextCodec) Scan(
src []byte, dst any,
) error {

array := dst.(*[]Geometry)

scanPlan := spbac.geometryCodec.PlanScan(spbac.typeMap, spbac.oid, pgtype.TextFormatCode, &Geometry{})
if scanPlan == nil {
scanPlan = spbac.typeMap.PlanScan(spbac.oid, pgtype.TextFormatCode, &Geometry{})
}

// Semicolon seems to be the separator here
elements := strings.Split(string(src[1:len(src)-1]), ":")
for _, element := range elements {
item := Geometry{}
if err := scanPlan.Scan([]byte(element), &item); err != nil {
return err
}
*array = append(*array, item)
}
return nil
}
6 changes: 6 additions & 0 deletions spi/schema/schemabuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ func HStore() Builder {
return Map().KeySchema(String()).ValueSchema(String().Optional())
}

func Geography() Builder {
return NewSchemaBuilder(STRUCT).
Field("wkb", 0, String()).
Field("srid", 1, Int32())
}

type fieldImpl struct {
name string
index int
Expand Down
31 changes: 31 additions & 0 deletions tests/datatype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,37 @@ var dataTypeTable = []DataTypeTest{
},
expected: quickCheckValue[[]map[string]any],
},
{
name: "Geometry",
pgTypeName: "geometry",
schemaType: schema.STRUCT,
value: "'010100000000000000000024C000000000000034C0'::geometry",
insertPlain: true,
expectedValueOverride: map[string]any{
"wkb": "AAAAAAHAJAAAAAAAAMA0AAAAAAAA",
"srid": float64(0),
},
expected: quickCheckValue[map[string]any],
},
{
name: "Geometry Array",
pgTypeName: "geometry[]",
schemaType: schema.ARRAY,
elementSchemaType: schema.STRUCT,
value: "array['010100000000000000000024C000000000000034C0','010100000000000000000000000000000000000000']::geometry[]",
insertPlain: true,
expectedValueOverride: []map[string]any{
{
"wkb": "AAAAAAHAJAAAAAAAAMA0AAAAAAAA",
"srid": float64(0),
},
{
"wkb": "AAAAAAEAAAAAAAAAAAAAAAAAAAAA",
"srid": float64(0),
},
},
expected: quickCheckValue[[]map[string]any],
},
}

const lookupTypeOidQuery = "SELECT oid FROM pg_catalog.pg_type where typname = $1"
Expand Down

0 comments on commit fbee5d3

Please sign in to comment.