Skip to content

Commit

Permalink
[docs][oracle] Fix oracle cdc quick start (apache#2727)
Browse files Browse the repository at this point in the history
This closes apache#2727.

(cherry picked from commit 9aed54d)
  • Loading branch information
GOODBOY008 committed Nov 25, 2023
1 parent 03f7847 commit b6d8cf8
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 54 deletions.
118 changes: 90 additions & 28 deletions docs/content/quickstart/oracle-tutorial.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Demo: Oracle CDC to Elasticsearch

**1. Create `docker-compose.yml` file using following contents:**
**Create `docker-compose.yml` file using following contents:**

```
version: '2.1'
Expand Down Expand Up @@ -34,7 +34,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
```
The Docker Compose environment consists of the following containers:
- Oracle: Oracle 11g and a pre-populated `products` and `orders` table in the database.
- Oracle: Oracle 19c database.
- Elasticsearch: store the join result of the `orders` and `products` table.
- Kibana: mainly used to visualize the data in Elasticsearch

Expand All @@ -50,14 +50,78 @@ Don’t forget to run the following command to stop all containers after you fin
docker-compose down
```

**2. Download following JAR package to `<FLINK_HOME>/lib`**
**Download following JAR package to `<FLINK_HOME>/lib`**

*Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. *
**Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release-branches by yourself.**

- [flink-sql-connector-elasticsearch7-3.0.1-1.17.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar)
- [flink-sql-connector-oracle-cdc-2.4.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4.0/flink-sql-connector-oracle-cdc-2.4.0.jar)
- [flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.5-SNAPSHOT/flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar)

**3. Launch a Flink cluster and start a Flink SQL CLI**

**Preparing data in Oracle database**

Introduce the tables in Oracle:
```shell
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```
```sql
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.PRODUCTS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

CREATE TABLE DEBEZIUM.PRODUCTS (
ID NUMBER(9, 0) NOT NULL,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT,
PRIMARY KEY(ID)
);

BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.ORDERS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/

CREATE TABLE DEBEZIUM.ORDERS (
ID NUMBER(9, 0) NOT NULL,
ORDER_DATE TIMESTAMP(3) NOT NULL,
PURCHASER VARCHAR(255) NOT NULL,
QUANTITY NUMBER(9, 0) NOT NULL,
PRODUCT_ID NUMBER(9, 0) NOT NULL,
PRIMARY KEY(ID)
);

ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE DEBEZIUM.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

INSERT INTO DEBEZIUM.PRODUCTS VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (102, 'car battery', '12V car battery', 8.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (104, 'hammer', '12oz carpenter''s hammer', 0.75);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (105, 'hammer', '14oz carpenter''s hammer', 0.875);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (106, 'hammer', '16oz carpenter''s hammer', 1.0);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (107, 'rocks', 'box of assorted rocks', 5.3);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (108, 'jacket', 'water resistent black wind breaker', 0.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (109, 'spare tire', '24 inch spare tire', 22.2);

INSERT INTO DEBEZIUM.ORDERS VALUES (1001, TO_TIMESTAMP('2020-07-30 10:08:22.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 101);
INSERT INTO DEBEZIUM.ORDERS VALUES (1002, TO_TIMESTAMP('2020-07-30 10:11:09.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Sally', 2, 102);
INSERT INTO DEBEZIUM.ORDERS VALUES (1003, TO_TIMESTAMP('2020-07-30 12:00:30.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Edward', 2, 103);
INSERT INTO DEBEZIUM.ORDERS VALUES (1004, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 104);
```

**Launch a Flink cluster and start a Flink SQL CLI**

Execute following SQL statements in the Flink SQL CLI:

Expand All @@ -75,38 +139,36 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'products'
);

Flink SQL> CREATE TABLE orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
ID INT,
ORDER_DATE TIMESTAMP(3),
PURCHASER STRING,
QUANTITY INT,
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'orders'
);

Flink SQL> CREATE TABLE enriched_orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN,
ORDER_DATE TIMESTAMP(3),
PURCHASER STRING,
QUANTITY INT,
PRODUCT_NAME STRING,
PRODUCT_DESCRIPTION STRING,
PRIMARY KEY (ORDER_ID) NOT ENFORCED
Expand All @@ -117,27 +179,27 @@ Flink SQL> CREATE TABLE enriched_orders (
);

Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.NAME, p.DESCRIPTION
SELECT o.ID,o.ORDER_DATE,o.PURCHASER,o.QUANTITY, p.NAME, p.DESCRIPTION
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```

**4. Check result in Elasticsearch**
**Check result in Elasticsearch**

Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:5601/) to see the data.

**5. Make changes in Oracle and watch result in Elasticsearch**
**Make changes in Oracle and watch result in Elasticsearch**

Enter Oracle's container to make some changes in Oracle, then you can see the result in Elasticsearch will change after executing every SQL statement:

```shell
docker-compose exec sqlplus flinkuser/flinkpw
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```

```sql
INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);
INSERT INTO DEBEZIUM.ORDERS VALUES (1005, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 5, 105);

UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;
UPDATE DEBEZIUM.ORDERS SET QUANTITY = 10 WHERE ID = 1002;

DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
DELETE FROM DEBEZIUM.ORDERS WHERE ID = 1004;
```
115 changes: 89 additions & 26 deletions docs/content/快速上手/oracle-tutorial-zh.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 演示: Oracle CDC 导入 Elasticsearch

**1. 创建`docker-compose.yml`文件,内容如下所示:**
**创建`docker-compose.yml`文件,内容如下所示:**

```
version: '2.1'
Expand Down Expand Up @@ -34,7 +34,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
```
该 Docker Compose 中包含的容器有:
- Oracle: Oracle 11g, 已经预先创建了 `products``orders`表,并插入了一些数据.
- Oracle: Oracle 19c 数据库
- Elasticsearch: `orders` 表将和 `products` 表进行join,join的结果写入Elasticsearch中
- Kibana: 可视化 Elasticsearch 中的数据

Expand All @@ -50,14 +50,79 @@ docker-compose up -d
docker-compose down
````

**2. 下载以下 jar 包到 `<FLINK_HOME>/lib/`:**
**下载以下 jar 包到 `<FLINK_HOME>/lib/`:**

*下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译*

- [flink-sql-connector-elasticsearch7-3.0.1-1.17.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar)
- [flink-sql-connector-oracle-cdc-2.4.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4.0/flink-sql-connector-oracle-cdc-2.4.0.jar)

**3. 然后启动 Flink 集群,再启动 SQL CLI:**

**在 Oracle 数据库中准备数据**

创建数据库和表 `products``orders`,并插入数据:

```shell
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```
```sql
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.PRODUCTS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/
CREATE TABLE DEBEZIUM.PRODUCTS (
ID NUMBER(9, 0) NOT NULL,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT,
PRIMARY KEY(ID)
);
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.ORDERS';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
/
CREATE TABLE DEBEZIUM.ORDERS (
ID NUMBER(9, 0) NOT NULL,
ORDER_DATE TIMESTAMP(3) NOT NULL,
PURCHASER VARCHAR(255) NOT NULL,
QUANTITY NUMBER(9, 0) NOT NULL,
PRODUCT_ID NUMBER(9, 0) NOT NULL,
PRIMARY KEY(ID)
);
ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE DEBEZIUM.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO DEBEZIUM.PRODUCTS VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (102, 'car battery', '12V car battery', 8.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (104, 'hammer', '12oz carpenter''s hammer', 0.75);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (105, 'hammer', '14oz carpenter''s hammer', 0.875);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (106, 'hammer', '16oz carpenter''s hammer', 1.0);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (107, 'rocks', 'box of assorted rocks', 5.3);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (108, 'jacket', 'water resistent black wind breaker', 0.1);
INSERT INTO DEBEZIUM.PRODUCTS VALUES (109, 'spare tire', '24 inch spare tire', 22.2);
INSERT INTO DEBEZIUM.ORDERS VALUES (1001, TO_TIMESTAMP('2020-07-30 10:08:22.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 101);
INSERT INTO DEBEZIUM.ORDERS VALUES (1002, TO_TIMESTAMP('2020-07-30 10:11:09.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Sally', 2, 102);
INSERT INTO DEBEZIUM.ORDERS VALUES (1003, TO_TIMESTAMP('2020-07-30 12:00:30.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Edward', 2, 103);
INSERT INTO DEBEZIUM.ORDERS VALUES (1004, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 104);
```

**然后启动 Flink 集群,再启动 SQL CLI:**

```sql
-- Flink SQL
Expand All @@ -73,38 +138,36 @@ Flink SQL> CREATE TABLE products (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
ID INT,
ORDER_DATE TIMESTAMP(3),
PURCHASER STRING,
QUANTITY INT,
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'username' = 'dbzuser',
'password' = 'dbz',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'schema-name' = 'DEBEZIUM',
'table-name' = 'orders'
);
Flink SQL> CREATE TABLE enriched_orders (
ORDER_ID INT,
ORDER_DATE TIMESTAMP_LTZ(3),
CUSTOMER_NAME STRING,
PRICE DECIMAL(10, 5),
PRODUCT_ID INT,
ORDER_STATUS BOOLEAN,
ORDER_DATE TIMESTAMP(3),
PURCHASER STRING,
QUANTITY INT,
PRODUCT_NAME STRING,
PRODUCT_DESCRIPTION STRING,
PRIMARY KEY (ORDER_ID) NOT ENFORCED
Expand All @@ -115,27 +178,27 @@ Flink SQL> CREATE TABLE enriched_orders (
);
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.NAME, p.DESCRIPTION
SELECT o.ID,o.ORDER_DATE,o.PURCHASER,o.QUANTITY, p.NAME, p.DESCRIPTION
FROM orders AS o
LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
```

**4. 检查 ElasticSearch 中的结果**
**检查 ElasticSearch 中的结果**

检查最终的结果是否写入ElasticSearch中, 可以在[Kibana](http://localhost:5601/)看到ElasticSearch中的数据

**5. 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果**
**在 Oracle 制造一些变更,观察 ElasticSearch 中的结果**

进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。

```shell
docker-compose exec sqlplus flinkuser/flinkpw
docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB
```

```sql
INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);
INSERT INTO DEBEZIUM.ORDERS VALUES (1005, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 5, 105);
UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;
UPDATE DEBEZIUM.ORDERS SET QUANTITY = 10 WHERE ID = 1002;
DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
DELETE FROM DEBEZIUM.ORDERS WHERE ID = 1004;
```

0 comments on commit b6d8cf8

Please sign in to comment.