feat(logging): add cloudwatch logger plugin
Nika Hassani committed Aug 24, 2023
1 parent b8ec5b4 commit b5f8b74
Showing 9 changed files with 822 additions and 0 deletions.
@@ -3,3 +3,8 @@

/// aws_logging_cloudwatch
library aws_logging_cloudwatch;

export 'src/cloudwatch_logger_plugin.dart';
export 'src/log_stream_provider.dart';
export 'src/plugin_config.dart';
export 'src/remote_constraint_provider.dart';
@@ -0,0 +1,316 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import 'dart:async';
import 'dart:math';

import 'package:aws_common/aws_common.dart';
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/in_memory_queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:aws_logging_cloudwatch/src/stoppable_timer.dart';
import 'package:fixnum/fixnum.dart';
import 'package:meta/meta.dart';

// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html

// The batch of events must satisfy the following constraints:

// The maximum batch size is 1,048,576 bytes. This size is calculated as the sum of all event messages in UTF-8, plus 26 bytes for each log event.

// None of the log events in the batch can be more than 2 hours in the future.

// None of the log events in the batch can be more than 14 days in the past. Also, none of the log events can be from earlier than the retention period of the log group.

// The log events in the batch must be in chronological order by their timestamp. The timestamp is the time that the event occurred, expressed as the number of milliseconds after Jan 1, 1970 00:00:00 UTC. (In AWS Tools for PowerShell and the AWS SDK for .NET, the timestamp is specified in .NET format: yyyy-mm-ddThh:mm:ss. For example, 2017-09-15T13:45:30.)

// A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails.

// Each log event can be no larger than 256 KB.

// The maximum number of log events in a batch is 10,000.
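
// For example, a log event whose UTF-8-encoded message is 1,000 bytes counts
// as 1,000 + 26 = 1,026 bytes toward the 1,048,576-byte batch limit.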

const int _maxNumberOfLogEventsInBatch = 10000;
const int _maxLogEventsTimeSpanInBatch = 24 * 3600;
const int _maxLogEventsBatchSize = 1048576;
const int _baseBufferSize = 26;
const int _maxLogEventSize = 256000;

typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);

/// {@template aws_logging_cloudwatch.cloudwatch_logger_plugin}
/// An [AWSLoggerPlugin] for sending logs to AWS CloudWatch Logs.
/// {@endtemplate}
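///
/// A minimal usage sketch, assuming a [CloudWatchLoggerPluginConfiguration]
/// and an [AWSCredentialsProvider] are already available, and that the root
/// `AWSLogger` from `package:aws_common` exposes `registerPlugin`:
///
/// ```dart
/// final plugin = CloudWatchLoggerPlugin(
///   credentialsProvider: credentialsProvider,
///   pluginConfig: pluginConfig,
/// );
/// AWSLogger().registerPlugin(plugin);
/// ```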
class CloudWatchLoggerPlugin extends AWSLoggerPlugin
with AWSDebuggable, AWSLoggerMixin {
/// {@macro aws_logging_cloudwatch.cloudwatch_logger_plugin}
factory CloudWatchLoggerPlugin({
required AWSCredentialsProvider credentialsProvider,
required CloudWatchLoggerPluginConfiguration pluginConfig,
RemoteLoggingConstraintProvider? remoteLoggingConstraintProvider,
CloudWatchLogStreamProvider? logStreamProvider,
}) {
return CloudWatchLoggerPlugin._(
credentialsProvider: credentialsProvider,
pluginConfig: pluginConfig,
remoteLoggingConstraintProvider: remoteLoggingConstraintProvider,
logStreamProvider: logStreamProvider,
logStore: InMemoryQueuedItemStore(),
);
}

/// An [AWSLoggerPlugin] to use only for testing.
@protected
@visibleForTesting
CloudWatchLoggerPlugin.testPlugin({
required CloudWatchLogsClient client,
required CloudWatchLoggerPluginConfiguration pluginConfig,
required CloudWatchLogStreamProvider logStreamProvider,
required QueuedItemStore logStore,
RemoteLoggingConstraintProvider? remoteLoggingConstraintProvider,
}) : _enabled = pluginConfig.enable,
_pluginConfig = pluginConfig,
_logStore = logStore,
_remoteLoggingConstraintProvider = remoteLoggingConstraintProvider,
_logStreamProvider = logStreamProvider,
_client = client;

CloudWatchLoggerPlugin._({
required AWSCredentialsProvider credentialsProvider,
required CloudWatchLoggerPluginConfiguration pluginConfig,
RemoteLoggingConstraintProvider? remoteLoggingConstraintProvider,
CloudWatchLogStreamProvider? logStreamProvider,
QueuedItemStore? logStore,
}) : _enabled = pluginConfig.enable,
_pluginConfig = pluginConfig,
_logStore = logStore ?? InMemoryQueuedItemStore(),
_remoteLoggingConstraintProvider = remoteLoggingConstraintProvider ??
(pluginConfig.defaultRemoteConfiguration != null
? DefaultRemoteLoggingConstraintProvider(
config: pluginConfig.defaultRemoteConfiguration!,
credentialsProvider: credentialsProvider,
)
: null),
_client = CloudWatchLogsClient(
region: pluginConfig.region,
credentialsProvider: credentialsProvider,
),
_logStreamProvider = logStreamProvider ??
DefaultCloudWatchLogStreamProvider(
logGroupName: pluginConfig.logGroupName,
client: CloudWatchLogsClient(
region: pluginConfig.region,
credentialsProvider: credentialsProvider,
),
) {
_timer = pluginConfig.flushIntervalInSeconds > Duration.zero
? StoppableTimer(
duration: pluginConfig.flushIntervalInSeconds,
callback: _startSyncingIfNotInProgress,
onError: _onTimerError,
)
: null;
if (!pluginConfig.enable) {
_timer?.stop();
}
}

final CloudWatchLoggerPluginConfiguration _pluginConfig;
final CloudWatchLogsClient _client;
final CloudWatchLogStreamProvider _logStreamProvider;
final QueuedItemStore _logStore;
bool _syncing = false;
bool _enabled;
StoppableTimer? _timer;
RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider;

set remoteLoggingConstraintProvider(
RemoteLoggingConstraintProvider remoteProvider,
) {
if (_remoteLoggingConstraintProvider != null) {
throw StateError(
'remoteLoggingConstraintProvider is already configured.',
);
}
_remoteLoggingConstraintProvider = remoteProvider;
}

Future<void> _startSyncingIfNotInProgress() async {
Future<void> startSyncing() async {
String logStream;
try {
logStream = await _logStreamProvider.logStream;
} on Exception catch (e) {
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _logStore.clear();
logger.warn(
'Reached local store max size of '
'${_pluginConfig.localStoreMaxSizeInMB} MB. Logs are deleted from the '
'local store.',
);
}
logger.error('Failed to create CloudWatch log stream', e);
return;
}

final batchStream = _getLogBatchesToSync();
await for (final (logs, events) in batchStream) {
try {
final response = await _sendToCloudWatch(events, logStream);
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex !=
null) {
break;
}
await _logStore.deleteItems(logs);
} on Exception catch (e) {
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _logStore.deleteItems(logs);
logger.warn(
'Reached local store max size of '
'${_pluginConfig.localStoreMaxSizeInMB} MB. Logs are deleted from the '
'local store.',
);
}
logger.error('Failed to sync batched logs to CloudWatch', e);
}
}
}

if (!_syncing) {
_syncing = true;
try {
await startSyncing();
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
} finally {
_syncing = false;
}
}
}

void _onTimerError(Object e) {
logger.error('Failed to sync logs to CloudWatch.', e);
}

LoggingConstraint _getLoggingConstraint() {
final result = _remoteLoggingConstraintProvider?.loggingConstraint;
return result ?? _pluginConfig.localLoggingConstraint;
}

Future<PutLogEventsResponse> _sendToCloudWatch(
List<InputLogEvent> logEvents,
String logStreamName,
) async {
final request = PutLogEventsRequest(
logGroupName: _pluginConfig.logGroupName,
logStreamName: logStreamName,
logEvents: logEvents,
);
return _client.putLogEvents(request).result;
}

Stream<_LogBatch> _getLogBatchesToSync() async* {
final queuedLogs = (await _logStore.getAll()).toList();
if (queuedLogs.isEmpty) {
yield ([], []);
return;
}
final logEvents = <InputLogEvent>[];
final logQueues = <QueuedItem>[];
var totalByteSize = 0;

for (final currentLog in queuedLogs) {
final currentLogEvent = currentLog.toInputLogEvent();
final size = currentLogEvent.message.length + _baseBufferSize;
if (totalByteSize + size >= _maxLogEventsBatchSize ||
logEvents.length >= _maxNumberOfLogEventsInBatch ||
(logEvents.length > 1 &&
currentLogEvent.timestamp - logEvents.first.timestamp >=
_maxLogEventsTimeSpanInBatch)) {
yield (logQueues, logEvents);
totalByteSize = 0;
logEvents.clear();
logQueues.clear();
}
totalByteSize += size;
logEvents.add(currentLogEvent);
logQueues.add(currentLog);
}
yield (logQueues, logEvents);
}

/// Whether a [logEntry] should be logged by this plugin.
Future<bool> _isLoggable(LogEntry logEntry) async {
if (!_enabled) {
return false;
}
final loggingConstraint = _getLoggingConstraint();
return logEntry.level >= loggingConstraint.defaultLogLevel;
}

@override
Future<void> handleLogEntry(LogEntry logEntry) async {
if (!(await _isLoggable(logEntry))) {
return;
}
final item = logEntry.toQueuedItem();
await _logStore.addItem(
item.value,
item.timestamp,
);
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _startSyncingIfNotInProgress();
}
}

/// Enables the plugin.
void enable() {
if (!_enabled) {
_enabled = true;
_timer?.start();
}
}

/// Disables the plugin.
///
/// It deletes cached logs from the local store and stops sending logs to
/// CloudWatch.
///
/// To send cached logs to CloudWatch, call `flushLogs()` before calling
/// `disable()`.
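///
/// For example (a sketch, with `plugin` assumed to be an existing
/// [CloudWatchLoggerPlugin] instance):
///
/// ```dart
/// await plugin.flushLogs(); // send any cached logs first
/// await plugin.disable(); // then stop logging and clear the local store
/// ```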
Future<void> disable() async {
_enabled = false;
_timer?.stop();
await _logStore.clear();
}

/// Sends logs on-demand to CloudWatch.
Future<void> flushLogs() async {
await _startSyncingIfNotInProgress();
}

@override
String get runtimeTypeName => 'CloudWatchLoggerPlugin';
}

extension on QueuedItem {
InputLogEvent toInputLogEvent() {
// The message is truncated to satisfy the max size of `_maxLogEventSize`,
// with 8 bytes reserved for the `timestamp`.

final end = min(value.length, _maxLogEventSize - 8);
return InputLogEvent(
message: value.substring(0, end),
timestamp: Int64(DateTime.parse(timestamp).millisecondsSinceEpoch),
);
}
}

extension on LogEntry {
({String value, String timestamp}) toQueuedItem() {
return (
value: '${level.name.toUpperCase()}/$loggerName: $message',
timestamp: time.toUtc().toIso8601String()
);
}
}
@@ -0,0 +1,64 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:intl/intl.dart';

/// {@template aws_logging_cloudwatch.cloudwatch_logstream_provider}
/// An interface to provide a custom implementation of
/// [CloudWatchLogStreamProvider].
/// {@endtemplate}
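///
/// A sketch of a custom implementation (the fixed stream name below is an
/// illustrative assumption):
///
/// ```dart
/// class SingleStreamProvider implements CloudWatchLogStreamProvider {
///   // Assumes the stream already exists in the target log group.
///   @override
///   Future<String> get logStream async => 'my-app-stream';
/// }
/// ```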
abstract class CloudWatchLogStreamProvider {
/// Returns the CloudWatch log stream name to use for sending logs to
/// CloudWatch.
///
/// It creates the log stream if it does not exist.
Future<String> get logStream;
}

/// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
/// The default implementation of [CloudWatchLogStreamProvider].
///
/// It uses `logStreamName` if provided; otherwise it uses the current UTC
/// date in `yyyy-MM-dd` format as the `logStreamName`.
/// {@endtemplate}
class DefaultCloudWatchLogStreamProvider
implements CloudWatchLogStreamProvider {
/// {@macro aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
DefaultCloudWatchLogStreamProvider({
required CloudWatchLogsClient client,
required String logGroupName,
String? logStreamName,
}) : _logStreamName = logStreamName,
_logGroupName = logGroupName,
_client = client;

final String? _logStreamName;
final String _logGroupName;
final CloudWatchLogsClient _client;
static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd');

/// Creates the CloudWatch log stream if it does not exist and returns
/// the log stream name.
///
/// Throws an [Exception] if it fails to create the log stream.
@override
Future<String> get logStream async {
final logStreamName =
_logStreamName ?? _dateFormat.format(DateTime.timestamp());
try {
await _client
.createLogStream(
CreateLogStreamRequest(
logGroupName: _logGroupName,
logStreamName: logStreamName,
),
)
.result;
} on ResourceAlreadyExistsException {
return logStreamName;
} on Exception {
rethrow;
}
return logStreamName;
}
}