Aeron Archive
The Aeron Archive service can record and replay message streams from durable/persistent storage. The service is designed to be high performance and tests show that its major limitation is the performance of the storage for the recordings. With sufficiently fast storage the archive can record or replay a stream of messages at full 10GigE line rate.
Samples can be found here and system tests here.
To start the Archive service you launch the ArchivingMediaDriver which includes the media driver by composition. This can run standalone or be embedded in another service. The configuration options can be found in the Archive.
The service offers these main features:
- Record: service can record a particular subscription, described by <channel, streamId>. Each resulting image for the subscription will be recorded under a new recordingId. Local network publications are recorded using the spy feature for efficiency. If no subscribers are active then the recording can advance the stream by setting the aeron.spies.simulate.connection system property to true.
- Extend: service can extend an existing recording by appending.
- Replay: service can replay a recorded recordingId from a particular position, and for a particular length which can be Aeron.NULL_VALUE for an open ended replay.
- Query: the catalog for existing recordings and the recorded position of an active recording.
- Truncate: allows a stopped recording to have its length truncated, and if truncated to the start position then it is effectively deleted.
- Replay Merge: allows a late joining subscriber of a recorded stream to replay a recording and then merge with the live stream for cut over if the consumer is fast enough to keep up.
- Replicate: allows for the replication of a recording from one archive to another, with the option to merge with a live multicast stream and record it after using the source archive to catch up. When using replication it is necessary to configure the replication channel for the destination archive with aeron.archive.replication.channel.
- Purge & Restore: allows for the history of a long running recording, which is likely to keep extending, to have the oldest history purged to save disk space, and restored later if required. This works at the granularity of recording segments, which are a multiple of term buffers. The oldest segments can be detached from a recording to allow for compression or copying elsewhere. They can also be optionally deleted via the API, or a combination of detach and delete can be achieved with the purge operation. Segments can be copied back into place and then attached. Alternatively, a stream backed up to another archive can be replayed and recorded for the range required and then migrated to the beginning of the recording to restore the segments.
- Checksums: allows computing checksums such as CRC32 and CRC32C while recording and verifying them during replay.
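To illustrate what segment granularity means for Purge & Restore, the following is a minimal sketch, using only the JDK, of rounding a stream position down to the first byte of the segment that contains it. The class and method names are hypothetical, and the sketch assumes that segments are laid out from the start of the term in which the recording began and that the segment length is a whole multiple of the term buffer length. It is not the Archive's own implementation.

```java
final class SegmentMath
{
    /**
     * Round a position down to the first byte of the segment containing it.
     * Assumption: segments are counted from the base of the term that holds startPosition.
     */
    static long segmentBasePosition(
        final long startPosition, final long position, final int termBufferLength, final int segmentLength)
    {
        if (segmentLength % termBufferLength != 0)
        {
            throw new IllegalArgumentException("segment length must be a multiple of the term buffer length");
        }
        if (position < startPosition)
        {
            throw new IllegalArgumentException("position is before the start of the recording");
        }

        // Base of the term in which the recording started.
        final long startTermBase = startPosition - (startPosition % termBufferLength);
        // Distance from that base, rounded down to a whole number of segments.
        final long offset = position - startTermBase;
        return startTermBase + (offset - (offset % segmentLength));
    }

    public static void main(final String[] args)
    {
        // 64 KiB terms, two terms per segment.
        System.out.println(segmentBasePosition(1024, 300_000, 65_536, 131_072)); // prints 262144
    }
}
```

A position computed this way marks a segment boundary, which is the kind of position at which whole segments could be detached or purged.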
The Archive service can be run in one of three threading modes:
- ArchiveThreadingMode.DEDICATED: 3 threads are used: one for the conductor responding to control signals and queries, one for recording streams, and one for replaying streams.
- ArchiveThreadingMode.SHARED: 1 thread is used for all the control, recording, and replay.
- ArchiveThreadingMode.INVOKER: No threads are used in the archive and the duty cycle is driven externally by calling the AgentInvoker.invoke() method on the Archive.invoker() object. Each call to invoke will perform one duty cycle of the Archive.
The overall threading usage of the Archive is a combination of the ArchiveThreadingMode and the ThreadingMode for the composed MediaDriver.
The Archive Client can communicate with an Archive using the control protocol and receive events on the progress of recording via the recording events stream. To support dynamic subscribers to the recording events stream, its channel can be multicast or MDC (Multi-Destination-Cast).
Samples using an archive can be found here.
An archive can be instructed to record streams, i.e. <channel, streamId> pairs. These streams are recorded with the file sync level the archive has been launched with. Progress is reported on the recording events stream.
- aeron.archive.file.sync.level=0: for normal writes to the OS page cache for background writing to disk.
- aeron.archive.file.sync.level=1: for forcing the dirty data pages to disk.
- aeron.archive.file.sync.level=2: for forcing the dirty data pages and file metadata to disk.
When setting the file sync level greater than zero it is also important to sync the archive catalog by setting aeron.archive.catalog.file.sync.level to the same value.
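One way to keep the two sync levels aligned is to set both system properties from a single value. The following is a minimal sketch using only the JDK; the class and method names are hypothetical, and it assumes the properties are set before the archive is launched so that they are picked up when its configuration is read.

```java
final class ArchiveSyncLevels
{
    static final String FILE_SYNC_LEVEL_PROP = "aeron.archive.file.sync.level";
    static final String CATALOG_FILE_SYNC_LEVEL_PROP = "aeron.archive.catalog.file.sync.level";

    /** Set the recording and catalog sync levels to the same value (0, 1 or 2). */
    static void apply(final int level)
    {
        if (level < 0 || level > 2)
        {
            throw new IllegalArgumentException("sync level must be 0, 1 or 2: " + level);
        }

        final String value = Integer.toString(level);
        System.setProperty(FILE_SYNC_LEVEL_PROP, value);
        System.setProperty(CATALOG_FILE_SYNC_LEVEL_PROP, value);
    }

    public static void main(final String[] args)
    {
        apply(1); // force dirty data pages to disk for both recordings and the catalog
        System.out.println(System.getProperty(FILE_SYNC_LEVEL_PROP)); // prints 1
    }
}
```

The same effect can be had by passing both properties with -D on the command line; the helper only guards against setting one and forgetting the other.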
Recordings will be assigned a recordingId and a full description of the stream is captured in the Archive Catalog. The Catalog chronicles the contents of an archive as RecordingDescriptors which can be queried.
The progress of active recordings can be tracked using AeronStat to view the rec-pos counter for each stream.
It is possible to record a stream, over a UDP based channel, when no subscribers have connected by enabling a property so that spy subscriptions can simulate a connection on the media driver. This can be done by setting either of:
- aeron.spies.simulate.connection system property to true.
- MediaDriver.Context.spiesSimulateConnection(boolean) to true.
Spy subscriptions are very efficient when used to record a local outbound network publication because they do not require a local subscription to receive the outbound data stream on an inbound connection. IPC based channels will be connected when the archive starts to record regardless of whether this property is set.
The contents of an Archive can be queried by listing the RecordingDescriptors. This can be a simple paging through all descriptors in the catalog, or the query can be filtered by <channel, streamId> to reduce the result set.
The SBE message format for the descriptors is as follows:
<sbe:message name="RecordingDescriptor"
id="22"
description="Describes a recording in the catalog">
<field name="controlSessionId" id="1" type="int64"/>
<field name="correlationId" id="2" type="int64"/>
<field name="recordingId" id="3" type="int64"/>
<field name="startTimestamp" id="4" type="time_t"/>
<field name="stopTimestamp" id="5" type="time_t"/>
<field name="startPosition" id="6" type="int64"/>
<field name="stopPosition" id="7" type="int64"/>
<field name="initialTermId" id="8" type="int32"/>
<field name="segmentFileLength" id="9" type="int32"/>
<field name="termBufferLength" id="10" type="int32"/>
<field name="mtuLength" id="11" type="int32"/>
<field name="sessionId" id="12" type="int32"/>
<field name="streamId" id="13" type="int32"/>
<data name="strippedChannel" id="14" type="varAsciiEncoding"/>
<data name="originalChannel" id="15" type="varAsciiEncoding"/>
<data name="sourceIdentity" id="16" type="varAsciiEncoding"/>
</sbe:message>
Note: A stopPosition of -1 means a live recording is in progress.
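When processing descriptors it can help to make the -1 convention explicit rather than doing arithmetic on a stop position that is not yet known. The following is a minimal sketch using only the JDK; RecordingSummary is a hypothetical holder for three of the descriptor fields and is not part of Aeron.

```java
final class RecordingSummary
{
    /** Value of stopPosition while the recording is still in progress. */
    static final long NULL_POSITION = -1L;

    final long recordingId;
    final long startPosition;
    final long stopPosition;

    RecordingSummary(final long recordingId, final long startPosition, final long stopPosition)
    {
        this.recordingId = recordingId;
        this.startPosition = startPosition;
        this.stopPosition = stopPosition;
    }

    /** True if the descriptor describes a recording that has not yet stopped. */
    boolean isLive()
    {
        return NULL_POSITION == stopPosition;
    }

    /** Recorded length in bytes, or NULL_POSITION if the recording is live and the length is not in the descriptor. */
    long recordedLength()
    {
        return isLive() ? NULL_POSITION : stopPosition - startPosition;
    }

    public static void main(final String[] args)
    {
        System.out.println(new RecordingSummary(7, 1024, 4096).recordedLength()); // prints 3072
        System.out.println(new RecordingSummary(8, 0, NULL_POSITION).isLive());   // prints true
    }
}
```

For a live recording the current length has to come from the recorded position, for example via the rec-pos counter or a query for the recorded position of the active recording.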
A replay of a recorded stream can be requested by recordingId. It is possible to replay a recording that has stopped or one that is currently in progress. Replays are requested from a position and for a length. The replay session id returned from the replay request will contain the sessionId of the replayed Aeron stream in the lower 32 bits, which can be obtained with a downcast to an int. The full 64-bit return value for the replay session is required to stop the replay early.
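The downcast described above can be sketched as follows, using only the JDK. The class and method names are hypothetical; the point is that the narrowing cast keeps the lower 32 bits, while the original long is kept for stopping the replay.

```java
final class ReplaySessionIds
{
    /** Extract the Aeron stream sessionId from the lower 32 bits of a replay session id. */
    static int streamSessionId(final long replaySessionId)
    {
        return (int)replaySessionId;
    }

    public static void main(final String[] args)
    {
        // Example value only: upper 32 bits are 3, lower 32 bits are 42.
        final long replaySessionId = (3L << 32) | 42L;

        System.out.println(streamSessionId(replaySessionId)); // prints 42
    }
}
```

Because sessionId is a signed 32-bit value, the result can be negative when the top bit of the lower word is set; that is expected and still identifies the stream.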
If the requested replay is for a recording that is currently in progress and the length goes beyond the current recorded position then the replay will track the live recording. This can be useful for a subscriber that wants to track a live stream but does not want to participate in flow control of the transient stream if it might slow the others down.
Replays of live streams should be sent to a different <channel, streamId> pairing to avoid congestion.
The Archive can compute checksums while recording data and verify them during replay. Both features are enabled separately via the properties aeron.archive.record.checksum (for record) and aeron.archive.replay.checksum (for replay) or the corresponding configuration API (i.e. io.aeron.archive.Archive.Context#recordChecksum(io.aeron.archive.checksum.Checksum) and io.aeron.archive.Archive.Context#replayChecksum(io.aeron.archive.checksum.Checksum) respectively). The value of either property should be the fully qualified name of a class that implements the io.aeron.archive.checksum.Checksum interface.
NOTE: Enabling checksums for replay should use the same io.aeron.archive.checksum.Checksum implementation as for the recording, otherwise replay will fail with the io.aeron.archive.client.ArchiveException.
Out of the box Aeron provides two implementations of the io.aeron.archive.checksum.Checksum interface:
- io.aeron.archive.checksum.Crc32 - implements CRC32 checksum algorithm. Available always.
- io.aeron.archive.checksum.Crc32c - implements CRC32C checksum algorithm. Available on JDK 9+. It is an error to configure it for JDK 8, i.e. java.lang.IllegalStateException will be thrown in this case.
It is also possible to access these implementations programmatically via the io.aeron.archive.checksum.Checksums class.
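To see why the record and replay checksum implementations need to match, it helps to compare the two algorithms on the same bytes. The following is a minimal sketch that uses the JDK's java.util.zip.CRC32 and java.util.zip.CRC32C (JDK 9+) rather than Aeron's own classes; note that java.util.zip.Checksum is a different interface from io.aeron.archive.checksum.Checksum, and the demo class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

final class ChecksumMismatchDemo
{
    /** Compute the checksum of the whole array with the given JDK algorithm. */
    static long checksum(final Checksum algorithm, final byte[] data)
    {
        algorithm.reset();
        algorithm.update(data, 0, data.length);
        return algorithm.getValue();
    }

    public static void main(final String[] args)
    {
        // "123456789" is the conventional check input for CRC algorithms.
        final byte[] payload = "123456789".getBytes(StandardCharsets.US_ASCII);

        final long crc32 = checksum(new CRC32(), payload);
        final long crc32c = checksum(new CRC32C(), payload);

        // prints CRC32=cbf43926 CRC32C=e3069283
        System.out.printf("CRC32=%08x CRC32C=%08x%n", crc32, crc32c);
    }
}
```

The two values differ for the same payload, so data recorded with one algorithm cannot be expected to verify with the other.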
A custom implementation of the io.aeron.archive.checksum.Checksum interface can be provided but it should (ideally) be stateless or at the very least thread safe since Aeron will use a single instance for all invocations.
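The stateless requirement can be sketched as follows using only the JDK. BufferChecksum is a hypothetical stand-in defined here for illustration and does not have the signature of io.aeron.archive.checksum.Checksum (consult the JavaDoc for the real one). The idea is that the implementation holds no fields and creates any mutable accumulator per call, so a single shared instance gives consistent results across threads.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;

final class StatelessChecksumSketch
{
    /** Hypothetical callback shape; NOT the Aeron Checksum interface. */
    interface BufferChecksum
    {
        int compute(byte[] buffer, int offset, int length);
    }

    /** No fields: the mutable CRC32 accumulator lives only for the duration of one call. */
    static final class Crc32PerCall implements BufferChecksum
    {
        public int compute(final byte[] buffer, final int offset, final int length)
        {
            final CRC32 crc = new CRC32();
            crc.update(buffer, offset, length);
            return (int)crc.getValue();
        }
    }

    /** Hammer one shared instance from several threads and count results that differ from the expected value. */
    static int mismatchesAcrossThreads(final BufferChecksum shared, final byte[] data, final int threadCount)
    {
        final int expected = shared.compute(data, 0, data.length);
        final AtomicInteger mismatches = new AtomicInteger();
        final Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(() ->
            {
                for (int j = 0; j < 10_000; j++)
                {
                    if (expected != shared.compute(data, 0, data.length))
                    {
                        mismatches.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }

        try
        {
            for (final Thread thread : threads)
            {
                thread.join();
            }
        }
        catch (final InterruptedException ex)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }

        return mismatches.get();
    }

    public static void main(final String[] args)
    {
        final byte[] data = "123456789".getBytes(StandardCharsets.US_ASCII);
        System.out.println("mismatches=" + mismatchesAcrossThreads(new Crc32PerCall(), data, 4)); // prints mismatches=0
    }
}
```

An implementation that kept the CRC32 accumulator in a field and reused it across calls would not be safe to share in this way.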
Besides being computed and checked during normal archive operations the checksums can be checked and/or re-computed by using the io.aeron.archive.ArchiveTool (CLI and API), i.e.:
- verify command accepts an optional -checksum className parameter. When specified the recorded checksums in the segment files will be verified using the specified io.aeron.archive.checksum.Checksum implementation. If a mismatch is detected the corresponding recording is marked as invalid.
- checksum command allows computing and overwriting checksum information in the segment files using the specified io.aeron.archive.checksum.Checksum implementation.
For more information please refer to the JavaDoc of the io.aeron.archive.ArchiveTool class.