Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Achieve compatibility with Java 1.6 #151

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
/bin
/third_party
/java/amazon-kinesis-producer/src/main/resources/amazon-kinesis-producer-native-binaries/**/*
.classpath
.project
.settings
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ There are two options. You can either pack the binaries into the jar like we did

#### Packing Native Binaries into the Jar

You will need JDK 1.7+, Apache Maven and Python 2.7 installed.
You will need JDK 1.6+, Apache Maven and Python 2.7 installed.

If you're on Windows, do the following in the git bash shell we used for building. You will need to add `java` and `python` to the `PATH`, as well as set `JAVA_HOME` for maven to work.

Expand Down
10 changes: 7 additions & 3 deletions java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -116,7 +116,11 @@
<version>2.2.22</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.2.12</version>
</dependency>
</dependencies>

<developers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
package com.amazonaws.services.kinesis.producer;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -74,8 +74,8 @@ public static interface MessageHandler {
public void onError(Throwable t);
}

private BlockingQueue<Message> outgoingMessages = new LinkedBlockingQueue<>();
private BlockingQueue<Message> incomingMessages = new LinkedBlockingQueue<>();
private BlockingQueue<Message> outgoingMessages = new LinkedBlockingQueue<Message>();
private BlockingQueue<Message> incomingMessages = new LinkedBlockingQueue<Message>();

