forked from tunein/go-avro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: schema_prepared.go
156 lines (138 loc) · 3.97 KB
/
schema_prepared.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package avro
import (
"fmt"
"reflect"
"sync"
)
// Prepare optimizes a schema for decoding/encoding.
//
// It makes a recursive copy of the given schema and returns an immutable
// wrapper of the schema with some optimizations applied. Equivalent to
// resolving the schema against itself.
func Prepare(schema Schema) Schema {
	return PrepareResolving(schema, schema)
}
// PrepareResolving handles reading where the writer and reader schemas
// differ but are compatible. Support is currently limited to adding or
// removing record fields.
func PrepareResolving(writer, reader Schema) Schema {
	j := &prepareJob{seen: map[Schema]Schema{}}
	return j.prepare(writer, reader)
}
// prepareJob carries the state for one Prepare/PrepareResolving traversal.
type prepareJob struct {
	// the seen struct prevents infinite recursion by caching conversions.
	seen map[Schema]Schema
}
// prepare recursively produces an optimized copy of the writer schema,
// resolving it against the reader schema where the two may differ.
// Conversions are memoized in job.seen so recursive schemas terminate.
func (job *prepareJob) prepare(writer, reader Schema) Schema {
	output := writer
	switch writer := writer.(type) {
	case *RecordSchema:
		output = job.prepareRecordSchema(writer, reader.(*RecordSchema))
	case *RecursiveSchema:
		// A recursive reference points back at a record we may already have
		// prepared; reuse the cached conversion to avoid infinite recursion.
		if seen := job.seen[writer.Actual]; seen != nil {
			return seen
		}
		return job.prepare(writer.Actual, writer.Actual)
	case *UnionSchema:
		output = job.prepareUnionSchema(writer)
	case *ArraySchema:
		output = job.prepareArraySchema(writer)
	case *MapSchema:
		// Bug fix: map schemas previously fell through to the default case,
		// so prepareMapSchema was never called and map value schemas were
		// left unprepared.
		output = job.prepareMapSchema(writer)
	default:
		return writer
	}
	job.seen[writer] = output
	return output
}
// prepareUnionSchema returns a copy of the union whose member schemas have
// each been prepared in turn.
func (job *prepareJob) prepareUnionSchema(input *UnionSchema) Schema {
	prepared := make([]Schema, len(input.Types))
	for i := range input.Types {
		prepared[i] = job.prepare(input.Types[i], input.Types[i])
	}
	return &UnionSchema{Types: prepared}
}
// prepareArraySchema returns a copy of the array schema with its item
// schema prepared; the original properties are carried over unchanged.
func (job *prepareJob) prepareArraySchema(input *ArraySchema) Schema {
	items := job.prepare(input.Items, input.Items)
	return &ArraySchema{
		Properties: input.Properties,
		Items:      items,
	}
}
// prepareMapSchema returns a copy of the map schema with its value schema
// prepared; the original properties are carried over unchanged.
func (job *prepareJob) prepareMapSchema(input *MapSchema) Schema {
	values := job.prepare(input.Values, input.Values)
	return &MapSchema{
		Properties: input.Properties,
		Values:     values,
	}
}
// prepareRecordSchema builds a preparedRecordSchema wrapping a copy of the
// writer record whose field types have all been prepared, paired with the
// reader schema it resolves against.
func (job *prepareJob) prepareRecordSchema(writer, reader *RecordSchema) *preparedRecordSchema {
	var fields []*SchemaField
	for _, f := range writer.Fields {
		fields = append(fields, &SchemaField{
			Name:    f.Name,
			Doc:     f.Doc,
			Default: f.Default,
			Type:    job.prepare(f.Type, f.Type),
		})
	}
	out := &preparedRecordSchema{
		RecordSchema: *writer,
		ReaderSchema: reader,
		// Each pooled value is a fresh per-goroutine plan cache.
		pool: sync.Pool{New: func() interface{} { return make(map[reflect.Type]*recordPlan) }},
	}
	out.Fields = fields
	return out
}
// preparedRecordSchema pairs a prepared writer record schema with the
// reader schema it resolves against, plus a pool of per-type decode-plan
// caches used by getPlan.
type preparedRecordSchema struct {
	RecordSchema // WriterSchema (embedded, already prepared)
	ReaderSchema *RecordSchema
	// pool hands out map[reflect.Type]*recordPlan caches so that concurrent
	// getPlan callers never share one map without a lock.
	pool sync.Pool
}
// getPlan returns the decode plan mapping this record schema onto the Go
// struct type t, building and caching it on first use.
//
// Plan caches are map[reflect.Type]*recordPlan values drawn from a
// sync.Pool, so concurrent callers never mutate a shared map; the cache is
// always returned to the pool before this method exits.
//
// An error is returned when the struct lacks a field that the reader
// schema requires for decoding.
func (rs *preparedRecordSchema) getPlan(t reflect.Type) (plan *recordPlan, err error) {
	cache := rs.pool.Get().(map[reflect.Type]*recordPlan)
	if plan = cache[t]; plan != nil {
		rs.pool.Put(cache)
		return
	}
	// Cache miss: use the reflectmap to get field info and build a plan.
	ri := reflectEnsureRi(t)
	readerFieldNames := make(map[string]struct{}, len(rs.ReaderSchema.Fields))
	for _, schemaField := range rs.ReaderSchema.Fields {
		readerFieldNames[schemaField.Name] = struct{}{}
	}
	decodePlan := make([]structFieldPlan, len(rs.Fields))
	for i, schemafield := range rs.Fields {
		_, readerHasField := readerFieldNames[schemafield.Name]
		index, ok := ri.names[schemafield.Name]
		if !ok && readerHasField {
			// Bug fix: previously the error was recorded but the loop kept
			// going, specificDecoder ran on the incomplete entry, and the
			// broken plan was cached and returned alongside the error.
			// Return immediately and do not cache, but still put the
			// pooled cache back.
			rs.pool.Put(cache)
			return nil, fmt.Errorf("Type %v does not have field %s required for decoding schema", t, schemafield.Name)
		}
		entry := &decodePlan[i]
		entry.schema = schemafield.Type
		entry.name = schemafield.Name
		entry.index = index
		entry.dec = specificDecoder(entry)
	}
	plan = &recordPlan{
		// Over time, we will create decode/encode plans for more things.
		decodePlan: decodePlan,
	}
	cache[t] = plan
	rs.pool.Put(cache)
	return
}
// sdr is a shared sDatumReader instance reused by the prepared-schema
// code paths. NOTE(review): appears stateless so a single package-level
// value is safe to share — confirm against sDatumReader's definition.
var sdr sDatumReader
// recordPlan holds the precomputed decoding steps for one
// (record schema, Go struct type) pairing, as built by getPlan.
type recordPlan struct {
	// decodePlan has one entry per writer-schema field, in schema order.
	decodePlan []structFieldPlan
}
// assertRecordSchema extracts the *RecordSchema behind s. For right now,
// until we implement more optimizations, we have a lot of cases where we
// want a *RecordSchema; this makes it a bit easier to deal with. s must be
// a *RecordSchema or a *preparedRecordSchema — anything else panics.
func assertRecordSchema(s Schema) *RecordSchema {
	switch v := s.(type) {
	case *RecordSchema:
		return v
	default:
		return &s.(*preparedRecordSchema).RecordSchema
	}
}