Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: source schema sync bug fix #1103

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions core/src/main/java/datart/core/common/TransactionHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Datart
* <p>
* Copyright 2021
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package datart.core.common;

import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

public class TransactionHelper {

public static TransactionStatus getTransaction(int propagationBehavior, int isolationLevel) {
PlatformTransactionManager transactionManager = Application.getBean(PlatformTransactionManager.class);
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setIsolationLevel(isolationLevel);
def.setPropagationBehavior(propagationBehavior);
return transactionManager.getTransaction(def);
}

public static void commit(TransactionStatus transactionStatus) {
Application.getBean(PlatformTransactionManager.class).commit(transactionStatus);
}

public static void rollback(TransactionStatus transactionStatus) {
Application.getBean(PlatformTransactionManager.class).rollback(transactionStatus);
}

}
33 changes: 25 additions & 8 deletions server/src/main/java/datart/server/job/SchemaSyncJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,27 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import datart.core.common.Application;
import datart.core.common.TransactionHelper;
import datart.core.common.UUIDGenerator;
import datart.core.data.provider.SchemaItem;
import datart.core.data.provider.TableInfo;
import datart.core.entity.Source;
import datart.core.entity.SourceSchemas;
import datart.core.mappers.ext.SourceSchemasMapperExt;
import datart.server.service.DataProviderService;
import datart.server.service.SourceService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.*;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;

import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

@Slf4j
public class SchemaSyncJob implements Job, Closeable {
Expand All @@ -51,6 +57,18 @@ public void close() throws IOException {
public void execute(JobExecutionContext context) throws JobExecutionException {
String sourceId = (String) context.getMergedJobDataMap().get(SOURCE_ID);
try {
Source source = null;
try {
source = Application.getBean(SourceService.class).retrieve(sourceId);
} catch (Exception ignored) {
}
// remove job if source not exists
if (source == null) {
JobKey key = context.getJobDetail().getKey();
Application.getBean(Scheduler.class).deleteJob(key);
log.warn("source {} not exists , the job has been deleted ", sourceId);
return;
}
execute(sourceId);
} catch (Exception e) {
log.error("source schema sync error ", e);
Expand All @@ -60,9 +78,6 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
public boolean execute(String sourceId) throws Exception {
List<SchemaItem> schemaItems = new LinkedList<>();
DataProviderService dataProviderService = Application.getBean(DataProviderService.class);

// TODO remove job if source not exists

Set<String> databases = dataProviderService.readAllDatabases(sourceId);
if (CollectionUtils.isNotEmpty(databases)) {
for (String database : databases) {
Expand All @@ -85,6 +100,7 @@ public boolean execute(String sourceId) throws Exception {
}

private boolean upsertSchemaInfo(String sourceId, List<SchemaItem> schemaItems) {
TransactionStatus transaction = TransactionHelper.getTransaction(TransactionDefinition.PROPAGATION_REQUIRES_NEW, TransactionDefinition.ISOLATION_REPEATABLE_READ);
try {
SourceSchemasMapperExt mapper = Application.getBean(SourceSchemasMapperExt.class);
SourceSchemas sourceSchemas = mapper.selectBySource(sourceId);
Expand All @@ -100,12 +116,13 @@ private boolean upsertSchemaInfo(String sourceId, List<SchemaItem> schemaItems)
sourceSchemas.setSchemas(OBJECT_MAPPER.writeValueAsString(schemaItems));
mapper.updateByPrimaryKey(sourceSchemas);
}
TransactionHelper.commit(transaction);
return true;
} catch (Exception e) {
TransactionHelper.rollback(transaction);
log.error("source schema parse error ", e);
return false;
}
}


}