
[Feature]: Introduce the seamless integration of AMS Catalog Service and Flink Engine #1860

Closed
1 of 2 tasks
YesOrNo828 opened this issue Aug 21, 2023 · 11 comments
Labels
stale type:feature Feature Requests

Comments

@YesOrNo828
Contributor

YesOrNo828 commented Aug 21, 2023

Description

AMS offers a Catalog service that can handle various formats such as Iceberg, Mixed Hive, and Paimon (see #1269). The aim is to provide a fast way for the Flink engine to access these catalogs without the user having to create each catalog through Flink SQL DDL or Java. Ideally, the Flink engine would integrate with AMS seamlessly, so that users no longer need to create AMS catalogs through Flink SQL or Java.

Use case/motivation

One motivating use case is teams that want to improve the speed and efficiency of their data processing. By integrating the AMS Catalog Service seamlessly with the Flink engine, jobs can access AMS-managed tables without creating catalogs through Flink SQL or Java, saving time and resources while improving overall data processing capabilities.

Currently, catalogs are created on AMS:
(screenshot: catalogs created in AMS)
To access the AMS metadata using the Flink engine, we must create Flink Catalogs and register them into Flink's CatalogManager individually.

CREATE CATALOG amoroCommerce WITH ('type'='arctic', 'metastore.url'='thrift://******:18050/hive_commerce');
CREATE CATALOG amoroTrino WITH ('type'='arctic', 'metastore.url'='thrift://******:18050/hive_trino');
SELECT * FROM amoroCommerce.db.table;
SELECT * FROM amoroTrino.db.table;

Or through the Java language:

tableEnv.registerCatalog("hive_commerce", amoroCatalog1);
tableEnv.registerCatalog("hive_trino", amoroCatalog2);

Expected:
I want to introduce a simple way that allows the Flink engine to access all AMS Catalogs directly.

SELECT * FROM amoroCommerce.db.table;
SELECT * FROM amoroTrino.db.table;

This avoids users having to create and register AMS catalogs manually.

Describe the solution

1. Based on FLIP-295, provide a CatalogStoreFactory that stores AMS catalogs.

Using configuration:
The AMSCatalogStore will fetch and save AMS catalogs through the configured AMS thrift address:

table.catalog-store.kind: ams
table.catalog-store.ams.url: thrift://******:18050
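
To resolve `table.catalog-store.kind: ams`, a CatalogStoreFactory would need to be discoverable through Flink's Factory SPI (an entry in META-INF/services/org.apache.flink.table.factories.Factory). Below is a minimal sketch against Flink 1.18's CatalogStoreFactory interface; the AMSCatalogStoreFactory/AMSCatalogStore class names and the single `url` option are assumptions for illustration, and the exact Context accessors may differ slightly.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.factories.CatalogStoreFactory;

/** Hypothetical factory backing 'table.catalog-store.kind: ams'. */
public class AMSCatalogStoreFactory implements CatalogStoreFactory {

    // Maps to 'table.catalog-store.ams.url' in the Flink configuration.
    public static final ConfigOption<String> URL =
            ConfigOptions.key("url").stringType().noDefaultValue();

    private String amsUrl;

    @Override
    public void open(Context context) {
        // Read the AMS thrift address from the catalog-store options.
        this.amsUrl = context.getOptions().get(URL.key());
    }

    @Override
    public CatalogStore createCatalogStore() {
        // AMSCatalogStore is the proposed store used in the Table API example below.
        return new AMSCatalogStore(amsUrl);
    }

    @Override
    public void close() {}

    @Override
    public String factoryIdentifier() {
        return "ams"; // matches 'table.catalog-store.kind'
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}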

Using the Table API:

public class CatalogStoreExample {
    public static void main(String[] args) throws Exception {
        // Initialize a catalog Store
        CatalogStore catalogStore = new AMSCatalogStore("thrift://******:18050");
 
        // set up the Table API
        final EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode()
                        .withCatalogStore(catalogStore)
                        .build();
 
        final TableEnvironment tableEnv = TableEnvironment.create(settings);
    }
}
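
For reference, here is a minimal sketch of what the AMSCatalogStore used above might look like against Flink 1.18's CatalogStore interface. The AMS client calls and the option translation (catalog name, 'type', 'metastore.url') are assumptions for illustration, not the final design.

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogStore;

/** Hypothetical CatalogStore backed by the AMS thrift endpoint. */
public class AMSCatalogStore implements CatalogStore {

    private final String amsUrl;
    // Cache of catalog name -> Flink catalog options fetched from AMS.
    private final Map<String, CatalogDescriptor> cache = new ConcurrentHashMap<>();

    public AMSCatalogStore(String amsUrl) {
        this.amsUrl = amsUrl;
    }

    @Override
    public void open() {
        // Placeholder: connect to AMS, list its catalogs, and translate each one
        // into Flink catalog options, e.g.:
        // for (String name : amsClient.listCatalogs()) { cache.put(name, toDescriptor(name)); }
    }

    @Override
    public void close() {
        cache.clear();
    }

    @Override
    public void storeCatalog(String catalogName, CatalogDescriptor catalog) {
        // Catalogs are managed on AMS; CREATE CATALOG from Flink could be
        // forwarded to AMS or rejected, depending on the final design.
        cache.put(catalogName, catalog);
    }

    @Override
    public void removeCatalog(String catalogName, boolean ignoreIfNotExists) {
        cache.remove(catalogName);
    }

    @Override
    public Optional<CatalogDescriptor> getCatalog(String catalogName) {
        return Optional.ofNullable(cache.get(catalogName));
    }

    @Override
    public Set<String> listCatalogs() {
        return cache.keySet();
    }

    @Override
    public boolean contains(String catalogName) {
        return cache.containsKey(catalogName);
    }

    // Hypothetical helper building a CatalogDescriptor for one AMS catalog.
    private CatalogDescriptor toDescriptor(String catalogName) {
        Configuration options = new Configuration();
        options.setString("type", "arctic");
        options.setString("metastore.url", amsUrl + "/" + catalogName);
        return CatalogDescriptor.of(catalogName, options);
    }
}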

Limitation: FLIP-295 is only implemented as of Flink 1.18.

2. Provide a custom TableEnvironment with built-in AMS catalogs

public class FlinkTableEnvironmentImpl extends TableEnvironmentImpl {
    public FlinkTableEnvironmentImpl(CatalogManager catalogManager, ...) {...}

    public static TableEnvironmentImpl create(EnvironmentSettings settings) {
        // create a CatalogManager by the CatalogManagerBuilder.
        CatalogManager cm = new CatalogManagerBuilder().amsMetastoreAddress("").build();

        return new FlinkTableEnvironmentImpl(cm, ...);
    }
}

public class CatalogManagerBuilder {
    String amsMetastoreAddress;

    public CatalogManager build() {
        CatalogManager catalogManager =
                CatalogManager.newBuilder()
                        .defaultCatalog(this.defaultCatalog, buildDefaultCatalog())
                        .build();

        registerCatalogFromAMS(catalogManager);
        return catalogManager;
    }

    public CatalogManagerBuilder amsMetastoreAddress(String address) {
        this.amsMetastoreAddress = address;
        return this;
    }

    public void registerCatalogFromAMS(CatalogManager cm) {
        // fetch the catalogs from AMS by the thrift address, and register the catalogs into CatalogManager.
        cm.registerCatalog(name, catalog);
    }
}

public class Example {
    public static void main(String[] args) throws Exception {
        // set up the Table API
        final EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
 
        final TableEnvironment tableEnv = FlinkTableEnvironmentImpl.create(settings);
    }
}

The two approaches above are only a simplified outline; a detailed design will be proposed later.
Anyone who is interested can take part in the discussion.

Subtasks

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@YesOrNo828 YesOrNo828 added the type:feature Feature Requests label Aug 21, 2023
@YesOrNo828 YesOrNo828 changed the title [Feature]: AMS Catalog Service Supports Multiple Formats for Seamless Flink Engine Integration [Feature]: Introduce the seamless integration of AMS Catalog Service and Flink Engine Aug 22, 2023
@dpengpeng

This is a good idea. We are designing such a catalog management system, but we have to consider lower versions of Flink (<1.18).

@YesOrNo828
Contributor Author

@dpengpeng Thanks for your feedback. Which tool do you use to submit Flink SQL tasks: the Flink SQL Client, the Flink SQL Gateway, or an internal submission tool?

@dpengpeng

@YesOrNo828 We are considering using the Flink SQL Client; we also support writing Java jobs with the Table/SQL API.

@dpengpeng

@YesOrNo828 Do you consider external data sources such as MysqlCatalog, KafkaCatalog, etc.?

@baiyangtx
Contributor

@YesOrNo828 Do you consider external data sources such as MysqlCatalog, KafkaCatalog, etc.?

We have discussed the possibility of using AMS as the stream meta-store to register Kafka stream tables, which is valuable in the context of Flink SQL, but it is not currently included in the roadmap. Also, we haven't figured out a good way to support this feature in the lower-version (<1.18) Flink SQL Client/Gateway.

@dpengpeng

@baiyangtx The Alibaba Cloud Flink VVP platform already supports a variety of catalogs, such as KafkaCatalog and MysqlCatalog. Users can directly use Flink SQL to query table data and metadata. We wonder whether it is also possible to use a custom Catalog to manipulate external data sources.

@baiyangtx
Contributor

baiyangtx commented Sep 5, 2023

@baiyangtx The Alibaba Cloud Flink VVP platform already supports a variety of catalogs, such as KafkaCatalog and MysqlCatalog. Users can directly use Flink SQL to query table data and metadata. We wonder whether it is also possible to use a custom Catalog to manipulate external data sources.

Expanding support for more data source types and enabling smoother usage of multiple catalogs in Flink SQL is a fairly broad topic. Here, I will break it down and discuss it further.

  1. Expanding support for more formats within a single Catalog.

    Currently, for computing engines, Catalogs are essentially equivalent to data source types, but from the perspective of Catalog Services, a Catalog is an index of tables. A Catalog is not limited to a single type of table. Traditional Metastores like Hive can register tables of various types. Users are concerned with the business attributes of tables rather than storage formats.

    Currently, the community is working on a feature called UnifiedCatalog. This feature allows for multiple formats to be supported within one Catalog created on AMS. Rather than creating multiple Catalogs for each table type, the engine can use various data lake formats under a single database based on business needs. This is a planned feature in this year's roadmap, and you can track its progress through this issue.

    [Feature]: Support managing tables with multiple formats under one Catalog #1061

  2. Eliminating the need for the computing engine to repeatedly create Catalogs.

    Even with UnifiedCatalog, if multiple Catalogs are created due to business needs or federation is required between multiple Hives, the computing engine still needs to use CREATE CATALOG multiple times to point to the Catalog on AMS. We hope to provide a way for the computing engine to only configure the URI of AMS rather than the specific URI of each Catalog, so that when using Flink SQL, the user can write SQL like SELECT * FROM catalog.db.table without having to write CREATE CATALOG. This feature is called CatalogManager or CatalogStore. This issue is currently discussing this matter.

    For Flink 1.18, CatalogStore is a natively supported feature, and we can complete this process by implementing the standard Flink CatalogStore API. However, for lower versions of Flink, it is not possible to provide such a non-invasive solution. A feasible solution is to provide an Amoro version of TableEnvironment, which can be easily implemented for SQL tasks submitted by packaging into jars through Maven dependencies. However, if Flink SQL Client or Flink SQL Gateway is used directly, the corresponding Flink source code must be modified to use this feature.

  3. Extend UnifiedCatalog and CatalogStore to non-data lake tables, such as Kafka Topic Table.

    Since Amoro was originally positioned as a table management platform for the data lake, only connecting to lake tables was considered when the related features were first designed. However, it now appears that serving as a metadata center for stream processing is also valuable, so we are also discussing how to expand the supported table format types.

    One proposed solution is to allow InternalCatalog to register Kafka Topic Tables. Since Kafka Topic Tables naturally have no metadata service and Kafka itself can be considered a storage cluster, Kafka topics can be registered in InternalCatalog and accessed through UnifiedCatalog and CatalogStore. Other tables, such as MySQL tables, can be registered in an ExternalCatalog. This ultimately achieves the goal of querying all types of tables needed by stream processing tasks through Flink SQL.

    However, this process has not been included in the roadmap yet.

If you are interested in those features, you can participate in the development together. Step 3 of expanding data source types can begin after supporting UnifiedCatalog in Step 1, without waiting for Step 2. @dpengpeng

@YesOrNo828
Contributor Author

YesOrNo828 commented Sep 11, 2023

@YesOrNo828 Do you consider external data sources such as MysqlCatalog, KafkaCatalog, etc.?

@dpengpeng @baiyangtx Glad to see the active discussion for enhancing the seamless integration of AMS and compute engines.

I'd like to share some thoughts on this:

  1. We are considering expanding the supported data sources, such as message queues and relational databases. That is mentioned here, but when the data sources will be expanded depends on the community.

  2. Expanding support for more formats within a single Catalog.
    How can conflicts be avoided and resolved if different catalogs have the same database or table name?
    Some options to consider: give the user a choice, keep the former db/tables, keep the latter db/tables, or delete the conflicting db/tables.
    @baiyangtx @dpengpeng WDYT?

  3. It makes sense to store the table metadata of KafkaCatalog (or PulsarCatalog) in AMS.

@baiyangtx
Contributor

@YesOrNo828 Do you consider external data sources such as MysqlCatalog, KafkaCatalog, etc.?

@dpengpeng @baiyangtx Glad to see the active discussion for enhancing the seamless integration of AMS and compute engines.

I'd like to share some thoughts on this:

1. We are considering expanding the supported data sources, such as message queues and relational databases. That is mentioned [here](https://amoro.netease.com/docs/latest/catalogs/#future-work), but when the data sources will be expanded depends on the community.

2. Expanding support for more formats within a single Catalog.
   How can conflicts be avoided and resolved if different catalogs have the same database or table name?
   Some options to consider: give the user a choice, keep the former db/tables, keep the latter db/tables, or delete the conflicting db/tables.
   @baiyangtx @dpengpeng  WDYT?

3. It makes sense to store the table metadata of KafkaCatalog (or PulsarCatalog) in AMS.

How can this conflict be avoided and resolved if different catalogs have the same database or table name?

There won't be such a conflict issue. The Catalog itself will solve this problem, which is also a core capability of the Catalog.


This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Aug 21, 2024

github-actions bot commented Sep 4, 2024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Sep 4, 2024