diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 54ef5f816b5..3ce5d5e9e82 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -50,6 +50,10 @@ public class InlongConstants {
public static final String SEMICOLON = ";";
+ public static final String HYPHEN = "-";
+
+ public static final String UNDERSCORE = "_";
+
public static final String LEFT_BRACKET = "(";
public static final String PERCENT = "%";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/tenant/MultiTenantQuery.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/tenant/MultiTenantQuery.java
new file mode 100644
index 00000000000..9996220806b
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/tenant/MultiTenantQuery.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.tenant;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation indicate that SQL queries from this type or method should
+ * be conditioned by tenant, which is obtained from the login user.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface MultiTenantQuery {
+
+ boolean with() default true;
+}
diff --git a/inlong-manager/manager-dao/pom.xml b/inlong-manager/manager-dao/pom.xml
index 6235e169042..ec1e0b198ab 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -89,6 +89,10 @@
com.github.pagehelper
pagehelper-spring-boot-starter
+
+ com.github.jsqlparser
+ jsqlparser
+
org.projectlombok
lombok
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/config/JDBCSourceConfig.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/config/JDBCSourceConfig.java
index 9d7296a9424..263fea8b432 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/config/JDBCSourceConfig.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/config/JDBCSourceConfig.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.dao.config;
+import org.apache.inlong.manager.dao.interceptor.MultiTenantInterceptor;
+
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
@@ -52,7 +54,7 @@ public SqlSessionFactory sqlSessionFactory() throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource());
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mappers/*.xml"));
-
+ Objects.requireNonNull(bean.getObject()).getConfiguration().addInterceptor(new MultiTenantInterceptor());
Objects.requireNonNull(bean.getObject()).getConfiguration().setMapUnderscoreToCamelCase(true);
return bean.getObject();
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/interceptor/MultiTenantInterceptor.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/interceptor/MultiTenantInterceptor.java
new file mode 100644
index 00000000000..00f3f67cb66
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/interceptor/MultiTenantInterceptor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.interceptor;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.pojo.user.UserInfo;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ibatis.executor.statement.StatementHandler;
+import org.apache.ibatis.mapping.BoundSql;
+import org.apache.ibatis.mapping.MappedStatement;
+import org.apache.ibatis.plugin.Interceptor;
+import org.apache.ibatis.plugin.Intercepts;
+import org.apache.ibatis.plugin.Invocation;
+import org.apache.ibatis.plugin.Plugin;
+import org.apache.ibatis.plugin.Signature;
+import org.apache.ibatis.reflection.DefaultReflectorFactory;
+import org.apache.ibatis.reflection.MetaObject;
+import org.apache.ibatis.reflection.SystemMetaObject;
+
+import java.io.StringReader;
+import java.sql.Connection;
+import java.util.Properties;
+
+/**
+ * Interceptor for multi-tenant.
+ */
+@Intercepts({
+ @Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})
+})
+public class MultiTenantInterceptor implements Interceptor {
+
+ private static final String TENANT_CONDITION = "tenant=";
+
+ @Override
+ public Object intercept(Invocation invocation) throws Throwable {
+ StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
+ MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
+ SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());
+ MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
+
+ String fullMethodName = mappedStatement.getId();
+ if (!MultiTenantQueryFilter.isMultiTenantQuery(fullMethodName.split(InlongConstants.UNDERSCORE)[0])) {
+ return invocation.proceed();
+ }
+
+ BoundSql boundSql = statementHandler.getBoundSql();
+ String sql = boundSql.getSql();
+
+ CCJSqlParserManager parserManager = new CCJSqlParserManager();
+ Select select = (Select) parserManager.parse(new StringReader(sql));
+ PlainSelect plain = (PlainSelect) select.getSelectBody();
+
+ StringBuilder whereSql = new StringBuilder();
+ whereSql.append(TENANT_CONDITION).append(getTenant());
+
+ Expression where = plain.getWhere();
+ if (where == null) {
+ Expression expression = CCJSqlParserUtil.parseCondExpression(whereSql.toString());
+ plain.setWhere(expression);
+ } else {
+ if (where.toString().contains(TENANT_CONDITION)) {
+ return invocation.proceed();
+ }
+
+ // else, append the tenant condition
+ whereSql.append(" and ( ").append(where).append(" )");
+ Expression expression = CCJSqlParserUtil.parseCondExpression(whereSql.toString());
+ plain.setWhere(expression);
+ }
+ metaObject.setValue("delegate.boundSql.sql", select.toString());
+ return invocation.proceed();
+ }
+
+ private static String getTenant() {
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ if (userInfo == null) {
+ throw new IllegalStateException("current login user is null, please login first");
+ }
+ String tenant = userInfo.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ throw new IllegalStateException("get no target tenant of userInfo=" + userInfo);
+ }
+ return tenant;
+ }
+
+ @Override
+ public Object plugin(Object target) {
+ if (target instanceof StatementHandler) {
+ return Plugin.wrap(target, this);
+ } else {
+ return target;
+ }
+ }
+
+ @Override
+ public void setProperties(Properties properties) {
+
+ }
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/interceptor/MultiTenantQueryFilter.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/interceptor/MultiTenantQueryFilter.java
new file mode 100644
index 00000000000..8cf99587d88
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/interceptor/MultiTenantQueryFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.interceptor;
+
+import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
+
+import lombok.extern.slf4j.Slf4j;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Filter to check if SQLs from some method should add tenant condition or not.
+ */
+@Slf4j
+@Component
+public class MultiTenantQueryFilter {
+
+ private static final String METHOD_FILTER_PATH = "org.apache.inlong.manager.dao.mapper";
+
+ private static final Set METHOD_SET = new HashSet<>();
+
+ /**
+ * Check whether the specified method supports multi-tenant queries.
+ *
+ * @param methodName method name
+ * @return true if supports multi-tenant query, false if not
+ */
+ public static boolean isMultiTenantQuery(String methodName) {
+ return METHOD_SET.contains(methodName);
+ }
+
+ /**
+ * Find all methods that support multi-tenant queries - used MultiTenantQuery annotation.
+ */
+ @PostConstruct
+ private void init() {
+ Reflections methodReflections = new Reflections(METHOD_FILTER_PATH, Scanners.MethodsAnnotated);
+ // process methods
+ Set methodSet = methodReflections.getMethodsAnnotatedWith(MultiTenantQuery.class);
+ markMethods(methodSet);
+
+ // process classes
+ Reflections reflections = new Reflections(METHOD_FILTER_PATH, Scanners.TypesAnnotated);
+ Set> clazzSet = reflections.getTypesAnnotatedWith(MultiTenantQuery.class);
+ clazzSet.stream()
+ .filter(Class::isInterface)
+ .forEach(clazz -> {
+ // Get the JsonTypeDefine annotation
+ MultiTenantQuery annotation = clazz.getAnnotation(MultiTenantQuery.class);
+ if (annotation == null || !annotation.with()) {
+ return;
+ }
+ List methods = Arrays.asList(clazz.getMethods());
+ markMethods(methods);
+ });
+
+ log.debug("success to find all methods that support multi-tenant queries, methods={}", METHOD_SET);
+ }
+
+ private static void markMethods(Collection methods) {
+ methods.forEach(method -> {
+ MultiTenantQuery annotation = method.getAnnotation(MultiTenantQuery.class);
+ if (annotation != null && !annotation.with()) {
+ METHOD_SET.remove(getMethodFullName(method));
+ } else {
+ METHOD_SET.add(getMethodFullName(method));
+ }
+ });
+ }
+
+ private static String getMethodFullName(Method method) {
+ return method.getDeclaringClass().getName() + "." + method.getName();
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 32a9ffc78ba..2fa96ff74ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
3.0.0
1.4.2
5.3.1
+ 4.6
2.1.214
2.0.0
@@ -636,6 +637,11 @@
pagehelper-spring-boot-starter
${pagehelper.springboot.version}
+
+ com.github.jsqlparser
+ jsqlparser
+ ${jsqlparser.version}
+