Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Aggregator] Support setting consumer service filters dynamically via topic updates #4297

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
6 changes: 3 additions & 3 deletions src/aggregator/aggregator/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ type storagePolicyFilterConfiguration struct {
}

func (c storagePolicyFilterConfiguration) NewConsumerServiceFilter() (services.ServiceID, producer.FilterFunc) {
return c.ServiceID.NewServiceID(), writer.NewStoragePolicyFilter(c.StoragePolicies)
return c.ServiceID.NewServiceID(), writer.NewStoragePolicyFilter(c.StoragePolicies, producer.StaticConfig)
}

type percentageFilterConfiguration struct {
Expand All @@ -225,7 +225,7 @@ type percentageFilterConfiguration struct {
}

func (c percentageFilterConfiguration) NewConsumerServiceFilter() (services.ServiceID, producer.FilterFunc) {
return c.ServiceID.NewServiceID(), filter.NewPercentageFilter(c.Percentage)
return c.ServiceID.NewServiceID(), filter.NewPercentageFilter(c.Percentage, producer.StaticConfig)
}

// ConsumerServiceFilterConfiguration - exported to be able to write unit tests
Expand All @@ -236,7 +236,7 @@ type ConsumerServiceFilterConfiguration struct {

// NewConsumerServiceFilter - exported to be able to write unit tests
func (c ConsumerServiceFilterConfiguration) NewConsumerServiceFilter() (services.ServiceID, producer.FilterFunc) {
return c.ServiceID.NewServiceID(), filter.NewShardSetFilter(c.ShardSet)
return c.ServiceID.NewServiceID(), filter.NewShardSetFilter(c.ShardSet, producer.StaticConfig)
}

// StaticBackendConfiguration configures a static backend as a flush handler.
Expand Down
4 changes: 2 additions & 2 deletions src/aggregator/aggregator/handler/filter/percentageFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type percentageFilter struct {
}

// NewPercentageFilter creates a filter based on percentage.
func NewPercentageFilter(percentage float64) producer.FilterFunc {
func NewPercentageFilter(percentage float64, configSource producer.FilterFuncConfigSourceType) producer.FilterFunc {
rate := uint32(_maxRate)
if percentage <= 0 {
rate = 0
Expand All @@ -41,7 +41,7 @@ func NewPercentageFilter(percentage float64) producer.FilterFunc {
}

f := percentageFilter{rate: rate}
return f.Filter
return producer.NewFilterFunc(f.Filter, producer.PercentageFilter, configSource)
}

func (f percentageFilter) Filter(_ producer.Message) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func TestPercentageFilter(t *testing.T) {
defer ctrl.Finish()
mm := producer.NewMockMessage(ctrl)

f0 := NewPercentageFilter(0)
require.False(t, f0(mm))
f0 := NewPercentageFilter(0, producer.StaticConfig)
require.False(t, f0.Function((mm)))

f1 := NewPercentageFilter(1)
require.True(t, f1(mm))
f1 := NewPercentageFilter(1, producer.DynamicConfig)
require.True(t, f1.Function(mm))
}

var filterResult bool
Expand All @@ -55,10 +55,10 @@ BenchmarkPercentageFilter-12 280085259 4.232 ns/op
PASS
*/
func BenchmarkPercentageFilter(b *testing.B) {
f := NewPercentageFilter(0.5)
f := NewPercentageFilter(0.5, producer.StaticConfig)
var r bool
for i := 0; i < b.N; i++ {
r = f(nil)
r = f.Function(nil)
}
filterResult = r
}
4 changes: 2 additions & 2 deletions src/aggregator/aggregator/handler/filter/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ type shardSetFilter struct {
}

// NewShardSetFilter creates a filter based on ShardSet.
func NewShardSetFilter(shardSet sharding.ShardSet) producer.FilterFunc {
func NewShardSetFilter(shardSet sharding.ShardSet, sourceType producer.FilterFuncConfigSourceType) producer.FilterFunc {
f := shardSetFilter{shardSet: shardSet}
return f.Filter
return producer.NewFilterFunc(f.Filter, producer.ShardSetFilter, sourceType)
}

func (f shardSetFilter) Filter(m producer.Message) bool {
Expand Down
10 changes: 5 additions & 5 deletions src/aggregator/aggregator/handler/filter/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func TestShardSetFilter(t *testing.T) {
defer ctrl.Finish()

ss := sharding.MustParseShardSet("0..511")
f := NewShardSetFilter(ss)
f := NewShardSetFilter(ss, producer.StaticConfig)

mm := producer.NewMockMessage(ctrl)
mm.EXPECT().Shard().Return(uint32(0))
require.True(t, f(mm))
require.True(t, f.Function(mm))
mm.EXPECT().Shard().Return(uint32(10))
require.True(t, f(mm))
require.True(t, f.Function(mm))
mm.EXPECT().Shard().Return(uint32(511))
require.True(t, f(mm))
require.True(t, f.Function(mm))
mm.EXPECT().Shard().Return(uint32(512))
require.False(t, f(mm))
require.False(t, f.Function(mm))
}
9 changes: 7 additions & 2 deletions src/aggregator/aggregator/handler/writer/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,13 @@ type storagePolicyFilter struct {
}

// NewStoragePolicyFilter creates a new storage policy based filter.
func NewStoragePolicyFilter(acceptedStoragePolicies []policy.StoragePolicy) producer.FilterFunc {
return storagePolicyFilter{acceptedStoragePolicies}.Filter
func NewStoragePolicyFilter(
acceptedStoragePolicies []policy.StoragePolicy,
configSource producer.FilterFuncConfigSourceType) producer.FilterFunc {
return producer.NewFilterFunc(
storagePolicyFilter{acceptedStoragePolicies}.Filter,
producer.StoragePolicyFilter,
configSource)
}

func (f storagePolicyFilter) Filter(m producer.Message) bool {
Expand Down
8 changes: 4 additions & 4 deletions src/aggregator/aggregator/handler/writer/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ func TestStoragePolicyFilter(t *testing.T) {

m2 := producer.NewMockMessage(nil)

f := NewStoragePolicyFilter([]policy.StoragePolicy{sp2})
f := NewStoragePolicyFilter([]policy.StoragePolicy{sp2}, producer.StaticConfig)

require.True(t, f(m2))
require.False(t, f(newMessage(0, sp1, protobuf.Buffer{})))
require.True(t, f(newMessage(0, sp2, protobuf.Buffer{})))
require.True(t, f.Function(m2))
require.False(t, f.Function(newMessage(0, sp1, protobuf.Buffer{})))
require.True(t, f.Function(newMessage(0, sp2, protobuf.Buffer{})))
}

func TestProtobufWriterWriteClosed(t *testing.T) {
Expand Down
20 changes: 0 additions & 20 deletions src/msg/generated/proto/msgpb/msg.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading