Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove AS uplink storage #6317

Merged
merged 25 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
530472f
as,cli: Deprecate the uplink storage registry
cvetkovski98 May 22, 2023
93dbc2b
cli: Add cleanup uplink registry command
cvetkovski98 May 23, 2023
5fa3f03
as: Move application uplink storage to GLS package association
cvetkovski98 Jun 8, 2023
a5cb706
as: Add clear methods to application package registry
cvetkovski98 Jun 8, 2023
4bacfb9
as: Use gob to serialize recent md in gls pkg data
cvetkovski98 Jun 12, 2023
47578a4
as: Cleanup associations on device and app link removal
cvetkovski98 Jun 13, 2023
49b1434
as: Add gls package description
cvetkovski98 Jun 14, 2023
7bad117
as: Add gls package data tests
cvetkovski98 Jun 14, 2023
852c6e4
as: Add association registry mock
cvetkovski98 Jun 15, 2023
8696b6d
as: Add association registry clear methods tests
cvetkovski98 Jun 15, 2023
fc5d60f
as: Ensure device registry invokes association clear
cvetkovski98 Jun 15, 2023
1261ee3
as: Add redis pkg description
cvetkovski98 Jun 15, 2023
7f75ef7
as: Use updated package data after uplink push
cvetkovski98 Jun 16, 2023
aea5c98
as: Use locked watch when clearing association registry
cvetkovski98 Jun 16, 2023
e600580
as: Improve association registry clear tests
cvetkovski98 Jun 16, 2023
c9c6492
as: Move GLS struct definitions to api package
cvetkovski98 Jun 16, 2023
df95b56
as: Remove location source field
cvetkovski98 Jun 16, 2023
4fbf3c2
cli: Rename uplink registry cleanup command to purge
cvetkovski98 Jun 16, 2023
1567d0c
all: Update CHANGELOG.md
cvetkovski98 Jun 26, 2023
73f40ac
as: Encode gob bytes as base64
cvetkovski98 Jun 26, 2023
c05e224
as: Handle possible nil association in GLS integration
cvetkovski98 Jun 27, 2023
00b835a
as: Fix GLS package data merging
cvetkovski98 Jun 27, 2023
c198ba1
as: Add constraint to the GLS multi frame query window size
cvetkovski98 Jun 27, 2023
b731b4e
console: Remove checkbox for automatic window size in GLS integration
cvetkovski98 Jun 27, 2023
57e9872
as: Remove stored recent metadata when using single frame GLS query
cvetkovski98 Jun 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ For details about compatibility between different releases, see the **Commitment

### Added

- The `as-db purge` command to purge unused data from the Application Server database.

### Changed

- Instead of retrying application downlinks indefinitely, the Application Server now retries them for a configured number of times. Each `ApplicationDownlink` message contains the `attempt` and `max_attempts` fields to indicate the current and maximum number of attempts for a specific `application downlink`.
- The Application Server configuration has the `as.downlinks.confirmation.default-retry-attempts` and `as.downlinks.confirmation.max-retry-attempts` fields that configure the allowed number of retries for application downlinks. The default values are `8` for the `as.downlinks.confirmation.default-retry-attempts` and `32` for the `as.downlinks.confirmation.max-retry-attempts`.
- The `as.downlinks.confirmation.default-retry-attempts` field is used for all application downlinks that were scheduled before this change and for every application downlink that does not have the `max_attempts` field set. On the other hand, the `as.downlinks.confirmation.max-retry-attempts` field ensures that the `max_attempts` field's upper bound is contained and does not exceed its value.
- The number of historical frames considered for the multi-frame query window size in the LoRa Geolocation Services integration. The window size is now limited between 1 and 16 frames with 16 being the default value.

### Deprecated

- The `as.uplink-storage.limit` configuration option.

### Removed

