Skip to content

Commit

Permalink
feat(bigtable): Add support for new functions (#10582)
Browse files Browse the repository at this point in the history
* feat(bigtable): Add support for additional aggregate functions

* remove merge leftover

* remove merge leftover

* add new types to switch statement

* fix hll name

* fix hll name

* fix hll name

* renamed hll
  • Loading branch information
ron-gal authored Jul 26, 2024
1 parent 6bd2596 commit a49ab59
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
27 changes: 27 additions & 0 deletions bigtable/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,27 @@ func (sum SumAggregator) fillProto(proto *btapb.Type_Aggregate) {
proto.Aggregator = &btapb.Type_Aggregate_Sum_{Sum: &btapb.Type_Aggregate_Sum{}}
}

// MinAggregator is an aggregation function that finds the minimum between the input and the accumulator.
type MinAggregator struct{}

func (min MinAggregator) fillProto(proto *btapb.Type_Aggregate) {
proto.Aggregator = &btapb.Type_Aggregate_Min_{Min: &btapb.Type_Aggregate_Min{}}
}

// MaxAggregator is an aggregation function that finds the maximum between the input and the accumulator.
type MaxAggregator struct{}

func (max MaxAggregator) fillProto(proto *btapb.Type_Aggregate) {
proto.Aggregator = &btapb.Type_Aggregate_Max_{Max: &btapb.Type_Aggregate_Max{}}
}

// HllppUniqueCountAggregator is an aggregation function that calculates the unique count of inputs and the accumulator.
type HllppUniqueCountAggregator struct{}

func (hll HllppUniqueCountAggregator) fillProto(proto *btapb.Type_Aggregate) {
proto.Aggregator = &btapb.Type_Aggregate_HllppUniqueCount{HllppUniqueCount: &btapb.Type_Aggregate_HyperLogLogPlusPlusUniqueCount{}}
}

type unknownAggregator struct {
wrapped *btapb.Type_Aggregate
}
Expand Down Expand Up @@ -238,6 +259,12 @@ func aggregateProtoToType(agg *btapb.Type_Aggregate) Type {
switch agg.Aggregator.(type) {
case *btapb.Type_Aggregate_Sum_:
aggregator = SumAggregator{}
case *btapb.Type_Aggregate_Min_:
aggregator = MinAggregator{}
case *btapb.Type_Aggregate_Max_:
aggregator = MaxAggregator{}
case *btapb.Type_Aggregate_HllppUniqueCount:
aggregator = HllppUniqueCountAggregator{}
default:
aggregator = unknownAggregator{wrapped: agg}
}
Expand Down
110 changes: 109 additions & 1 deletion bigtable/type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestStringProto(t *testing.T) {
}
}

func TestAggregateProto(t *testing.T) {
func TestSumAggregateProto(t *testing.T) {
want := &btapb.Type{
Kind: &btapb.Type_AggregateType{
AggregateType: &btapb.Type_Aggregate{
Expand Down Expand Up @@ -114,6 +114,114 @@ func TestProtoBijection(t *testing.T) {
}
}

func TestMinAggregateProto(t *testing.T) {
want := &btapb.Type{
Kind: &btapb.Type_AggregateType{
AggregateType: &btapb.Type_Aggregate{
InputType: &btapb.Type{
Kind: &btapb.Type_Int64Type{
Int64Type: &btapb.Type_Int64{
Encoding: &btapb.Type_Int64_Encoding{
Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{
BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{
BytesType: &btapb.Type_Bytes{
Encoding: &btapb.Type_Bytes_Encoding{
Encoding: &btapb.Type_Bytes_Encoding_Raw_{
Raw: &btapb.Type_Bytes_Encoding_Raw{},
},
},
},
},
},
},
},
},
},
Aggregator: &btapb.Type_Aggregate_Min_{
Min: &btapb.Type_Aggregate_Min{},
},
},
},
}

got := AggregateType{Input: Int64Type{}, Aggregator: MinAggregator{}}.proto()
if !proto.Equal(got, want) {
t.Errorf("got type %v, want: %v", got, want)
}
}

func TestMaxAggregateProto(t *testing.T) {
want := &btapb.Type{
Kind: &btapb.Type_AggregateType{
AggregateType: &btapb.Type_Aggregate{
InputType: &btapb.Type{
Kind: &btapb.Type_Int64Type{
Int64Type: &btapb.Type_Int64{
Encoding: &btapb.Type_Int64_Encoding{
Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{
BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{
BytesType: &btapb.Type_Bytes{
Encoding: &btapb.Type_Bytes_Encoding{
Encoding: &btapb.Type_Bytes_Encoding_Raw_{
Raw: &btapb.Type_Bytes_Encoding_Raw{},
},
},
},
},
},
},
},
},
},
Aggregator: &btapb.Type_Aggregate_Max_{
Max: &btapb.Type_Aggregate_Max{},
},
},
},
}

got := AggregateType{Input: Int64Type{}, Aggregator: MaxAggregator{}}.proto()
if !proto.Equal(got, want) {
t.Errorf("got type %v, want: %v", got, want)
}
}

func TestHllAggregateProto(t *testing.T) {
want := &btapb.Type{
Kind: &btapb.Type_AggregateType{
AggregateType: &btapb.Type_Aggregate{
InputType: &btapb.Type{
Kind: &btapb.Type_Int64Type{
Int64Type: &btapb.Type_Int64{
Encoding: &btapb.Type_Int64_Encoding{
Encoding: &btapb.Type_Int64_Encoding_BigEndianBytes_{
BigEndianBytes: &btapb.Type_Int64_Encoding_BigEndianBytes{
BytesType: &btapb.Type_Bytes{
Encoding: &btapb.Type_Bytes_Encoding{
Encoding: &btapb.Type_Bytes_Encoding_Raw_{
Raw: &btapb.Type_Bytes_Encoding_Raw{},
},
},
},
},
},
},
},
},
},
Aggregator: &btapb.Type_Aggregate_HllppUniqueCount{
HllppUniqueCount: &btapb.Type_Aggregate_HyperLogLogPlusPlusUniqueCount{},
},
},
},
}

got := AggregateType{Input: Int64Type{}, Aggregator: HllppUniqueCountAggregator{}}.proto()
if !proto.Equal(got, want) {
t.Errorf("got type %v, want: %v", got, want)
}
}

func TestNilChecks(t *testing.T) {
// protoToType
if val, ok := protoToType(nil).(unknown[btapb.Type]); !ok {
Expand Down

0 comments on commit a49ab59

Please sign in to comment.