Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Max Compute Sink #52

Open
wants to merge 146 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
146 commits
Select commit Hold shift + click to select a range
12a8fe7
feat: Add Type Info Converter Implementation
ekawinataa Oct 11, 2024
27b8a7b
feat: Add Test
ekawinataa Oct 11, 2024
512e0b6
feat: Complete test for MessageTypeInfoConverterTest
ekawinataa Oct 11, 2024
ca51108
feat: Complete test for TimestampTypeInfoConverter
ekawinataa Oct 11, 2024
468a00e
feat: Complete test for StructTypeInfoConverter.java
ekawinataa Oct 11, 2024
d0228dd
feat: Complete test for DurationTypeInfoConverter.java
ekawinataa Oct 11, 2024
14bdd12
feat: Complete test for BaseTypeInfoConverterTest.java
ekawinataa Oct 11, 2024
31357f1
feat: get implementation diff from feat branch
ekawinataa Oct 29, 2024
bf8af70
test: update ConverterOrchestratorTest
ekawinataa Oct 29, 2024
d1e0609
test: Add InsertManagerFactoryTest
ekawinataa Oct 29, 2024
f2e3d68
chore: remove unused constructor
ekawinataa Oct 29, 2024
8a228ea
test:
ekawinataa Oct 29, 2024
890b5ad
fix: PartitioningStrategyFactoryTest
ekawinataa Oct 29, 2024
0fbbeea
fix: remove unused dependency injection
ekawinataa Oct 29, 2024
2947c92
test: add non null injected decorator
ekawinataa Oct 29, 2024
ad863fc
test: add test for MaxComputeSchemaCache
ekawinataa Oct 29, 2024
157cddf
chore: remove unused lombok annotations
ekawinataa Oct 29, 2024
57985e6
test: Add MaxComputeSinkTest
ekawinataa Oct 29, 2024
1d6fbc5
test: add test for close
ekawinataa Oct 29, 2024
e989107
test: fix typeinfo
ekawinataa Oct 29, 2024
e3675bf
fix: add error handler in MaxComputeSink
ekawinataa Oct 30, 2024
a0b78ce
add ProtoUnknownFieldValidationType
ekawinataa Oct 30, 2024
f6f5985
test: Add test for ProtoUnknownFieldValidationType
ekawinataa Oct 31, 2024
724ddc4
fix: use correct converter
ekawinataa Oct 31, 2024
0c57a19
fix: rename method
ekawinataa Oct 31, 2024
032e94d
chore: rename
ekawinataa Oct 31, 2024
a364d7b
chore: exclude conflicting deps
ekawinataa Oct 31, 2024
8c05295
chore: fix checkstyle main
ekawinataa Oct 31, 2024
acf9b07
chore: fix checkstyle test
ekawinataa Oct 31, 2024
c3e29e4
chore: fix dependencies issue
ekawinataa Oct 31, 2024
a95fc80
chore: remove main
ekawinataa Oct 31, 2024
7bea42e
fix: fix test
ekawinataa Oct 31, 2024
1583952
chore: exclude model package and bump version
ekawinataa Oct 31, 2024
d10c024
feat: add max compute metrics
ekawinataa Nov 4, 2024
b195390
feat: add compression option
ekawinataa Nov 4, 2024
ea62b48
fix: fix ordering on instantiation
ekawinataa Nov 4, 2024
ed07542
feat: Add utils and implement schema update
ekawinataa Nov 5, 2024
6c7da40
feat: Add logging when executing SQL
ekawinataa Nov 5, 2024
b650de1
chore: checkstyle main
ekawinataa Nov 5, 2024
0163c03
chore: fix wrong params
ekawinataa Nov 5, 2024
d709ccf
feat: add converter for MaxComputeCompressionAlgorithmConverter
ekawinataa Nov 6, 2024
0cb4c81
chore: fix checkstyle
ekawinataa Nov 7, 2024
1ecebcb
fix: byte array conversion
ekawinataa Nov 7, 2024
92c95b8
test: fix test
ekawinataa Nov 7, 2024
4b59319
fix: remove record reordering, put metadata at the beginning
ekawinataa Nov 7, 2024
06f01b4
fix: use timestamp_ntz for proto timestamp and partition
ekawinataa Nov 11, 2024
9edfefe
fix: adjust unit test to use timestamp_ntz instead of timestamp
ekawinataa Nov 11, 2024
fc6519c
chore: remove unused lombok annotations
ekawinataa Nov 11, 2024
5476ccc
chore: exclude maxcompute client and factory from coverage
ekawinataa Nov 11, 2024
4079960
chore: exclude maxcompute client from coverage check
ekawinataa Nov 11, 2024
711ef86
chore: delete unused proto
ekawinataa Nov 11, 2024
ab7a8d0
test: add test case for log key
ekawinataa Nov 12, 2024
96608dc
chore: remove unused getters
ekawinataa Nov 12, 2024
edf208e
test: add compression option test
ekawinataa Nov 12, 2024
573ac2f
chore: remove redundant column field
ekawinataa Nov 12, 2024
2231647
fix: duration converter cast to message
ekawinataa Nov 13, 2024
07b1ab7
feat: add partition precondition on MaxComputeClient
ekawinataa Nov 13, 2024
fc1d17b
feat: add allowSchemaMismatch to false
ekawinataa Nov 13, 2024
01542c6
chore: remove unused config
ekawinataa Nov 13, 2024
285f154
chore: adjust config name
ekawinataa Nov 13, 2024
64c0518
chore: add sink and config docs
ekawinataa Nov 13, 2024
c71a175
chore: docs layout
ekawinataa Nov 13, 2024
87bb42e
chore: docs layout + adjust default value of config
ekawinataa Nov 13, 2024
b369b14
fix: revert mistakenly pushed build gradle change
ekawinataa Nov 13, 2024
a70bdac
chore: fix magic constant
ekawinataa Nov 14, 2024
7024955
fix: set null instead of default value to nonexistent field
ekawinataa Nov 14, 2024
964fa8d
chore: fix partitioning strategy
ekawinataa Nov 14, 2024
223bf29
test: complete unit test
ekawinataa Nov 15, 2024
f46e481
feat: add configurable date format for timestamp partition key
ekawinataa Nov 15, 2024
dc22ff4
chore: change default value and docs
ekawinataa Nov 15, 2024
968f480
Maxcompute instrumentation (#55)
Vaishnavi190900 Nov 15, 2024
27788f0
feat: add zone offset for timestamp
ekawinataa Nov 20, 2024
e30c931
feat: use zone offset config for metadata util
ekawinataa Nov 20, 2024
301e890
feat: Use ZoneId instead of ZoneOffset
ekawinataa Nov 20, 2024
366a363
feat: use ali auto partitioning strategy [todo fix ProtoDataColumnRec…
ekawinataa Nov 20, 2024
ceb1d30
test: fix ProtoDataColumnRecordDecoratorTest
ekawinataa Nov 20, 2024
f22d243
chore: remove unused config
ekawinataa Nov 20, 2024
25f8054
fix: set MaxComputeCache when partitioningStrategy is not null
ekawinataa Nov 20, 2024
ef243cf
chore: checkstyle
ekawinataa Nov 20, 2024
6501118
feat: add checking on sql result
ekawinataa Nov 20, 2024
22c9dd5
fix: metadata util
ekawinataa Nov 20, 2024
6282e50
chore: checkstyle
ekawinataa Nov 20, 2024
6ac1490
fix: add validation for enum based config and make it fail fast on in…
ekawinataa Nov 21, 2024
d6d1485
chore: fix exception type
ekawinataa Nov 21, 2024
7c0601d
chore: checkstyle
ekawinataa Nov 21, 2024
6d4027a
fix: class naming and add validation for enum value
ekawinataa Nov 21, 2024
9ae4c95
chore: Fix MetadataUtil to accept List of TupleString
ekawinataa Nov 21, 2024
242650b
chore: switch metadata ordering
ekawinataa Nov 21, 2024
2455b7c
- refactor schema cache to factory method
ekawinataa Nov 22, 2024
f113db3
Fix MaxComputeSchemaCache and its test
ekawinataa Nov 22, 2024
073ce16
chore: if not exists true table creation parameter
ekawinataa Nov 22, 2024
6429bab
- checkstyle SinkConfigUtils
ekawinataa Nov 22, 2024
0d3199b
- add retry utils
ekawinataa Nov 24, 2024
1cc95bd
test: Add test for RetryUtils.java
ekawinataa Nov 25, 2024
03af24e
feat: add global configs as externalized parameter
ekawinataa Nov 25, 2024
d827c4f
feat: refactor streaming management to StreamingSessionManager class
ekawinataa Nov 25, 2024
71391e6
feat: externalize partition time unit configuration
ekawinataa Nov 25, 2024
2d4e6de
fix: refactor TimestampPartitioningStrategy
ekawinataa Nov 25, 2024
53d900f
test: add case when passed object is not Record
ekawinataa Nov 25, 2024
71f0a75
chore: remove null checking since message couldn't be null
ekawinataa Nov 25, 2024
89ac42c
chore: add synchronization on schema update method
ekawinataa Nov 25, 2024
986fa0d
chore: checkstyle main
ekawinataa Nov 25, 2024
6a8a850
chore: checkstyle main
ekawinataa Nov 25, 2024
48439bc
chore: update docs
ekawinataa Nov 25, 2024
cfbeb33
chore: update schema docs
ekawinataa Nov 25, 2024
2235372
chore: reorder annotation in config
ekawinataa Nov 25, 2024
9d9f678
chore: change version to 0.10.0
ekawinataa Nov 25, 2024
3abc604
chore: bump aliyun version
ekawinataa Nov 25, 2024
f570909
chore: remove redundant enum converter class
ekawinataa Nov 26, 2024
6279e70
feat: use guava cache for streaming session
ekawinataa Nov 26, 2024
f1b7b8d
chore: use sessionCache.getUnchecked
ekawinataa Nov 26, 2024
9017f07
test: update test
ekawinataa Nov 26, 2024
4139947
chore: checkstyle
ekawinataa Nov 26, 2024
098e9c7
chore: use const instead of literal string
ekawinataa Nov 26, 2024
c125948
chore: wrap update statement with backtick
ekawinataa Nov 26, 2024
1b55805
test: add IOException use case
ekawinataa Nov 26, 2024
ca08d4c
chore: refactor RetryUtils to receive exception predicate
ekawinataa Nov 26, 2024
3bf75ea
chore: remove validateConfig()
ekawinataa Nov 26, 2024
555a91e
fix: make RecordWrapper immutable
ekawinataa Nov 28, 2024
2ac3990
chore: optimize variable declaration on MaxComputeOdpsGlobalSettingsC…
ekawinataa Nov 28, 2024
7899591
fix: remove unused TableTunnel dependencies from InsertManager.java a…
ekawinataa Nov 28, 2024
12edcff
fix: setup FlushOption on the constructor for InsertManager
ekawinataa Nov 28, 2024
f40d3a8
fix: rename static factory method of streaming session manager
ekawinataa Nov 28, 2024
d1aff87
chore: add javadoc for PayloadConverter
ekawinataa Dec 2, 2024
9b18c68
chore: refactor StreamingSessionManager to receive LoadingCache in it…
ekawinataa Dec 2, 2024
27be293
chore: Refactor PrimitiveTypeInfoConverter to use ImmutableMap
ekawinataa Dec 2, 2024
35b9622
chore: Rename ConverterOrchestrator.java to ProtobufConverterOrchestr…
ekawinataa Dec 2, 2024
6437502
chore: refactor name and type info to static var
ekawinataa Dec 2, 2024
4f06efd
chore: move MaxComputeSchemaHelper to root MaxCompute package
ekawinataa Dec 2, 2024
16f246c
chore: rename getDdlDeclaration to getDDL
ekawinataa Dec 2, 2024
6220f65
chore: remove builder on MaxComputeSchema
ekawinataa Dec 2, 2024
9cd13c2
chore: fix indent
ekawinataa Dec 2, 2024
2bd1598
refactor: MaxComputeSchemaCache to separate handling between non-null…
ekawinataa Dec 2, 2024
190b4e3
refactor: use google Sets on PartitioningStrategyFactory
ekawinataa Dec 2, 2024
b943029
refactor: make TableValidator limit configurable
ekawinataa Dec 2, 2024
783ba5c
refactor: use immutable map on MetadataUtil
ekawinataa Dec 2, 2024
2a80b72
refactor: remove SinkConfigUtils.java and its test. Transfer the impl…
ekawinataa Dec 2, 2024
6115cde
chore: add static imports for assertions lib
ekawinataa Dec 2, 2024
cb8e10c
chore: rename PayloadConverter.java to ProtobufPayloadConverter
ekawinataa Dec 2, 2024
eb0c805
feat: add validation for timestamp type
ekawinataa Dec 2, 2024
207d40e
chore: update docs and default value for valid min and max timestamp
ekawinataa Dec 2, 2024
32e8a0d
refactor: wrap payload converter to work with DTO
ekawinataa Dec 2, 2024
d277f73
chore: set default maximum session count to 2 for SINK_MAXCOMPUTE_STR…
ekawinataa Dec 2, 2024
0f80fd4
test: fix assertion
ekawinataa Dec 2, 2024
eb5385b
feat: add timestamp difference validation
ekawinataa Dec 2, 2024
5f13cf8
feat: Add NaN and Infinite check float and double
ekawinataa Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ plugins {
}

group 'com.gotocompany'
version '0.9.2'
version '0.10.0'

repositories {
mavenLocal()
mavenCentral()
}

configurations.configureEach { exclude group: 'com.google.guava', module: 'listenablefuture' }

dependencies {
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.25.0'
implementation group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.13.0'
Expand All @@ -44,6 +46,10 @@ dependencies {
implementation(group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.24.1') {
exclude group: "io.grpc"
}
implementation (group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.51.0-public.rc1') {
exclude group: "com.google"
exclude group: "io.grpc"
}
implementation 'io.grpc:grpc-all:1.55.1'
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.10.0'
Expand All @@ -53,7 +59,9 @@ dependencies {
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0'
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1'
implementation group: 'joda-time', name: 'joda-time', version: '2.10.2'
implementation('com.google.guava:guava:32.0.1-jre') { force = true }
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation 'com.github.tomakehurst:wiremock:2.16.0'
Expand Down Expand Up @@ -201,7 +209,10 @@ jacocoTestCoverageVerification {
'**/serializer/**',
'**/cortexpb/**',
'**/Clock**',
'**/GoGoProtos**',])
'**/GoGoProtos**',
'**/MaxComputeClient**',
'**/MaxComputeSinkFactory**',
''])
})
}
violationRules {
Expand Down
244 changes: 244 additions & 0 deletions docs/reference/configuration/maxcompute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# MaxCompute Sink

A MaxCompute sink requires these configurations to be passed on alongside with generic ones

## SINK_MAXCOMPUTE_ODPS_URL

Contains the URL of the MaxCompute endpoint. Further documentation on MaxCompute [ODPS URL](https://www.alibabacloud.com/help/en/maxcompute/user-guide/endpoints).
* Example value: `http://service.ap-southeast-5.maxcompute.aliyun.com/api`
* Type: `required`

## SINK_MAXCOMPUTE_ACCESS_ID

Contains the access id of the MaxCompute project. Further documentation on MaxCompute [Access ID](https://www.alibabacloud.com/help/en/tablestore/support/obtain-an-accesskey-pair).
* Example value: `access-id`
* Type: `required`

## SINK_MAXCOMPUTE_ACCESS_KEY

Contains the access key of the MaxCompute project. Further documentation on MaxCompute [Access Key](https://www.alibabacloud.com/help/en/tablestore/support/obtain-an-accesskey-pair).
* Example value: `access-key`
* Type: `required`

## SINK_MAXCOMPUTE_PROJECT_ID

Contains the identifier of a MaxCompute project. Further documentation on MaxCompute [Project ID](https://www.alibabacloud.com/help/en/maxcompute/product-overview/project).
* Example value: `project-id`
* Type: `required`

## SINK_MAXCOMPUTE_ADD_METADATA_ENABLED

Configuration for enabling metadata in top of the record. This config will be used for adding metadata information to the record. Metadata information will be added to the record in the form of key-value pair.
* Example value: `false`
* Type: `required`
* Default value: `true`

## SINK_MAXCOMPUTE_METADATA_NAMESPACE

Configuration for wrapping the metadata fields under a specific namespace. This will result in the metadata fields contained in a struct.
Empty string will result in the metadata fields being added directly to the root level.
* Example value: `__kafka_metadata`
* Type: `optional`

## SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES

Configuration for defining the metadata columns and their types. This config will be used for defining the metadata columns and their types. The format of this config is `column1=type1,column2=type2`.
Supported types are `string`, `integer`, `long`, `timestamp`, `float`, `double`, `boolean`.

* Example value: `topic=string,partition=integer,offset=integer,timestamp=timestamp`
* Type: `optional`

## SINK_MAXCOMPUTE_SCHEMA

Contains the schema of the MaxCompute table. Schema is a dataset grouping of table columns. Further documentation on MaxCompute [Schema](https://www.alibabacloud.com/help/en/maxcompute/user-guide/schemas).
* Example value: `your_dataset_name`
* Type: `required`
* Default value: `default`

## SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE

Configuration for enabling partitioning in the MaxCompute table. This config will be used for enabling partitioning in the MaxCompute table.
* Example value: `true`
* Type: `required`
* Default value: `false`

## SINK_MAXCOMPUTE_TABLE_PARTITION_KEY

Contains the partition key of the MaxCompute table. Partition key is referring to the payload field that will be used as partition key in the MaxCompute table.
Supported MaxCompute type for partition key is `string`, `tinyint`, `smallint`, `int`, `bigint`, `timestamp_ntz`.
* Example value: `column1`
* Type: `optional`
* Default value: `default`

## SINK_MAXCOMPUTE_TABLE_PARTITION_BY_TIMESTAMP_TIME_UNIT

Contains the time unit for partitioning by timestamp. This config will be used for setting the time unit for partitioning by timestamp.
Supported time units are `YEAR`, `MONTH`, `DAY`, `HOUR`. Configuration is case-sensitive.

* Example value: `DAYS`
* Type: `required`
* Default value: `DAYS`

## SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME

Contains the partition column name of the MaxCompute table. This could be the same as the partition key or different. This will reflect the column name in the MaxCompute table.
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
Here the SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME is differentiated with SINK_MAXCOMPUTE_TABLE_PARTITION_KEY to allow the user to have a different column name in the MaxCompute table.
This is used for timestamp auto-partitioning feature where the partition column coexists with the original column.

* Example value: `column1`
* Type: `optional`
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved

## SINK_MAXCOMPUTE_TABLE_NAME

Contains the name of the MaxCompute table. Further documentation on MaxCompute [Table Name](https://www.alibabacloud.com/help/en/maxcompute/user-guide/tables).
* Example value: `table_name`
* Type: `required`

## SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS

Contains the lifecycle of the MaxCompute table. This config will be used for setting the lifecycle of the MaxCompute table.
Not setting this config will result in table with lifecycle. Lifecycle is applied at partition level. Further documentation on MaxCompute [Table Lifecycle](https://www.alibabacloud.com/help/en/maxcompute/product-overview/lifecycle).
* Example value: `30`
* Type: `optional`

## SINK_MAXCOMPUTE_RECORD_PACK_FLUSH_TIMEOUT_MS

Contains the timeout for flushing the record pack in milliseconds. This config will be used for setting the timeout for flushing the record pack. Negative value indicates no timeout.
* Example value: `1000`
* Type: `required`
* Default value: `-1`
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ENABLED

Configuration for enabling compression in the streaming insert operation. This config will be used for enabling compression in the streaming insert operation.
* Example value: `false`
* Type: `required`
* Default value: `true`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ALGORITHM

Configuration for defining the compression algorithm in the streaming insert operation. This config will be used for defining the compression algorithm in the streaming insert operation.
Supported values are ODPS_RAW, ODPS_ZLIB, ODPS_LZ4_FRAME, ODPS_ARROW_LZ4_FRAME, ODPS_ARROW_ZSTD
* Example value: `ODPS_ZLIB`
* Type: `required`
* Default value: `ODPS_LZ4_FRAME`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_LEVEL

Configuration for defining the compression level in the streaming insert operation. This config will be used for defining the compression level in the streaming insert operation.
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
Further documentation on MaxCompute [Compression](https://www.alibabacloud.com/help/en/maxcompute/user-guide/sdk-interfaces#section-cg2-7mb-849).
* Example value: `1`
* Type: `required`
* Default value: `1`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_STRATEGY

Configuration for defining the compression strategy in the streaming insert operation. This config will be used for defining the compression strategy in the streaming insert operation.
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
Further documentation on MaxCompute [Compression](https://www.alibabacloud.com/help/en/maxcompute/user-guide/sdk-interfaces#section-cg2-7mb-849).

* Example value: `1`
* Type: `required`
* Default value: `0`

## SINK_MAXCOMPUTE_STREAMING_INSERT_MAXIMUM_SESSION_COUNT

Contains the maximum session cached count for the streaming insert operation. This config will be used for setting the maximum session cache capacity for the streaming insert operation.
Least recently used session will be removed if the cache is full.

* Example value: `7`
* Type: `required`
* Default value: `2`

## SINK_MAXCOMPUTE_ZONE_ID

Contains ZoneID used for parsing the timestamp in the record. This config will be used for parsing the timestamp in the record.

* Example value: `Asia/Bangkok`
* Type: `required`
* Default value: `Asia/Bangkok`

## SINK_MAXCOMPUTE_MAX_DDL_RETRY_COUNT

Contains the maximum retry count for DDL operations. This config will be used for setting the maximum retry count for DDL operations (create and update table schema).

* Example value: `3`
* Type: `required`
* Default value: `3`

## SINK_MAXCOMPUTE_DDL_RETRY_BACKOFF_MILLIS

Contains the backoff time in milliseconds for DDL operations. This config will be used for setting the backoff time in milliseconds for DDL operations (create and update table schema).

* Example value: `10000`
* Type: `required`
* Default value: `1000`

## SINK_MAXCOMPUTE_ODPS_GLOBAL_SETTINGS

Contains the global settings for the MaxCompute sink. This config will be used for setting the global settings for the MaxCompute sink. The format of this config is `key1=value1,key2=value2`.

* Example value: `odps.schema.evolution.enable=true,odps.namespace.schema=true,odps.sql.type.system.odps2=true`
* Type: `optional`
* Default value: `odps.schema.evolution.enable=true,odps.namespace.schema=true`

## SINK_MAXCOMPUTE_TABLE_VALIDATOR_NAME_REGEX

Contains the regex pattern for the table name validation. This config will be used for validating the table name. The table name should match the regex pattern.
Check the official documentation for the [table name](https://www.alibabacloud.com/help/en/maxcompute/product-overview/limits-4#:~:text=A%20table%20can%20contain%20a%20maximum%20of%2060%2C000%20partitions.&text=A%20table%20can%20contain%20a%20maximum%20of%20six%20levels%20of%20partitions.&text=A%20SELECT%20statement%20can%20return%20a%20maximum%20of%2010%2C000%20rows.&text=A%20MULTI%2DINSERT%20statement%20allows,tables%20at%20the%20same%20time.) for more information.

* Example value: `^[a-zA-Z_][a-zA-Z0-9_]*$`
* Type: `required`
* Default value: `^[A-Za-z][A-Za-z0-9_]{0,127}$`

## SINK_MAXCOMPUTE_TABLE_VALIDATOR_MAX_COLUMNS_PER_TABLE

Contains the maximum number of columns allowed in the table. This config will be used for setting the maximum number of columns allowed in the table.
Check the official documentation for the [table name](https://www.alibabacloud.com/help/en/maxcompute/product-overview/limits-4#:~:text=A%20table%20can%20contain%20a%20maximum%20of%2060%2C000%20partitions.&text=A%20table%20can%20contain%20a%20maximum%20of%20six%20levels%20of%20partitions.&text=A%20SELECT%20statement%20can%20return%20a%20maximum%20of%2010%2C000%20rows.&text=A%20MULTI%2DINSERT%20statement%20allows,tables%20at%20the%20same%20time.) for more information.

* Example value: `1000`
* Type: `required`
* Default value: `1200`

## SINK_MAXCOMPUTE_TABLE_VALIDATOR_MAX_PARTITION_KEYS_PER_TABLE

Contains the maximum number of partition keys allowed in the table. This config will be used for setting the maximum number of partition keys allowed in the table.
Check the official documentation for the [table name](https://www.alibabacloud.com/help/en/maxcompute/product-overview/limits-4#:~:text=A%20table%20can%20contain%20a%20maximum%20of%2060%2C000%20partitions.&text=A%20table%20can%20contain%20a%20maximum%20of%20six%20levels%20of%20partitions.&text=A%20SELECT%20statement%20can%20return%20a%20maximum%20of%2010%2C000%20rows.&text=A%20MULTI%2DINSERT%20statement%20allows,tables%20at%20the%20same%20time.) for more information.

* Example value: `6`
* Type: `required`
* Default value: `6`

## SINK_MAXCOMPUTE_VALID_MIN_TIMESTAMP

Contains the minimum valid timestamp. Records with timestamp field less than this value will be considered as invalid message.
Timestamp should be in the format of `yyyy-MM-ddTHH:mm:ss`.

* Example value: `0`
* Type: `required`
* Default value: `1970-01-01T00:00:00`

## SINK_MAXCOMPUTE_VALID_MAX_TIMESTAMP

Contains the maximum valid timestamp. Records with timestamp field more than this value will be considered as invalid message.
Timestamp should be in the format of `yyyy-MM-ddTHH:mm:ss`.

* Example value: `0`
* Type: `required`
* Default value: `9999-12-31T23:59:59`

## SINK_MAXCOMPUTE_MAX_PAST_EVENT_TIME_DIFFERENCE_YEAR

Contains the maximum past event time difference in years. Records with event time difference more than this value will be considered as invalid message.

* Example value: `1`
* Type: `required`
* Default value: `5`

## SINK_MAXCOMPUTE_MAX_FUTURE_EVENT_TIME_DIFFERENCE_YEAR

Contains the maximum future event time difference in years. Records with event time difference more than this value will be considered as invalid message.

* Example value: `1`
* Type: `required`
* Default value: `1`

56 changes: 56 additions & 0 deletions docs/sinks/maxcompute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# MaxCompute sink

### Datatype Protobuf

MaxCompute sink has several responsibilities, including :

1. Creation of MaxCompute table if it does not exist.
2. Updating the MaxCompute table schema based on the latest protobuf schema.
3. Translating protobuf messages into MaxCompute compatible records and inserting them into MaxCompute tables.

## MaxCompute Table Schema Update

ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
### JSON (ToDo)

### Protobuf

MaxCompute Sink update the MaxCompute table schema on separate table update operation. MaxCompute
utilise [Stencil](https://github.com/goto/stencil) to parse protobuf messages generate schema and update MaxCompute
tables with the latest schema.
The stencil client periodically reload the descriptor cache. Table schema update happened after the descriptor caches
uploaded.

#### Supported Protobuf - MaxCompute Table Type Mapping

| Protobuf Type | MaxCompute Type |
|------------------------------------------------------------------------------------|-----------------------------|
| bytes | BINARY |
| string | STRING |
| enum | STRING |
| float | FLOAT |
| double | DOUBLE |
| bool | BOOLEAN |
| int64, uint64, int32, uint32, fixed64, fixed32, sfixed64, sfixed32, sint64, sint32 | BIGINT |
| message | STRUCT |
| .google.protobuf.Timestamp | TIMESTAMP_NTZ |
| .google.protobuf.Struct | STRING (Json Serialised) |
| .google.protobuf.Duration | STRUCT |
| map<k,v> | ARRAY<STRUCT<key:k, value:v |

## Partitioning

MaxCompute Sink supports creation of table with partition configuration. Currently, MaxCompute Sink supports primitive field(STRING, TINYINT, SMALLINT, BIGINT)
and timestamp field based partitioning. Timestamp based partitioning strategy introduces a pseudo-partition column with the value of the timestamp field truncated to the nearest start of day.

## Clustering

MaxCompute Sink currently does not support clustering.

## Metadata

For data quality checking purposes, sometimes some metadata need to be added on the record.
if `SINK_MAXCOMPUTE_ADD_METADATA_ENABLED` is true then the metadata will be added.
`SINK_MAXCOMPUTE_METADATA_NAMESPACE` is used for another namespace to add columns
if namespace is empty, the metadata columns will be added in the root level.
`SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` is set with kafka metadata column and their type,
An example of metadata columns that can be added for kafka records.
Loading