Writer will not handle records, if batchsize is not reached #413

thom2batki opened this issue Apr 26, 2023 · 1 comment

Comments

@thom2batki

When BlockingQueues are used to pass records between jobs, the writer job never writes anything if the batch size is not reached.

I have the following code:

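import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Package names below assume the Easy Batch 7.x layout.
import org.jeasy.batch.core.job.Job;
import org.jeasy.batch.core.job.JobBuilder;
import org.jeasy.batch.core.job.JobExecutor;
import org.jeasy.batch.core.processor.RecordProcessor;
import org.jeasy.batch.core.reader.BlockingQueueRecordReader;
import org.jeasy.batch.core.reader.IterableRecordReader;
import org.jeasy.batch.core.record.Record;
import org.jeasy.batch.core.writer.BlockingQueueRecordWriter;
import org.jeasy.batch.core.writer.StandardOutputRecordWriter;

public class Launcher {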

    public static void main(String[] args) {

        BlockingQueue<Record<String>> processingQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Record<String>> resultQueue = new LinkedBlockingQueue<>();

        Job readerJob = new JobBuilder<String, String>()
                .named("reader-job")
                .reader(new IterableRecordReader<>(Arrays.asList("one", "two", "three", "four")))
                .writer(new BlockingQueueRecordWriter<>(processingQueue))
                .batchSize(5)
                .build();

        Job processorJob = new JobBuilder<String, String>()
                .named("processor-job")
                .reader(new BlockingQueueRecordReader<>(processingQueue))
                .processor(new MyProcessor("processor-job"))
                .writer(new BlockingQueueRecordWriter<>(resultQueue))
                .batchSize(5)
                .build();

        Job writerJob = new JobBuilder<String, String>()
                .named("writer-job")
                .reader(new BlockingQueueRecordReader<>(resultQueue))
                .writer(new StandardOutputRecordWriter<>())
                .batchSize(5)
                .build();

        JobExecutor jobExecutor = new JobExecutor();
        jobExecutor.submitAll(readerJob, processorJob, writerJob);
        jobExecutor.shutdown();
    }

    static class MyProcessor implements RecordProcessor<String, String> {

        private final String workerName;

        public MyProcessor(String workerName) {
            this.workerName = workerName;
        }

        // Pass-through processor: logs each record and returns it unchanged.
        @Override
        public Record<String> processRecord(Record<String> record) throws Exception {
            System.out.println(workerName + ": processing record " + record.getPayload());
            return record;
        }
    }
}

As you can see, the batch size is 5 for all jobs and there are 4 records to process. In this case, all records are read and processed, but never written.

There are basically three different behaviours, depending on the batch size and the number of records:

  1. batch size never reached -> records are never written
  2. batch size exactly reached -> all records are written
  3. batch size exceeded -> only full batches (a multiple of the batch size) are written, the remainder is never written
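
The three cases above are consistent with a writer that only ever flushes complete batches. As a rough sketch of that arithmetic, assuming the trailing partial batch is simply dropped (an assumption based on the observed behaviour, not confirmed from the easy-batch source; the class and method names are hypothetical):

```java
// Hypothetical helper, not part of easy-batch: models the behaviour observed above.
final class ObservedBatchMath {

    // Records that reach the writer if only complete batches are flushed.
    static int written(int recordCount, int batchSize) {
        return (recordCount / batchSize) * batchSize; // integer division drops the remainder
    }

    // Records left behind in the trailing partial batch.
    static int lost(int recordCount, int batchSize) {
        return recordCount % batchSize;
    }

    public static void main(String[] args) {
        int batchSize = 5;
        for (int recordCount : new int[] {4, 5, 12}) {
            System.out.println(recordCount + " records: " + written(recordCount, batchSize)
                    + " written, " + lost(recordCount, batchSize) + " lost");
        }
    }
}
```

With a batch size of 5, this reports 0 of 4, 5 of 5, and 10 of 12 records written, matching cases 1 to 3.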

It seems that the writer-job is closing the queue too early and no longer waits for incoming records. In earlier versions of easy-batch, PoisonRecords were used to synchronize jobs. Is there anything comparable in current versions? I don't want to just define a QUEUE_TIMEOUT, which does not feel like a clean solution for this problem.
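
For reference, the poison-record idea can be sketched with plain `java.util.concurrent`, independent of easy-batch. This is only an illustration of the pattern, not the old easy-batch implementation: the `POISON` sentinel and the `consume` method are hypothetical names. The consumer blocks on the queue without any timeout and flushes the trailing partial batch once the sentinel arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK sketch of the poison-record pattern; none of these names are easy-batch API.
final class PoisonPillSketch {

    // Sentinel compared by identity, so a real payload equal to "POISON" is not mistaken for it.
    static final String POISON = new String("POISON");

    // Drains the queue in batches and flushes the trailing partial batch when the sentinel arrives.
    static List<List<String>> consume(BlockingQueue<String> queue, int batchSize)
            throws InterruptedException {
        List<List<String>> flushed = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (true) {
            String item = queue.take(); // blocks until the producer delivers something
            if (item == POISON) {
                break;
            }
            batch.add(item);
            if (batch.size() == batchSize) {
                flushed.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            flushed.add(batch); // the partial batch is written instead of being dropped
        }
        return flushed;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (String s : List.of("one", "two", "three", "four")) {
                queue.add(s);
            }
            queue.add(POISON); // signals end of stream
        });
        producer.start();
        System.out.println(consume(queue, 5)); // prints [[one, two, three, four]]
        producer.join();
    }
}
```

The point of the sentinel is that the consumer learns the stream has ended from the data itself, so no timeout has to be tuned.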

Because of this, the ForkJoin tutorial provided with easy-batch does not work properly:

https://github.com/j-easy/easy-batch/blob/master/easy-batch-tutorials/src/main/java/org/jeasy/batch/tutorials/advanced/parallel/ForkJoin.java

There, all records are processed by the worker jobs, but they are never written to standard output by the configured StandardOutputRecordWriter<>().

Can someone help me with this issue, or is anyone else dealing with the same problem?

@joytools

joytools commented Jun 9, 2023

I am running a LOT of parallel jobs and hit the same problem when migrating to the latest EasyBatch release.
I prefer using PoisonRecords because they give finer control over the entire processing flow.

Here is how I solved the problem:

  1. Downloaded the source code of the previous EasyBatch version

  2. Rebuilt the following 5 classes from the original source code in a custom package:

org.mypackage.PoisonRecordFilter
org.mypackage.PoisonrecordBroadcaster
org.mypackage.PoisonRecord
org.mypackage.PoisonBlockingQueueRecordReader
org.mypackage.PoisonBlockingQueueRecordWriter

  3. Reintroduced the previous PoisonRecord-based functionality using the above classes in my package
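
To illustrate how such classes might fit together in a fork/join setup, here is a plain-JDK sketch. It is not the code of the rebuilt classes above, and every name in it is hypothetical. It assumes that the fork step broadcasts one sentinel to each worker queue, each worker forwards its sentinel downstream, and the join step stops once it has seen one sentinel per worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK sketch of a fork/join pipeline terminated by poison records.
final class ForkJoinPoisonSketch {

    static final String POISON = new String("POISON"); // identity-compared sentinel

    // Fork: distribute records round-robin, then broadcast one sentinel to every worker queue.
    static void fork(List<String> records, List<BlockingQueue<String>> workerQueues) {
        int next = 0;
        for (String record : records) {
            workerQueues.get(next).add(record);
            next = (next + 1) % workerQueues.size();
        }
        for (BlockingQueue<String> queue : workerQueues) {
            queue.add(POISON);
        }
    }

    // Worker: process until the sentinel arrives, then pass the sentinel downstream.
    static void work(BlockingQueue<String> in, BlockingQueue<String> out)
            throws InterruptedException {
        for (String record = in.take(); record != POISON; record = in.take()) {
            out.add(record.toUpperCase());
        }
        out.add(POISON);
    }

    // Join: collect results until one sentinel per worker has been seen.
    static List<String> join(BlockingQueue<String> in, int workerCount)
            throws InterruptedException {
        List<String> results = new ArrayList<>();
        int poisonsSeen = 0;
        while (poisonsSeen < workerCount) {
            String record = in.take();
            if (record == POISON) {
                poisonsSeen++;
            } else {
                results.add(record);
            }
        }
        return results;
    }

    // Wires fork, workers and join together and returns the joined results.
    static List<String> run(List<String> records, int workerCount) throws InterruptedException {
        List<BlockingQueue<String>> workerQueues = new ArrayList<>();
        BlockingQueue<String> joinQueue = new LinkedBlockingQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            workerQueues.add(queue);
            Thread worker = new Thread(() -> {
                try {
                    work(queue, joinQueue);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }
        fork(records, workerQueues);
        List<String> results = join(joinQueue, workerCount);
        for (Thread worker : workers) {
            worker.join();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> results = run(List.of("one", "two", "three", "four"), 2);
        System.out.println(results.size() + " records joined"); // result order depends on thread timing
    }
}
```

Counting sentinels in the join step is what lets it terminate without a timeout, no matter how the records were split across workers.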

Hope this may help.
