Closed
Description
my environment
- openjdk 11.0.2
- influxdb-java 2.15
- influxdb : 1.7.6
During a load test, a lot of messages like the ones below appear in the thread dump.
After that, no more data is sent.
Could you help me out?
"OkHttp ConnectionPool" #441 daemon prio=5 os_prio=0 cpu=0.14ms elapsed=6.96s tid=0x00007face4057800 nid=0x7e11 in Object.wait() [0x00007facce1a2000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.2/Native Method)
- waiting on <0x0000000700b731f0> (a okhttp3.ConnectionPool)
at java.lang.Object.wait(java.base@11.0.2/Object.java:462)
at okhttp3.ConnectionPool.lambda$new$0(ConnectionPool.java:66)
- waiting to re-lock in wait() <0x0000000700b731f0> (a okhttp3.ConnectionPool)
at okhttp3.ConnectionPool$$Lambda$212/0x000000080026e440.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.2/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.2/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.2/Thread.java:834)
"OkHttp ConnectionPool" #440 daemon prio=5 os_prio=0 cpu=0.20ms elapsed=6.96s tid=0x00007fad5c015000 nid=0x7e12 in Object.wait() [0x00007facce0a1000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.2/Native Method)
- waiting on <no object reference available>
at java.lang.Object.wait(java.base@11.0.2/Object.java:462)
at okhttp3.ConnectionPool.lambda$new$0(ConnectionPool.java:66)
- waiting to re-lock in wait() <0x0000000700433e68> (a okhttp3.ConnectionPool)
at okhttp3.ConnectionPool$$Lambda$212/0x000000080026e440.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.2/ThreadPoolExecutor.java:1128)
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.influx.InfluxDbProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
@RequiredArgsConstructor
//@EnableConfigurationProperties(InfluxDbProperties.class)
@Slf4j
public class TestService {
// private final InfluxDB influxDB;
private final InfluxDbProperties influxDbProperties;
@Value("${json.dir}")
private String dir;
@Value("${influxinfo.dbname}")
private String dbname;
@Value("${influxinfo.measurement}")
private String measurement;
// @PostConstruct
// public void init() {
// influxDB.enableGzip();
// influxDB.setDatabase(this.dbname);
// influxDB.setRetentionPolicy("autogen");
// influxDB.enableBatch(BatchOptions.DEFAULTS.actions(1000).flushDuration(100).jitterDuration(500));
// }
// @PreDestroy
// public void destory() {
// influxDB.close();
// }
// @Async("testExecutor")
public void saveInflux(String fullPath) {
InfluxDB influxDB = InfluxDBFactory.connect(
influxDbProperties.getUrl(),
influxDbProperties.getUser(),
influxDbProperties.getPassword());
influxDB.enableGzip();
influxDB.setDatabase(this.dbname);
influxDB.setRetentionPolicy("autogen");
// influxDB.enableBatch(BatchOptions.DEFAULTS.actions(5000).flushDuration(1000).jitterDuration(10));
StopWatch stopWatch1 = new StopWatch();
stopWatch1.start("다중 처리 :");
log.info("fullPath : {}", fullPath);
int i = 0;
try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(fullPath), "UTF-8"))) {
Gson gson = new GsonBuilder().create();
reader.beginArray();
log.debug("begine array");
while (reader.hasNext()) {
i++;
Map<String, Object> map = gson.fromJson(reader, Map.class);
// log.info("map : {}", map);
Map<String, String> header = (Map) map.get("HEADER");
Map<String, Object> payload = (Map) map.get("PAYLOAD");
long time = Long.parseLong(header.get("TIME")) * 1000;
if (header.containsKey("TIME")) header.remove("TIME");
if (header.containsKey("UUID")) header.remove("UUID");
// log.debug ("sdfsdfdsfdsfdsfsfd");
payload.forEach((k, v) -> {
Map<String, Object> field = (Map<String, Object>) v;
influxDB.write(Point.measurement(this.measurement)
.time(time, TimeUnit.MILLISECONDS)
.tag(header)
.tag("maker", k)
.fields(field)
.build()
);
});
if (i % 100 == 0) {
log.info(fullPath + ": count" + i );
}
}
reader.endArray();
stopWatch1.stop();
log.info(stopWatch1.prettyPrint());
} catch (Exception e) {
log.error(e.getMessage());
}
}
}