@@ -7,6 +7,8 @@
 
 -module(rabbit_queue_index).
 
+-compile({inline, [segment_entry_count/0]}).
+
 -export([erase/1, init/3, reset_state/1, recover/6,
          terminate/3, delete_and_terminate/1,
          pre_publish/7, flush_pre_publish_cache/2,
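The new -compile({inline, [segment_entry_count/0]}) attribute asks the compiler to expand the segment_entry_count/0 helper at its call sites, since every former use of the ?SEGMENT_ENTRY_COUNT macro now goes through a function call. Only the call overhead goes away; the application-environment lookup inside the helper (see the last hunk) still happens at runtime. A minimal sketch of the option, using a hypothetical module that is not part of this commit:

    -module(inline_demo).
    -export([limit_reached/1]).

    %% Ask the compiler to inline entry_count/0 wherever it is called.
    -compile({inline, [entry_count/0]}).

    limit_reached(N) -> N >= entry_count().

    entry_count() -> 16384.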
@@ -43,13 +45,13 @@
 %% then delivered, then ack'd.
 %%
 %% In order to be able to clean up ack'd messages, we write to segment
-%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT
+%% files. These files have a fixed number of entries: segment_entry_count()
 %% publishes, delivers and acknowledgements. They are numbered, and so
 %% it is known that the 0th segment contains messages 0 ->
-%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages
-%% ?SEGMENT_ENTRY_COUNT -> 2*?SEGMENT_ENTRY_COUNT - 1 and so on. As
+%% segment_entry_count() - 1, the 1st segment contains messages
+%% segment_entry_count() -> 2*segment_entry_count() - 1 and so on. As
 %% such, in the segment files, we only refer to message sequence ids
-%% by the LSBs as SeqId rem ?SEGMENT_ENTRY_COUNT. This gives them a
+%% by the LSBs as SeqId rem segment_entry_count(). This gives them a
 %% fixed size.
 %%
 %% However, transient messages which are not sent to disk at any point
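As a quick illustration of the layout this comment describes, take 16384 entries per segment (the value of the old hard-coded ?SEGMENT_ENTRY_COUNT define removed further down in this diff; after the change it is whatever segment_entry_count() returns). Sequence ids 0 to 16383 land in segment 0, 16384 to 32767 in segment 1, and the value written inside a segment file is the remainder. In an erl shell:

    1> SegmentEntryCount = 16384.
    16384
    2> [SeqId div SegmentEntryCount || SeqId <- [0, 16383, 16384, 32767, 32768]].
    [0,0,1,1,2]
    3> 16385 rem SegmentEntryCount.
    1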
@@ -127,8 +129,6 @@
 %% binary generation/matching with constant vs variable lengths.
 
 -define(REL_SEQ_BITS, 14).
-%% calculated as trunc(math:pow(2,?REL_SEQ_BITS))).
--define(SEGMENT_ENTRY_COUNT, 16384).
 
 %% seq only is binary 01 followed by 14 bits of rel seq id
 %% (range: 0 - 16383)
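The removed comment notes that the old constant was derived as trunc(math:pow(2, ?REL_SEQ_BITS)), i.e. 2^14 = 16384. REL_SEQ_BITS itself is unchanged, so a relative sequence id still has to fit in 14 bits; that presumably caps the configurable entry count at 16384, although this diff does not show where, or whether, such a bound is enforced. The arithmetic, for reference:

    1> trunc(math:pow(2, 14)).
    16384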
@@ -349,11 +349,11 @@ pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint
 %% pre_publish_cache is the entry with most elements when compared to
 %% delivered_cache so we only check the former in the guard.
 maybe_flush_pre_publish_cache(JournalSizeHint,
-                              #qistate{pre_publish_cache = PPC} = State)
-  when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
-    flush_pre_publish_cache(JournalSizeHint, State);
-maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
-    State.
+                              #qistate{pre_publish_cache = PPC} = State) ->
+    case length(PPC) >= segment_entry_count() of
+        true -> flush_pre_publish_cache(JournalSizeHint, State);
+        false -> State
+    end.
 
 flush_pre_publish_cache(JournalSizeHint, State) ->
     State1 = flush_pre_publish_cache(State),
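Reshaping maybe_flush_pre_publish_cache/2 from a guarded clause into a case expression is forced by Erlang's guard rules: a guard may only use a fixed set of guard BIFs and operators, so the old when length(PPC) >= ?SEGMENT_ENTRY_COUNT was legal only because the macro expanded to a literal. A call to the user-defined segment_entry_count/0 cannot appear in a guard, hence the test moves into the function body. A minimal sketch of the same transformation, with hypothetical names (guard_vs_case, f/1, limit/0) that are not part of the commit:

    -module(guard_vs_case).
    -export([f/1]).

    %% A user-defined function cannot appear in a guard, so this clause
    %% head would be rejected with "illegal guard expression":
    %%
    %%   f(L) when length(L) >= limit() -> flush;
    %%   f(_L)                          -> keep.

    %% The accepted form does the comparison in the function body:
    f(L) ->
        case length(L) >= limit() of
            true  -> flush;
            false -> keep
        end.

    limit() -> 3.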
@@ -985,10 +985,11 @@ notify_sync(State = #qistate{unconfirmed = UC,
 %%----------------------------------------------------------------------------
 
 seq_id_to_seg_and_rel_seq_id(SeqId) ->
-    { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.
+    SegmentEntryCount = segment_entry_count(),
+    { SeqId div SegmentEntryCount, SeqId rem SegmentEntryCount }.
 
 reconstruct_seq_id(Seg, RelSeq) ->
-    (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq.
+    (Seg * segment_entry_count()) + RelSeq.
 
 all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
     lists:sort(
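seq_id_to_seg_and_rel_seq_id/1 and reconstruct_seq_id/2 remain exact inverses after the change; the first now also binds the count once, so the application-environment lookup happens a single time per call instead of twice. A small check in an erl shell, assuming an entry count of 16384 purely for illustration:

    1> SegmentEntryCount = 16384.
    16384
    2> SeqId = 40000.
    40000
    3> {Seg, RelSeq} = {SeqId div SegmentEntryCount, SeqId rem SegmentEntryCount}.
    {2,7232}
    4> (Seg * SegmentEntryCount) + RelSeq =:= SeqId.
    true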
@@ -1157,7 +1158,12 @@ array_new() ->
     array_new(undefined).
 
 array_new(Default) ->
-    array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
+    array:new([{default, Default}, fixed, {size, segment_entry_count()}]).
+
+segment_entry_count() ->
+    {ok, SegmentEntryCount} =
+        application:get_env(rabbit, queue_index_segment_entry_count),
+    SegmentEntryCount.
 
 bool_to_int(true ) -> 1;
 bool_to_int(false) -> 0.
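segment_entry_count/0 reads the queue_index_segment_entry_count key from the rabbit application environment and crashes if the key is missing (the {ok, _} match), so a default is presumably registered elsewhere, outside this diff. Purely as an illustration of how such an application-environment key can be supplied, not something this commit shows:

    %% In a sys.config / advanced.config style file:
    [
     {rabbit, [
       {queue_index_segment_entry_count, 16384}
     ]}
    ].

    %% Or at runtime, e.g. from a test shell:
    %% application:set_env(rabbit, queue_index_segment_entry_count, 4096).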