|
7 | 7 |
|
8 | 8 | -module(rabbit_queue_index).
|
9 | 9 |
|
| 10 | +-compile({inline, [segment_entry_count/0]}). |
| 11 | + |
10 | 12 | -export([erase/1, init/3, reset_state/1, recover/6,
|
11 | 13 | terminate/3, delete_and_terminate/1,
|
12 | 14 | pre_publish/7, flush_pre_publish_cache/2,
|
|
43 | 45 | %% then delivered, then ack'd.
|
44 | 46 | %%
|
45 | 47 | %% In order to be able to clean up ack'd messages, we write to segment
|
46 |
| -%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT |
| 48 | +%% files. These files have a fixed number of entries: segment_entry_count() |
47 | 49 | %% publishes, delivers and acknowledgements. They are numbered, and so
|
48 | 50 | %% it is known that the 0th segment contains messages 0 ->
|
49 |
| -%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages |
50 |
| -%% ?SEGMENT_ENTRY_COUNT -> 2*?SEGMENT_ENTRY_COUNT - 1 and so on. As |
| 51 | +%% segment_entry_count() - 1, the 1st segment contains messages |
| 52 | +%% segment_entry_count() -> 2*segment_entry_count() - 1 and so on. As |
51 | 53 | %% such, in the segment files, we only refer to message sequence ids
|
52 |
| -%% by the LSBs as SeqId rem ?SEGMENT_ENTRY_COUNT. This gives them a |
| 54 | +%% by the LSBs as SeqId rem segment_entry_count(). This gives them a |
53 | 55 | %% fixed size.
|
54 | 56 | %%
|
55 | 57 | %% However, transient messages which are not sent to disk at any point
|
|
127 | 129 | %% binary generation/matching with constant vs variable lengths.
|
128 | 130 |
|
129 | 131 | -define(REL_SEQ_BITS, 14).
|
130 |
| -%% calculated as trunc(math:pow(2,?REL_SEQ_BITS))). |
131 |
| --define(SEGMENT_ENTRY_COUNT, 16384). |
132 | 132 |
|
133 | 133 | %% seq only is binary 01 followed by 14 bits of rel seq id
|
134 | 134 | %% (range: 0 - 16383)
|
@@ -352,11 +352,11 @@ pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint
|
352 | 352 | %% pre_publish_cache is the entry with most elements when compared to
|
353 | 353 | %% delivered_cache so we only check the former in the guard.
|
354 | 354 | maybe_flush_pre_publish_cache(JournalSizeHint,
|
355 |
| - #qistate{pre_publish_cache = PPC} = State) |
356 |
| - when length(PPC) >= ?SEGMENT_ENTRY_COUNT -> |
357 |
| - flush_pre_publish_cache(JournalSizeHint, State); |
358 |
| -maybe_flush_pre_publish_cache(_JournalSizeHint, State) -> |
359 |
| - State. |
| 355 | + #qistate{pre_publish_cache = PPC} = State) -> |
| 356 | + case length(PPC) >= segment_entry_count() of |
| 357 | + true -> flush_pre_publish_cache(JournalSizeHint, State); |
| 358 | + false -> State |
| 359 | + end. |
360 | 360 |
|
361 | 361 | flush_pre_publish_cache(JournalSizeHint, State) ->
|
362 | 362 | State1 = flush_pre_publish_cache(State),
|
@@ -991,10 +991,11 @@ notify_sync(State = #qistate{unconfirmed = UC,
|
991 | 991 | %%----------------------------------------------------------------------------
|
992 | 992 |
|
993 | 993 | seq_id_to_seg_and_rel_seq_id(SeqId) ->
|
994 |
| - { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. |
| 994 | + SegmentEntryCount = segment_entry_count(), |
| 995 | + { SeqId div SegmentEntryCount, SeqId rem SegmentEntryCount }. |
995 | 996 |
|
996 | 997 | reconstruct_seq_id(Seg, RelSeq) ->
|
997 |
| - (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. |
| 998 | + (Seg * segment_entry_count()) + RelSeq. |
998 | 999 |
|
999 | 1000 | all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
|
1000 | 1001 | lists:sort(
|
@@ -1163,7 +1164,12 @@ array_new() ->
|
1163 | 1164 | array_new(undefined).
|
1164 | 1165 |
|
1165 | 1166 | array_new(Default) ->
|
1166 |
| - array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). |
| 1167 | + array:new([{default, Default}, fixed, {size, segment_entry_count()}]). |
| 1168 | + |
| 1169 | +segment_entry_count() -> |
| 1170 | + {ok, SegmentEntryCount} = |
| 1171 | + application:get_env(rabbit, queue_index_segment_entry_count), |
| 1172 | + SegmentEntryCount. |
1167 | 1173 |
|
1168 | 1174 | bool_to_int(true ) -> 1;
|
1169 | 1175 | bool_to_int(false) -> 0.
|
|
0 commit comments