Skip to content

Commit

Permalink
[modbus] Race condition fix
Browse files Browse the repository at this point in the history
The CountDownLatch was used as a guard (latch.await) in many tests to
wait for callbacks to be called before proceeding with assertions.

Since the latch was countDown() beginning of the callback, we introduced
a race condition with the subsequent assertions and updating the other
counters used in the subsequent assertions.

This commit updates the CountDownLatch as the last step of the callback,
resolving the race condition.

Signed-off-by: Sami Salonen <ssalonen@gmail.com>
  • Loading branch information
ssalonen committed Jun 24, 2020
1 parent 14860b0 commit e78cf01
Showing 1 changed file with 17 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ public void testSlaveReadErrorResponse() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
assert !result.hasError();
okCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
lastError.set(result.getCause());
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -188,7 +188,6 @@ public void testSlaveConnectionError() throws Exception {
configuration)) {
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
assert !result.hasError();
okCount.incrementAndGet();
Expand All @@ -197,6 +196,7 @@ public void testSlaveConnectionError() throws Exception {
errorCount.incrementAndGet();
lastError.set(result.getCause());
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -224,7 +224,6 @@ public void testIOError() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 0, 5, 1), result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
assert !result.hasError();
okCount.incrementAndGet();
Expand All @@ -233,6 +232,7 @@ public void testIOError() throws Exception {
errorCount.incrementAndGet();
lastError.set(result.getCause());
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(15, TimeUnit.SECONDS));
assertThat(okCount.get(), is(equalTo(0)));
Expand All @@ -256,12 +256,12 @@ public void testOneOffReadWithDiscreteOrCoils(ModbusReadFunctionCode functionCod
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID, functionCode, offset, count, 1),
result -> {
callbackCalled.countDown();
if (result.getBits() != null) {
lastData.set(result.getBits());
} else {
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -351,12 +351,12 @@ public void testOneOffReadWithHolding() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
lastData.set(result.getRegisters());
} else {
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand All @@ -382,12 +382,12 @@ public void testOneOffReadWithInput() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_INPUT_REGISTERS, 1, 15, 1), result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
lastData.set(result.getRegisters());
} else {
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -459,11 +459,10 @@ public void testOneOffWriteMultipleCoilError() throws Exception {
comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, true, 1), result -> {
if (result.hasError()) {
lastError.set(result.getCause());
callbackCalled.countDown();
} else {
unexpectedCount.incrementAndGet();
callbackCalled.countDown();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -499,11 +498,10 @@ public void testOneOffWriteSingleCoil() throws Exception {
comms.submitOneTimeWrite(new ModbusWriteCoilRequestBlueprint(SLAVE_UNIT_ID, 3, bits, false, 1), result -> {
if (result.hasError()) {
unexpectedCount.incrementAndGet();
callbackCalled.countDown();
} else {
lastData.set(result.getResponse());
callbackCalled.countDown();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -540,11 +538,10 @@ public void testOneOffWriteSingleCoilError() throws Exception {
result -> {
if (result.hasError()) {
lastError.set(result.getCause());
callbackCalled.countDown();
} else {
unexpectedCount.incrementAndGet();
callbackCalled.countDown();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -580,10 +577,8 @@ public void testRegularReadEvery150msWithCoil() throws Exception {
comms.registerRegularPoll(
new ModbusReadRequestBlueprint(SLAVE_UNIT_ID, ModbusReadFunctionCode.READ_COILS, 1, 15, 1), 150, 0,
result -> {
callbackCalled.countDown();
if (result.getBits() != null) {
dataReceived.incrementAndGet();

BitArray bits = result.getBits();
try {
assertThat(bits.size(), is(equalTo(15)));
Expand All @@ -595,6 +590,7 @@ public void testRegularReadEvery150msWithCoil() throws Exception {
} else {
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));

Expand Down Expand Up @@ -623,7 +619,6 @@ public void testRegularReadEvery150msWithHolding() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 150, 0, result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
dataReceived.incrementAndGet();
ModbusRegisterArray registers = result.getRegisters();
Expand All @@ -637,6 +632,7 @@ public void testRegularReadEvery150msWithHolding() throws Exception {
} else {
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
long end = System.currentTimeMillis();
Expand All @@ -657,7 +653,6 @@ public void testRegularReadFirstErrorThenOK() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
PollTask task = comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 150, 0, result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
dataReceived.incrementAndGet();
ModbusRegisterArray registers = result.getRegisters();
Expand All @@ -671,6 +666,7 @@ public void testRegularReadFirstErrorThenOK() throws Exception {
} else {
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
long end = System.currentTimeMillis();
Expand Down Expand Up @@ -710,14 +706,13 @@ public void testUnregisterPollingOnClose() throws Exception {

AtomicInteger unexpectedCount = new AtomicInteger();
AtomicInteger errorCount = new AtomicInteger();
CountDownLatch callbackCalled = new CountDownLatch(3);
CountDownLatch successfulCountDownLatch = new CountDownLatch(3);
AtomicInteger expectedReceived = new AtomicInteger();

long start = System.currentTimeMillis();
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
PollTask task = comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 200, 0, result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
expectedReceived.incrementAndGet();
} else if (result.hasError()) {
Expand All @@ -728,14 +723,15 @@ public void testUnregisterPollingOnClose() throws Exception {
expectedReceived.incrementAndGet();
errorCount.incrementAndGet();
generateData();
callbackCalled.countDown();
successfulCountDownLatch.countDown();
}
} else {
// bits
unexpectedCount.incrementAndGet();
}
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
// Wait for N successful responses before proceeding with assertions of poll rate
assertTrue(successfulCountDownLatch.await(60, TimeUnit.SECONDS));

long end = System.currentTimeMillis();
assertPollDetails(unexpectedCount, expectedReceived, start, end, 190, 600);
Expand All @@ -759,7 +755,6 @@ public void testUnregisterPollingExplicit() throws Exception {
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, null)) {
PollTask task = comms.registerRegularPoll(new ModbusReadRequestBlueprint(SLAVE_UNIT_ID,
ModbusReadFunctionCode.READ_MULTIPLE_REGISTERS, 1, 15, 1), 200, 0, result -> {
callbackCalled.countDown();
if (result.getRegisters() != null) {
expectedReceived.incrementAndGet();
} else if (result.hasError()) {
Expand All @@ -775,6 +770,7 @@ public void testUnregisterPollingExplicit() throws Exception {
// bits
unexpectedCount.incrementAndGet();
}
callbackCalled.countDown();
});
assertTrue(callbackCalled.await(60, TimeUnit.SECONDS));
long end = System.currentTimeMillis();
Expand Down

0 comments on commit e78cf01

Please sign in to comment.