diff --git a/.gitignore b/.gitignore index 79dee61e..93d91956 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ /bin /third_party /java/amazon-kinesis-producer/src/main/resources/amazon-kinesis-producer-native-binaries/**/* +.classpath +.project +.settings diff --git a/README.md b/README.md index d48b7835..ce621c50 100644 --- a/README.md +++ b/README.md @@ -198,7 +198,7 @@ There are two options. You can either pack the binaries into the jar like we did #### Packing Native Binaries into the Jar -You will need JDK 1.7+, Apache Maven and Python 2.7 installed. +You will need JDK 1.6+, Apache Maven and Python 2.7 installed. If you're on Windows, do the following in the git bash shell we used for building. You will need to add `java` and `python` to the `PATH`, as well as set `JAVA_HOME` for maven to work. diff --git a/java/amazon-kinesis-producer/pom.xml b/java/amazon-kinesis-producer/pom.xml index 9fec8e32..24571d52 100644 --- a/java/amazon-kinesis-producer/pom.xml +++ b/java/amazon-kinesis-producer/pom.xml @@ -20,8 +20,8 @@ maven-compiler-plugin 3.1 - 1.7 - 1.7 + 1.6 + 1.6 @@ -116,7 +116,11 @@ 2.2.22 test - + + javax.xml.bind + jaxb-api + 2.2.12 + diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java index 8fe140e5..2022e689 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java @@ -16,14 +16,14 @@ package com.amazonaws.services.kinesis.producer; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.FileChannel; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -74,8 +74,8 @@ public static interface MessageHandler { public void onError(Throwable t); } - private BlockingQueue outgoingMessages = new LinkedBlockingQueue<>(); - private BlockingQueue incomingMessages = new LinkedBlockingQueue<>(); + private BlockingQueue outgoingMessages = new LinkedBlockingQueue(); + private BlockingQueue incomingMessages = new LinkedBlockingQueue(); private ExecutorService executor = Executors .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("kpl-daemon-%04d").build()); @@ -99,6 +99,8 @@ public static interface MessageHandler { private final String workingDir; private final KinesisProducerConfiguration config; private final Map environmentVariables; + private FileInputStream fileInputStream; + private FileOutputStream fileOutputStream; /** * Starts up the child process, connects to it, and beings sending and @@ -161,7 +163,7 @@ protected Daemon(File inPipe, File outPipe, MessageHandler handler) { try { connectToChild(); startLoops(); - } catch (IOException e) { + } catch (Exception e) { fatalError("Could not connect to child", e, false); } } @@ -230,7 +232,9 @@ private void sendMessage() { outChannel.write(lenBuf); m.writeTo(outStream); outStream.flush(); - } catch (IOException | InterruptedException e) { + } catch (IOException ioe) { + fatalError("Error writing message to daemon", ioe); + } catch (InterruptedException e) { fatalError("Error writing message to daemon", e); } } @@ -256,7 +260,9 @@ private void receiveMessage() { // Deserialize message and add it to the queue Message m = Message.parseFrom(ByteString.copyFrom(rcvBuf)); incomingMessages.put(m); - } catch (IOException | InterruptedException e) { + } catch (IOException ioe) { + fatalError("Error reading message from daemon", ioe); + } catch (InterruptedException e) { fatalError("Error reading message from daemon", e); } } @@ -327,21 +333,20 @@ public void run() { }); } - private void connectToChild() throws IOException { + private void connectToChild() throws Exception { long start = System.nanoTime(); + while (true) { - try { - inChannel = FileChannel.open(Paths.get(inPipe.getAbsolutePath()), StandardOpenOption.READ); - outChannel = FileChannel.open(Paths.get(outPipe.getAbsolutePath()), StandardOpenOption.WRITE); + fileInputStream = new FileInputStream(inPipe.getAbsolutePath()); + fileOutputStream = new FileOutputStream(outPipe.getAbsolutePath()); + + try { + inChannel = fileInputStream.getChannel(); + outChannel = fileOutputStream.getChannel(); + outStream = Channels.newOutputStream(outChannel); break; - } catch (IOException e) { - if (inChannel != null && inChannel.isOpen()) { - inChannel.close(); - } - if (outChannel != null && outChannel.isOpen()) { - outChannel.close(); - } + } catch (Exception e) { try { Thread.sleep(100); } catch (InterruptedException e1) {} @@ -365,11 +370,11 @@ private void createPipes() throws IOException { private void createPipesWindows() { do { - inPipe = Paths.get("\\\\.\\pipe\\amz-aws-kpl-in-pipe-" + uuid8Chars()).toFile(); + inPipe = new File("\\\\.\\pipe\\amz-aws-kpl-in-pipe-" + uuid8Chars()); } while (inPipe.exists()); do { - outPipe = Paths.get("\\\\.\\pipe\\amz-aws-kpl-out-pipe-" + uuid8Chars()).toFile(); + outPipe = new File("\\\\.\\pipe\\amz-aws-kpl-out-pipe-" + uuid8Chars()); } while (outPipe.exists()); } @@ -380,13 +385,13 @@ private void createPipesUnix() { } do { - inPipe = Paths.get(dir.getAbsolutePath(), - "amz-aws-kpl-in-pipe-" + uuid8Chars()).toFile(); + inPipe = new File(dir.getAbsolutePath(), + "amz-aws-kpl-in-pipe-" + uuid8Chars()); } while (inPipe.exists()); do { - outPipe = Paths.get(dir.getAbsolutePath(), - "amz-aws-kpl-out-pipe-" + uuid8Chars()).toFile(); + outPipe = new File(dir.getAbsolutePath(), + "amz-aws-kpl-out-pipe-" + uuid8Chars()); } while (outPipe.exists()); try { @@ -414,12 +419,15 @@ private void deletePipes() { outChannel.close(); inPipe.delete(); outPipe.delete(); + + if(fileInputStream != null) fileInputStream.close(); + if(fileOutputStream != null) fileOutputStream.close(); } catch (Exception e) { } } private void startChildProcess() throws IOException, InterruptedException { log.info("Asking for trace"); - List args = new ArrayList<>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i", + List args = new ArrayList(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i", inPipe.getAbsolutePath(), "-c", protobufToHex(config.toProtobufMessage()), "-k", protobufToHex(makeSetCredentialsMessage(config.getCredentialsProvider(), false)), "-t")); @@ -444,7 +452,7 @@ public void run() { try { connectToChild(); startLoops(); - } catch (IOException e) { + } catch (Exception e) { fatalError("Unexpected error connecting to child process", e, false); } } diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/FileAgeManager.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/FileAgeManager.java index f0c799c6..a77ed94c 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/FileAgeManager.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/FileAgeManager.java @@ -46,7 +46,7 @@ static synchronized FileAgeManager instance() { } FileAgeManager(ScheduledExecutorService executorService) { - this.watchedFiles = new HashSet<>(); + this.watchedFiles = new HashSet(); this.executorService = executorService; this.executorService.scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES); } diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java index c33d4d6e..7ff9df11 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java @@ -24,7 +24,6 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.FileLock; -import java.nio.file.Paths; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; @@ -94,7 +93,7 @@ public class KinesisProducer { private final KinesisProducerConfiguration config; private final Map env; private final AtomicLong messageNumber = new AtomicLong(1); - private final Map> futures = new ConcurrentHashMap<>(); + private final Map> futures = new ConcurrentHashMap>(); private final ExecutorService callbackCompletionExecutor = new ThreadPoolExecutor( 1, @@ -201,7 +200,7 @@ private void onPutRecordResult(Message msg) { private void onMetricsResponse(Message msg) { SettableFuture> f = getFuture(msg); - List userMetrics = new ArrayList<>(); + List userMetrics = new ArrayList(); MetricsResponse res = msg.getMetricsResponse(); for (Messages.Metric metric : res.getMetricsList()) { userMetrics.add(new Metric(metric)); @@ -768,7 +767,7 @@ public void flushSync() { private void extractBinaries() { synchronized (EXTRACT_BIN_MUTEX) { - final List watchFiles = new ArrayList<>(2); + final List watchFiles = new ArrayList(2); String os = SystemUtils.OS_NAME; if (SystemUtils.IS_OS_WINDOWS) { os = "windows"; @@ -786,7 +785,8 @@ private void extractBinaries() { if (tmpDir.trim().length() == 0) { tmpDir = System.getProperty("java.io.tmpdir"); } - tmpDir = Paths.get(tmpDir, root).toString(); + + tmpDir = new File(tmpDir, root).getAbsolutePath(); pathToTmpDir = tmpDir; String binPath = config.getNativeExecutable(); @@ -809,21 +809,28 @@ private void extractBinaries() { MessageDigest md = MessageDigest.getInstance("SHA1"); String mdHex = DatatypeConverter.printHexBinary(md.digest(bin)).toLowerCase(); - pathToExecutable = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + extension).toString(); + pathToExecutable = new File(pathToTmpDir, "kinesis_producer_" + mdHex + extension).getAbsolutePath(); File extracted = new File(pathToExecutable); watchFiles.add(extracted); // use dedicated lock-file to limit access to executable by a single process - final String pathToLock = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").toString(); + final String pathToLock = new File(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").getAbsolutePath(); final File lockFile = new File(pathToLock); - try (FileOutputStream lockFOS = new FileOutputStream(lockFile); - FileLock lock = lockFOS.getChannel().lock()) { + + FileOutputStream lockFOS = new FileOutputStream(lockFile); + FileLock lock = lockFOS.getChannel().lock(); + + try { if (extracted.exists()) { boolean contentEqual = false; if (extracted.length() == bin.length) { - try (InputStream executableIS = new FileInputStream(extracted)) { + InputStream executableIS = new FileInputStream(extracted); + + try { byte[] existingBin = IOUtils.toByteArray(executableIS); contentEqual = Arrays.equals(bin, existingBin); + } finally { + if (executableIS != null) executableIS.close(); } } if (!contentEqual) { @@ -831,21 +838,32 @@ private void extractBinaries() { + " is not what it's expected to be."); } } else { - try (OutputStream fos = new FileOutputStream(extracted)) { + OutputStream fos = new FileOutputStream(extracted); + + try { IOUtils.write(bin, fos); + } finally { + if (fos != null) fos.close(); } extracted.setExecutable(true); } + } finally { + if (lockFOS != null) lockFOS.close(); } String certFileName = "b204d74a.0"; File certFile = new File(pathToTmpDir, certFileName); if (!certFile.exists()) { - try (FileOutputStream fos = new FileOutputStream(certFile); - FileLock lock = fos.getChannel().lock()) { + FileOutputStream fos = new FileOutputStream(certFile); + + try { + lock = fos.getChannel().lock(); byte[] certs = IOUtils.toByteArray( this.getClass().getClassLoader().getResourceAsStream("cacerts/" + certFileName)); IOUtils.write(certs, fos); + } finally { + if (lock != null) lock.release(); + if (fos != null) fos.close(); } } diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java index d997ab47..caddae9a 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java @@ -38,7 +38,7 @@ public class KinesisProducerConfiguration { private static final Logger log = LoggerFactory.getLogger(KinesisProducerConfiguration.class); - private List additionalDims = new ArrayList<>(); + private List additionalDims = new ArrayList(); private AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); private AWSCredentialsProvider metricsCredentialsProvider = null; @@ -156,7 +156,9 @@ public static KinesisProducerConfiguration fromPropertiesFile(String path) { log.info("Attempting to load config from file " + path); Properties props = new Properties(); - try (InputStream is = new FileInputStream(path)) { + + try { + InputStream is = new FileInputStream(path); props.load(is); } catch (Exception e) { throw new RuntimeException("Error loading config from properties file", e); diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/LogInputStreamReader.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/LogInputStreamReader.java index 0838a3c2..da4fe856 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/LogInputStreamReader.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/LogInputStreamReader.java @@ -40,7 +40,7 @@ public class LogInputStreamReader implements Runnable { private static Map makeEmitters() { - Map emitters = new HashMap<>(); + Map emitters = new HashMap(); emitters.put("trace", new LoggingFunction() { @Override public void apply(String message) { @@ -94,7 +94,7 @@ public void apply(String message) { private volatile boolean shuttingDown = false; private boolean isReadingRecord = false; - private final LinkedList messageData = new LinkedList<>(); + private final LinkedList messageData = new LinkedList(); public LogInputStreamReader(InputStream is, String streamType, DefaultLoggingFunction logFunction) { this.streamType = streamType; @@ -174,7 +174,7 @@ private void startRead() { private LoggingFunction getLevelOrDefault(String message) { Matcher matcher = LEVEL_REGEX.matcher(message); if (matcher.find()) { - String level = matcher.group("level"); + String level = matcher.group(1); if (level != null) { LoggingFunction res = EMITTERS.get(level.toLowerCase()); if (res != null) { @@ -210,4 +210,4 @@ static interface DefaultLoggingFunction { void apply(Logger logger, String message); } -} +} \ No newline at end of file diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecordResult.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecordResult.java index b3188a2b..66d03401 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecordResult.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecordResult.java @@ -80,7 +80,7 @@ public boolean isSuccessful() { } protected static UserRecordResult fromProtobufMessage(Messages.PutRecordResult r) { - final List attempts = new ArrayList<>(r.getAttemptsCount()); + final List attempts = new ArrayList(r.getAttemptsCount()); for (Messages.Attempt a : r.getAttemptsList()) { attempts.add(Attempt.fromProtobufMessage(a)); } diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/_ConfigTemplate.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/_ConfigTemplate.java index 47fb24d7..242abb6f 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/_ConfigTemplate.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/_ConfigTemplate.java @@ -39,7 +39,7 @@ class _ConfigTemplate { private static final Logger log = LoggerFactory.getLogger(_ConfigTemplate.class); - private List additionalDims = new ArrayList<>(); + private List additionalDims = new ArrayList(); private AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); private AWSCredentialsProvider metricsCredentialsProvider = null; @@ -167,7 +167,9 @@ public static _ConfigTemplate fromPropertiesFile(String path) { log.info("Attempting to load config from file " + path); Properties props = new Properties(); - try (InputStream is = new FileInputStream(path)) { + + try { + InputStream is = new FileInputStream(path); props.load(is); } catch (Exception e) { throw new RuntimeException("Error loading config from properties file", e); diff --git a/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/FileAgeManagerTest.java b/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/FileAgeManagerTest.java index edad3185..848d3dc9 100644 --- a/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/FileAgeManagerTest.java +++ b/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/FileAgeManagerTest.java @@ -54,7 +54,7 @@ public void simpleTest() { } private List makeFileList() { - List files = new ArrayList<>(); + List files = new ArrayList(); files.add(testFile); when(testFile.getAbsoluteFile()).thenReturn(testFile); return files;