feat: Max Compute Sink #52

Open · wants to merge 121 commits into base: main
Conversation

ekawinataa

No description provided.

ekawinataa changed the title from "Feat max compute sink" to "feat: Max Compute Sink Phase 1" on Oct 29, 2024

@Override
public Map<String, String> convert(Method method, String s) {
    Map<String, String> settings = new HashMap<>();
    if (Objects.isNull(s) || StringUtils.isEmpty(s.trim())) {
        return settings;
    }

    String[] pairs = s.split(CONFIG_SEPARATOR);
    for (String pair : pairs) {
        ....
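The elided parsing loop could be completed along the lines of the sketch below. This is only an illustration: the actual values of `CONFIG_SEPARATOR` and the key-value separator are not shown in this excerpt, so the `,` and `=` used here are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the full key-value parsing; CONFIG_SEPARATOR ("," here) and the
// "=" key-value separator are assumptions, not the PR's actual constants.
public class OdpsSettingsParser {
    private static final String CONFIG_SEPARATOR = ",";
    private static final String KEY_VALUE_SEPARATOR = "=";

    public static Map<String, String> convert(String s) {
        Map<String, String> settings = new HashMap<>();
        if (s == null || s.trim().isEmpty()) {
            return settings;
        }
        for (String pair : s.split(CONFIG_SEPARATOR)) {
            // limit=2 keeps values that themselves contain the separator intact
            String[] kv = pair.split(KEY_VALUE_SEPARATOR, 2);
            if (kv.length == 2) {
                settings.put(kv[0].trim(), kv[1].trim());
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> settings = convert("odps.sql.type.flag=2, odps.task.mode=strict");
        System.out.println(settings.size());
    }
}
```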

import java.util.List;

@AllArgsConstructor
@NoArgsConstructor
Reviewer:

Is the no-args constructor required?

Author (ekawinataa):

Some other dependent classes require MaxComputeClient to be mocked, which requires a NoArgsConstructor. I admit there is no actual usage outside of the tests; do you have any suggestions?
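On the mocking question: Mockito (via Objenesis) instantiates mocks without invoking any constructor, so `Mockito.mock(MaxComputeClient.class)` works even when the class has only a required-args constructor. The JDK-only sketch below illustrates the same idea with a hand-rolled test double; the class and method names are illustrative stand-ins, not the PR's actual API.

```java
// Illustrative only: a subclass test double shows that code depending on a
// client class can be tested without giving that class a no-args constructor.
// Mockito.mock(MaxComputeClient.class) achieves the same, since Mockito
// creates mock instances without calling any constructor.
class MaxComputeClient {
    private final String projectId; // stand-in field; the real class wraps an Odps client

    MaxComputeClient(String projectId) {
        this.projectId = projectId;
    }

    String describeProject() {
        return "project:" + projectId;
    }
}

class StubMaxComputeClient extends MaxComputeClient {
    StubMaxComputeClient() {
        super("test-project"); // the stub satisfies the required-args constructor itself
    }

    @Override
    String describeProject() {
        return "stubbed";
    }
}

public class NoArgsConstructorNotNeeded {
    public static void main(String[] args) {
        MaxComputeClient client = new StubMaxComputeClient();
        System.out.println(client.describeProject());
    }
}
```

Either way, the production class can drop @NoArgsConstructor without losing testability.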

.getSchema();
}

public void upsertTable(TableSchema tableSchema) throws OdpsException {
Reviewer:

upsert means "update or insert the records", and what we are doing here is "create or update the table DDL", so we should name it createOrUpdateTable.

Author (ekawinataa):

The BQ client uses the same upsert terminology for updating and creating tables. Wdyt, should we maintain naming consistency for this sink as well?

private final Instrumentation instrumentation;
private final MaxComputeMetrics maxComputeMetrics;

public void upsertTable(TableSchema tableSchema) throws OdpsException {
Reviewer:

Same here:

upsert means "update or insert the records", and what we are doing here is "create or update the table DDL", so we should name it createOrUpdateTable.

Author (ekawinataa):

The BQ client uses the same upsert terminology for updating and creating tables. Wdyt, should we maintain naming consistency for this sink as well?

Comment on lines +46 to +47
new TableTunnel.FlushOption()
.timeout(super.getMaxComputeSinkConfig().getMaxComputeRecordPackFlushTimeoutMs()));
Reviewer:

This instance can be created in this class's constructor and the reference passed in.

.build(cacheLoader);
}

public static StreamingSessionManager nonParititonedStreamingSessionManager(TableTunnel tableTunnel, MaxComputeSinkConfig maxComputeSinkConfig) {
Reviewer:

Usage will look like StreamingSessionManager.nonParititonedStreamingSessionManager, which is repetitive.

I suggest createNonPartitioned, newNonPartitioned, or newNonPartitionedInstance.

}, maxComputeSinkConfig);
}

public static StreamingSessionManager partitionedStreamingSessionManager(TableTunnel tableTunnel, MaxComputeSinkConfig maxComputeSinkConfig) {
Reviewer:

Usage will look like StreamingSessionManager.partitionedStreamingSessionManager, which is repetitive.

I suggest createPartitioned, newPartitioned, or newPartitionedInstance.


public final class StreamingSessionManager {

private final LoadingCache<String, TableTunnel.StreamUploadSession> sessionCache;
Reviewer:

What is the context behind using this LoadingCache?

Reviewer:

As there would mostly be only one entry.

private static final String NON_PARTITIONED = "non-partitioned";

Author (ekawinataa):

Add docs for this, covering the reasoning for using a cache and the key used.

Author (ekawinataa):

Context: a session object can be reused multiple times for streaming inserts, and by design each session is assigned to a specific partition. There are chances of the insertion process spanning several partitions, e.g. change of day, event replay, etc.
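To make the cache reasoning concrete, here is a JDK-only stand-in for what Guava's LoadingCache with maximumSize does in this design: one session per partition key, built on first miss, reused on later inserts, and evicted LRU once the cap is exceeded. The String value is a placeholder for TableTunnel.StreamUploadSession; names are illustrative, not the PR's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK stand-in for Guava's bounded LoadingCache: load-on-miss plus LRU
// eviction. The String value is a placeholder for StreamUploadSession.
public class SessionCacheSketch {
    private final int maxSessions;
    private final Map<String, String> sessions;

    public SessionCacheSketch(int maxSessions) {
        this.maxSessions = maxSessions;
        // accessOrder=true makes iteration order LRU, so the eldest entry
        // evicted below is the least recently used session
        this.sessions = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > SessionCacheSketch.this.maxSessions;
            }
        };
    }

    public String getSession(String partitionSpec) {
        // mirrors CacheLoader.load: build the session only on a cache miss
        return sessions.computeIfAbsent(partitionSpec, k -> "session-for-" + k);
    }

    public int size() {
        return sessions.size();
    }

    public static void main(String[] args) {
        SessionCacheSketch cache = new SessionCacheSketch(2);
        cache.getSession("ds=2024-11-25");
        cache.getSession("ds=2024-11-26"); // e.g. change of day creates a second session
        cache.getSession("ds=2024-11-25"); // reused; no new session is built
        System.out.println(cache.size());
    }
}
```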

import java.util.List;
import java.util.stream.Collectors;

public interface PayloadConverter {
Reviewer:

Awesome 👏

Add Javadoc for the interface, and also document the expectations of the canConvert and convertSingular methods.

values.add(null);
return;
}
Object mappedInnerValue = payloadConverters.stream()
Author (ekawinataa):

This one we can optimize using a map lookup later.
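The map-lookup optimisation mentioned here could look like the sketch below: converters keyed by field type once at startup, then O(1) dispatch per value, instead of streaming over the converter list for every record. FieldType and the lambdas are illustrative stand-ins for the PR's protobuf descriptors and PayloadConverter implementations.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch of a map-based converter lookup; FieldType and the
// converter lambdas stand in for the PR's actual classes.
public class ConverterLookupSketch {
    public enum FieldType { STRING, INT64, DOUBLE }

    // built once; replaces a per-record stream().filter(canConvert) scan
    private static final Map<FieldType, Function<Object, Object>> CONVERTERS =
            new EnumMap<>(FieldType.class);
    static {
        CONVERTERS.put(FieldType.STRING, Object::toString);
        CONVERTERS.put(FieldType.INT64, v -> Long.valueOf(v.toString()));
        CONVERTERS.put(FieldType.DOUBLE, v -> Double.valueOf(v.toString()));
    }

    public static Object convert(FieldType type, Object value) {
        Function<Object, Object> converter = CONVERTERS.get(type);
        if (converter == null) {
            throw new IllegalArgumentException("Unsupported type: " + type);
        }
        return converter.apply(value);
    }

    public static void main(String[] args) {
        System.out.println(convert(FieldType.INT64, "42"));
    }
}
```

A side benefit: the map makes the converter-per-type assignment explicit and unambiguous.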

import static com.gotocompany.depot.maxcompute.util.TypeInfoUtils.isStructArrayType;
import static com.gotocompany.depot.maxcompute.util.TypeInfoUtils.isStructType;

public class SchemaDifferenceUtils {
Author (ekawinataa):

Add Javadoc, and mark as deprecated until Ali's team provides the proper table update.

Reviewer:

Also, we need to verify with them whether it will also remove a column if the new schema doesn't have it, or what the expected behavior is.

PROTO_TYPE_MAP.put(Descriptors.FieldDescriptor.Type.STRING, TypeInfoFactory.STRING);
PROTO_TYPE_MAP.put(Descriptors.FieldDescriptor.Type.ENUM, TypeInfoFactory.STRING);
PROTO_TYPE_MAP.put(Descriptors.FieldDescriptor.Type.DOUBLE, TypeInfoFactory.DOUBLE);
PROTO_TYPE_MAP.put(Descriptors.FieldDescriptor.Type.FLOAT, TypeInfoFactory.FLOAT);
Author (ekawinataa):

float -> Infinity/NaN: check whether these are supported in MC or not (edited)
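If MaxCompute turns out to reject non-finite floats, a guard like this sketch could validate values before insertion. Whether MC accepts NaN/Infinity is exactly the open question above; this is a hypothetical guard, not a confirmed requirement.

```java
// Hypothetical guard for the NaN/Infinity concern; only useful if MaxCompute
// is confirmed to reject non-finite FLOAT values.
public class FloatGuardSketch {
    public static float requireFinite(float value) {
        if (Float.isNaN(value) || Float.isInfinite(value)) {
            throw new IllegalArgumentException("Non-finite float not supported: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(requireFinite(3.5f));
    }
}
```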

This is used for timestamp auto-partitioning feature where the partition column coexists with the original column.

* Example value: `column1`
* Type: `optional`
Author (ekawinataa), Nov 26, 2024:

Add the default value __partition_value, plus a link to their docs for the auto-partitioning feature.


* Example value: `7`
* Type: `required`
* Default value: `1`
Author (ekawinataa):

Change to 2; consider the case of DAY-truncated partitioning.


@Override
public void insert(List<RecordWrapper> recordWrappers) throws TunnelException, IOException {
Map<String, List<RecordWrapper>> partitionSpecRecordWrapperMap = recordWrappers.stream()
Author (ekawinataa), Nov 26, 2024:

Go with a single iteration; outside the iteration we can track the RecordPack using a map with the partition spec as the key.
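The single-iteration grouping suggested here can be sketched as follows. RecordWrapper is reduced to a stand-in holding only the partition spec; the same pattern extends to tracking a RecordPack per key instead of a List, as the comment suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of one-pass grouping keyed by partition spec; RecordWrapper is a
// simplified stand-in for the PR's class.
public class PartitionGroupingSketch {
    public static class RecordWrapper {
        final String partitionSpec;

        public RecordWrapper(String partitionSpec) {
            this.partitionSpec = partitionSpec;
        }
    }

    public static Map<String, List<RecordWrapper>> groupByPartition(List<RecordWrapper> records) {
        Map<String, List<RecordWrapper>> groups = new HashMap<>();
        for (RecordWrapper record : records) {
            // one iteration total: the per-partition list is created on first
            // sight of its key, then appended to in place
            groups.computeIfAbsent(record.partitionSpec, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<RecordWrapper> records = Arrays.asList(
                new RecordWrapper("ds=2024-11-26"),
                new RecordWrapper("ds=2024-11-27"),
                new RecordWrapper("ds=2024-11-26"));
        System.out.println(groupByPartition(records).get("ds=2024-11-26").size());
    }
}
```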

Comment on lines +15 to +20
private StreamingSessionManager(CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader,
MaxComputeSinkConfig maxComputeSinkConfig) {
sessionCache = CacheBuilder.newBuilder()
.maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
.build(cacheLoader);
}
Reviewer:

I think the complexity of instantiating this class is split across two places, this constructor and the static method. How about this, where the logic lives in the static method and the constructor stays simple:

    private StreamingSessionManager(LoadingCache<String, TableTunnel.StreamUploadSession> sessionCache) {
        this.sessionCache = sessionCache;
    }

    public static StreamingSessionManager nonParititoned(TableTunnel tableTunnel, MaxComputeSinkConfig maxComputeSinkConfig) {
        CacheLoader<String, TableTunnel.StreamUploadSession> cacheLoader = new CacheLoader<String, TableTunnel.StreamUploadSession>() {
            @Override
            public TableTunnel.StreamUploadSession load(String sessionId) throws TunnelException {
                return tableTunnel.buildStreamUploadSession(
                                maxComputeSinkConfig.getMaxComputeProjectId(),
                                maxComputeSinkConfig.getMaxComputeTableName())
                        .allowSchemaMismatch(false)
                        .build();
            }
        };
        return new StreamingSessionManager(
                CacheBuilder.newBuilder()
                        .maximumSize(maxComputeSinkConfig.getStreamingInsertMaximumSessionCount())
                        .build(cacheLoader));
    }

Reviewer:

There is a typo in nonParititoned: it should be nonPartitioned.


private static final Map<Descriptors.FieldDescriptor.Type, TypeInfo> PROTO_TYPE_MAP;
static {
PROTO_TYPE_MAP = new HashMap<>();
Reviewer:

Additionally, to make it concise and readable, and to address immutability, please apply the changes below:

import static com.google.protobuf.Descriptors.FieldDescriptor.Type.*;

public class PrimitiveTypeInfoConverter implements TypeInfoConverter {

    private static final Map<Descriptors.FieldDescriptor.Type, TypeInfo> PROTO_TYPE_MAP;

    static {
        PROTO_TYPE_MAP = ImmutableMap.<Descriptors.FieldDescriptor.Type, TypeInfo>builder()
                .put(BYTES, TypeInfoFactory.BINARY)
                .put(STRING, TypeInfoFactory.STRING)
                .put(ENUM, TypeInfoFactory.STRING)
                .put(DOUBLE, TypeInfoFactory.DOUBLE)
                .put(FLOAT, TypeInfoFactory.FLOAT)
                .put(BOOL, TypeInfoFactory.BOOLEAN)
                .put(INT64, TypeInfoFactory.BIGINT)
                .put(UINT64, TypeInfoFactory.BIGINT)
                .put(INT32, TypeInfoFactory.INT)
                .put(UINT32, TypeInfoFactory.INT)
                .put(FIXED64, TypeInfoFactory.BIGINT)
                .put(FIXED32, TypeInfoFactory.INT)
                .put(SFIXED32, TypeInfoFactory.INT)
                .put(SFIXED64, TypeInfoFactory.BIGINT)
                .put(SINT32, TypeInfoFactory.INT)
                .put(SINT64, TypeInfoFactory.BIGINT)
                .build();
    }

I think this is more easily readable.

Comment on lines +37 to +43
public TypeInfo convert(Descriptors.FieldDescriptor fieldDescriptor) {
return typeInfoCache.computeIfAbsent(fieldDescriptor.getFullName(), key -> typeInfoConverters.stream()
.filter(converter -> converter.canConvert(fieldDescriptor))
.findFirst()
.map(converter -> converter.convert(fieldDescriptor))
.orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + fieldDescriptor.getType())));
}
Reviewer:

Another reason for using a map: say two converter implementations can convert one particular fieldDescriptor. We know logically that might happen, but code- and architecture-wise we are not enforcing that boundary.

To have an exact mapping of which implementation is responsible for which fieldDescriptor, I think we need clear hard-coded mapping rules. That will also help identify future issues, i.e. which implementation is causing a certain behavior.

maxComputeClient.insert(recordWrappers.getValidRecords());
} catch (IOException | TunnelException e) {
log.error("Error while inserting records to MaxCompute: ", e);
mapInsertionError(recordWrappers.getValidRecords(), sinkResponse, new ErrorInfo(e, ErrorType.SINK_RETRYABLE_ERROR));
Reviewer:

Lines 41 and 46: mapInsertionError does not convey the intent of the call clearly. How about using the existing methods in sinkResponse, or adding a method to sinkResponse?

            if (response.hasErrors()) {
                Map<Long, ErrorInfo> errorInfoMap = BigQueryResponseParser.getErrorsFromBQResponse(records.getValidRecords(), response, bigQueryMetrics, instrumentation);
                errorInfoMap.forEach(sinkResponse::addErrors);
                errorHandler.handle(response.getInsertErrors(), records.getValidRecords());
            }

Reviewer:

Also, what is the difference between this catch block and the catch block on line 44? Will DEFAULT_ERROR not be retried, and where is that written in the Javadoc? I think it should be documented in the Sink interface (you can defer this documentation to phase 2, as it is mostly an enhancement of existing behavior and not specific to this MR).

Author (ekawinataa), Nov 28, 2024:

Retry configuration lives on the Firehose side (which errors to retry and which not), not in depot. As of now we have identified that IOException might occur from a schema mismatch and TunnelException from intermittent network issues, so we map those explicitly to SINK_RETRYABLE_ERROR.

Other uncategorized exceptions are mapped as DEFAULT_ERROR.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConverterOrchestrator {
Reviewer:

This feels too generic for this repo.

How about ProtoBufTypeToMaxComputeTypeConverter?

Comment on lines +18 to +19
List<String> fieldNames = Arrays.asList(SECONDS, NANOS);
List<TypeInfo> typeInfos = Arrays.asList(TypeInfoFactory.BIGINT, TypeInfoFactory.INT);
Reviewer:

    private static final List<String> fieldNames = Arrays.asList(SECONDS, NANOS);
    private static final List<TypeInfo> typeInfos = Arrays.asList(TypeInfoFactory.BIGINT, TypeInfoFactory.INT);

import java.util.stream.Collectors;

@RequiredArgsConstructor
public class MaxComputeSchemaHelper {
Reviewer:

Take this out of the helper sub-package.

This class is really a MaxComputeSchemaBuilder with a single method, buildMaxComputeSchema, which can then be renamed to build.

Comment on lines +12 to +14
private static final Pattern VALID_TABLE_NAME_REGEX = Pattern.compile("^[A-Za-z][A-Za-z0-9_]{0,127}$");
private static final int MAX_COLUMNS_PER_TABLE = 1200;
private static final int MAX_PARTITION_KEYS_PER_TABLE = 6;
Reviewer:

These should be configurable, and the values you've set can be kept as defaults if not overridden.
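One way to honor "configurable with the current values as defaults": in the PR these would presumably become MaxComputeSinkConfig entries, but the system-property sketch below illustrates the default-with-override shape with the JDK alone. The property names are invented for illustration.

```java
// Hypothetical default-with-override shape for the hard-coded limits; in the
// PR itself these would live in MaxComputeSinkConfig rather than system
// properties, and the property names here are invented.
public class TableValidatorLimits {
    static int intProp(String key, int defaultValue) {
        String value = System.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    // the current hard-coded values become the defaults when nothing overrides them
    static final int MAX_COLUMNS_PER_TABLE =
            intProp("maxcompute.table.validator.max.columns", 1200);
    static final int MAX_PARTITION_KEYS_PER_TABLE =
            intProp("maxcompute.table.validator.max.partition.keys", 6);

    public static void main(String[] args) {
        System.out.println(MAX_COLUMNS_PER_TABLE + " " + MAX_PARTITION_KEYS_PER_TABLE);
    }
}
```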

private static final String BOOLEAN = "boolean";

static {
METADATA_TYPE_MAP = new HashMap<>();
Reviewer:

As suggested in previous comments, use the Google Guava collection builders to make this concise and readable.

}
};

public abstract boolean shouldFilter(Object object);
Reviewer:

The intent of this class is a bit ambiguous. Let's discuss this over a call.

Either way, shouldFilter is not defining the intent clearly either. Let's discuss.


public class SinkConfigUtils {

public static String getProtoSchemaClassName(SinkConfig sinkConfig) {
Reviewer:

This is used in only one place, MaxComputeSinkFactory; we can move it there.


Map<String, String> settings = converter.convert(null, odpsGlobalSettings);

Assertions.assertEquals(2, settings.size());
Reviewer:

Add static imports for the assertXYZ methods.

5 participants