diff --git a/milvusreader/pom.xml b/milvusreader/pom.xml
new file mode 100644
index 000000000..c66a6c7ba
--- /dev/null
+++ b/milvusreader/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>milvusreader</artifactId>
+
+    <properties>
+        <project-sourceEncoding>UTF-8</project-sourceEncoding>
+        <kotlin.code.style>official</kotlin.code.style>
+        <jdk-version>1.8</jdk-version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <artifactId>guava</artifactId>
+                <groupId>com.google.guava</groupId>
+                <version>32.0.1-jre</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test-junit5</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-stdlib</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/*.*</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/milvusreader/src/main/assembly/package.xml b/milvusreader/src/main/assembly/package.xml
new file mode 100644
index 000000000..3b12c5cb5
--- /dev/null
+++ b/milvusreader/src/main/assembly/package.xml
@@ -0,0 +1,36 @@
+<assembly xmlns="http://maven.apache.org/plugin/assembly/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugin/assembly/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/milvusreader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>milvusreader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/milvusreader</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/milvusreader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/milvusreader/src/main/doc/milvusreader.md b/milvusreader/src/main/doc/milvusreader.md
new file mode 100644
index 000000000..66f2ec867
--- /dev/null
+++ b/milvusreader/src/main/doc/milvusreader.md
@@ -0,0 +1,106 @@
+### DataX MilvusReader
+#### 1 Quick Introduction
+
+The MilvusReader plugin uses Milvus's Java client, MilvusClient, to read data from Milvus.
+
+#### 2 Implementation
+
+MilvusReader reads data from Milvus through the DataX framework: the controlling JOB splits the data in Milvus into shards according to the specified rules and reads them in parallel, then converts each Milvus type into a DataX-supported type, one field at a time.
+
+#### 3 Function Description
+* The following example reads a collection from one Milvus instance into another.
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "milvusreader",
+ "parameter": {
+ "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10
+ }
+ },
+ "writer": {
+ "name": "milvuswriter",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10,
+ "schemaCreateMode": "createWhenTableNotExit",
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "isPrimaryKey": true
+ },
+ {
+ "name": "title_vector",
+ "type": "FloatVector",
+ "dimension": 768
+ },
+ {
+ "name": "title",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "link",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "reading_time",
+ "type": "Int64"
+ },
+ {
+ "name": "publication",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "claps",
+ "type": "Int64"
+ },
+ {
+ "name": "responses",
+ "type": "Int64"
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+#### 4 Parameter Description
+
+* uri: Milvus cluster endpoint. [Required]
+* token: token used to connect to Milvus. [Required]
+* collection: collection to read data from. [Required]
+* partition: partition to read data from. [Optional]
+* batchSize: number of rows fetched per read. [Optional]
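+
+A minimal `reader` block that uses only these parameters looks like the following sketch (the endpoint, token, and partition name are placeholders):
+```json
+{
+  "name": "milvusreader",
+  "parameter": {
+    "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+    "token": "*****",
+    "collection": "medium_articles",
+    "partition": "partition_1",
+    "batchSize": 10
+  }
+}
+```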
+
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| -------- |-----------------|
+| Long | int |
+| Double | double |
+| String | string, varchar |
+| Boolean | bool |
+
+- Reading data with a dynamic schema is not currently supported.
+
+#### 6 Performance Report
+#### 7 Test Report
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java
new file mode 100644
index 000000000..ed22c129e
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+public class BufferUtils {
+
+ public static ByteBuffer toByteBuffer(Short[] shortArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+
+ for (Short value : shortArray) {
+ byteBuffer.putShort(value);
+ }
+
+ // Flip the buffer to prepare for reading. The explicit cast to Buffer keeps
+ // the bytecode compatible when the compile and runtime JDK versions differ
+ // (flip() returns Buffer on Java 8 but ByteBuffer on JDK 9+).
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Short[] toShortArray(ByteBuffer byteBuffer) {
+ Short[] shortArray = new Short[byteBuffer.capacity() / 2];
+
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = byteBuffer.getShort();
+ }
+
+ return shortArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Float[] floatArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+
+ for (float value : floatArray) {
+ byteBuffer.putFloat(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+ Float[] floatArray = new Float[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < floatArray.length; i++) {
+ floatArray[i] = byteBuffer.getFloat();
+ }
+
+ return floatArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+
+ for (double value : doubleArray) {
+ byteBuffer.putDouble(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+ Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
+
+ for (int i = 0; i < doubleArray.length; i++) {
+ doubleArray[i] = byteBuffer.getDouble();
+ }
+
+ return doubleArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Integer[] intArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+
+ for (int value : intArray) {
+ byteBuffer.putInt(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+ Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < intArray.length; i++) {
+ intArray[i] = byteBuffer.getInt();
+ }
+
+ return intArray;
+ }
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java
new file mode 100644
index 000000000..b0f608be1
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java
@@ -0,0 +1,10 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+public class KeyConstant {
+ public static final String URI = "uri";
+ public static final String TOKEN = "token";
+ public static final String DATABASE = "database";
+ public static final String COLLECTION = "collection";
+ public static final String PARTITION = "partition";
+ public static final String BATCH_SIZE = "batchSize";
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java
new file mode 100644
index 000000000..51e7798ed
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java
@@ -0,0 +1,130 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DescribeCollectionReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.vector.request.QueryIteratorReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class MilvusReader extends Reader {
+ public static class Job extends Reader.Job {
+ private Configuration originalConfig = null;
+ /**
+ * Split the job into tasks.
+ *
+ * @param mandatoryNumber the number of task slices the framework requires; the reader's split count must match the writer's, otherwise the framework reports an error.
+ */
+ @Override
+ public List<Configuration> split(int mandatoryNumber) {
+ List<Configuration> configList = new ArrayList<>();
+ for(int i = 0; i < mandatoryNumber; i++) {
+ configList.add(this.originalConfig.clone());
+ }
+ return configList;
+ }
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+ public static class Task extends Reader.Task {
+
+ private MilvusClientV2 milvusClientV2;
+
+ private MilvusSourceConverter milvusSourceConverter;
+
+ private String collection = null;
+ private String partition = null;
+ private Integer batchSize;
+
+ private CreateCollectionReq.CollectionSchema collectionSchema;
+
+ @Override
+ public void init() {
+ log.info("Initializing Milvus writer");
+ // get configuration
+ Configuration writerSliceConfig = this.getPluginJobConf();
+ this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+ this.partition = writerSliceConfig.getString(KeyConstant.PARTITION, null);
+ this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+ log.info("Collection:{}", this.collection);
+ // connect to milvus
+ ConnectConfig connectConfig = ConnectConfig.builder()
+ .uri(writerSliceConfig.getString(KeyConstant.URI))
+ .token(writerSliceConfig.getString(KeyConstant.TOKEN))
+ .build();
+ if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {
+ log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE));
+ connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
+ }
+ this.milvusClientV2 = new MilvusClientV2(connectConfig);
+ this.milvusSourceConverter = new MilvusSourceConverter();
+ log.info("Milvus writer initialized");
+ }
+ @Override
+ public void prepare() {
+ super.prepare();
+ Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+ if (!hasCollection) {
+ log.error("Collection {} does not exist", collection);
+ throw new RuntimeException("Collection does not exist");
+ }
+ DescribeCollectionReq describeCollectionReq = DescribeCollectionReq.builder()
+ .collectionName(collection)
+ .build();
+ DescribeCollectionResp describeCollectionResp = milvusClientV2.describeCollection(describeCollectionReq);
+ this.collectionSchema = describeCollectionResp.getCollectionSchema();
+ }
+
+ @Override
+ public void destroy() {
+ log.info("Closing Milvus reader, closing connection");
+ this.milvusClientV2.close();
+ }
+
+ @Override
+ public void startRead(RecordSender recordSender) {
+ QueryIteratorReq queryIteratorReq = QueryIteratorReq.builder()
+ .collectionName(collection)
+ .outputFields(Collections.singletonList("*"))
+ .batchSize(batchSize)
+ .build();
+ if(partition != null) {
+ queryIteratorReq.setPartitionNames(Collections.singletonList(partition));
+ }
+ QueryIterator queryIterator = milvusClientV2.queryIterator(queryIteratorReq);
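+ // Drain the iterator: next() returns an empty batch once all rows have been read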
+ while (true) {
+ List<QueryResultsWrapper.RowRecord> rowRecords = queryIterator.next();
+ if(rowRecords.isEmpty()){
+ break;
+ }
+ rowRecords.forEach(rowRecord -> {
+ Record record = recordSender.createRecord();
+ record = milvusSourceConverter.toDataXRecord(record, rowRecord, collectionSchema);
+ recordSender.sendToWriter(record);
+ });
+ }
+ }
+ }
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java
new file mode 100644
index 000000000..2f8eb6fd3
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java
@@ -0,0 +1,64 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.BoolColumn;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DoubleColumn;
+import com.alibaba.datax.common.element.LongColumn;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.google.gson.Gson;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class MilvusSourceConverter {
+
+ public Record toDataXRecord(Record record, QueryResultsWrapper.RowRecord rowRecord, CreateCollectionReq.CollectionSchema collectionSchema) {
+ Map<String, Object> fields = rowRecord.getFieldValues();
+
+ for (int i = 0; i < collectionSchema.getFieldSchemaList().size(); i++) {
+ CreateCollectionReq.FieldSchema fieldSchema = collectionSchema.getFieldSchemaList().get(i);
+ String fieldName = fieldSchema.getName();
+ Object fieldValue = fields.get(fieldName);
+ Column column = convertToDataXColumn(fieldSchema.getDataType(), fieldValue);
+ record.addColumn(column);
+ }
+ return record;
+ }
+
+ private Column convertToDataXColumn(DataType dataType, Object fieldValue) {
+ Gson gson = new Gson();
+ switch (dataType) {
+ case Bool:
+ return new BoolColumn(Boolean.parseBoolean(fieldValue.toString()));
+ case Int8:
+ case Int16:
+ case Int32:
+ return new LongColumn(Integer.parseInt(fieldValue.toString()));
+ case Int64:
+ return new LongColumn(Long.parseLong(fieldValue.toString()));
+ case Float:
+ case Double:
+ return new DoubleColumn(java.lang.Double.parseDouble(fieldValue.toString()));
+ case VarChar:
+ case String:
+ return new StringColumn(fieldValue.toString());
+ case JSON:
+ case Array:
+ return new StringColumn(gson.toJson(fieldValue));
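+ // Vector fields are rendered in their bracketed string form, e.g. "[0.1, 0.2]"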
+ case FloatVector:
+ List<Float> floats = (List<Float>) fieldValue;
+ return new StringColumn(Arrays.toString(floats.toArray()));
+ case BinaryVector:
+ Integer[] binarys = BufferUtils.toIntArray((ByteBuffer) fieldValue);
+ return new StringColumn(Arrays.toString(binarys));
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " + dataType);
+ }
+ }
+}
diff --git a/milvusreader/src/main/resources/plugin.json b/milvusreader/src/main/resources/plugin.json
new file mode 100644
index 000000000..dc90019da
--- /dev/null
+++ b/milvusreader/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "milvusreader",
+ "class": "com.alibaba.datax.plugin.reader.milvusreader.MilvusReader",
+ "description": "useScene: prod. mechanism: via milvusclient connect milvus read data concurrent.",
+ "developer": "nianliuu"
+}
diff --git a/milvusreader/src/main/resources/plugin_job_template.json b/milvusreader/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..de33d6512
--- /dev/null
+++ b/milvusreader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,9 @@
+{
+ "name": "milvusreader",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10
+ }
+}
\ No newline at end of file
diff --git a/milvuswriter/src/doc/milvuswriter.md b/milvuswriter/src/doc/milvuswriter.md
new file mode 100644
index 000000000..e1ca60b62
--- /dev/null
+++ b/milvuswriter/src/doc/milvuswriter.md
@@ -0,0 +1,116 @@
+### DataX MilvusWriter Plugin
+#### 1 Quick Introduction
+
+The MilvusWriter plugin uses Milvus's Java client, MilvusClient, to write data to Milvus.
+
+#### 2 Implementation
+
+MilvusWriter writes data to Milvus through the DataX framework: the controlling JOB writes to Milvus according to the specified rules, converting each DataX type into a Milvus-supported type, one field at a time.
+
+#### 3 Function Description
+* The following example reads a collection from one Milvus instance into another.
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "milvusreader",
+ "parameter": {
+ "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10
+ }
+ },
+ "writer": {
+ "name": "milvuswriter",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10,
+ "schemaCreateMode": "createWhenTableNotExit",
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "isPrimaryKey": true
+ },
+ {
+ "name": "title_vector",
+ "type": "FloatVector",
+ "dimension": 768
+ },
+ {
+ "name": "title",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "link",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "reading_time",
+ "type": "Int64"
+ },
+ {
+ "name": "publication",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "claps",
+ "type": "Int64"
+ },
+ {
+ "name": "responses",
+ "type": "Int64"
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+#### 4 Parameter Description
+
+* uri: Milvus cluster endpoint. [Required]
+* token: token used to connect to Milvus. [Required]
+* collection: collection to write data to. [Required]
+* partition: partition to write data to. [Optional]
+* batchSize: number of rows written per batch. [Optional]
+* schemaCreateMode: schema creation mode, one of ["createWhenTableNotExit", "exception"]; defaults to createWhenTableNotExit. [Optional]
+* enableDynamicSchema: whether to enable dynamic schema; defaults to true. [Optional]
+* column: definition of the fields to write; see the Array example after this list. [Required]
+  * name: field name. [Required]
+  * type: field type, one of [Int8, Int16, Int32, Int64, Float, Double, VarChar, FloatVector, JSON, Array]. [Required]
+  * isPrimaryKey: whether the field is the primary key. [Optional]
+  * isPartitionKey: whether the field is the partition key. [Optional]
+  * dimension: dimension of FloatVector fields. [Optional]
+  * maxLength: maximum length of VarChar fields. [Optional]
+  * elementType: element type of Array fields. [Optional]
+  * maxCapacity: maximum capacity of Array fields. [Optional]
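+
+For example, an Array column could be declared as follows (a sketch; the field name and sizes are illustrative, and maxLength applies to the VarChar elements):
+```json
+{
+  "name": "tags",
+  "type": "Array",
+  "elementType": "VarChar",
+  "maxLength": 100,
+  "maxCapacity": 64
+}
+```
+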
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| -------- |-----------------|
+| Long | int |
+| Double | double |
+| String | string, varchar |
+| Boolean | bool |
+
+- Writing data with a dynamic schema is not currently supported.
+
+#### 6 Performance Report
+#### 7 Test Report
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
index cc6364040..78fc302c5 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
@@ -5,6 +5,7 @@ public class KeyConstant {
public static final String TOKEN = "token";
public static final String DATABASE = "database";
public static final String COLLECTION = "collection";
+ public static final String PARTITION = "partition";
public static final String AUTO_ID = "autoId";
public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
public static final String BATCH_SIZE = "batchSize";
@@ -17,4 +18,7 @@ public class KeyConstant {
public static final String schemaCreateMode = "schemaCreateMode";
public static final String IS_PARTITION_KEY = "isPartitionKey";
public static final String MAX_LENGTH = "maxLength";
+ public static final String ELEMENT_TYPE = "elementType";
+ public static final String MAX_CAPACITY = "maxCapacity";
+ public static final String IS_AUTO_INCREMENT = "autoId";
}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
index da686af22..dc3590860 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
@@ -13,12 +13,14 @@ public class MilvusBufferWriter {
private final MilvusClientV2 milvusClientV2;
private final String collection;
+ private final String partition;
private final Integer batchSize;
private List<JsonObject> dataCache;
- public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){
+ public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, String partition, Integer batchSize){
this.milvusClientV2 = milvusClientV2;
this.collection = collection;
+ this.partition = partition;
this.batchSize = batchSize;
this.dataCache = new ArrayList<>();
}
@@ -37,6 +39,9 @@ public void commit(){
.collectionName(collection)
.data(dataCache)
.build();
+ if(partition != null){
+ upsertReq.setPartitionName(partition);
+ }
milvusClientV2.upsert(upsertReq);
dataCache = new ArrayList<>();
}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
index 390f95e5f..fd436f3b0 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
@@ -5,13 +5,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
import io.milvus.v2.common.DataType;
import static io.milvus.v2.common.DataType.*;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.List;
import java.util.stream.Collectors;
public class MilvusSinkConverter {
@@ -31,6 +34,10 @@ public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) {
private Object convertToMilvusField(String type, Object rawData) {
Gson gson = new Gson();
switch (valueOf(type)) {
+ case Int8:
+ return Byte.parseByte(rawData.toString());
+ case Int16:
+ return Short.parseShort(rawData.toString());
case Int32:
return Integer.parseInt(rawData.toString());
case Int64:
@@ -42,11 +49,16 @@ private Object convertToMilvusField(String type, Object rawData) {
return rawData.toString();
case Bool:
return Boolean.parseBoolean(rawData.toString());
+ case JSON:
+ return gson.fromJson(rawData.toString(), JsonObject.class);
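+ // Array values are expected as a JSON-encoded list, e.g. "[1, 2, 3]" or "[\"a\", \"b\"]"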
+ case Array:
+ Type listType = new TypeToken<List<Object>>() {}.getType();
+ return gson.fromJson(rawData.toString(), listType);
case FloatVector:
- java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new);
+ java.lang.Float[] floats = Arrays.stream(processVectorString(rawData)).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new);
return Arrays.stream(floats).collect(Collectors.toList());
case BinaryVector:
- java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new);
+ java.lang.Integer[] binarys = Arrays.stream(processVectorString(rawData)).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new);
return BufferUtils.toByteBuffer(binarys);
case Float16Vector:
case BFloat16Vector:
@@ -56,10 +68,26 @@ private Object convertToMilvusField(String type, Object rawData) {
case SparseFloatVector:
return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject();
default:
- throw new RuntimeException("Unsupported data type");
+ throw new RuntimeException("Unsupported data type: " + type);
}
}
+
+ private String[] processVectorString(Object rawData) {
+ // Step 1: Remove square brackets
+ String cleanedInput = rawData.toString().replace("[", "").replace("]", "");
+
+ // Step 2: Split the string into an array of string numbers
+ return cleanedInput.split(",\\s*");
+ }
+
public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) {
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
for (int i = 0; i < milvusColumnMeta.size(); i++) {
@@ -79,6 +107,13 @@ public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray mi
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) {
addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH));
}
+ if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.ELEMENT_TYPE)) {
+ addFieldReq.setElementType(DataType.valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.ELEMENT_TYPE)));
+ }
+ if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_CAPACITY)) {
+ addFieldReq.setMaxCapacity(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_CAPACITY));
+ }
collectionSchema.addField(addFieldReq);
}
return collectionSchema;
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
index c9b5a1bcc..61998848f 100644
--- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
@@ -13,6 +13,8 @@
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.partition.request.CreatePartitionReq;
+import io.milvus.v2.service.partition.request.HasPartitionReq;
import io.milvus.v2.service.vector.request.UpsertReq;
import lombok.extern.slf4j.Slf4j;
@@ -56,19 +58,25 @@ public static class Task extends Writer.Task {
private MilvusBufferWriter milvusBufferWriter;
private String collection = null;
+ private String partition = null;
private JSONArray milvusColumnMeta;
- private String schemaCreateMode = "createWhenTableNotExit";
+ private boolean enableDynamicSchema;
+
+ private String schemaCreateMode;
@Override
public void startWrite(RecordReceiver lineReceiver) {
Record record = lineReceiver.getFromReader();
- JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
- milvusBufferWriter.write(data);
- if(milvusBufferWriter.needCommit()){
- log.info("Reached buffer limit, Committing data");
- milvusBufferWriter.commit();
- log.info("Data committed");
+ while(record != null){
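+ // getFromReader() returns null once the reader side has sent all records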
+ JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
+ milvusBufferWriter.write(data);
+ if (milvusBufferWriter.needCommit()) {
+ log.info("Reached buffer limit, Committing data");
+ milvusBufferWriter.commit();
+ log.info("Data committed");
+ }
+ record = lineReceiver.getFromReader();
}
}
@@ -78,6 +86,8 @@ public void init() {
// get configuration
Configuration writerSliceConfig = this.getPluginJobConf();
this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+ this.partition = writerSliceConfig.getString(KeyConstant.PARTITION, null);
+ this.enableDynamicSchema = writerSliceConfig.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA, true);
this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ?
"createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode);
@@ -94,7 +104,7 @@ public void init() {
}
this.milvusClientV2 = new MilvusClientV2(connectConfig);
this.milvusSinkConverter = new MilvusSinkConverter();
- this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize);
+ this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, partition, batchSize);
log.info("Milvus writer initialized");
}
@Override
@@ -107,7 +117,7 @@ public void prepare() {
// create collection
log.info("Creating collection:{}", this.collection);
CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
-
+ collectionSchema.setEnableDynamicField(enableDynamicSchema);
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
.collectionName(collection)
.collectionSchema(collectionSchema)
@@ -118,6 +128,18 @@ public void prepare() {
throw new RuntimeException("Collection not exist");
}
}
+ if(partition != null) {
+ Boolean hasPartition = milvusClientV2.hasPartition(HasPartitionReq.builder().collectionName(collection).partitionName(partition).build());
+ if (!hasPartition) {
+ log.info("Partition not exist, creating");
+ CreatePartitionReq createPartitionReq = CreatePartitionReq.builder()
+ .collectionName(collection)
+ .partitionName(partition)
+ .build();
+ milvusClientV2.createPartition(createPartitionReq);
+ log.info("Partition created");
+ }
+ }
}
@Override
diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json
index d4ba4bf1f..b9bff3628 100644
--- a/milvuswriter/src/main/resources/plugin_job_template.json
+++ b/milvuswriter/src/main/resources/plugin_job_template.json
@@ -1,15 +1,49 @@
{
- "name": "mongodbwriter",
+ "name": "milvuswriter",
"parameter": {
- "address": [],
- "userName": "",
- "userPassword": "",
- "dbName": "",
- "collectionName": "",
- "column": [],
- "upsertInfo": {
- "isUpsert": "",
- "upsertKey": ""
- }
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10,
+ "schemaCreateMode": "createWhenTableNotExit",
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "isPrimaryKey": true
+ },
+ {
+ "name": "title_vector",
+ "type": "FloatVector",
+ "dimension": 768
+ },
+ {
+ "name": "title",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "link",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "reading_time",
+ "type": "Int64"
+ },
+ {
+ "name": "publication",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "claps",
+ "type": "Int64"
+ },
+ {
+ "name": "responses",
+ "type": "Int64"
+ }
+ ]
}
}
\ No newline at end of file
diff --git a/package.xml b/package.xml
index 624109f79..49eff42d6 100644
--- a/package.xml
+++ b/package.xml
@@ -257,8 +257,22 @@
            <outputDirectory>datax</outputDirectory>
        </fileSet>
+        <fileSet>
+            <directory>milvusreader/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>milvuswriter/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
        <fileSet>
            <directory>mysqlwriter/target/datax/</directory>
diff --git a/pom.xml b/pom.xml
index 1b364a754..2d4d3a66c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
        <module>starrocksreader</module>
        <module>sybasereader</module>
        <module>dorisreader</module>
+        <module>milvusreader</module>
        <module>mysqlwriter</module>
        <module>starrockswriter</module>