Skip to content

Commit

Permalink
Use a 30-second cache for realtime stats
Browse files Browse the repository at this point in the history
Attempt to solve #140
  • Loading branch information
alok87 committed Feb 19, 2021
1 parent 2524b5c commit 01be655
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 19 deletions.
1 change: 1 addition & 0 deletions cmd/redshiftsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func main() {
KafkaWatchers: new(sync.Map),
KafkaTopicRegexes: new(sync.Map),
KafkaTopicsCache: new(sync.Map),
KafkaRealtimeCache: new(sync.Map),
GitCache: new(sync.Map),
DefaultBatcherImage: batcherImage,
DefaultLoaderImage: loaderImage,
Expand Down
13 changes: 7 additions & 6 deletions controllers/redshiftsink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type RedshiftSinkReconciler struct {
Scheme *runtime.Scheme
Recorder record.EventRecorder

KafkaTopicRegexes *sync.Map
KafkaWatchers *sync.Map
KafkaTopicsCache *sync.Map
KafkaTopicRegexes *sync.Map
KafkaWatchers *sync.Map
KafkaTopicsCache *sync.Map
KafkaRealtimeCache *sync.Map

DefaultBatcherImage string
DefaultLoaderImage string
Expand Down Expand Up @@ -415,9 +416,9 @@ func (r *RedshiftSinkReconciler) reconcile(
buildLoader(secret, r.DefaultLoaderImage, ReloadTableSuffix, r.DefaultKafkaVersion, tlsConfig).
build()

currentRealtime, err := reload.realtimeTopics(kafkaWatcher)
currentRealtime, err := reload.realtimeTopics(kafkaWatcher, r.KafkaRealtimeCache)
if err != nil {
klog.Errorf("Error fetching realtime stats for: %v, err: %v (existing realtime would continue releasing)", rsk.Name, err)
klog.Errorf("Error fetching realtime stats for rsk/%s, existing realtime would continue releasing, err: %v", rsk.Name, err)
// #140 causes too many failures here so instead of
// breaking and hanging behaviour is stopped to
// make existing realtimes get released quickly
Expand Down Expand Up @@ -470,7 +471,7 @@ func (r *RedshiftSinkReconciler) reconcile(
}

if len(status.realtime) == 0 {
klog.V(2).Infof("rsk/%s Nothing done in reconcile", rsk.Name)
klog.V(2).Infof("rsk/%s nothing done in reconcile", rsk.Name)
return result, nil, nil
}

Expand Down
48 changes: 44 additions & 4 deletions controllers/sink_group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
"sync"
"time"

klog "github.com/practo/klog/v2"
Expand All @@ -27,7 +28,7 @@ const (

type sinkGroupInterface interface {
Reconcile(ctx context.Context) (ctrl.Result, ReconcilerEvent, error)
RealtimeTopics(watcher kafka.Watcher) ([]string, error)
RealtimeTopics(watcher kafka.Watcher, cache *sync.Map) ([]string, error)
}

type Deployment interface {
Expand Down Expand Up @@ -564,20 +565,55 @@ func (s *sinkGroup) lagBelowThreshold(
return false
}

// cacheValid reports whether a cache entry stamped at lastCachedTime is
// still fresh. lastCachedTime is a Unix timestamp in nanoseconds; the entry
// is fresh while lastCachedTime+validity lies strictly after the current
// time. A nil timestamp means nothing has been cached yet and is never valid.
func cacheValid(validity time.Duration, lastCachedTime *int64) bool {
	if lastCachedTime == nil {
		return false
	}

	expiresAt := *lastCachedTime + validity.Nanoseconds()
	return expiresAt > time.Now().UnixNano()
}

// kafkaRealtimeCache is the per-topic value stored in the realtime cache
// (a sync.Map keyed by topic name). It records the outcome of the most
// recent realtime check so that realtimeTopics can skip calls to Kafka
// while the entry is still valid.
type kafkaRealtimeCache struct {
// lastCacheRefresh points at the time.Now().UnixNano() value captured
// when the entry was stored; cacheValid treats a nil pointer as
// "never cached".
lastCacheRefresh *int64
// realtime is the cached verdict: true when the topic's lag was below
// the threshold at the last refresh, false otherwise.
realtime bool
}

// realtimeTopics gives back the list of topics whose consumer lags are
// less than or equal to the specified thresholds to be considered realtime
func (s *sinkGroup) realtimeTopics(
watcher kafka.Watcher,
cache *sync.Map,
) (
[]string, error,
) {
realtimeTopics := []string{}
now := time.Now().UnixNano()

for _, topic := range s.topics {
// use cache to prevent calls to kafka
var realtimeCache kafkaRealtimeCache
cacheLoaded, ok := cache.Load(topic)
if ok {
realtimeCache = cacheLoaded.(kafkaRealtimeCache)
if cacheValid(time.Second*time.Duration(30), realtimeCache.lastCacheRefresh) {
klog.V(4).Infof("topic: %s (realtime cache hit)", topic)
if realtimeCache.realtime {
realtimeTopics = append(realtimeTopics, topic)
}
continue
}
}

klog.V(4).Infof("topic: %s (fetching realtime stats)", topic)
group, ok := s.topicGroups[topic]
if !ok {
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: false})
return realtimeTopics, fmt.Errorf("groupID not found for %s", topic)
}

batcherCGID := consumerGroupID(s.rsk.Name, s.rsk.Namespace, group.ID, "-batcher")
batcherLag, err := watcher.ConsumerGroupLag(
batcherCGID,
Expand All @@ -588,6 +624,7 @@ func (s *sinkGroup) realtimeTopics(
return realtimeTopics, err
}
if batcherLag == -1 {
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: false})
klog.V(4).Infof("%v: lag=-1, condition unmet", batcherCGID)
continue
}
Expand All @@ -602,16 +639,19 @@ func (s *sinkGroup) realtimeTopics(
return realtimeTopics, err
}
if loaderLag == -1 {
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: false})
klog.V(4).Infof("%v: lag=-1, condition unmet", loaderCGID)
continue
}

klog.V(3).Infof("%v: lag=%v", batcherCGID, batcherLag)
klog.V(3).Infof("%v: lag=%v", loaderCGID, loaderLag)
klog.V(4).Infof("lag=%v for %v", batcherLag, batcherCGID)
klog.V(4).Infof("lag=%v for %v", loaderLag, loaderCGID)

if s.lagBelowThreshold(topic, batcherLag, loaderLag) {
realtimeTopics = append(realtimeTopics, topic)
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: true})
} else {
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: false})
klog.V(2).Infof("%v: waiting to reach realtime", topic)
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/git/git_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package git

