Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[oracle][doc] Fix oracle quick start #2727

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 87 additions & 25 deletions docs/content/quickstart/oracle-tutorial.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Demo: Oracle CDC to Elasticsearch

**1. Create `docker-compose.yml` file using following contents:**
**Create `docker-compose.yml` file using following contents:**

```
version: '2.1'
Expand Down Expand Up @@ -34,7 +34,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
```
The Docker Compose environment consists of the following containers:
- Oracle: Oracle 11g and a pre-populated `products` and `orders` table in the database.
- Oracle: Oracle 19c database.
- Elasticsearch: store the join result of the `orders` and `products` table.
- Kibana: mainly used to visualize the data in Elasticsearch

Expand All @@ -50,14 +50,78 @@ Don’t forget to run the following command to stop all containers after you fin
docker-compose down
```

**2. Download following JAR package to `<FLINK_HOME>/lib`**
**Download following JAR package to `<FLINK_HOME>/lib`**

**Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release- branches by yourself.**
**Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release-branches by yourself.**

- [flink-sql-connector-elasticsearch7-3.0.1-1.17.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar)
- [flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.5-SNAPSHOT/flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar)

**3. Launch a Flink cluster and start a Flink SQL CLI**

**Preparing data in Oracle database**

Introduce the tables in Oracle:
```shell
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```
```sql
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.PRODUCTS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

CREATE TABLE DEBEZIUM.PRODUCTS (
ID NUMBER(9, 0) NOT NULL,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT,
PRIMARY KEY(ID)
);

BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.ORDERS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

CREATE TABLE DEBEZIUM.ORDERS (
ID NUMBER(9, 0) NOT NULL,
ORDER_DATE TIMESTAMP(3) NOT NULL,
PURCHASER VARCHAR(255) NOT NULL,
QUANTITY NUMBER(9, 0) NOT NULL,
PRODUCT_ID NUMBER(9, 0) NOT NULL,
PRIMARY KEY(ID)
);

ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE DEBEZIUM.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

