-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathregistration.go
110 lines (100 loc) · 3.55 KB
/
registration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package logstream
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)
// controller is the package-level streamController shared by the exported
// interface of the logstream package. It is created when the package's
// variables are initialized and is handed to pkg/util/log by this file's
// init function.
var controller = newStreamController()
// streamController is the core of the logstream package. It routes structured
// events logged via log.Structured to the processors registered through
// RegisterProcessor, keeping each tenant's processors separate from every
// other tenant's.
type streamController struct {
	// rmu bundles the lock with the state it protects. RegisterProcessor takes
	// the write lock; Process takes the read lock.
	rmu struct {
		syncutil.RWMutex
		// tenantProcessors holds one processorBuffer per tenant ID. An entry
		// is added the first time RegisterProcessor is called for a tenant.
		tenantProcessors map[roachpb.TenantID]*processorBuffer
	}
}
// Compile-time check that *streamController implements log.StructuredLogProcessor.
var _ log.StructuredLogProcessor = (*streamController)(nil)
// newStreamController returns a streamController whose per-tenant processor
// map is allocated and empty, so it can be written to without a nil-map panic.
func newStreamController() *streamController {
	var sc streamController
	sc.rmu.tenantProcessors = map[roachpb.TenantID]*processorBuffer{}
	return &sc
}
// init installs the package-level controller as the structured log processor
// of pkg/util/log, injecting it as a dependency of that package.
func init() {
	log.SetStructuredLogProcessor(controller)
}
// RegisterProcessor registers the given Processor to consume all log stream events of
// the given EventType, logged via log.Structured.
//
// The processing is done asynchronously from the log.Structured call, which simply buffers
// the event. Each tenant has their own processor(s) and async buffer, meaning each tenant is
// responsible for making their own calls to RegisterProcessor.
//
// The tenant is read from ctx via roachpb.ClientTenantFromContext; when ctx carries no
// client tenant, the processor is registered under roachpb.SystemTenantID. The first
// registration for a tenant creates that tenant's processorBuffer and starts it with the
// given stopper; later registrations for the same tenant reuse the existing buffer.
//
// RegisterProcessor panics with an assertion error if the controller is nil, if the
// tenant's buffer fails to start, or if the buffer's child is not a *LogTypeEventRouter.
func RegisterProcessor(
	ctx context.Context, stopper *stop.Stopper, eventType log.EventType, processor Processor,
) {
	if controller == nil {
		panic(errors.AssertionFailedf(
			"attempted to register logstream processor before controller was initialized"))
	}
	tID, ok := roachpb.ClientTenantFromContext(ctx)
	if !ok {
		tID = roachpb.SystemTenantID
	}

	// The write lock is held for the whole registration so that buffer creation
	// and router registration are atomic with respect to Process.
	controller.rmu.Lock()
	defer controller.rmu.Unlock()

	tenantProcessor, ok := controller.rmu.tenantProcessors[tID]
	if !ok {
		// TODO(abarganier): config knobs around flush trigger criteria.
		tenantProcessor = newProcessorBuffer(
			newLogTypeEventRouter(),
			10*time.Second, // presumably the flush interval; confirm against newProcessorBuffer
			100,            /* triggerLen */
			1000,           /* maxLen */
		)
		if err := tenantProcessor.Start(ctx, stopper); err != nil {
			// Keep the underlying error as the cause so the panic explains why
			// the buffer could not be started.
			panic(errors.NewAssertionErrorWithWrappedErrf(err,
				"failed to start processorBuffer for tenant ID %d, event type %s", tID, eventType))
		}
		// Publish the buffer only once it has started, so that a recovered panic
		// above cannot leave an unstarted buffer in the map.
		controller.rmu.tenantProcessors[tID] = tenantProcessor
	}

	child, ok := tenantProcessor.child.(*LogTypeEventRouter)
	if !ok {
		panic(errors.AssertionFailedf("unexpected child type for structuredProcessorBuffer"))
	}
	child.register(ctx, eventType, processor)
}
// Process implements the log.StructuredLogProcessor interface.
//
// The event is wrapped in a TypedEvent and handed to the processorBuffer of
// the tenant found in ctx, falling back to roachpb.SystemTenantID when ctx
// carries no client tenant. If no processor has been registered for that
// tenant the event is dropped. An error returned by the buffer is logged
// rather than returned to the caller.
func (sc *streamController) Process(ctx context.Context, eventType log.EventType, e any) {
	tenantID, found := roachpb.ClientTenantFromContext(ctx)
	if !found {
		tenantID = roachpb.SystemTenantID
	}

	sc.rmu.RLock()
	defer sc.rmu.RUnlock()

	buf, registered := sc.rmu.tenantProcessors[tenantID]
	if !registered {
		// Registration is optional; with nothing registered for this tenant
		// there is nowhere to send the event.
		return
	}

	typed := &TypedEvent{
		eventType: eventType,
		event:     e,
	}
	err := buf.Process(ctx, typed)
	if err != nil {
		log.Errorf(ctx, "error consuming exhaust: %v", err)
	}
}