This repository has been archived by the owner on Aug 6, 2018. It is now read-only.

How to stop threads after main function is over in AWS FirehoseAsync putRecordAsync? #18

Open
mahatosourav91 opened this issue Jan 12, 2018 · 0 comments

mahatosourav91 commented Jan 12, 2018

I am trying to use AWS Kinesis Firehose as a log appender. My requirement is to send log records to Firehose asynchronously. I built the log4j appender following this issue.

The problem I am facing is that the AWS FirehoseAsync client starts multiple background threads, and after all the tasks have finished the main process keeps running even though there is nothing left to execute.

Stack Overflow question: link

public class MainTest {

    private static Logger log = LoggerFactory.getLogger(MainTest.class);


    public static void main(String[] args) throws UnsupportedEncodingException {

        String streamName = "kinesis-firehose stream";
        ClientConfiguration clientConfiguration = new ClientConfiguration();

        clientConfiguration.setMaxErrorRetry(5);
        clientConfiguration.setRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
                PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, 5, true));
        clientConfiguration.setUserAgentPrefix(AppenderConstants.USER_AGENT_STRING);

        BlockingQueue<Runnable> taskBuffer = new LinkedBlockingDeque<Runnable>(1000);

        ThreadPoolExecutorFactory factory = new ThreadPoolExecutorFactory(3, taskBuffer);
        AmazonKinesisFirehoseAsync kinesisFirehoseAsync = AmazonKinesisFirehoseAsyncClientBuilder
                .standard().withCredentials(new DefaultAWSCredentialsProviderChain())
                .withClientConfiguration(clientConfiguration).withExecutorFactory(factory)
                .withRegion(ApplicationContext.getInstance().getAwsRegion()).build();

        for(int i=0; i<10; i++) {
            log.debug("Counter [{}]", i);
            String message = String.valueOf(i);
            ByteBuffer data = ByteBuffer.wrap(message.getBytes("UTF-8"));

            Record record = new Record().withData(data);
            PutRecordRequest request = new PutRecordRequest().withDeliveryStreamName(streamName).withRecord(record);
            kinesisFirehoseAsync.putRecordAsync(request);
        }

    }

}
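
A likely cause of the behaviour described above is that the worker threads of the executor passed via withExecutorFactory are non-daemon threads, so the JVM stays alive until that pool is shut down. The sketch below illustrates the general pattern using only java.util.concurrent: keep the Future returned by each async call, wait for all of them, then shut the pool down. The class FirehoseShutdownSketch and its local putRecordAsync helper are hypothetical stand-ins, not part of the AWS SDK. In the real appender the equivalent step would probably be to call get() on each Future returned by kinesisFirehoseAsync.putRecordAsync(request) and then call kinesisFirehoseAsync.shutdown(), which in SDK 1.x should also shut down the client's executor; that part is an assumption and is not exercised here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a plain thread pool stands in for the Firehose async client.
class FirehoseShutdownSketch {

    // Stand-in for kinesisFirehoseAsync.putRecordAsync(request):
    // submits the work to the pool and returns a Future for its result.
    static Future<String> putRecordAsync(ExecutorService pool, String message) {
        return pool.submit(() -> "sent:" + message);
    }

    // Sends `count` messages, waits for every one to finish, then shuts the pool down.
    // Returns the number of calls that completed successfully.
    static int sendAllAndShutdown(int count) throws InterruptedException, ExecutionException {
        // Fixed pool of 3 non-daemon threads, mirroring the 3 threads in the question.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Future<String>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                pending.add(putRecordAsync(pool, String.valueOf(i)));
            }
            int completed = 0;
            for (Future<String> future : pending) {
                future.get(); // blocks until this call finishes; rethrows its failure, if any
                completed++;
            }
            return completed;
        } finally {
            // Without this, the idle non-daemon worker threads keep the JVM running.
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on anything still running after the timeout
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed " + sendAllAndShutdown(10));
        // main returns here and the JVM exits, because no non-daemon threads remain.
    }
}
```

Waiting on the futures before shutting down matters: shutting the client down first could cause calls that are still queued to fail. For a log appender, the same shutdown step would most naturally live in the appender's close() method or in a JVM shutdown hook.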