### Fixed
Expand Down
3 changes: 0 additions & 3 deletions cmd/internal/shared/applicationserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ var DefaultApplicationServerConfig = applicationserver.Config{
Workers: 1024,
Downlinks: web.DownlinksConfig{PublicAddress: shared.DefaultPublicURL + "/api/v3"},
},
UplinkStorage: applicationserver.UplinkStorageConfig{
Limit: 16,
},
EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{
Location: applicationserver.EndDeviceLocationStorageConfig{
Timeout: 5 * time.Second,
Expand Down
34 changes: 34 additions & 0 deletions cmd/ttn-lw-stack/commands/as_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,39 @@ var (
return nil
},
}
asDBPurgeCommand = &cobra.Command{
Use: "purge",
Short: "Purge unused Application Server data",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

logger.Info("Connecting to Redis database...")
cl := ttnredis.New(config.Redis.WithNamespace("as", "applicationups"))

logger.Info("Purging uplink registry data")

purged := 0
uidApplicationUpsKey := cl.Key("uid", "*")
pipeliner := cl.Pipeline()
err := ttnredis.RangeRedisKeys(ctx, cl, uidApplicationUpsKey, ttnredis.DefaultRangeCount,
func(k string) (bool, error) {
pipeliner.Del(ctx, k)
purged++
return true, nil
})
if err != nil {
logger.WithError(err).Error("Failed to purge uplink registry data")
return err
}
if _, err := pipeliner.Exec(ctx); err != nil {
logger.WithError(err).Error("Failed to purge uplink registry data")
return err
}

logger.WithField("records_purged_count", purged).Info("Purged uplink registry data")
return nil
},
}
)