private ExecutorService executor = Executors
.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("kpl-daemon-%04d").build());
Expand All @@ -99,6 +99,8 @@ public static interface MessageHandler {
private final String workingDir;
private final KinesisProducerConfiguration config;
private final Map<String, String> environmentVariables;
private FileInputStream fileInputStream;
private FileOutputStream fileOutputStream;

/**
* Starts up the child process, connects to it, and beings sending and
Expand Down Expand Up @@ -161,7 +163,7 @@ protected Daemon(File inPipe, File outPipe, MessageHandler handler) {
try {
connectToChild();
startLoops();
} catch (IOException e) {
} catch (Exception e) {
fatalError("Could not connect to child", e, false);
}
}
Expand Down Expand Up @@ -230,7 +232,9 @@ private void sendMessage() {
outChannel.write(lenBuf);
m.writeTo(outStream);
outStream.flush();
} catch (IOException | InterruptedException e) {
} catch (IOException ioe) {
fatalError("Error writing message to daemon", ioe);
} catch (InterruptedException e) {
fatalError("Error writing message to daemon", e);
}
}
Expand All @@ -256,7 +260,9 @@ private void receiveMessage() {
// Deserialize message and add it to the queue
Message m = Message.parseFrom(ByteString.copyFrom(rcvBuf));
incomingMessages.put(m);
} catch (IOException | InterruptedException e) {
} catch (IOException ioe) {
fatalError("Error reading message from daemon", ioe);
} catch (InterruptedException e) {
fatalError("Error reading message from daemon", e);
}
}
Expand Down Expand Up @@ -327,21 +333,20 @@ public void run() {
});
}

private void connectToChild() throws IOException {
private void connectToChild() throws Exception {
long start = System.nanoTime();

while (true) {
try {
inChannel = FileChannel.open(Paths.get(inPipe.getAbsolutePath()), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get(outPipe.getAbsolutePath()), StandardOpenOption.WRITE);
fileInputStream = new FileInputStream(inPipe.getAbsolutePath());
fileOutputStream = new FileOutputStream(outPipe.getAbsolutePath());

try {
inChannel = fileInputStream.getChannel();
outChannel = fileOutputStream.getChannel();

outStream = Channels.newOutputStream(outChannel);
break;
} catch (IOException e) {
if (inChannel != null && inChannel.isOpen()) {
inChannel.close();
}
if (outChannel != null && outChannel.isOpen()) {
outChannel.close();
}
} catch (Exception e) {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {}
Expand All @@ -365,11 +370,11 @@ private void createPipes() throws IOException {

private void createPipesWindows() {
do {
inPipe = Paths.get("\\\\.\\pipe\\amz-aws-kpl-in-pipe-" + uuid8Chars()).toFile();
inPipe = new File("\\\\.\\pipe\\amz-aws-kpl-in-pipe-" + uuid8Chars());
} while (inPipe.exists());

do {
outPipe = Paths.get("\\\\.\\pipe\\amz-aws-kpl-out-pipe-" + uuid8Chars()).toFile();
outPipe = new File("\\\\.\\pipe\\amz-aws-kpl-out-pipe-" + uuid8Chars());
} while (outPipe.exists());
}

Expand All @@ -380,13 +385,13 @@ private void createPipesUnix() {
}

do {
inPipe = Paths.get(dir.getAbsolutePath(),
"amz-aws-kpl-in-pipe-" + uuid8Chars()).toFile();
inPipe = new File(dir.getAbsolutePath(),
"amz-aws-kpl-in-pipe-" + uuid8Chars());
} while (inPipe.exists());

do {
outPipe = Paths.get(dir.getAbsolutePath(),
"amz-aws-kpl-out-pipe-" + uuid8Chars()).toFile();
outPipe = new File(dir.getAbsolutePath(),
"amz-aws-kpl-out-pipe-" + uuid8Chars());
} while (outPipe.exists());

try {
Expand Down Expand Up @@ -414,12 +419,15 @@ private void deletePipes() {
outChannel.close();
inPipe.delete();
outPipe.delete();

if(fileInputStream != null) fileInputStream.close();
if(fileOutputStream != null) fileOutputStream.close();
} catch (Exception e) { }
}

private void startChildProcess() throws IOException, InterruptedException {
log.info("Asking for trace");
List<String> args = new ArrayList<>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i",
List<String> args = new ArrayList<String>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i",
inPipe.getAbsolutePath(), "-c", protobufToHex(config.toProtobufMessage()), "-k",
protobufToHex(makeSetCredentialsMessage(config.getCredentialsProvider(), false)), "-t"));

Expand All @@ -444,7 +452,7 @@ public void run() {
try {
connectToChild();
startLoops();
} catch (IOException e) {
} catch (Exception e) {
fatalError("Unexpected error connecting to child process", e, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ static synchronized FileAgeManager instance() {
}

FileAgeManager(ScheduledExecutorService executorService) {
this.watchedFiles = new HashSet<>();
this.watchedFiles = new HashSet<File>();
this.executorService = executorService;
this.executorService.scheduleAtFixedRate(this, 1, 1, TimeUnit.MINUTES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -94,7 +93,7 @@ public class KinesisProducer {
private final KinesisProducerConfiguration config;
private final Map<String, String> env;
private final AtomicLong messageNumber = new AtomicLong(1);
private final Map<Long, SettableFuture<?>> futures = new ConcurrentHashMap<>();
private final Map<Long, SettableFuture<?>> futures = new ConcurrentHashMap<Long, SettableFuture<?>>();

private final ExecutorService callbackCompletionExecutor = new ThreadPoolExecutor(
1,
Expand Down Expand Up @@ -201,7 +200,7 @@ private void onPutRecordResult(Message msg) {
private void onMetricsResponse(Message msg) {
SettableFuture<List<Metric>> f = getFuture(msg);

List<Metric> userMetrics = new ArrayList<>();
List<Metric> userMetrics = new ArrayList<Metric>();
MetricsResponse res = msg.getMetricsResponse();
for (Messages.Metric metric : res.getMetricsList()) {
userMetrics.add(new Metric(metric));
Expand Down Expand Up @@ -768,7 +767,7 @@ public void flushSync() {

private void extractBinaries() {
synchronized (EXTRACT_BIN_MUTEX) {
final List<File> watchFiles = new ArrayList<>(2);
final List<File> watchFiles = new ArrayList<File>(2);
String os = SystemUtils.OS_NAME;
if (SystemUtils.IS_OS_WINDOWS) {
os = "windows";
Expand All @@ -786,7 +785,8 @@ private void extractBinaries() {
if (tmpDir.trim().length() == 0) {
tmpDir = System.getProperty("java.io.tmpdir");
}
tmpDir = Paths.get(tmpDir, root).toString();

tmpDir = new File(tmpDir, root).getAbsolutePath();
pathToTmpDir = tmpDir;

String binPath = config.getNativeExecutable();
Expand All @@ -809,43 +809,61 @@ private void extractBinaries() {
MessageDigest md = MessageDigest.getInstance("SHA1");
String mdHex = DatatypeConverter.printHexBinary(md.digest(bin)).toLowerCase();

pathToExecutable = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + extension).toString();
pathToExecutable = new File(pathToTmpDir, "kinesis_producer_" + mdHex + extension).getAbsolutePath();
File extracted = new File(pathToExecutable);
watchFiles.add(extracted);

// use dedicated lock-file to limit access to executable by a single process
final String pathToLock = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").toString();
final String pathToLock = new File(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").getAbsolutePath();
final File lockFile = new File(pathToLock);
try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock()) {

FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock();

try {
if (extracted.exists()) {
boolean contentEqual = false;
if (extracted.length() == bin.length) {
try (InputStream executableIS = new FileInputStream(extracted)) {
InputStream executableIS = new FileInputStream(extracted);

try {
byte[] existingBin = IOUtils.toByteArray(executableIS);
contentEqual = Arrays.equals(bin, existingBin);
} finally {
if (executableIS != null) executableIS.close();
}
}
if (!contentEqual) {
throw new SecurityException("The contents of the binary " + extracted.getAbsolutePath()
+ " is not what it's expected to be.");
}
} else {
try (OutputStream fos = new FileOutputStream(extracted)) {
OutputStream fos = new FileOutputStream(extracted);

try {
IOUtils.write(bin, fos);
} finally {
if (fos != null) fos.close();
}
extracted.setExecutable(true);
}
} finally {
if (lockFOS != null) lockFOS.close();
}

String certFileName = "b204d74a.0";
File certFile = new File(pathToTmpDir, certFileName);
if (!certFile.exists()) {
try (FileOutputStream fos = new FileOutputStream(certFile);
FileLock lock = fos.getChannel().lock()) {
FileOutputStream fos = new FileOutputStream(certFile);

try {
lock = fos.getChannel().lock();
byte[] certs = IOUtils.toByteArray(
this.getClass().getClassLoader().getResourceAsStream("cacerts/" + certFileName));
IOUtils.write(certs, fos);
} finally {
if (lock != null) lock.release();
if (fos != null) fos.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class KinesisProducerConfiguration {
private static final Logger log = LoggerFactory.getLogger(KinesisProducerConfiguration.class);

private List<AdditionalDimension> additionalDims = new ArrayList<>();
private List<AdditionalDimension> additionalDims = new ArrayList<AdditionalDimension>();
private AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
private AWSCredentialsProvider metricsCredentialsProvider = null;

Expand Down Expand Up @@ -156,7 +156,9 @@ public static KinesisProducerConfiguration fromPropertiesFile(String path) {
log.info("Attempting to load config from file " + path);

Properties props = new Properties();
try (InputStream is = new FileInputStream(path)) {

try {
InputStream is = new FileInputStream(path);
props.load(is);
} catch (Exception e) {
throw new RuntimeException("Error loading config from properties file", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class LogInputStreamReader implements Runnable {

private static Map<String, LoggingFunction> makeEmitters() {

Map<String, LoggingFunction> emitters = new HashMap<>();
Map<String, LoggingFunction> emitters = new HashMap<String, LoggingFunction>();
emitters.put("trace", new LoggingFunction() {
@Override
public void apply(String message) {
Expand Down Expand Up @@ -94,7 +94,7 @@ public void apply(String message) {
private volatile boolean shuttingDown = false;

private boolean isReadingRecord = false;
private final LinkedList<String> messageData = new LinkedList<>();
private final LinkedList<String> messageData = new LinkedList<String>();

public LogInputStreamReader(InputStream is, String streamType, DefaultLoggingFunction logFunction) {
this.streamType = streamType;
Expand Down Expand Up @@ -174,7 +174,7 @@ private void startRead() {
private LoggingFunction getLevelOrDefault(String message) {
Matcher matcher = LEVEL_REGEX.matcher(message);
if (matcher.find()) {
String level = matcher.group("level");
String level = matcher.group(1);
if (level != null) {
LoggingFunction res = EMITTERS.get(level.toLowerCase());
if (res != null) {
Expand Down Expand Up @@ -210,4 +210,4 @@ static interface DefaultLoggingFunction {
void apply(Logger logger, String message);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public boolean isSuccessful() {
}

protected static UserRecordResult fromProtobufMessage(Messages.PutRecordResult r) {
final List<Attempt> attempts = new ArrayList<>(r.getAttemptsCount());
final List<Attempt> attempts = new ArrayList<Attempt>(r.getAttemptsCount());
for (Messages.Attempt a : r.getAttemptsList()) {
attempts.add(Attempt.fromProtobufMessage(a));
}
Expand Down
Loading