Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Max Compute Sink #52

Merged
merged 162 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 85 commits
Commits
Show all changes
162 commits
Select commit Hold shift + click to select a range
12a8fe7
feat: Add Type Info Converter Implementation
ekawinataa Oct 11, 2024
27b8a7b
feat: Add Test
ekawinataa Oct 11, 2024
512e0b6
feat: Complete test for MessageTypeInfoConverterTest
ekawinataa Oct 11, 2024
ca51108
feat: Complete test for TimestampTypeInfoConverter
ekawinataa Oct 11, 2024
468a00e
feat: Complete test for StructTypeInfoConverter.java
ekawinataa Oct 11, 2024
d0228dd
feat: Complete test for DurationTypeInfoConverter.java
ekawinataa Oct 11, 2024
14bdd12
feat: Complete test for BaseTypeInfoConverterTest.java
ekawinataa Oct 11, 2024
31357f1
feat: get implementation diff from feat branch
ekawinataa Oct 29, 2024
bf8af70
test: update ConverterOrchestratorTest
ekawinataa Oct 29, 2024
d1e0609
test: Add InsertManagerFactoryTest
ekawinataa Oct 29, 2024
f2e3d68
chore: remove unused constructor
ekawinataa Oct 29, 2024
8a228ea
test:
ekawinataa Oct 29, 2024
890b5ad
fix: PartitioningStrategyFactoryTest
ekawinataa Oct 29, 2024
0fbbeea
fix: remove unused dependency injection
ekawinataa Oct 29, 2024
2947c92
test: add non null injected decorator
ekawinataa Oct 29, 2024
ad863fc
test: add test for MaxComputeSchemaCache
ekawinataa Oct 29, 2024
157cddf
chore: remove unused lombok annotations
ekawinataa Oct 29, 2024
57985e6
test: Add MaxComputeSinkTest
ekawinataa Oct 29, 2024
1d6fbc5
test: add test for close
ekawinataa Oct 29, 2024
e989107
test: fix typeinfo
ekawinataa Oct 29, 2024
e3675bf
fix: add error handler in MaxComputeSink
ekawinataa Oct 30, 2024
a0b78ce
add ProtoUnknownFieldValidationType
ekawinataa Oct 30, 2024
f6f5985
test: Add test for ProtoUnknownFieldValidationType
ekawinataa Oct 31, 2024
724ddc4
fix: use correct converter
ekawinataa Oct 31, 2024
0c57a19
fix: rename method
ekawinataa Oct 31, 2024
032e94d
chore: rename
ekawinataa Oct 31, 2024
a364d7b
chore: exclude conflicting deps
ekawinataa Oct 31, 2024
8c05295
chore: fix checkstyle main
ekawinataa Oct 31, 2024
acf9b07
chore: fix checkstyle test
ekawinataa Oct 31, 2024
c3e29e4
chore: fix dependencies issue
ekawinataa Oct 31, 2024
a95fc80
chore: remove main
ekawinataa Oct 31, 2024
7bea42e
fix: fix test
ekawinataa Oct 31, 2024
1583952
chore: exclude model package and bump version
ekawinataa Oct 31, 2024
d10c024
feat: add max compute metrics
ekawinataa Nov 4, 2024
b195390
feat: add compression option
ekawinataa Nov 4, 2024
ea62b48
fix: fix ordering on instantiation
ekawinataa Nov 4, 2024
ed07542
feat: Add utils and implement schema update
ekawinataa Nov 5, 2024
6c7da40
feat: Add logging when executing SQL
ekawinataa Nov 5, 2024
b650de1
chore: checkstyle main
ekawinataa Nov 5, 2024
0163c03
chore: fix wrong params
ekawinataa Nov 5, 2024
d709ccf
feat: add converter for MaxComputeCompressionAlgorithmConverter
ekawinataa Nov 6, 2024
0cb4c81
chore: fix checkstyle
ekawinataa Nov 7, 2024
1ecebcb
fix: byte array conversion
ekawinataa Nov 7, 2024
92c95b8
test: fix test
ekawinataa Nov 7, 2024
4b59319
fix: remove record reordering, put metadata at the beginning
ekawinataa Nov 7, 2024
06f01b4
fix: use timestamp_ntz for proto timestamp and partition
ekawinataa Nov 11, 2024
9edfefe
fix: adjust unit test to use timestamp_ntz instead of timestamp
ekawinataa Nov 11, 2024
fc6519c
chore: remove unused lombok annotations
ekawinataa Nov 11, 2024
5476ccc
chore: exclude maxcompute client and factory from coverage
ekawinataa Nov 11, 2024
4079960
chore: exclude maxcompute client from coverage check
ekawinataa Nov 11, 2024
711ef86
chore: delete unused proto
ekawinataa Nov 11, 2024
ab7a8d0
test: add test case for log key
ekawinataa Nov 12, 2024
96608dc
chore: remove unused getters
ekawinataa Nov 12, 2024
edf208e
test: add compression option test
ekawinataa Nov 12, 2024
573ac2f
chore: remove redundant column field
ekawinataa Nov 12, 2024
2231647
fix: duration converter cast to message
ekawinataa Nov 13, 2024
07b1ab7
feat: add partition precondition on MaxComputeClient
ekawinataa Nov 13, 2024
fc1d17b
feat: add allowSchemaMismatch to false
ekawinataa Nov 13, 2024
01542c6
chore: remove unused config
ekawinataa Nov 13, 2024
285f154
chore: adjust config name
ekawinataa Nov 13, 2024
64c0518
chore: add sink and config docs
ekawinataa Nov 13, 2024
c71a175
chore: docs layout
ekawinataa Nov 13, 2024
87bb42e
chore: docs layout + adjust default value of config
ekawinataa Nov 13, 2024
b369b14
fix: revert mistakenly pushed build gradle change
ekawinataa Nov 13, 2024
a70bdac
chore: fix magic constant
ekawinataa Nov 14, 2024
7024955
fix: set null instead of default value to nonexistent field
ekawinataa Nov 14, 2024
964fa8d
chore: fix partitioning strategy
ekawinataa Nov 14, 2024
223bf29
test: complete unit test
ekawinataa Nov 15, 2024
f46e481
feat: add configurable date format for timestamp partition key
ekawinataa Nov 15, 2024
dc22ff4
chore: change default value and docs
ekawinataa Nov 15, 2024
968f480
Maxcompute instrumentation (#55)
Vaishnavi190900 Nov 15, 2024
27788f0
feat: add zone offset for timestamp
ekawinataa Nov 20, 2024
e30c931
feat: use zone offset config for metadata util
ekawinataa Nov 20, 2024
301e890
feat: Use ZoneId instead of ZoneOffset
ekawinataa Nov 20, 2024
366a363
feat: use ali auto partitioning strategy [todo fix ProtoDataColumnRec…
ekawinataa Nov 20, 2024
ceb1d30
test: fix ProtoDataColumnRecordDecoratorTest
ekawinataa Nov 20, 2024
f22d243
chore: remove unused config
ekawinataa Nov 20, 2024
25f8054
fix: set MaxComputeCache when partitioningStrategy is not null
ekawinataa Nov 20, 2024
ef243cf
chore: checkstyle
ekawinataa Nov 20, 2024
6501118
feat: add checking on sql result
ekawinataa Nov 20, 2024
22c9dd5
fix: metadata util
ekawinataa Nov 20, 2024
6282e50
chore: checkstyle
ekawinataa Nov 20, 2024
6ac1490
fix: add validation for enum based config and make it fail fast on in…
ekawinataa Nov 21, 2024
d6d1485
chore: fix exception type
ekawinataa Nov 21, 2024
7c0601d
chore: checkstyle
ekawinataa Nov 21, 2024
6d4027a
fix: class naming and add validation for enum value
ekawinataa Nov 21, 2024
9ae4c95
chore: Fix MetadataUtil to accept List of TupleString
ekawinataa Nov 21, 2024
242650b
chore: switch metadata ordering
ekawinataa Nov 21, 2024
2455b7c
- refactor schema cache to factory method
ekawinataa Nov 22, 2024
f113db3
Fix MaxComputeSchemaCache and its test
ekawinataa Nov 22, 2024
073ce16
chore: if not exists true table creation parameter
ekawinataa Nov 22, 2024
6429bab
- checkstyle SinkConfigUtils
ekawinataa Nov 22, 2024
0d3199b
- add retry utils
ekawinataa Nov 24, 2024
1cc95bd
test: Add test for RetryUtils.java
ekawinataa Nov 25, 2024
03af24e
feat: add global configs as externalized parameter
ekawinataa Nov 25, 2024
d827c4f
feat: refactor streaming management to StreamingSessionManager class
ekawinataa Nov 25, 2024
71391e6
feat: externalize partition time unit configuration
ekawinataa Nov 25, 2024
2d4e6de
fix: refactor TimestampPartitioningStrategy
ekawinataa Nov 25, 2024
53d900f
test: add case when passed object is not Record
ekawinataa Nov 25, 2024
71f0a75
chore: remove null checking since message couldn't be null
ekawinataa Nov 25, 2024
89ac42c
chore: add synchronization on schema update method
ekawinataa Nov 25, 2024
986fa0d
chore: checkstyle main
ekawinataa Nov 25, 2024
6a8a850
chore: checkstyle main
ekawinataa Nov 25, 2024
48439bc
chore: update docs
ekawinataa Nov 25, 2024
cfbeb33
chore: update schema docs
ekawinataa Nov 25, 2024
2235372
chore: reorder annotation in config
ekawinataa Nov 25, 2024
9d9f678
chore: change version to 0.10.0
ekawinataa Nov 25, 2024
3abc604
chore: bump aliyun version
ekawinataa Nov 25, 2024
f570909
chore: remove redundant enum converter class
ekawinataa Nov 26, 2024
6279e70
feat: use guava cache for streaming session
ekawinataa Nov 26, 2024
f1b7b8d
chore: use sessionCache.getUnchecked
ekawinataa Nov 26, 2024
9017f07
test: update test
ekawinataa Nov 26, 2024
4139947
chore: checkstyle
ekawinataa Nov 26, 2024
098e9c7
chore: use const instead of literal string
ekawinataa Nov 26, 2024
c125948
chore: wrap update statement with backtick
ekawinataa Nov 26, 2024
1b55805
test: add IOException use case
ekawinataa Nov 26, 2024
ca08d4c
chore: refactor RetryUtils to receive exception predicate
ekawinataa Nov 26, 2024
3bf75ea
chore: remove validateConfig()
ekawinataa Nov 26, 2024
555a91e
fix: make RecordWrapper immutable
ekawinataa Nov 28, 2024
2ac3990
chore: optimize variable declaration on MaxComputeOdpsGlobalSettingsC…
ekawinataa Nov 28, 2024
7899591
fix: remove unused TableTunnel dependencies from InsertManager.java a…
ekawinataa Nov 28, 2024
12edcff
fix: setup FlushOption on the constructor for InsertManager
ekawinataa Nov 28, 2024
f40d3a8
fix: rename static factory method of streaming session manager
ekawinataa Nov 28, 2024
d1aff87
chore: add javadoc for PayloadConverter
ekawinataa Dec 2, 2024
9b18c68
chore: refactor StreamingSessionManager to receive LoadingCache in it…
ekawinataa Dec 2, 2024
27be293
chore: Refactor PrimitiveTypeInfoConverter to use ImmutableMap
ekawinataa Dec 2, 2024
35b9622
chore: Rename ConverterOrchestrator.java to ProtobufConverterOrchestr…
ekawinataa Dec 2, 2024
6437502
chore: refactor name and type info to static var
ekawinataa Dec 2, 2024
4f06efd
chore: move MaxComputeSchemaHelper to root MaxCompute package
ekawinataa Dec 2, 2024
16f246c
chore: rename getDdlDeclaration to getDDL
ekawinataa Dec 2, 2024
6220f65
chore: remove builder on MaxComputeSchema
ekawinataa Dec 2, 2024
9cd13c2
chore: fix indent
ekawinataa Dec 2, 2024
2bd1598
refactor: MaxComputeSchemaCache to separate handling between non-null…
ekawinataa Dec 2, 2024
190b4e3
refactor: use google Sets on PartitioningStrategyFactory
ekawinataa Dec 2, 2024
b943029
refactor: make TableValidator limit configurable
ekawinataa Dec 2, 2024
783ba5c
refactor: use immutable map on MetadataUtil
ekawinataa Dec 2, 2024
2a80b72
refactor: remove SinkConfigUtils.java and its test. Transfer the impl…
ekawinataa Dec 2, 2024
6115cde
chore: add static imports for assertions lib
ekawinataa Dec 2, 2024
cb8e10c
chore: rename PayloadConverter.java to ProtobufPayloadConverter
ekawinataa Dec 2, 2024
eb0c805
feat: add validation for timestamp type
ekawinataa Dec 2, 2024
207d40e
chore: update docs and default value for valid min and max timestamp
ekawinataa Dec 2, 2024
32e8a0d
refactor: wrap payload converter to work with DTO
ekawinataa Dec 2, 2024
d277f73
chore: set default maximum session count to 2 for SINK_MAXCOMPUTE_STR…
ekawinataa Dec 2, 2024
0f80fd4
test: fix assertion
ekawinataa Dec 2, 2024
eb5385b
feat: add timestamp difference validation
ekawinataa Dec 2, 2024
5f13cf8
feat: Add NaN and Infinite check float and double
ekawinataa Dec 2, 2024
cf3c344
chore: rename upsertTable to createOrUpdateTable
ekawinataa Dec 3, 2024
cdb7202
chore: refactor inline lambda to static method
ekawinataa Dec 3, 2024
db7a871
chore: update docs
ekawinataa Dec 3, 2024
c956f2d
fix: use server side schema for building metadata
ekawinataa Dec 3, 2024
1be1ac0
chore: add docs for SINK_CONNECTOR_SCHEMA_PROTO_UNKNOWN_FIELDS_VALIDA…
ekawinataa Dec 4, 2024
73c763c
chore: rename MaxComputeSchemaHelper.java to MaxComputeSchemaBuilder
ekawinataa Dec 4, 2024
2587f9a
refactor: Add method to add uniform errors in SinkResponse and apply …
ekawinataa Dec 4, 2024
9c8c741
feat: parameterized SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUN…
ekawinataa Dec 6, 2024
d295f10
chore: update generic.md to use more explicit context
ekawinataa Dec 6, 2024
21ec7c8
chore: update maxcompute.md to show the correct type mapping
ekawinataa Dec 6, 2024
560eb56
chore: rename object to parsedObject in ProtoPayload.java
ekawinataa Dec 6, 2024
16c6fef
chore: refactor `convert` method under ProtobufConverterOrchestrator …
ekawinataa Dec 6, 2024
fef8a25
chore: add javadoc for ProtobufTypeInfoConverter.java
ekawinataa Dec 6, 2024
91f4145
chore: checkstyle
ekawinataa Dec 6, 2024
7320f5f
chore: add static import
ekawinataa Dec 6, 2024
2888730
chore: remove extraneous space
ekawinataa Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ plugins {
}

group 'com.gotocompany'
version '0.9.2'
version '0.10.1'

repositories {
mavenLocal()
mavenCentral()
}

configurations.configureEach { exclude group: 'com.google.guava', module: 'listenablefuture' }

dependencies {
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.25.0'
implementation group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.13.0'
Expand All @@ -44,6 +46,10 @@ dependencies {
implementation(group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.24.1') {
exclude group: "io.grpc"
}
implementation (group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.51.0-public.rc0') {
exclude group: "com.google"
exclude group: "io.grpc"
}
implementation 'io.grpc:grpc-all:1.55.1'
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.10.0'
Expand All @@ -53,7 +59,9 @@ dependencies {
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0'
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1'
implementation group: 'joda-time', name: 'joda-time', version: '2.10.2'
implementation('com.google.guava:guava:32.0.1-jre') { force = true }
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation 'com.github.tomakehurst:wiremock:2.16.0'
Expand Down Expand Up @@ -201,7 +209,10 @@ jacocoTestCoverageVerification {
'**/serializer/**',
'**/cortexpb/**',
'**/Clock**',
'**/GoGoProtos**',])
'**/GoGoProtos**',
'**/MaxComputeClient**',
'**/MaxComputeSinkFactory**',
''])
})
}
violationRules {
Expand Down
135 changes: 135 additions & 0 deletions docs/reference/configuration/maxcompute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# MaxCompute Sink

A MaxCompute sink requires these configurations to be passed on alongside with generic ones

## SINK_MAXCOMPUTE_ODPS_URL

Contains the URL of the MaxCompute endpoint. Further documentation on MaxCompute [ODPS URL](https://www.alibabacloud.com/help/en/maxcompute/user-guide/endpoints).
* Example value: `http://service.ap-southeast-5.maxcompute.aliyun.com/api`
* Type: `required`


## SINK_MAXCOMPUTE_ACCESS_ID

Contains the access id of the MaxCompute project. Further documentation on MaxCompute [Access ID](https://www.alibabacloud.com/help/en/tablestore/support/obtain-an-accesskey-pair).
* Example value: `access-id`
* Type: `required`

## SINK_MAXCOMPUTE_ACCESS_KEY

Contains the access key of the MaxCompute project. Further documentation on MaxCompute [Access Key](https://www.alibabacloud.com/help/en/tablestore/support/obtain-an-accesskey-pair).
* Example value: `access-key`
* Type: `required`

## SINK_MAXCOMPUTE_PROJECT_ID

Contains the identifier of a MaxCompute project. Further documentation on MaxCompute [Project ID](https://www.alibabacloud.com/help/en/maxcompute/product-overview/project).
* Example value: `project-id`
* Type: `required`

## SINK_MAXCOMPUTE_ADD_METADATA_ENABLED

Configuration for enabling metadata in top of the record. This config will be used for adding metadata information to the record. Metadata information will be added to the record in the form of key-value pair.
* Example value: `true`
* Type: `optional`
* Default value: `true`

## SINK_MAXCOMPUTE_METADATA_NAMESPACE

Configuration for wrapping the metadata fields under a specific namespace. This will result in the metadata fields contained in a struct.
Empty string will result in the metadata fields being added directly to the root level.
* Example value: `__kafka_metadata`
* Type: `optional`

## SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES

Configuration for defining the metadata columns and their types. This config will be used for defining the metadata columns and their types. The format of this config is `column1=type1,column2=type2`.
Supported types are `string`, `integer`, `long`, `timestamp`, `float`, `double`, `boolean`.

* Example value: `topic=string,partition=integer,offset=integer,timestamp=timestamp`
* Type: `optional`

## SINK_MAXCOMPUTE_SCHEMA

Contains the schema of the MaxCompute table. Schema is a dataset grouping of table columns. Further documentation on MaxCompute [Schema](https://www.alibabacloud.com/help/en/maxcompute/user-guide/schemas).
* Example value: `column1=string,column2=integer,column3=double`
* Type: `required`
* Default value: `default`

## SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE

Configuration for enabling partitioning in the MaxCompute table. This config will be used for enabling partitioning in the MaxCompute table.
* Example value: `true`
* Type: `optional`
* Default value: `false`

## SINK_MAXCOMPUTE_TABLE_PARTITION_KEY

Contains the partition key of the MaxCompute table. Partition key is referring to the payload field that will be used as partition key in the MaxCompute table.
Supported MaxCompute type for partition key is `string`, `tinyint`, `smallint`, `int`, `bigint`, `timestamp_ntz`.
* Example value: `column1`
* Type: `optional`
* Default value: `default`

## SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME

Contains the partition column name of the MaxCompute table. This could be the same as the partition key or different. This will reflect the column name in the MaxCompute table.
* Example value: `column1`
* Type: `required`

## SINK_MAXCOMPUTE_TABLE_NAME

Contains the name of the MaxCompute table. Further documentation on MaxCompute [Table Name](https://www.alibabacloud.com/help/en/maxcompute/user-guide/tables).
* Example value: `table_name`
* Type: `required`

## SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS

Contains the lifecycle of the MaxCompute table. This config will be used for setting the lifecycle of the MaxCompute table.
Not setting this config will result in table with lifecycle. Lifecycle is applied at partition level. Further documentation on MaxCompute [Table Lifecycle](https://www.alibabacloud.com/help/en/maxcompute/product-overview/lifecycle).
* Example value: `30`
* Type: `optional`

## SINK_MAXCOMPUTE_RECORD_PACK_FLUSH_TIMEOUT_MS

Contains the timeout for flushing the record pack in milliseconds. This config will be used for setting the timeout for flushing the record pack.
* Example value: `1000`
* Type: `optional`
* Default value: `-1`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ENABLED

Configuration for enabling compression in the streaming insert operation. This config will be used for enabling compression in the streaming insert operation.
* Example value: `true`
* Type: `optional`
* Default value: `false`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ALGORITHM

Configuration for defining the compression algorithm in the streaming insert operation. This config will be used for defining the compression algorithm in the streaming insert operation.
Supported algorithms are ODPS_RAW, ODPS_ZLIB, ODPS_LZ4_FRAME, ODPS_ARROW_LZ4_FRAME, ODPS_ARROW_ZSTD
* Example value: `ODPS_ZLIB`
* Type: `optional`
* Default value: `ODPS_LZ4_FRAME`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_LEVEL

Configuration for defining the compression level in the streaming insert operation. This config will be used for defining the compression level in the streaming insert operation.
* Example value: `1`
* Type: `optional`
* Default value: `1`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_STRATEGY

Configuration for defining the compression strategy in the streaming insert operation. This config will be used for defining the compression strategy in the streaming insert operation.
* Example value: `1`
* Type: `optional`
* Default value: `0`

## SINK_MAXCOMPUTE_ZONE_ID

Contains ZoneID used for parsing the timestamp in the record. This config will be used for parsing the timestamp in the record.

* Example value: `Asia/Bangkok`
* Type: `optional`
* Default value: `Asia/Bangkok`
54 changes: 54 additions & 0 deletions docs/sinks/maxcompute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# MaxCompute sink

### Datatype Protobuf

MaxCompute sink has several responsibilities, including :

1. Creation of MaxCompute table if it does not exist.
2. Updating the MaxCompute table schema based on the latest protobuf schema.
3. Translating protobuf messages into MaxCompute compatible records and inserting them into MaxCompute tables.

## MaxCompute Table Schema Update

### Protobuf

MaxCompute Sink update the MaxCompute table schema on separate table update operation. MaxCompute
utilise [Stencil](https://github.com/goto/stencil) to parse protobuf messages generate schema and update MaxCompute
tables with the latest schema.
The stencil client periodically reload the descriptor cache. Table schema update happened after the descriptor caches
uploaded.

#### Supported Protobuf - MaxCompute Table Type Mapping

| Protobuf Type | MaxCompute Type |
|------------------------------------------------------------------------------------|-----------------------------|
| bytes | BINARY |
| string | STRING |
| enum | STRING |
| float | FLOAT |
| double | DOUBLE |
| bool | BOOLEAN |
| int64, uint64, int32, uint32, fixed64, fixed32, sfixed64, sfixed32, sint64, sint32 | BIGINT |
| message | STRUCT |
| .google.protobuf.Timestamp | TIMESTAMP_NTZ |
| .google.protobuf.Struct | STRING (Json Serialised) |
| .google.protobuf.Duration | STRUCT |
| map<k,v> | ARRAY<STRUCT<key:k, value:v |

## Partitioning

MaxCompute Sink supports creation of table with partition configuration. Currently, MaxCompute Sink supports primitive field(STRING, TINYINT, SMALLINT, BIGINT)
and timestamp field based partitioning. Timestamp based partitioning strategy introduces a pseudo-partition column with the value of the timestamp field truncated to the nearest start of day.

## Clustering

MaxCompute Sink currently does not support clustering.

## Metadata

For data quality checking purposes, sometimes some metadata need to be added on the record.
if `SINK_MAXCOMPUTE_ADD_METADATA_ENABLED` is true then the metadata will be added.
`SINK_MAXCOMPUTE_METADATA_NAMESPACE` is used for another namespace to add columns
if namespace is empty, the metadata columns will be added in the root level.
`SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` is set with kafka metadata column and their type,
An example of metadata columns that can be added for kafka records.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.gotocompany.depot.config;

import com.aliyun.odps.tunnel.io.CompressOption;
import com.gotocompany.depot.common.TupleString;
import com.gotocompany.depot.config.converter.ConfToListConverter;
import com.gotocompany.depot.config.converter.MaxComputeCompressionAlgorithmConverter;
import com.gotocompany.depot.config.converter.ZoneIdConverter;
import org.aeonbits.owner.Config;

import java.time.ZoneId;
import java.util.List;

public interface MaxComputeSinkConfig extends Config {

@Key("SINK_MAXCOMPUTE_ODPS_URL")
String getMaxComputeOdpsUrl();

@Key("SINK_MAXCOMPUTE_TUNNEL_URL")
String getMaxComputeTunnelUrl();

@Key("SINK_MAXCOMPUTE_ACCESS_ID")
String getMaxComputeAccessId();

@Key("SINK_MAXCOMPUTE_ACCESS_KEY")
String getMaxComputeAccessKey();

@Key("SINK_MAXCOMPUTE_PROJECT_ID")
String getMaxComputeProjectId();

@Key("SINK_MAXCOMPUTE_METADATA_NAMESPACE")
@DefaultValue("")
String getMaxcomputeMetadataNamespace();

@DefaultValue("true")
@Key("SINK_MAXCOMPUTE_ADD_METADATA_ENABLED")
boolean shouldAddMetadata();

@DefaultValue("")
@Key("SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES")
@ConverterClass(ConfToListConverter.class)
@Separator(ConfToListConverter.ELEMENT_SEPARATOR)
List<TupleString> getMetadataColumnsTypes();

@Key("SINK_MAXCOMPUTE_SCHEMA")
@DefaultValue("default")
String getMaxComputeSchema();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE")
@DefaultValue("false")
Boolean isTablePartitioningEnabled();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_KEY")
String getTablePartitionKey();

@Key("SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME")
String getTablePartitionColumnName();

@Key("SINK_MAXCOMPUTE_TABLE_NAME")
String getMaxComputeTableName();

@Key("SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS")
Long getMaxComputeTableLifecycleDays();

@Key("SINK_MAXCOMPUTE_RECORD_PACK_FLUSH_TIMEOUT_MS")
@DefaultValue("-1")
Long getMaxComputeRecordPackFlushTimeoutMs();

@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ENABLED")
@DefaultValue("false")
boolean isStreamingInsertCompressEnabled();

@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ALGORITHM")
@ConverterClass(MaxComputeCompressionAlgorithmConverter.class)
@DefaultValue("ODPS_LZ4_FRAME")
CompressOption.CompressAlgorithm getMaxComputeCompressionAlgorithm();

@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_LEVEL")
@DefaultValue("1")
int getMaxComputeCompressionLevel();

@Key("SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_STRATEGY")
@DefaultValue("0")
int getMaxComputeCompressionStrategy();

@Key("SINK_MAXCOMPUTE_ZONE_ID")
@ConverterClass(ZoneIdConverter.class)
@DefaultValue("Asia/Bangkok")
ZoneId getZoneId();

}
7 changes: 7 additions & 0 deletions src/main/java/com/gotocompany/depot/config/SinkConfig.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.gotocompany.depot.config;

import com.gotocompany.depot.config.converter.ConfToListConverter;
import com.gotocompany.depot.config.converter.ProtoUnknownFiledValidationTypeConverter;
import com.gotocompany.depot.config.converter.SchemaRegistryHeadersConverter;
import com.gotocompany.depot.config.converter.SchemaRegistryRefreshConverter;
import com.gotocompany.depot.config.converter.SinkConnectorSchemaDataTypeConverter;
import com.gotocompany.depot.config.converter.SinkConnectorSchemaMessageModeConverter;
import com.gotocompany.depot.config.enums.SinkConnectorSchemaDataType;
import com.gotocompany.depot.message.ProtoUnknownFieldValidationType;
import com.gotocompany.depot.message.SinkConnectorSchemaMessageMode;
import com.gotocompany.depot.common.TupleString;
import com.gotocompany.stencil.cache.SchemaRefreshStrategy;
Expand Down Expand Up @@ -99,4 +101,9 @@ public interface SinkConfig extends Config {
@DefaultValue("true")
boolean getSinkDefaultFieldValueEnable();

@Key("SINK_CONNECTOR_SCHEMA_PROTO_UNKNOWN_FIELDS_VALIDATION")
@DefaultValue("MESSAGE")
@ConverterClass(ProtoUnknownFiledValidationTypeConverter.class)
ProtoUnknownFieldValidationType getSinkConnectorSchemaProtoUnknownFieldsValidation();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.gotocompany.depot.config.converter;

import com.aliyun.odps.tunnel.io.CompressOption;
import com.gotocompany.depot.exception.ConfigurationException;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.Arrays;

public class MaxComputeCompressionAlgorithmConverter implements Converter<CompressOption.CompressAlgorithm> {

private static final String INVALID_ENUM_MESSAGE_FORMAT = "Invalid compression algorithm: %s valid values are: %s";

@Override
public CompressOption.CompressAlgorithm convert(Method method, String s) {
try {
return CompressOption.CompressAlgorithm.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
throw new ConfigurationException(String.format(INVALID_ENUM_MESSAGE_FORMAT, s,
Arrays.toString(CompressOption.CompressAlgorithm.values())), e);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.gotocompany.depot.config.converter;

import com.gotocompany.depot.message.ProtoUnknownFieldValidationType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class ProtoUnknownFiledValidationTypeConverter implements Converter<ProtoUnknownFieldValidationType> {

@Override
public ProtoUnknownFieldValidationType convert(Method method, String s) {
return ProtoUnknownFieldValidationType.valueOf(s);
}

}
Loading
Loading