diff --git a/services/horizon/internal/ingest/filters/main.go b/services/horizon/internal/ingest/filters/main.go index 94e762d0ac..06532e5848 100644 --- a/services/horizon/internal/ingest/filters/main.go +++ b/services/horizon/internal/ingest/filters/main.go @@ -2,6 +2,7 @@ package filters import ( "context" + "sync" "time" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -13,9 +14,22 @@ var ( // the filter config cache will be checked against latest from db at most once per each of this interval. //lint:ignore ST1011, don't need the linter warn on literal assignment - FilterConfigCheckIntervalSeconds time.Duration = 100 + filterConfigCheckIntervalSeconds time.Duration = 100 + filterConfigCheckIntervalSecondsLock sync.RWMutex ) +func GetFilterConfigCheckIntervalSeconds() time.Duration { + filterConfigCheckIntervalSecondsLock.RLock() + defer filterConfigCheckIntervalSecondsLock.RUnlock() + return filterConfigCheckIntervalSeconds +} + +func SetFilterConfigCheckIntervalSeconds(t time.Duration) { + filterConfigCheckIntervalSecondsLock.Lock() + defer filterConfigCheckIntervalSecondsLock.Unlock() + filterConfigCheckIntervalSeconds = t +} + var ( LOG = log.WithFields(log.F{ "filters": "load", @@ -43,7 +57,7 @@ func NewFilters() Filters { // rebuild the list on expiration time interval. Method is NOT thread-safe. func (f *filtersCache) GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer { // only attempt to refresh filter config cache state at configured interval limit - if time.Now().Unix() < (f.lastFilterConfigCheckUnixEpoch + int64(FilterConfigCheckIntervalSeconds.Seconds())) { + if time.Now().Unix() < (f.lastFilterConfigCheckUnixEpoch + int64(GetFilterConfigCheckIntervalSeconds().Seconds())) { return f.convertCacheToList() } diff --git a/services/horizon/internal/integration/ingestion_filtering_test.go b/services/horizon/internal/integration/ingestion_filtering_test.go index 8eca0f3aa1..47cfc1ccbc 100644 --- a/services/horizon/internal/integration/ingestion_filtering_test.go +++ b/services/horizon/internal/integration/ingestion_filtering_test.go @@ -48,7 +48,7 @@ func TestFilteringAccountWhiteList(t *testing.T) { tt.NoError(err) // Setup a whitelisted account rule, force refresh of filter configs to be quick - filters.FilterConfigCheckIntervalSeconds = 1 + filters.SetFilterConfigCheckIntervalSeconds(1) expectedAccountFilter := hProtocol.AccountFilterConfig{ Whitelist: []string{whitelistedAccount.GetAccountID()}, @@ -64,7 +64,7 @@ func TestFilteringAccountWhiteList(t *testing.T) { tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled) // Ensure the latest filter configs are reloaded by the ingestion state machine processor - time.Sleep(time.Duration(filters.FilterConfigCheckIntervalSeconds) * time.Second) + time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second) // Make sure that when using a non-whitelisted account, the transaction is not stored txResp = itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(), @@ -123,7 +123,7 @@ func TestFilteringAssetWhiteList(t *testing.T) { tt.NoError(err) // Setup a whitelisted asset rule, force refresh of filters to be quick - filters.FilterConfigCheckIntervalSeconds = 1 + filters.SetFilterConfigCheckIntervalSeconds(1) asset, err := whitelistedAsset.ToXDR() tt.NoError(err) @@ -141,7 +141,7 @@ func TestFilteringAssetWhiteList(t *testing.T) { tt.Equal(expectedAssetFilter.Enabled, assetFilter.Enabled) // Ensure the latest filter configs are reloaded by the ingestion state machine processor - time.Sleep(time.Duration(filters.FilterConfigCheckIntervalSeconds) * time.Second) + time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second) // Make sure that when using a non-whitelisted asset, the transaction is not stored txResp = itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),