From 6a2bbfd0ff75e20ed401022baddb86a4332dc501 Mon Sep 17 00:00:00 2001 From: tinyAdapter Date: Wed, 7 Aug 2024 02:23:50 +0000 Subject: [PATCH 1/4] fix: NullPointerException when a new cublet is created --- .../core/io/writestore/MetaUserFieldWS.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/core/io/writestore/MetaUserFieldWS.java b/cool-core/src/main/java/com/nus/cool/core/io/writestore/MetaUserFieldWS.java index 21c8d998..a80149b8 100644 --- a/cool-core/src/main/java/com/nus/cool/core/io/writestore/MetaUserFieldWS.java +++ b/cool-core/src/main/java/com/nus/cool/core/io/writestore/MetaUserFieldWS.java @@ -1,5 +1,6 @@ package com.nus.cool.core.io.writestore; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.nus.cool.core.field.FieldValue; import com.nus.cool.core.field.HashField; @@ -14,6 +15,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -38,7 +40,12 @@ public class MetaUserFieldWS extends MetaHashFieldWS { public MetaUserFieldWS(FieldType type, Charset charset, MetaChunkWS metaChunkWS) { super(type, charset); this.metaChunkWS = metaChunkWS; - for (int invariantIdx : metaChunkWS.getTableSchema().getInvariantFieldIdxs()) { + this.resetInvariantIdxToValueList(); + } + + private void resetInvariantIdxToValueList() { + this.invariantIdxToValueList.clear(); + for (int invariantIdx : this.metaChunkWS.getTableSchema().getInvariantFieldIdxs()) { this.invariantIdxToValueList.put(invariantIdx, new LinkedList<>()); } } @@ -47,7 +54,7 @@ public MetaUserFieldWS(FieldType type, Charset charset, MetaChunkWS metaChunkWS) public void put(FieldValue[] tuple, int idx) throws IllegalArgumentException { if (!(tuple[idx] instanceof HashField)) { throw new IllegalArgumentException( - "Illegal argument for MetaUserFieldWS (HashField required)."); + "Illegal argument for MetaUserFieldWS (HashField required)."); } HashField user = (HashField) tuple[idx]; int hashKey = user.getInt(); @@ -89,15 +96,12 @@ public int count() { @Override public void cleanForNextCublet() { - this.fingerToGid.clear(); - this.invariantIdxToValueList.clear(); - this.valueList.clear(); - this.nextGid = 0; // a field can have different id across cublet. + super.cleanForNextCublet(); + this.resetInvariantIdxToValueList(); } @Override public int writeTo(DataOutput out) throws IOException { - int bytesWritten = writeFingersAndGids(out); TableSchema tableSchema = this.metaChunkWS.getTableSchema(); @@ -121,13 +125,13 @@ public int writeTo(DataOutput out) throws IOException { .numOfValues(values.size()) .build(); bytesWritten += OutputCompressor.writeTo(CompressType.Value, hist, - values, out); + values, out); } // Write value bytesWritten += OutputCompressor.writeTo(CompressType.KeyString, - Histogram.builder().charset(charset).build(), - valueList, out); + Histogram.builder().charset(charset).build(), + valueList, out); return bytesWritten; } From 523f5b03ddf631ecd378f2982af713967fc61b32 Mon Sep 17 00:00:00 2001 From: tinyAdapter Date: Wed, 7 Aug 2024 02:36:42 +0000 Subject: [PATCH 2/4] fix: wrong data chunk offset due to not resetting after cublet change --- .../nus/cool/core/io/readstore/CubletRS.java | 16 ++++++++++++--- .../cool/core/io/writestore/DataChunkWS.java | 6 +++++- .../core/util/writer/NativeDataWriter.java | 20 +++++++++++++++---- .../java/com/nus/cool/model/CoolModel.java | 8 ++++++++ 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java b/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java index 68386ec2..5f4ebb0a 100644 --- a/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java +++ b/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java @@ -21,14 +21,16 @@ import static com.google.common.base.Preconditions.checkNotNull; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import com.nus.cool.core.io.Input; import com.nus.cool.core.io.compression.SimpleBitSetCompressor; import com.nus.cool.core.schema.TableSchema; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.List; import lombok.Getter; import lombok.Setter; @@ -50,6 +52,8 @@ */ public class CubletRS implements Input { + static final Logger logger = LoggerFactory.getLogger(CubletRS.class); + /** * MetaChunk for this cublet. */ @@ -82,6 +86,8 @@ public CubletRS(TableSchema schema) { */ @Override public void readFrom(ByteBuffer buffer) { + logger.debug("readFrom: buffer.limit()=" + buffer.limit()); + // Read header offset int end = buffer.limit(); this.limit = end; @@ -105,12 +111,15 @@ public void readFrom(ByteBuffer buffer) { } } + logger.debug("headOffset=" + headOffset); + // Get #chunk and chunk offsets buffer.position(headOffset); int chunks = buffer.getInt(); int[] chunkOffsets = new int[chunks]; for (int i = 0; i < chunks; i++) { chunkOffsets[i] = buffer.getInt(); + logger.debug("chunkOffsets[" + i + "]=" + chunkOffsets[i]); } // read the metaChunk, which is the last one in #chunks @@ -124,6 +133,7 @@ public void readFrom(ByteBuffer buffer) { for (int i = 0; i < chunks - 1; i++) { buffer.position(chunkOffsets[i]); chunkHeadOffset = buffer.getInt(); + logger.debug("chunkHeadOffset[" + i + "]=" + chunkHeadOffset); buffer.position(chunkHeadOffset); ChunkRS chunk = new ChunkRS(this.schema, this.metaChunk); chunk.readFrom(buffer); diff --git a/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java b/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java index 4a76f082..eb6cb998 100644 --- a/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java +++ b/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java @@ -58,7 +58,7 @@ public class DataChunkWS implements Output { /** * Chunk beginning offset, don't update. */ - private final int chunkBeginOffset; + private int chunkBeginOffset; /** * Number of records. @@ -82,6 +82,10 @@ public DataChunkWS(int offset, DataFieldWS[] fields) { this.chunkBeginOffset = offset; } + public void setChunkBeginOffset(int value) { + this.chunkBeginOffset = value; + } + /** * Data Chunk Builder. * diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java index ee0da22b..68538f62 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java @@ -13,6 +13,8 @@ import java.util.List; import javax.validation.constraints.NotNull; import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Native data writer writes a set of records in cool storage format. @@ -20,6 +22,8 @@ @RequiredArgsConstructor public class NativeDataWriter implements DataWriter { + static final Logger logger = LoggerFactory.getLogger(NativeDataWriter.class); + @NotNull private final TableSchema tableSchema; @@ -107,6 +111,7 @@ private boolean maybeSwitchChunk(FieldValue curUser) throws IOException { } finishChunk(); // create a new data chunk, init tuple Count + logger.debug("newDataChunk: offset=" + offset); dataChunk = DataChunkWS.newDataChunk(tableSchema, metaChunk.getMetaFields(), offset); tupleCount = 0; return true; @@ -134,6 +139,8 @@ private void finishCublet() throws IOException { // 4. flush after writing whole Cublet. out.flush(); out.close(); + + this.offset = 0; } /** @@ -146,7 +153,6 @@ private DataOutputStream newCublet() throws IOException { System.out.println("[*] A new cublet " + fileName + " is created!"); File cublet = new File(outputDir, fileName); DataOutputStream out = new DataOutputStream(new FileOutputStream(cublet)); - offset = 0; chunkHeaderOffsets.clear(); return out; } @@ -154,12 +160,16 @@ private DataOutputStream newCublet() throws IOException { /** * Switch a new cublet File once meet 1GB. */ - private void maybeSwitchCublet() throws IOException { + private boolean maybeSwitchCublet() throws IOException { if (offset < cubletSize) { - return; + return false; } finishCublet(); + logger.debug("switching cublet..."); + out = newCublet(); + + return true; } @Override @@ -170,7 +180,9 @@ public boolean add(FieldValue[] tuple) throws IOException { } // start a new chunk if (maybeSwitchChunk(curUser)) { - maybeSwitchCublet(); + if (maybeSwitchCublet()) { + dataChunk.setChunkBeginOffset(0); + } } lastUser = curUser; // update metachunk / metafield diff --git a/cool-core/src/main/java/com/nus/cool/model/CoolModel.java b/cool-core/src/main/java/com/nus/cool/model/CoolModel.java index b36e6992..f890a59f 100644 --- a/cool-core/src/main/java/com/nus/cool/model/CoolModel.java +++ b/cool-core/src/main/java/com/nus/cool/model/CoolModel.java @@ -143,12 +143,20 @@ public synchronized void reload(String cube) throws IOException { CubeRS cubeRS = new CubeRS(schema); File[] cubletFiles = currentVersion.listFiles((file, s) -> s.endsWith(".dz")); + Arrays.sort(cubletFiles); + logger.info("Cube " + cube + ", Use version: " + currentVersion.getName()); storePath.put(cube, currentVersion); + logger.debug("cublet files: "); + for (int i = 0; i < cubletFiles.length; i++) { + logger.debug(cubletFiles[i].getPath()); + } + // Load all cubes under latest version checkNotNull(cubletFiles); for (File cubletFile : cubletFiles) { + logger.debug("loading cublet file: " + cubletFile); cubeRS.addCublet(cubletFile); } From 10c2542df8c7c714213148659d46460b90e2ade8 Mon Sep 17 00:00:00 2001 From: tinyAdapter Date: Wed, 7 Aug 2024 03:26:18 +0000 Subject: [PATCH 3/4] refactor: new data chunk instead of inplace setting --- .../java/com/nus/cool/core/io/writestore/DataChunkWS.java | 6 +----- .../com/nus/cool/core/util/writer/NativeDataWriter.java | 3 ++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java b/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java index eb6cb998..4a76f082 100644 --- a/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java +++ b/cool-core/src/main/java/com/nus/cool/core/io/writestore/DataChunkWS.java @@ -58,7 +58,7 @@ public class DataChunkWS implements Output { /** * Chunk beginning offset, don't update. */ - private int chunkBeginOffset; + private final int chunkBeginOffset; /** * Number of records. @@ -82,10 +82,6 @@ public DataChunkWS(int offset, DataFieldWS[] fields) { this.chunkBeginOffset = offset; } - public void setChunkBeginOffset(int value) { - this.chunkBeginOffset = value; - } - /** * Data Chunk Builder. * diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java index 68538f62..5edd0281 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java @@ -181,7 +181,8 @@ public boolean add(FieldValue[] tuple) throws IOException { // start a new chunk if (maybeSwitchChunk(curUser)) { if (maybeSwitchCublet()) { - dataChunk.setChunkBeginOffset(0); + // create a new data chunk with offset 0 + this.dataChunk = DataChunkWS.newDataChunk(this.tableSchema, this.metaChunk.getMetaFields(), 0); } } lastUser = curUser; From 8ea014224fbaa9d4704c69932adc31844bdf0b82 Mon Sep 17 00:00:00 2001 From: hugy718 Date: Wed, 7 Aug 2024 16:33:00 +0800 Subject: [PATCH 4/4] fix: formatting --- .../java/com/nus/cool/core/io/readstore/CubletRS.java | 10 +++++----- .../nus/cool/core/util/writer/NativeDataWriter.java | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java b/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java index 5f4ebb0a..5c8fd047 100644 --- a/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java +++ b/cool-core/src/main/java/com/nus/cool/core/io/readstore/CubletRS.java @@ -21,18 +21,18 @@ import static com.google.common.base.Preconditions.checkNotNull; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import com.nus.cool.core.io.Input; import com.nus.cool.core.io.compression.SimpleBitSetCompressor; import com.nus.cool.core.schema.TableSchema; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.List; import lombok.Getter; import lombok.Setter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Read cublet store diff --git a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java index 5edd0281..2d0d2579 100644 --- a/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java +++ b/cool-core/src/main/java/com/nus/cool/core/util/writer/NativeDataWriter.java @@ -182,7 +182,8 @@ public boolean add(FieldValue[] tuple) throws IOException { if (maybeSwitchChunk(curUser)) { if (maybeSwitchCublet()) { // create a new data chunk with offset 0 - this.dataChunk = DataChunkWS.newDataChunk(this.tableSchema, this.metaChunk.getMetaFields(), 0); + this.dataChunk = DataChunkWS.newDataChunk( + this.tableSchema, this.metaChunk.getMetaFields(), 0); } } lastUser = curUser;