Skip to content

Commit

Permalink
[feature](mysql-table) support utf8mb4 for mysql external table (apac…
Browse files Browse the repository at this point in the history
…he#9402)

This patch supports utf8mb4 for mysql external table.

Some users need a MySQL external table that uses the utf8mb4 charset, but only the utf8 charset is supported right now.

When creating a MySQL external table, you can add an optional property "charset", which sets the character set for the MySQL connection.
The default value is "utf8"; set it to "utf8mb4" instead when you need to.
  • Loading branch information
nextdreamblue authored and minghong.zhou committed May 23, 2022
1 parent 39f8498 commit 2ab3b74
Show file tree
Hide file tree
Showing 18 changed files with 51 additions and 12 deletions.
1 change: 1 addition & 0 deletions be/src/exec/mysql_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) {
_my_param.user = mysql_table->user();
_my_param.passwd = mysql_table->passwd();
_my_param.db = mysql_table->mysql_db();
_my_param.charset = mysql_table->charset();
// new one scanner
_mysql_scanner.reset(new (std::nothrow) MysqlScanner(_my_param));

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/mysql_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Status MysqlScanner::open() {
return _error_status("mysql real connect failed.");
}

if (mysql_set_character_set(_my_conn, "utf8")) {
if (mysql_set_character_set(_my_conn, _my_param.charset.c_str())) {
return Status::InternalError("mysql set character set failed.");
}

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/mysql_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct MysqlScannerParam {
std::string user;
std::string passwd;
std::string db;
std::string charset;
unsigned long client_flag;
MysqlScannerParam() : client_flag(0) {}
};
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,14 @@ MySQLTableDescriptor::MySQLTableDescriptor(const TTableDescriptor& tdesc)
_host(tdesc.mysqlTable.host),
_port(tdesc.mysqlTable.port),
_user(tdesc.mysqlTable.user),
_passwd(tdesc.mysqlTable.passwd) {}
_passwd(tdesc.mysqlTable.passwd),
_charset(tdesc.mysqlTable.charset) {}

std::string MySQLTableDescriptor::debug_string() const {
std::stringstream out;
out << "MySQLTable(" << TableDescriptor::debug_string() << " _db" << _mysql_db
<< " table=" << _mysql_table << " host=" << _host << " port=" << _port << " user=" << _user
<< " passwd=" << _passwd;
<< " passwd=" << _passwd << " charset=" << _charset;
return out.str();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class MySQLTableDescriptor : public TableDescriptor {
const std::string port() const { return _port; }
const std::string user() const { return _user; }
const std::string passwd() const { return _passwd; }
const std::string charset() const { return _charset; }

private:
std::string _mysql_db;
Expand All @@ -241,6 +242,7 @@ class MySQLTableDescriptor : public TableDescriptor {
std::string _port;
std::string _user;
std::string _passwd;
std::string _charset;
};

class ODBCTableDescriptor : public TableDescriptor {
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/mysql_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Status MysqlTableSink::init(const TDataSink& t_sink) {
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_mysql_tbl = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;

// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/mysql_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;

ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db
<< ",passwd=" << passwd << ")";
<< ",passwd=" << passwd << ",charset=" << charset << ")";
return ss.str();
}

Expand Down Expand Up @@ -62,7 +62,7 @@ Status MysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string&
}

// set character
if (mysql_set_character_set(_mysql_conn, "utf8")) {
if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
std::stringstream ss;
ss << "mysql_set_character_set failed because " << mysql_error(_mysql_conn);
return Status::InternalError(ss.str());
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/mysql_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct MysqlConnInfo {
std::string passwd;
std::string db;
int port;
std::string charset;

std::string debug_string() const;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/vmysql_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Status VMysqlTableSink::init(const TDataSink& t_sink) {
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_mysql_tbl = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;

// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, _t_output_expr, &_output_expr_ctxs));
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vmysql_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string
}

// set character
if (mysql_set_character_set(_mysql_conn, "utf8")) {
if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
mysql_error(_mysql_conn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Which type of external table is mainly identified by the ENGINE type, currently
"table" = "table_name"
)
````
There is also an optional property "charset", which sets the character set for the MySQL connection. The default value is "utf8"; you can set it to "utf8mb4" instead when needed.

Notice:

Expand Down Expand Up @@ -133,7 +134,8 @@ Which type of external table is mainly identified by the ENGINE type, currently
"user" = "mysql_user",
"password" = "mysql_passwd",
"database" = "mysql_db_test",
"table" = "mysql_table_test"
"table" = "mysql_table_test",
"charset" = "utf8mb4"
)
````
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ CREATE EXTERNAL TABLE
"table" = "table_name"
)
```
此外还有一个可选属性 "charset"，用于设置 MySQL 连接的字符集，默认值是 "utf8"。如有需要，你可以将其设置为另一个字符集 "utf8mb4"。

注意:

Expand Down Expand Up @@ -133,7 +134,8 @@ CREATE EXTERNAL TABLE
"user" = "mysql_user",
"password" = "mysql_passwd",
"database" = "mysql_db_test",
"table" = "mysql_table_test"
"table" = "mysql_table_test",
"charset" = "utf8mb4"
)
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
mysqlTable.getUserName(),
mysqlTable.getPasswd(),
mysqlTable.getMysqlDatabaseName(),
mysqlTable.getMysqlTableName());
mysqlTable.getMysqlTableName(),
mysqlTable.getCharset());
totalRows.add(row);
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, table.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4290,6 +4290,7 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, Table table, List<
sb.append("\"port\" = \"").append(mysqlTable.getPort()).append("\",\n");
sb.append("\"user\" = \"").append(mysqlTable.getUserName()).append("\",\n");
sb.append("\"password\" = \"").append(hidePassword ? "" : mysqlTable.getPasswd()).append("\",\n");
sb.append("\"charset\" = \"").append(mysqlTable.getCharset()).append("\",\n");
} else {
sb.append("\"odbc_catalog_resource\" = \"").append(mysqlTable.getOdbcCatalogResourceName()).append("\",\n");
}
Expand Down
25 changes: 23 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class MysqlTable extends Table {
private static final String MYSQL_PASSWORD = "password";
private static final String MYSQL_DATABASE = "database";
private static final String MYSQL_TABLE = "table";
private static final String MYSQL_CHARSET = "charset";

private String odbcCatalogResourceName;
private String host;
Expand All @@ -56,6 +57,7 @@ public class MysqlTable extends Table {
private String passwd;
private String mysqlDatabaseName;
private String mysqlTableName;
private String charset;

public MysqlTable() {
super(TableType.MYSQL);
Expand Down Expand Up @@ -124,6 +126,15 @@ private void validate(Map<String, String> properties) throws DdlException {
throw new DdlException("Password of MySQL table is null. "
+ "Please set proper resource or add properties('password'='xxxx') when create table");
}

charset = properties.get(MYSQL_CHARSET);
if (charset == null) {
charset = "utf8";
}
if (!charset.equalsIgnoreCase("utf8") && !charset.equalsIgnoreCase("utf8mb4")) {
throw new DdlException("Unknown character set of MySQL table. "
+ "Please set charset 'utf8' or 'utf8mb4', other charsets not be unsupported now.");
}
}

mysqlDatabaseName = properties.get(MYSQL_DATABASE);
Expand Down Expand Up @@ -193,9 +204,16 @@ public String getMysqlTableName() {
return mysqlTableName;
}

/**
 * Returns the character set configured for this MySQL table.
 *
 * @return the stored charset, or "utf8" when no charset has been set (the field is null)
 */
public String getCharset() {
    // Fall back to the default when the field was never populated.
    return charset == null ? "utf8" : charset;
}

public TTableDescriptor toThrift() {
TMySQLTable tMySQLTable =
new TMySQLTable(getHost(), getPort(), getUserName(), getPasswd(), mysqlDatabaseName, mysqlTableName);
TMySQLTable tMySQLTable = new TMySQLTable(getHost(), getPort(), getUserName(), getPasswd(),
mysqlDatabaseName, mysqlTableName, getCharset());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MYSQL_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setMysqlTable(tMySQLTable);
Expand All @@ -213,6 +231,7 @@ public String getSignature(int signatureVersion) {
sb.append(getPasswd());
sb.append(mysqlDatabaseName);
sb.append(mysqlTableName);
sb.append(getCharset());
String md5 = DigestUtils.md5Hex(sb.toString());
LOG.debug("get signature of mysql table {}: {}. signature string: {}", name, md5, sb.toString());
return md5;
Expand All @@ -230,6 +249,7 @@ public void write(DataOutput out) throws IOException {
serializeMap.put(MYSQL_PASSWORD, passwd);
serializeMap.put(MYSQL_DATABASE, mysqlDatabaseName);
serializeMap.put(MYSQL_TABLE, mysqlTableName);
serializeMap.put(MYSQL_CHARSET, charset);

int size = (int) serializeMap.values().stream().filter(v -> {
return v != null;
Expand Down Expand Up @@ -262,5 +282,6 @@ public void readFields(DataInput in) throws IOException {
passwd = serializeMap.get(MYSQL_PASSWORD);
mysqlDatabaseName = serializeMap.get(MYSQL_DATABASE);
mysqlTableName = serializeMap.get(MYSQL_TABLE);
charset = serializeMap.get(MYSQL_CHARSET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class MysqlTableSink extends DataSink {
private final String passwd;
private final String db;
private final String tbl;
private final String charset;

public MysqlTableSink(MysqlTable mysqlTable) {
host = mysqlTable.getHost();
Expand All @@ -38,6 +39,7 @@ public MysqlTableSink(MysqlTable mysqlTable) {
passwd = mysqlTable.getPasswd();
db = mysqlTable.getMysqlDatabaseName();
tbl = mysqlTable.getMysqlTableName();
charset = mysqlTable.getCharset();
}

@Override
Expand All @@ -52,7 +54,7 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) {
protected TDataSink toThrift() {
TDataSink tDataSink = new TDataSink(TDataSinkType.MYSQL_TABLE_SINK);

tDataSink.setMysqlTableSink(new TMysqlTableSink(host, port, user, passwd, db, tbl));
tDataSink.setMysqlTableSink(new TMysqlTableSink(host, port, user, passwd, db, tbl, charset));
return tDataSink;
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct TMysqlTableSink {
4: required string passwd
5: required string db
6: required string table
7: required string charset
}

struct TOdbcTableSink {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ struct TMySQLTable {
4: required string passwd
5: required string db
6: required string table
7: required string charset
}

struct TOdbcTable {
Expand Down

0 comments on commit 2ab3b74

Please sign in to comment.