Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix not to use goroutine for each rule registration #98

Merged
merged 1 commit into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ require (
github.com/oapi-codegen/runtime v1.1.1
github.com/prometheus/client_golang v1.18.0
github.com/urfave/cli/v2 v2.27.1
go.uber.org/multierr v1.11.0
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc
golang.org/x/sync v0.5.0
)

require (
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
Expand Down Expand Up @@ -207,8 +205,6 @@ github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oapi-codegen/runtime v1.1.0 h1:rJpoNUawn5XTvekgfkvSZr0RqEnoYpFkyvrzfWeFKWM=
github.com/oapi-codegen/runtime v1.1.0/go.mod h1:BeSfBkWWWnAnGdyS+S/GnlbmHKzf8/hwkvelJZDeKA8=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -290,6 +286,8 @@ github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDf
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand All @@ -298,8 +296,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM=
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
165 changes: 82 additions & 83 deletions internal/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
"github.com/cockroachdb/pebble"
"github.com/musaprg/annict-epgstation-connector/annict"
"github.com/musaprg/annict-epgstation-connector/epgstation"
"golang.org/x/exp/slices"
"go.uber.org/multierr"
"golang.org/x/exp/slog"
"golang.org/x/sync/errgroup"
"net/http"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -135,102 +134,102 @@ func (s *syncer) sync(ctx context.Context) error {
} else {
titles = append(titles, ts...)
}
slices.Compact(titles)

if err := s.registerRulesToEpgStation(ctx, titles); err != nil {
if err := s.registerRulesToEPGStation(ctx, titles); err != nil {
return err
}
return nil
}

func (s *syncer) registerRulesToEpgStation(ctx context.Context, works []annictWork) error {
eg, ctx := errgroup.WithContext(ctx)
func (s *syncer) registerRulesToEPGStation(ctx context.Context, works []annictWork) error {
var errs error
for _, work := range works {
work := work
eg.Go(func() error {
syncerAnnictWorkStartedAt.WithLabelValues(
work.ID,
work.Title,
work.SeasonName,
strconv.Itoa(work.SeasonYear),
).Set(float64(work.StartedAt.Unix()))
errs = multierr.Append(errs, s.registerRuleToEPGStation(ctx, work))
}
if errs != nil {
return fmt.Errorf("failed to register rules into EPGStation: %w", errs)
}
return nil
}

ruleIDs, err := s.getRecordingRuleIDsByAnnictWorkID(work.ID)
switch {
case err != nil && !errors.Is(err, pebble.ErrNotFound):
return fmt.Errorf("failed to get recording rule IDs for Annict work ID %s: %w", work.ID, err)
case err == nil:
// recording rule IDs found for the given Annict work ID
for _, id := range ruleIDs {
syncerRecordingRuleSynced.WithLabelValues(strconv.Itoa(int(id)), work.ID).Set(1)
}
return nil
}
if rules, _ := s.getRulesByKeyword(ctx, work.Title); len(rules) != 0 {
// recording rule with same keyword has already been registered
// skip registration
// TODO: Remove this logic after introducing cleanup logic
slog.Debug("recording rule with same keyword has already been registered", slog.String("keyword", work.Title))
return nil
}
body := epgstation.PostRulesJSONRequestBody{
SearchOption: epgstation.RuleSearchOption{
GR: epgstation.NewTruePointer(),
BS: epgstation.NewTruePointer(),
func (s *syncer) registerRuleToEPGStation(ctx context.Context, work annictWork) error {
syncerAnnictWorkStartedAt.WithLabelValues(
work.ID,
work.Title,
work.SeasonName,
strconv.Itoa(work.SeasonYear),
).Set(float64(work.StartedAt.Unix()))

// Only search by work
Keyword: &work.Title,
Name: epgstation.NewTruePointer(),
Description: epgstation.NewFalsePointer(),
Extended: epgstation.NewFalsePointer(),
ruleIDs, err := s.getRecordingRuleIDsByAnnictWorkID(work.ID)
switch {
case err != nil && !errors.Is(err, pebble.ErrNotFound):
return fmt.Errorf("failed to get recording rule IDs for Annict work ID %s: %w", work.ID, err)
case err == nil:
// recording rule IDs found for the given Annict work ID
for _, id := range ruleIDs {
syncerRecordingRuleSynced.WithLabelValues(strconv.Itoa(int(id)), work.ID).Set(1)
}
return nil
}
if rules, _ := s.getRulesByKeyword(ctx, work.Title); len(rules) != 0 {
// recording rule with same keyword has already been registered
// skip registration
// TODO: Remove this logic after introducing cleanup logic
slog.Debug("recording rule with same keyword has already been registered", slog.String("keyword", work.Title))
return nil
}
body := epgstation.PostRulesJSONRequestBody{
SearchOption: epgstation.RuleSearchOption{
GR: epgstation.NewTruePointer(),
BS: epgstation.NewTruePointer(),

// https://github.com/l3tnun/EPGStation/blob/master/client/src/lib/event.ts
Genres: &[]epgstation.Genre{
{Genre: 0x6}, // 0x6 = 映画
{Genre: 0x7}, // 0x7 = アニメ・特撮
},
// Only search by work
Keyword: &work.Title,
Name: epgstation.NewTruePointer(),
Description: epgstation.NewFalsePointer(),
Extended: epgstation.NewFalsePointer(),

Times: &[]epgstation.SearchTime{
{
// whole week
Week: 0b1111111,
},
},
// https://github.com/l3tnun/EPGStation/blob/master/client/src/lib/event.ts
Genres: &[]epgstation.Genre{
{Genre: 0x6}, // 0x6 = 映画
{Genre: 0x7}, // 0x7 = アニメ・特撮
},

IsFree: epgstation.NewTruePointer(), // TODO(musaprg): how about NHK?
Times: &[]epgstation.SearchTime{
{
// whole week
Week: 0b1111111,
},
IsTimeSpecification: false,
SaveOption: &epgstation.ReserveSaveOption{},
EncodeOption: &epgstation.ReserveEncodedOption{},
ReserveOption: epgstation.RuleReserveOption{
AvoidDuplicate: false,
Enable: true,
AllowEndLack: false,
},
}
r, err := s.esClient.PostRules(ctx, body)
if err != nil {
return err
}
res, err := epgstation.ParsePostRulesResponse(r)
if err != nil {
return err
}
if res.JSON201 == nil {
return fmt.Errorf("failed to register rules into EPGStation: %s", res.Body)
}
ids := RecordingRuleIDs{RecordingRuleID(res.JSON201.RuleId)}
if err := s.setRecordingRuleIDsByAnnictWorkID(work.ID, ids); err != nil {
return err
}
syncerRecordingRuleSynced.WithLabelValues(strconv.Itoa(int(ids[0])), work.ID).Set(1)
// TODO(musaprg): output response in the log message
return nil
})
},

IsFree: epgstation.NewTruePointer(), // TODO(musaprg): how about NHK?
},
IsTimeSpecification: false,
SaveOption: &epgstation.ReserveSaveOption{},
EncodeOption: &epgstation.ReserveEncodedOption{},
ReserveOption: epgstation.RuleReserveOption{
AvoidDuplicate: false,
Enable: true,
AllowEndLack: false,
},
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to register rules into EPGStation: %w", err)
r, err := s.esClient.PostRules(ctx, body)
if err != nil {
return err
}
res, err := epgstation.ParsePostRulesResponse(r)
if err != nil {
return err
}
if res.JSON201 == nil {
return fmt.Errorf("failed to register rules into EPGStation: %s", res.Body)
}
ids := RecordingRuleIDs{RecordingRuleID(res.JSON201.RuleId)}
if err := s.setRecordingRuleIDsByAnnictWorkID(work.ID, ids); err != nil {
return err
}
syncerRecordingRuleSynced.WithLabelValues(strconv.Itoa(int(ids[0])), work.ID).Set(1)
// TODO(musaprg): output response in the log message
return nil
}

Expand Down