diff --git a/.github/workflows/build_artifact.yaml b/.github/workflows/build_artifact.yaml new file mode 100644 index 0000000000..d39359dfbe --- /dev/null +++ b/.github/workflows/build_artifact.yaml @@ -0,0 +1,276 @@ +### +# This workflow is used for quickly building artifacts +# Triggers: +# 1. Manual trigger +# Jobs: +# 1. Calculate Version Number +# 2. Build Web Artifact (includes front-end resources; x86_64 only for now) +# 3. Build Client Artifact +### + +name: Build Artifact +run-name: Build Artifact triggered by ${{ github.actor }} 🛠️ + +on: + workflow_dispatch: + inputs: + build_jar: + description: "Build jar package" + required: true + type: boolean + default: false + build_rpm: + description: "Build rpm package" + required: true + type: boolean + default: false + build_docker: + description: "Build docker image" + required: true + type: boolean + default: false + build_client: + description: "Build client" + required: true + type: boolean + default: false + rpm_release: + description: "RPM release number" + required: false + default: '' + type: string + image_tag: + description: "Docker image tag" + required: false + default: '' + type: string + +env: + ODC_CURRENT_BRANCH: ${{ github.ref_name }} + +jobs: + calculate-version: + name: Calculate Version Number + runs-on: ubuntu-latest + outputs: + odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} + odc_docker_image_tag: ${{ steps.calculate_version.outputs.odc_docker_image_tag }} + steps: + - name: Checkout workspace + uses: actions/checkout@v3 + - name: Calculate version number + id: calculate_version + run: | + odc_rpm_release_number=$(date +%Y%m%d) + if [[ -n "${{ inputs.rpm_release }}" ]]; then odc_rpm_release_number="${{ inputs.rpm_release }}"; fi + echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT + echo "odc_rpm_release_number=${odc_rpm_release_number}" + branch_match_regex="^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$" + tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "test" || echo "dev"` + odc_docker_image_tag="${tag_prefix}-$(cat distribution/odc-server-VER.txt)-${odc_rpm_release_number}" + if [[ -n "${{ inputs.image_tag }}" ]]; then odc_docker_image_tag="${{ inputs.image_tag }}"; fi + echo "odc_docker_image_tag=${odc_docker_image_tag}" >> $GITHUB_OUTPUT + echo "odc_docker_image_tag=${odc_docker_image_tag}" + + build-web-x86_64: + name: Build Web Artifact (x86_64) + needs: [ calculate-version ] + runs-on: ubuntu-latest + env: + odc_rpm_release_number: ${{ needs.calculate-version.outputs.odc_rpm_release_number }} + odc_docker_image_tag: ${{ needs.calculate-version.outputs.odc_docker_image_tag }} + steps: + - name: Checkout workspace + uses: actions/checkout@v3 + with: + submodules: "recursive" + - name: Setup JDK 8 + uses: actions/setup-java@v3 + with: + java-version: "8" + distribution: "temurin" + cache: maven + - name: Setup node 16 + uses: actions/setup-node@v3 + with: + node-version: "16" + - name: Build front static resources + if: ${{ github.event.inputs.build_jar == 'true' || github.event.inputs.build_rpm == 'true' || github.event.inputs.build_docker == 'true'}} + run: | + echo "Current directory: "`pwd` + echo "Start build front static resources" + pushd client + echo "Run npm install pnpm -g" + npm install pnpm -g + echo "Run pnpm install" + pnpm install + echo "Run npm run build:odc" + npm run build:odc + popd + echo "Build front static resources success" + echo "Start copy resources files" + 
static_resources_path="server/odc-server/src/main/resources/static" + if [ ! -d "${static_resources_path}" ]; then + echo "mkdir -p ${static_resources_path}" + mkdir -p "${static_resources_path}" + fi + rm --force --recursive --verbose ${static_resources_path}/* + cp --force --recursive --verbose client/dist/renderer/* ${static_resources_path} + echo "Copy resources files success" + - name: Set release version + id: set_release_version + run: | + main_version="$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | cut -d - -f 1)" + new_version="${main_version}-${odc_rpm_release_number}" + echo "new_version=${new_version}" >> $GITHUB_OUTPUT + echo "RPM's version is "${new_version} + mvn versions:set -DnewVersion="${new_version}" + mvn versions:commit + - name: Build jar + run: | + echo "Start build jar packages" + echo "Start prepare oceanbase-client" + pushd import + echo "Current dir is "`pwd` + rm -rf obclient_aarch.tar.gz + rm -rf obclient_win.tar.gz + mv obclient_x86.tar.gz obclient.tar.gz + popd + echo "Prepare oceanbase-client success" + echo "Start maven build" + mvn help:system + mvn clean install -Dmaven.test.skip=true + echo "Build jar packages success" + cp --verbose server/odc-server/target/*-executable.jar distribution/jar/odc.jar + cp -fv distribution/jar/odc.jar distribution/jar/odc-slim.jar + zip -d distribution/jar/odc-slim.jar "BOOT-INF/classes/static/*" + - name: Upload jar + if: ${{ github.event.inputs.build_jar == 'true' || github.event.inputs.build_client == 'true'}} + uses: actions/upload-artifact@v3 + with: + name: odc-artifact-jar + path: | + distribution/plugins/*.jar + distribution/starters/*.jar + distribution/jar/*.jar + - name: Build rpm (x86_64) + if: ${{ github.event.inputs.build_rpm == 'true' || github.event.inputs.build_docker == 'true' }} + run: | + echo "Start build rpm package" + mvn --file server/odc-server/pom.xml rpm:rpm -Drpm.prefix=/opt + echo "Build rpm package success" + rm --force --recursive --verbose distribution/docker/resources/odc-*.rpm + mkdir -p distribution/docker/resources/ + mv --verbose server/odc-server/target/rpm/odc-server/RPMS/*/odc-*.rpm distribution/docker/resources/ + - name: Upload rpm (x86_64) + if: ${{ github.event.inputs.build_rpm == 'true' }} + uses: actions/upload-artifact@v3 + with: + name: odc-server-${{ steps.set_release_version.outputs.new_version }}.x86_64.rpm + path: distribution/docker/resources/odc-*.rpm + - name: Build docker image (x86_64) + if: ${{ github.event.inputs.build_docker == 'true' }} + run: | + sed -e "s/DATE_CHANGE/$(date)/" -i distribution/docker/odc/Dockerfile + echo "odc_docker_image_tag=${odc_docker_image_tag}" + pushd distribution/docker + docker build -t docker.io/oceanbase/odc-server:${odc_docker_image_tag} -f odc/Dockerfile . 
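The image tag consumed by this docker build comes from the calculate-version job above, whose shell step reduces to simple string assembly. A minimal Java sketch of that logic for reference (the class and method names are illustrative, not part of the patch; the manual rpm_release and image_tag inputs override the computed values):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.regex.Pattern;

// Illustrative re-implementation of the calculate_version shell step in build_artifact.yaml.
public class ImageTagSketch {

    // The same branch pattern the workflow uses to pick the tag prefix.
    private static final Pattern RELEASE_LIKE_BRANCH =
            Pattern.compile("^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$");

    // version corresponds to the contents of distribution/odc-server-VER.txt.
    static String imageTag(String branch, String version) {
        String rpmReleaseNumber = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE); // date +%Y%m%d
        String prefix = RELEASE_LIKE_BRANCH.matcher(branch).matches() ? "test" : "dev";
        return prefix + "-" + version + "-" + rpmReleaseNumber;
    }

    public static void main(String[] args) {
        System.out.println(imageTag("dev/4.2.x", "4.2.0"));  // test-4.2.0-<yyyyMMdd>
        System.out.println(imageTag("feature/x", "4.2.0"));  // dev-4.2.0-<yyyyMMdd>
    }
}
```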
+ docker save -o resources/odc-server-${odc_docker_image_tag}.tar.gz docker.io/oceanbase/odc-server:${odc_docker_image_tag} + popd + - name: Upload docker image (x86_64) + if: ${{ github.event.inputs.build_docker == 'true' }} + uses: actions/upload-artifact@v3 + with: + name: odc-server-${{ env.odc_docker_image_tag }}.tar.gz + path: distribution/docker/resources/odc-server-*.tar.gz + + build-client: + name: Build Client Artifact + needs: [ build-web-x86_64 ] + if: ${{ github.event.inputs.build_client == 'true' }} + runs-on: macos-latest + strategy: + fail-fast: false + matrix: + target: [ win, mac, linux_x86, linux_aarch64 ] + steps: + - name: Checkout workspace + uses: actions/checkout@v3 + with: + submodules: "recursive" + # Building the jar fails on macos-latest, so the jar is built and uploaded on ubuntu-latest in the job above + - name: Download resources + uses: actions/download-artifact@v3 + with: + name: odc-artifact-jar + path: jar-dist + - name: Prepare jar resources + run: | + mkdir -p client/libraries/java + cp jar-dist/jar/odc-slim.jar client/libraries/java/odc.jar + mkdir -p client/libraries/java/plugins + cp -R jar-dist/plugins/. client/libraries/java/plugins + mkdir -p client/libraries/java/starters + cp -R jar-dist/starters/. client/libraries/java/starters + - name: Setup node 16 + uses: actions/setup-node@v3 + with: + node-version: "16" + - name: Install dependencies + uses: pnpm/action-setup@v2 + with: + version: 8 + run_install: false + - name: Get pnpm store directory + shell: bash + run: | + echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV + - uses: actions/cache@v3 + name: Setup pnpm cache + with: + path: ${{ env.STORE_PATH }} + key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-pnpm-store- + - name: Build artifact + env: + # MACOS_CERTIFICATE: ${{ secrets.PROD_MACOS_CERTIFICATE }} + # MACOS_CERTIFICATE_PWD: ${{ secrets.PROD_MACOS_CERTIFICATE_PWD }} + # MACOS_CERTIFICATE_NAME: ${{ secrets.PROD_MACOS_CERTIFICATE_NAME }} + # MACOS_CI_KEYCHAIN_PWD: ${{ secrets.PROD_MACOS_CI_KEYCHAIN_PWD }} + # APPLE_ID: ${{ secrets.PROD_MACOS_NOTARIZATION_APPLE_ID }} + # APPLE_TEAM_ID: ${{ secrets.PROD_MACOS_NOTARIZATION_TEAM_ID }} + # APPLE_ID_PASSWORD: ${{ secrets.PROD_MACOS_NOTARIZATION_PWD }} + # APPLE_APP_SPECIFIC_PASSWORD: ${{ secrets.PROD_MACOS_NOTARIZATION_PWD }} + WIN_CSC_LINK: ${{ secrets.PROD_WIN_CSC_LINK }} + WIN_CSC_KEY_PASSWORD: ${{ secrets.PROD_WIN_CSC_KEY_PASSWORD }} + run: | + # Turn our base64-encoded certificate back to a regular .p12 file + cd client + # echo $MACOS_CERTIFICATE | base64 --decode > certificate.p12 + # We need to create a new keychain, otherwise using the certificate will prompt + # with a UI dialog asking for the certificate password, which we can't + # use in a headless CI environment + # security create-keychain -p "$MACOS_CI_KEYCHAIN_PWD" build.keychain + # security default-keychain -s build.keychain + # security unlock-keychain -p "$MACOS_CI_KEYCHAIN_PWD" build.keychain + # security import certificate.p12 -k build.keychain -P "$MACOS_CERTIFICATE_PWD" -T /usr/bin/codesign + # security set-key-partition-list -S apple-tool:,apple:,codesign: -s -k "$MACOS_CI_KEYCHAIN_PWD" build.keychain + # echo "Cert Imported" + pnpm install + export ELECTRON_MIRROR=https://npmmirror.com/mirrors/electron/ + 
export ODC_BUILD_SKIP_JAR=true + export CSC_IDENTITY_AUTO_DISCOVERY=false + node ./scripts/client/build.js ${{ matrix.target }} + - name: Upload artifact + uses: actions/upload-artifact@v3 + with: + name: odc-client-pkg-${{ matrix.target }} + path: | + client/release/*.dmg + client/release/*.deb + client/release/*.exe + client/release/*.AppImage diff --git a/.github/workflows/build_test.yaml b/.github/workflows/build_daily.yaml similarity index 92% rename from .github/workflows/build_test.yaml rename to .github/workflows/build_daily.yaml index 3b34748300..891cfc1533 100644 --- a/.github/workflows/build_test.yaml +++ b/.github/workflows/build_daily.yaml @@ -1,18 +1,19 @@ ### -# This workflow is used to build all artifacts (jar, rpm, image and client) for testing +# This workflow is used for daily checking and testing # Triggers: # 1. Daily trigger at 04:00 (Beijing, UTC+08:00) # 2. Manual trigger # Jobs: # 1. Check Code Format # 2. PMD Scan -# 3. Calculation Version Number -# 4. Build Web Artifact (only x86_64 for now) -# 5. Build Client Artifact +# 3. Unit Test (TODO) +# 4. Calculate Version Number +# 5. Build Web Artifact (include front resource and only x86_64 platform for now) +# 6. Build Client Artifact ### -name: Build Test -run-name: Build Test triggered by ${{ github.actor }} 🛠️ +name: Build Daily +run-name: Build Daily triggered by ${{ github.actor }} 🛠️ on: schedule: @@ -32,7 +33,6 @@ on: env: ODC_CURRENT_BRANCH: ${{ github.ref_name }} - ODC_TARGET_BRANCH: ${{ github.base_ref }} jobs: check-format: @@ -69,9 +69,25 @@ jobs: - name: Run PMD scan run: mvn pmd:check + unit-test: + name: Unit Test (Skip for now) + needs: [ check-format, pmd-scan ] + runs-on: ubuntu-latest + steps: + - name: Checkout workspace + uses: actions/checkout@v3 + - name: Setup JDK 8 + uses: actions/setup-java@v3 + with: + java-version: "8" + distribution: "temurin" + cache: maven + - name: Run unit test + run: echo "Skip for now 🤪" + calculate-version: name: Calculate Version Number - needs: [ check-format, pmd-scan ] + needs: [ unit-test ] runs-on: ubuntu-latest outputs: odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} @@ -86,7 +102,7 @@ jobs: if [[ -n "${{ inputs.rpm_release }}" ]]; then odc_rpm_release_number="${{ inputs.rpm_release }}"; fi echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT echo "odc_rpm_release_number=${odc_rpm_release_number}" - branch_match_regex="^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$" + branch_match_regex="^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$" tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "test-daily" || echo "dev-daily"` odc_docker_image_tag="${tag_prefix}-$(cat distribution/odc-server-VER.txt)-${odc_rpm_release_number}" if [[ -n "${{ inputs.image_tag }}" ]]; then odc_docker_image_tag="${{ inputs.image_tag }}"; fi @@ -117,7 +133,7 @@ jobs: node-version: "16" - name: Build front static resources run: | - echo "Current directory: " `pwd` + echo "Current directory: "`pwd` echo "Start build front static resources" pushd client echo "Run npm install pnpm -g" diff --git a/.github/workflows/build_dev.yaml b/.github/workflows/build_dev.yaml index 1fd16c87e2..98cc6b3cd3 100644 --- a/.github/workflows/build_dev.yaml +++ b/.github/workflows/build_dev.yaml @@ -1,14 +1,15 @@ ### -# This workflow is used for code inspection and basic artifacts' construction (rpm and image) in your daily development +# This workflow is used for daily development # Triggers: # 1. Push # 2. 
Pull-Request # Jobs: # 1. Check Code Format # 2. PMD Scan -# 3. Calculation Version Number -# 4. Build Web Artifact (only x86_64 for now) -# (Job 3 and 4 are triggered only when Pull-Request) +# 3. Unit Test (TODO) +# 4. Calculate Version Number +# 5. Build RPM (excludes front-end resources; x86_64 only for now) +# (Jobs 4 and 5 run only for pull requests) ### name: Build Dev @@ -22,10 +23,6 @@ on: branches: - "**" -env: - ODC_CURRENT_BRANCH: ${{ github.ref_name }} - ODC_TARGET_BRANCH: ${{ github.base_ref }} - jobs: check-format: name: Check Code Format @@ -61,14 +58,29 @@ - name: Run PMD scan run: mvn pmd:check + unit-test: + name: Unit Test (Skip for now) + needs: [ check-format, pmd-scan ] + runs-on: ubuntu-latest + steps: + - name: Checkout workspace + uses: actions/checkout@v3 + - name: Setup JDK 8 + uses: actions/setup-java@v3 + with: + java-version: "8" + distribution: "temurin" + cache: maven + - name: Run unit test + run: echo "Skip for now 🤪" + calculate-version: name: Calculate Version Number - needs: [ check-format, pmd-scan ] + needs: [ unit-test ] if: ${{ github.event_name == 'pull_request' }} runs-on: ubuntu-latest outputs: odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} - odc_docker_image_tag: ${{ steps.calculate_version.outputs.odc_docker_image_tag }} steps: - name: Checkout workspace uses: actions/checkout@v3 @@ -78,19 +90,13 @@ odc_rpm_release_number=$(date +%Y%m%d%H%M%S) echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT echo "odc_rpm_release_number=${odc_rpm_release_number}" - branch_match_regex="^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$" - tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "test" || echo "dev"` - odc_docker_image_tag="${tag_prefix}-$(cat distribution/odc-server-VER.txt)-${{ github.actor }}-${odc_rpm_release_number}" - echo "odc_docker_image_tag=${odc_docker_image_tag}" >> $GITHUB_OUTPUT - echo "odc_docker_image_tag=${odc_docker_image_tag}" - build-web-x86_64: - name: Build Web Artifact (x86_64) + build-rpm-x86_64: + name: Build RPM (x86_64) needs: [ calculate-version ] runs-on: ubuntu-latest env: odc_rpm_release_number: ${{ needs.calculate-version.outputs.odc_rpm_release_number }} - odc_docker_image_tag: ${{ needs.calculate-version.outputs.odc_docker_image_tag }} steps: - name: Checkout workspace uses: actions/checkout@v3 @@ -106,28 +112,6 @@ uses: actions/setup-node@v3 with: node-version: "16" - - name: Build front static resources - run: | - echo "Current directory: " `pwd` - echo "Start build front static resources" - pushd client - echo "Run npm install pnpm -g" - npm install pnpm -g - echo "Run pnpm install" - pnpm install - echo "Run npm run build:odc" - npm run build:odc - popd - echo "Build front static resources success" - echo "Start copy resources files" - static_resources_path="server/odc-server/src/main/resources/static" - if [ ! 
-d "${static_resources_path}" ]; then - echo "mkdir -p ${static_resources_path}" - mkdir -p "${static_resources_path}" - fi - rm --force --recursive --verbose ${static_resources_path}/* - cp --force --recursive --verbose client/dist/renderer/* ${static_resources_path} - echo "Copy resources files success" - name: Set release version id: set_release_version run: | @@ -160,16 +144,3 @@ jobs: with: name: odc-server-${{ steps.set_release_version.outputs.new_version }}.x86_64.rpm path: distribution/docker/resources/odc-*.rpm - - name: Build docker image (x86_64) - run: | - sed -e "s/DATE_CHANGE/$(date)/" -i distribution/docker/odc/Dockerfile - echo "odc_docker_image_tag=${odc_docker_image_tag}" - pushd distribution/docker - docker build -t docker.io/oceanbase/odc-server:${odc_docker_image_tag} -f odc/Dockerfile . - docker save -o resources/odc-server-${odc_docker_image_tag}.tar.gz docker.io/oceanbase/odc-server:${odc_docker_image_tag} - popd - - name: Upload docker image (x86_64) - uses: actions/upload-artifact@v3 - with: - name: odc-server-${{ env.odc_docker_image_tag }}.tar.gz - path: distribution/docker/resources/odc-server-*.tar.gz diff --git a/.github/workflows/build_release.yaml b/.github/workflows/build_release.yaml index a85f6bc34a..e5a06ede3b 100644 --- a/.github/workflows/build_release.yaml +++ b/.github/workflows/build_release.yaml @@ -1,14 +1,16 @@ ### -# This workflow is used to build and release all artifacts (jar, rpm, image and client) +# This workflow is used for release # Triggers: # 1. Manual trigger # Jobs: # 1. Check Code Format # 2. PMD Scan -# 3. Calculation Version Number -# 4. Build Web Artifact (only x86_64 for now) -# 5. Build Client Artifact -# 6. Release And Tag (TODO) +# 3. Unit Test (TODO) +# 4. Calculate Version Number +# 5. Build Web Artifact (include front resource and only x86_64 platform for now) +# 6. Build Client Artifact +# 7. Release (TODO) +# 8. 
Tag (TODO) ### name: Build Release @@ -67,9 +69,25 @@ jobs: - name: Run PMD scan run: mvn pmd:check + unit-test: + name: Unit Test (Skip for now) + needs: [ check-format, pmd-scan ] + runs-on: ubuntu-latest + steps: + - name: Checkout workspace + uses: actions/checkout@v3 + - name: Setup JDK 8 + uses: actions/setup-java@v3 + with: + java-version: "8" + distribution: "temurin" + cache: maven + - name: Run unit test + run: echo "Skip for now 🤪" + calculate-version: name: Calculate Version Number - needs: [ check-format, pmd-scan ] + needs: [ unit-test ] runs-on: ubuntu-latest outputs: odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} @@ -84,7 +102,7 @@ if [[ -n "${{ inputs.rpm_release }}" ]]; then odc_rpm_release_number="${{ inputs.rpm_release }}"; fi echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT echo "odc_rpm_release_number=${odc_rpm_release_number}" - branch_match_regex="^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$" + branch_match_regex="^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$" tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "" || echo "test-"` odc_docker_image_tag="${tag_prefix}$(cat distribution/odc-server-VER.txt)-${odc_rpm_release_number}" if [[ -n "${{ inputs.image_tag }}" ]]; then odc_docker_image_tag="${{ inputs.image_tag }}"; fi @@ -280,3 +298,19 @@ client/release/*.deb client/release/*.exe client/release/*.AppImage + + release: + name: Release (Skip for now) + needs: [ build-client ] + runs-on: ubuntu-latest + steps: + - name: Release artifacts + run: echo "Skip for now 🤪" + + tag: + name: Tag (Skip for now) + needs: [ release ] + runs-on: ubuntu-latest + steps: + - name: Tag release + run: echo "Skip for now 🤪" diff --git a/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java b/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java index f8da2fddbc..d20c54bcce 100644 --- a/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java +++ b/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java @@ -21,6 +21,7 @@ import org.springframework.jdbc.core.JdbcOperations; +import com.oceanbase.tools.dbbrowser.model.DBDatabase; import com.oceanbase.tools.dbbrowser.model.DBFunction; import com.oceanbase.tools.dbbrowser.model.DBObjectIdentity; import com.oceanbase.tools.dbbrowser.model.DBObjectType; @@ -38,6 +39,27 @@ public ODPOBMySQLSchemaAccessor(JdbcOperations jdbcOperations) { super(jdbcOperations); } + @Override + public List<DBDatabase> listDatabases() { + List<DBDatabase> dbDatabases = new ArrayList<>(); + String sql = "show databases"; + List<String> dbNames = jdbcOperations.queryForList(sql, String.class); + for (String dbName : dbNames) { + DBDatabase database = new DBDatabase(); + jdbcOperations.execute("use " + dbName); + database.setName(dbName); + database.setId(dbName); + database.setCharset( + jdbcOperations.queryForObject("show variables like 'character_set_database';", + (rs, num) -> rs.getString(2))); + database.setCollation( + jdbcOperations.queryForObject("show variables like 'collation_database';", + (rs, num) -> rs.getString(2))); + dbDatabases.add(database); + } + return dbDatabases; + } + @Override public List<DBObjectIdentity> showTables(String schemaName) { MySQLSqlBuilder sb = new MySQLSqlBuilder(); @@ -53,12 +75,13 @@ public List<DBObjectIdentity> showTables(String schemaName) { @Override public List<DBObjectIdentity> 
listTables(String schemaName, String tableNameLike) { - String sql = "select database()"; - String currentSchema = jdbcOperations.queryForObject(sql, (rs, rowNum) -> rs.getString(1)); List<DBObjectIdentity> results = new ArrayList<>(); - sql = "show full tables where Table_type='BASE TABLE'"; - List<String> views = jdbcOperations.query(sql, (rs, rowNum) -> rs.getString(1)); - views.forEach(name -> results.add(DBObjectIdentity.of(currentSchema, DBObjectType.VIEW, name))); + MySQLSqlBuilder builder = new MySQLSqlBuilder(); + builder.append("show full tables from ") + .identifier(schemaName) + .append(" where Table_type='BASE TABLE'"); + List<String> tables = jdbcOperations.query(builder.toString(), (rs, rowNum) -> rs.getString(1)); + tables.forEach(name -> results.add(DBObjectIdentity.of(schemaName, DBObjectType.TABLE, name))); return results; } diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java index ecb8f8c398..f1c790c706 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java @@ -41,7 +41,7 @@ public class DefaultJdbcRowMapper extends BaseDialectBasedRowMapper { public DefaultJdbcRowMapper(@NonNull ConnectionSession session) { super(session.getDialectType()); DialectType dialectType = session.getDialectType(); - if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { + if (Objects.nonNull(dialectType) && dialectType.isMysql()) { mapperList.add(new MySQLBitMapper()); mapperList.add(new MySQLDatetimeMapper()); mapperList.add(new MySQLYearMapper()); diff --git a/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql b/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql index db22b76d0d..f7fbf88cc4 100644 --- a/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql +++ b/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql @@ -175,15 +175,14 @@ insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_view','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_procedure','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_function','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; -insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_mock_data','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; +insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_mock_data','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) 
values('support_sql_explain','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; -insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_sql_trace','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; +insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_sql_trace','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition_plan','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_constraint','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_constraint_modify','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition_modify','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; -insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition_plafn','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_shadowtable','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_show_foreign_key','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_data_export','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_2_0_43__alter_partition_plan.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_2_0_43__alter_partition_plan.sql new file mode 100644 index 0000000000..57dbfb958f --- /dev/null +++ b/server/odc-migrate/src/main/resources/migrate/common/V_4_2_0_43__alter_partition_plan.sql @@ -0,0 +1,5 @@ +ALTER TABLE `table_partition_plan` ADD COLUMN `database_id` bigint NULL ; +ALTER TABLE `table_partition_plan` ADD COLUMN `database_partition_plan_id` bigint NULL ; +ALTER TABLE `connection_partition_plan` ADD COLUMN `database_id` bigint NULL ; +ALTER TABLE `connection_partition_plan` ADD COLUMN `schedule_id` bigint NULL ; + diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java 
index 4bf0b88734..162209abff 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java @@ -18,7 +18,7 @@ import java.io.IOException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -28,7 +28,7 @@ import com.oceanbase.odc.service.common.response.Responses; import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.partitionplan.PartitionPlanService; -import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; /** * @Author:tianke @@ -36,30 +36,29 @@ * @Descripition: */ @RestController -@ConditionalOnProperty(name = "odc.feature.partitionplan.enabled", havingValue = "true") +@RequestMapping("/api/v2/partitionPlan") public class PartitionPlanController { @Autowired private PartitionPlanService partitionPlanService; - @RequestMapping(value = "/api/v2/patitionplan/ConnectionPartitionPlan", method = RequestMethod.GET) - public SuccessResponse getConnectionPartitionPlan(@RequestParam Long connectionId, + @RequestMapping(value = "/partitionPlans", method = RequestMethod.GET) + public SuccessResponse getPartitionPlans(@RequestParam Long databaseId, @RequestParam(required = false) Long flowInstanceId) { return Responses - .success(partitionPlanService.findRangeTablePlan(connectionId, flowInstanceId)); + .success(partitionPlanService.findRangeTablePlan(databaseId, flowInstanceId)); } - - @RequestMapping(value = "/api/v2/partitionplan/ConnectionPartitionPlan/batchUpdate", method = RequestMethod.PUT) - public SuccessResponse updateConnectionPartitionPlan( - @RequestBody ConnectionPartitionPlan connectionPartitionPlan) throws IOException { - partitionPlanService.updateTablePartitionPlan(connectionPartitionPlan); + @RequestMapping(value = "/partitionPlans/{id:[\\d]+}", method = RequestMethod.PUT) + public SuccessResponse update(@PathVariable Long id, + @RequestBody DatabasePartitionPlan databasePartitionPlan) throws IOException { + partitionPlanService.updateTablePartitionPlan(databasePartitionPlan); return Responses.success("ok"); } - @RequestMapping(value = "/api/v2/partitionplan/ConnectionPartitionPlan/exist", method = RequestMethod.GET) - public SuccessResponse hasConnectionPartitionPlan(@RequestParam("connectionId") Long connectionId) { - return Responses.success(partitionPlanService.hasConnectionPartitionPlan(connectionId)); + @RequestMapping(value = "/partitionPlans/exists", method = RequestMethod.GET) + public SuccessResponse exist(@RequestParam("databaseId") Long databaseId) { + return Responses.success(partitionPlanService.hasConnectionPartitionPlan(databaseId)); } } diff --git a/server/odc-server/src/main/resources/data.sql b/server/odc-server/src/main/resources/data.sql index be123797e4..62627b1230 100644 --- a/server/odc-server/src/main/resources/data.sql +++ b/server/odc-server/src/main/resources/data.sql @@ -682,3 +682,4 @@ INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES INSERT INTO config_system_configuration ( `key`, 
`value`, `description` ) VALUES( 'odc.task.datatransfer.cursor-fetch-size', '20', '导出时游标的 fetch size,默认为 20,最大值为 1000' ) ON DUPLICATE KEY UPDATE `id` = `id`; INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.data-security.masking.enabled', 'true', '是否开启数据脱敏,默认为开启' ) ON DUPLICATE KEY UPDATE `id` = `id`; +INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.partition-plan.schedule-cron', '0 0 * * * ?', '默认调度周期:每天 0 点' ) ON DUPLICATE KEY UPDATE `id` = `id`; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java index 31968b83fa..d5c02783e4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java @@ -21,7 +21,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.integration.jdbc.lock.DefaultLockRepository; @@ -66,32 +65,10 @@ public LockRepository odcLockRepository(DataSource dataSource, @Value("${server. } @Bean - @Primary public JdbcLockRegistry jdbcLockRegistry(@Qualifier("odcLockRepository") LockRepository odcLockRepository) { return new JdbcLockRegistry(odcLockRepository); } - @Bean("partitionPlanLockRepository") - public LockRepository partitionPlanLockRepository(DataSource dataSource, - @Value("${server.port:8989}") String listenPort) { - String localIpAddress = SystemUtils.getLocalIpAddress(); - log.info("create partition plan lock repository..., localIpAddress={}, listenPort={}", localIpAddress, - listenPort); - initLockTable(dataSource); - String lockId = String.format("%s:%s", localIpAddress, listenPort); - DefaultLockRepository defaultLockRepository = new OdcLockRepository(dataSource, lockId); - defaultLockRepository.setPrefix("DISTRIBUTED_"); - defaultLockRepository.setTimeToLive(1000 * 60 * 60 * 24); - log.info("partition plan lock repository created."); - return defaultLockRepository; - } - - @Bean("partitionPlanJdbcLockRegistry") - public JdbcLockRegistry partitionPlanJdbcLockRegistry( - @Qualifier("partitionPlanLockRepository") LockRepository partitionPlanLockRepository) { - return new JdbcLockRegistry(partitionPlanLockRepository); - } - private void initLockTable(DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); jdbcTemplate.setQueryTimeout(CREATE_LOCK_TABLE_TIMEOUT_SECONDS); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanEntity.java similarity index 91% rename from server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanEntity.java rename to server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanEntity.java index 00895601f4..90b2f9c267 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanEntity.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanEntity.java @@ -45,7 +45,7 @@ 
@AllArgsConstructor @NoArgsConstructor @Table(name = "connection_partition_plan") -public class ConnectionPartitionPlanEntity { +public class DatabasePartitionPlanEntity { @Id @Column(name = "id", nullable = false, updatable = false) @@ -54,8 +54,12 @@ public class ConnectionPartitionPlanEntity { @Column(name = "connection_id", nullable = false, updatable = false) private Long connectionId; + @Column(name = "database_id", nullable = false, updatable = false) + private Long databaseId; @Column(name = "flow_instance_id", nullable = false, updatable = false) private Long flowInstanceId; + @Column(name = "schedule_id", updatable = false) + private Long scheduleId; @Column(name = "organization_id", nullable = false, updatable = false) private Long organizationId; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanRepository.java similarity index 64% rename from server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanRepository.java rename to server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanRepository.java index a73f56cdd7..6638c460c2 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanRepository.java @@ -15,7 +15,6 @@ */ package com.oceanbase.odc.metadb.partitionplan; -import java.util.List; import java.util.Optional; import javax.transaction.Transactional; @@ -31,8 +30,8 @@ * @Date: 2022/9/16 16:42 * @Descripition: */ -public interface ConnectionPartitionPlanRepository extends JpaRepository<ConnectionPartitionPlanEntity, Long>, - JpaSpecificationExecutor<ConnectionPartitionPlanEntity> { +public interface DatabasePartitionPlanRepository extends JpaRepository<DatabasePartitionPlanEntity, Long>, + JpaSpecificationExecutor<DatabasePartitionPlanEntity> { @Transactional @Modifying @@ -41,19 +40,21 @@ public interface ConnectionPartitionPlanRepository extends JpaRepository<ConnectionPartitionPlanEntity, Long>, - @Query(value = "select * from connection_partition_plan " - + "where connection_id=:connectionId and is_config_enabled=true", nativeQuery = true) - Optional<ConnectionPartitionPlanEntity> findValidPlanByConnectionId(@Param("connectionId") Long connectionId); + @Query(value = "select * from connection_partition_plan " + + "where database_id=:databaseId and is_config_enabled=true", nativeQuery = true) + Optional<DatabasePartitionPlanEntity> findValidPlanByDatabaseId(@Param("databaseId") Long databaseId); - Optional<ConnectionPartitionPlanEntity> findByFlowInstanceId(Long flowInstanceId); - - @Query(value = "select * from connection_partition_plan " - + "where is_config_enabled=true", nativeQuery = true) - List<ConnectionPartitionPlanEntity> findAllValidPlan(); + Optional<DatabasePartitionPlanEntity> findByFlowInstanceId(Long flowInstanceId); @Transactional @Modifying @Query(value = "update connection_partition_plan set is_config_enabled = false " - + "where connection_id=:connectionId", nativeQuery = true) - int disableConfigByConnectionId(@Param("connectionId") Long connectionId); + + "where database_id=:databaseId", nativeQuery = true) + int disableConfigByDatabaseId(@Param("databaseId") Long databaseId); + + @Transactional + @Modifying + @Query(value = "update connection_partition_plan set schedule_id =:scheduleId " + + "where id=:id", nativeQuery = true) + int updateScheduleIdById(@Param("id") Long id, @Param("scheduleId") Long scheduleId); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java index f20698730e..db525c347b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java +++ 
b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java @@ -55,6 +55,10 @@ public class TablePartitionPlanEntity { @Column(name = "connection_id", nullable = false, updatable = false) private Long connectionId; + @Column(name = "database_id", nullable = false, updatable = false) + private Long databaseId; + @Column(name = "database_partition_plan_id", updatable = false) + private Long databasePartitionPlanId; @Column(name = "flow_instance_id", nullable = false, updatable = false) private Long flowInstanceId; @Column(name = "schema_name", nullable = false, updatable = false) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java index 06222c7044..52fc360955 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java @@ -16,7 +16,6 @@ package com.oceanbase.odc.metadb.partitionplan; import java.util.List; -import java.util.Optional; import javax.transaction.Transactional; @@ -37,31 +36,22 @@ public interface TablePartitionPlanRepository extends JpaRepository<TablePartitionPlanEntity, Long>, + List<TablePartitionPlanEntity> findValidPlanByDatabasePartitionPlanId(@Param("id") Long databasePartitionPlanId); - @Query(value = "select * from table_partition_plan where connection_id=:id and schema_name=:schemaName" + " and table_name=:tableName and is_config_enabled=true", nativeQuery = true) - Optional<TablePartitionPlanEntity> findEnTablePlan(@Param("id") Long connectionId, - @Param("schemaName") String schemaName, - @Param("tableName") String tableName); - - - @Query(value = "select * from table_partition_plan where flow_instance_id=:flowInstanceId and is_config_enabled = true and is_auto_partition = true", nativeQuery = true) - List<TablePartitionPlanEntity> findValidPlanByFlowInstanceId(@Param("flowInstanceId") Long flowInstanceId); - @Query(value = "select * from table_partition_plan ap " + "where ap.connection_id=:connectionId and ap.is_config_enabled = true", nativeQuery = true) - List<TablePartitionPlanEntity> findValidPlanByConnectionId(@Param("connectionId") Long connectionId); - List<TablePartitionPlanEntity> findByFlowInstanceId(Long flowInstanceId); + @Query(value = "select * from table_partition_plan " + + "where database_id=:id and is_config_enabled = true", nativeQuery = true) + List<TablePartitionPlanEntity> findValidPlanByDatabaseId(@Param("id") Long databaseId); + List<TablePartitionPlanEntity> findByDatabasePartitionPlanId(@Param("id") Long databasePartitionPlanId); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java index e4c0418be7..4804a4dadb 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java @@ -423,6 +423,7 @@ public Boolean internalSyncDataSourceSchemas(@NonNull Long dataSourceId) throws database.setCollationName(latest.getCollationName()); database.setCharsetName(latest.getCharsetName()); database.setLastSyncTime(latest.getLastSyncTime()); + database.setExisted(Boolean.TRUE); } return database; }).collect(Collectors.toList()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java 
b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java index 8268ae6ac8..eed92c85e4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java @@ -218,7 +218,7 @@ public boolean isMaskingEnabled() { } private SQLParser getSqlParser(DialectType dialectType) { - if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { + if (Objects.nonNull(dialectType) && dialectType.isMysql()) { return new OBMySQLParser(); } else if (dialectType == DialectType.OB_ORACLE) { return new OBOracleSQLParser(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java index 4244204390..291f07b742 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java @@ -710,7 +710,7 @@ private List getColumnsFromVirtualTable(String tableName) throws } private String processIdentifier(String identifier) { - if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { + if (Objects.nonNull(dialectType) && dialectType.isMysql()) { String unquoted = StringUtils.unquoteMySqlIdentifier(identifier); return StringUtils.isBlank(unquoted) ? unquoted : unquoted.toLowerCase(); } else if (dialectType == DialectType.OB_ORACLE) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java index 98263b78cc..db3e599312 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java @@ -100,7 +100,7 @@ public Set getViewNames() { } public Set getTriggerNames() { - if (session.getDialectType().isOBMysql()) { + if (session.getDialectType().isMysql()) { return Collections.emptySet(); } return accessor.listTriggers(schema).stream().map(DBObjectIdentity::getName).collect(Collectors.toSet()); @@ -131,7 +131,7 @@ public Set getSequenceNames() { } public Set getSynonymNames() { - if (session.getDialectType().isOBMysql()) { + if (session.getDialectType().isMysql()) { return Collections.emptySet(); } return accessor.listSynonyms(schema, DBSynonymType.COMMON).stream().map(DBObjectIdentity::getName) @@ -139,7 +139,7 @@ public Set getSynonymNames() { } public Set getPublicSynonymNames() { - if (session.getDialectType().isOBMysql()) { + if (session.getDialectType().isMysql()) { return Collections.emptySet(); } return accessor.listSynonyms(schema, DBSynonymType.PUBLIC).stream().map(DBObjectIdentity::getName) @@ -147,7 +147,7 @@ public Set getPublicSynonymNames() { } public Set getPackageNames() { - if (session.getDialectType().isOBMysql()) { + if (session.getDialectType().isMysql()) { return Collections.emptySet(); } return accessor.listPackages(schema).stream() @@ -157,7 +157,7 @@ public Set getPackageNames() { } public Set getPackageBodyNames() { - if (session.getDialectType().isOBMysql()) { + if (session.getDialectType().isMysql()) { return Collections.emptySet(); } return accessor.listPackages(schema).stream() diff --git 
a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java index 8c0c52c8a1..7f82adc9da 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java @@ -116,7 +116,7 @@ public void destroy() { @SkipAuthorize("inside connect session") public String startBatchCompile(@NonNull ConnectionSession session, String databaseName, StartBatchCompileReq req) { - if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isOBMysql()) { + if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isMysql()) { throw new UnsupportedException("Batch compile is not supported in mysql mode"); } Validate.notNull(req.getScope(), "Parameter [scope] can not be null"); @@ -148,7 +148,7 @@ public String startBatchCompile(@NonNull ConnectionSession session, @SkipAuthorize("inside connect session") public String startBatchCompile(@NonNull ConnectionSession session, String databaseName, @NonNull List identities) { - if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isOBMysql()) { + if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isMysql()) { throw new UnsupportedException("Batch compile is not supported in mysql mode"); } BatchCompileTaskCallable taskCallable = new BatchCompileTaskCallable(session, identities); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java index f557e656e5..d487edc59b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java @@ -39,7 +39,7 @@ public abstract class BaseStringConverter extends BaseDataConverter { protected String doConvert(@NonNull DataValue dataValue) { ValueEncodeType encodeType = dataValue.getEncodeType(); String value = new String(encodeType.decode(dataValue.getValue()), StandardCharsets.UTF_8); - if (Objects.nonNull(dialectType()) && dialectType().isOBMysql()) { + if (Objects.nonNull(dialectType()) && dialectType().isMysql()) { value = StringUtils.escapeUseDouble(value, (char) 92); } return "'" + StringUtils.escapeUseDouble(value, (char) 39) + "'"; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java index 93b0f63c71..b08c2a4eff 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java @@ -40,7 +40,7 @@ private DataConverters(DialectType dialectType, String serverTimeZoneId) { converterList = new LinkedList<>(); if (DialectType.OB_ORACLE.equals(dialectType)) { initForOracleMode(serverTimeZoneId); - } else if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { + } else if (Objects.nonNull(dialectType) && dialectType.isMysql()) { initForMysqlMode(); } else { throw new IllegalArgumentException("Illegal DialectType " + dialectType); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java 
b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java index 94c4aa1af7..c5744bc0f8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java @@ -129,8 +129,6 @@ import com.oceanbase.odc.service.integration.model.IntegrationConfig; import com.oceanbase.odc.service.integration.model.TemplateVariables; import com.oceanbase.odc.service.integration.model.TemplateVariables.Variable; -import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters; -import com.oceanbase.odc.service.partitionplan.model.TablePartitionPlan; import com.oceanbase.odc.service.regulation.approval.model.ApprovalFlowConfig; import com.oceanbase.odc.service.regulation.approval.model.ApprovalNodeConfig; import com.oceanbase.odc.service.regulation.risklevel.RiskLevelService; @@ -210,11 +208,6 @@ public class FlowInstanceService { private final List> shadowTableComparingTaskHooks = new ArrayList<>(); private static final long MAX_EXPORT_OBJECT_COUNT = 10000; - /** - * Max partition count for OB MySQL mode, refer to - * 分区概述 - */ - private static final long MAX_PARTITION_COUNT = 8192; private static final String ODC_SITE_URL = "odc.site.url"; private static final String INVALID_EXTERNAL_INSTANCE_ID = "N/A"; @@ -259,19 +252,6 @@ public List createIndividualFlowInstance(CreateFlowInsta @EnablePreprocess @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED) public List create(@NotNull @Valid CreateFlowInstanceReq createReq) { - if (createReq.getTaskType() == TaskType.PARTITION_PLAN) { - PartitionPlanTaskParameters parameters = (PartitionPlanTaskParameters) createReq.getParameters(); - List tablePartitionPlans = parameters.getConnectionPartitionPlan() - .getTablePartitionPlans(); - for (TablePartitionPlan tablePartitionPlan : tablePartitionPlans) { - if (tablePartitionPlan.getPartitionCount() > MAX_PARTITION_COUNT - && tablePartitionPlan.getDetail().getIsAutoPartition()) { - throw new RuntimeException( - String.format("Can not create more partition. 
TableName: %s,PartitionCount: %s", - tablePartitionPlan.getTableName(), tablePartitionPlan.getPartitionCount())); - } - } - } // TODO 原终止逻辑想表达的语意是终止执行中的计划,但目前线上的语意是终止审批流。暂保留逻辑,待前端修改后删除。 if (createReq.getTaskType() == TaskType.ALTER_SCHEDULE) { AlterScheduleParameters parameters = (AlterScheduleParameters) createReq.getParameters(); @@ -770,17 +750,10 @@ private FlowInstanceConfigurer buildConfigurer( log.warn("Create external approval instance failed, the instance will be force closed!"); } } - FlowApprovalInstance approvalInstance; - if (taskType == TaskType.PARTITION_PLAN && nodeSequence == 0) { - approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), - false, false, nodeConfig.getAutoApproval(), - approvalFlowConfig.getApprovalExpirationIntervalSeconds(), true); - } else { - approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), - false, false, - nodeConfig.getAutoApproval(), approvalFlowConfig.getApprovalExpirationIntervalSeconds(), - externalApprovalId, externalFlowInstanceId); - } + FlowApprovalInstance approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), + false, false, + nodeConfig.getAutoApproval(), approvalFlowConfig.getApprovalExpirationIntervalSeconds(), + externalApprovalId, externalFlowInstanceId); if (Objects.nonNull(resourceRoleId)) { approvalPermissionService.setCandidateResourceRole(approvalInstance.getId(), StringUtils.join(flowInstanceReq.getProjectId(), ":", resourceRoleId)); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java index 4a34b5b2df..90a306394b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java @@ -601,8 +601,8 @@ private List getPartitionPlanResult(@NonNull TaskEntity return null; } Long flowInstanceId = partitionPlanTaskResults.get(0).getFlowInstanceId(); - partitionPlanTaskResults.get(0).setConnectionPartitionPlan( - partitionPlanService.findTablePartitionPlanByFlowInstanceId(flowInstanceId)); + partitionPlanTaskResults.get(0).setDatabasePartitionPlan( + partitionPlanService.findDatabasePartitionPlanByFlowInstanceId(flowInstanceId)); return partitionPlanTaskResults; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java index 8f5c636850..e4950074e0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java @@ -15,21 +15,16 @@ */ package com.oceanbase.odc.service.flow.task; -import java.util.List; - import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; import com.oceanbase.odc.core.shared.constant.FlowStatus; -import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity; -import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository; import com.oceanbase.odc.service.flow.task.model.PartitionPlanTaskResult; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; -import com.oceanbase.odc.service.iam.model.User; -import 
com.oceanbase.odc.service.partitionplan.PartitionPlanTaskService; +import com.oceanbase.odc.service.partitionplan.PartitionPlanService; import com.oceanbase.odc.service.partitionplan.PartitionPlanTaskTraceContextHolder; -import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters; import com.oceanbase.odc.service.task.TaskService; @@ -43,12 +38,10 @@ @Slf4j public class PartitionPlanTask extends BaseODCFlowTaskDelegate { - @Autowired - private TablePartitionPlanRepository tablePartitionPlanRepository; @Autowired private AuthenticationFacade authenticationFacade; @Autowired - private PartitionPlanTaskService partitionPlanTaskService; + private PartitionPlanService partitionPlanService; private volatile boolean isSuccessful = false; private volatile boolean isFailure = false; @@ -60,24 +53,14 @@ protected PartitionPlanTaskResult start(Long taskId, TaskService taskService, De try { PartitionPlanTaskParameters taskParameters = FlowTaskUtil.getPartitionPlanParameter(execution); - ConnectionPartitionPlan connectionPartitionPlan = taskParameters.getConnectionPartitionPlan(); - // 更新状态 + DatabasePartitionPlan databasePartitionPlan = taskParameters.getConnectionPartitionPlan(); taskService.start(taskId); - // 审批通过,生效配置 - partitionPlanTaskService.enableFlowInstancePartitionPlan(connectionPartitionPlan.getConnectionId(), - getFlowInstanceId()); - // 拉取配置信息 - List tablePlans = tablePartitionPlanRepository.findValidPlanByFlowInstanceId( - getFlowInstanceId()); - - User taskCreator = FlowTaskUtil.getTaskCreator(execution); - partitionPlanTaskService.executePartitionPlan(connectionPartitionPlan.getConnectionId(), - getFlowInstanceId(), tablePlans, taskCreator); - + // Create and enable partition plan. 
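// Context, inferred rather than shown in this patch: createDatabasePartitionPlan presumably
// disables any previously enabled plan for the database (see DatabasePartitionPlanRepository.
// disableConfigByDatabaseId), persists this plan, and registers the recurring schedule whose id
// is back-filled via updateScheduleIdById; the default cron is seeded in data.sql under
// 'odc.task.partition-plan.schedule-cron'.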
+ databasePartitionPlan.setFlowInstanceId(getFlowInstanceId()); + partitionPlanService.createDatabasePartitionPlan(databasePartitionPlan); PartitionPlanTaskResult taskResult = new PartitionPlanTaskResult(); taskResult.setFlowInstanceId(getFlowInstanceId()); - taskResult.setConnectionPartitionPlan(connectionPartitionPlan); - // TODO 临时,改为周期性任务后变动 + taskResult.setDatabasePartitionPlan(databasePartitionPlan); isSuccessful = true; taskService.succeed(taskId, taskResult); log.info("Partition plan task succeed."); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java index 1f72be7a22..bf104baa05 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java @@ -15,7 +15,7 @@ */ package com.oceanbase.odc.service.flow.task.model; -import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; import lombok.Data; @@ -27,5 +27,5 @@ @Data public class PartitionPlanTaskResult implements FlowTaskResult { private Long flowInstanceId; - private ConnectionPartitionPlan connectionPartitionPlan; + private DatabasePartitionPlan databasePartitionPlan; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/ConnectionPartitionPlanMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/DatabasePartitionPlanMapper.java similarity index 61% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/ConnectionPartitionPlanMapper.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/DatabasePartitionPlanMapper.java index 19a5577f0e..4e057d3576 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/ConnectionPartitionPlanMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/DatabasePartitionPlanMapper.java @@ -18,14 +18,14 @@ import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; -import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity; -import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; +import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanEntity; +import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; @Mapper -public interface ConnectionPartitionPlanMapper { - ConnectionPartitionPlanMapper INSTANCE = Mappers.getMapper(ConnectionPartitionPlanMapper.class); +public interface DatabasePartitionPlanMapper { + DatabasePartitionPlanMapper INSTANCE = Mappers.getMapper(DatabasePartitionPlanMapper.class); - ConnectionPartitionPlan entityToModel(ConnectionPartitionPlanEntity entity); + DatabasePartitionPlan entityToModel(DatabasePartitionPlanEntity entity); - ConnectionPartitionPlanEntity modelToEntity(ConnectionPartitionPlan model); + DatabasePartitionPlanEntity modelToEntity(DatabasePartitionPlan model); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java index 65cb07980c..5ddfe4058a 100644 --- 
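Note on the renamed mapper above: MapStruct generates the implementation class at build time, so callers go through the INSTANCE constant. A minimal usage sketch (not a compilable unit on its own; planId and the repository variable are hypothetical stand-ins):

    DatabasePartitionPlanEntity entity = databasePartitionPlanRepository.findById(planId)
            .orElseThrow(IllegalStateException::new);
    DatabasePartitionPlan model = DatabasePartitionPlanMapper.INSTANCE.entityToModel(entity);
    DatabasePartitionPlanEntity copy = DatabasePartitionPlanMapper.INSTANCE.modelToEntity(model);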
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java
index 65cb07980c..5ddfe4058a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java
@@ -15,15 +15,15 @@
  */
 package com.oceanbase.odc.service.partitionplan;
 
-import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
-import java.util.List;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
 import com.oceanbase.odc.service.partitionplan.model.PeriodUnit;
-import com.oceanbase.tools.dbbrowser.model.DBTablePartition;
-import com.oceanbase.tools.dbbrowser.model.DBTablePartitionDefinition;
+import com.oceanbase.tools.dbbrowser.model.DBTable;
+import com.oceanbase.tools.dbbrowser.model.DBTableColumn;
+import com.oceanbase.tools.dbbrowser.model.DBTablePartitionType;
 
 /**
  * @Author:tianke
@@ -69,23 +69,28 @@ public static Long getPartitionRightBound(Long baseDate, int interval, PeriodUni
         return maxRightBound.getTime().getTime();
     }
 
-    public static PartitionExpressionType getPartitionExpressionType(DBTablePartition partition) {
-        List<DBTablePartitionDefinition> definitions = partition.getPartitionDefinitions();
-        String maxValue = definitions.get(definitions.size() - 1).getMaxValues().get(0);
-        String expression = partition.getPartitionOption().getExpression();
-        if (Pattern.matches("UNIX_TIMESTAMP\\(.*\\)", expression)) {
-            return PartitionExpressionType.UNIX_TIMESTAMP;
-        }
-        if (Pattern.matches(".*\\(.*\\)", expression)) {
-            return PartitionExpressionType.OTHER;
+    public static PartitionExpressionType getPartitionExpressionType(DBTable table) {
+        String expression = table.getPartition().getPartitionOption().getExpression();
+        PartitionExpressionType type = PartitionExpressionType.OTHER;
+        if (table.getPartition().getPartitionOption().getType() == DBTablePartitionType.RANGE) {
+            if (Pattern.matches("UNIX_TIMESTAMP\\(.*\\)", expression)) {
+                type = PartitionExpressionType.UNIX_TIMESTAMP;
+            }
         }
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-        try {
-            sdf.parse(maxValue.substring(1, maxValue.length() - 1));
-            return PartitionExpressionType.DATE;
-        } catch (Exception e) {
-            return PartitionExpressionType.OTHER;
+        if (table.getPartition().getPartitionOption().getType() == DBTablePartitionType.RANGE_COLUMNS
+                && table.getPartition().getPartitionOption().getColumnNames().size() == 1) {
+            Optional<DBTableColumn> rangeColumn = table.getColumns().stream().filter(
+                column -> column.getName()
+                    .equals(table.getPartition().getPartitionOption().getColumnNames().get(0)))
+                .findFirst();
+            if (rangeColumn.isPresent()) {
+                if (rangeColumn.get().getTypeName().equalsIgnoreCase("DATE")
+                        || rangeColumn.get().getTypeName().equalsIgnoreCase("DATETIME")) {
+                    type = PartitionExpressionType.DATE;
+                }
+            }
         }
+        return type;
     }
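For orientation, the rewritten getPartitionExpressionType above now keys off table metadata rather than parsing the last partition's max value. A rough decision table, using only the accessors that appear in the diff (the DBTable is assembled by this PR's getTable() helper further below):

    // Sketch of the mapping implemented above:
    //   RANGE         partition whose expression matches UNIX_TIMESTAMP(...)  -> UNIX_TIMESTAMP
    //   RANGE_COLUMNS on exactly one column typed DATE or DATETIME            -> DATE
    //   anything else                                                         -> OTHER
    PartitionExpressionType type = PartitionPlanFunction.getPartitionExpressionType(table);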
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSchedules.java
deleted file mode 100644
index 3449453755..0000000000
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSchedules.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright (c) 2023 OceanBase.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.oceanbase.odc.service.partitionplan;
-
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
-import org.springframework.stereotype.Component;
-
-import com.oceanbase.odc.core.shared.constant.FlowStatus;
-import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
-import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
-import com.oceanbase.odc.metadb.iam.UserEntity;
-import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity;
-import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanRepository;
-import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity;
-import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository;
-import com.oceanbase.odc.service.flow.FlowInstanceService;
-import com.oceanbase.odc.service.iam.UserService;
-import com.oceanbase.odc.service.iam.model.User;
-import com.oceanbase.odc.service.iam.util.SecurityContextUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * @Author:tianke
- * @Date: 2022/10/9 14:17
- * @Descripition:
- */
-@Slf4j
-@Component
-public class PartitionPlanSchedules {
-
-    @Autowired
-    private ConnectionPartitionPlanRepository connectionPartitionPlanRepository;
-    @Autowired
-    private TablePartitionPlanRepository tablePartitionPlanRepository;
-
-    @Autowired
-    private FlowInstanceService flowInstanceService;
-
-    @Autowired
-    private FlowInstanceRepository flowInstanceRepository;
-    @Autowired
-    private PartitionPlanTaskService partitionPlanTaskService;
-
-    @Autowired
-    private UserService userService;
-
-    @Autowired
-    @Qualifier("partitionPlanJdbcLockRegistry")
-    private JdbcLockRegistry jdbcLockRegistry;
-
-    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
-
-    private final String resourcePrefix = "partition_plan_schedule";
-
-    // @Scheduled(cron = "0 0 0 * * ?")
-    public void alterPartitionTask() throws Exception {
-        // Find all valid plans
-        List<ConnectionPartitionPlanEntity> allValidPlan = connectionPartitionPlanRepository.findAllValidPlan();
-        for (ConnectionPartitionPlanEntity connectionPartitionPlan : allValidPlan) {
-            try {
-                List<TablePartitionPlanEntity> tablePlans = tablePartitionPlanRepository
-                    .findValidPlanByFlowInstanceId(connectionPartitionPlan.getFlowInstanceId());
-
-                if (tablePlans.isEmpty()) {
-                    continue;
-                }
-                // Take a global lock to avoid duplicate execution across instances; the lock
-                // granularity is the partition plan of a single connection.
-                if (!tryLock(connectionPartitionPlan.getConnectionId())) {
-                    continue;
-                }
-                UserEntity userEntity = userService.nullSafeGet(connectionPartitionPlan.getCreatorId());
-                User taskCreator = new User(userEntity);
-                SecurityContextUtils.setCurrentUser(taskCreator);
-
-                // Find sub-flows that are still under approval and terminate them.
-                cancelApprovingTask(connectionPartitionPlan.getFlowInstanceId());
-
-                partitionPlanTaskService.executePartitionPlan(connectionPartitionPlan.getConnectionId(),
-                    connectionPartitionPlan.getFlowInstanceId(), tablePlans, taskCreator);
-            } catch (Exception e) {
-                log.warn("Partition planning daily task creation failed,connectionId={},error message={}",
-                    connectionPartitionPlan.getConnectionId(), e.getMessage());
-            }
-
-        }
-
-    }
-
-    private boolean tryLock(Long connectionId) throws InterruptedException {
-        String lockKey = String.format("%s_%s_%s", resourcePrefix, connectionId,
-            sdf.format(System.currentTimeMillis()));
-        Lock lock = jdbcLockRegistry.obtain(lockKey);
-        if (!lock.tryLock(3, TimeUnit.SECONDS)) {
-            log.info("get lock failed:{}", lockKey);
-            return false;
-        }
-        log.info("get lock:{}", lockKey);
-        return true;
-    }
-
-    private void cancelApprovingTask(Long flowInstanceId) {
-        List<FlowInstanceEntity> entities =
-            flowInstanceRepository.findByParentInstanceId(flowInstanceId);
-        for (FlowInstanceEntity entity : entities) {
-            if (entity.getStatus() == FlowStatus.APPROVING) {
-                log.info("sub task expired:{}", entity.getId());
-                flowInstanceService.cancel(entity.getId(), false);
-            }
-        }
-    }
-}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java
index 152995a068..cb8aff2d88 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java
@@ -18,10 +18,13 @@
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import org.quartz.SchedulerException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -30,23 +33,33 @@
 import com.oceanbase.odc.core.session.ConnectionSession;
 import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceEntity;
 import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository;
-import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity;
-import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanRepository;
+import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanEntity;
+import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanRepository;
 import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity;
 import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository;
+import com.oceanbase.odc.metadb.schedule.ScheduleEntity;
 import com.oceanbase.odc.metadb.task.TaskEntity;
-import com.oceanbase.odc.service.connection.ConnectionService;
+import com.oceanbase.odc.service.connection.database.DatabaseService;
+import com.oceanbase.odc.service.connection.database.model.Database;
 import com.oceanbase.odc.service.connection.model.ConnectionConfig;
 import com.oceanbase.odc.service.db.browser.DBSchemaAccessors;
 import com.oceanbase.odc.service.flow.FlowInstanceService;
 import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
-import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan;
+import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan;
 import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters;
 import com.oceanbase.odc.service.partitionplan.model.TablePartitionPlan;
+import com.oceanbase.odc.service.quartz.model.MisfireStrategy;
+import com.oceanbase.odc.service.schedule.ScheduleService;
+import com.oceanbase.odc.service.schedule.model.JobType;
+import com.oceanbase.odc.service.schedule.model.PartitionPlanJobParameters;
+import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
+import com.oceanbase.odc.service.schedule.model.TriggerConfig;
+import com.oceanbase.odc.service.schedule.model.TriggerStrategy;
 import com.oceanbase.odc.service.session.factory.DefaultConnectSessionFactory;
 import com.oceanbase.odc.service.task.TaskService;
 import com.oceanbase.tools.dbbrowser.model.DBTablePartition;
 import com.oceanbase.tools.dbbrowser.schema.DBSchemaAccessor;
+import com.oceanbase.tools.migrator.common.exception.UnExpectedException;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -61,17 +74,22 @@
 @SkipAuthorize("permission check inside getForConnect")
 public class PartitionPlanService {
 
+    @Value("${odc.task.partition-plan.schedule-cron:0 0 * * * ?}")
+    private String defaultScheduleCron;
     @Autowired
     private TablePartitionPlanRepository tablePartitionPlanRepository;
     @Autowired
-    private ConnectionPartitionPlanRepository connectionPartitionPlanRepository;
+    private DatabasePartitionPlanRepository databasePartitionPlanRepository;
     @Autowired
     private FlowInstanceService flowInstanceService;
     @Autowired
    private AuthenticationFacade authenticationFacade;
     @Autowired
-    private ConnectionService connectionService;
+    private DatabaseService databaseService;
+
+    @Autowired
+    private ScheduleService scheduleService;
     @Autowired
     private TaskService taskService;
@@ -79,67 +97,55 @@ public class PartitionPlanService {
     private ServiceTaskInstanceRepository serviceTaskInstanceRepository;
 
-    private final ConnectionPartitionPlanMapper connectionPartitionPlanMapper = ConnectionPartitionPlanMapper.INSTANCE;
+    private final DatabasePartitionPlanMapper databasePartitionPlanMapper = DatabasePartitionPlanMapper.INSTANCE;
     private final TablePartitionPlanMapper tablePartitionPlanMapper = new TablePartitionPlanMapper();
 
     /**
-     * Find RANGE-partitioned tables. TODO: also filter tables that have been modified
+     * Find RANGE-partitioned tables
      */
-    public ConnectionPartitionPlan findRangeTablePlan(Long connectionId, Long flowInstanceId) {
+    public DatabasePartitionPlan findRangeTablePlan(Long databaseId, Long flowInstanceId) {
         // When querying by flow instance ID, only return the partition configs under that instance
         if (flowInstanceId != null) {
-            return findTablePartitionPlanByFlowInstanceId(flowInstanceId);
-        }
-        // When querying by connectionId, fetch all RANGE tables and return existing partition plans
-        ConnectionConfig connectionConfig = connectionService.getForConnect(connectionId);
-        DefaultConnectSessionFactory factory = new DefaultConnectSessionFactory(connectionConfig);
-        ConnectionSession connectionSession = factory.generateSession();
-        try {
-            DBSchemaAccessor accessor = DBSchemaAccessors.create(connectionSession);
-            List<TablePartitionPlan> tablePartitionPlans = findConnectionAllTablePartitionPlan(
-                connectionId, connectionConfig.getTenantName(), accessor);
-            Optional<ConnectionPartitionPlanEntity> validConnectionPlan =
-                connectionPartitionPlanRepository.findValidPlanByConnectionId(connectionId);
-            ConnectionPartitionPlan returnValue =
-                validConnectionPlan.isPresent()
-                    ? connectionPartitionPlanMapper.entityToModel(validConnectionPlan.get())
-                    : ConnectionPartitionPlan.builder().connectionId(connectionId).build();
-            returnValue.setTablePartitionPlans(tablePartitionPlans);
-            return returnValue;
-        } finally {
-            try {
-                connectionSession.expire();
-            } catch (Exception e) {
-                // eat exception
-            }
+            return findDatabasePartitionPlanByFlowInstanceId(flowInstanceId);
         }
+        // When querying by database, fetch all RANGE tables and return existing partition plans
+        Database database = databaseService.detail(databaseId);
+        Optional<DatabasePartitionPlanEntity> databasePartitionPlan =
+            databasePartitionPlanRepository.findValidPlanByDatabaseId(databaseId);
+        DatabasePartitionPlan returnValue =
+            databasePartitionPlan.isPresent()
+                ? databasePartitionPlanMapper.entityToModel(databasePartitionPlan.get())
+                : DatabasePartitionPlan.builder().databaseId(databaseId).build();
+        returnValue.setTablePartitionPlans(findDatabaseTablePartitionPlan(database));
+        return returnValue;
     }
 
     /**
     * Confirmation strategy: disable all previous plans under the connection and activate every
     * config under the current flowInstanceId
     */
     @Transactional
-    public void updateTablePartitionPlan(ConnectionPartitionPlan connectionPartitionPlan) throws IOException {
+    public void updateTablePartitionPlan(DatabasePartitionPlan databasePartitionPlan) throws IOException {
         // Update the connection config
-        Optional<ConnectionPartitionPlanEntity> connectionPartitionPlanEntity =
-            connectionPartitionPlanRepository.findByFlowInstanceId(connectionPartitionPlan.getFlowInstanceId());
-        if (!connectionPartitionPlanEntity.isPresent())
+        Optional<DatabasePartitionPlanEntity> databasePartitionPlanEntity =
+            databasePartitionPlanRepository.findByFlowInstanceId(databasePartitionPlan.getFlowInstanceId());
+        if (!databasePartitionPlanEntity.isPresent())
             return;
-        connectionPartitionPlanEntity.get().setInspectEnabled(connectionPartitionPlan.isInspectEnable());
-        connectionPartitionPlanEntity.get()
-            .setInspectTriggerStrategy(connectionPartitionPlan.getInspectTriggerStrategy());
-        connectionPartitionPlanEntity.get().setConfigEnabled(false);
-        connectionPartitionPlanRepository.saveAndFlush(connectionPartitionPlanEntity.get());
+        databasePartitionPlanEntity.get().setInspectEnabled(databasePartitionPlan.isInspectEnable());
+        databasePartitionPlanEntity.get()
+            .setInspectTriggerStrategy(databasePartitionPlan.getInspectTriggerStrategy());
+        databasePartitionPlanEntity.get().setConfigEnabled(false);
+        databasePartitionPlanRepository.saveAndFlush(databasePartitionPlanEntity.get());
         // Update table partition configs
-        List<TablePartitionPlan> tablePartitionPlans = connectionPartitionPlan.getTablePartitionPlans();
-        List<TablePartitionPlanEntity> tablePartitionPlanEntities = tablePartitionPlanRepository.findByFlowInstanceId(
-            connectionPartitionPlan.getFlowInstanceId());
+        List<TablePartitionPlan> tablePartitionPlans = databasePartitionPlan.getTablePartitionPlans();
+        List<TablePartitionPlanEntity> tablePartitionPlanEntities =
+            tablePartitionPlanRepository.findValidPlanByDatabasePartitionPlanId(
+                databasePartitionPlan.getId());
         List<TablePartitionPlanEntity> updateEntities = new LinkedList<>();
         tablePartitionPlans.forEach(tablePartitionPlan -> {
             for (TablePartitionPlanEntity tablePartitionPlanEntity : tablePartitionPlanEntities) {
                 // Unmodified entries take effect directly
-                tablePartitionPlanEntity.setFlowInstanceId(connectionPartitionPlan.getFlowInstanceId());
+                tablePartitionPlanEntity.setFlowInstanceId(databasePartitionPlan.getFlowInstanceId());
                 tablePartitionPlanEntity.setIsConfigEnable(false);
                 tablePartitionPlanEntity.setModifierId(authenticationFacade.currentUserId());
                 if (tablePartitionPlanEntity.getSchemaName().equals(tablePartitionPlan.getSchemaName())
@@ -164,99 +170,110 @@ public void updateTablePartitionPlan(ConnectionPartition
         // Update configs
         tablePartitionPlanRepository.saveAll(updateEntities);
         PartitionPlanTaskParameters taskParameters = new PartitionPlanTaskParameters();
-        taskParameters.setConnectionPartitionPlan(connectionPartitionPlan);
+        taskParameters.setConnectionPartitionPlan(databasePartitionPlan);
         // Update task details
         Optional<ServiceTaskInstanceEntity> taskInstance = serviceTaskInstanceRepository.findByFlowInstanceId(
-            connectionPartitionPlan.getFlowInstanceId());
+            databasePartitionPlan.getFlowInstanceId());
         taskInstance.ifPresent(instance -> {
             TaskEntity taskEntity = taskService.detail(instance.getTargetTaskId());
             taskEntity.setParametersJson(JsonUtils.toJson(taskParameters));
             taskService.updateParametersJson(taskEntity);
         });
         // Advance the flow node
-        flowInstanceService.approve(connectionPartitionPlan.getFlowInstanceId(), "approve update partition plan",
+        flowInstanceService.approve(databasePartitionPlan.getFlowInstanceId(), "approve update partition plan",
             false);
     }
 
     /**
     * Check whether a partition plan exists under the current connection
     */
-    public boolean hasConnectionPartitionPlan(Long connectionId) {
-        Optional<ConnectionPartitionPlanEntity> validConnectionPlan =
-            connectionPartitionPlanRepository.findValidPlanByConnectionId(connectionId);
+    public boolean hasConnectionPartitionPlan(Long databaseId) {
+        Optional<DatabasePartitionPlanEntity> validConnectionPlan =
+            databasePartitionPlanRepository.findValidPlanByDatabaseId(databaseId);
         return validConnectionPlan.isPresent();
     }
 
-    public ConnectionPartitionPlan findTablePartitionPlanByFlowInstanceId(Long flowInstanceId) {
-        ConnectionPartitionPlan connectionPartitionPlan = new ConnectionPartitionPlan();
-        Optional<ConnectionPartitionPlanEntity> connectionPartitionPlanEntity =
-            connectionPartitionPlanRepository.findByFlowInstanceId(flowInstanceId);
-        if (connectionPartitionPlanEntity.isPresent()) {
-            connectionPartitionPlan =
-                connectionPartitionPlanMapper.entityToModel(connectionPartitionPlanEntity.get());
+    public DatabasePartitionPlan findDatabasePartitionPlanByFlowInstanceId(Long flowInstanceId) {
+        Optional<DatabasePartitionPlanEntity> entity =
+            databasePartitionPlanRepository.findByFlowInstanceId(flowInstanceId);
+        DatabasePartitionPlan databasePartitionPlan = new DatabasePartitionPlan();
+        if (entity.isPresent()) {
+            databasePartitionPlan = databasePartitionPlanMapper.entityToModel(entity.get());
+            List<TablePartitionPlan> tablePartitionPlans = tablePartitionPlanRepository
+                .findValidPlanByDatabasePartitionPlanId(
+                    databasePartitionPlan.getId())
+                .stream().map(tablePartitionPlanMapper::entityToModel).collect(Collectors.toList());
+            databasePartitionPlan.setTablePartitionPlans(tablePartitionPlans);
         }
-        List<TablePartitionPlanEntity> tablePartitionPlanEntities =
-            tablePartitionPlanRepository.findByFlowInstanceId(flowInstanceId);
-        List<TablePartitionPlan> tablePartitionPlans = tablePartitionPlanEntities.stream().map(
-            tablePartitionPlanMapper::entityToModel).collect(
-                Collectors.toList());
-        connectionPartitionPlan.setTablePartitionPlans(tablePartitionPlans);
-        return connectionPartitionPlan;
+        return databasePartitionPlan;
     }
 
-    private List<TablePartitionPlan> findConnectionAllTablePartitionPlan(Long connectionId, String tenantName,
-        DBSchemaAccessor accessor) {
-        List<TablePartitionPlanEntity> connectionValidPartitionPlans =
-            tablePartitionPlanRepository.findValidPlanByConnectionId(connectionId);
-        List<DBTablePartition> dbTablePartitions = accessor.listTableRangePartitionInfo(tenantName);
+    private List<TablePartitionPlan> findDatabaseTablePartitionPlan(Database database) {
+        // Find existing table partition-plan configs.
+        Map<String, TablePartitionPlanEntity> tableName2TablePartitionPlan = tablePartitionPlanRepository
+            .findValidPlanByDatabaseId(
+                database.getId())
+            .stream().collect(Collectors.toMap(TablePartitionPlanEntity::getTableName, o -> o));
+
+        List<DBTablePartition> dbTablePartitions = listTableRangePartitionInfo(database);
         List<TablePartitionPlan> returnValue = new LinkedList<>();
-        for (DBTablePartition dbTablePartition : dbTablePartitions) {
-            boolean hasPartitionPlan = false;
-            for (TablePartitionPlanEntity tablePlan : connectionValidPartitionPlans) {
-                if (dbTablePartition.getSchemaName().equals(tablePlan.getSchemaName())
-                        && dbTablePartition.getTableName().equals(tablePlan.getTableName())) {
-                    TablePartitionPlan tablePartitionPlan = tablePartitionPlanMapper.entityToModel(tablePlan);
-                    tablePartitionPlan.setPartitionCount(dbTablePartition.getPartitionOption().getPartitionsNum());
-                    returnValue.add(tablePartitionPlan);
-                    hasPartitionPlan = true;
-                    break;
-                }
-            }
-            if (!hasPartitionPlan) {
-                returnValue.add(TablePartitionPlan.builder()
-                    .schemaName(dbTablePartition.getSchemaName())
-                    .tableName(dbTablePartition.getTableName())
-                    .partitionCount(dbTablePartition.getPartitionOption().getPartitionsNum()).build());
+        dbTablePartitions.forEach(dbTablePartition -> {
+            TablePartitionPlan tablePartitionPlan =
+                tableName2TablePartitionPlan.containsKey(dbTablePartition.getTableName())
+                    ? tablePartitionPlanMapper
+                        .entityToModel(tableName2TablePartitionPlan.get(dbTablePartition.getTableName()))
+                    : TablePartitionPlan.builder()
+                        .schemaName(dbTablePartition.getSchemaName())
+                        .tableName(dbTablePartition.getTableName()).build();
+            tablePartitionPlan.setPartitionCount(dbTablePartition.getPartitionOption().getPartitionsNum());
+            returnValue.add(tablePartitionPlan);
+
+        });
+        return returnValue;
+    }
+
+    private List<DBTablePartition> listTableRangePartitionInfo(Database database) {
+        ConnectionConfig connectionConfig = database.getDataSource();
+        DefaultConnectSessionFactory factory = new DefaultConnectSessionFactory(connectionConfig);
+        ConnectionSession connectionSession = factory.generateSession();
+        DBSchemaAccessor accessor = DBSchemaAccessors.create(connectionSession);
+        List<DBTablePartition> dbTablePartitions;
+        try {
+            dbTablePartitions = accessor.listTableRangePartitionInfo(
+                database.getDataSource().getTenantName()).stream()
+                .filter(o -> o.getSchemaName().equals(database.getName())).collect(
+                    Collectors.toList());
+        } finally {
+            try {
+                connectionSession.expire();
+            } catch (Exception e) {
+                // eat exception
+            }
         }
-        return returnValue;
+        return dbTablePartitions;
     }
 
-    /**
-     * Inserted when the partition plan is created; takes effect after approval
-     */
     @Transactional
-    public void addTablePartitionPlan(ConnectionPartitionPlan connectionPartitionPlan, Long flowInstanceId) {
-        connectionPartitionPlan.setFlowInstanceId(flowInstanceId);
-        connectionPartitionPlan.getTablePartitionPlans()
-            .forEach(tablePartitionPlan -> tablePartitionPlan.setFlowInstanceId(flowInstanceId));
+    public void createDatabasePartitionPlan(DatabasePartitionPlan databasePartitionPlan) {
         // Insert the connection partition config
         long currentUserId = authenticationFacade.currentUserId();
         long currentOrganizationId = authenticationFacade.currentOrganizationId();
-        ConnectionPartitionPlanEntity connectionPartitionPlanEntity =
-            connectionPartitionPlanMapper.modelToEntity(connectionPartitionPlan);
-        connectionPartitionPlanEntity.setCreatorId(currentUserId);
-        connectionPartitionPlanEntity.setModifierId(currentUserId);
-        connectionPartitionPlanEntity.setOrganizationId(currentOrganizationId);
-        connectionPartitionPlanEntity.setConfigEnabled(false);
-        connectionPartitionPlanRepository.save(connectionPartitionPlanEntity);
+        DatabasePartitionPlanEntity databasePartitionPlanEntity =
+            databasePartitionPlanMapper.modelToEntity(databasePartitionPlan);
+        databasePartitionPlanEntity.setCreatorId(currentUserId);
+        databasePartitionPlanEntity.setModifierId(currentUserId);
+        databasePartitionPlanEntity.setOrganizationId(currentOrganizationId);
+        databasePartitionPlanEntity.setConfigEnabled(false);
+        databasePartitionPlanEntity = databasePartitionPlanRepository.save(databasePartitionPlanEntity);
         // Insert table partition plans
         List<TablePartitionPlanEntity> entities = new LinkedList<>();
-        for (TablePartitionPlan tablePlan : connectionPartitionPlan.getTablePartitionPlans()) {
+        for (TablePartitionPlan tablePlan : databasePartitionPlan.getTablePartitionPlans()) {
             TablePartitionPlanEntity tablePlanEntity = tablePartitionPlanMapper.modelToEntity(tablePlan);
-            tablePlanEntity.setConnectionId(connectionPartitionPlan.getConnectionId());
-            tablePlanEntity.setFlowInstanceId(connectionPartitionPlanEntity.getFlowInstanceId());
+            tablePlanEntity.setFlowInstanceId(databasePartitionPlanEntity.getFlowInstanceId());
+            tablePlanEntity.setConnectionId(databasePartitionPlan.getConnectionId());
+            tablePlanEntity.setDatabaseId(databasePartitionPlan.getDatabaseId());
+            tablePlanEntity.setDatabasePartitionPlanId(databasePartitionPlanEntity.getId());
             tablePlanEntity.setOrganizationId(currentOrganizationId);
             tablePlanEntity.setCreatorId(currentUserId);
             tablePlanEntity.setModifierId(currentUserId);
@@ -265,5 +282,90 @@ public void addTablePartitionPlan(ConnectionPartitionPla
             entities.add(tablePlanEntity);
         }
         tablePartitionPlanRepository.saveAll(entities);
+        enableDatabasePartitionPlan(databasePartitionPlanEntity);
+        try {
+            // Create partition plan job.
+            ScheduleEntity scheduleEntity = createDatabasePartitionPlanSchedule(
+                databasePartitionPlanEntity);
+            // Bind partition plan job to entity.
+            databasePartitionPlanRepository.updateScheduleIdById(databasePartitionPlanEntity.getId(),
+                scheduleEntity.getId());
+        } catch (Exception e) {
+            throw new UnExpectedException("Create database partition plan job failed.", e);
+        }
+    }
+
+    private void enableDatabasePartitionPlan(DatabasePartitionPlanEntity databasePartitionPlan) {
+        Optional<DatabasePartitionPlanEntity> oldPartitionPlan =
+            databasePartitionPlanRepository.findValidPlanByDatabaseId(databasePartitionPlan.getDatabaseId());
+        if (oldPartitionPlan.isPresent()) {
+            try {
+                log.info("Found a valid plan in this database, start to disable it.");
+                // Cancel previous plan.
+                flowInstanceService.cancelNotCheckPermission(oldPartitionPlan.get().getFlowInstanceId());
+                // Stop previous job.
+                if (oldPartitionPlan.get().getScheduleId() != null) {
+                    try {
+                        ScheduleEntity scheduleEntity = scheduleService.nullSafeGetById(
+                            oldPartitionPlan.get().getScheduleId());
+                        scheduleService.terminate(scheduleEntity);
+                        log.info("Terminate old partition plan job success, scheduleId={}",
+                            oldPartitionPlan.get().getScheduleId());
+                    } catch (Exception e) {
+                        log.warn("Terminate old partition plan job failed, scheduleId={}",
+                            oldPartitionPlan.get().getScheduleId());
+                    }
+                }
+            } catch (Exception e) {
+                log.warn("The previous plan has been abandoned, but stopping the previous instance failed.", e);
+            }
+        }
+        databasePartitionPlanRepository.disableConfigByDatabaseId(databasePartitionPlan.getDatabaseId());
+        tablePartitionPlanRepository.disableConfigByDatabaseId(databasePartitionPlan.getDatabaseId());
+        databasePartitionPlanRepository.enableConfigByFlowInstanceId(databasePartitionPlan.getFlowInstanceId());
+        tablePartitionPlanRepository.enableConfigByDatabasePartitionPlanId(databasePartitionPlan.getId());
+    }
+
+    /**
+     * Create a quartz job to execute the partition plan.
+     */
+    @Transactional
+    public ScheduleEntity createDatabasePartitionPlanSchedule(DatabasePartitionPlanEntity databasePartitionPlan)
+            throws SchedulerException, ClassNotFoundException {
+        ScheduleEntity scheduleEntity = new ScheduleEntity();
+        scheduleEntity.setDatabaseId(databasePartitionPlan.getDatabaseId());
+        scheduleEntity.setStatus(ScheduleStatus.ENABLED);
+        scheduleEntity.setCreatorId(authenticationFacade.currentUserId());
+        scheduleEntity.setModifierId(authenticationFacade.currentUserId());
+        scheduleEntity.setOrganizationId(authenticationFacade.currentOrganizationId());
+        scheduleEntity.setAllowConcurrent(false);
+        scheduleEntity.setMisfireStrategy(MisfireStrategy.MISFIRE_INSTRUCTION_DO_NOTHING);
+        scheduleEntity.setJobType(JobType.PARTITION_PLAN);
+
+        TriggerConfig triggerConfig = new TriggerConfig();
+        triggerConfig.setTriggerStrategy(TriggerStrategy.CRON);
+        triggerConfig.setCronExpression(defaultScheduleCron);
+        scheduleEntity.setTriggerConfigJson(JsonUtils.toJson(triggerConfig));
+
+        Database database = databaseService.detail(scheduleEntity.getDatabaseId());
+        scheduleEntity.setProjectId(database.getProject().getId());
+        scheduleEntity.setConnectionId(database.getDataSource().getId());
+        scheduleEntity.setDatabaseName(database.getName());
+        PartitionPlanJobParameters jobParameters = new PartitionPlanJobParameters();
+        jobParameters.setDatabasePartitionPlanId(databasePartitionPlan.getId());
+        scheduleEntity.setJobParametersJson(JsonUtils.toJson(jobParameters));
+        scheduleEntity = scheduleService.create(scheduleEntity);
+        // Bind job to databasePartitionPlan.
+        databasePartitionPlan.setScheduleId(scheduleEntity.getId());
+        scheduleService.enable(scheduleEntity);
+        return scheduleEntity;
+    }
+
+    public DatabasePartitionPlanEntity getDatabasePartitionPlanById(Long id) {
+        return databasePartitionPlanRepository.findById(id).orElse(null);
+    }
+
+    public List<TablePartitionPlanEntity> getValidTablePlanByDatabasePartitionPlanId(Long databasePartitionPlanId) {
+        return tablePartitionPlanRepository.findValidPlanByDatabasePartitionPlanId(databasePartitionPlanId);
     }
 }
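The schedule created above fires on the Quartz cron bound to odc.task.partition-plan.schedule-cron, defaulting to "0 0 * * * ?" (Quartz crons carry a leading seconds field, so this means the top of every hour). A hypothetical override in application.yaml, assuming the standard Spring property binding used by the @Value annotation above:

    odc:
      task:
        partition-plan:
          schedule-cron: "0 0 2 * * ?"   # hypothetical: once a day at 02:00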
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java
index 658ab28ed8..709ff0aba9 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java
@@ -47,8 +47,8 @@ public class PartitionPlanSubFlowThread extends Thread {
     @Setter
     protected long taskId;
 
-    public PartitionPlanSubFlowThread(String schemaName, Long parentFlowInstanceId,
-        Long connectionId, List<String> sqls, FlowInstanceService flowInstanceService, User user) {
+    public PartitionPlanSubFlowThread(Long parentFlowInstanceId,
+        Long databaseId, List<String> sqls, FlowInstanceService flowInstanceService, User user) {
         DatabaseChangeParameters taskParameters = new DatabaseChangeParameters();
         taskParameters.setErrorStrategy("ABORT");
         StringBuilder sqlContent = new StringBuilder();
@@ -59,10 +59,9 @@ public PartitionPlanSubFlowThread(String schemaName, Long parentFlowInstanceId,
         CreateFlowInstanceReq flowInstanceReq = new CreateFlowInstanceReq();
         flowInstanceReq.setParameters(taskParameters);
         flowInstanceReq.setTaskType(TaskType.ASYNC);
-        flowInstanceReq.setConnectionId(connectionId);
+        flowInstanceReq.setDatabaseId(databaseId);
         flowInstanceReq.setParentFlowInstanceId(parentFlowInstanceId);
         flowInstanceReq.setExecutionStrategy(FlowTaskExecutionStrategy.AUTO);
-        flowInstanceReq.setDatabaseName(schemaName);
         this.createFlowInstanceReq = flowInstanceReq;
         this.flowInstanceService = flowInstanceService;
         this.user = user;
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java
index 435b575b1a..dffc1e2ad2 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java
@@ -22,24 +22,25 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import com.oceanbase.odc.core.authority.util.SkipAuthorize;
 import com.oceanbase.odc.core.session.ConnectionSession;
-import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity;
-import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanRepository;
+import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanRepository;
 import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity;
 import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository;
-import com.oceanbase.odc.service.connection.ConnectionService;
+import com.oceanbase.odc.service.connection.database.DatabaseService;
 import com.oceanbase.odc.service.connection.model.ConnectionConfig;
 import com.oceanbase.odc.service.db.browser.DBSchemaAccessors;
 import com.oceanbase.odc.service.flow.FlowInstanceService;
-import com.oceanbase.odc.service.iam.model.User;
+import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
 import com.oceanbase.odc.service.session.factory.DefaultConnectSessionFactory;
+import com.oceanbase.tools.dbbrowser.model.DBTable;
 import com.oceanbase.tools.dbbrowser.model.DBTablePartition;
 import com.oceanbase.tools.dbbrowser.model.DBTablePartitionDefinition;
 import com.oceanbase.tools.dbbrowser.schema.DBSchemaAccessor;
@@ -62,17 +63,25 @@ public class PartitionPlanTaskService {
     @Autowired
     private TablePartitionPlanRepository tablePartitionPlanRepository;
     @Autowired
-    private ConnectionPartitionPlanRepository connectionPartitionPlanRepository;
+    private DatabasePartitionPlanRepository databasePartitionPlanRepository;
     @Autowired
     private FlowInstanceService flowInstanceService;
     @Autowired
-    private ConnectionService connectionService;
+    private DatabaseService databaseService;
 
-    public void executePartitionPlan(Long connectionId, Long flowInstanceId, List<TablePartitionPlanEntity> tablePlans,
-        User taskCreator)
-        throws Exception {
+    @Autowired
+    private AuthenticationFacade authenticationFacade;
 
-        ConnectionConfig conn = connectionService.getForConnect(connectionId);
+    public void executePartitionPlan(Long flowInstanceId, List<TablePartitionPlanEntity> tablePlans)
+            throws Exception {
+        Set<Long> databaseIds = tablePlans.stream().map(TablePartitionPlanEntity::getDatabaseId).collect(
+            Collectors.toSet());
+        Optional<Long> databaseId = databaseIds.stream().findFirst();
+        if (!databaseId.isPresent() || databaseIds.size() != 1) {
+            log.warn("Table plans belong to multiple databases, which is not allowed here.");
+            return;
+        }
+        ConnectionConfig conn = databaseService.findDataSourceForConnectById(databaseId.get());
         DefaultConnectSessionFactory factory = new DefaultConnectSessionFactory(conn);
         ConnectionSession connectionSession = factory.generateSession();
         try {
@@ -81,16 +90,18 @@ public void executePartitionPlan(Long connectionId, Long flowInstanceId, List
             List<String> addPartitionSqls = createAddPartitionDDL(accessor, tablePlans);
             if (!addPartitionSqls.isEmpty()) {
                 PartitionPlanSubFlowThread partitionPlanSubFlowThread =
-                    new PartitionPlanSubFlowThread(conn.getDefaultSchema(), flowInstanceId,
-                        conn.getId(), addPartitionSqls, flowInstanceService, taskCreator);
+                    new PartitionPlanSubFlowThread(flowInstanceId,
+                        databaseId.get(), addPartitionSqls, flowInstanceService,
+                        authenticationFacade.currentUser());
                 partitionPlanSubFlowThread.start();
             }
             // Task 2: find expired partitions and start a database change flow
             List<String> dropPartitionSqls = createDropPartitionDDL(accessor, tablePlans);
             if (!dropPartitionSqls.isEmpty()) {
                 PartitionPlanSubFlowThread partitionPlanSubFlowThread =
-                    new PartitionPlanSubFlowThread(conn.getDefaultSchema(), flowInstanceId,
-                        conn.getId(), dropPartitionSqls, flowInstanceService, taskCreator);
+                    new PartitionPlanSubFlowThread(flowInstanceId,
+                        databaseId.get(), dropPartitionSqls, flowInstanceService,
+                        authenticationFacade.currentUser());
                 partitionPlanSubFlowThread.start();
             }
         } finally {
@@ -111,8 +122,8 @@ private List createAddPartitionDDL(DBSchemaAccessor accessor, List
-            List<DBTablePartitionDefinition> definitions = partition.getPartitionDefinitions();
+            DBTable table = getTable(accessor, tablePlan.getSchemaName(), tablePlan.getTableName());
+            List<DBTablePartitionDefinition> definitions = table.getPartition().getPartitionDefinitions();
             // The partition plan is active but the table has been dropped
             if (definitions.isEmpty()) {
                 log.warn("No partition found,table={}.{}", tablePlan.getSchemaName(), tablePlan.getTableName());
@@ -129,20 +140,22 @@ private List createDropPartitionDDL(DBSchemaAccessor accessor, List
-            List<DBTablePartitionDefinition> definitions = partition.getPartitionDefinitions();
+            DBTable table = getTable(accessor, tablePlan.getSchemaName(), tablePlan.getTableName());
+            List<DBTablePartitionDefinition> definitions = table.getPartition().getPartitionDefinitions();
             if (definitions.isEmpty()) {
                 continue;
             }
-            PartitionExpressionType expressionType = PartitionPlanFunction.getPartitionExpressionType(partition);
+            PartitionExpressionType expressionType = PartitionPlanFunction.getPartitionExpressionType(table);
             // Results are sorted in ascending order; scan from the left and stop at the first unexpired partition
             for (DBTablePartitionDefinition definition : definitions) {
                 if (expressionType == PartitionExpressionType.OTHER) {
-                    log.warn("Unsupported partition expression!{}.{}:{}", partition.getSchemaName(),
-                        partition.getTableName(), partition.getPartitionOption().getExpression());
+                    log.warn("Unsupported partition expression!{}.{}:{}", table.getSchemaName(),
+                        table.getName(), table.getPartition().getPartitionOption().getExpression());
                     break;
                 }
                 String maxValue = definition.getMaxValues().get(0);
@@ -225,33 +238,12 @@ private List createDropPartitionDDL(DBSchemaAccessor accessor, List
-        Optional<ConnectionPartitionPlanEntity> oldPartitionPlan =
-            connectionPartitionPlanRepository.findValidPlanByConnectionId(connectionId);
-        // If a plan exists, terminate the original plan first and update the original task status.
-        if (oldPartitionPlan.isPresent()) {
-            try {
-                flowInstanceService.cancelNotCheckPermission(oldPartitionPlan.get().getFlowInstanceId());
-            } catch (Exception e) {
-                log.warn("The previous plan has been abandoned,but change status failed.");
-            }
-        }
-        // Disable all active data on every run to avoid exceptions caused by dirty data
-        connectionPartitionPlanRepository.disableConfigByConnectionId(connectionId);
-        tablePartitionPlanRepository.disableConfigByConnectionId(connectionId);
-        connectionPartitionPlanRepository.enableConfigByFlowInstanceId(flowInstanceId);
-        tablePartitionPlanRepository.enableConfigByFlowInstanceId(flowInstanceId);
-    }
-
     private String getCreateSql(DBTablePartition partition) {
         List<DBTablePartitionDefinition> definitions = partition.getPartitionDefinitions();
         DBTablePartitionDefinition right = definitions.get(definitions.size() - 1);
@@ -270,4 +262,16 @@ private String getDeleteSql(String schema, String table, String part) {
             .append(");");
         return sqlBuilder.toString();
     }
+
+    private DBTable getTable(DBSchemaAccessor accessor, String schemaName, String tableName) {
+        DBTable table = new DBTable();
+        table.setSchemaName(schemaName);
+        table.setName(tableName);
+        DBTablePartition partition = accessor.getPartition(schemaName, tableName);
+        partition.setTableName(tableName);
+        partition.setSchemaName(schemaName);
+        table.setPartition(partition);
+        table.setColumns(accessor.listTableColumns(schemaName, tableName));
+        return table;
+    }
 }
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java
index 4220ac309a..6538a02d62 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java
@@ -31,6 +31,7 @@ public TablePartitionPlan entityToModel(TablePartitionPlanEntity entity) {
             .partitionIntervalUnit(entity.getPartitionIntervalUnit())
             .expirePeriodUnit(entity.getExpirePeriodUnit()).build();
         return TablePartitionPlan.builder()
+            .databasePartitionPlanId(entity.getDatabasePartitionPlanId())
             .tableName(entity.getTableName())
             .flowInstanceId(entity.getFlowInstanceId())
             .schemaName(entity.getSchemaName())
@@ -39,6 +40,7 @@ public TablePartitionPlan entityToModel(TablePartitionPlanEntity entity) {
 
     public TablePartitionPlanEntity modelToEntity(TablePartitionPlan model) {
         return TablePartitionPlanEntity.builder()
+            .databasePartitionPlanId(model.getDatabasePartitionPlanId())
             .isConfigEnable(model.getDetail().getIsAutoPartition())
             .expirePeriod(model.getDetail().getExpirePeriod())
             .expirePeriodUnit(model.getDetail().getExpirePeriodUnit())
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/ConnectionPartitionPlan.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/DatabasePartitionPlan.java
similarity index 81%
rename from server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/ConnectionPartitionPlan.java
rename to server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/DatabasePartitionPlan.java
index 13828d0c5e..cc333a9729 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/ConnectionPartitionPlan.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/DatabasePartitionPlan.java
@@ -18,6 +18,9 @@
 import java.io.Serializable;
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonProperty.Access;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -33,10 +36,13 @@
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class ConnectionPartitionPlan implements Serializable {
+public class DatabasePartitionPlan implements Serializable {
 
+    @JsonProperty(access = Access.READ_ONLY)
+    private Long id;
     private Long flowInstanceId;
     private Long connectionId;
+    private Long databaseId;
     private boolean inspectEnable;
     private InspectTriggerStrategy inspectTriggerStrategy;
     private List<TablePartitionPlan> tablePartitionPlans;
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java
index b71874bb97..1c6ce29737 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java
@@ -28,5 +28,5 @@
  */
 @Data
 public class PartitionPlanTaskParameters implements Serializable, TaskParameters {
-    private ConnectionPartitionPlan connectionPartitionPlan;
+    private DatabasePartitionPlan connectionPartitionPlan;
 }
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java
index c5f1b3bec5..3e1f51ce16 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java
@@ -33,6 +33,7 @@
 @AllArgsConstructor
 public class TablePartitionPlan implements Serializable {
 
+    private Long databasePartitionPlanId;
     private Long flowInstanceId;
     private String schemaName;
     private String tableName;
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java
index 270baa5cd9..493702d6d6 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java
@@ -28,6 +28,7 @@
 import com.oceanbase.odc.service.schedule.job.DataDeleteJob;
 import com.oceanbase.odc.service.schedule.job.OdcJob;
 import com.oceanbase.odc.service.schedule.job.OnlineSchemaChangeCompleteJob;
+import com.oceanbase.odc.service.schedule.job.PartitionPlanJob;
 import com.oceanbase.odc.service.schedule.job.SqlPlanJob;
 import com.oceanbase.odc.service.schedule.model.JobType;
 
@@ -79,6 +80,8 @@ public OdcJob getOdcJob(JobExecutionContext context) {
                 return new DataDeleteJob();
             case ONLINE_SCHEMA_CHANGE_COMPLETE:
                 return new OnlineSchemaChangeCompleteJob();
+            case PARTITION_PLAN:
+                return new PartitionPlanJob();
             default:
                 throw new UnsupportedException();
         }
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java
index 8226f85b52..57d2bdbde5 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java
@@ -49,7 +49,7 @@ public static GenerateRollbackPlan create(@NonNull String sql, @NonNull Rollback
         ConnectType connectType = connectionSession.getConnectType();
         SyncJdbcExecutor syncJdbcExecutor =
             connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY);
-        if (connectType.getDialectType().isOBMysql()) {
+        if (connectType.getDialectType().isMysql()) {
             OBMySQLParser parser = new OBMySQLParser();
             Statement statement = parser.parse(new StringReader(sql));
             if (statement instanceof Update) {
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java
index 1a909524a0..52bd91629c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java
@@ -15,13 +15,15 @@
  */
 package com.oceanbase.odc.service.schedule.flowtask;
 
-import org.springframework.beans.factory.annotation.Autowired;
+import java.util.List;
 
 import com.oceanbase.odc.core.shared.constant.TaskType;
 import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
 import com.oceanbase.odc.service.flow.processor.FlowTaskPreprocessor;
 import com.oceanbase.odc.service.flow.processor.Preprocessor;
-import com.oceanbase.odc.service.partitionplan.PartitionPlanService;
+import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan;
+import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters;
+import com.oceanbase.odc.service.partitionplan.model.TablePartitionPlan;
 
 /**
  * @Author:tinker
@@ -31,11 +33,26 @@
 @FlowTaskPreprocessor(type = TaskType.PARTITION_PLAN)
 public class PartitionPlanPreprocessor implements Preprocessor {
 
-    @Autowired
-    private PartitionPlanService partitionPlanService;
+    /**
+     * Max partition count for OB MySQL mode, refer to
+     * 分区概述 (Partition Overview)
+     */
+    private static final long MAX_PARTITION_COUNT = 8192;
 
     @Override
     public void process(CreateFlowInstanceReq req) {
-        // TODO
+
+        PartitionPlanTaskParameters parameters = (PartitionPlanTaskParameters) req.getParameters();
+        DatabasePartitionPlan databasePartitionPlan = parameters.getConnectionPartitionPlan();
+        databasePartitionPlan.setConnectionId(req.getConnectionId());
+        List<TablePartitionPlan> tablePartitionPlans = parameters.getConnectionPartitionPlan().getTablePartitionPlans();
+        for (TablePartitionPlan tablePartitionPlan : tablePartitionPlans) {
+            if (tablePartitionPlan.getPartitionCount() > MAX_PARTITION_COUNT
+                    && tablePartitionPlan.getDetail().getIsAutoPartition()) {
+                throw new RuntimeException(
+                    String.format("Cannot create more partitions. TableName: %s, PartitionCount: %s",
+                        tablePartitionPlan.getTableName(), tablePartitionPlan.getPartitionCount()));
+            }
+        }
     }
 }
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java
new file mode 100644
index 0000000000..14cd3fe861
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.oceanbase.odc.service.schedule.job;
+
+import java.util.List;
+
+import org.quartz.JobExecutionContext;
+import org.quartz.SchedulerException;
+
+import com.oceanbase.odc.common.json.JsonUtils;
+import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanEntity;
+import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity;
+import com.oceanbase.odc.metadb.schedule.ScheduleEntity;
+import com.oceanbase.odc.service.common.util.SpringContextUtil;
+import com.oceanbase.odc.service.partitionplan.PartitionPlanService;
+import com.oceanbase.odc.service.partitionplan.PartitionPlanTaskService;
+import com.oceanbase.odc.service.quartz.util.ScheduleTaskUtils;
+import com.oceanbase.odc.service.schedule.ScheduleService;
+import com.oceanbase.odc.service.schedule.model.PartitionPlanJobParameters;
+import com.oceanbase.tools.migrator.common.exception.UnExpectedException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Author:tinker
+ * @Date: 2023/8/21 10:37
+ * @Description:
+ */
+
+@Slf4j
+public class PartitionPlanJob implements OdcJob {
+
+    private final PartitionPlanTaskService partitionPlanTaskService;
+
+    private final PartitionPlanService partitionPlanService;
+
+    private final ScheduleService scheduleService;
+
+    public PartitionPlanJob() {
+        partitionPlanTaskService = SpringContextUtil.getBean(PartitionPlanTaskService.class);
+        partitionPlanService = SpringContextUtil.getBean(PartitionPlanService.class);
+        scheduleService = SpringContextUtil.getBean(ScheduleService.class);
+    }
+
+    @Override
+    public void execute(JobExecutionContext context) {
+        Long scheduleId = ScheduleTaskUtils.getScheduleId(context);
+        ScheduleEntity scheduleEntity;
+        try {
+            scheduleEntity = scheduleService.nullSafeGetById(scheduleId);
+        } catch (Exception e) {
+            log.warn("Schedule not found, scheduleId={}", scheduleId);
+            return;
+        }
+        PartitionPlanJobParameters jobParameters = JsonUtils.fromJson(scheduleEntity.getJobParametersJson(),
+            PartitionPlanJobParameters.class);
+
+        DatabasePartitionPlanEntity databasePartitionPlan = partitionPlanService.getDatabasePartitionPlanById(
+            jobParameters.getDatabasePartitionPlanId());
+
+        List<TablePartitionPlanEntity> tablePartitionPlans =
+            partitionPlanService
+                .getValidTablePlanByDatabasePartitionPlanId(jobParameters.getDatabasePartitionPlanId());
+
+        if (!databasePartitionPlan.isConfigEnabled() || tablePartitionPlans.isEmpty()) {
+            log.info(
+                "Database partition plan is disabled or there is no table to process, stopping the partition-plan job, scheduleId={}",
+                scheduleId);
+            try {
+                scheduleService.terminate(scheduleEntity);
+            } catch (SchedulerException e) {
+                log.warn("Stop partition plan job failed.", e);
+                throw new RuntimeException(e);
+            }
+            // Nothing left to execute once the schedule has been terminated.
+            return;
+        }
+        try {
+            partitionPlanTaskService.executePartitionPlan(databasePartitionPlan.getFlowInstanceId(),
+                tablePartitionPlans);
+        } catch (Exception e) {
+            log.warn("Create partition-plan database change task failed.", e);
+        }
+    }
+
+    @Override
+    public void interrupt() {
+        throw new UnExpectedException();
+    }
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java
index a1ef67b64e..049408fc3f 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java
@@ -23,6 +23,8 @@ public enum JobType {
 
     SQL_PLAN,
 
+    PARTITION_PLAN,
+
     DATA_ARCHIVE,
 
     DATA_ARCHIVE_DELETE,
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/PartitionPlanJobParameters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/PartitionPlanJobParameters.java
new file mode 100644
index 0000000000..75682f326a
--- /dev/null
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/PartitionPlanJobParameters.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.oceanbase.odc.service.schedule.model;
+
+import lombok.Data;
+
+/**
+ * @Author:tinker
+ * @Date: 2023/8/23 14:26
+ * @Description:
+ */
+
+@Data
+public class PartitionPlanJobParameters {
+    private Long databasePartitionPlanId;
+}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java
index ab3f88b96c..5b4433bc9a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java
@@ -172,10 +172,6 @@ public static Map getJdbcParams(@NonNull ConnectionConfig connec
         } else {
             jdbcUrlParams.put("useSSL", "false");
         }
-        DialectType type = connectionConfig.getDialectType();
-        if (type != null && type.isOceanbase()) {
-            jdbcUrlParams.put("enableFullLinkTrace", "true");
-        }
         return jdbcUrlParams;
     }
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java
index e7fdf441d3..d323da9190 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java
@@ -52,7 +52,7 @@ public BaseSqlChecker(@NonNull DialectType dialectType, String delimiter) {
     @Override
     public List check(@NonNull String sqlScript) {
         List sqls;
-        if (dialectType.isOBMysql()) {
+        if (dialectType.isMysql()) {
             sqls = splitByCommentProcessor(sqlScript);
         } else if (dialectType == DialectType.OB_ORACLE) {
             if (DEFAULT_DELIMITER.equals(this.delimiter)) {
diff --git a/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java b/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java
index 43396e28bd..08b056c600 100644
--- a/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java
+++ b/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java
@@ -21,6 +21,7 @@
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.pf4j.Extension;
 
@@ -83,4 +84,12 @@ protected HostAddress parseJdbcUrl(String jdbcUrl) throws SQLException {
         return new HostAddress(url.getHost(), url.getPort());
     }
 
+    @Override
+    protected Map<String, String> appendDefaultJdbcUrlParameters(Map<String, String> jdbcUrlParams) {
+        if (!jdbcUrlParams.containsKey("tinyInt1isBit")) {
+            jdbcUrlParams.put("tinyInt1isBit", "false");
+        }
+        return jdbcUrlParams;
+    }
+
 }
diff --git a/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java b/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java
index 2a2b8623d1..6ace2a469f 100644
--- a/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java
+++ b/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java
@@ -104,6 +104,7 @@ public TestResult test(String jdbcUrl, String username, String password, int que
     }
 
     protected String getJdbcUrlParameters(Map<String, String> jdbcUrlParams) {
+        jdbcUrlParams = appendDefaultJdbcUrlParameters(jdbcUrlParams);
         if (CollectionUtils.isEmpty(jdbcUrlParams)) {
             return null;
         }
@@ -111,6 +112,13 @@ protected String getJdbcUrlParameters(Map jdbcUrlParams) {
             .collect(Collectors.joining("&"));
     }
 
+    protected Map<String, String> appendDefaultJdbcUrlParameters(Map<String, String> jdbcUrlParams) {
+        if (!jdbcUrlParams.containsKey("enableFullLinkTrace")) {
+            jdbcUrlParams.put("enableFullLinkTrace", "true");
+        }
+        return jdbcUrlParams;
+    }
+
     private TestResult test(String jdbcUrl, Properties properties, int queryTimeout) {
         HostAddress hostAddress;
         try {
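Taken together, the two plugin changes above seed default JDBC URL parameters before the query string is joined with "&": the OB-MySQL extension defaults enableFullLinkTrace=true (replacing the removed OBConsoleDataSourceFactory logic), and the MySQL extension overrides the hook to default tinyInt1isBit=false instead. A self-contained sketch of the resulting behavior, with hypothetical input values (the real map comes from the connection config):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class JdbcUrlParamsSketch {
        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("connectTimeout", "5000");
            // Mirrors appendDefaultJdbcUrlParameters: only set the default when absent,
            // so a caller-supplied value always wins.
            params.putIfAbsent("enableFullLinkTrace", "true"); // OB-MySQL plugin default
            String query = params.entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.joining("&"));
            System.out.println(query); // e.g. connectTimeout=5000&enableFullLinkTrace=true
        }
    }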