INSERT INTO DEBEZIUM.PRODUCTS VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (102, 'car battery', '12V car battery', 8.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (104, 'hammer', '12oz carpenter''s hammer', 0.75);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (105, 'hammer', '14oz carpenter''s hammer', 0.875);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (106, 'hammer', '16oz carpenter''s hammer', 1.0);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (107, 'rocks', 'box of assorted rocks', 5.3);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (108, 'jacket', 'water resistent black wind breaker', 0.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (109, 'spare tire', '24 inch spare tire', 22.2);

INSERT INTO DEBEZIUM.ORDERS VALUES (1001, TO_TIMESTAMP('2020-07-30 10:08:22.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 101);
INSERT INTO DEBEZIUM.ORDERS VALUES (1002, TO_TIMESTAMP('2020-07-30 10:11:09.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Sally', 2, 102);
INSERT INTO DEBEZIUM.ORDERS VALUES (1003, TO_TIMESTAMP('2020-07-30 12:00:30.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Edward', 2, 103);
INSERT INTO DEBEZIUM.ORDERS VALUES (1004, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 104);
```

**Launch a Flink cluster and start a Flink SQL CLI**

Execute following SQL statements in the Flink SQL CLI:

Expand All @@ -75,38 +139,36 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'products'
);

Flink SQL> CREATE TABLE orders (
ORDER_ID INT,
ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PURCHASER STRING,
QUANTITY INT,
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'orders'
);

Flink SQL> CREATE TABLE enriched_orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN,
PURCHASER STRING,
QUANTITY INT,
PRODUCT_NAME STRING,
PRODUCT_DESCRIPTION STRING,
PRIMARY KEY (ORDER_ID) NOT ENFORCED
Expand All @@ -117,27 +179,27 @@ Flink SQL> CREATE TABLE enriched_orders (
);

Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.NAME, p.DESCRIPTION
SELECT o.ID,o.ORDER_DATE,o.PURCHASER,o.QUANTITY, p.NAME, p.DESCRIPTION
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```

**4. Check result in Elasticsearch**
**Check result in Elasticsearch**

Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:5601/) to see the data.

**5. Make changes in Oracle and watch result in Elasticsearch**
**Make changes in Oracle and watch result in Elasticsearch**

Enter Oracle's container to make some changes in Oracle, then you can see the result in Elasticsearch will change after executing every SQL statement:

```shell
docker-compose exec sqlplus flinkuser/flinkpw
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```

```sql
INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);
INSERT INTO DEBEZIUM.ORDERS VALUES (1005, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 5, 105);

UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;
UPDATE DEBEZIUM.ORDERS SET QUANTITY = 10 WHERE ID = 1002;

DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
DELETE FROM DEBEZIUM.ORDERS WHERE ID = 1004;
```
111 changes: 87 additions & 24 deletions docs/content/快速上手/oracle-tutorial-zh.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 演示: Oracle CDC 导入 Elasticsearch

**1. 创建`docker-compose.yml`文件,内容如下所示:**
**创建`docker-compose.yml`文件,内容如下所示:**

```
version: '2.1'
Expand Down Expand Up @@ -34,7 +34,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
```
该 Docker Compose 中包含的容器有:
- Oracle: Oracle 11g, 已经预先创建了 `products` 和 `orders`表,并插入了一些数据.
- Oracle: Oracle 19c 数据库
- Elasticsearch: `orders` 表将和 `products` 表进行join,join的结果写入Elasticsearch中
- Kibana: 可视化 Elasticsearch 中的数据

Expand All @@ -50,14 +50,79 @@ docker-compose up -d
docker-compose down
````

**2. 下载以下 jar 包到 `<FLINK_HOME>/lib/`:**
**下载以下 jar 包到 `<FLINK_HOME>/lib/`:**

*下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译*

- [flink-sql-connector-elasticsearch7-3.0.1-1.17.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar)
- [flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.5-SNAPSHOT/flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar)

**3. 然后启动 Flink 集群,再启动 SQL CLI:**

**在 Oracle 数据库中准备数据**

创建数据库和表 `products`,`orders`,并插入数据:

```shell
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```
```sql
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.PRODUCTS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

CREATE TABLE DEBEZIUM.PRODUCTS (
ID NUMBER(9, 0) NOT NULL,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT,
PRIMARY KEY(ID)
);

BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.ORDERS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

CREATE TABLE DEBEZIUM.ORDERS (
ID NUMBER(9, 0) NOT NULL,
ORDER_DATE TIMESTAMP(3) NOT NULL,
PURCHASER VARCHAR(255) NOT NULL,
QUANTITY NUMBER(9, 0) NOT NULL,
PRODUCT_ID NUMBER(9, 0) NOT NULL,
PRIMARY KEY(ID)
);

ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE DEBEZIUM.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

INSERT INTO DEBEZIUM.PRODUCTS VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (102, 'car battery', '12V car battery', 8.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (104, 'hammer', '12oz carpenter''s hammer', 0.75);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (105, 'hammer', '14oz carpenter''s hammer', 0.875);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (106, 'hammer', '16oz carpenter''s hammer', 1.0);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (107, 'rocks', 'box of assorted rocks', 5.3);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (108, 'jacket', 'water resistent black wind breaker', 0.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (109, 'spare tire', '24 inch spare tire', 22.2);

INSERT INTO DEBEZIUM.ORDERS VALUES (1001, TO_TIMESTAMP('2020-07-30 10:08:22.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 101);
INSERT INTO DEBEZIUM.ORDERS VALUES (1002, TO_TIMESTAMP('2020-07-30 10:11:09.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Sally', 2, 102);
INSERT INTO DEBEZIUM.ORDERS VALUES (1003, TO_TIMESTAMP('2020-07-30 12:00:30.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Edward', 2, 103);
INSERT INTO DEBEZIUM.ORDERS VALUES (1004, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 104);
```

**然后启动 Flink 集群,再启动 SQL CLI:**

```sql
-- Flink SQL
Expand All @@ -73,38 +138,36 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'products'
);

Flink SQL> CREATE TABLE orders (
ORDER_ID INT,
ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PURCHASER STRING,
QUANTITY INT,
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'orders'
);

Flink SQL> CREATE TABLE enriched_orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN,
PURCHASER STRING,
QUANTITY INT,
PRODUCT_NAME STRING,
PRODUCT_DESCRIPTION STRING,
PRIMARY KEY (ORDER_ID) NOT ENFORCED
Expand All @@ -115,27 +178,27 @@ Flink SQL> CREATE TABLE enriched_orders (
);

Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.NAME, p.DESCRIPTION
SELECT o.ID,o.ORDER_DATE,o.PURCHASER,o.QUANTITY, p.NAME, p.DESCRIPTION
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```

**4. 检查 ElasticSearch 中的结果**
**检查 ElasticSearch 中的结果**

检查最终的结果是否写入ElasticSearch中, 可以在[Kibana](http://localhost:5601/)看到ElasticSearch中的数据

**5. 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果**
**在 Oracle 制造一些变更,观察 ElasticSearch 中的结果**

进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。

```shell
docker-compose exec sqlplus flinkuser/flinkpw
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```

```sql
INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);
INSERT INTO DEBEZIUM.ORDERS VALUES (1005, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 5, 105);

UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;
UPDATE DEBEZIUM.ORDERS SET QUANTITY = 10 WHERE ID = 1002;

DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
DELETE FROM DEBEZIUM.ORDERS WHERE ID = 1004;
```