Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added new cache package for query service #6733

Merged
merged 20 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,4 @@ check-no-ee-references:
fi

test:
go test ./pkg/query-service/...
go test ./pkg/...
25 changes: 24 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,27 @@ web:
# The prefix to serve web on
prefix: /
# The directory containing the static build files.
directory: /etc/signoz/web
directory: /etc/signoz/web
# Skip serving frontend from query service
skip_web_frontend: "false"

##################### Cache #####################
cache:
# specifies the caching provider to use.
provider: memory
# memory: Uses in-memory caching.
memory:
# Time-to-live for cache entries in memory. Specify the duration in ns
ttl: 60000000000
# The interval at which the cache will be cleaned up
cleanupInterval:
# redis: Uses Redis as the caching backend.
redis:
# The hostname or IP address of the Redis server.
host: localhost
# The port on which the Redis server is running. Default is usually 6379.
port: 6379
# The password for authenticating with the Redis server, if required.
password:
# The Redis database number to use
db: 0
9 changes: 5 additions & 4 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/migrate"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/web"

licensepkg "go.signoz.io/signoz/ee/query-service/license"
Expand Down Expand Up @@ -62,6 +63,7 @@ import (
const AppDbEngine = "sqlite"

type ServerOptions struct {
SigNoz *signoz.SigNoz
PromConfigPath string
SkipTopLvlOpsPath string
HTTPHostPort string
Expand All @@ -79,7 +81,6 @@ type ServerOptions struct {
GatewayUrl string
UseLogsNewSchema bool
UseTraceNewSchema bool
SkipWebFrontend bool
}

// Server runs HTTP api service
Expand Down Expand Up @@ -109,7 +110,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
}

// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions, web *web.Web) (*Server, error) {
func NewServer(serverOptions *ServerOptions) (*Server, error) {

modelDao, err := dao.InitDao("sqlite", baseconst.RELATIONAL_DATASOURCE_PATH)
if err != nil {
Expand Down Expand Up @@ -291,7 +292,7 @@ func NewServer(serverOptions *ServerOptions, web *web.Web) (*Server, error) {
usageManager: usageManager,
}

httpServer, err := s.createPublicServer(apiHandler, web)
httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web)

if err != nil {
return nil, err
Expand Down Expand Up @@ -384,7 +385,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web *web.Web) (*

handler = handlers.CompressHandler(handler)

if !s.serverOptions.SkipWebFrontend {
if web != nil {
err := web.AddToRouter(r)
if err != nil {
return nil, err
Expand Down
14 changes: 6 additions & 8 deletions ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/migrate"
"go.signoz.io/signoz/pkg/query-service/version"
signozweb "go.signoz.io/signoz/pkg/web"
"go.signoz.io/signoz/pkg/signoz"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand Down Expand Up @@ -108,7 +108,6 @@ func main() {
var dialTimeout time.Duration
var gatewayUrl string
var useLicensesV3 bool
var skipWebFrontend bool

flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
Expand All @@ -126,7 +125,6 @@ func main() {
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
flag.StringVar(&gatewayUrl, "gateway-url", "", "(url to the gateway)")
flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses")
flag.BoolVar(&skipWebFrontend, "skip-web-frontend", false, "skip web frontend")
flag.Parse()

loggerMgr := initZapLog(enableQueryServiceLogOTLPExport)
Expand All @@ -148,12 +146,13 @@ func main() {
zap.L().Fatal("Failed to create config", zap.Error(err))
}

web, err := signozweb.New(zap.L(), config.Web)
if err != nil && !skipWebFrontend {
zap.L().Fatal("Failed to create web", zap.Error(err))
vikrantgupta25 marked this conversation as resolved.
Show resolved Hide resolved
signoz, err := signoz.New(config)
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
}

serverOptions := &app.ServerOptions{
SigNoz: signoz,
HTTPHostPort: baseconst.HTTPHostPort,
PromConfigPath: promConfigPath,
SkipTopLvlOpsPath: skipTopLvlOpsPath,
Expand All @@ -170,7 +169,6 @@ func main() {
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SkipWebFrontend: skipWebFrontend,
}

// Read the jwt secret key
Expand All @@ -188,7 +186,7 @@ func main() {
zap.L().Info("Migration successful")
}

server, err := app.NewServer(serverOptions, web)
server, err := app.NewServer(serverOptions)
if err != nil {
zap.L().Fatal("Failed to create server", zap.Error(err))
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package cache
vikrantgupta25 marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"encoding"
"fmt"
"reflect"
"time"
)

// cacheable entity
type CacheableEntity interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}

func WrapCacheableEntityErrors(rt reflect.Type, caller string) error {
if rt == nil {
return fmt.Errorf("%s: (nil)", caller)
}

if rt.Kind() != reflect.Pointer {
return fmt.Errorf("%s: (non-pointer \"%s\")", caller, rt.String())
}

return fmt.Errorf("%s: (nil \"%s\")", caller, rt.String())

}

// cache status
type RetrieveStatus int

const (
RetrieveStatusHit = RetrieveStatus(iota)
RetrieveStatusPartialHit
RetrieveStatusRangeMiss
RetrieveStatusKeyMiss
RetrieveStatusRevalidated

RetrieveStatusError
)

func (s RetrieveStatus) String() string {
switch s {
case RetrieveStatusHit:
return "hit"
case RetrieveStatusPartialHit:
return "partial hit"
case RetrieveStatusRangeMiss:
return "range miss"
case RetrieveStatusKeyMiss:
return "key miss"
case RetrieveStatusRevalidated:
return "revalidated"
case RetrieveStatusError:
return "error"
default:
return "unknown"
}
}

// cache interface
type Cache interface {
Connect(ctx context.Context) error
Store(ctx context.Context, cacheKey string, data CacheableEntity, ttl time.Duration) error
Retrieve(ctx context.Context, cacheKey string, dest CacheableEntity, allowExpired bool) (RetrieveStatus, error)
SetTTL(ctx context.Context, cacheKey string, ttl time.Duration)
Remove(ctx context.Context, cacheKey string)
BulkRemove(ctx context.Context, cacheKeys []string)
Close(ctx context.Context) error
}
49 changes: 49 additions & 0 deletions pkg/cache/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package cache

import (
"time"

go_cache "github.com/patrickmn/go-cache"
"go.signoz.io/signoz/pkg/confmap"
)

// Config satisfies the confmap.Config interface
var _ confmap.Config = (*Config)(nil)

type Memory struct {
TTL time.Duration `mapstructure:"ttl"`
CleanupInterval time.Duration `mapstructure:"cleanupInterval"`
}

type Redis struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Password string `mapstructure:"password"`
DB int `mapstructure:"db"`
}

type Config struct {
Provider string `mapstructure:"provider"`
Memory Memory `mapstructure:"memory"`
Redis Redis `mapstructure:"redis"`
}

func (c *Config) NewWithDefaults() confmap.Config {
return &Config{
Provider: "memory",
Memory: Memory{
TTL: go_cache.NoExpiration,
CleanupInterval: 1 * time.Minute,
},
Redis: Redis{
Host: "localhost",
Port: 6379,
Password: "",
DB: 0,
},
}
}

func (c *Config) Validate() error {
return nil
}
96 changes: 96 additions & 0 deletions pkg/cache/strategy/memory/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package memory

import (
"context"
"fmt"
"reflect"
"time"

go_cache "github.com/patrickmn/go-cache"
_cache "go.signoz.io/signoz/pkg/cache"
)

type cache struct {
cc *go_cache.Cache
}

func New(opts *_cache.Memory) *cache {
vikrantgupta25 marked this conversation as resolved.
Show resolved Hide resolved
return &cache{cc: go_cache.New(opts.TTL, opts.CleanupInterval)}
}

// Connect does nothing
func (c *cache) Connect(_ context.Context) error {
return nil
}

// Store stores the data in the cache
func (c *cache) Store(_ context.Context, cacheKey string, data _cache.CacheableEntity, ttl time.Duration) error {
// check if the data being passed is a pointer and is not nil
rv := reflect.ValueOf(data)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return _cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory")
}

c.cc.Set(cacheKey, data, ttl)
return nil
}

// Retrieve retrieves the data from the cache
func (c *cache) Retrieve(_ context.Context, cacheKey string, dest _cache.CacheableEntity, allowExpired bool) (_cache.RetrieveStatus, error) {
// check if the destination being passed is a pointer and is not nil
dstv := reflect.ValueOf(dest)
if dstv.Kind() != reflect.Pointer || dstv.IsNil() {
return _cache.RetrieveStatusError, _cache.WrapCacheableEntityErrors(reflect.TypeOf(dest), "inmemory")
}

// check if the destination value is settable
if !dstv.Elem().CanSet() {
return _cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem())
}

data, found := c.cc.Get(cacheKey)
if !found {
return _cache.RetrieveStatusKeyMiss, nil
}

// check the type compatbility between the src and dest
srcv := reflect.ValueOf(data)
if !srcv.Type().AssignableTo(dstv.Type()) {
return _cache.RetrieveStatusError, fmt.Errorf("src type is not assignable to dst type")
}

// set the value to from src to dest
dstv.Elem().Set(srcv.Elem())
return _cache.RetrieveStatusHit, nil
}

// SetTTL sets the TTL for the cache entry
func (c *cache) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) {
item, found := c.cc.Get(cacheKey)
if !found {
return
}
c.cc.Replace(cacheKey, item, ttl)
}

// Remove removes the cache entry
func (c *cache) Remove(_ context.Context, cacheKey string) {
c.cc.Delete(cacheKey)
}

// BulkRemove removes the cache entries
func (c *cache) BulkRemove(_ context.Context, cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.cc.Delete(cacheKey)
}
}

// Close does nothing
func (c *cache) Close(_ context.Context) error {
return nil
}

// Configuration returns the cache configuration
func (c *cache) Configuration() *_cache.Memory {
return nil
}
Loading
Loading