Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka handle #21

Open
songlonqi-java opened this issue Jul 28, 2023 · 1 comment
Open

kafka handle #21

songlonqi-java opened this issue Jul 28, 2023 · 1 comment

Comments

@songlonqi-java
Copy link
Contributor

kafkamq 提供一种插件机制:将数据([]byte)通过 HTTP 发送到外部handle,经过处理后再通过response返回行协议的json格式数据。实现定制化数据。

增加如下配置:(以最终配置为准)

  • http url string
  • message_points int 一次发送的消息点数
  • debug bool 值, 当开启debug功能, message_points 则无效,如果开启debug模式,则将原始byte数据发送,不再进行消息合并。
  • threads int 多线程工作
  • is_response_point 是否将行协议数据发送回来
  • pipeline 脚本
  • header_check 特殊的头部检测(bfy定制化,并非通用)
  • 等等

外部插件有一些约束:

  • kafkamq 接收数据但不负责解析
  • 外部插件解析后的数据可以通过 dk api 发送,也可以返回到 kafkamq 再发送到观测云
  • 通过 response 返回到 kafkamq 必须是 行协议格式。
  • 外部插件收到数据,无论解析失败与否 都应该返回200。
  • kafkamq发送数据到外部插件如果出现timeout,端口不存在等。会尝试重连。不再消费kafka中的消息。
@songlonqi-java
Copy link
Contributor Author

BFY 解决方案

kafkamq收到消息后,合并成一个包含100条消息的包,发送到指定的http url上,数据结构如下:

[
	{
		"topic": "bfySpan",
		"value": "dmFsdWUx"
	},
	{
		"topic": "bfySpan",
		"value": "dmFsdWUy"
	},
	{
		"topic": "bfySpan",
		"value": "dmFsdWUz"
	},
	{
		"topic": "bfySpan",
		"value": "dmFsdWU0"
	}
]

go 数据结构:

type KafkaMessage struct {
	Topic string `json:"topic"`
	Value []byte `json:"value"`
}

func TestMashellJson(t *testing.T) {
	messages := []KafkaMessage{
		{Topic: "bfySpan", Value: []byte("value1")},
		{Topic: "bfySpan", Value: []byte("value2")},
		{Topic: "bfySpan", Value: []byte("value3")},
		{Topic: "bfySpan", Value: []byte("value4")},
	}

	// 将多个消息打包为HTTP请求的主体
	jsonData, err := json.MarshalIndent(messages, "", "	")
	if err != nil {
		fmt.Println("Error marshaling JSON:", err)
		return
	}
	fmt.Println(string(jsonData))
}

返回数据

response 如果返回数据,应该返回行协议的json格式:

[
  {
    "measurement": "abc",
    "tags": {
      "t1": "b",
      "t2": "d"
    },
    "fields": {
      "f1": 123,
      "f2": 3.4,
      "f3": "strval"
    },
    "time": 1624550216
  },
  {
    "measurement": "def",
    "tags": {
      "t1": "b",
      "t2": "d"
    },
    "fields": {
      "f1": 123,
      "f2": 3.4,
      "f3": "strval"
    },
    "time": 1624550216
  }
]

py 可以参考 GitHub InfulxDB-python

返回的 header 也应该说明该数据的类型:

X-category=tracing

datakit 支持数据类型

只要接收到数据 就代表kafkamq将数据发送成功,无论解析如何 就应该返回200, 后等待下一个请求。

如果解析失败,则建议将kafkamq配置中的debug=true 这时候,不会再进行json的组装和序列化。 而是 请求的body就是消息本身。


多线程: http Handle 本身就是支持多线程,并且消息与消息之间没有顺序关系。

异步:kafkamq 在发送到py成功之后,不等待response code 200,立即接收下一个100条消息,等待py返回200后,马上再次发送。


开发时间

按照上述方案,kafkamq 需要一天的开发的时间。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant