Skip to content

Commit

Permalink
[Sort]Upgrade sort format to support InLongMsg body containing multip…
Browse files Browse the repository at this point in the history
…le pieces of data (apache#9520)
  • Loading branch information
baomingyu committed Dec 27, 2023
1 parent 9d745b8 commit 2191ae1
Show file tree
Hide file tree
Showing 58 changed files with 3,392 additions and 1,378 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ public interface AttributeConstants {
*/
String STREAM_ID = "streamId";

/**
* iname is like a streamId but used in file protocol(m=xxx)
*/
String INAME = "iname";

/**
* data time
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ public void write(byte[] array, int position, int len)
private boolean compress;
private boolean isNumGroupId = false;
private boolean ischeck = true;

private boolean isSupportLF = false;
private final Version version;
private long timeoffset = 0;

public int getVersion() {
return version.intValue();
}

public void setTimeoffset(long offset) {
this.timeoffset = offset;
}
Expand Down Expand Up @@ -306,6 +310,10 @@ private boolean getBinNumFlag(ByteBuffer data) {
return (data.getShort(BIN_MSG_EXTFIELD_OFFSET) & 0x4) == 0;
}

private boolean getBinisSupportLF(ByteBuffer data) {
return (data.getShort(BIN_MSG_EXTFIELD_OFFSET) & 0x20) == 0x20;
}

private boolean checkBinData(ByteBuffer data) {
int totalLen = data.getInt(BIN_MSG_TOTALLEN_OFFSET);
int bodyLen = data.getInt(BIN_MSG_BODYLEN_OFFSET);
Expand Down Expand Up @@ -644,6 +652,7 @@ private InLongMsg(ByteBuffer buffer, Version magic) throws IOException {
this.createtime = getBinCreatetime(parsedBinInput);
this.msgcnt = getBinMsgCnt(parsedBinInput);
this.isNumGroupId = getBinNumFlag(parsedBinInput);
this.isSupportLF = getBinisSupportLF(parsedBinInput);
}
}

Expand Down Expand Up @@ -800,7 +809,7 @@ private void parseBinMsg() throws IOException {
}

boolean hasOtherAttr = ((extField & 0x1) == 0x1);
commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, "1");
commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(this.msgcnt));
// with private attributes,
// need to splice private attributes + public attributes
if (!hasOtherAttr) {
Expand Down Expand Up @@ -1117,4 +1126,8 @@ public boolean isNumGroupId() {
checkMode(false);
return isNumGroupId;
}

public boolean isSupportLF() {
return isSupportLF;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public String buildAttr() throws Exception {
throw new Exception("t is null");
}

attrBuffer.append("&iname=").append(id);
attrBuffer.append("&streamId=").append(id);

Date d = transData(this.tt, t);
String tstr = null;
Expand Down Expand Up @@ -251,7 +251,7 @@ public MsgAttrProtocolM100 setPartitionUnit(PartitionUnit p) {
public String buildAttr() throws Exception {
// #lizard forgives
if (id != null) {
attrBuffer.append("&iname=").append(id);
attrBuffer.append("&streamId=").append(id);
} else if (idp >= 0) {
attrBuffer.append("&idp=").append(idp);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.formats.base;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* The base implementation of {@link DeserializationSchema}.
*/
public abstract class DefaultDeserializationSchema<T> implements DeserializationSchema<T> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(DefaultDeserializationSchema.class);

/**
* If true, the deserialization error will be ignored.
*/
protected final boolean ignoreErrors;

/**
* If true, a parsing error is occurred.
*/
private boolean errorOccurred = false;

/**
* The metric counting the number of ignored errors.
*/
private transient Counter numIgnoredErrors;

public DefaultDeserializationSchema(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
}

/**
* Deserialize the data and handle the failure.
*
* <p>Note: Returns null if the message could not be properly deserialized.
*/
@Override
public T deserialize(byte[] bytes) throws IOException {
try {
T result = deserializeInternal(bytes);
// reset error state after deserialize success
errorOccurred = false;
return result;
} catch (Exception e) {
if (ignoreErrors) {
errorOccurred = true;
getNumIgnoredErrors().inc();
if (bytes == null) {
LOG.warn("Could not properly deserialize the data null.");
} else {
LOG.warn("Could not properly deserialize the data {}.",
StringUtils.byteToHexString(bytes), e);
}
return null;
}
throw new IOException("Failed to deserialize data " +
StringUtils.byteToHexString(bytes), e);
}
}

protected abstract T deserializeInternal(byte[] bytes) throws IOException;

public boolean skipCurrentRecord(T t) {
return ignoreErrors && errorOccurred;
}

public long numSkippedRecords() {
return getNumIgnoredErrors().getCount();
}

// This method is used to ensure initialization of the 'numIgnoredErrors' field. The field is non-serializable and
// marked as transient
private Counter getNumIgnoredErrors() {
if (numIgnoredErrors == null) {
numIgnoredErrors = new SimpleCounter();
}
return numIgnoredErrors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.formats.base;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* The base implementation of {@link SerializationSchema}.
*/
public abstract class DefaultSerializationSchema<T> implements SerializationSchema<T> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(DefaultSerializationSchema.class);

/**
* If true, the serialization error will be ignored.
*/
private final boolean ignoreErrors;

/**
* If true, a parsing error is occurred.
*/
private boolean errorOccurred = false;

/**
* The metric counting the number of ignored errors.
*/
private transient Counter numIgnoredErrors;

public DefaultSerializationSchema(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
}

/**
* Serialize the data and handle the failure.
*
* <p>Note: Returns null if the message could not be properly serialized.
*/
@Override
public byte[] serialize(T data) {
try {
byte[] result = serializeInternal(data);
// reset error state after serialize success
errorOccurred = false;
return result;
} catch (Exception e) {
if (ignoreErrors) {
errorOccurred = true;
getNumIgnoredErrors().inc();
LOG.warn("Could not properly serialize the data {}", data, e);
return null;
}
throw new RuntimeException("Failed to serialize data " + data, e);
}
}

protected abstract byte[] serializeInternal(T data) throws IOException;

public boolean skipCurrentRecord(T t) {
return ignoreErrors && errorOccurred;
}

public long numSkippedRecords() {
return getNumIgnoredErrors().getCount();
}

// This method is used to ensure initialization of the numIgnoredErrors field. The field is non-serializable and
// marked as transient
private Counter getNumIgnoredErrors() {
if (numIgnoredErrors == null) {
numIgnoredErrors = new SimpleCounter();
}
return numIgnoredErrors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* The default implementation of {@link TableFormatDeserializer}.
*/
public class DefaultTableFormatDeserializer implements TableFormatDeserializer {
public class DefaultTableFormatDeserializer extends TableFormatDeserializer {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,23 @@
/**
* The default implementation of {@link TableFormatSerializer}.
*/
public class DefaultTableFormatSerializer implements TableFormatSerializer {
public class DefaultTableFormatSerializer extends TableFormatSerializer {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(DefaultTableFormatSerializer.class);

/**
* The delegated serialization schema for rows.
*/
private final SerializationSchema<Row> serializationSchema;

/**
* True if ignore errors in the serialization.
* True if ignore errors in the deserialization.
*/
private final boolean ignoreErrors;

public DefaultTableFormatSerializer(
SerializationSchema<Row> serializationSchema,
boolean ignoreErrors) {
SerializationSchema<Row> serializationSchema, boolean ignoreErrors) {
this.serializationSchema = serializationSchema;
this.ignoreErrors = ignoreErrors;
}
Expand All @@ -75,7 +73,11 @@ public void flatMap(
}
}

collector.collect(bytes);
if (bytes != null) {
collector.collect(bytes);
} else {
LOG.warn("Data is Empty.");
}
}

@Override
Expand All @@ -89,20 +91,17 @@ public boolean equals(Object o) {
}

DefaultTableFormatSerializer that = (DefaultTableFormatSerializer) o;
return ignoreErrors == that.ignoreErrors
&& Objects.equals(serializationSchema, that.serializationSchema);
return Objects.equals(serializationSchema, that.serializationSchema);
}

@Override
public int hashCode() {
return Objects.hash(serializationSchema, ignoreErrors);
return Objects.hash(serializationSchema);
}

@Override
public String toString() {
return "DefaultTableFormatSerializer{"
+ "serializationSchema=" + serializationSchema
+ ", ignoreErrors=" + ignoreErrors
+ '}';
return "DefaultTableFormatSerializer{" +
"serializationSchema=" + serializationSchema + '}';
}
}
Loading

0 comments on commit 2191ae1

Please sign in to comment.