Skip to content

Commit

Permalink
Implemented schema type resolution against the new TypeManager and Ty…
Browse files Browse the repository at this point in the history
…pe classes
  • Loading branch information
noctarius committed Jul 11, 2023
1 parent 4b16131 commit 9dab01a
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 129 deletions.
136 changes: 136 additions & 0 deletions spi/pgtypes/datatypes/builtin_converters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package datatypes

import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/hashicorp/go-uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"net"
"net/netip"
"time"
)

func char2text(_ uint32, value any) (any, error) {
if v, ok := value.(int32); ok {
return string(v), nil
}
return nil, ErrIllegalValue
}

func timestamp2text(oid uint32, value any) (any, error) {
if v, ok := value.(time.Time); ok {
switch oid {
case pgtype.DateOID:
return v.Format(time.DateOnly), nil
default:
return v.In(time.UTC).String(), nil
}
}
return nil, ErrIllegalValue
}

func time2text(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Time); ok {
remaining := int64(time.Microsecond) * v.Microseconds
hours := remaining / int64(time.Hour)
remaining = remaining % int64(time.Hour)
minutes := remaining / int64(time.Minute)
remaining = remaining % int64(time.Minute)
seconds := remaining / int64(time.Second)
remaining = remaining % int64(time.Second)
return fmt.Sprintf(
"%02d:%02d:%02d.%06d", hours, minutes, seconds,
(time.Nanosecond * time.Duration(remaining)).Microseconds(),
), nil
}
return nil, ErrIllegalValue
}

func timestamp2int64(_ uint32, value any) (any, error) {
if v, ok := value.(time.Time); ok {
return v.UnixMilli(), nil
}
return nil, ErrIllegalValue
}

/*func bit2bool(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Bits); ok {
return v.Bytes[0]&0xF0 == 128, nil
}
return nil, ErrIllegalValue
}
func bits2bytes(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Bits); ok {
return v.Bytes, nil
}
return nil, ErrIllegalValue
}*/

func json2text(_ uint32, value any) (any, error) {
if v, ok := value.(map[string]any); ok {
d, err := json.Marshal(v)
if err != nil {
return nil, err
}
return string(d), nil
}
return nil, ErrIllegalValue
}

func uuid2text(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.UUID); ok {
u, err := uuid.FormatUUID(v.Bytes[:])
if err != nil {
return nil, err
}
return u, nil
} else if v, ok := value.([16]byte); ok {
u, err := uuid.FormatUUID(v[:])
if err != nil {
return nil, err
}
return u, nil
}
return nil, ErrIllegalValue
}

func uint322int64(_ uint32, value any) (any, error) {
if v, ok := value.(uint32); ok {
return int64(v), nil
}
return nil, ErrIllegalValue
}

func macaddr2text(_ uint32, value any) (any, error) {
if v, ok := value.(net.HardwareAddr); ok {
return v.String(), nil
}
return nil, ErrIllegalValue
}

func addr2text(_ uint32, value any) (any, error) {
if v, ok := value.(netip.Prefix); ok {
return v.String(), nil
}
return nil, ErrIllegalValue
}

func interval2int64(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Interval); ok {
return v.Microseconds, nil
}
return nil, ErrIllegalValue
}

func numeric2variableScaleDecimal(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Numeric); ok {
return schemamodel.Struct{
"value": hex.EncodeToString(v.Int.Bytes()),
"scale": v.Exp,
}, nil
}
return nil, ErrIllegalValue
}
12 changes: 2 additions & 10 deletions spi/pgtypes/datatypes/pgtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,8 @@ func (t Type) Equal(other Type) bool {
stringArrayEqual(t.enumValues, other.enumValues)
}

