Skip to content

Commit

Permalink
add static records_count field
Browse files Browse the repository at this point in the history
  • Loading branch information
akashorabek committed Jul 17, 2022
1 parent e952d9a commit b84ab8c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class CustomReceiverWithOffset extends Receiver<String> implements HasOff

private static final Logger LOG = LoggerFactory.getLogger(CustomReceiverWithOffset.class);
private static final int TIMEOUT_MS = 500;

private static final List<String> STORED_RECORDS = new ArrayList<>();
private static final int RECORDS_COUNT = 20;
private Long startOffset;

CustomReceiverWithOffset() {
Expand Down Expand Up @@ -67,7 +67,7 @@ public Long getEndOffset() {
private void receive() {
Long currentOffset = startOffset;
while (!isStopped()) {
if (currentOffset <= 20) {
if (currentOffset <= RECORDS_COUNT) {
STORED_RECORDS.add(currentOffset.toString());
store((currentOffset++).toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CustomReceiverWithoutOffset extends Receiver<String> {
private static final Logger LOG = LoggerFactory.getLogger(CustomReceiverWithoutOffset.class);
private static final int TIMEOUT_MS = 500;
private static final List<String> STORED_RECORDS = new ArrayList<>();
private static final int RECORDS_COUNT = 20;

CustomReceiverWithoutOffset() {
super(StorageLevel.MEMORY_AND_DISK_2());
Expand All @@ -53,7 +54,7 @@ public void onStop() {}
private void receive() {
Long currentOffset = 0L;
while (!isStopped()) {
if (currentOffset <= 20) {
if (currentOffset <= RECORDS_COUNT) {
STORED_RECORDS.add(currentOffset.toString());
store((currentOffset++).toString());
}
Expand Down

0 comments on commit b84ab8c

Please sign in to comment.