diff --git a/.github/workflows/build_artifact.yaml b/.github/workflows/build_artifact.yaml deleted file mode 100644 index d39359dfbe..0000000000 --- a/.github/workflows/build_artifact.yaml +++ /dev/null @@ -1,276 +0,0 @@ -### -# This workflow is used for quickly building artifacts -# Triggers: -# 1. Manual trigger -# Jobs: -# 1. Calculate Version Number -# 2. Build Web Artifact (include front resource and only x86_64 platform for now) -# 3. Build Client Artifact -### - -name: Build Artifact -run-name: Build Artifact triggered by ${{ github.actor }} 🛠️ - -on: - workflow_dispatch: - inputs: - build_jar: - description: "Build jar package" - required: true - type: boolean - default: false - build_rpm: - description: "Build rpm package" - required: true - type: boolean - default: false - build_docker: - description: "Build docker image" - required: true - type: boolean - default: false - build_client: - description: "Build client" - required: true - type: boolean - default: false - rpm_release: - description: "Rpm release number" - required: false - default: '' - type: string - image_tag: - description: "Docker image tag" - required: false - default: '' - type: string - -env: - ODC_CURRENT_BRANCH: ${{ github.ref_name }} - -jobs: - calculate-version: - name: Calculate Version Number - runs-on: ubuntu-latest - outputs: - odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} - odc_docker_image_tag: ${{ steps.calculate_version.outputs.odc_docker_image_tag }} - steps: - - name: Checkout workspace - uses: actions/checkout@v3 - - name: Calculate version number - id: calculate_version - run: | - odc_rpm_release_number=$(date +%Y%m%d) - if [[ -n "${{ inputs.rpm_release }}" ]]; then odc_rpm_release_number="${{ inputs.rpm_release }}"; fi - echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT - echo "odc_rpm_release_number=${odc_rpm_release_number}" - branch_match_regex="^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$" - tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "test" || echo "dev"` - odc_docker_image_tag="${tag_prefix}-$(cat distribution/odc-server-VER.txt)-${odc_rpm_release_number}" - if [[ -n "${{ inputs.image_tag }}" ]]; then odc_docker_image_tag="${{ inputs.image_tag }}"; fi - echo "odc_docker_image_tag=${odc_docker_image_tag}" >> $GITHUB_OUTPUT - echo "odc_docker_image_tag=${odc_docker_image_tag}" - - build-web-x86_64: - name: Build Web Artifact (x86_64) - needs: [ calculate-version ] - runs-on: ubuntu-latest - env: - odc_rpm_release_number: ${{ needs.calculate-version.outputs.odc_rpm_release_number }} - odc_docker_image_tag: ${{ needs.calculate-version.outputs.odc_docker_image_tag }} - steps: - - name: Checkout workspace - uses: actions/checkout@v3 - with: - submodules: "recursive" - - name: Setup JDK 8 - uses: actions/setup-java@v3 - with: - java-version: "8" - distribution: "temurin" - cache: maven - - name: Setup node 16 - uses: actions/setup-node@v3 - with: - node-version: "16" - - name: Build front static resources - if: ${{ github.event.inputs.build_jar == 'true' || github.event.inputs.build_rpm == 'true' || github.event.inputs.build_docker == 'true'}} - run: | - echo "Current directory: "`pwd` - echo "Start build front static resources" - pushd client - echo "Run npm install pnpm -g" - npm install pnpm -g - echo "Run pnpm install" - pnpm install - echo "Run npm run build:odc" - npm run build:odc - popd - echo "Build front static resources success" - echo "Start copy resources files" - 
static_resources_path="server/odc-server/src/main/resources/static" - if [ ! -d "${static_resources_path}" ]; then - echo "mkdir -p ${static_resources_path}" - mkdir -p "${static_resources_path}" - fi - rm --force --recursive --verbose ${static_resources_path}/* - cp --force --recursive --verbose client/dist/renderer/* ${static_resources_path} - echo "Copy resources files success" - - name: Set release version - id: set_release_version - run: | - main_version="$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout | cut -d - -f 1)" - new_version="${main_version}-${odc_rpm_release_number}" - echo "new_version=${new_version}" >> $GITHUB_OUTPUT - echo "RPM's version is "${new_version} - mvn versions:set -DnewVersion="${new_version}" - mvn versions:commit - - name: Build jar - run: | - echo "Start build jar packages" - echo "Start prepare oceanbase-client" - pushd import - echo "Current dir is "`pwd` - rm -rf obclient_aarch.tar.gz - rm -rf obclient_win.tar.gz - mv obclient_x86.tar.gz obclient.tar.gz - popd - echo "Prepare oceanbase-client success" - echo "Start build rpm package" - mvn help:system - mvn clean install -Dmaven.test.skip=true - echo "Build jar packages success" - cp --verbose server/odc-server/target/*-executable.jar distribution/jar/odc.jar - cp -fv distribution/jar/odc.jar distribution/jar/odc-slim.jar - zip -d distribution/jar/odc-slim.jar "BOOT-INF/classes/static/*" - - name: Upload jar - if: ${{ github.event.inputs.build_jar == 'true' || github.event.inputs.build_client == 'true'}} - uses: actions/upload-artifact@v3 - with: - name: odc-artifact-jar - path: | - distribution/plugins/*.jar - distribution/starters/*.jar - distribution/jar/*.jar - - name: Build rpm (x86_64) - if: ${{ github.event.inputs.build_rpm == 'true' || github.event.inputs.build_docker == 'true' }} - run: | - echo "Start build rpm package" - mvn --file server/odc-server/pom.xml rpm:rpm -Drpm.prefix=/opt - echo "Build rpm package success" - rm --force --recursive --verbose distribution/docker/resources/odc-*.rpm - mkdir -p distribution/docker/resources/ - mv --verbose server/odc-server/target/rpm/odc-server/RPMS/*/odc-*.rpm distribution/docker/resources/ - - name: Upload rpm (x86_64) - if: ${{ github.event.inputs.build_rpm == 'true' }} - uses: actions/upload-artifact@v3 - with: - name: odc-server-${{ steps.set_release_version.outputs.new_version }}.x86_64.rpm - path: distribution/docker/resources/odc-*.rpm - - name: Build docker image (x86_64) - if: ${{ github.event.inputs.build_docker == 'true' }} - run: | - sed -e "s/DATE_CHANGE/$(date)/" -i distribution/docker/odc/Dockerfile - echo "odc_docker_image_tag=${odc_docker_image_tag}" - pushd distribution/docker - docker build -t docker.io/oceanbase/odc-server:${odc_docker_image_tag} -f odc/Dockerfile . 
- docker save -o resources/odc-server-${odc_docker_image_tag}.tar.gz docker.io/oceanbase/odc-server:${odc_docker_image_tag} - popd - - name: Upload docker image (x86_64) - if: ${{ github.event.inputs.build_docker == 'true' }} - uses: actions/upload-artifact@v3 - with: - name: odc-server-${{ env.odc_docker_image_tag }}.tar.gz - path: distribution/docker/resources/odc-server-*.tar.gz - - build-client: - name: Build Client Artifact - needs: [ build-web-x86_64 ] - if: ${{ github.event.inputs.build_client == 'true' }} - runs-on: macos-latest - strategy: - fail-fast: false - matrix: - target: [ win, mac, linux_x86, linux_aarch64 ] - steps: - - name: Checkout workspace - uses: actions/checkout@v3 - with: - submodules: "recursive" - # Build jar failed when run on macos-latest, so we need to build and upload jar on ubuntu-latest in the above job - - name: Download resources - uses: actions/download-artifact@v3 - with: - name: odc-artifact-jar - path: jar-dist - - name: Change directory - run: | - mkdir -p client/libraries/java - cp jar-dist/jar/odc-slim.jar client/libraries/java/odc.jar - mkdir -p client/libraries/java/plugins - cp -R jar-dist/plugins/. client/libraries/java/plugins - mkdir -p client/libraries/java/starters - cp -R jar-dist/starters/. client/libraries/java/starters - - name: Setup node 16 - uses: actions/setup-node@v3 - with: - node-version: "16" - - name: Install dependencies - uses: pnpm/action-setup@v2 - with: - version: 8 - run_install: false - - name: Get pnpm store directory - shell: bash - run: | - echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV - - uses: actions/cache@v3 - name: Setup pnpm cache - with: - path: ${{ env.STORE_PATH }} - key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} - restore-keys: | - ${{ runner.os }}-pnpm-store- - - name: Build artifact - env: - # MACOS_CERTIFICATE: ${{ secrets.PROD_MACOS_CERTIFICATE }} - # MACOS_CERTIFICATE_PWD: ${{ secrets.PROD_MACOS_CERTIFICATE_PWD }} - # MACOS_CERTIFICATE_NAME: ${{ secrets.PROD_MACOS_CERTIFICATE_NAME }} - # MACOS_CI_KEYCHAIN_PWD: ${{ secrets.PROD_MACOS_CI_KEYCHAIN_PWD }} - # APPLE_ID: ${{ secrets.PROD_MACOS_NOTARIZATION_APPLE_ID }} - # APPLE_TEAM_ID: ${{ secrets.PROD_MACOS_NOTARIZATION_TEAM_ID }} - # APPLE_ID_PASSWORD: ${{ secrets.PROD_MACOS_NOTARIZATION_PWD }} - # APPLE_APP_SPECIFIC_PASSWORD: ${{ secrets.PROD_MACOS_NOTARIZATION_PWD }} - WIN_CSC_LINK: ${{ secrets.PROD_WIN_CSC_LINK }} - WIN_CSC_KEY_PASSWORD: ${{ secrets.PROD_WIN_CSC_KEY_PASSWORD }} - run: | - # Turn our base64-encoded certificate back to a regular .p12 file - cd client - # echo $MACOS_CERTIFICATE | base64 --decode > certificate.p12 - # We need to create a new keychain, otherwise using the certificate will prompt - # with a UI dialog asking for the certificate password, which we can't - # use in a headless CI environment - # security create-keychain -p "$MACOS_CI_KEYCHAIN_PWD" build.keychain - # security default-keychain -s build.keychain - # security unlock-keychain -p "$MACOS_CI_KEYCHAIN_PWD" build.keychain - # security import certificate.p12 -k build.keychain -P "$MACOS_CERTIFICATE_PWD" -T /usr/bin/codesign - # security set-key-partition-list -S apple-tool:,apple:,codesign: -s -k "$MACOS_CI_KEYCHAIN_PWD" build.keychain - # echo "Cert Imported" - pnpm install - # We need to create a new keychain, otherwise using the certificate will prompt - # with a UI dialog asking for the certificate password, which we can't - # use in a headless CI environment - export ELECTRON_MIRROR=https://npmmirror.com/mirrors/electron/ - 
export ODC_BUILD_SKIP_JAR=true - export CSC_IDENTITY_AUTO_DISCOVERY=false - node ./scripts/client/build.js ${{ matrix.target }} - - name: Upload artifact - uses: actions/upload-artifact@v3 - with: - name: odc-client-pkg-${{ matrix.target }} - path: | - client/release/*.dmg - client/release/*.deb - client/release/*.exe - client/release/*.AppImage diff --git a/.github/workflows/build_dev.yaml b/.github/workflows/build_dev.yaml index 98cc6b3cd3..1fd16c87e2 100644 --- a/.github/workflows/build_dev.yaml +++ b/.github/workflows/build_dev.yaml @@ -1,15 +1,14 @@ ### -# This workflow is used for daily development +# This workflow is used for code inspection and building basic artifacts (rpm and image) during daily development # Triggers: # 1. Push # 2. Pull-Request # Jobs: # 1. Check Code Format # 2. PMD Scan -# 3. Unit Test (TODO) -# 4. Calculate Version Number -# 5. Build RPM (exclude front resources and only x86_64 platform for now) -# (Job 4 and 5 are executed only when Pull-Request) +# 3. Calculate Version Number +# 4. Build Web Artifact (only x86_64 for now) +# (Job 3 and 4 are executed only on Pull-Request) ### name: Build Dev @@ -23,6 +22,10 @@ on: branches: - "**" +env: + ODC_CURRENT_BRANCH: ${{ github.ref_name }} + ODC_TARGET_BRANCH: ${{ github.base_ref }} + jobs: check-format: name: Check Code Format @@ -58,29 +61,14 @@ jobs: - name: Run PMD scan run: mvn pmd:check - unit-test: - name: Unit Test (Skip for now) - needs: [ check-format, pmd-scan ] - runs-on: ubuntu-latest - steps: - - name: Checkout workspace - uses: actions/checkout@v3 - - name: Setup JDK 8 - uses: actions/setup-java@v3 - with: - java-version: "8" - distribution: "temurin" - cache: maven - - name: Run unit test - run: echo "Skip for now 🤪" - calculate-version: name: Calculate Version Number - needs: [ unit-test ] + needs: [ check-format, pmd-scan ] if: ${{ github.event_name == 'pull_request' }} runs-on: ubuntu-latest outputs: odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} + odc_docker_image_tag: ${{ steps.calculate_version.outputs.odc_docker_image_tag }} steps: - name: Checkout workspace uses: actions/checkout@v3 @@ -90,13 +78,19 @@ jobs: odc_rpm_release_number=$(date +%Y%m%d%H%M%S) echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT echo "odc_rpm_release_number=${odc_rpm_release_number}" + branch_match_regex="^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$" + tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "test" || echo "dev"` + odc_docker_image_tag="${tag_prefix}-$(cat distribution/odc-server-VER.txt)-${{ github.actor }}-${odc_rpm_release_number}" + echo "odc_docker_image_tag=${odc_docker_image_tag}" >> $GITHUB_OUTPUT + echo "odc_docker_image_tag=${odc_docker_image_tag}"
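A note on the retagging logic above: build_dev now derives the image-tag prefix from a stricter branch naming convention (for example 4.2.0_release or 4.2.x_dev) rather than the old dev/-prefixed pattern. A standalone sketch of how the regex classifies branch names (Java used purely for illustration; the workflow does this in bash):

```java
import java.util.regex.Pattern;

public class BranchTagPrefixCheck {
    // Same pattern as branch_match_regex in the workflow; the shell's "\\." is "\." after expansion
    private static final Pattern RELEASE_LIKE =
            Pattern.compile("^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$");

    static String tagPrefix(String branch) {
        // Matching branches get the "test" image-tag prefix, everything else gets "dev"
        return RELEASE_LIKE.matcher(branch).matches() ? "test" : "dev";
    }

    public static void main(String[] args) {
        System.out.println(tagPrefix("4.2.0_release")); // test
        System.out.println(tagPrefix("4.2.x_dev"));     // test
        System.out.println(tagPrefix("feature/foo"));   // dev
    }
}
```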
- build-rpm-x86_64: - name: Build RPM (x86_64) + build-web-x86_64: + name: Build Web Artifact (x86_64) needs: [ calculate-version ] runs-on: ubuntu-latest env: odc_rpm_release_number: ${{ needs.calculate-version.outputs.odc_rpm_release_number }} + odc_docker_image_tag: ${{ needs.calculate-version.outputs.odc_docker_image_tag }} steps: - name: Checkout workspace uses: actions/checkout@v3 @@ -112,6 +106,28 @@ uses: actions/setup-node@v3 with: node-version: "16" + - name: Build front static resources + run: | + echo "Current directory: " `pwd` + echo "Start build front static resources" + pushd client + echo "Run npm install pnpm -g" + npm install pnpm -g + echo "Run pnpm install" + pnpm install + echo "Run npm run build:odc" + npm run build:odc + popd + echo "Build front static resources success" + echo "Start copy resources files" + static_resources_path="server/odc-server/src/main/resources/static" + if [ ! -d "${static_resources_path}" ]; then + echo "mkdir -p ${static_resources_path}" + mkdir -p "${static_resources_path}" + fi + rm --force --recursive --verbose ${static_resources_path}/* + cp --force --recursive --verbose client/dist/renderer/* ${static_resources_path} + echo "Copy resources files success" - name: Set release version id: set_release_version run: | @@ -144,3 +160,16 @@ jobs: with: name: odc-server-${{ steps.set_release_version.outputs.new_version }}.x86_64.rpm path: distribution/docker/resources/odc-*.rpm + - name: Build docker image (x86_64) + run: | + sed -e "s/DATE_CHANGE/$(date)/" -i distribution/docker/odc/Dockerfile + echo "odc_docker_image_tag=${odc_docker_image_tag}" + pushd distribution/docker + docker build -t docker.io/oceanbase/odc-server:${odc_docker_image_tag} -f odc/Dockerfile . + docker save -o resources/odc-server-${odc_docker_image_tag}.tar.gz docker.io/oceanbase/odc-server:${odc_docker_image_tag} + popd + - name: Upload docker image (x86_64) + uses: actions/upload-artifact@v3 + with: + name: odc-server-${{ env.odc_docker_image_tag }}.tar.gz + path: distribution/docker/resources/odc-server-*.tar.gz diff --git a/.github/workflows/build_release.yaml b/.github/workflows/build_release.yaml index e5a06ede3b..a85f6bc34a 100644 --- a/.github/workflows/build_release.yaml +++ b/.github/workflows/build_release.yaml @@ -1,16 +1,14 @@ ### -# This workflow is used for release +# This workflow is used to build and release all artifacts (jar, rpm, image and client) # Triggers: # 1. Manual trigger # Jobs: # 1. Check Code Format # 2. PMD Scan -# 3. Unit Test (TODO) -# 4. Calculate Version Number -# 5. Build Web Artifact (include front resource and only x86_64 platform for now) -# 6. Build Client Artifact -# 7. Release (TODO) -# 8. Tag (TODO) +# 3. Calculate Version Number +# 4. Build Web Artifact (only x86_64 for now) +# 5. Build Client Artifact +# 6. 
Release And Tag (TODO) ### name: Build Release @@ -69,25 +67,9 @@ jobs: - name: Run PMD scan run: mvn pmd:check - unit-test: - name: Unit Test (Skip for now) - needs: [ check-format, pmd-scan ] - runs-on: ubuntu-latest - steps: - - name: Checkout workspace - uses: actions/checkout@v3 - - name: Setup JDK 8 - uses: actions/setup-java@v3 - with: - java-version: "8" - distribution: "temurin" - cache: maven - - name: Run unit test - run: echo "Skip for now 🤪" - calculate-version: name: Calculate Version Number - needs: [ unit-test ] + needs: [ check-format, pmd-scan ] runs-on: ubuntu-latest outputs: odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} @@ -102,7 +84,7 @@ jobs: if [[ -n "${{ inputs.rpm_release }}" ]]; then odc_rpm_release_number="${{ inputs.rpm_release }}"; fi echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT echo "odc_rpm_release_number=${odc_rpm_release_number}" - branch_match_regex="^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$" + branch_match_regex="^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$" tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "" || echo "test-"` odc_docker_image_tag="${tag_prefix}$(cat distribution/odc-server-VER.txt)-${odc_rpm_release_number}" if [[ -n "${{ inputs.image_tag }}" ]]; then odc_docker_image_tag="${{ inputs.image_tag }}"; fi @@ -298,19 +280,3 @@ jobs: client/release/*.deb client/release/*.exe client/release/*.AppImage - - release: - name: Release (Skip for now) - needs: [ build-client ] - runs-on: ubuntu-latest - steps: - - name: Release artifacts - run: echo "Skip for now 🤪" - - tag: - name: Tag (Skip for now) - needs: [ release ] - runs-on: ubuntu-latest - steps: - - name: Tag release - run: echo "Skip for now 🤪" diff --git a/.github/workflows/build_daily.yaml b/.github/workflows/build_test.yaml similarity index 92% rename from .github/workflows/build_daily.yaml rename to .github/workflows/build_test.yaml index 891cfc1533..3b34748300 100644 --- a/.github/workflows/build_daily.yaml +++ b/.github/workflows/build_test.yaml @@ -1,19 +1,18 @@ ### -# This workflow is used for daily checking and testing +# This workflow is used to build all artifacts (jar, rpm, image and client) for testing # Triggers: # 1. Daily trigger at 04:00 (Beijing, UTC+08:00) # 2. Manual trigger # Jobs: # 1. Check Code Format # 2. PMD Scan -# 3. Unit Test (TODO) -# 4. Calculate Version Number -# 5. Build Web Artifact (include front resource and only x86_64 platform for now) -# 6. Build Client Artifact +# 3. Calculate Version Number +# 4. Build Web Artifact (only x86_64 for now) +# 5. 
Build Client Artifact ### -name: Build Daily -run-name: Build Daily triggered by ${{ github.actor }} 🛠️ +name: Build Test +run-name: Build Test triggered by ${{ github.actor }} 🛠️ on: schedule: @@ -33,6 +32,7 @@ on: env: ODC_CURRENT_BRANCH: ${{ github.ref_name }} + ODC_TARGET_BRANCH: ${{ github.base_ref }} jobs: check-format: @@ -69,25 +69,9 @@ jobs: - name: Run PMD scan run: mvn pmd:check - unit-test: - name: Unit Test (Skip for now) - needs: [ check-format, pmd-scan ] - runs-on: ubuntu-latest - steps: - - name: Checkout workspace - uses: actions/checkout@v3 - - name: Setup JDK 8 - uses: actions/setup-java@v3 - with: - java-version: "8" - distribution: "temurin" - cache: maven - - name: Run unit test - run: echo "Skip for now 🤪" - calculate-version: name: Calculate Version Number - needs: [ unit-test ] + needs: [ check-format, pmd-scan ] runs-on: ubuntu-latest outputs: odc_rpm_release_number: ${{ steps.calculate_version.outputs.odc_rpm_release_number }} @@ -102,7 +86,7 @@ jobs: if [[ -n "${{ inputs.rpm_release }}" ]]; then odc_rpm_release_number="${{ inputs.rpm_release }}"; fi echo "odc_rpm_release_number=${odc_rpm_release_number}" >> $GITHUB_OUTPUT echo "odc_rpm_release_number=${odc_rpm_release_number}" - branch_match_regex="^(((dev/)?[0-9]\\.[0-9]\\.([0-9]{1,2}|x))|(release\\S*))$" + branch_match_regex="^[0-9]\\.[0-9]\\.([0-9]{1,2}|x)_(release|dev)$" tag_prefix=`[[ "${{ env.ODC_CURRENT_BRANCH }}" =~ ${branch_match_regex} ]] && echo "test-daily" || echo "dev-daily"` odc_docker_image_tag="${tag_prefix}-$(cat distribution/odc-server-VER.txt)-${odc_rpm_release_number}" if [[ -n "${{ inputs.image_tag }}" ]]; then odc_docker_image_tag="${{ inputs.image_tag }}"; fi @@ -133,7 +117,7 @@ jobs: node-version: "16" - name: Build front static resources run: | - echo "Current directory: "`pwd` + echo "Current directory: " `pwd` echo "Start build front static resources" pushd client echo "Run npm install pnpm -g" diff --git a/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java b/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java index d20c54bcce..f8da2fddbc 100644 --- a/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java +++ b/libs/db-browser/src/main/java/com/oceanbase/tools/dbbrowser/schema/mysql/ODPOBMySQLSchemaAccessor.java @@ -21,7 +21,6 @@ import org.springframework.jdbc.core.JdbcOperations; -import com.oceanbase.tools.dbbrowser.model.DBDatabase; import com.oceanbase.tools.dbbrowser.model.DBFunction; import com.oceanbase.tools.dbbrowser.model.DBObjectIdentity; import com.oceanbase.tools.dbbrowser.model.DBObjectType; @@ -39,27 +38,6 @@ public ODPOBMySQLSchemaAccessor(JdbcOperations jdbcOperations) { super(jdbcOperations); } - @Override - public List<DBDatabase> listDatabases() { - List<DBDatabase> dbDatabases = new ArrayList<>(); - String sql = "show databases"; - List<String> dbNames = jdbcOperations.queryForList(sql, String.class); - for (String dbName : dbNames) { - DBDatabase database = new DBDatabase(); - jdbcOperations.execute("use " + dbName); - database.setName(dbName); - database.setId(dbName); - database.setCharset( - jdbcOperations.queryForObject("show variables like 'character_set_database';", - (rs, num) -> rs.getString(2))); - database.setCollation( - jdbcOperations.queryForObject("show variables like 'collation_database';", - (rs, num) -> rs.getString(2))); - dbDatabases.add(database); - } - return dbDatabases; - } - @Override public List<String> showTables(String 
schemaName) { MySQLSqlBuilder sb = new MySQLSqlBuilder(); @@ -75,13 +53,12 @@ public List<String> showTables(String schemaName) { @Override public List<DBObjectIdentity> listTables(String schemaName, String tableNameLike) { + String sql = "select database()"; + String currentSchema = jdbcOperations.queryForObject(sql, (rs, rowNum) -> rs.getString(1)); List<DBObjectIdentity> results = new ArrayList<>(); - MySQLSqlBuilder builder = new MySQLSqlBuilder(); - builder.append("show full tables from ") - .identifier(schemaName) - .append(" where Table_type='BASE TABLE'"); - List<String> tables = jdbcOperations.query(builder.toString(), (rs, rowNum) -> rs.getString(1)); - tables.forEach(name -> results.add(DBObjectIdentity.of(schemaName, DBObjectType.TABLE, name))); + sql = "show full tables where Table_type='BASE TABLE'"; + List<String> tables = jdbcOperations.query(sql, (rs, rowNum) -> rs.getString(1)); + tables.forEach(name -> results.add(DBObjectIdentity.of(currentSchema, DBObjectType.TABLE, name))); return results; }
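The listTables rewrite above stops qualifying "show full tables from <schema>" and instead resolves the session's current schema first, presumably because ODP (OceanBase Database Proxy) cannot route schema-qualified show statements reliably in sharding mode; that rationale is an assumption, the patch does not state it. A minimal standalone sketch of the same two-step lookup against a plain Spring JdbcTemplate:

```java
import java.util.List;
import org.springframework.jdbc.core.JdbcTemplate;

// Sketch only: resolve the schema the proxied session is currently on,
// then list base tables in that schema. Class and method names are illustrative.
class CurrentSchemaTableLister {
    private final JdbcTemplate jdbc;

    CurrentSchemaTableLister(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    List<String> listBaseTables() {
        // Step 1: ask the connection (or proxy) which schema it is on
        String currentSchema = jdbc.queryForObject("select database()", String.class);
        // Step 2: an unqualified 'show full tables' only touches that schema
        List<String> tables = jdbc.query("show full tables where Table_type='BASE TABLE'",
                (rs, rowNum) -> rs.getString(1));
        System.out.println("schema " + currentSchema + " has " + tables.size() + " base tables");
        return tables;
    }
}
```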
diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java index f1c790c706..ecb8f8c398 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/sql/execute/mapper/DefaultJdbcRowMapper.java @@ -41,7 +41,7 @@ public class DefaultJdbcRowMapper extends BaseDialectBasedRowMapper { public DefaultJdbcRowMapper(@NonNull ConnectionSession session) { super(session.getDialectType()); DialectType dialectType = session.getDialectType(); - if (Objects.nonNull(dialectType) && dialectType.isMysql()) { + if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { mapperList.add(new MySQLBitMapper()); mapperList.add(new MySQLDatetimeMapper()); mapperList.add(new MySQLYearMapper()); diff --git a/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql b/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql index f7fbf88cc4..db22b76d0d 100644 --- a/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql +++ b/server/odc-migrate/src/main/resources/migrate/common/R_2_0_0__initialize_version_diff_config.sql @@ -175,14 +175,15 @@ insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_view','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_procedure','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_function','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; -insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_mock_data','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; +insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_mock_data','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_sql_explain','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; -insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_sql_trace','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; +insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_sql_trace','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition_plan','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_constraint','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_constraint_modify','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition_modify','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; +insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_partition_plan','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_shadowtable','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_show_foreign_key','MYSQL','true','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; insert into `odc_version_diff_config`(`config_key`,`db_mode`,`config_value`,`min_version`,`gmt_create`) values('support_data_export','MYSQL','false','5.7.0',CURRENT_TIMESTAMP) ON DUPLICATE KEY update `config_key`=`config_key`; diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_2_0_43__alter_partition_plan.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_2_0_43__alter_partition_plan.sql deleted file mode 100644 index 57dbfb958f..0000000000 --- a/server/odc-migrate/src/main/resources/migrate/common/V_4_2_0_43__alter_partition_plan.sql +++ /dev/null @@ -1,5 +0,0 @@ -ALTER TABLE `table_partition_plan` ADD COLUMN `database_id` bigint NULL ; -ALTER TABLE `table_partition_plan` ADD COLUMN `database_partition_plan_id` bigint NULL ; -ALTER TABLE `connection_partition_plan` ADD COLUMN `database_id` bigint NULL ; -ALTER TABLE `connection_partition_plan` ADD COLUMN `schedule_id` bigint NULL ; - 
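All of the seed rows above end in ON DUPLICATE KEY update config_key=config_key, a self-assignment that makes the repeatable migration idempotent: rerunning it inserts missing rows and leaves existing ones untouched. The same pattern from Java, as a sketch (the JdbcTemplate wiring is assumed, the table and columns are the ones in the migration):

```java
import org.springframework.jdbc.core.JdbcTemplate;

// Sketch: idempotent seeding. The ON DUPLICATE KEY clause turns a second
// run into a no-op instead of a duplicate-key failure or an overwrite.
class VersionDiffConfigSeeder {
    private final JdbcTemplate jdbc;

    VersionDiffConfigSeeder(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    void seed(String key, String dbMode, String value, String minVersion) {
        jdbc.update("insert into odc_version_diff_config"
                + " (config_key, db_mode, config_value, min_version, gmt_create)"
                + " values (?, ?, ?, ?, CURRENT_TIMESTAMP)"
                + " on duplicate key update config_key = config_key", // no-op on re-run
                key, dbMode, value, minVersion);
    }
}
```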
diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java index 162209abff..4bf0b88734 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/PartitionPlanController.java @@ -18,7 +18,7 @@ import java.io.IOException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -28,7 +28,7 @@ import com.oceanbase.odc.service.common.response.Responses; import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.partitionplan.PartitionPlanService; -import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; /** * @Author:tianke * * @Descripition: */ @RestController -@RequestMapping("/api/v2/partitionPlan") +@ConditionalOnProperty(name = "odc.feature.partitionplan.enabled", havingValue = "true") public class PartitionPlanController { @Autowired private PartitionPlanService partitionPlanService; - @RequestMapping(value = "/partitionPlans", method = RequestMethod.GET) - public SuccessResponse<DatabasePartitionPlan> getPartitionPlans(@RequestParam Long databaseId, + @RequestMapping(value = "/api/v2/partitionplan/ConnectionPartitionPlan", method = RequestMethod.GET) + public SuccessResponse<ConnectionPartitionPlan> getConnectionPartitionPlan(@RequestParam Long connectionId, @RequestParam(required = false) Long flowInstanceId) { return Responses - .success(partitionPlanService.findRangeTablePlan(databaseId, flowInstanceId)); + .success(partitionPlanService.findRangeTablePlan(connectionId, flowInstanceId)); } - @RequestMapping(value = "/partitionPlans/{id:[\\d]+}", method = RequestMethod.PUT) - public SuccessResponse<String> update(@PathVariable Long id, - @RequestBody DatabasePartitionPlan databasePartitionPlan) throws IOException { - partitionPlanService.updateTablePartitionPlan(databasePartitionPlan); + + @RequestMapping(value = "/api/v2/partitionplan/ConnectionPartitionPlan/batchUpdate", method = RequestMethod.PUT) + public SuccessResponse<String> updateConnectionPartitionPlan( + @RequestBody ConnectionPartitionPlan connectionPartitionPlan) throws IOException { + partitionPlanService.updateTablePartitionPlan(connectionPartitionPlan); return Responses.success("ok"); } - @RequestMapping(value = "/partitionPlans/exists", method = RequestMethod.GET) - public SuccessResponse<Boolean> exist(@RequestParam("databaseId") Long databaseId) { - return Responses.success(partitionPlanService.hasConnectionPartitionPlan(databaseId)); + @RequestMapping(value = "/api/v2/partitionplan/ConnectionPartitionPlan/exist", method = RequestMethod.GET) + public SuccessResponse<Boolean> hasConnectionPartitionPlan(@RequestParam("connectionId") Long connectionId) { + return Responses.success(partitionPlanService.hasConnectionPartitionPlan(connectionId)); } } diff --git a/server/odc-server/src/main/resources/data.sql index 62627b1230..be123797e4 100644 --- 
a/server/odc-server/src/main/resources/data.sql +++ b/server/odc-server/src/main/resources/data.sql @@ -682,4 +682,3 @@ INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.datatransfer.cursor-fetch-size', '20', '导出时游标的 fetch size,默认为 20,最大值为 1000' ) ON DUPLICATE KEY UPDATE `id` = `id`; INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.data-security.masking.enabled', 'true', '是否开启数据脱敏,默认为开启' ) ON DUPLICATE KEY UPDATE `id` = `id`; -INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.partition-plan.schedule-cron', '0 0 * * * ?', '默认调度周期:每天 0 点' ) ON DUPLICATE KEY UPDATE `id` = `id`; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java index d5c02783e4..31968b83fa 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/config/LockConfiguration.java @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.integration.jdbc.lock.DefaultLockRepository; @@ -65,10 +66,32 @@ public LockRepository odcLockRepository(DataSource dataSource, @Value("${server. } @Bean + @Primary public JdbcLockRegistry jdbcLockRegistry(@Qualifier("odcLockRepository") LockRepository odcLockRepository) { return new JdbcLockRegistry(odcLockRepository); } + @Bean("partitionPlanLockRepository") + public LockRepository partitionPlanLockRepository(DataSource dataSource, + @Value("${server.port:8989}") String listenPort) { + String localIpAddress = SystemUtils.getLocalIpAddress(); + log.info("create partition plan lock repository..., localIpAddress={}, listenPort={}", localIpAddress, + listenPort); + initLockTable(dataSource); + String lockId = String.format("%s:%s", localIpAddress, listenPort); + DefaultLockRepository defaultLockRepository = new OdcLockRepository(dataSource, lockId); + defaultLockRepository.setPrefix("DISTRIBUTED_"); + defaultLockRepository.setTimeToLive(1000 * 60 * 60 * 24); + log.info("partition plan lock repository created."); + return defaultLockRepository; + } + + @Bean("partitionPlanJdbcLockRegistry") + public JdbcLockRegistry partitionPlanJdbcLockRegistry( + @Qualifier("partitionPlanLockRepository") LockRepository partitionPlanLockRepository) { + return new JdbcLockRegistry(partitionPlanLockRepository); + } + private void initLockTable(DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); jdbcTemplate.setQueryTimeout(CREATE_LOCK_TABLE_TIMEOUT_SECONDS); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanEntity.java similarity index 91% rename from server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanEntity.java rename to server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanEntity.java index 90b2f9c267..00895601f4 100644 --- 
a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanEntity.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanEntity.java @@ -45,7 +45,7 @@ @AllArgsConstructor @NoArgsConstructor @Table(name = "connection_partition_plan") -public class DatabasePartitionPlanEntity { +public class ConnectionPartitionPlanEntity { @Id @Column(name = "id", nullable = false, updatable = false) @@ -54,12 +54,8 @@ public class ConnectionPartitionPlanEntity { @Column(name = "connection_id", nullable = false, updatable = false) private Long connectionId; - @Column(name = "database_id", nullable = false, updatable = false) - private Long databaseId; @Column(name = "flow_instance_id", nullable = false, updatable = false) private Long flowInstanceId; - @Column(name = "schedule_id", updatable = false) - private Long scheduleId; @Column(name = "organization_id", nullable = false, updatable = false) private Long organizationId; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanRepository.java similarity index 64% rename from server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanRepository.java rename to server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanRepository.java index 6638c460c2..a73f56cdd7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/DatabasePartitionPlanRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/ConnectionPartitionPlanRepository.java @@ -15,6 +15,7 @@ */ package com.oceanbase.odc.metadb.partitionplan; +import java.util.List; import java.util.Optional; import javax.transaction.Transactional; @@ -30,8 +31,8 @@ * @Date: 2022/9/16 16:42 * @Descripition: */ -public interface DatabasePartitionPlanRepository extends JpaRepository<DatabasePartitionPlanEntity, Long>, - JpaSpecificationExecutor<DatabasePartitionPlanEntity> { +public interface ConnectionPartitionPlanRepository extends JpaRepository<ConnectionPartitionPlanEntity, Long>, + JpaSpecificationExecutor<ConnectionPartitionPlanEntity> { @Transactional @Modifying @@ -40,21 +41,19 @@ public interface DatabasePartitionPlanRepository extends JpaRepository<DatabasePartitionPlanEntity, Long> @Query(value = "select * from connection_partition_plan " - + "where database_id=:databaseId and is_config_enabled=true", nativeQuery = true) - Optional<DatabasePartitionPlanEntity> findValidPlanByDatabaseId(@Param("databaseId") Long databaseId); + + "where connection_id=:connectionId and is_config_enabled=true", nativeQuery = true) + Optional<ConnectionPartitionPlanEntity> findValidPlanByConnectionId(@Param("connectionId") Long connectionId); - Optional<DatabasePartitionPlanEntity> findByFlowInstanceId(Long flowInstanceId); + Optional<ConnectionPartitionPlanEntity> findByFlowInstanceId(Long flowInstanceId); - @Transactional - @Modifying - @Query(value = "update connection_partition_plan set is_config_enabled = false " - + "where database_id=:databaseId", nativeQuery = true) - int disableConfigByDatabaseId(@Param("databaseId") Long databaseId); + @Query(value = "select * from connection_partition_plan " + + "where is_config_enabled=true", nativeQuery = true) + List<ConnectionPartitionPlanEntity> findAllValidPlan(); @Transactional @Modifying - @Query(value = "update connection_partition_plan set schedule_id =:scheduleId " - + "where id=:id", nativeQuery = true) - int updateScheduleIdById(@Param("id") Long id, @Param("scheduleId") Long scheduleId); + @Query(value = "update connection_partition_plan set is_config_enabled = false " + + "where connection_id=:connectionId", nativeQuery = true) + int disableConfigByConnectionId(@Param("connectionId") Long connectionId); }
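disableConfigByConnectionId plus findValidPlanByConnectionId imply an invariant of at most one enabled plan per connection. A hedged sketch of how a caller might swap plans under that invariant (the wrapper class, getter, and transaction wiring are assumptions, not ODC code):

```java
import org.springframework.transaction.annotation.Transactional;

// Sketch: swap the enabled partition plan for a connection, keeping the
// "at most one is_config_enabled=true row per connection_id" invariant.
class ConnectionPartitionPlanSwitcher {
    private final ConnectionPartitionPlanRepository repository;

    ConnectionPartitionPlanSwitcher(ConnectionPartitionPlanRepository repository) {
        this.repository = repository;
    }

    @Transactional
    void enable(ConnectionPartitionPlanEntity newPlan) {
        // Disable whatever plan is currently active before persisting the new one
        repository.disableConfigByConnectionId(newPlan.getConnectionId()); // getter assumed
        repository.save(newPlan);
    }
}
```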
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java index db525c347b..f20698730e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanEntity.java @@ -55,10 +55,6 @@ public class TablePartitionPlanEntity { @Column(name = "connection_id", nullable = false, updatable = false) private Long connectionId; - @Column(name = "database_id", nullable = false, updatable = false) - private Long databaseId; - @Column(name = "database_partition_plan_id", updatable = false) - private Long databasePartitionPlanId; @Column(name = "flow_instance_id", nullable = false, updatable = false) private Long flowInstanceId; @Column(name = "schema_name", nullable = false, updatable = false) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java index 52fc360955..06222c7044 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/partitionplan/TablePartitionPlanRepository.java @@ -16,6 +16,7 @@ package com.oceanbase.odc.metadb.partitionplan; import java.util.List; +import java.util.Optional; import javax.transaction.Transactional; @@ -36,22 +37,31 @@ public interface TablePartitionPlanRepository extends JpaRepository<TablePartitionPlanEntity, Long> - List<TablePartitionPlanEntity> findValidPlanByDatabasePartitionPlanId(@Param("id") Long databasePartitionPlanId); - @Query(value = "select * from table_partition_plan " - + "where database_id=:id and is_config_enabled = true", nativeQuery = true) - List<TablePartitionPlanEntity> findValidPlanByDatabaseId(@Param("id") Long databaseId); + @Query(value = "select * from table_partition_plan where connection_id=:id and schema_name=:schemaName" + + " and table_name=:tableName and is_config_enabled=true", nativeQuery = true) + Optional<TablePartitionPlanEntity> findEnTablePlan(@Param("id") Long connectionId, + @Param("schemaName") String schemaName, + @Param("tableName") String tableName); + + + @Query(value = "select * from table_partition_plan where flow_instance_id=:flowInstanceId and is_config_enabled = true and is_auto_partition = true", nativeQuery = true) + List<TablePartitionPlanEntity> findValidPlanByFlowInstanceId(@Param("flowInstanceId") Long flowInstanceId); + + @Query(value = "select * from table_partition_plan ap " + + "where ap.connection_id=:connectionId and ap.is_config_enabled = true", nativeQuery = true) + List<TablePartitionPlanEntity> findValidPlanByConnectionId(@Param("connectionId") Long connectionId); + + List<TablePartitionPlanEntity> findByFlowInstanceId(Long flowInstanceId); - List<TablePartitionPlanEntity> findByDatabasePartitionPlanId(@Param("id") Long databasePartitionPlanId); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java index 4804a4dadb..e4c0418be7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/database/DatabaseService.java @@ -423,7 +423,6 @@ public Boolean internalSyncDataSourceSchemas(@NonNull Long dataSourceId) throws 
database.setCollationName(latest.getCollationName()); database.setCharsetName(latest.getCharsetName()); database.setLastSyncTime(latest.getLastSyncTime()); - database.setExisted(Boolean.TRUE); } return database; }).collect(Collectors.toList()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java index eed92c85e4..8268ae6ac8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/DataMaskingService.java @@ -218,7 +218,7 @@ public boolean isMaskingEnabled() { } private SQLParser getSqlParser(DialectType dialectType) { - if (Objects.nonNull(dialectType) && dialectType.isMysql()) { + if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { return new OBMySQLParser(); } else if (dialectType == DialectType.OB_ORACLE) { return new OBOracleSQLParser(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java index 291f07b742..4244204390 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datasecurity/extractor/OBColumnExtractor.java @@ -710,7 +710,7 @@ private List getColumnsFromVirtualTable(String tableName) throws } private String processIdentifier(String identifier) { - if (Objects.nonNull(dialectType) && dialectType.isMysql()) { + if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { String unquoted = StringUtils.unquoteMySqlIdentifier(identifier); return StringUtils.isBlank(unquoted) ? 
unquoted : unquoted.toLowerCase(); } else if (dialectType == DialectType.OB_ORACLE) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java index db3e599312..98263b78cc 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/datatransfer/DBObjectNameAccessor.java @@ -100,7 +100,7 @@ public Set<String> getViewNames() { } public Set<String> getTriggerNames() { - if (session.getDialectType().isMysql()) { + if (session.getDialectType().isOBMysql()) { return Collections.emptySet(); } return accessor.listTriggers(schema).stream().map(DBObjectIdentity::getName).collect(Collectors.toSet()); @@ -131,7 +131,7 @@ public Set<String> getSequenceNames() { } public Set<String> getSynonymNames() { - if (session.getDialectType().isMysql()) { + if (session.getDialectType().isOBMysql()) { return Collections.emptySet(); } return accessor.listSynonyms(schema, DBSynonymType.COMMON).stream().map(DBObjectIdentity::getName) @@ -139,7 +139,7 @@ public Set<String> getSynonymNames() { } public Set<String> getPublicSynonymNames() { - if (session.getDialectType().isMysql()) { + if (session.getDialectType().isOBMysql()) { return Collections.emptySet(); } return accessor.listSynonyms(schema, DBSynonymType.PUBLIC).stream().map(DBObjectIdentity::getName) @@ -147,7 +147,7 @@ public Set<String> getPublicSynonymNames() { } public Set<String> getPackageNames() { - if (session.getDialectType().isMysql()) { + if (session.getDialectType().isOBMysql()) { return Collections.emptySet(); } return accessor.listPackages(schema).stream() @@ -157,7 +157,7 @@ public Set<String> getPackageBodyNames() { - if (session.getDialectType().isMysql()) { + if (session.getDialectType().isOBMysql()) { return Collections.emptySet(); } return accessor.listPackages(schema).stream() diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java index 7f82adc9da..8c0c52c8a1 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/DBPLService.java @@ -116,7 +116,7 @@ public void destroy() { @SkipAuthorize("inside connect session") public String startBatchCompile(@NonNull ConnectionSession session, String databaseName, StartBatchCompileReq req) { - if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isMysql()) { + if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isOBMysql()) { throw new UnsupportedException("Batch compile is not supported in mysql mode"); } Validate.notNull(req.getScope(), "Parameter [scope] can not be null"); @@ -148,7 +148,7 @@ public String startBatchCompile(@NonNull ConnectionSession session, @SkipAuthorize("inside connect session") public String startBatchCompile(@NonNull ConnectionSession session, String databaseName, @NonNull List identities) { - if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isMysql()) { + if (Objects.nonNull(session.getDialectType()) && session.getDialectType().isOBMysql()) { throw new UnsupportedException("Batch compile is not supported in mysql mode"); } BatchCompileTaskCallable taskCallable = new BatchCompileTaskCallable(session, identities); diff --git 
a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java index d487edc59b..f557e656e5 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/BaseStringConverter.java @@ -39,7 +39,7 @@ public abstract class BaseStringConverter extends BaseDataConverter { protected String doConvert(@NonNull DataValue dataValue) { ValueEncodeType encodeType = dataValue.getEncodeType(); String value = new String(encodeType.decode(dataValue.getValue()), StandardCharsets.UTF_8); - if (Objects.nonNull(dialectType()) && dialectType().isMysql()) { + if (Objects.nonNull(dialectType()) && dialectType().isOBMysql()) { value = StringUtils.escapeUseDouble(value, (char) 92); } return "'" + StringUtils.escapeUseDouble(value, (char) 39) + "'"; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java index b08c2a4eff..93b0f63c71 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dml/converter/DataConverters.java @@ -40,7 +40,7 @@ private DataConverters(DialectType dialectType, String serverTimeZoneId) { converterList = new LinkedList<>(); if (DialectType.OB_ORACLE.equals(dialectType)) { initForOracleMode(serverTimeZoneId); - } else if (Objects.nonNull(dialectType) && dialectType.isMysql()) { + } else if (Objects.nonNull(dialectType) && dialectType.isOBMysql()) { initForMysqlMode(); } else { throw new IllegalArgumentException("Illegal DialectType " + dialectType); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java index c5744bc0f8..94c4aa1af7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java @@ -129,6 +129,8 @@ import com.oceanbase.odc.service.integration.model.IntegrationConfig; import com.oceanbase.odc.service.integration.model.TemplateVariables; import com.oceanbase.odc.service.integration.model.TemplateVariables.Variable; +import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters; +import com.oceanbase.odc.service.partitionplan.model.TablePartitionPlan; import com.oceanbase.odc.service.regulation.approval.model.ApprovalFlowConfig; import com.oceanbase.odc.service.regulation.approval.model.ApprovalNodeConfig; import com.oceanbase.odc.service.regulation.risklevel.RiskLevelService; @@ -208,6 +210,11 @@ public class FlowInstanceService { private final List> shadowTableComparingTaskHooks = new ArrayList<>(); private static final long MAX_EXPORT_OBJECT_COUNT = 10000; + /** + * Max partition count for OB MySQL mode, refer to + * the OceanBase "Partitioning Overview" documentation + */ + private static final long MAX_PARTITION_COUNT = 8192; private static final String ODC_SITE_URL = "odc.site.url"; private static final String INVALID_EXTERNAL_INSTANCE_ID = "N/A"; @@ -252,6 +259,19 @@ public List createIndividualFlowInstance(CreateFlowInsta @EnablePreprocess @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED) 
public List create(@NotNull @Valid CreateFlowInstanceReq createReq) { + if (createReq.getTaskType() == TaskType.PARTITION_PLAN) { + PartitionPlanTaskParameters parameters = (PartitionPlanTaskParameters) createReq.getParameters(); + List<TablePartitionPlan> tablePartitionPlans = parameters.getConnectionPartitionPlan() + .getTablePartitionPlans(); + for (TablePartitionPlan tablePartitionPlan : tablePartitionPlans) { + if (tablePartitionPlan.getPartitionCount() > MAX_PARTITION_COUNT + && tablePartitionPlan.getDetail().getIsAutoPartition()) { + throw new RuntimeException( + String.format("Cannot create more partitions. TableName: %s, PartitionCount: %s", + tablePartitionPlan.getTableName(), tablePartitionPlan.getPartitionCount())); + } + } + } // TODO 原终止逻辑想表达的语意是终止执行中的计划,但目前线上的语意是终止审批流。暂保留逻辑,待前端修改后删除。 if (createReq.getTaskType() == TaskType.ALTER_SCHEDULE) { AlterScheduleParameters parameters = (AlterScheduleParameters) createReq.getParameters(); @@ -750,10 +770,17 @@ private FlowInstanceConfigurer buildConfigurer( log.warn("Create external approval instance failed, the instance will be force closed!"); } } - FlowApprovalInstance approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), - false, false, - nodeConfig.getAutoApproval(), approvalFlowConfig.getApprovalExpirationIntervalSeconds(), - externalApprovalId, externalFlowInstanceId); + FlowApprovalInstance approvalInstance; + if (taskType == TaskType.PARTITION_PLAN && nodeSequence == 0) { + approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), + false, false, nodeConfig.getAutoApproval(), + approvalFlowConfig.getApprovalExpirationIntervalSeconds(), true); + } else { + approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), + false, false, + nodeConfig.getAutoApproval(), approvalFlowConfig.getApprovalExpirationIntervalSeconds(), + externalApprovalId, externalFlowInstanceId); + } if (Objects.nonNull(resourceRoleId)) { approvalPermissionService.setCandidateResourceRole(approvalInstance.getId(), StringUtils.join(flowInstanceReq.getProjectId(), ":", resourceRoleId)); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java index 90a306394b..4a34b5b2df 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java @@ -601,8 +601,8 @@ private List<PartitionPlanTaskResult> getPartitionPlanResult(@NonNull TaskEntity return null; } Long flowInstanceId = partitionPlanTaskResults.get(0).getFlowInstanceId(); - partitionPlanTaskResults.get(0).setDatabasePartitionPlan( - partitionPlanService.findDatabasePartitionPlanByFlowInstanceId(flowInstanceId)); + partitionPlanTaskResults.get(0).setConnectionPartitionPlan( + partitionPlanService.findTablePartitionPlanByFlowInstanceId(flowInstanceId)); return partitionPlanTaskResults; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java index e4950074e0..8f5c636850 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PartitionPlanTask.java @@ -15,16 +15,21 @@ */ package com.oceanbase.odc.service.flow.task; +import java.util.List; + 
import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; import com.oceanbase.odc.core.shared.constant.FlowStatus; +import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity; +import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository; import com.oceanbase.odc.service.flow.task.model.PartitionPlanTaskResult; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; -import com.oceanbase.odc.service.partitionplan.PartitionPlanService; +import com.oceanbase.odc.service.iam.model.User; +import com.oceanbase.odc.service.partitionplan.PartitionPlanTaskService; import com.oceanbase.odc.service.partitionplan.PartitionPlanTaskTraceContextHolder; -import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters; import com.oceanbase.odc.service.task.TaskService; @@ -38,10 +43,12 @@ @Slf4j public class PartitionPlanTask extends BaseODCFlowTaskDelegate<PartitionPlanTaskResult> { + @Autowired + private TablePartitionPlanRepository tablePartitionPlanRepository; @Autowired private AuthenticationFacade authenticationFacade; @Autowired - private PartitionPlanService partitionPlanService; + private PartitionPlanTaskService partitionPlanTaskService; private volatile boolean isSuccessful = false; private volatile boolean isFailure = false; @@ -53,14 +60,24 @@ protected PartitionPlanTaskResult start(Long taskId, TaskService taskService, De try { PartitionPlanTaskParameters taskParameters = FlowTaskUtil.getPartitionPlanParameter(execution); - DatabasePartitionPlan databasePartitionPlan = taskParameters.getConnectionPartitionPlan(); + ConnectionPartitionPlan connectionPartitionPlan = taskParameters.getConnectionPartitionPlan(); + // Update the task status taskService.start(taskId); - // Create and enable partition plan. 
- databasePartitionPlan.setFlowInstanceId(getFlowInstanceId()); - partitionPlanService.createDatabasePartitionPlan(databasePartitionPlan); + // Approval passed: activate the configuration + partitionPlanTaskService.enableFlowInstancePartitionPlan(connectionPartitionPlan.getConnectionId(), + getFlowInstanceId()); + // Load the plan configuration + List tablePlans = tablePartitionPlanRepository.findValidPlanByFlowInstanceId( + getFlowInstanceId()); + + User taskCreator = FlowTaskUtil.getTaskCreator(execution); + partitionPlanTaskService.executePartitionPlan(connectionPartitionPlan.getConnectionId(), + getFlowInstanceId(), tablePlans, taskCreator); + PartitionPlanTaskResult taskResult = new PartitionPlanTaskResult(); taskResult.setFlowInstanceId(getFlowInstanceId()); - taskResult.setDatabasePartitionPlan(databasePartitionPlan); + taskResult.setConnectionPartitionPlan(connectionPartitionPlan); + // TODO Temporary; to be adjusted once this becomes a periodic task isSuccessful = true; taskService.succeed(taskId, taskResult); log.info("Partition plan task succeed."); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java index bf104baa05..1f72be7a22 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/PartitionPlanTaskResult.java @@ -15,7 +15,7 @@ */ package com.oceanbase.odc.service.flow.task.model; -import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; import lombok.Data; @@ -27,5 +27,5 @@ @Data public class PartitionPlanTaskResult implements FlowTaskResult { private Long flowInstanceId; - private DatabasePartitionPlan databasePartitionPlan; + private ConnectionPartitionPlan connectionPartitionPlan; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/DatabasePartitionPlanMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/ConnectionPartitionPlanMapper.java similarity index 61% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/DatabasePartitionPlanMapper.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/ConnectionPartitionPlanMapper.java index 4e057d3576..19a5577f0e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/DatabasePartitionPlanMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/ConnectionPartitionPlanMapper.java @@ -18,14 +18,14 @@ import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; -import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanEntity; -import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity; +import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; @Mapper -public interface DatabasePartitionPlanMapper { - DatabasePartitionPlanMapper INSTANCE = Mappers.getMapper(DatabasePartitionPlanMapper.class); +public interface ConnectionPartitionPlanMapper { + ConnectionPartitionPlanMapper INSTANCE = Mappers.getMapper(ConnectionPartitionPlanMapper.class); - DatabasePartitionPlan entityToModel(DatabasePartitionPlanEntity entity); + ConnectionPartitionPlan entityToModel(ConnectionPartitionPlanEntity entity); -
DatabasePartitionPlanEntity modelToEntity(DatabasePartitionPlan model); + ConnectionPartitionPlanEntity modelToEntity(ConnectionPartitionPlan model); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java index 5ddfe4058a..65cb07980c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanFunction.java @@ -15,15 +15,15 @@ */ package com.oceanbase.odc.service.partitionplan; +import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; -import java.util.Optional; +import java.util.List; import java.util.regex.Pattern; import com.oceanbase.odc.service.partitionplan.model.PeriodUnit; -import com.oceanbase.tools.dbbrowser.model.DBTable; -import com.oceanbase.tools.dbbrowser.model.DBTableColumn; -import com.oceanbase.tools.dbbrowser.model.DBTablePartitionType; +import com.oceanbase.tools.dbbrowser.model.DBTablePartition; +import com.oceanbase.tools.dbbrowser.model.DBTablePartitionDefinition; /** * @Author:tianke @@ -69,28 +69,23 @@ public static Long getPartitionRightBound(Long baseDate, int interval, PeriodUni return maxRightBound.getTime().getTime(); } - public static PartitionExpressionType getPartitionExpressionType(DBTable table) { - String expression = table.getPartition().getPartitionOption().getExpression(); - PartitionExpressionType type = PartitionExpressionType.OTHER; - if (table.getPartition().getPartitionOption().getType() == DBTablePartitionType.RANGE) { - if (Pattern.matches("UNIX_TIMESTAMP\\(.*\\)", expression)) { - type = PartitionExpressionType.UNIX_TIMESTAMP; - } + public static PartitionExpressionType getPartitionExpressionType(DBTablePartition partition) { + List definitions = partition.getPartitionDefinitions(); + String maxValue = definitions.get(definitions.size() - 1).getMaxValues().get(0); + String expression = partition.getPartitionOption().getExpression(); + if (Pattern.matches("UNIX_TIMESTAMP\\(.*\\)", expression)) { + return PartitionExpressionType.UNIX_TIMESTAMP; } - if (table.getPartition().getPartitionOption().getType() == DBTablePartitionType.RANGE_COLUMNS - && table.getPartition().getPartitionOption().getColumnNames().size() == 1) { - Optional rangeColumn = table.getColumns().stream().filter( - column -> column.getName() - .equals(table.getPartition().getPartitionOption().getColumnNames().get(0))) - .findFirst(); - if (rangeColumn.isPresent()) { - if (rangeColumn.get().getTypeName().equalsIgnoreCase("DATE") - || rangeColumn.get().getTypeName().equalsIgnoreCase("DATETIME")) { - type = PartitionExpressionType.DATE; - } - } + if (Pattern.matches(".*\\(.*\\)", expression)) { + return PartitionExpressionType.OTHER; + } + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + try { + sdf.parse(maxValue.substring(1, maxValue.length() - 1)); + return PartitionExpressionType.DATE; + } catch (Exception e) { + return PartitionExpressionType.OTHER; } - return type; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSchedules.java new file mode 100644 index 0000000000..3449453755 --- /dev/null +++ 
b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSchedules.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.partitionplan; + +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.jdbc.lock.JdbcLockRegistry; +import org.springframework.stereotype.Component; + +import com.oceanbase.odc.core.shared.constant.FlowStatus; +import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; +import com.oceanbase.odc.metadb.flow.FlowInstanceRepository; +import com.oceanbase.odc.metadb.iam.UserEntity; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanRepository; +import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity; +import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository; +import com.oceanbase.odc.service.flow.FlowInstanceService; +import com.oceanbase.odc.service.iam.UserService; +import com.oceanbase.odc.service.iam.model.User; +import com.oceanbase.odc.service.iam.util.SecurityContextUtils; + +import lombok.extern.slf4j.Slf4j; + +/** + * @Author:tianke + * @Date: 2022/10/9 14:17 + * @Description: + */ +@Slf4j +@Component +public class PartitionPlanSchedules { + + @Autowired + private ConnectionPartitionPlanRepository connectionPartitionPlanRepository; + @Autowired + private TablePartitionPlanRepository tablePartitionPlanRepository; + + @Autowired + private FlowInstanceService flowInstanceService; + + @Autowired + private FlowInstanceRepository flowInstanceRepository; + @Autowired + private PartitionPlanTaskService partitionPlanTaskService; + + @Autowired + private UserService userService; + + @Autowired + @Qualifier("partitionPlanJdbcLockRegistry") + private JdbcLockRegistry jdbcLockRegistry; + + private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); + + private final String resourcePrefix = "partition_plan_schedule"; + + // @Scheduled(cron = "0 0 0 * * ?") + public void alterPartitionTask() throws Exception { + // Find all active plans + List allValidPlan = connectionPartitionPlanRepository.findAllValidPlan(); + for (ConnectionPartitionPlanEntity connectionPartitionPlan : allValidPlan) { + try { + List tablePlans = tablePartitionPlanRepository + .findValidPlanByFlowInstanceId(connectionPartitionPlan.getFlowInstanceId()); + + if (tablePlans.isEmpty()) { + continue; + } + // Take a global lock to avoid duplicate-execution errors across multiple instances; lock granularity is the partition plan of each connection. + if (!tryLock(connectionPartitionPlan.getConnectionId())) { + continue; + } + UserEntity userEntity = userService.nullSafeGet(connectionPartitionPlan.getCreatorId()); + User taskCreator = new User(userEntity); + SecurityContextUtils.setCurrentUser(taskCreator); + + //
Find the sub-flows still under approval and cancel them. + cancelApprovingTask(connectionPartitionPlan.getFlowInstanceId()); + + partitionPlanTaskService.executePartitionPlan(connectionPartitionPlan.getConnectionId(), + connectionPartitionPlan.getFlowInstanceId(), tablePlans, taskCreator); + } catch (Exception e) { + log.warn("Partition planning daily task creation failed, connectionId={}, error message={}", + connectionPartitionPlan.getConnectionId(), e.getMessage()); + } + + } + + } + + private boolean tryLock(Long connectionId) throws InterruptedException { + String lockKey = String.format("%s_%s_%s", resourcePrefix, connectionId, + sdf.format(System.currentTimeMillis())); + Lock lock = jdbcLockRegistry.obtain(lockKey); + if (!lock.tryLock(3, TimeUnit.SECONDS)) { + log.info("get lock failed:{}", lockKey); + return false; + } + log.info("get lock:{}", lockKey); + return true; + } + + private void cancelApprovingTask(Long flowInstanceId) { + List entities = + flowInstanceRepository.findByParentInstanceId(flowInstanceId); + for (FlowInstanceEntity entity : entities) { + if (entity.getStatus() == FlowStatus.APPROVING) { + log.info("sub task expired:{}", entity.getId()); + flowInstanceService.cancel(entity.getId(), false); + } + } + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java index cb8aff2d88..152995a068 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanService.java @@ -18,13 +18,10 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -33,33 +30,23 @@ import com.oceanbase.odc.core.session.ConnectionSession; import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceEntity; import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository; -import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanEntity; -import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanRepository; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanRepository; import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity; import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository; -import com.oceanbase.odc.metadb.schedule.ScheduleEntity; import com.oceanbase.odc.metadb.task.TaskEntity; -import com.oceanbase.odc.service.connection.database.DatabaseService; -import com.oceanbase.odc.service.connection.database.model.Database; +import com.oceanbase.odc.service.connection.ConnectionService; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.db.browser.DBSchemaAccessors; import com.oceanbase.odc.service.flow.FlowInstanceService; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; -import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; +import com.oceanbase.odc.service.partitionplan.model.ConnectionPartitionPlan; import
com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters; import com.oceanbase.odc.service.partitionplan.model.TablePartitionPlan; -import com.oceanbase.odc.service.quartz.model.MisfireStrategy; -import com.oceanbase.odc.service.schedule.ScheduleService; -import com.oceanbase.odc.service.schedule.model.JobType; -import com.oceanbase.odc.service.schedule.model.PartitionPlanJobParameters; -import com.oceanbase.odc.service.schedule.model.ScheduleStatus; -import com.oceanbase.odc.service.schedule.model.TriggerConfig; -import com.oceanbase.odc.service.schedule.model.TriggerStrategy; import com.oceanbase.odc.service.session.factory.DefaultConnectSessionFactory; import com.oceanbase.odc.service.task.TaskService; import com.oceanbase.tools.dbbrowser.model.DBTablePartition; import com.oceanbase.tools.dbbrowser.schema.DBSchemaAccessor; -import com.oceanbase.tools.migrator.common.exception.UnExpectedException; import lombok.extern.slf4j.Slf4j; @@ -74,22 +61,17 @@ @SkipAuthorize("permission check inside getForConnect") public class PartitionPlanService { - @Value("${odc.task.partition-plan.schedule-cron:0 0 * * * ?}") - private String defaultScheduleCron; @Autowired private TablePartitionPlanRepository tablePartitionPlanRepository; @Autowired - private DatabasePartitionPlanRepository databasePartitionPlanRepository; + private ConnectionPartitionPlanRepository connectionPartitionPlanRepository; @Autowired private FlowInstanceService flowInstanceService; @Autowired private AuthenticationFacade authenticationFacade; @Autowired - private DatabaseService databaseService; - - @Autowired - private ScheduleService scheduleService; + private ConnectionService connectionService; @Autowired private TaskService taskService; @@ -97,55 +79,67 @@ public class PartitionPlanService { private ServiceTaskInstanceRepository serviceTaskInstanceRepository; - private final DatabasePartitionPlanMapper databasePartitionPlanMapper = DatabasePartitionPlanMapper.INSTANCE; + private final ConnectionPartitionPlanMapper connectionPartitionPlanMapper = ConnectionPartitionPlanMapper.INSTANCE; private final TablePartitionPlanMapper tablePartitionPlanMapper = new TablePartitionPlanMapper(); /** - * Find Range-partitioned tables + * Find Range-partitioned tables. TODO: also filter for tables that have been modified */ - public DatabasePartitionPlan findRangeTablePlan(Long databaseId, Long flowInstanceId) { + public ConnectionPartitionPlan findRangeTablePlan(Long connectionId, Long flowInstanceId) { // When querying by flow instance ID, only the partition config under that instance is returned if (flowInstanceId != null) { - return findDatabasePartitionPlanByFlowInstanceId(flowInstanceId); + return findTablePartitionPlanByFlowInstanceId(flowInstanceId); + } + // When querying by connectionId, list all RANGE tables and return the existing partition plans + ConnectionConfig connectionConfig = connectionService.getForConnect(connectionId); + DefaultConnectSessionFactory factory = new DefaultConnectSessionFactory(connectionConfig); + ConnectionSession connectionSession = factory.generateSession(); + try { + DBSchemaAccessor accessor = DBSchemaAccessors.create(connectionSession); + List tablePartitionPlans = findConnectionAllTablePartitionPlan( + connectionId, connectionConfig.getTenantName(), accessor); + Optional validConnectionPlan = + connectionPartitionPlanRepository.findValidPlanByConnectionId(connectionId); + ConnectionPartitionPlan returnValue = + validConnectionPlan.isPresent() + ?
connectionPartitionPlanMapper.entityToModel(validConnectionPlan.get()) + : ConnectionPartitionPlan.builder().connectionId(connectionId).build(); + returnValue.setTablePartitionPlans(tablePartitionPlans); + return returnValue; + } finally { + try { + connectionSession.expire(); + } catch (Exception e) { + // eat exception + } } - // When querying by database, list all RANGE tables and return the existing partition plans - Database database = databaseService.detail(databaseId); - Optional databasePartitionPlan = - databasePartitionPlanRepository.findValidPlanByDatabaseId(databaseId); - DatabasePartitionPlan returnValue = - databasePartitionPlan.isPresent() - ? databasePartitionPlanMapper.entityToModel(databasePartitionPlan.get()) - : DatabasePartitionPlan.builder().databaseId(databaseId).build(); - returnValue.setTablePartitionPlans(findDatabaseTablePartitionPlan(database)); - return returnValue; } /** * Confirm strategy: disable all previous plans under the connection and activate every config under the current flowInstanceId */ @Transactional - public void updateTablePartitionPlan(DatabasePartitionPlan databasePartitionPlan) throws IOException { + public void updateTablePartitionPlan(ConnectionPartitionPlan connectionPartitionPlan) throws IOException { // Update the connection config - Optional databasePartitionPlanEntity = - databasePartitionPlanRepository.findByFlowInstanceId(databasePartitionPlan.getFlowInstanceId()); - if (!databasePartitionPlanEntity.isPresent()) + Optional connectionPartitionPlanEntity = + connectionPartitionPlanRepository.findByFlowInstanceId(connectionPartitionPlan.getFlowInstanceId()); + if (!connectionPartitionPlanEntity.isPresent()) return; - databasePartitionPlanEntity.get().setInspectEnabled(databasePartitionPlan.isInspectEnable()); - databasePartitionPlanEntity.get() - .setInspectTriggerStrategy(databasePartitionPlan.getInspectTriggerStrategy()); - databasePartitionPlanEntity.get().setConfigEnabled(false); - databasePartitionPlanRepository.saveAndFlush(databasePartitionPlanEntity.get()); + connectionPartitionPlanEntity.get().setInspectEnabled(connectionPartitionPlan.isInspectEnable()); + connectionPartitionPlanEntity.get() + .setInspectTriggerStrategy(connectionPartitionPlan.getInspectTriggerStrategy()); + connectionPartitionPlanEntity.get().setConfigEnabled(false); + connectionPartitionPlanRepository.saveAndFlush(connectionPartitionPlanEntity.get()); // Update the table partition configs - List tablePartitionPlans = databasePartitionPlan.getTablePartitionPlans(); - List tablePartitionPlanEntities = - tablePartitionPlanRepository.findValidPlanByDatabasePartitionPlanId( - databasePartitionPlan.getId()); + List tablePartitionPlans = connectionPartitionPlan.getTablePartitionPlans(); + List tablePartitionPlanEntities = tablePartitionPlanRepository.findByFlowInstanceId( + connectionPartitionPlan.getFlowInstanceId()); List updateEntities = new LinkedList<>(); tablePartitionPlans.forEach(tablePartitionPlan -> { for (TablePartitionPlanEntity tablePartitionPlanEntity : tablePartitionPlanEntities) { // Unmodified entries take effect directly + tablePartitionPlanEntity.setFlowInstanceId(connectionPartitionPlan.getFlowInstanceId()); + tablePartitionPlanEntity.setIsConfigEnable(false); + tablePartitionPlanEntity.setModifierId(authenticationFacade.currentUserId()); if (tablePartitionPlanEntity.getSchemaName().equals(tablePartitionPlan.getSchemaName()) @@ -170,110 +164,99 @@ public void updateTablePartitionPlan(DatabasePartitionPlan databasePartitionPlan // Update the config tablePartitionPlanRepository.saveAll(updateEntities); PartitionPlanTaskParameters taskParameters = new PartitionPlanTaskParameters(); -
taskParameters.setConnectionPartitionPlan(databasePartitionPlan); + taskParameters.setConnectionPartitionPlan(connectionPartitionPlan); // Update the task detail Optional taskInstance = serviceTaskInstanceRepository.findByFlowInstanceId( - databasePartitionPlan.getFlowInstanceId()); + connectionPartitionPlan.getFlowInstanceId()); taskInstance.ifPresent(instance -> { TaskEntity taskEntity = taskService.detail(instance.getTargetTaskId()); taskEntity.setParametersJson(JsonUtils.toJson(taskParameters)); taskService.updateParametersJson(taskEntity); }); // Advance the flow node - flowInstanceService.approve(databasePartitionPlan.getFlowInstanceId(), "approve update partition plan", + flowInstanceService.approve(connectionPartitionPlan.getFlowInstanceId(), "approve update partition plan", false); } /** * Check whether a partition plan exists under the current connection */ - public boolean hasConnectionPartitionPlan(Long databaseId) { - Optional validConnectionPlan = - databasePartitionPlanRepository.findValidPlanByDatabaseId(databaseId); + public boolean hasConnectionPartitionPlan(Long connectionId) { + Optional validConnectionPlan = + connectionPartitionPlanRepository.findValidPlanByConnectionId(connectionId); return validConnectionPlan.isPresent(); } - public DatabasePartitionPlan findDatabasePartitionPlanByFlowInstanceId(Long flowInstanceId) { - Optional entity = - databasePartitionPlanRepository.findByFlowInstanceId(flowInstanceId); - DatabasePartitionPlan databasePartitionPlan = new DatabasePartitionPlan(); - if (entity.isPresent()) { - databasePartitionPlan = databasePartitionPlanMapper.entityToModel(entity.get()); - List tablePartitionPlans = tablePartitionPlanRepository - .findValidPlanByDatabasePartitionPlanId( - databasePartitionPlan.getId()) - .stream().map(tablePartitionPlanMapper::entityToModel).collect(Collectors.toList()); - databasePartitionPlan.setTablePartitionPlans(tablePartitionPlans); + public ConnectionPartitionPlan findTablePartitionPlanByFlowInstanceId(Long flowInstanceId) { + ConnectionPartitionPlan connectionPartitionPlan = new ConnectionPartitionPlan(); + Optional connectionPartitionPlanEntity = + connectionPartitionPlanRepository.findByFlowInstanceId(flowInstanceId); + if (connectionPartitionPlanEntity.isPresent()) { + connectionPartitionPlan = + connectionPartitionPlanMapper.entityToModel(connectionPartitionPlanEntity.get()); } - return databasePartitionPlan; + List tablePartitionPlanEntities = + tablePartitionPlanRepository.findByFlowInstanceId(flowInstanceId); + List tablePartitionPlans = tablePartitionPlanEntities.stream().map( + tablePartitionPlanMapper::entityToModel).collect( + Collectors.toList()); + connectionPartitionPlan.setTablePartitionPlans(tablePartitionPlans); + return connectionPartitionPlan; } - private List findDatabaseTablePartitionPlan(Database database) { - // find exist table partition-plan config.
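The "confirm strategy" described above ultimately rests on the activation ordering implemented in PartitionPlanTaskService.enableFlowInstancePartitionPlan later in this diff. Condensed, and with an illustrative wrapper name, that ordering looks like this sketch:

    // Condensed view of the activation ordering; the real implementation is
    // PartitionPlanTaskService.enableFlowInstancePartitionPlan below, and the
    // method name here is illustrative only.
    @Transactional
    public void activatePlan(Long connectionId, Long flowInstanceId) {
        // Disable everything under the connection first, so activation stays
        // idempotent even if stale rows were left behind by earlier plans.
        connectionPartitionPlanRepository.disableConfigByConnectionId(connectionId);
        tablePartitionPlanRepository.disableConfigByConnectionId(connectionId);
        // Then enable only the configs belonging to the approved flow instance.
        connectionPartitionPlanRepository.enableConfigByFlowInstanceId(flowInstanceId);
        tablePartitionPlanRepository.enableConfigByFlowInstanceId(flowInstanceId);
    }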
- Map tableName2TablePartitionPlan = tablePartitionPlanRepository - .findValidPlanByDatabaseId( - database.getId()) - .stream().collect(Collectors.toMap(TablePartitionPlanEntity::getTableName, o -> o)); - - List dbTablePartitions = listTableRangePartitionInfo(database); + private List findConnectionAllTablePartitionPlan(Long connectionId, String tenantName, + DBSchemaAccessor accessor) { + List connectionValidPartitionPlans = + tablePartitionPlanRepository.findValidPlanByConnectionId(connectionId); + List dbTablePartitions = accessor.listTableRangePartitionInfo(tenantName); List returnValue = new LinkedList<>(); - dbTablePartitions.forEach(dbTablePartition -> { - TablePartitionPlan tablePartitionPlan = - tableName2TablePartitionPlan.containsKey(dbTablePartition.getTableName()) - ? tablePartitionPlanMapper - .entityToModel(tableName2TablePartitionPlan.get(dbTablePartition.getTableName())) - : TablePartitionPlan.builder() - .schemaName(dbTablePartition.getSchemaName()) - .tableName(dbTablePartition.getTableName()).build(); - tablePartitionPlan.setPartitionCount(dbTablePartition.getPartitionOption().getPartitionsNum()); - returnValue.add(tablePartitionPlan); - - }); - return returnValue; - } - - private List listTableRangePartitionInfo(Database database) { - ConnectionConfig connectionConfig = database.getDataSource(); - DefaultConnectSessionFactory factory = new DefaultConnectSessionFactory(connectionConfig); - ConnectionSession connectionSession = factory.generateSession(); - DBSchemaAccessor accessor = DBSchemaAccessors.create(connectionSession); - List dbTablePartitions; - try { - dbTablePartitions = accessor.listTableRangePartitionInfo( - database.getDataSource().getTenantName()).stream() - .filter(o -> o.getSchemaName().equals(database.getName())).collect( - Collectors.toList()); - } finally { - try { - connectionSession.expire(); - } catch (Exception e) { - // eat exception + for (DBTablePartition dbTablePartition : dbTablePartitions) { + boolean hasPartitionPlan = false; + for (TablePartitionPlanEntity tablePlan : connectionValidPartitionPlans) { + if (dbTablePartition.getSchemaName().equals(tablePlan.getSchemaName()) + && dbTablePartition.getTableName().equals(tablePlan.getTableName())) { + TablePartitionPlan tablePartitionPlan = tablePartitionPlanMapper.entityToModel(tablePlan); + tablePartitionPlan.setPartitionCount(dbTablePartition.getPartitionOption().getPartitionsNum()); + returnValue.add(tablePartitionPlan); + hasPartitionPlan = true; + break; + } + } + if (!hasPartitionPlan) { + returnValue.add(TablePartitionPlan.builder() + .schemaName(dbTablePartition.getSchemaName()) + .tableName(dbTablePartition.getTableName()) + .partitionCount(dbTablePartition.getPartitionOption().getPartitionsNum()).build()); } } - return dbTablePartitions; + return returnValue; } + /** + * Inserted when the partition plan is created; takes effect after approval + */ @Transactional - public void createDatabasePartitionPlan(DatabasePartitionPlan databasePartitionPlan) { + public void addTablePartitionPlan(ConnectionPartitionPlan connectionPartitionPlan, Long flowInstanceId) { + connectionPartitionPlan.setFlowInstanceId(flowInstanceId); + connectionPartitionPlan.getTablePartitionPlans() + .forEach(tablePartitionPlan -> tablePartitionPlan.setFlowInstanceId(flowInstanceId)); // Insert the connection partition config long currentUserId = authenticationFacade.currentUserId(); long currentOrganizationId = authenticationFacade.currentOrganizationId(); - DatabasePartitionPlanEntity databasePartitionPlanEntity = - databasePartitionPlanMapper.modelToEntity(databasePartitionPlan); -
databasePartitionPlanEntity.setCreatorId(currentUserId); - databasePartitionPlanEntity.setModifierId(currentUserId); - databasePartitionPlanEntity.setOrganizationId(currentOrganizationId); - databasePartitionPlanEntity.setConfigEnabled(false); - databasePartitionPlanEntity = databasePartitionPlanRepository.save(databasePartitionPlanEntity); + ConnectionPartitionPlanEntity connectionPartitionPlanEntity = + connectionPartitionPlanMapper.modelToEntity(connectionPartitionPlan); + connectionPartitionPlanEntity.setCreatorId(currentUserId); + connectionPartitionPlanEntity.setModifierId(currentUserId); + connectionPartitionPlanEntity.setOrganizationId(currentOrganizationId); + connectionPartitionPlanEntity.setConfigEnabled(false); + connectionPartitionPlanRepository.save(connectionPartitionPlanEntity); // Insert the table partition plans List entities = new LinkedList<>(); - for (TablePartitionPlan tablePlan : databasePartitionPlan.getTablePartitionPlans()) { + for (TablePartitionPlan tablePlan : connectionPartitionPlan.getTablePartitionPlans()) { TablePartitionPlanEntity tablePlanEntity = tablePartitionPlanMapper.modelToEntity(tablePlan); - tablePlanEntity.setFlowInstanceId(databasePartitionPlanEntity.getFlowInstanceId()); - tablePlanEntity.setConnectionId(databasePartitionPlan.getConnectionId()); - tablePlanEntity.setDatabaseId(databasePartitionPlan.getDatabaseId()); - tablePlanEntity.setDatabasePartitionPlanId(databasePartitionPlanEntity.getId()); + tablePlanEntity.setConnectionId(connectionPartitionPlan.getConnectionId()); + tablePlanEntity.setFlowInstanceId(connectionPartitionPlanEntity.getFlowInstanceId()); + tablePlanEntity.setOrganizationId(currentOrganizationId); + tablePlanEntity.setCreatorId(currentUserId); + tablePlanEntity.setModifierId(currentUserId); @@ -282,90 +265,5 @@ public void createDatabasePartitionPlan(DatabasePartitionP entities.add(tablePlanEntity); } tablePartitionPlanRepository.saveAll(entities); - enableDatabasePartitionPlan(databasePartitionPlanEntity); - try { - // Create partition plan job. - ScheduleEntity scheduleEntity = createDatabasePartitionPlanSchedule( - databasePartitionPlanEntity); - // Bind partition plan job to entity. - databasePartitionPlanRepository.updateScheduleIdById(databasePartitionPlanEntity.getId(), - scheduleEntity.getId()); - } catch (Exception e) { - throw new UnExpectedException("Create database partition plan job failed.", e); - } - } - - private void enableDatabasePartitionPlan(DatabasePartitionPlanEntity databasePartitionPlan) { - Optional oldPartitionPlan = - databasePartitionPlanRepository.findValidPlanByDatabaseId(databasePartitionPlan.getDatabaseId()); - if (oldPartitionPlan.isPresent()) { - try { - log.info("Found a valid plan in this database,start to disable it."); - // Cancel previous plan. - flowInstanceService.cancelNotCheckPermission(oldPartitionPlan.get().getFlowInstanceId()); - // Stop previous job.
- if (oldPartitionPlan.get().getScheduleId() != null) { - try { - ScheduleEntity scheduleEntity = scheduleService.nullSafeGetById( - oldPartitionPlan.get().getScheduleId()); - scheduleService.terminate(scheduleEntity); - log.info("Terminate old partition plan job success,scheduleId={}", - oldPartitionPlan.get().getScheduleId()); - } catch (Exception e) { - log.warn("Terminate old partition plan job failed,scheduleId={}", - oldPartitionPlan.get().getScheduleId()); - } - } - } catch (Exception e) { - log.warn("The previous plan has been abandoned,but stop previous instance failed.", e); - } - } - databasePartitionPlanRepository.disableConfigByDatabaseId(databasePartitionPlan.getDatabaseId()); - tablePartitionPlanRepository.disableConfigByDatabaseId(databasePartitionPlan.getDatabaseId()); - databasePartitionPlanRepository.enableConfigByFlowInstanceId(databasePartitionPlan.getFlowInstanceId()); - tablePartitionPlanRepository.enableConfigByDatabasePartitionPlanId(databasePartitionPlan.getId()); - } - - /** - * Create a quartz job to execute partition-plan. - */ - @Transactional - public ScheduleEntity createDatabasePartitionPlanSchedule(DatabasePartitionPlanEntity databasePartitionPlan) - throws SchedulerException, ClassNotFoundException { - ScheduleEntity scheduleEntity = new ScheduleEntity(); - scheduleEntity.setDatabaseId(databasePartitionPlan.getDatabaseId()); - scheduleEntity.setStatus(ScheduleStatus.ENABLED); - scheduleEntity.setCreatorId(authenticationFacade.currentUserId()); - scheduleEntity.setModifierId(authenticationFacade.currentUserId()); - scheduleEntity.setOrganizationId(authenticationFacade.currentOrganizationId()); - scheduleEntity.setAllowConcurrent(false); - scheduleEntity.setMisfireStrategy(MisfireStrategy.MISFIRE_INSTRUCTION_DO_NOTHING); - scheduleEntity.setJobType(JobType.PARTITION_PLAN); - - TriggerConfig triggerConfig = new TriggerConfig(); - triggerConfig.setTriggerStrategy(TriggerStrategy.CRON); - triggerConfig.setCronExpression(defaultScheduleCron); - scheduleEntity.setTriggerConfigJson(JsonUtils.toJson(triggerConfig)); - - Database database = databaseService.detail(scheduleEntity.getDatabaseId()); - scheduleEntity.setProjectId(database.getProject().getId()); - scheduleEntity.setConnectionId(database.getDataSource().getId()); - scheduleEntity.setDatabaseName(database.getName()); - PartitionPlanJobParameters jobParameters = new PartitionPlanJobParameters(); - jobParameters.setDatabasePartitionPlanId(databasePartitionPlan.getId()); - scheduleEntity.setJobParametersJson(JsonUtils.toJson(jobParameters)); - scheduleEntity = scheduleService.create(scheduleEntity); - // Bind job to databasePartitionPlan. 
- databasePartitionPlan.setScheduleId(scheduleEntity.getId()); - scheduleService.enable(scheduleEntity); - return scheduleEntity; - } - - public DatabasePartitionPlanEntity getDatabasePartitionPlanById(Long id) { - return databasePartitionPlanRepository.findById(id).orElse(null); - } - - public List getValidTablePlanByDatabasePartitionPlanId(Long databaseId) { - return tablePartitionPlanRepository.findValidPlanByDatabasePartitionPlanId(databaseId); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java index 709ff0aba9..658ab28ed8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanSubFlowThread.java @@ -47,8 +47,8 @@ public class PartitionPlanSubFlowThread extends Thread { @Setter protected long taskId; - public PartitionPlanSubFlowThread(Long parentFlowInstanceId, - Long databaseId, List sqls, FlowInstanceService flowInstanceService, User user) { + public PartitionPlanSubFlowThread(String schemaName, Long parentFlowInstanceId, + Long connectionId, List sqls, FlowInstanceService flowInstanceService, User user) { DatabaseChangeParameters taskParameters = new DatabaseChangeParameters(); taskParameters.setErrorStrategy("ABORT"); StringBuilder sqlContent = new StringBuilder(); @@ -59,9 +59,10 @@ public PartitionPlanSubFlowThread(Long parentFlowInstanceId, CreateFlowInstanceReq flowInstanceReq = new CreateFlowInstanceReq(); flowInstanceReq.setParameters(taskParameters); flowInstanceReq.setTaskType(TaskType.ASYNC); - flowInstanceReq.setDatabaseId(databaseId); + flowInstanceReq.setConnectionId(connectionId); flowInstanceReq.setParentFlowInstanceId(parentFlowInstanceId); flowInstanceReq.setExecutionStrategy(FlowTaskExecutionStrategy.AUTO); + flowInstanceReq.setDatabaseName(schemaName); this.createFlowInstanceReq = flowInstanceReq; this.flowInstanceService = flowInstanceService; this.user = user; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java index dffc1e2ad2..435b575b1a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/PartitionPlanTaskService.java @@ -22,25 +22,24 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.session.ConnectionSession; -import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanRepository; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanEntity; +import com.oceanbase.odc.metadb.partitionplan.ConnectionPartitionPlanRepository; import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity; import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanRepository; -import 
com.oceanbase.odc.service.connection.database.DatabaseService; +import com.oceanbase.odc.service.connection.ConnectionService; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.db.browser.DBSchemaAccessors; import com.oceanbase.odc.service.flow.FlowInstanceService; -import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; +import com.oceanbase.odc.service.iam.model.User; import com.oceanbase.odc.service.session.factory.DefaultConnectSessionFactory; -import com.oceanbase.tools.dbbrowser.model.DBTable; import com.oceanbase.tools.dbbrowser.model.DBTablePartition; import com.oceanbase.tools.dbbrowser.model.DBTablePartitionDefinition; import com.oceanbase.tools.dbbrowser.schema.DBSchemaAccessor; @@ -63,25 +62,17 @@ public class PartitionPlanTaskService { @Autowired private TablePartitionPlanRepository tablePartitionPlanRepository; @Autowired - private DatabasePartitionPlanRepository databasePartitionPlanRepository; + private ConnectionPartitionPlanRepository connectionPartitionPlanRepository; @Autowired private FlowInstanceService flowInstanceService; @Autowired - private DatabaseService databaseService; + private ConnectionService connectionService; - @Autowired - private AuthenticationFacade authenticationFacade; - - public void executePartitionPlan(Long flowInstanceId, List tablePlans) + public void executePartitionPlan(Long connectionId, Long flowInstanceId, List tablePlans, + User taskCreator) throws Exception { - Set databaseIds = tablePlans.stream().map(TablePartitionPlanEntity::getDatabaseId).collect( - Collectors.toSet()); - Optional databaseId = databaseIds.stream().findFirst(); - if (!databaseId.isPresent() || databaseIds.size() != 1) { - log.warn("Table plans belongs to multi database,its not allow here."); - return; - } - ConnectionConfig conn = databaseService.findDataSourceForConnectById(databaseId.get()); + + ConnectionConfig conn = connectionService.getForConnect(connectionId); DefaultConnectSessionFactory factory = new DefaultConnectSessionFactory(conn); ConnectionSession connectionSession = factory.generateSession(); try { @@ -90,18 +81,16 @@ public void executePartitionPlan(Long flowInstanceId, List addPartitionSqls = createAddPartitionDDL(accessor, tablePlans); if (!addPartitionSqls.isEmpty()) { PartitionPlanSubFlowThread partitionPlanSubFlowThread = - new PartitionPlanSubFlowThread(flowInstanceId, - databaseId.get(), addPartitionSqls, flowInstanceService, - authenticationFacade.currentUser()); + new PartitionPlanSubFlowThread(conn.getDefaultSchema(), flowInstanceId, + conn.getId(), addPartitionSqls, flowInstanceService, taskCreator); partitionPlanSubFlowThread.start(); } // Task 2: find expired partitions and start a database change flow List dropPartitionSqls = createDropPartitionDDL(accessor, tablePlans); if (!dropPartitionSqls.isEmpty()) { PartitionPlanSubFlowThread partitionPlanSubFlowThread = - new PartitionPlanSubFlowThread(flowInstanceId, - databaseId.get(), dropPartitionSqls, flowInstanceService, - authenticationFacade.currentUser()); + new PartitionPlanSubFlowThread(conn.getDefaultSchema(), flowInstanceId, + conn.getId(), dropPartitionSqls, flowInstanceService, taskCreator); partitionPlanSubFlowThread.start(); } } finally { @@ -122,8 +111,8 @@ private List createAddPartitionDDL(DBSchemaAccessor accessor, List definitions = table.getPartition().getPartitionDefinitions(); + DBTablePartition partition = accessor.getPartition(tablePlan.getSchemaName(), tablePlan.getTableName()); + List definitions = partition.getPartitionDefinitions(); //
The partition plan is still active, but the table has been dropped if (definitions.isEmpty()) { log.warn("No partition found,table={}.{}", tablePlan.getSchemaName(), tablePlan.getTableName()); @@ -140,22 +129,20 @@ private List createAddPartitionDDL(DBSchemaAccessor accessor, List
createDropPartitionDDL(DBSchemaAccessor accessor, List definitions = table.getPartition().getPartitionDefinitions(); + DBTablePartition partition = accessor.getPartition(tablePlan.getSchemaName(), tablePlan.getTableName()); + List definitions = partition.getPartitionDefinitions(); if (definitions.isEmpty()) { continue; } - PartitionExpressionType expressionType = PartitionPlanFunction.getPartitionExpressionType(table); + PartitionExpressionType expressionType = PartitionPlanFunction.getPartitionExpressionType(partition); // Results are sorted in ascending order; scan from the left and stop at the first partition that has not expired for (DBTablePartitionDefinition definition : definitions) { if (expressionType == PartitionExpressionType.OTHER) { - log.warn("Unsupported partition expression!{}.{}:{}", table.getSchemaName(), - table.getName(), table.getPartition().getPartitionOption().getExpression()); + log.warn("Unsupported partition expression!{}.{}:{}", partition.getSchemaName(), + partition.getTableName(), partition.getPartitionOption().getExpression()); break; } String maxValue = definition.getMaxValues().get(0); @@ -238,12 +225,33 @@ private List createDropPartitionDDL(DBSchemaAccessor accessor, List oldPartitionPlan = + connectionPartitionPlanRepository.findValidPlanByConnectionId(connectionId); + // If a plan already exists, cancel it first and update the original task status. + if (oldPartitionPlan.isPresent()) { + try { + flowInstanceService.cancelNotCheckPermission(oldPartitionPlan.get().getFlowInstanceId()); + } catch (Exception e) { + log.warn("The previous plan has been abandoned, but changing its status failed."); + } + } + // Disable all active configs on every run, to avoid exceptions caused by dirty data + connectionPartitionPlanRepository.disableConfigByConnectionId(connectionId); + tablePartitionPlanRepository.disableConfigByConnectionId(connectionId); + connectionPartitionPlanRepository.enableConfigByFlowInstanceId(flowInstanceId); + tablePartitionPlanRepository.enableConfigByFlowInstanceId(flowInstanceId); + } + private String getCreateSql(DBTablePartition partition) { List definitions = partition.getPartitionDefinitions(); DBTablePartitionDefinition right = definitions.get(definitions.size() - 1); @@ -262,16 +270,4 @@ private String getDeleteSql(String schema, String table, String part) { .append(");"); return sqlBuilder.toString(); } - - private DBTable getTable(DBSchemaAccessor accessor, String schemaName, String tableName) { - DBTable table = new DBTable(); - table.setSchemaName(schemaName); - table.setName(tableName); - DBTablePartition partition = accessor.getPartition(schemaName, tableName); - partition.setTableName(tableName); - partition.setSchemaName(schemaName); - table.setPartition(partition); - table.setColumns(accessor.listTableColumns(schemaName, tableName)); - return table; - } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java index 6538a02d62..4220ac309a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/TablePartitionPlanMapper.java @@ -31,7 +31,6 @@ public TablePartitionPlan entityToModel(TablePartitionPlanEntity entity) { .partitionIntervalUnit(entity.getPartitionIntervalUnit()) .expirePeriodUnit(entity.getExpirePeriodUnit()).build(); return TablePartitionPlan.builder() - .databasePartitionPlanId(entity.getDatabasePartitionPlanId()) .tableName(entity.getTableName()) .flowInstanceId(entity.getFlowInstanceId())
.schemaName(entity.getSchemaName()) @@ -40,7 +39,6 @@ public TablePartitionPlan entityToModel(TablePartitionPlanEntity entity) { public TablePartitionPlanEntity modelToEntity(TablePartitionPlan model) { return TablePartitionPlanEntity.builder() - .databasePartitionPlanId(model.getDatabasePartitionPlanId()) .isConfigEnable(model.getDetail().getIsAutoPartition()) .expirePeriod(model.getDetail().getExpirePeriod()) .expirePeriodUnit(model.getDetail().getExpirePeriodUnit()) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/DatabasePartitionPlan.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/ConnectionPartitionPlan.java similarity index 81% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/DatabasePartitionPlan.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/ConnectionPartitionPlan.java index cc333a9729..13828d0c5e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/DatabasePartitionPlan.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/ConnectionPartitionPlan.java @@ -18,9 +18,6 @@ import java.io.Serializable; import java.util.List; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonProperty.Access; - import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -36,13 +33,10 @@ @Builder @NoArgsConstructor @AllArgsConstructor -public class DatabasePartitionPlan implements Serializable { +public class ConnectionPartitionPlan implements Serializable { - @JsonProperty(access = Access.READ_ONLY) - private Long id; private Long flowInstanceId; private Long connectionId; - private Long databaseId; private boolean inspectEnable; private InspectTriggerStrategy inspectTriggerStrategy; private List tablePartitionPlans; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java index 1c6ce29737..b71874bb97 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/PartitionPlanTaskParameters.java @@ -28,5 +28,5 @@ */ @Data public class PartitionPlanTaskParameters implements Serializable, TaskParameters { - private DatabasePartitionPlan connectionPartitionPlan; + private ConnectionPartitionPlan connectionPartitionPlan; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java index 3e1f51ce16..c5f1b3bec5 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/partitionplan/model/TablePartitionPlan.java @@ -33,7 +33,6 @@ @AllArgsConstructor public class TablePartitionPlan implements Serializable { - private Long databasePartitionPlanId; private Long flowInstanceId; private String schemaName; private String tableName; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java 
b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java index 493702d6d6..270baa5cd9 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/executor/AbstractQuartzJob.java @@ -28,7 +28,6 @@ import com.oceanbase.odc.service.schedule.job.DataDeleteJob; import com.oceanbase.odc.service.schedule.job.OdcJob; import com.oceanbase.odc.service.schedule.job.OnlineSchemaChangeCompleteJob; -import com.oceanbase.odc.service.schedule.job.PartitionPlanJob; import com.oceanbase.odc.service.schedule.job.SqlPlanJob; import com.oceanbase.odc.service.schedule.model.JobType; @@ -80,8 +79,6 @@ public OdcJob getOdcJob(JobExecutionContext context) { return new DataDeleteJob(); case ONLINE_SCHEMA_CHANGE_COMPLETE: return new OnlineSchemaChangeCompleteJob(); - case PARTITION_PLAN: - return new PartitionPlanJob(); default: throw new UnsupportedException(); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java index 57d2bdbde5..8226f85b52 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/rollbackplan/RollbackGeneratorFactory.java @@ -49,7 +49,7 @@ public static GenerateRollbackPlan create(@NonNull String sql, @NonNull Rollback ConnectType connectType = connectionSession.getConnectType(); SyncJdbcExecutor syncJdbcExecutor = connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY); - if (connectType.getDialectType().isMysql()) { + if (connectType.getDialectType().isOBMysql()) { OBMySQLParser parser = new OBMySQLParser(); Statement statement = parser.parse(new StringReader(sql)); if (statement instanceof Update) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java index 52bd91629c..1a909524a0 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/PartitionPlanPreprocessor.java @@ -15,15 +15,13 @@ */ package com.oceanbase.odc.service.schedule.flowtask; -import java.util.List; +import org.springframework.beans.factory.annotation.Autowired; import com.oceanbase.odc.core.shared.constant.TaskType; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.processor.FlowTaskPreprocessor; import com.oceanbase.odc.service.flow.processor.Preprocessor; -import com.oceanbase.odc.service.partitionplan.model.DatabasePartitionPlan; -import com.oceanbase.odc.service.partitionplan.model.PartitionPlanTaskParameters; -import com.oceanbase.odc.service.partitionplan.model.TablePartitionPlan; +import com.oceanbase.odc.service.partitionplan.PartitionPlanService; /** * @Author:tinker @@ -33,26 +31,11 @@ @FlowTaskPreprocessor(type = TaskType.PARTITION_PLAN) public class PartitionPlanPreprocessor implements Preprocessor { - /** - * Max partition count for OB MySQL mode, refer to - * 分区概述 - */ - private static final long MAX_PARTITION_COUNT = 8192; + @Autowired + private PartitionPlanService 
partitionPlanService; @Override public void process(CreateFlowInstanceReq req) { - - PartitionPlanTaskParameters parameters = (PartitionPlanTaskParameters) req.getParameters(); - DatabasePartitionPlan databasePartitionPlan = parameters.getConnectionPartitionPlan(); - databasePartitionPlan.setConnectionId(req.getConnectionId()); - List tablePartitionPlans = parameters.getConnectionPartitionPlan().getTablePartitionPlans(); - for (TablePartitionPlan tablePartitionPlan : tablePartitionPlans) { - if (tablePartitionPlan.getPartitionCount() > MAX_PARTITION_COUNT - && tablePartitionPlan.getDetail().getIsAutoPartition()) { - throw new RuntimeException( - String.format("Can not create more partition. TableName: %s,PartitionCount: %s", - tablePartitionPlan.getTableName(), tablePartitionPlan.getPartitionCount())); - } - } + // TODO } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java deleted file mode 100644 index 14cd3fe861..0000000000 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/PartitionPlanJob.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2023 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.oceanbase.odc.service.schedule.job; - -import java.util.List; - -import org.quartz.JobExecutionContext; -import org.quartz.SchedulerException; - -import com.oceanbase.odc.common.json.JsonUtils; -import com.oceanbase.odc.metadb.partitionplan.DatabasePartitionPlanEntity; -import com.oceanbase.odc.metadb.partitionplan.TablePartitionPlanEntity; -import com.oceanbase.odc.metadb.schedule.ScheduleEntity; -import com.oceanbase.odc.service.common.util.SpringContextUtil; -import com.oceanbase.odc.service.partitionplan.PartitionPlanService; -import com.oceanbase.odc.service.partitionplan.PartitionPlanTaskService; -import com.oceanbase.odc.service.quartz.util.ScheduleTaskUtils; -import com.oceanbase.odc.service.schedule.ScheduleService; -import com.oceanbase.odc.service.schedule.model.PartitionPlanJobParameters; -import com.oceanbase.tools.migrator.common.exception.UnExpectedException; - -import lombok.extern.slf4j.Slf4j; - -/** - * @Author:tinker - * @Date: 2023/8/21 10:37 - * @Descripition: - */ - -@Slf4j -public class PartitionPlanJob implements OdcJob { - - private final PartitionPlanTaskService partitionPlanTaskService; - - private final PartitionPlanService partitionPlanService; - - private final ScheduleService scheduleService; - - - - public PartitionPlanJob() { - - partitionPlanTaskService = SpringContextUtil.getBean(PartitionPlanTaskService.class); - - partitionPlanService = SpringContextUtil.getBean(PartitionPlanService.class); - - scheduleService = SpringContextUtil.getBean(ScheduleService.class); - } - - @Override - public void execute(JobExecutionContext context) { - Long scheduleId = ScheduleTaskUtils.getScheduleId(context); - ScheduleEntity scheduleEntity; - try { - scheduleEntity = scheduleService.nullSafeGetById(scheduleId); - } catch (Exception e) { - log.warn("Schedule not found,scheduleId={}", scheduleId); - return; - } - PartitionPlanJobParameters jobParameters = JsonUtils.fromJson(scheduleEntity.getJobParametersJson(), - PartitionPlanJobParameters.class); - - DatabasePartitionPlanEntity databasePartitionPlan = partitionPlanService.getDatabasePartitionPlanById( - jobParameters.getDatabasePartitionPlanId()); - - List tablePartitionPlans = - partitionPlanService - .getValidTablePlanByDatabasePartitionPlanId(jobParameters.getDatabasePartitionPlanId()); - - if (!databasePartitionPlan.isConfigEnabled() || tablePartitionPlans.isEmpty()) { - log.info( - "database partition plan is disable or no table need to process,start to stop partition-plan job,scheduleId={}", - scheduleId); - try { - scheduleService.terminate(scheduleEntity); - } catch (SchedulerException e) { - log.warn("Stop partition plan job failed.", e); - throw new RuntimeException(e); - } - } - try { - partitionPlanTaskService.executePartitionPlan(databasePartitionPlan.getFlowInstanceId(), - tablePartitionPlans); - } catch (Exception e) { - log.warn("Create partition-plan database change task failed.", e); - } - } - - @Override - public void interrupt() { - throw new UnExpectedException(); - } -} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java index 049408fc3f..a1ef67b64e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/JobType.java @@ -23,8 +23,6 @@ public enum JobType { SQL_PLAN, - PARTITION_PLAN, - DATA_ARCHIVE, DATA_ARCHIVE_DELETE, 
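With JobType.PARTITION_PLAN removed and PartitionPlanJob deleted above, recurring execution moves to the PartitionPlanSchedules component introduced earlier in this diff (its @Scheduled cron is still commented out there), while one-off execution stays in PartitionPlanTask.start. PartitionPlanSchedules serializes concurrent ODC instances through a JdbcLockRegistry key scoped to one connection and one day. A minimal sketch of that pattern, assuming a JdbcLockRegistry bean such as the partitionPlanJdbcLockRegistry qualifier used in that class; the class and method names here are illustrative:

    import java.text.SimpleDateFormat;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;

    import org.springframework.integration.jdbc.lock.JdbcLockRegistry;

    // Illustrative sketch of the per-connection, per-day locking used by
    // PartitionPlanSchedules; not part of this diff.
    class DailyPartitionPlanLock {

        private final JdbcLockRegistry lockRegistry;

        DailyPartitionPlanLock(JdbcLockRegistry lockRegistry) {
            this.lockRegistry = lockRegistry;
        }

        boolean tryRunOncePerDay(Long connectionId, Runnable action) throws InterruptedException {
            // One key per connection per calendar day: the instance that wins the
            // lock runs the plan; the others skip it for the rest of the day.
            String key = String.format("partition_plan_schedule_%s_%s", connectionId,
                    new SimpleDateFormat("yyyyMMdd").format(System.currentTimeMillis()));
            Lock lock = lockRegistry.obtain(key);
            if (!lock.tryLock(3, TimeUnit.SECONDS)) {
                return false; // another instance already holds today's lock
            }
            action.run();
            return true;
        }
    }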
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/PartitionPlanJobParameters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/PartitionPlanJobParameters.java deleted file mode 100644 index 75682f326a..0000000000 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/PartitionPlanJobParameters.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2023 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.oceanbase.odc.service.schedule.model; - -import lombok.Data; - -/** - * @Author:tinker - * @Date: 2023/8/23 14:26 - * @Descripition: - */ - -@Data -public class PartitionPlanJobParameters { - private Long databasePartitionPlanId; -} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java index 5b4433bc9a..ab3f88b96c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/factory/OBConsoleDataSourceFactory.java @@ -172,6 +172,10 @@ public static Map getJdbcParams(@NonNull ConnectionConfig connec } else { jdbcUrlParams.put("useSSL", "false"); } + DialectType type = connectionConfig.getDialectType(); + if (type != null && type.isOceanbase()) { + jdbcUrlParams.put("enableFullLinkTrace", "true"); + } return jdbcUrlParams; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java index d323da9190..e7fdf441d3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/BaseSqlChecker.java @@ -52,7 +52,7 @@ public BaseSqlChecker(@NonNull DialectType dialectType, String delimiter) { @Override public List check(@NonNull String sqlScript) { List sqls; - if (dialectType.isMysql()) { + if (dialectType.isOBMysql()) { sqls = splitByCommentProcessor(sqlScript); } else if (dialectType == DialectType.OB_ORACLE) { if (DEFAULT_DELIMITER.equals(this.delimiter)) { diff --git a/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java b/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java index 08b056c600..43396e28bd 100644 --- a/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java +++ b/server/plugins/connect-plugin-mysql/src/main/java/com/oceanbase/odc/plugin/connect/mysql/MySQLConnectionExtension.java @@ -21,7 +21,6 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; -import java.util.Map; import 
org.pf4j.Extension; @@ -84,12 +83,4 @@ protected HostAddress parseJdbcUrl(String jdbcUrl) throws SQLException { return new HostAddress(url.getHost(), url.getPort()); } - @Override - protected Map appendDefaultJdbcUrlParameters(Map jdbcUrlParams) { - if (!jdbcUrlParams.containsKey("tinyInt1isBit")) { - jdbcUrlParams.put("tinyInt1isBit", "false"); - } - return jdbcUrlParams; - } - } diff --git a/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java b/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java index 6ace2a469f..2a2b8623d1 100644 --- a/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java +++ b/server/plugins/connect-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/connect/obmysql/OBMySQLConnectionExtension.java @@ -104,7 +104,6 @@ public TestResult test(String jdbcUrl, String username, String password, int que } protected String getJdbcUrlParameters(Map jdbcUrlParams) { - jdbcUrlParams = appendDefaultJdbcUrlParameters(jdbcUrlParams); if (CollectionUtils.isEmpty(jdbcUrlParams)) { return null; } @@ -112,13 +111,6 @@ protected String getJdbcUrlParameters(Map jdbcUrlParams) { .collect(Collectors.joining("&")); } - protected Map appendDefaultJdbcUrlParameters(Map jdbcUrlParams) { - if (!jdbcUrlParams.containsKey("enableFullLinkTrace")) { - jdbcUrlParams.put("enableFullLinkTrace", "true"); - } - return jdbcUrlParams; - } - private TestResult test(String jdbcUrl, Properties properties, int queryTimeout) { HostAddress hostAddress; try {