diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md new file mode 100644 index 00000000..82218340 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/README.md @@ -0,0 +1,48 @@ +# APIJSONDemo + +## 支持多数据源-消息队列 + +示例:kafka + +原理说明: + +Access表名 = 消息队列 topic + +Access表配置说明: +![image](https://user-images.githubusercontent.com/12228225/210956299-204115a7-433c-4f18-af27-5120068dab2e.png) +Request表配置post权限 +![image](https://user-images.githubusercontent.com/12228225/210956378-be095589-0ced-4317-bb46-6b296538f26e.png) + +apijson发送mq消息: +单条
+{ + "@datasource": "kafka", + "Topic_User":{ + "message":"test-101" + }, + "tag": "Topic_User", + "@explain": false +}
+多条
+{ + "Topic_User[]": [ + { + "message":"test-100" + }, + { + "message":"test-101" + } + ], + "tag": "Topic_User[]", + "@datasource": "kafka", + "@explain": true +} + +客户端接收消息: + +offset = 47, key = null, value = test-101
+offset = 48, key = null, value = test-100
+offset = 49, key = null, value = test-101
+ + +用java代码方式,获取具体数据源,调用即可 diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml new file mode 100644 index 00000000..8a84134d --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/pom.xml @@ -0,0 +1,203 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.5.13 + + + apijson.demo + apijsondemo-multidatasource-kafka + 5.4.0 + + apijsondemo-multidatasource-kafka + Demo project for testing APIJSON server based on SpringBoot + + + UTF-8 + UTF-8 + 3.12.0 + 1.1.16 + 3.5.1 + 2.3.3 + 4.4 + 1.10 + 30.1.1-jre + 1.2.72 + 4.1.1 + 1.18.4 + 3.12.0 + 2.5 + 1.10 + 4.4 + 1.10 + 5.4.0 + 8.0.31 + 5.3.18 + 2.6.6 + 3.5.2 + 1.8 + 3.2.1 + + + + + + javax.activation + activation + 1.1.1 + + + + + com.github.Tencent + APIJSON + ${apijson.version} + + + com.github.APIJSON + apijson-framework + ${apijson.version} + + + + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework + spring-context-support + ${spring-context-support.version} + + + org.springframework.boot + spring-boot-configuration-processor + ${spring-boot-configuration-processor.version} + true + + + com.alibaba + druid-spring-boot-starter + ${druid.version} + + + com.baomidou + dynamic-datasource-spring-boot-starter + ${dynamic-datasource-spring-boot-starter.version} + + + com.baomidou + mybatis-plus-boot-starter + ${mybatisplus.version} + + + com.baomidou + mybatis-plus-generator + + + + + com.baomidou + mybatis-plus-support + ${mybatis-plus-support.version} + + + org.apache.commons + commons-collections4 + ${commons-collections4.version} + + + mysql + mysql-connector-java + ${mysql.version} + + + com.google.guava + guava + ${guava.version} + + + org.projectlombok + lombok + ${lombok.version} + + + commons-io + commons-io + ${commons.io.version} + + + commons-codec + commons-codec + ${commons.codec.version} + + + commons-configuration + commons-configuration + ${commons.configuration.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + true + apijson.demo.DemoApplication + + + + + repackage + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + + jitpack.io + https://jitpack.io + + true + + + + + spring-snapshots + https://repo.spring.io/snapshot + + true + + + + spring-milestones + https://repo.spring.io/milestone + + + + \ No newline at end of file diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java new file mode 100644 index 00000000..6e261537 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseConfig.java @@ -0,0 +1,22 @@ +package apijson.demo; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DataBaseConfig { + private String primary; + + @Value("${spring.datasource.dynamic.primary}") + public void setPrimary(String primary) { + this.primary = primary; + } + + public String getPrimary() { + return primary; + } + + public static DataBaseConfig getInstence() { + return SpringContextUtils.getBean(DataBaseConfig.class); + } +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java new file mode 100644 index 00000000..a4c6bbf3 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DataBaseUtil.java @@ -0,0 +1,54 @@ +package apijson.demo; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class DataBaseUtil { + + /** + * 根据url获取库名 + * @param url + * @return + */ + public static String getLibname(String url) { + Pattern p = Pattern.compile("jdbc:(?\\w+):.*((//)|@)(?.+):(?\\d+)(/|(;DatabaseName=)|:)(?\\w+)\\??.*"); + Matcher m = p.matcher(url); + if(m.find()) { + return m.group("dbName"); + } + return null; + } + + /*** + * primary: master + * strict: false + * @param datasource: 匹配不成功, 自动匹配默认数据库 + * @return + */ + public static javax.sql.DataSource getDataSource(String datasource) { + try { + return DynamicDataSource.getDetail(datasource).getDataSource(); // 数据源 + } catch (Exception e) { + throw new IllegalArgumentException("动态数据源配置错误 " + datasource); + } + } + + public static String getDruidUrl(String datasource) { + return DynamicDataSource.getDetail(datasource).getUrl(); // 数据库连接url + } + + public static String getDruidSchema(String datasource) { + return getLibname(DynamicDataSource.getDetail(datasource).getUrl()); // 数据库名; + } + + public static String getDruidDBAccount(String datasource) { + return DynamicDataSource.getDetail(datasource).getDbAccount(); // 数据库用户名 + } + + public static String getDruidDBPassword(String datasource) { + return DynamicDataSource.getDetail(datasource).getDbPassword(); // 数据库密码 + } +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java new file mode 100644 index 00000000..2a5f6897 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoApplication.java @@ -0,0 +1,101 @@ +/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.*/ + +package apijson.demo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.web.server.WebServerFactoryCustomizer; +import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +import apijson.Log; +import apijson.framework.APIJSONApplication; +import apijson.framework.APIJSONCreator; +import apijson.orm.AbstractFunctionParser; +import apijson.orm.AbstractVerifier; +import apijson.orm.FunctionParser; +import apijson.orm.Parser; +import apijson.orm.SQLConfig; +import apijson.orm.SQLExecutor; + +/** + * Demo SpringBoot Application 主应用程序启动类 右键这个类 > Run As > Java Application 具体见 + * SpringBoot 文档 + * https://www.springcloud.cc/spring-boot.html#using-boot-locating-the-main-class + * + * @author Lemon + */ +@Configuration +@SpringBootApplication +@EnableAutoConfiguration +@EnableConfigurationProperties +public class DemoApplication implements WebServerFactoryCustomizer { + public static final String TAG = "DemoApplication"; + + public static void main(String[] args) throws Exception { + SpringApplication.run(DemoApplication.class, args); + Log.DEBUG = true; + APIJSONApplication.init(false); // 4.4.0 以上需要这句来保证以上 static 代码块中给 DEFAULT_APIJSON_CREATOR 赋值会生效 + } + + // SpringBoot 2.x 自定义端口方式 + @Override + public void customize(ConfigurableServletWebServerFactory server) { + server.setPort(8080); + } + + // 支持 APIAuto 中 JavaScript 代码跨域请求 + @Bean + public WebMvcConfigurer corsConfigurer() { + return new WebMvcConfigurer() { + @Override + public void addCorsMappings(CorsRegistry registry) { + registry.addMapping("/**").allowedOriginPatterns("*").allowedMethods("*").allowCredentials(true) + .maxAge(3600); + } + }; + } + + static { + // 使用本项目的自定义处理类 + APIJSONApplication.DEFAULT_APIJSON_CREATOR = new APIJSONCreator() { + @Override + public Parser createParser() { + return new DemoParser(); + } + + @Override + public SQLConfig createSQLConfig() { + return new DemoSQLConfig(); + } + + @Override + public FunctionParser createFunctionParser() { + return new DemoFunctionParser(); + } + + @Override + public SQLExecutor createSQLExecutor() { + return new DemoSQLExecutor(); + } + }; + } + +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java new file mode 100644 index 00000000..c91c58f3 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoController.java @@ -0,0 +1,99 @@ +/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.*/ + +package apijson.demo; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URLDecoder; +import java.util.Map; + +import javax.servlet.http.HttpSession; + +import apijson.RequestMethod; +import apijson.StringUtil; +import apijson.framework.APIJSONController; +import apijson.orm.Parser; + + +/**请求路由入口控制器,包括通用增删改查接口等,转交给 APIJSON 的 Parser 来处理 + * 具体见 SpringBoot 文档 + * https://www.springcloud.cc/spring-boot.html#boot-features-spring-mvc + * 以及 APIJSON 通用文档 3.设计规范 3.1 操作方法 + * https://github.com/Tencent/APIJSON/blob/master/Document.md#3.1 + *
建议全通过HTTP POST来请求: + *
1.减少代码 - 客户端无需写HTTP GET,PUT等各种方式的请求代码 + *
2.提高性能 - 无需URL encode和decode + *
3.调试方便 - 建议使用 APIAuto(http://apijson.cn/api) 或 Postman + * @author Lemon + */ +@RestController +@RequestMapping("") +public class DemoController extends APIJSONController { + + @Override + public Parser newParser(HttpSession session, RequestMethod method) { + return super.newParser(session, method).setNeedVerify(false); // TODO 这里关闭校验,方便新手快速测试,实际线上项目建议开启 + } + + /**增删改查统一接口,这个一个接口可替代 7 个万能通用接口,牺牲一些路由解析性能来提升一点开发效率 + * @param method + * @param request + * @param session + * @return + */ + @PostMapping(value = "{method}") // 如果和其它的接口 URL 冲突,可以加前缀,例如改为 crud/{method} 或 Controller 注解 @RequestMapping("crud") + @Override + public String crud(@PathVariable String method, @RequestBody String request, HttpSession session) { + return super.crud(method, request, session); + } + + /**增删改查统一接口,这个一个接口可替代 7 个万能通用接口,牺牲一些路由解析性能来提升一点开发效率 + * @param method + * @param tag + * @param params + * @param request + * @param session + * @return + */ + @PostMapping("{method}/{tag}") // 如果和其它的接口 URL 冲突,可以加前缀,例如改为 crud/{method}/{tag} 或 Controller 注解 @RequestMapping("crud") + @Override + public String crudByTag(@PathVariable String method, @PathVariable String tag, @RequestParam Map params, @RequestBody String request, HttpSession session) { + return super.crudByTag(method, tag, params, request, session); + } + + /**获取 + * 只为兼容HTTP GET请求,推荐用HTTP POST,可删除 + * @param request 只用String,避免encode后未decode + * @param session + * @return + * @see {@link RequestMethod#GET} + */ + @GetMapping("get/{request}") + public String openGet(@PathVariable String request, HttpSession session) { + try { + request = URLDecoder.decode(request, StringUtil.UTF_8); + } catch (Exception e) { + // Parser 会报错 + } + return get(request, session); + } + +} \ No newline at end of file diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java new file mode 100644 index 00000000..92e729c7 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoFunctionParser.java @@ -0,0 +1,93 @@ +package apijson.demo; + +import javax.servlet.http.HttpSession; + +import com.alibaba.fastjson.JSONObject; + +import apijson.NotNull; +import apijson.RequestMethod; +import apijson.StringUtil; +import apijson.framework.APIJSONFunctionParser; +import apijson.framework.APIJSONVerifier; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DemoFunctionParser extends APIJSONFunctionParser { + public DemoFunctionParser() { + this(null, null, 0, null, null); + } + + // 展示在远程函数内部可以用 this 拿到的东西 + public DemoFunctionParser(RequestMethod method, String tag, int version, JSONObject request, HttpSession session) { + super(method, tag, version, request, session); + } + + /*** + * 获取当前用户id + * + * @param current + * @return + */ + public String getCurrentUserId(@NotNull JSONObject current) { + if (this.getSession() == null) { + return "test"; // 启动时的自动测试 + } + return APIJSONVerifier.getVisitorId(getSession()); + } + + /** + * 一个最简单的远程函数示例,返回一个前面拼接了 Hello 的字符串 + * + * @param current + * @param name + * @return + * @throws Exception + */ + public String sayHello(@NotNull JSONObject current, @NotNull String name) throws Exception { + // 注意这里参数 name 是 key,不是 value + Object obj = current.get(name); + + if (this.getSession() == null) { + return "test"; // 启动时的自动测试 + } + + if (obj == null) { + throw new IllegalArgumentException(); + } + if (!(obj instanceof String)) { + throw new IllegalArgumentException(); + } + + // 之后可以用 this.getSession 拿到当前的 HttpSession + return "Hello, " + obj.toString(); + } + + /*** + * 密码加密 + * + * @param current + * @param id 添加id生成 + * @param password 密码字段名 + * @return + * @throws Exception + */ + public void pwdEncrypt(@NotNull JSONObject current, @NotNull String id, @NotNull String password) + throws Exception { + String c_password = current.getString(password); + current.put(password, c_password + "_" + System.currentTimeMillis()); + } + + public void childFunTest(@NotNull JSONObject current, @NotNull String addr) throws Exception { + String c_addr = current.getString(addr); + current.put(addr, c_addr + "_" + System.currentTimeMillis()); + } + + + public void removeKeys(@NotNull JSONObject current, String keys) { + String[] ks = StringUtil.split(keys, ";"); // 用分号 ; 分割 + // 根据 ks remove 掉 current 里的字段 + for (int i = 0; i < ks.length; i++) { + current.remove(ks[i]); + } + } +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java new file mode 100644 index 00000000..627de64c --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoObjectParser.java @@ -0,0 +1,26 @@ +package apijson.demo; + +import java.util.List; + +import javax.servlet.http.HttpSession; + +import com.alibaba.fastjson.JSONObject; + +import apijson.NotNull; +import apijson.RequestMethod; +import apijson.framework.APIJSONObjectParser; +import apijson.orm.Join; +import apijson.orm.SQLConfig; + +public class DemoObjectParser extends APIJSONObjectParser { + + public DemoObjectParser(HttpSession session, @NotNull JSONObject request, String parentPath, SQLConfig arrayConfig + , boolean isSubquery, boolean isTable, boolean isArrayMainTable) throws Exception { + super(session, request, parentPath, arrayConfig, isSubquery, isTable, isArrayMainTable); + } + + @Override + public SQLConfig newSQLConfig(RequestMethod method, String table, String alias, JSONObject request, List joinList, boolean isProcedure) throws Exception { + return DemoSQLConfig.newSQLConfig(method, table, alias, request, joinList, isProcedure); + } +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java new file mode 100644 index 00000000..e12333e6 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoParser.java @@ -0,0 +1,35 @@ +package apijson.demo; + +import com.alibaba.fastjson.JSONObject; + +import apijson.RequestMethod; +import apijson.framework.APIJSONObjectParser; +import apijson.framework.APIJSONParser; +import apijson.orm.SQLConfig; + +public class DemoParser extends APIJSONParser { + public DemoParser() { + super(); + } + + public DemoParser(RequestMethod method) { + super(method); + } + + public DemoParser(RequestMethod method, boolean needVerify) { + super(method, needVerify); + } + + // 可重写来设置最大查询数量 + // @Override + // public int getMaxQueryCount() { + // return 50; + // } + + @Override + public APIJSONObjectParser createObjectParser(JSONObject request, String parentPath, SQLConfig arrayConfig, boolean isSubquery, boolean isTable, boolean isArrayMainTable) throws Exception { + return new DemoObjectParser(getSession(), request, parentPath, arrayConfig, isSubquery, isTable, isArrayMainTable).setMethod(getMethod()).setParser(this); + } + + +} \ No newline at end of file diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java new file mode 100644 index 00000000..6337d426 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLConfig.java @@ -0,0 +1,133 @@ +/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.*/ + +package apijson.demo; + +import java.util.UUID; + +import com.alibaba.fastjson.annotation.JSONField; + +import apijson.RequestMethod; +import apijson.framework.APIJSONSQLConfig; +import apijson.orm.AbstractSQLConfig; + + +/**SQL 配置 + * TiDB 用法和 MySQL 一致 + * 具体见详细的说明文档 C.开发说明 C-1-1.修改数据库链接 + * https://github.com/Tencent/APIJSON/blob/master/%E8%AF%A6%E7%BB%86%E7%9A%84%E8%AF%B4%E6%98%8E%E6%96%87%E6%A1%A3.md#c-1-1%E4%BF%AE%E6%94%B9%E6%95%B0%E6%8D%AE%E5%BA%93%E9%93%BE%E6%8E%A5 + * @author Lemon + */ +public class DemoSQLConfig extends APIJSONSQLConfig { + + public DemoSQLConfig() { + super(); + } + + public DemoSQLConfig(RequestMethod method, String table) { + super(method, table); + } + + static { +// DEFAULT_DATABASE = DATABASE_ELASTICSEARCH; // TODO 默认数据库类型,改成你自己的 +// DEFAULT_SCHEMA = "sys"; // TODO 默认数据库名/模式,改成你自己的,默认情况是 MySQL: sys, PostgreSQL: public, SQL Server: dbo, Oracle: + + // 表名和数据库不一致的,需要配置映射关系。只使用 APIJSONORM 时才需要; + // 如果用了 apijson-framework 且调用了 APIJSONApplication.init 则不需要 + // (间接调用 DemoVerifier.init 方法读取数据库 Access 表来替代手动输入配置)。 + // 但如果 Access 这张表的对外表名与数据库实际表名不一致,仍然需要这里注册。例如 + // TABLE_KEY_MAP.put(Access.class.getSimpleName(), "access"); + + SIMPLE_CALLBACK = new SimpleCallback() { + + @Override + public AbstractSQLConfig getSQLConfig(RequestMethod method, String database, String schema, + String datasource, String table) { + return new DemoSQLConfig(method, table); + } + + // 取消注释来实现数据库自增 id + @Override + public String newId(RequestMethod method, String database, String schema, String datasource, String table) { + if(table.equals("Access") || table.equals("Request") || table.equals("Function")){ + return null; + } + return UUID.randomUUID().toString(); // return null 则不生成 id,一般用于数据库自增 id + } + }; + } + + @JSONField(serialize = false) // 不在日志打印 账号/密码 等敏感信息,用了 UnitAuto 则一定要加 + @Override + public String getDBVersion() { + return DynamicDataSource.getDetail(this.getDatasource()).getDbVersion(); + } + + @JSONField(serialize = false) // 不在日志打印 账号/密码 等敏感信息,用了 UnitAuto 则一定要加 + @Override + public String getDatabase() { + if (super.getDatabase() != null) { + return super.getDatabase(); + } + try { + return DynamicDataSource.getDetail(this.getDatasource()).getDatabase(); + } catch (Exception e) { + throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource()); + } + } + + @JSONField(serialize = false) // 不在日志打印 账号/密码 等敏感信息,用了 UnitAuto 则一定要加 + @Override + public String getSchema() { + if (super.getSchema() != null) { + return super.getSchema(); + } + try { + return DynamicDataSource.getDetail(this.getDatasource()).getSchema(); + } catch (Exception e) { + throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource()); + } + } + + @JSONField(serialize = false) // 不在日志打印 账号/密码 等敏感信息,用了 UnitAuto 则一定要加 + @Override + public String getDBUri() { + try { + return DynamicDataSource.getDetail(this.getDatasource()).getUrl(); // 数据库连接url + } catch (Exception e) { + throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource()); + } + } + + @JSONField(serialize = false) // 不在日志打印 账号/密码 等敏感信息,用了 UnitAuto 则一定要加 + @Override + public String getDBAccount() { + try { + return DynamicDataSource.getDetail(this.getDatasource()).getDbAccount(); + } catch (Exception e) { + throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource()); + } + } + + @JSONField(serialize = false) // 不在日志打印 账号/密码 等敏感信息,用了 UnitAuto 则一定要加 + @Override + public String getDBPassword() { + try { + return DynamicDataSource.getDetail(this.getDatasource()).getDbPassword(); + } catch (Exception e) { + throw new IllegalArgumentException("动态数据源配置错误 " + this.getDatasource()); + } + } + +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java new file mode 100644 index 00000000..f4e8cbed --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DemoSQLExecutor.java @@ -0,0 +1,128 @@ +/*Copyright ©2016 TommyLemon(https://github.com/TommyLemon/APIJSON) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.*/ + +package apijson.demo; + +import java.sql.Connection; +import java.util.List; + +import javax.sql.DataSource; + +import apijson.Log; +import apijson.NotNull; +import apijson.StringUtil; +import apijson.framework.APIJSONSQLExecutor; +import apijson.orm.SQLConfig; +import lombok.extern.log4j.Log4j2; + +/** + * SQL 执行器,支持连接池及多数据源 具体见 https://github.com/Tencent/APIJSON/issues/151 + * + * @author Lemon + */ +@Log4j2 +public class DemoSQLExecutor extends APIJSONSQLExecutor { + public static final String TAG = "DemoSQLExecutor"; + + // 适配连接池,如果这里能拿到连接池的有效 Connection,则 SQLConfig 不需要配置 dbVersion, dbUri, dbAccount, + // dbPassword + @Override + public Connection getConnection(SQLConfig config) throws Exception { + String datasource = config.getDatasource(); + Log.d(TAG, "getConnection config.getDatasource() = " + datasource); + + String key = datasource + "-" + config.getDatabase(); + Connection conn = connectionMap.get(key); + if (conn == null || conn.isClosed()) { + DataSource dataSource = DataBaseUtil.getDataSource(datasource); + connectionMap.put(key, dataSource == null ? null : dataSource.getConnection()); + } + return super.getConnection(config); + } + + @SuppressWarnings("incomplete-switch") + @Override + public int executeUpdate(@NotNull SQLConfig config, String sql) throws Exception { + if (StringUtil.equals(config.getDatasource(), "kafka")) { + // if (config.isMQ() && StringUtil.equals(config.getDatasource(), "kafka")) { + switch (config.getMethod()) { + case POST: + // 消息组装、二次处理 + String jsonColumn = "message"; + DynamicDataSource DynamicDataSource = apijson.demo.DynamicDataSource.getDetail(config.getDatasource()); + for (int i = 0; i < config.getColumn().size(); i++) { + String column = config.getColumn().get(i); + if (StringUtil.equals(column, jsonColumn)) { + for (List list : config.getValues()) { + Object message = list.get(i); + return KafkaSimpleProducer.sendMessage(config.getDatasource(), DynamicDataSource.getProps(), config.getTable(), message); + } + } + } + } + return 0; + } + return super.executeUpdate(config, sql); + } + /*** + * 查询返回字段值进行二次处理 + */ +// @Override +// protected JSONObject onPutColumn(@NotNull SQLConfig config, @NotNull ResultSet rs, @NotNull ResultSetMetaData rsmd +// , final int tablePosition, @NotNull JSONObject table, final int columnIndex, Join join, Map childMap) throws Exception { +// if (table == null) { // 对应副表 viceSql 不能生成正常 SQL, 或者是 ! - Outer, ( - ANTI JOIN 的副表这种不需要缓存及返回的数据 +// Log.i(TAG, "onPutColumn table == null >> return table;"); +// return table; +// } +// +// if (isHideColumn(config, rs, rsmd, tablePosition, table, columnIndex, childMap)) { +// Log.i(TAG, "onPutColumn isHideColumn(config, rs, rsmd, tablePosition, table, columnIndex, childMap) >> return table;"); +// return table; +// } +// +// String label = getKey(config, rs, rsmd, tablePosition, table, columnIndex, childMap); +// Object value = getValue(config, rs, rsmd, tablePosition, table, columnIndex, label, childMap); +// +// // TODO +// if(StringUtils.equals(config.getTable(), "User") && StringUtils.equals(label, "addr_id")) { +// value = "1-1-1"; +// } +// // 主表必须 put 至少一个 null 进去,否则全部字段为 null 都不 put 会导致中断后续正常返回值 +// if (value != null || (join == null && table.isEmpty())) { +// table.put(label, value); +// } +// +// return table; +// } + + // 取消注释支持 !key 反选字段 和 字段名映射,需要先依赖插件 https://github.com/APIJSON/apijson-column + // @Override + // protected String getKey(SQLConfig config, ResultSet rs, ResultSetMetaData + // rsmd, int tablePosition, JSONObject table, + // int columnIndex, Map childMap) throws Exception { + // return ColumnUtil.compatOutputKey(super.getKey(config, rs, rsmd, + // tablePosition, table, columnIndex, childMap), config.getTable(), + // config.getMethod()); + // } + + // 不需要隐藏字段这个功能时,取消注释来提升性能 + // @Override + // protected boolean isHideColumn(SQLConfig config, ResultSet rs, + // ResultSetMetaData rsmd, int tablePosition, + // JSONObject table, int columnIndex, Map childMap) throws + // SQLException { + // return false; + // } + +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java new file mode 100644 index 00000000..7eef7c54 --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/DynamicDataSource.java @@ -0,0 +1,169 @@ +package apijson.demo; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import javax.sql.DataSource; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import com.alibaba.druid.pool.DruidDataSource; +import com.baomidou.dynamic.datasource.DynamicRoutingDataSource; +import com.baomidou.dynamic.datasource.ds.ItemDataSource; +import com.baomidou.mybatisplus.extension.toolkit.JdbcUtils; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +/*** + * 不存在并发问题 缓存 jdbc 数据源,供apijson调用 1、应用启动添加数据源 2、页面动态添加数据源(数据库存储数据源信息) + * + * + */ +@Data +@Order(value = 10) +@Component +@Slf4j +public class DynamicDataSource implements ApplicationRunner { + // value: 数据源相关信息 + private static Map dataSourceMap = new HashMap<>(); + private String database; // 表所在的数据库类型 + private String schema; // 表所在的数据库名 + private String datasourceName; // 数据源 + private String url; // jdbc url + private String dbAccount; // 数据库用户名 + private String dbPassword; // 数据库密码 + private String dbVersion; // 数据库版本号 + private String clusterName; // 集群名称 + private Properties props; // 属性值 + + @Autowired + private DataSource dataSource; // 数据源 + + public static void addDataSource(DynamicDataSource detail) { + dataSourceMap.put(detail.getDatasourceName(), detail); + } + + /*** + * 获取数据源详细信息 + * + * @return + */ + public static DynamicDataSource getDetail(String datasource) { + if (datasource == null) { + // 默认数据源 + datasource = DataBaseConfig.getInstence().getPrimary(); + } + // 不存在交给框架处理 + return dataSourceMap.get(datasource); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + initJdbcDataSource(); // 初始化spring application.xml 数据库连接池 + // kafka + initMQ_kafka(); + } + + /*** + * 初始化数据库连接池 + */ + private void initJdbcDataSource() { + DynamicRoutingDataSource dataSourceList = (DynamicRoutingDataSource) this.dataSource; + for (String datasourceName : dataSourceList.getDataSources().keySet()) { + ItemDataSource dataSource = (ItemDataSource) dataSourceList.getDataSources().get(datasourceName); + DruidDataSource druid = (DruidDataSource) dataSource.getRealDataSource(); + String url = druid.getDataSourceStat().getUrl(); // 数据库连接url + String schema = DataBaseUtil.getLibname(url); // 数据库名; + String database = JdbcUtils.getDbType(url).getDb().toUpperCase(); // 数据库类型 + String dbAccount = druid.getUsername(); // 数据库用户名 + String dbPassword = druid.getPassword(); // 数据库密码 + String dbVersion = getDBVersion(dataSource); + + DynamicDataSource dynDataSource = new DynamicDataSource(); + dynDataSource.setDatasourceName(datasourceName); + dynDataSource.setDatabase(database); + dynDataSource.setDataSource(druid); + dynDataSource.setSchema(schema); + dynDataSource.setUrl(url); + dynDataSource.setDbAccount(dbAccount); + dynDataSource.setDbPassword(dbPassword); + dynDataSource.setDbVersion(dbVersion); + dataSourceMap.put(datasourceName, dynDataSource); + } + } + + /*** + * 仅供测试使用 + */ + public void initMQ_kafka() { + /* 1.创建kafka生产者的配置信息 */ + Properties props = new Properties(); + /*2.指定连接的kafka集群, broker-list */ + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092"); + /*3.ack应答级别*/ + props.put(ProducerConfig.ACKS_CONFIG, "all"); + /*4.重试次数*/ + props.put(ProducerConfig.RETRIES_CONFIG, 3); + /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */ + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */ + props.put(ProducerConfig.LINGER_MS_CONFIG, 1); + /*7. RecordAccumulator 缓冲区大小*/ + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); + /*8. key, value 的序列化类 */ + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + + DynamicDataSource dynDataSource = new DynamicDataSource(); + dynDataSource.setDatasourceName("kafka"); + dynDataSource.setDatabase("MQ"); + dynDataSource.setSchema(""); // 不需要配置数据库名 + dynDataSource.setDbVersion("2.8.1"); // 后面做成动态的 + dynDataSource.setClusterName("kafka"); + dynDataSource.setProps(props); + dataSourceMap.put(dynDataSource.getDatasourceName(), dynDataSource); + } + + public String getDBVersion(DataSource dataSource) { + Connection connection = null; + Statement statement = null; + ResultSet resultSet = null; + try { + connection = dataSource.getConnection(); + statement = connection.createStatement(); + resultSet = statement.executeQuery("select version() as version"); + while (resultSet.next()) { + return resultSet.getString("version"); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + if (resultSet != null) { + resultSet.close(); + } + if (statement != null) { + statement.close(); + } + if (connection != null) { + connection.close(); + } + } catch (SQLException throwables) { + } + } + return null; + } +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java new file mode 100644 index 00000000..71c822fc --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/KafkaSimpleProducer.java @@ -0,0 +1,34 @@ +package apijson.demo; + +import java.util.Properties; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class KafkaSimpleProducer { + + public static int sendMessage(String datasource, Properties props,String topic, Object message) { + KafkaProducer producer = null; + try { + /* 9.创建生产者对象 */ + producer = new KafkaProducer<>(props); + Future future = producer.send(new ProducerRecord<>(topic, message)); + RecordMetadata rMetadata = future.get(); // 调用future的get方法,让main线程阻塞,就可以实现同步发送 + log.info("rMetadata: {}", rMetadata.toString()); + return 1; + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException("动态数据源配置错误 " + datasource); + } finally { + if(producer != null) { + /* 关闭资源 */ + producer.close(); + } + } + } +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java new file mode 100644 index 00000000..7a8e8f4f --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/java/apijson/demo/SpringContextUtils.java @@ -0,0 +1,60 @@ +package apijson.demo; + +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * Spring Context 工具类 + * + * @author + */ +@Component +public class SpringContextUtils implements ApplicationContextAware { + public static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) + throws BeansException { + SpringContextUtils.applicationContext = applicationContext; + } + + public static Object getBean(String name) { + return applicationContext.getBean(name); + } + + public static T getBean(String name, Class requiredType) { + return applicationContext.getBean(name, requiredType); + } + + /** + * 通过class获取Bean. + * + * @param clazz + * @param + * @return + */ + public static T getBean(Class clazz) { + return applicationContext.getBean(clazz); + } + + public static boolean containsBean(String name) { + return applicationContext.containsBean(name); + } + + public static boolean isSingleton(String name) { + return applicationContext.isSingleton(name); + } + + public static Class getType(String name) { + return applicationContext.getType(name); + } + + public static Map getBeansOfType(Class type) { + return applicationContext.getBeansOfType(type); + } + +} diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml new file mode 100644 index 00000000..7d5a68ee --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/main/resources/application.yml @@ -0,0 +1,35 @@ +spring: + datasource: + type: com.alibaba.druid.pool.DruidDataSource + dynamic: + primary: master + strict: true + druid: + initial-size: 5 + min-idle: 5 + maxActive: 2000 + maxWait: 60000 + timeBetweenEvictionRunsMillis: 60000 + minEvictableIdleTimeMillis: 300000 + validationQuery: SELECT 1 FROM DUAL + testWhileIdle: true + testOnBorrow: false + testOnReturn: false + poolPreparedStatements: true + maxPoolPreparedStatementPerConnectionSize: 20 + filters: stat,slf4j + connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000 + datasource: + master: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://localhost:3306/xxxx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true&useSSL=false + username: + password: + filter: + stat: + log-slow-sql: true + slow-sql-millis: 1000 + merge-sql: false + wall: + config: + multi-statement-allow: true diff --git a/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java new file mode 100644 index 00000000..c8ebc23d --- /dev/null +++ b/APIJSON-Java-Server/APIJSONDemo-MultiDataSource-Kafka/src/test/java/apijson/demo/KafkaSimpleConsumer.java @@ -0,0 +1,42 @@ +package apijson.demo; + +import java.util.Properties; +import java.util.Arrays; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; + +public class KafkaSimpleConsumer { + public static void main(String[] args) throws Exception { + + // Kafka consumer configuration settings + String topicName = "Topic_User"; + Properties props = new Properties(); + + props.put("bootstrap.servers", "xxx:9092"); + props.put("group.id", "test"); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.put("session.timeout.ms", "30000"); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + KafkaConsumer consumer = new KafkaConsumer(props); + + // Kafka Consumer subscribes list of topics here. + consumer.subscribe(Arrays.asList(topicName)); + + // print the topic name + System.out.println("Subscribed to topic " + topicName); + int i = 0; + + while (true) { + ConsumerRecords records = consumer.poll(10); + for (ConsumerRecord record : records) + + // print the offset,key and value for the consumer records. + System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); + } + } +}