Skip to content

Commit

Permalink
Dev yobol (#3)
Browse files Browse the repository at this point in the history
* HTTP interfaces for operations

* config logger

* encapsulate tasks unrelated to a protocol driver

Co-authored-by: xufangyou <fangyou.xu@transwarp.io>
  • Loading branch information
Yobol and xufangyou authored Mar 31, 2022
1 parent 746df82 commit 7e302db
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 83 deletions.
28 changes: 26 additions & 2 deletions config/common.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
package config

type CommonOptions struct {
type DriverOptions struct {
DriverHealthCheckIntervalSecond int `json:"driver_health_check_interval_second" yaml:"driver_health_check_interval_second"`
DeviceHealthCheckIntervalSecond int `json:"device_health_check_interval_second" yaml:"device_health_check_interval_second"`
DeviceAutoReconnect bool `json:"device_auto_reconnect" yaml:"device_auto_reconnect"`
DeviceAutoReconnect bool `json:"device_auto_reconnect" yaml:"device_auto_reconnect"` // TODO reconnect automatically by the driver framework
DeviceAutoReconnectIntervalSecond int `json:"device_auto_reconnect_interval_second" yaml:"device_auto_reconnect_interval_second"`
// The number of retries for automatic reconnection of the device. If it is 0, there is no limit.
DeviceAutoReconnectMaxRetries int `json:"device_auto_reconnect_max_retries" yaml:"device_auto_reconnect_max_retries"`
}

type ManagerOptions struct {
HTTP struct {
Port int `json:"port" yaml:"port"`
} `json:"http" yaml:"http"`
}

type LogOptions struct {
Path string `yaml:"path" json:"path"`
Level string `yaml:"level" json:"level" default:"info" validate:"regexp=^(info|debug|warn|error)$"`
Format string `yaml:"format" json:"format" default:"text" validate:"regexp=^(text|json)$"`
Console bool `yaml:"console" json:"console" default:"false"`
Age struct {
Max int `yaml:"max" json:"max" default:"15" validate:"min=1"`
} `yaml:"age" json:"age"` // days
Size struct {
Max int `yaml:"max" json:"max" default:"50" validate:"min=1"`
} `yaml:"size" json:"size"` // in MB
Backup struct {
Max int `yaml:"max" json:"max" default:"15" validate:"min=0"`
} `yaml:"backup" json:"backup"`
}
6 changes: 4 additions & 2 deletions config/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ const (
)

type Configuration struct {
CommonOptions CommonOptions `json:"common" yaml:"common"`
MessageBus MessageBusOptions `json:"msgbus" yaml:"msgbus"`
DriverOptions DriverOptions `json:"driver" yaml:"driver"`
ManagerOptions ManagerOptions `json:"manager" yaml:"manager"`
LogOptions LogOptions `json:"log" yaml:"log"`
MessageBus MessageBusOptions `json:"msgbus" yaml:"msgbus"`
}

func NewConfiguration() (*Configuration, errors.EdgeError) {
Expand Down
45 changes: 40 additions & 5 deletions docs/zh/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 2021.12

> 发布 `v0.1.0` 版本
1. 确定[系统基本架构](./系统架构/README.md)
2. [设备驱动示例(生成随机数)](https://github.com/thingio/edge-randnum-driver)
3. 新增特性:
Expand All @@ -13,11 +15,44 @@
- 支持 MQTTS 配置,可参考 [MQTT 使用 TLS 建立安全连接](./系统安全/MQTTS:%20MQTT%20使用%20TLS%20建立安全连接.md)
- 将设备数据封装为类似于 DeviceData 的结构体,包含属性名、数据类型、数据值、数据采集时间戳等信息

## 2022.01

> 发布 `v0.2.0` 版本
1. 修复了一些问题,增强了系统健壮性
2. 新增特性:
- [`edge-device-manager` 提供元数据及设备数据的 HTTP 及 WebSocket 访问接口](./接口说明/edge-device-manager.md),支持:
- 协议(驱动):
- 查看当前在线驱动(HTTP)
- 产品:
- 创建产品元数据(HTTP)
- 删除产品元数据并联动删除对应驱动中所有已激活的设备影子(HTTP)
- 更新产品元数据并联动更新对应驱动中所有已激活的设备影子(HTTP)
- 查看产品元数据(HTTP)
- 设备:
- 创建设备元数据并联动在对应驱动中创建该设备的影子(HTTP)
- 删除设备元数据并联动在对应驱动中删除该设备的影子(HTTP)
- 更新设备元数据并联动在对应驱动中更新该设备的影子(HTTP)
- 查看设备元数据
- (软)读取设备属性(HTTP)
- (硬)读取设备属性(HTTP)
- 聚合读取设备属性(HTTP -> WebSocket)
- 写入设备属性(HTTP)
- 调用设备方法(HTTP)
- 订阅设备事件(HTTP -> WebSocket)
- 添加 Dockerfile 文件,支持一键生成 Docker 镜像,Manager
[edge-device-manager Dockerfile](https://github.com/thingio/edge-device-manager/blob/main/Dockerfile),Driver
参考 [edge-randnum-driver Dockerfile](https://github.com/thingio/edge-randnum-driver/blob/main/Dockerfile)
- 支持配置日志规则:
- 过滤级别:
- 环境变量:`EDS_LOG.LEVEL: [ panic | fatal | error | warning | info | debug | trace ]`

## TODO

1. 支持设备级别的 MQTT(S) QoS 配置
2.`edge-device-manager` & `edge-device-accessor` 提供 HTTP(S) & WS(S) 访问接口及 Client,与业务无关的代码封装在 `edge-device-std`
1. 由驱动框架完成**设备重连****设备状态转换****设备事件推送****设备属性上报** 等对各驱动来说行为一致的功能
2. 支持设备级别的 MQTT(S) QoS 配置
3.`edge-device-accessor` 提供 HTTP(S) & WS(S) 访问接口及 Client,与业务无关的代码封装在 `edge-device-std`
中提供基础服务
3. `edge-device-accessor` 确定业务边界、架构设计与代码实现
4. 调整 Protocol、Product 与 Device 元数据结构的定义,如 DeviceStatus 是动态消息,只能由系统更改,不应该放在 Device 这样的元数据中
5. 调研 TCP & MQTT(mosquitto) 如何切分数据包,主要是要确定当传输一个大数据文件(如图片)时,MQTT 如何将其拆分,TCP 如何将其拆分?
4. `edge-device-accessor` 确定业务边界、架构设计与代码实现
5. 调整 Protocol、Product 与 Device 元数据结构的定义,如 DeviceStatus 是动态消息,只能由系统更改,不应该放在 Device 这样的元数据中
6. 调研 TCP & MQTT(mosquitto) 如何切分数据包,主要是要确定当传输一个大数据文件(如图片)时,MQTT 如何将其拆分,TCP 如何将其拆分?
6 changes: 6 additions & 0 deletions docs/zh/接口说明/edge-device-manager.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Manager 接口说明

基于 [go-restful-openapi](https://github.com/emicklei/go-restful-openapi) 提供了合乎 RESTful 风格的 API
接口,集成 [go-swagger](https://github.com/go-swagger/go-swagger) 提供了图形化的 API 文档。

启动 Manager 后,访问 [http://172.16.251.163:10996/apidocs/#/](http://172.16.251.163:10996/apidocs/#/) 访问 Swagger 界面。
18 changes: 18 additions & 0 deletions docs/zh/系统架构/Accessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Accessor

1. Accessor 功能:
1. 设备数据持久化与查询;
1. 流程:
1. 设备元数据增加 `recording` 字段,表示是否将采集的设备进行落库;
2. accessor 启动后,连接到 manager,先从 manager 获取全量设备元数据,再监听设备元数据的变更(使用 edge-device-std 定义的 operations );
3. 如果设备的 `recording` 字段为 true,则在 accessor 中启动一个 Recorder 监听 event 以及 props 对应的主题,将采集到的数据落库,否则跳过或者关闭
Recorder;
2. 是否需要支持多数据源,如 InfluxDB | TDEngine 等?
3. 是否可使用 ORM 框架减少开发量,如 gorm | sqlx | ent 等?
4. 是否集成数据可视化工具,如 Grafana | Superset | Metabase 等?
2. 是否需要支持数据推送:
1. 前端通过 WebSocket 获取设备推送的数据?
2. 后端直接通过 MessageBus 订阅设备对应的主题?
2. 对于 [当前的架构设计](https://app.diagrams.net/#G1E_OcUtDI-vPk-1XZFqoLdzKV46zRjKUY) 来说,Manager 负责操作元数据,Accessor 负责操作设备数据。是否可以让
Manager 集成 Accessor,即 Manager 同时负责操作元数据和设备数据?
3. `props *` 的设计跟 `event` 是否有重叠?或者说是否可以用 `event` 表示 `props *`
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
module github.com/thingio/edge-device-std

require (
github.com/banzaicloud/logrus-runtime-formatter v0.0.0-20190729070250-5ae5475bae5e // indirect
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/mitchellh/mapstructure v1.4.2
github.com/pkg/errors v0.8.1
github.com/rs/xid v1.3.0
github.com/sirupsen/logrus v1.8.1
github.com/spf13/viper v1.9.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

go 1.16
148 changes: 123 additions & 25 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,61 @@ package logger

import (
"github.com/sirupsen/logrus"
"github.com/thingio/edge-device-std/config"
"gopkg.in/natefinch/lumberjack.v2"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
)

type LogLevel string
func NewLogger(options *config.LogOptions) (*Logger, error) {
root := logrus.NewEntry(logrus.New())

const (
DebugLevel LogLevel = "debug"
InfoLevel LogLevel = "info"
WarnLevel LogLevel = "warn"
ErrorLevel LogLevel = "errors"
FatalLevel LogLevel = "fatal"
PanicLevel LogLevel = "panic"
)
logLevel, err := logrus.ParseLevel(options.Level)
if err != nil {
logLevel = logrus.DebugLevel
}
root.Logger.Level = logLevel

func NewLogger() *Logger {
logger := &Logger{
logger: logrus.NewEntry(logrus.New()),
var logWriter io.Writer
if options.Console {
logWriter = os.Stdout
} else {
logWriter = ioutil.Discard
if options.Path != "" {
if err := os.Mkdir(filepath.Dir(options.Path), 0664); err != nil {
return nil, err
}
if hook, err := newFileHook(fileConfig{
Filename: options.Path,
MaxSize: options.Size.Max,
MaxAge: options.Age.Max,
MaxBackups: options.Backup.Max,
Compress: true,
Level: logLevel,
Formatter: newFormatter(options.Format, false),
}); err != nil {
return nil, err
} else {
root.Logger.Hooks.Add(hook)
}
}
}
_ = logger.SetLevel(InfoLevel)
return logger


root.Logger.SetOutput(logWriter)
root.Logger.SetFormatter(newFormatter("text", true))

return &Logger{logger: root}, nil
}

type Logger struct {
logger *logrus.Entry
}

func (l Logger) SetLevel(level LogLevel) error {
lvl, err := logrus.ParseLevel(string(level))
if err != nil {
return err
}
l.logger.Logger.SetLevel(lvl)
l.logger.Logger.SetOutput(os.Stdout)
l.logger.Logger.SetFormatter(&logFormatter{logrus.TextFormatter{FullTimestamp: true, ForceColors: true}})
return nil
}

// WithFields adds a map of fields to the Entry.
func (l Logger) WithFields(vs ...string) *logrus.Entry {
fs := logrus.Fields{}
Expand Down Expand Up @@ -106,3 +124,83 @@ func (f *logFormatter) Format(entry *logrus.Entry) ([]byte, error) {
}
return data, nil
}

type fileConfig struct {
Filename string
MaxSize int
MaxAge int
MaxBackups int
LocalTime bool
Compress bool
Level logrus.Level
Formatter logrus.Formatter
}

type fileHook struct {
config fileConfig
writer io.Writer
}

func newFileHook(config fileConfig) (logrus.Hook, error) {
hook := fileHook{
config: config,
}

var zeroLevel logrus.Level
if hook.config.Level == zeroLevel {
hook.config.Level = logrus.InfoLevel
}
var zeroFormatter logrus.Formatter
if hook.config.Formatter == zeroFormatter {
hook.config.Formatter = new(logrus.TextFormatter)
}

hook.writer = &lumberjack.Logger{
Filename: config.Filename,
MaxSize: config.MaxSize,
MaxAge: config.MaxAge,
MaxBackups: config.MaxBackups,
LocalTime: config.LocalTime,
Compress: config.Compress,
}

return &hook, nil
}

// Levels Levels
func (hook *fileHook) Levels() []logrus.Level {
return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
}
}

// Fire Fire
func (hook *fileHook) Fire(entry *logrus.Entry) (err error) {
if hook.config.Level < entry.Level {
return nil
}
b, err := hook.config.Formatter.Format(entry)
if err != nil {
return err
}
hook.writer.Write(b)
return nil
}

func newFormatter(format string, color bool) logrus.Formatter {
var formatter logrus.Formatter
if strings.ToLower(format) == "json" {
formatter = &logrus.JSONFormatter{}
} else {
if runtime.GOOS == "windows" {
color = false
}
formatter = &logFormatter{logrus.TextFormatter{FullTimestamp: true, DisableColors: !color, ForceColors: color}}
}
return formatter
}
2 changes: 1 addition & 1 deletion models/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type Device struct {
ProductName string `json:"product_name"` // 设备所属产品名称
Category string `json:"category"` // 设备类型(多媒体, 时序), 不可更新
Recording bool `json:"recording"` // 是否正在录制
DeviceStatus string `json:"device_status"` // 设备状态
DeviceStatus string `json:"device_status"` // 设备状态 TODO 状态是一个动态属性,不应该出现在静态的设备元数据中
DeviceProps map[string]string `json:"device_props"` // 设备动态属性, 取决于具体的设备协议
DeviceLabels map[string]string `json:"device_labels"` // 设备标签
DeviceMeta map[string]string `json:"device_meta"` // 视频流元信息
Expand Down
19 changes: 7 additions & 12 deletions models/device_twin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"context"
"github.com/thingio/edge-device-std/logger"
)

Expand All @@ -11,27 +12,21 @@ type DeviceTwin interface {
// It must always return nil if the device needn't be initialized.
Initialize(lg *logger.Logger) error

// Start will to try to create connection with the real device.
// Start will to try to open a connection with the real device.
// It must always return nil if the device needn't be initialized.
Start() error
// Stop will to try to destroy connection with the real device.
Start(ctx context.Context) error
// Stop will to try to close a connection with the real device.
// It must always return nil if the device needn't be initialized.
Stop(force bool) error
// HealthCheck is used to bin the connectivity with the real device.
// HealthCheck is used to check the connectivity with the real device.
HealthCheck() (*DeviceStatus, error)

// Watch will read device's properties periodically with the specified policy.
Watch(bus chan<- *DeviceDataWrapper) error
// Read indicates soft read, it will read the specified property from the cache with TTL.
// Specially, when propertyID is "*", it indicates read all properties.
// Read indicates hard read, it will read the specified property from the real device.
Read(propertyID ProductPropertyID) (map[ProductPropertyID]*DeviceData, error)
// HardRead indicates head read, it will read the specified property from the real device.
// Specially, when propertyID is "*", it indicates read all properties.
HardRead(propertyID ProductPropertyID) (map[ProductPropertyID]*DeviceData, error)
// Write will write the specified property to the real device.
Write(propertyID ProductPropertyID, values map[ProductPropertyID]*DeviceData) error
// Subscribe will subscribe the specified event,
// and put DataOperation including properties specified by the event into the bus.
// and you should put the event into the bus.
Subscribe(eventID ProductEventID, bus chan<- *DeviceDataWrapper) error
// Call is used to call the specified method defined in product,
// then waiting for a while to receive its response.
Expand Down
1 change: 1 addition & 0 deletions models/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
)

type Property struct {
Id string `json:"id"`
Name string `json:"name"` // Name 为属性的展示名称
Desc string `json:"desc"` // Desc 为属性的描述, 通常以名称旁的?形式进行展示
Type PropertyValueType `json:"type"` // Type 为该属性的数据类型
Expand Down
9 changes: 5 additions & 4 deletions models/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ type DeviceStatus struct {
}

type DriverStatus struct {
Hello bool `json:"hello"`
Protocol *Protocol `json:"protocol"`
State State `json:"state"`
StateDetail string `json:"state_detail"`
Hello bool `json:"hello"`
Protocol *Protocol `json:"protocol"`
State State `json:"state"`
StateDetail string `json:"state_detail"`
HealthCheckIntervalSecond int `json:"health_check_interval_second"`
}
Loading

0 comments on commit 7e302db

Please sign in to comment.