diff --git a/lib/src/api_options.dart b/lib/src/api_options.dart index c9b107e6e..6e6b41017 100644 --- a/lib/src/api_options.dart +++ b/lib/src/api_options.dart @@ -99,7 +99,8 @@ class GatewayApiOptions extends RestApiOptions { Map get gatewayConnectionOptions => { 'v': apiVersion.toString(), 'encoding': payloadFormat.value, - if (compression == GatewayCompression.transport) 'compress': 'zlib-stream', + if (compression == GatewayCompression.transportZLib) 'compress': 'zlib-stream', + if (compression == GatewayCompression.transportZStd) 'compress': 'zstd-stream', }; /// Create a new [GatewayApiOptions]. @@ -108,7 +109,7 @@ class GatewayApiOptions extends RestApiOptions { super.userAgent, required this.intents, this.payloadFormat = GatewayPayloadFormat.json, - this.compression = GatewayCompression.transport, + this.compression = GatewayCompression.transportZLib, this.shards, this.totalShards, this.largeThreshold, @@ -138,11 +139,17 @@ enum GatewayCompression { /// No compression is used. none, - /// The entire connection is compressed. - transport, + /// The entire connection is compressed with ZLib. + transportZLib, + + /// The entire connection is compressed with ZStd + transportZStd, /// Each packet is individually compressed. /// /// Cannot be used if [GatewayPayloadFormat.etf] is used. - payload, + payload; + + @Deprecated('Use GatewayCompression.transportZLib for ZLib transport compression.') + static const transport = GatewayCompression.transportZLib; } diff --git a/lib/src/gateway/shard_runner.dart b/lib/src/gateway/shard_runner.dart index 756e08577..f48f28d86 100644 --- a/lib/src/gateway/shard_runner.dart +++ b/lib/src/gateway/shard_runner.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:io'; import 'dart:math'; +import 'package:es_compression/zstd.dart'; import 'package:eterl/eterl.dart'; import 'package:nyxx/src/api_options.dart'; import 'package:nyxx/src/errors.dart'; @@ -367,7 +368,8 @@ class ShardConnection extends Stream implements StreamSink { final connection = await WebSocket.connect(gatewayUri); final uncompressedStream = switch (runner.data.apiOptions.compression) { - GatewayCompression.transport => decompressTransport(connection.cast>()), + GatewayCompression.transportZLib => decompressZLibTransport(connection.cast>()), + GatewayCompression.transportZStd => decompressZStdTransport(connection.cast>()), GatewayCompression.payload => decompressPayloads(connection), GatewayCompression.none => connection, }; @@ -450,7 +452,7 @@ class ShardConnection extends Stream implements StreamSink { Future get done => websocket.done.then((_) => _sentController.done); } -Stream decompressTransport(Stream> raw) { +Stream decompressZLibTransport(Stream> raw) { final filter = RawZLibFilter.inflateFilter(); return raw.map((chunk) { @@ -465,6 +467,9 @@ Stream decompressTransport(Stream> raw) { }); } +// See https://github.com/instantiations/es_compression/issues/52 for why the isNotEmpty check is needed. +Stream decompressZStdTransport(Stream> raw) => raw.transform(zstd.decoder).where((message) => message.isNotEmpty); + Stream decompressPayloads(Stream raw) => raw.map((message) { if (message is String) { return message; diff --git a/pubspec.yaml b/pubspec.yaml index bd8da345f..0fa8d7465 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -13,11 +13,11 @@ dependencies: http: ^1.0.0 logging: ^1.0.1 path: ^1.8.3 - retry: ^3.1.0 eterl: ^1.0.1 runtime_type: ^1.0.1 meta: ^1.9.1 oauth2: ^2.0.2 + es_compression: ^2.0.12 dev_dependencies: test: ^1.22.0 diff --git a/test/integration/gateway_integration_test.dart b/test/integration/gateway_integration_test.dart index d7a47d80e..7539f65e1 100644 --- a/test/integration/gateway_integration_test.dart +++ b/test/integration/gateway_integration_test.dart @@ -19,55 +19,21 @@ void main() { await expectLater(client.close(), completes); } - test( - 'JSON (uncompressed)', - () => testClient(GatewayApiOptions( - token: testToken!, - intents: GatewayIntents.none, - compression: GatewayCompression.none, - payloadFormat: GatewayPayloadFormat.json, - )), - ); - - test( - 'JSON (payload compression)', - () => testClient(GatewayApiOptions( - token: testToken!, - intents: GatewayIntents.none, - compression: GatewayCompression.payload, - payloadFormat: GatewayPayloadFormat.json, - )), - ); - - test( - 'JSON (transport compression)', - () => testClient(GatewayApiOptions( - token: testToken!, - intents: GatewayIntents.none, - compression: GatewayCompression.transport, - payloadFormat: GatewayPayloadFormat.json, - )), - ); - - test( - 'ETF (uncompressed)', - () => testClient(GatewayApiOptions( - token: testToken!, - intents: GatewayIntents.none, - compression: GatewayCompression.none, - payloadFormat: GatewayPayloadFormat.etf, - )), - ); - - test( - 'ETF (transport compression)', - () => testClient(GatewayApiOptions( - token: testToken!, - intents: GatewayIntents.none, - compression: GatewayCompression.transport, - payloadFormat: GatewayPayloadFormat.etf, - )), - ); + for (final compression in GatewayCompression.values) { + for (final payloadFormat in GatewayPayloadFormat.values) { + if (payloadFormat != GatewayPayloadFormat.json && compression == GatewayCompression.payload) continue; + + test( + '${payloadFormat.value.toUpperCase()} (${compression.name})', + () => testClient(GatewayApiOptions( + token: testToken!, + intents: GatewayIntents.none, + compression: compression, + payloadFormat: payloadFormat, + )), + ); + } + } test('Multiple shards', () async { const shardCount = 5;