Skip to content

Commit

Permalink
handle ignore
Browse files Browse the repository at this point in the history
  • Loading branch information
baomingyu committed Mar 8, 2024
1 parent 020ab5b commit 69d84b8
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@
*/
public interface FailureHandler extends Serializable {

/**
* This method is called when there is a failure occurred while parsing to check is
* or not parse failure.
*
*/
default boolean isIgnoreFailure() {
return false;
};

/**
* This method is called when there is a failure occurred while parsing not InLongMsg.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, Excep
LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body, exception);
}

@Override
public boolean isIgnoreFailure() {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, Excep
throw exception;
}

@Override
public boolean isIgnoreFailure() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization

private static final Logger LOG = LoggerFactory.getLogger(DefaultDeserializationSchema.class);

/**
* If true, the deserialization error will be ignored.
*/
private final boolean ignoreErrors;

/**
* If true, a parsing error is occurred.
*/
Expand All @@ -60,7 +55,6 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization
protected transient FormatMetricGroup formatMetricGroup;

public DefaultDeserializationSchema(boolean ignoreErrors) {
this.ignoreErrors = ignoreErrors;
if (ignoreErrors) {
failureHandler = new IgnoreFailureHandler();
} else {
Expand All @@ -70,7 +64,6 @@ public DefaultDeserializationSchema(boolean ignoreErrors) {

public DefaultDeserializationSchema(FailureHandler failureHandler) {
this.failureHandler = failureHandler;
this.ignoreErrors = false;
}

@Override
Expand Down Expand Up @@ -104,7 +97,7 @@ public T deserialize(byte[] bytes) throws IOException {
if (formatMetricGroup != null) {
formatMetricGroup.getNumRecordsDeserializeError().inc(1L);
}
if (ignoreErrors) {
if (failureHandler != null && failureHandler.isIgnoreFailure()) {
if (formatMetricGroup != null) {
formatMetricGroup.getNumRecordsDeserializeErrorIgnored().inc(1L);
}
Expand All @@ -122,7 +115,7 @@ public T deserialize(byte[] bytes) throws IOException {
}

public boolean skipCurrentRecord(T element) {
return ignoreErrors && errorOccurred;
return (failureHandler != null && failureHandler.isIgnoreFailure()) && errorOccurred;
}

protected abstract T deserializeInternal(byte[] bytes) throws Exception;
Expand All @@ -136,11 +129,11 @@ public boolean equals(Object object) {
return false;
}
DefaultDeserializationSchema<?> that = (DefaultDeserializationSchema<?>) object;
return Objects.equals(ignoreErrors, that.ignoreErrors);
return Objects.equals(failureHandler, that.failureHandler);
}

@Override
public int hashCode() {
return Objects.hash(ignoreErrors);
return Objects.hash(failureHandler);
}
}

0 comments on commit 69d84b8

Please sign in to comment.