diff --git a/crontab/README.md b/crontab/README.md index a5ad0ceb..db510b25 100644 --- a/crontab/README.md +++ b/crontab/README.md @@ -9,44 +9,20 @@ import ( "context" "github.com/flc1125/go-cron/v4" - "github.com/go-kratos/kratos/v2" - "github.com/redis/go-redis/v9" - "github.com/go-kratos-ecosystem/components/v2/crontab" - "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/distributednooverlapping" - "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/distributednooverlapping/redismutex" + "github.com/go-kratos/kratos/v2" ) -type testJob struct { - distributednooverlapping.DefaultTTLJobWithMutex // optional, if you don't need to implement GetMutexTTL -} - -var _ distributednooverlapping.JobWithMutex = (*testJob)(nil) - -func (m *testJob) Run(context.Context) error { - // do something - return nil -} - -func (m *testJob) GetMutexKey() string { - return "test" -} - func main() { - rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) - c := cron.New( cron.WithSeconds(), - cron.WithMiddleware( - distributednooverlapping.New( - redismutex.New(rdb, redismutex.WithPrefix("cron:mutex")), - ), - ), + cron.WithMiddleware( /*...*/ ), ) - _, _ = c.AddJob("* * * * * *", &testJob{}) + _, _ = c.AddFunc("* * * * * *", func(context.Context) error { + // do something + return nil + }) // kratos app start app := kratos.New( diff --git a/crontab/example_test.go b/crontab/example_test.go index b593e060..50c2a114 100644 --- a/crontab/example_test.go +++ b/crontab/example_test.go @@ -4,44 +4,20 @@ import ( "context" "github.com/flc1125/go-cron/v4" - "github.com/go-kratos/kratos/v2" - "github.com/redis/go-redis/v9" - "github.com/go-kratos-ecosystem/components/v2/crontab" - "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/distributednooverlapping" - "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/distributednooverlapping/redismutex" + "github.com/go-kratos/kratos/v2" ) -type testJob struct { - distributednooverlapping.DefaultTTLJobWithMutex // optional, if you don't need to implement GetMutexTTL -} - -var _ distributednooverlapping.JobWithMutex = (*testJob)(nil) - -func (m *testJob) Run(context.Context) error { - // do something - return nil -} - -func (m *testJob) GetMutexKey() string { - return "test" -} - func Example() { - rdb := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) - c := cron.New( cron.WithSeconds(), - cron.WithMiddleware( - distributednooverlapping.New( - redismutex.New(rdb, redismutex.WithPrefix("cron:mutex")), - ), - ), + cron.WithMiddleware( /*...*/ ), ) - _, _ = c.AddJob("* * * * * *", &testJob{}) + _, _ = c.AddFunc("* * * * * *", func(context.Context) error { + // do something + return nil + }) // kratos app start app := kratos.New( diff --git a/crontab/middleware/distributednooverlapping/distributednooverlapping.go b/crontab/middleware/distributednooverlapping/distributednooverlapping.go deleted file mode 100644 index 79c24e38..00000000 --- a/crontab/middleware/distributednooverlapping/distributednooverlapping.go +++ /dev/null @@ -1,58 +0,0 @@ -package distributednooverlapping - -import ( - "context" - - "github.com/flc1125/go-cron/v4" -) - -type options struct { - mu Mutex - logger cron.Logger -} - -type Option func(*options) - -func WithLogger(logger cron.Logger) Option { - return func(o *options) { - o.logger = logger - } -} - -func newOptions(mu Mutex, opts ...Option) options { - opt := options{ - mu: mu, - logger: cron.DefaultLogger, - } - for _, o := range opts { - o(&opt) - } - return opt -} - -func New(mu Mutex, opts ...Option) cron.Middleware { - o := newOptions(mu, opts...) - return func(original cron.Job) cron.Job { - return cron.JobFunc(func(ctx context.Context) error { - job, ok := any(original).(JobWithMutex) - if !ok { - return original.Run(ctx) - } - - acquired, err := o.mu.Lock(ctx, job) - if err != nil { - o.logger.Error(err, "failed to lock mutex", "key", job.GetMutexKey()) - return err - } - if !acquired { - o.logger.Info("skip job [%s], because distributed no overlapping", "key", job.GetMutexKey()) - return nil - } - - defer func() { - _ = o.mu.Unlock(ctx, job) - }() - return original.Run(ctx) - }) - } -} diff --git a/crontab/middleware/distributednooverlapping/redismutex/mutex.go b/crontab/middleware/distributednooverlapping/redismutex/mutex.go deleted file mode 100644 index dbdf1c86..00000000 --- a/crontab/middleware/distributednooverlapping/redismutex/mutex.go +++ /dev/null @@ -1,48 +0,0 @@ -package redismutex - -import ( - "context" - - "github.com/redis/go-redis/v9" - - "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/distributednooverlapping" -) - -type Mutex struct { - redis redis.UniversalClient - prefix string -} - -type Option func(*Mutex) - -func WithPrefix(prefix string) Option { - return func(m *Mutex) { - if prefix != "" { - if prefix[len(prefix)-1] == ':' { - prefix = prefix[:len(prefix)-1] - } - m.prefix = prefix + ":" - } - } -} - -var _ distributednooverlapping.Mutex = (*Mutex)(nil) - -func New(redis redis.UniversalClient, opts ...Option) *Mutex { - mutex := &Mutex{ - redis: redis, - prefix: "cron:mutex", - } - for _, opt := range opts { - opt(mutex) - } - return mutex -} - -func (m *Mutex) Lock(ctx context.Context, job distributednooverlapping.JobWithMutex) (bool, error) { - return m.redis.SetNX(ctx, m.prefix+job.GetMutexKey(), "1", job.GetMutexTTL()).Result() -} - -func (m *Mutex) Unlock(ctx context.Context, job distributednooverlapping.JobWithMutex) error { - return m.redis.Del(ctx, m.prefix+job.GetMutexKey()).Err() -} diff --git a/crontab/middleware/distributednooverlapping/types.go b/crontab/middleware/distributednooverlapping/types.go deleted file mode 100644 index c8288ef8..00000000 --- a/crontab/middleware/distributednooverlapping/types.go +++ /dev/null @@ -1,36 +0,0 @@ -package distributednooverlapping - -import ( - "context" - "time" - - "github.com/flc1125/go-cron/v4" -) - -type Mutex interface { - // Lock tries to acquire the mutex for the job. - // If the mutex is acquired, it returns true. - // If the mutex is not acquired, it returns false. - Lock(ctx context.Context, job JobWithMutex) (bool, error) - - // Unlock releases the mutex for the job. - Unlock(ctx context.Context, job JobWithMutex) error -} - -type JobWithMutex interface { - cron.Job - - // GetMutexKey returns the key of the mutex. - GetMutexKey() string - - // GetMutexTTL returns the ttl of the mutex. - // The ttl suggests greater than the running time of the job. - GetMutexTTL() time.Duration -} - -// DefaultTTLJobWithMutex is a default implementation of JobWithMutex. -type DefaultTTLJobWithMutex struct{} - -func (DefaultTTLJobWithMutex) GetMutexTTL() time.Duration { - return time.Hour -} diff --git a/crontab/middleware/tracing/tracing.go b/crontab/middleware/tracing/tracing.go deleted file mode 100644 index 2500b37e..00000000 --- a/crontab/middleware/tracing/tracing.go +++ /dev/null @@ -1,107 +0,0 @@ -package tracing - -import ( - "context" - "time" - - "github.com/flc1125/go-cron/v4" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -const scopeName = "github.com/go-kratos-ecosystem/components/v2/crontab/middleware/otel" - -var ( - attrJobName = attribute.Key("cron.job.name") - attrJobID = attribute.Key("cron.job.id") - attrJobPrevTime = attribute.Key("cron.job.prev.time") - attrJobNextTime = attribute.Key("cron.job.next.time") - attrJobDuration = attribute.Key("cron.job.duration") -) - -type JobWithName interface { - cron.Job - Name() string -} - -type options struct { - tp trace.TracerProvider -} - -type Option func(*options) - -func WithTracerProvider(tp trace.TracerProvider) Option { - return func(o *options) { - o.tp = tp - } -} - -func newOptions(opts ...Option) *options { - opt := &options{ - tp: otel.GetTracerProvider(), - } - for _, o := range opts { - o(opt) - } - return opt -} - -func New(opts ...Option) cron.Middleware { - o := newOptions(opts...) - - tracer := o.tp.Tracer(scopeName) - return func(original cron.Job) cron.Job { - return cron.JobFunc(func(ctx context.Context) (err error) { - job, ok := any(original).(JobWithName) - if !ok { - return original.Run(ctx) - } - - // The span is created here, and it will be ended when the job is done. - var span trace.Span - ctx, span = tracer.Start(ctx, job.Name(), - trace.WithSpanKind(trace.SpanKindInternal), - ) - defer span.End() - defer func(starting time.Time) { - span.SetAttributes( - attrJobDuration.String(time.Since(starting).String()), - ) - - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } else { - span.SetStatus(codes.Ok, "") - } - }(time.Now()) - - // Set attributes. - span.SetAttributes(append( - []attribute.KeyValue{ - attrJobName.String(job.Name()), - }, - entryAttributes(ctx)..., - )...) - - // The job is run here. - err = job.Run(ctx) - return - }) - } -} - -func entryAttributes(ctx context.Context) []attribute.KeyValue { - entry, ok := cron.EntryFromContext(ctx) - if !ok { - return []attribute.KeyValue{} - } - - return []attribute.KeyValue{ - attrJobID.Int(int(entry.ID())), - attrJobPrevTime.String(entry.Prev().String()), - attrJobNextTime.String(entry.Next().String()), - } -}