func init() {
Expand All @@ -236,4 +269,5 @@ func init() {
asDBCleanupCommand.Flags().Bool("dry-run", false, "Dry run")
asDBCleanupCommand.Flags().Duration("pagination-delay", 100, "Delay between batch requests")
asDBCommand.AddCommand(asDBCleanupCommand)
asDBCommand.AddCommand(asDBPurgeCommand)
}
15 changes: 5 additions & 10 deletions cmd/ttn-lw-stack/commands/as_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,16 @@ func NewPubSubCleaner(ctx context.Context, config *redis.Config) (*pubsub.Regist
// NewPackagesCleaner returns a new instance of packages RegistryCleaner with a local set
// of applications and devices.
func NewPackagesCleaner(ctx context.Context, config *redis.Config) (*packages.RegistryCleaner, error) {
applicationPackagesRegistry := &packageredis.ApplicationPackagesRegistry{
Redis: redis.New(config.WithNamespace("as", "io", "applicationpackages")),
LockTTL: defaultLockTTL,
}
if err := applicationPackagesRegistry.Init(ctx); err != nil {
applicationPackagesRegistry, err := packageredis.NewApplicationPackagesRegistry(
ctx, redis.New(config.WithNamespace("as", "io", "applicationpackages")), defaultLockTTL,
)
if err != nil {
return nil, shared.ErrInitializeApplicationServer.WithCause(err)
}
cleaner := &packages.RegistryCleaner{
ApplicationPackagesRegistry: applicationPackagesRegistry,
}
err := cleaner.RangeToLocalSet(ctx)
if err != nil {
if err := cleaner.RangeToLocalSet(ctx); err != nil {
return nil, err
}
return cleaner, nil
Expand All @@ -80,9 +78,6 @@ func NewASDeviceRegistryCleaner(ctx context.Context, config *redis.Config) (*as.
}
cleaner := &as.RegistryCleaner{
DevRegistry: deviceRegistry,
AppUpsRegistry: &asredis.ApplicationUplinkRegistry{
Redis: redis.New(config.WithNamespace("as", "applicationups")),
},
}
err := cleaner.RangeToLocalSet(ctx)
if err != nil {
Expand Down
15 changes: 6 additions & 9 deletions cmd/ttn-lw-stack/commands/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,6 @@ var startCommand = &cobra.Command{
return shared.ErrInitializeApplicationServer.WithCause(err)
}
config.AS.Devices = deviceRegistry
config.AS.UplinkStorage.Registry = &asredis.ApplicationUplinkRegistry{
Redis: redis.New(config.Redis.WithNamespace("as", "applicationups")),
Limit: config.AS.UplinkStorage.Limit,
}
config.AS.Distribution.Global.PubSub = &asdistribredis.PubSub{
Redis: redis.New(config.Cache.Redis.WithNamespace("as", "traffic")),
}
Expand All @@ -374,11 +370,12 @@ var startCommand = &cobra.Command{
return shared.ErrInitializeApplicationServer.WithCause(err)
}
config.AS.PubSub.Registry = pubsubRegistry
applicationPackagesRegistry := &asioapredis.ApplicationPackagesRegistry{
Redis: redis.New(config.Redis.WithNamespace("as", "io", "applicationpackages")),
LockTTL: defaultLockTTL,
}
if err := applicationPackagesRegistry.Init(ctx); err != nil {
applicationPackagesRegistry, err := asioapredis.NewApplicationPackagesRegistry(
ctx,
redis.New(config.Redis.WithNamespace("as", "io", "applicationpackages")),
defaultLockTTL,
)
if err != nil {
return shared.ErrInitializeApplicationServer.WithCause(err)
}
config.AS.Packages.Registry = applicationPackagesRegistry
Expand Down
18 changes: 18 additions & 0 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,24 @@
"file": "messages.go"
}
},
"error:pkg/applicationserver/io/packages/loragls/v3:decoding_field": {
"translations": {
"en": "decoding field `{field}`"
},
"description": {
"package": "pkg/applicationserver/io/packages/loragls/v3",
"file": "data.go"
}
},
"error:pkg/applicationserver/io/packages/loragls/v3:encoding_field": {
"translations": {
"en": "encoding field `{field}`"
},
"description": {
"package": "pkg/applicationserver/io/packages/loragls/v3",
"file": "data.go"
}
},
"error:pkg/applicationserver/io/packages/loragls/v3:field_not_found": {
"translations": {
"en": "field `{field}` not found"
Expand Down
44 changes: 2 additions & 42 deletions pkg/applicationserver/applicationserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ type ApplicationServer struct {

linkRegistry LinkRegistry
deviceRegistry DeviceRegistry
appUpsRegistry ApplicationUplinkRegistry
locationRegistry metadata.EndDeviceLocationRegistry
formatters messageprocessors.MapPayloadProcessor
webhooks ioweb.Webhooks
webhookTemplates ioweb.TemplateStore
pubsub *pubsub.PubSub
appPackages packages.Server
appPkgRegistry packages.Registry
deviceLastSeenProvider lastseen.LastSeenProvider

clusterDistributor distribution.Distributor
Expand Down Expand Up @@ -153,7 +153,7 @@ func New(c *component.Component, conf *Config) (as *ApplicationServer, err error
config: conf,
linkRegistry: conf.Links,
deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...),
appUpsRegistry: conf.UplinkStorage.Registry,
appPkgRegistry: conf.Packages.Registry,
locationRegistry: conf.EndDeviceMetadataStorage.Location.Registry,
formatters: make(messageprocessors.MapPayloadProcessor),
clusterDistributor: distribution.NewPubSubDistributor(
Expand Down Expand Up @@ -1072,38 +1072,6 @@ func (as *ApplicationServer) matchSession(ctx context.Context, ids *ttnpb.EndDev
return mask, nil
}

// storeUplink stores the provided *ttnpb.ApplicationUplink in the device uplink storage.
// Only fields which are used by integrations are stored.
// The fields which are stored are based on the following usages:
// - io/packages/loragls/v3/package.go#multiFrameQuery
// - io/packages/loragls/v3/api/objects.go#parseRxMetadata.
func (as *ApplicationServer) storeUplink(
ctx context.Context,
ids *ttnpb.EndDeviceIdentifiers,
uplink *ttnpb.ApplicationUplink,
) error {
cleanUplink := &ttnpb.ApplicationUplink{
RxMetadata: make([]*ttnpb.RxMetadata, 0, len(uplink.RxMetadata)),
ReceivedAt: uplink.ReceivedAt,
}
for _, md := range uplink.RxMetadata {
if md.GatewayIds == nil {
continue
}
cleanUplink.RxMetadata = append(cleanUplink.RxMetadata, &ttnpb.RxMetadata{
GatewayIds: &ttnpb.GatewayIdentifiers{
GatewayId: md.GatewayIds.GatewayId,
},
AntennaIndex: md.AntennaIndex,
FineTimestamp: md.FineTimestamp,
Location: md.Location,
Rssi: md.Rssi,
Snr: md.Snr,
})
}
return as.appUpsRegistry.Push(ctx, ids, cleanUplink)
}

// setActivated attempts to mark the end device as activated in the Entity Registry.
// If the update succeeds, the end device will be updated in the Application Server end device registry
// in order to avoid subsequent calls.
Expand Down Expand Up @@ -1220,9 +1188,6 @@ func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo)
if err := as.publishNormalizedUplink(ctx, info); err != nil {
return err
}
if err := as.storeUplink(ctx, info.ids, info.uplink); err != nil {
return err
}
} else if appSKey := dev.GetSession().GetKeys().GetAppSKey(); appSKey != nil {
info.uplink.AppSKey = appSKey
info.uplink.LastAFCntDown = dev.Session.LastAFCntDown
Expand Down Expand Up @@ -1511,11 +1476,6 @@ func (as *ApplicationServer) GetMQTTConfig(ctx context.Context) (*config.MQTT, e
return &cfg.MQTT, nil
}

// RangeUplinks ranges the application uplinks and calls the callback function, until false is returned.
func (as *ApplicationServer) RangeUplinks(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, paths []string, f func(ctx context.Context, up *ttnpb.ApplicationUplink) bool) error {
return as.appUpsRegistry.Range(ctx, ids, paths, f)
}

// GetEndDevice retrieves the end device associated with the provided identifiers from the end device registry.
func (as *ApplicationServer) GetEndDevice(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, paths []string) (*ttnpb.EndDevice, error) {
return as.deviceRegistry.Get(ctx, ids, paths)
Expand Down
70 changes: 6 additions & 64 deletions pkg/applicationserver/applicationserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,6 @@ func TestApplicationServer(t *testing.T) {
t.Fatalf("Failed to set link in registry: %s", err)
}

applicationUpsRedisClient, applicationUpsFlush := test.NewRedis(ctx, "applicationserver_test", "applicationups")
defer applicationUpsFlush()
defer applicationUpsRedisClient.Close()
applicationUpsRegistry := &redis.ApplicationUplinkRegistry{
Redis: applicationUpsRedisClient,
Limit: 16,
}

webhooksRedisClient, webhooksFlush := test.NewRedis(ctx, "applicationserver_test", "webhooks")
defer webhooksFlush()
defer webhooksRedisClient.Close()
Expand Down Expand Up @@ -296,10 +288,6 @@ func TestApplicationServer(t *testing.T) {
config := &applicationserver.Config{
Devices: deviceRegistry,
Links: linkRegistry,
UplinkStorage: applicationserver.UplinkStorageConfig{
Registry: applicationUpsRegistry,
Limit: 16,
},
MQTT: config.MQTT{
Listen: ":1883",
},
Expand Down Expand Up @@ -2399,14 +2387,6 @@ func TestSkipPayloadCrypto(t *testing.T) {
t.Fatalf("Failed to set link in registry: %s", err)
}

applicationUpsRedisClient, applicationUpsFlush := test.NewRedis(ctx, "applicationserver_test", "applicationups")
defer applicationUpsFlush()
defer applicationUpsRedisClient.Close()
applicationUpsRegistry := &redis.ApplicationUplinkRegistry{
Redis: applicationUpsRedisClient,
Limit: 16,
}

distribRedisClient, distribFlush := test.NewRedis(ctx, "applicationserver_test", "traffic")
defer distribFlush()
defer distribRedisClient.Close()
Expand Down Expand Up @@ -2434,10 +2414,6 @@ func TestSkipPayloadCrypto(t *testing.T) {
config := &applicationserver.Config{
Devices: deviceRegistry,
Links: linkRegistry,
UplinkStorage: applicationserver.UplinkStorageConfig{
Registry: applicationUpsRegistry,
Limit: 16,
},
Distribution: applicationserver.DistributionConfig{
Global: applicationserver.GlobalDistributorConfig{
PubSub: distribPubSub,
Expand Down Expand Up @@ -2926,14 +2902,6 @@ func TestLocationFromPayload(t *testing.T) {
defer distribRedisClient.Close()
distribPubSub := distribredis.PubSub{Redis: distribRedisClient}

applicationUpsRedisClient, applicationUpsFlush := test.NewRedis(ctx, "applicationserver_test", "applicationups")
defer applicationUpsFlush()
defer applicationUpsRedisClient.Close()
applicationUpsRegistry := &redis.ApplicationUplinkRegistry{
Redis: applicationUpsRedisClient,
Limit: 16,
}

c := componenttest.NewComponent(t, &component.Config{
ServiceBase: config.ServiceBase{
GRPC: config.GRPC{
Expand All @@ -2951,10 +2919,6 @@ func TestLocationFromPayload(t *testing.T) {
config := &applicationserver.Config{
Devices: deviceRegistry,
Links: linkRegistry,
UplinkStorage: applicationserver.UplinkStorageConfig{
Registry: applicationUpsRegistry,
Limit: 16,
},
Distribution: applicationserver.DistributionConfig{
Global: applicationserver.GlobalDistributorConfig{
PubSub: distribPubSub,
Expand Down Expand Up @@ -3126,14 +3090,6 @@ func TestUplinkNormalized(t *testing.T) {
defer distribRedisClient.Close()
distribPubSub := distribredis.PubSub{Redis: distribRedisClient}

applicationUpsRedisClient, applicationUpsFlush := test.NewRedis(ctx, "applicationserver_test", "applicationups")
defer applicationUpsFlush()
defer applicationUpsRedisClient.Close()
applicationUpsRegistry := &redis.ApplicationUplinkRegistry{
Redis: applicationUpsRedisClient,
Limit: 16,
}

c := componenttest.NewComponent(t, &component.Config{
ServiceBase: config.ServiceBase{
GRPC: config.GRPC{
Expand All @@ -3151,10 +3107,6 @@ func TestUplinkNormalized(t *testing.T) {
config := &applicationserver.Config{
Devices: deviceRegistry,
Links: linkRegistry,
UplinkStorage: applicationserver.UplinkStorageConfig{
Registry: applicationUpsRegistry,
Limit: 16,
},
Distribution: applicationserver.DistributionConfig{
Global: applicationserver.GlobalDistributorConfig{
PubSub: distribPubSub,
Expand Down Expand Up @@ -3392,22 +3344,13 @@ func TestApplicationServerCleanup(t *testing.T) {
t.FailNow()
}

applicationUpsRedisClient, applicationUpsFlush := test.NewRedis(ctx, "applicationserver_test", "applicationups")
defer applicationUpsFlush()
defer applicationUpsRedisClient.Close()
applicationUpsRegistry := &redis.ApplicationUplinkRegistry{
Redis: applicationUpsRedisClient,
Limit: 16,
}

applicationPackagesRedisClient, applicationPackagesFlush := test.NewRedis(ctx, "applicationserver_test", "applicationpackages")
defer applicationPackagesFlush()
defer applicationPackagesRedisClient.Close()
applicationPackagesRegistry := &asioapredis.ApplicationPackagesRegistry{
Redis: applicationPackagesRedisClient,
LockTTL: test.Delay << 10,
}
if err := applicationPackagesRegistry.Init(ctx); !a.So(err, should.BeNil) {
applicationPackagesRegistry, err := asioapredis.NewApplicationPackagesRegistry(
ctx, applicationPackagesRedisClient, test.Delay<<10,
)
if !a.So(err, should.BeNil) {
t.FailNow()
}

Expand Down Expand Up @@ -3538,7 +3481,7 @@ func TestApplicationServerCleanup(t *testing.T) {
pubsubCleaner := &pubsub.RegistryCleaner{
PubSubRegistry: pubsubRegistry,
}
err := pubsubCleaner.RangeToLocalSet(ctx)
err = pubsubCleaner.RangeToLocalSet(ctx)
a.So(err, should.BeNil)
a.So(pubsubCleaner.LocalSet, should.HaveLength, 3)

Expand All @@ -3550,8 +3493,7 @@ func TestApplicationServerCleanup(t *testing.T) {
a.So(webhookCleaner.LocalSet, should.HaveLength, 3)

devCleaner := &applicationserver.RegistryCleaner{
DevRegistry: deviceRegistry,
AppUpsRegistry: applicationUpsRegistry,
DevRegistry: deviceRegistry,
}
err = devCleaner.RangeToLocalSet(ctx)
a.So(err, should.BeNil)
Expand Down
Loading