Skip to content

Commit

Permalink
[MSHARED-1072] Poll data from input stream
Browse files Browse the repository at this point in the history
Input stream can be a System.in - all read will be blocked
We need read data in no blocking mode
  • Loading branch information
slawekjaranowski committed May 8, 2023
1 parent 37fa530 commit 1c264ba
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ public void run() {

@Override
public Integer call() throws CommandLineException {
StreamFeeder inputFeeder = null;
StreamPollFeeder inputFeeder = null;
StreamPumper outputPumper = null;
StreamPumper errorPumper = null;
try {
if (systemIn != null) {
inputFeeder = new StreamFeeder(systemIn, p.getOutputStream());
inputFeeder.setName("StreamFeeder-systemIn");
inputFeeder = new StreamPollFeeder(systemIn, p.getOutputStream());
inputFeeder.setName("StreamPollFeeder-systemIn");
inputFeeder.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,64 @@
import java.util.Objects;

/**
* Read from an InputStream and write the output to an OutputStream.
* Poll InputStream for available data and write the output to an OutputStream.
*
* @author <a href="mailto:trygvis@inamo.no">Trygve Laugst&oslash;l</a>
*/
class StreamFeeder extends Thread {
class StreamPollFeeder extends Thread {

private final InputStream input;
public static final int BUF_LEN = 80;

private final InputStream input;
private final OutputStream output;

private Throwable exception;
private boolean done;

private boolean done;
private final Object lock = new Object();

/**
* Create a new StreamFeeder
* Create a new StreamPollFeeder
*
* @param input Stream to read from
* @param output Stream to write to
*/
StreamFeeder(InputStream input, OutputStream output) {
StreamPollFeeder(InputStream input, OutputStream output) {
this.input = Objects.requireNonNull(input);
this.output = Objects.requireNonNull(output);
this.done = false;
}

@Override
@SuppressWarnings("checkstyle:innerassignment")
public void run() {

byte[] buf = new byte[BUF_LEN];

try {
for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
output.write(data);
while (!done) {
if (input.available() > 0) {
int i = input.read(buf);
if (i > 0) {
output.write(buf, 0, i);
output.flush();
} else {
done = true;
}
} else {
synchronized (lock) {
if (!done) {
lock.wait(100);
}
}
}
}
output.flush();
} catch (IOException e) {
exception = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
close();
}

synchronized (lock) {
done = true;
lock.notifyAll();
}
}

private void close() {
Expand All @@ -89,15 +102,16 @@ public Throwable getException() {
}

public void waitUntilDone() {
this.interrupt();

synchronized (lock) {
while (!done) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
done = true;
lock.notifyAll();
}

try {
join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.shared.utils.cli;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class StreamPollFeederTest {

@Test
public void waitUntilFeederDoneOnInputStream() throws Exception {

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
StreamPollFeeder streamPollFeeder = new StreamPollFeeder(System.in, outputStream);

// start thread
streamPollFeeder.start();

// wait a moment
Thread.sleep(100);

// wait until process finish
streamPollFeeder.waitUntilDone();
assertNull(streamPollFeeder.getException());
}

@Test
public void dataShouldBeCopied() throws InterruptedException, IOException {

StringBuilder TEST_DATA = new StringBuilder();
for (int i = 0; i < 100; i++) {
TEST_DATA.append("TestData");
}

ByteArrayInputStream inputStream =
new ByteArrayInputStream(TEST_DATA.toString().getBytes());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

StreamPollFeeder streamPollFeeder = new StreamPollFeeder(inputStream, outputStream);

streamPollFeeder.start();

// wait until all data from steam will be read
while (outputStream.size() < TEST_DATA.length()) {
Thread.sleep(100);
}

// wait until process finish
streamPollFeeder.waitUntilDone();
assertNull(streamPollFeeder.getException());

assertEquals(TEST_DATA.toString(), outputStream.toString());
}
}

0 comments on commit 1c264ba

Please sign in to comment.