func getSchemaType(oid uint32, arrayType bool, typType TypeType) schemamodel.SchemaType {
if coreType, present := coreTypes[oid]; present {
return coreType
}
if arrayType {
return schemamodel.ARRAY
} else if typType == EnumType {
return schemamodel.STRING
}
return schemamodel.STRUCT
func (t Type) resolveSchemaBuilder(oid uint32, modifier int) schemamodel.SchemaBuilder {
return nil //FIXME
}

func stringArrayEqual(this, that []string) bool {
Expand Down
161 changes: 43 additions & 118 deletions spi/pgtypes/datatypes/typemanager.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package datatypes

import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/go-errors/errors"
"github.com/hashicorp/go-uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting/logging"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"net"
"net/netip"
"sync"
"time"
)

type TypeFactory func(name string, typ TypeType, oid uint32, category TypeCategory, arrayType bool, oidArray uint32,
Expand Down Expand Up @@ -149,11 +143,13 @@ func (tm *TypeManager) typeFactory(name string, typ TypeType, oid uint32, catego
arrayType bool, oidArray uint32, oidElement uint32, recordType bool, parentOid uint32, modifiers int,
enumValues []string, delimiter string) Type {

nType := NewType(name, typ, oid, category, arrayType, oidArray, oidElement,
pgType := NewType(name, typ, oid, category, arrayType, oidArray, oidElement,
recordType, parentOid, modifiers, enumValues, delimiter)

nType.typeManager = tm
return nType
pgType.typeManager = tm
pgType.schemaBuilder = resolveSchemaBuilder(pgType)

return pgType
}

func (tm *TypeManager) DataType(oid uint32) (Type, error) {
Expand All @@ -180,6 +176,7 @@ func (tm *TypeManager) DataType(oid uint32) (Type, error) {
}

func (tm *TypeManager) SchemaBuilder(oid uint32) schemamodel.SchemaBuilder {

//TODO implement me
panic("implement me")
}
Expand All @@ -200,125 +197,53 @@ func (tm *TypeManager) NumKnownTypes() int {
return len(tm.typeCache)
}

func char2text(_ uint32, value any) (any, error) {
if v, ok := value.(int32); ok {
return string(v), nil
func getSchemaType(oid uint32, arrayType bool, typType TypeType) schemamodel.SchemaType {
if coreType, present := coreTypes[oid]; present {
return coreType
}
return nil, ErrIllegalValue
}

func timestamp2text(oid uint32, value any) (any, error) {
if v, ok := value.(time.Time); ok {
switch oid {
case pgtype.DateOID:
return v.Format(time.DateOnly), nil
default:
return v.In(time.UTC).String(), nil
}
if arrayType {
return schemamodel.ARRAY
} else if typType == EnumType {
return schemamodel.STRING
}
return nil, ErrIllegalValue
return schemamodel.STRUCT
}

func time2text(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Time); ok {
remaining := int64(time.Microsecond) * v.Microseconds
hours := remaining / int64(time.Hour)
remaining = remaining % int64(time.Hour)
minutes := remaining / int64(time.Minute)
remaining = remaining % int64(time.Minute)
seconds := remaining / int64(time.Second)
remaining = remaining % int64(time.Second)
return fmt.Sprintf(
"%02d:%02d:%02d.%06d", hours, minutes, seconds,
(time.Nanosecond * time.Duration(remaining)).Microseconds(),
), nil
func resolveSchemaBuilder(pgType Type) schemamodel.SchemaBuilder {
switch pgType.schemaType {
case schemamodel.INT8:
return schemamodel.Int8()
case schemamodel.INT16:
return schemamodel.Int16()
case schemamodel.INT32:
return schemamodel.Int32()
case schemamodel.INT64:
return schemamodel.Int64()
case schemamodel.FLOAT32:
return schemamodel.Float32()
case schemamodel.FLOAT64:
return schemamodel.Float64()
case schemamodel.BOOLEAN:
return schemamodel.Boolean()
case schemamodel.STRING:
return schemamodel.String()
default:
return &lazySchemaBuilder{pgType: pgType}
}
return nil, ErrIllegalValue
}

func timestamp2int64(_ uint32, value any) (any, error) {
if v, ok := value.(time.Time); ok {
return v.UnixMilli(), nil
}
return nil, ErrIllegalValue
type lazySchemaBuilder struct {
pgType Type
schemaBuilder schemamodel.SchemaBuilder
}

/*func bit2bool(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Bits); ok {
return v.Bytes[0]&0xF0 == 128, nil
}
return nil, ErrIllegalValue
}
func bits2bytes(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Bits); ok {
return v.Bytes, nil
}
return nil, ErrIllegalValue
}*/

func json2text(_ uint32, value any) (any, error) {
if v, ok := value.(map[string]any); ok {
d, err := json.Marshal(v)
if err != nil {
return nil, err
}
return string(d), nil
}
return nil, ErrIllegalValue
}

func uuid2text(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.UUID); ok {
u, err := uuid.FormatUUID(v.Bytes[:])
if err != nil {
return nil, err
}
return u, nil
} else if v, ok := value.([16]byte); ok {
u, err := uuid.FormatUUID(v[:])
if err != nil {
return nil, err
}
return u, nil
}
return nil, ErrIllegalValue
}

func uint322int64(_ uint32, value any) (any, error) {
if v, ok := value.(uint32); ok {
return int64(v), nil
}
return nil, ErrIllegalValue
}

func macaddr2text(_ uint32, value any) (any, error) {
if v, ok := value.(net.HardwareAddr); ok {
return v.String(), nil
}
return nil, ErrIllegalValue
}

func addr2text(_ uint32, value any) (any, error) {
if v, ok := value.(netip.Prefix); ok {
return v.String(), nil
}
return nil, ErrIllegalValue
}

func interval2int64(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Interval); ok {
return v.Microseconds, nil
}
return nil, ErrIllegalValue
func (l *lazySchemaBuilder) BaseSchemaType() schemamodel.SchemaType {
return l.pgType.schemaType
}

func numeric2variableScaleDecimal(_ uint32, value any) (any, error) {
if v, ok := value.(pgtype.Numeric); ok {
return schemamodel.Struct{
"value": hex.EncodeToString(v.Int.Bytes()),
"scale": v.Exp,
}, nil
func (l *lazySchemaBuilder) Schema(oid uint32, modifier int) schemamodel.Struct {
if l.schemaBuilder == nil {
l.schemaBuilder = l.pgType.resolveSchemaBuilder(oid, modifier)
}
return nil, ErrIllegalValue
return l.schemaBuilder.Schema(oid, modifier)
}
Loading

0 comments on commit 9dab01a

Please sign in to comment.