import (
klog "github.com/practo/klog/v2"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -46,10 +47,11 @@ func NewGitCache(repoURL string, accessToken string) (GitCacheInterface, error)
// it fetches the latest version every cacheValidity seconds and keeps a cache
func (g *GitCache) GetFileVersion(filePath string) (string, error) {
if cacheValid(g.cacheValidity, g.lastCacheRefresh) {
klog.V(5).Infof("Git cache valid for: %s", filePath)
version, ok := g.fileVersion[filePath]
if ok {
klog.V(5).Infof("Git cache hit for: %s", filePath)
return version, nil
} else {
}
}
now := time.Now().UnixNano()
Expand Down
18 changes: 11 additions & 7 deletions pkg/kafka/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,22 @@ func (t *kafkaWatch) ConsumerGroupLag(

err := t.client.RefreshBrokers(t.brokers)
if err != nil {
return lag, err
return lag, fmt.Errorf("Error refreshing kafka brokers, err: %v", err)
}
err = t.client.RefreshMetadata(topic)
if err != nil {
return lag, err
return lag, fmt.Errorf("Error refreshing kafka metadata, err: %v", err)
}

for _, broker := range t.client.Brokers() {
err = broker.Open(t.client.Config())
if err != nil && err != sarama.ErrAlreadyConnected {
return lag, err
return lag, fmt.Errorf("Error opening broker connection, err: %v", err)
}

lag, err = t.consumerGroupLag(id, topic, 0, broker)
if err != nil {
return lag, err
return lag, fmt.Errorf("Error calculating consumerGroupLag, err: %v", err)
}

if lag != -1 {
Expand All @@ -143,7 +143,7 @@ func (t *kafkaWatch) consumerGroupLag(

lastOffset, err := t.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
return defaultLag, err
return defaultLag, fmt.Errorf("Error getting offset for topic partition: %s, err: %v", topic, err)
}

offsetFetchRequest := sarama.OffsetFetchRequest{
Expand All @@ -154,7 +154,9 @@ func (t *kafkaWatch) consumerGroupLag(

offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
return defaultLag, err
return defaultLag, fmt.Errorf(
"Error fetching offset for offsetFetchRequest: %s %v, err: %v",
topic, offsetFetchRequest, err)
}
if offsetFetchResponse == nil {
return defaultLag, fmt.Errorf(
Expand All @@ -178,7 +180,9 @@ func (t *kafkaWatch) consumerGroupLag(
return defaultLag, nil
}
if offsetFetchResponseBlock.Err != sarama.ErrNoError {
return defaultLag, err
return defaultLag, fmt.Errorf(
"Error since %s offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %+v",
offsetFetchResponseBlock.Err)
}
return lastOffset - offsetFetchResponseBlock.Offset, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/redshiftbatcher/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func newBatchProcessor(
)
}

klog.Infof("AutoCommit: %v", saramaConfig.AutoCommit)
klog.Infof("topic: %v, autoCommit: %v", topic, saramaConfig.AutoCommit)

return &batchProcessor{
topic: topic,
Expand Down

0 comments on commit 01be655

Please sign in to comment.