-
Notifications
You must be signed in to change notification settings - Fork 805
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into refactor/removing-cross-cluster
- Loading branch information
Showing
37 changed files
with
3,068 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
# MAPQ: Multi-tenant, Auto-partitioned, Persistent Queue | ||
|
||
NOTE: This component is WIP. | ||
|
||
## Overview | ||
|
||
MAPQ is a new queue framework (introduced in June 2024), aiming to unify Cadence's internal task/request queues. The existing implementations for these applications are cumbersome and maintenance-heavy, with significant overlap and limited extensibility. | ||
MAPQ will address the challenges of scalability, throughput, consistency, and ordering guarantees required by these diverse needs. | ||
|
||
|
||
Challenges and Motivation | ||
- History Task Queues: These queues are poorly understood and difficult to maintain, owing to the departure of their original developer and the non-maintainable state of the code. The design struggles with burst loads from timer/child workflow cases, requiring introduction of more granular task types and automated partitioning that the current system cannot support without extensive refactoring. | ||
- Matching Task Lists: These are basic FIFO queues with some advanced features like sticky task lists, zonal isolation groups and partitioning. The most pressing issue is auto partitioning to reduce operational overhead. | ||
- Async Request Queues: Initially integrated with Kafka topics as the request queue. Initial testing faced challenges like complex provisioning, inability to dynamically create topics/register consumers, poor visibility into the requests in the queue and difficult to tweak alerts. Async APIs are already designed with pluggable queue implementation already so swapping Kafka with something else will not be tricky. | ||
|
||
|
||
### Goals | ||
|
||
MAPQ will provide a solution tailored to meet the following goals: | ||
|
||
- Multi-Tenancy: Guarantees fair access to resources for each tenant based on predefined quotas. | ||
- Auto-Partitioning: Dynamically adjusts partitions based on specified fine-grained policies, supporting both random and deterministic message distribution across physically or virtually partitioned queues. | ||
- Burst-Protection: Detects incoming message spikes and mitigates by utilizing dynamic auto-partitioning. | ||
- Skew-Protection: Detects incoming message skews for given partition keys and mitigates by utilizing dynamic auto-partitioning. | ||
- Advanced Partitioning Policies: Executes on a tree-like partitioning policy to support various levels of partition key hierarchies and strategies. | ||
- Persistent: Ensures message durability via pluggable persistent layer. | ||
- Delivery Guarantees: Guarantees at least once delivery. | ||
|
||
|
||
### Problems with Existing Queues in Cadence | ||
|
||
History Queues: | ||
|
||
- Lack of granular partitioning and inextensibility of history queues make it difficult to address following pathological scenarios: | ||
- Task prioritization: Background tasks like workflow deletion timer tasks share the same queue and consume from the same “processing budget” as other high priority tasks such as user created timers. This is because all timer tasks for a given shard are managed by a single queue. | ||
- Multi tenancy: Tasks of the same type (e.g. all timers) are managed by a single queue and a noisy domain can drastically regress the experience of other domains. It is not possible to write tasks of a specific domain(s) to a separate queue and adjust read/write qps. Current queue granularity ends at task type (timer or transfer). | ||
- Burst cases: Bursts of timers or child workflows are known issues that Cadence has no answers to. These bursts usually cause timeouts and may also impact processing of other domains’ tasks. | ||
|
||
## High Level Design | ||
|
||
MAPQ uses a tree data structure where nodes route incoming messages to child nodes. Nodes can be splitted/merged based on given policies. All leaf nodes are at the same level. Leaf nodes are the actual “queues” where messages are written to/read from via a provided persistent layer plugin. | ||
|
||
The routing key per level, partitioning/departitioning strategy, RPS limits and other options are provided to MAPQ during initialization as a tree-like policy. It contains per level defaults and per-node (identified via path from root) overrides. | ||
|
||
Once initialized the tree will have a minimal number of nodes provided in the policy but it respects policies for not-yet-existing nodes. Since MAPQ supports auto-partitioning there will be new nodes added/removed and it accepts providing policies for such nodes. For example, you might want to partition by domain only for bursty domains and allocate them specific RPS. | ||
|
||
|
||
#### Tree structure with policies | ||
|
||
![MAPQ partitioned queue tree](../../docs/images/mapq_partitioned_queue_tree_example.png) | ||
|
||
|
||
#### Initialization and Object Hierarcy | ||
|
||
![MAPQ initialization](../../docs/images/mapq_initialization.png) | ||
|
||
|
||
#### Enqueue Flow | ||
|
||
![MAPQ enqueue flow](../../docs/images/mapq_enqueue_flow.png) | ||
|
||
|
||
#### Dispatch Flow | ||
|
||
![MAPQ enqueue flow](../../docs/images/mapq_dispatch_flow.png) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// The MIT License (MIT) | ||
|
||
// Copyright (c) 2017-2020 Uber Technologies Inc. | ||
|
||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in all | ||
// copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
// SOFTWARE. | ||
|
||
package mapq | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/uber/cadence/common/log" | ||
"github.com/uber/cadence/common/mapq/tree" | ||
"github.com/uber/cadence/common/mapq/types" | ||
"github.com/uber/cadence/common/metrics" | ||
) | ||
|
||
type clientImpl struct { | ||
logger log.Logger | ||
scope metrics.Scope | ||
persister types.Persister | ||
consumerFactory types.ConsumerFactory | ||
tree *tree.QueueTree | ||
partitions []string | ||
policies []types.NodePolicy | ||
} | ||
|
||
func (c *clientImpl) Start(ctx context.Context) error { | ||
c.logger.Info("Starting MAPQ client") | ||
err := c.tree.Start(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
c.logger.Info("Started MAPQ client") | ||
return nil | ||
} | ||
|
||
func (c *clientImpl) Stop(ctx context.Context) error { | ||
c.logger.Info("Stopping MAPQ client") | ||
|
||
// Stop the tree which will stop the dispatchers | ||
if err := c.tree.Stop(ctx); err != nil { | ||
return fmt.Errorf("failed to stop tree: %w", err) | ||
} | ||
|
||
// stop the consumer factory which will stop the consumers | ||
err := c.consumerFactory.Stop(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to stop consumer factory: %w", err) | ||
} | ||
|
||
c.logger.Info("Stopped MAPQ client") | ||
return nil | ||
} | ||
|
||
func (c *clientImpl) Enqueue(ctx context.Context, items []types.Item) ([]types.ItemToPersist, error) { | ||
return c.tree.Enqueue(ctx, items) | ||
} | ||
|
||
func (c *clientImpl) Ack(context.Context, types.Item) error { | ||
return errors.New("not implemented") | ||
} | ||
|
||
func (c *clientImpl) Nack(context.Context, types.Item) error { | ||
return errors.New("not implemented") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
// The MIT License (MIT) | ||
|
||
// Copyright (c) 2017-2020 Uber Technologies Inc. | ||
|
||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in all | ||
// copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
// SOFTWARE. | ||
|
||
package mapq | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/golang/mock/gomock" | ||
"go.uber.org/goleak" | ||
|
||
"github.com/uber/cadence/common/log/testlogger" | ||
"github.com/uber/cadence/common/mapq/types" | ||
"github.com/uber/cadence/common/metrics" | ||
) | ||
|
||
func TestNew(t *testing.T) { | ||
ctrl := gomock.NewController(t) | ||
|
||
tests := []struct { | ||
name string | ||
opts []Options | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "success", | ||
opts: []Options{ | ||
WithPersister(types.NewMockPersister(ctrl)), | ||
WithConsumerFactory(types.NewMockConsumerFactory(ctrl)), | ||
}, | ||
}, | ||
{ | ||
name: "no persister", | ||
wantErr: true, | ||
opts: []Options{ | ||
WithConsumerFactory(types.NewMockConsumerFactory(ctrl)), | ||
}, | ||
}, | ||
{ | ||
name: "no consumer factoru", | ||
wantErr: true, | ||
opts: []Options{ | ||
WithPersister(types.NewMockPersister(ctrl)), | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range tests { | ||
t.Run(tc.name, func(t *testing.T) { | ||
logger := testlogger.New(t) | ||
scope := metrics.NoopScope(0) | ||
cl, err := New(logger, scope, tc.opts...) | ||
if (err != nil) != tc.wantErr { | ||
t.Errorf("New() error: %v, wantErr: %v", err, tc.wantErr) | ||
} | ||
|
||
if err != nil { | ||
return | ||
} | ||
|
||
_, ok := cl.(*clientImpl) | ||
if !ok { | ||
t.Errorf("New() = %T, want *clientImpl", cl) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestStartStop(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
ctrl := gomock.NewController(t) | ||
consumerFactory := types.NewMockConsumerFactory(ctrl) | ||
consumer := types.NewMockConsumer(ctrl) | ||
consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1) | ||
consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1) | ||
opts := []Options{ | ||
WithPersister(types.NewMockPersister(ctrl)), | ||
WithConsumerFactory(consumerFactory), | ||
} | ||
logger := testlogger.New(t) | ||
scope := metrics.NoopScope(0) | ||
cl, err := New(logger, scope, opts...) | ||
if err != nil { | ||
t.Fatalf("New() error: %v", err) | ||
} | ||
|
||
cl.Start(context.Background()) | ||
defer cl.Stop(context.Background()) | ||
} | ||
|
||
func TestAck(t *testing.T) { | ||
ctrl := gomock.NewController(t) | ||
consumerFactory := types.NewMockConsumerFactory(ctrl) | ||
consumer := types.NewMockConsumer(ctrl) | ||
consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1) | ||
consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1) | ||
opts := []Options{ | ||
WithPersister(types.NewMockPersister(ctrl)), | ||
WithConsumerFactory(consumerFactory), | ||
} | ||
logger := testlogger.New(t) | ||
scope := metrics.NoopScope(0) | ||
cl, err := New(logger, scope, opts...) | ||
if err != nil { | ||
t.Fatalf("New() error: %v", err) | ||
} | ||
|
||
cl.Start(context.Background()) | ||
defer cl.Stop(context.Background()) | ||
|
||
err = cl.Ack(context.Background(), nil) | ||
if err == nil || err.Error() != "not implemented" { | ||
t.Errorf("Ack() error: %q, want %q", err, "not implemented") | ||
} | ||
} | ||
|
||
func TestNack(t *testing.T) { | ||
ctrl := gomock.NewController(t) | ||
consumerFactory := types.NewMockConsumerFactory(ctrl) | ||
consumer := types.NewMockConsumer(ctrl) | ||
consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1) | ||
consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1) | ||
opts := []Options{ | ||
WithPersister(types.NewMockPersister(ctrl)), | ||
WithConsumerFactory(consumerFactory), | ||
} | ||
logger := testlogger.New(t) | ||
scope := metrics.NoopScope(0) | ||
cl, err := New(logger, scope, opts...) | ||
if err != nil { | ||
t.Fatalf("New() error: %v", err) | ||
} | ||
|
||
cl.Start(context.Background()) | ||
defer cl.Stop(context.Background()) | ||
|
||
err = cl.Nack(context.Background(), nil) | ||
if err == nil || err.Error() != "not implemented" { | ||
t.Errorf("Ack() error: %q, want %q", err, "not implemented") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
// The MIT License (MIT) | ||
|
||
// Copyright (c) 2017-2020 Uber Technologies Inc. | ||
|
||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in all | ||
// copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
// SOFTWARE. | ||
|
||
package dispatcher | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/uber/cadence/common" | ||
"github.com/uber/cadence/common/mapq/types" | ||
) | ||
|
||
type Dispatcher struct { | ||
consumer types.Consumer | ||
ctx context.Context | ||
cancelCtx context.CancelFunc | ||
wg sync.WaitGroup | ||
} | ||
|
||
func New(c types.Consumer) *Dispatcher { | ||
ctx, cancelCtx := context.WithCancel(context.Background()) | ||
return &Dispatcher{ | ||
consumer: c, | ||
ctx: ctx, | ||
cancelCtx: cancelCtx, | ||
} | ||
} | ||
|
||
func (d *Dispatcher) Start(ctx context.Context) error { | ||
d.wg.Add(1) | ||
go d.run() | ||
return nil | ||
} | ||
|
||
func (d *Dispatcher) Stop(ctx context.Context) error { | ||
d.cancelCtx() | ||
timeout := 10 * time.Second | ||
if dl, ok := ctx.Deadline(); ok { | ||
timeout = time.Until(dl) | ||
} | ||
if !common.AwaitWaitGroup(&d.wg, timeout) { | ||
return fmt.Errorf("failed to stop dispatcher in %v", timeout) | ||
} | ||
return nil | ||
} | ||
|
||
func (d *Dispatcher) run() { | ||
defer d.wg.Done() | ||
// TODO: implement | ||
} |
Oops, something went wrong.