= Chronicle Queue

Micro-second messaging that stores everything to disk.
Chronicle Queue supports very high throughput (~5 M messages/second for 96-byte messages on an i7-4790).

The directory structure is as follows:

[source]
----
base-directory /
   {cycle-name}.cq4    - the default format is yyyyMMdd for daily rolling
----

The format consists of size-prefixed bytes which are formatted using `BinaryWire` or `TextWire`. The `ChronicleQueue.dump()` method can be used to dump the raw contents as a String.

=== Getting Started

==== Chronicle Construction

Creating an instance of Chronicle is a little more complex than just calling a constructor. To create an instance you have to use a `ChronicleQueueBuilder`.

[source, Java]
----
String basePath = System.getProperty("java.io.tmpdir") + "/getting-started";
ChronicleQueue queue = ChronicleQueueBuilder.single(basePath).build();
----

In this example we have created a `SingleChronicleQueue` which stores its data in daily-rolling files named:

 ${java.io.tmpdir}/getting-started/{today}.cq4

==== Writing

[source, Java]
----
// Obtain an ExcerptAppender
ExcerptAppender appender = queue.acquireAppender();

// write - {msg: TestMessage}
appender.writeDocument(w -> w.write(() -> "msg").text("TestMessage"));

// write - TestMessage
appender.writeText("TestMessage");
----

==== Reading

[source, Java]
----
ExcerptTailer tailer = queue.createTailer();

tailer.readDocument(w -> System.out.println("msg: " + w.read(() -> "msg").text()));

assertEquals("TestMessage", tailer.readText());
----

==== Cleanup

Chronicle Queue stores its data off heap, so it is recommended that you call `close()` once you have finished working with Chronicle Queue, to free resources.

NOTE: No data will be lost if you don't do this; it only releases the resources that were used.

[source, Java]
----
queue.close();
----

==== Putting it all together

[source, Java]
----
try (ChronicleQueue queue = ChronicleQueueBuilder.single("queue-dir").build()) {
    // Obtain an ExcerptAppender
    ExcerptAppender appender = queue.acquireAppender();

    // write - {msg: TestMessage}
    appender.writeDocument(w -> w.write(() -> "msg").text("TestMessage"));

    // write - TestMessage
    appender.writeText("TestMessage");

    ExcerptTailer tailer = queue.createTailer();

    tailer.readDocument(w -> System.out.println("msg: " + w.read(() -> "msg").text()));

    assertEquals("TestMessage", tailer.readText());
}
----

=== FAQ

==== Do we have to use Wire, or can we use Bytes?

You can access the Bytes in a Wire as follows:

.Writing to Bytes
[source, Java]
----
try (DocumentContext dc = appender.writingDocument()) {
    Wire wire = dc.wire();
    Bytes bytes = wire.bytes();
    // write to bytes
}
----

.Reading from Bytes
[source, Java]
----
try (DocumentContext dc = tailer.readingDocument()) {
    Wire wire = dc.wire();
    Bytes bytes = wire.bytes();
    // read from the bytes
}
----

==== Is there a lower-level interface?

You can access native memory:

.Writing to native memory
[source, Java]
----
try (DocumentContext dc = appender.writingDocument()) {
    Wire wire = dc.wire();
    Bytes bytes = wire.bytes();
    long address = bytes.address(bytes.readPosition());
    // write to native memory
    bytes.writeSkip(lengthActuallyWritten);
}
----

.Reading from native memory
[source, Java]
----
try (DocumentContext dc = tailer.readingDocument()) {
    Wire wire = dc.wire();
    Bytes bytes = wire.bytes();
    long address = bytes.address(bytes.readPosition());
    long length = bytes.readRemaining();
    // read from native memory
}
----
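==== Can I write and read messages as plain method calls?

Yes - the methodReader/methodWriter builders referred to in the Delivery Mode Semantics section below let you append messages by calling methods on an interface, and replay them by supplying an implementation of the same interface. Below is a minimal sketch, assuming a Chronicle Queue 4.x classpath; the `Says` interface and the queue directory are illustrative choices, not part of the library.

[source, Java]
----
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptTailer;

public class MethodCallExample {

    // Illustrative listener interface; any interface with simple argument types
    // can be written and replayed in the same way.
    public interface Says {
        void say(String words);
    }

    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueueBuilder.single("method-calls").build()) {
            // methodWriter returns a proxy; each call on it is appended as one message.
            Says writer = queue.acquireAppender().methodWriter(Says.class);
            writer.say("Hello World");

            // methodReader replays messages by invoking a matching implementation.
            ExcerptTailer tailer = queue.createTailer();
            tailer.methodReader((Says) System.out::println).readOne(); // prints "Hello World"

            // dump() renders the raw stored contents as a String, handy for debugging.
            System.out.println(queue.dump());
        }
    }
}
----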
== Design

=== Motivation

Chronicle Queue is designed to be a "record everything store" which can be read with micro-second real-time latency. This supports even the most demanding High Frequency Trading systems; however, it can be used in any application where the recording of information is a concern.

Chronicle Queue Enterprise is designed to support reliable replication, with notification to either the appender or a tailer when a message has been successfully replicated.

=== Persistence

Chronicle Queue assumes disk space is cheap compared with memory. Enterprise SSD costs have come down; one GB of disk space is worth less than one minute of your time on minimum wage (in the UK at the time of writing, July 2016). Queue makes full use of the disk space you have, so you are not limited by the main memory of your machine. If you use spinning HDDs, you can store many TB of data for little cost.

The only piece of software Chronicle Queue needs in order to run is the Operating System. It doesn't have a broker; instead it uses your Operating System to do all the work. If your application dies, the OS keeps running for seconds longer, so no data is lost, even without replication.

As Chronicle Queue stores all saved data in memory-mapped files, it has a trivial on-heap overhead, even if you have over 100 TB of data.

=== Efficiency

We put significant effort into worrying about latency you can't see. Unlike products which focus on supporting the web, we care about latencies which are a fraction of the time you can perceive. Less than 40 ms is fine for web applications, as it's faster than you can see (the frame rate of cinema is 24 Hz, or about 40 ms); however, we attempt to be under 40 microseconds 99% to 99.99% of the time. Using Queue without replication, we support applications with latencies below 40 microseconds, end to end, across multiple services. Often the 99% latency of Queue is entirely dependent on the choice of OS and disk subsystem.

=== Compression

Replication for Chronicle Queue supports Chronicle Wire Enterprise. This supports real-time compression which calculates the deltas for individual objects as they are written. This can reduce the size of messages to 1/10th or better, without the need for batching, i.e. without introducing significant latency.

Queue also supports LZW, Snappy and GZIP compression; however, these add non-trivial latency. They are only useful if you have strict limitations on network bandwidth.

=== Delivery Mode Semantics

Chronicle Queue supports a number of semantics:

- Every message is replayed on restart.
- Only new messages are played on restart.
- Restart from any known point using the index of the entry.
- Replay only the messages you have missed.

This is supported directly using the methodReader/methodWriter builders (sketched in the FAQ above).

=== Detailed tracing of timings

Chronicle Queue supports explicit or implicit nano-second resolution timing for messages as they pass end to end across your system. We support using nanotime across machines, without the need for specialist hardware.

.Enabling high resolution timings
[source, Java]
----
SidedMarketDataListener combiner = out.acquireAppender()
        .methodWriterBuilder(SidedMarketDataListener.class)
        .recordHistory(true)
        .get();

combiner.onSidedPrice(new SidedPrice("EURUSD1", 123456789000L, Side.Sell, 1.1172, 2e6));
----

A timestamp is added for each read and write as the message passes from service to service.
.Downstream message triggered by the event above
[source, Yaml]
----
--- !!data #binary
history: {
  sources: [
    1,
    0x426700000000 # <4>
  ]
  timings: [
    1394278797664704, # <1>
    1394278822632044, # <2>
    1394278824073475  # <3>
  ]
}
onTopOfBookPrice: {
  symbol: EURUSD1,
  timestamp: 123456789000,
  buyPrice: NaN,
  buyQuantity: 0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
----
<1> First write
<2> First read
<3> Write of the result of the read
<4> What triggered this event

=== Using high resolution timings across machines

On most systems `System.nanoTime()` is roughly the number of nano-seconds since the system last rebooted (although different JVMs may behave differently). It is consistent between JVMs on the same machine, but wildly different between machines. The absolute difference between machines is meaningless; however, the information can be used to detect outliers: you can't determine what the best latency is, but you can determine how far off the best latencies you are. This is useful if you are focusing on the 99th percentile latencies, for example. We have a class called `RunningMinimum` to obtain timings from different machines while compensating for drift in nanoTime between machines. The more often you take measurements, the more accurate this running minimum is.

=== Compacting logs

Chronicle Queue manages storage by cycle. You can add a `StoreFileListener` which will notify you when a file is added and when it is no longer retained. You can move, compress or delete all the messages for a day at once.
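A minimal sketch of wiring up such a listener, assuming the same single-file builder used in the Getting Started section; the queue directory and the logging in the listener body are illustrative only.

[source, Java]
----
import java.io.File;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.impl.StoreFileListener;

public class CompactOldCycles {
    public static void main(String[] args) {
        // onReleased fires when a cycle file is no longer retained by the queue;
        // at that point it is safe to compress, archive or delete it.
        StoreFileListener onReleased = (int cycle, File file) ->
                System.out.println("cycle " + cycle + " released: " + file);

        try (ChronicleQueue queue = ChronicleQueueBuilder.single("compacting-example")
                .storeFileListener(onReleased)
                .build()) {
            queue.acquireAppender().writeText("hello");
        }
    }
}
----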
== Chronicle Queue vs Kafka

Chronicle Queue is designed to support over an order of magnitude more throughput than Kafka, with over an order of magnitude lower latency. While Kafka is faster than many of the alternatives, it doesn't support both throughput over a million events per second and low latency (e.g. 1 - 20 micro-seconds) at the same time.

Chronicle Queue attempts to handle more volume from a single thread to a single partition, thus avoiding the need for the complexity, and the downsides, of having partitions.

NOTE: Chronicle Engine supports partitioning of queues across machines, though not the partitioning of a single queue.

Kafka uses a broker which uses the operating system's file system and cache, while Chronicle Queue relies entirely on the operating system's file system and cache, without a broker.

=== Similar product guides

http://kafka.apache.org/documentation.html[Kafka Documentation]

=== Replication

Chronicle Queue Enterprise supports TCP replication with optional filtering, so only the required records, or even fields, are transmitted. This improves performance and reduces bandwidth requirements.

image::http://chronicle.software/wp-content/uploads/2014/07/Screen-Shot-2015-01-16-at-15.06.49.png[]

=== Support

* https://github.com/OpenHFT/Chronicle-Queue/blob/master/docs/FAQ.adoc[Chronicle FAQ]
* http://stackoverflow.com/tags/chronicle/info[Chronicle support on StackOverflow]
* https://groups.google.com/forum/?hl=en-GB#!forum/java-chronicle[Chronicle support on Google Groups]
* https://higherfrequencytrading.atlassian.net/browse/CHRON[Development Tasks - JIRA]

== Latency Test for Chronicle Queue replication

The following charts time how long it takes to:

- Write a 40 byte message to a Chronicle Queue.
- Have the write replicated over TCP.
- Have the second copy acknowledge receipt of the message.
- Have a thread read the acknowledged message.

The test is run for ten minutes and the distribution of latencies plotted.

image:https://vanilla-java.github.io/images/Latency-to-993.png[]

NOTE: There is a step in latency at around 10 million messages per second as the messages start to batch. At rates below this, each message can be sent individually.

The 99.99%ile and above are believed to be delays in passing the message over TCP; further research is needed to prove this. In any case, these delays are much the same regardless of the throughput. The 99.9%ile and 99.93%ile are a function of how quickly the system can recover after a delay: the higher the throughput, the less headroom the system has to recover from a delay.

image:https://vanilla-java.github.io/images/Latency-from-993.png[]

=== Summary

In the test above, the typical latency varied between 14 and 40 micro-seconds, and the 99%ile varied between 17 and 56 micro-seconds depending on the throughput being tested. Notably, the 99.93% latency varied between 21 micro-seconds and 41 milli-seconds, a factor of 2000.

.Possible throughput results depending on acceptable latencies
|===
| Acceptable Latency | Throughput

| < 30 micro-seconds 99.3% of the time | 7 million messages per second
| < 20 micro-seconds 99.9% of the time | 20 million messages per second
| < 1 milli-second 99.9% of the time | 50 million messages per second
| < 60 micro-seconds 99.3% of the time | 80 million messages per second
|===