From 230bc9858a744691c0f0b7fba2ded7087aa8e01f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=9D=99?= Date: Fri, 7 Jul 2023 17:24:49 +0800 Subject: [PATCH] Update --- config.yml.example | 7 ++ equipment/spray_equipment.go | 2 +- main.go | 162 +++++++++++++++++++++++------------ 3 files changed, 116 insertions(+), 55 deletions(-) diff --git a/config.yml.example b/config.yml.example index 137354f..2350f49 100644 --- a/config.yml.example +++ b/config.yml.example @@ -1,3 +1,10 @@ +app: + env: local +log: + level: debug +server: + host: 127.0.0.1 + port: 8080 mqtt: client_id: mqttx_go host: broker.emqx.io diff --git a/equipment/spray_equipment.go b/equipment/spray_equipment.go index 93740e6..9e39e19 100644 --- a/equipment/spray_equipment.go +++ b/equipment/spray_equipment.go @@ -34,7 +34,7 @@ type SprayEquipment struct { isRunning bool // 是否已启动 errCode uint8 // 错误代码: 0 无, 1 急停, 2 低水位报警 connStatus uint8 // 连接状态: 0 未知, 1 在线, 2 离线 - connLostTime time.Duration // 掉线时间 + connLostTime time.Duration // 连接丢失时间 connLostTimeMu sync.RWMutex } diff --git a/main.go b/main.go index 3975c68..1b2e2ba 100644 --- a/main.go +++ b/main.go @@ -27,13 +27,24 @@ func init() { buf, err := os.ReadFile(configFilePath) if err != nil { - log.Fatal(err) + panic(err) } viper.SetConfigType("yaml") if err := viper.ReadConfig(bytes.NewBuffer(buf)); err != nil { - log.Fatal(err) + panic(err) } + if lvl, err := log.ParseLevel(viper.GetString("log.level")); err != nil { + log.SetLevel(log.DebugLevel) + } else { + log.SetLevel(lvl) + } + log.SetOutput(os.Stdout) + log.SetFormatter(&log.TextFormatter{ + TimestampFormat: "2006-01-02 15:04:05", + FullTimestamp: true, + }) + c := mqttx.NewClient(&mqttx.ClientOptions{ Host: viper.GetString("mqtt.host"), Port: viper.GetInt("mqtt.port"), @@ -57,15 +68,19 @@ func init() { func main() { go func() { - c, err := mqttx.Get(viper.GetString("mqtt.client_id")) + clientId := viper.GetString("mqtt.client_id") + log.Debug("mqtt: 客户端获取中...") + c, err := mqttx.Get(clientId) if err != nil { log.WithFields(log.Fields{ "error": err, }).Error("mqtt: 客户端获取失败") return } + log.Debug("mqtt: 客户端获取成功") RetryConnect: + log.Debug("mqtt: 服务器连接中...") if err := c.Connect(); err != nil { log.WithFields(log.Fields{ "error": err, @@ -73,12 +88,18 @@ func main() { time.Sleep(200 * time.Millisecond) goto RetryConnect } + log.Debug("mqtt: 服务器连接成功") RetrySubscribe: - subTopic := mqttx.SubscribeTopic{ + t := mqttx.SubscribeTopic{ Name: "testtopic/PLC325454756", Qos: 0, MessageHandler: func(c mqtt.Client, msg mqtt.Message) { + log.WithFields(log.Fields{ + "topic": msg.Topic(), + "payload": string(msg.Payload()), + }).Debug("mqtt: 订阅消息接收成功") + var payload struct { State uint8 `json:"state"` Speed1 uint8 `json:"Speed1"` @@ -90,12 +111,15 @@ func main() { if err := json.Unmarshal(msg.Payload(), &payload); err != nil { log.WithFields(log.Fields{ + "topic": msg.Topic(), "payload": string(msg.Payload()), "error": err, - }).Error("mqtt: 订阅内容解析失败") + }).Error("mqtt: 订阅消息解析失败") return } + connStatus := sprayEquipment.GetConnStatus() + sprayEquipment.ResetConnLostTime() sprayEquipment.SetSpeed1(payload.Speed1) sprayEquipment.SetSpeed2(payload.Speed2) @@ -103,52 +127,74 @@ func main() { sprayEquipment.SetYv2(payload.Yv2) sprayEquipment.SetIsRunning(payload.State == 1) sprayEquipment.SetErrCode(payload.Err) + + if connStatus == equipment.ConnStatusOffline { + log.Info("mqtt: 喷雾设备已上线") + } }, WaitTimeout: 5 * time.Second, } - if err := c.Subscribe(subTopic); err != nil { + log.WithFields(log.Fields{ + "topic": t.Name, + }).Debug("mqtt: 话题订阅中...") + if err := c.Subscribe(t); err != nil { log.WithFields(log.Fields{ - "topic": subTopic.Name, + "topic": t.Name, "error": err, }).Error("mqtt: 话题订阅失败") time.Sleep(200 * time.Millisecond) goto RetrySubscribe } + log.WithFields(log.Fields{ + "topic": t.Name, + }).Debug("mqtt: 话题订阅成功") - go func() { - d := 5 * time.Second - - for { - time.Sleep(d) - if sprayEquipment.IncreaseConnLostTime(d) >= 60*time.Second && sprayEquipment.GetConnStatus() != equipment.ConnStatusOffline { - sprayEquipment.SetConnStatus(equipment.ConnStatusOffline) - log.Warning("喷雾设备已离线") - } - } - }() - - pubTopic := mqttx.PublishTopic{ - Name: "testtopic/325454756", - Qos: 0, - Retained: false, - Payload: `{"KEY1":0}`, - WaitTimeout: 5 * time.Second, - } + // 喷雾设备在线检查 + keepAliveTimeout := 60 * time.Second + keepAliveInterval := 10 * time.Second for { - if err := c.Publish(pubTopic); err != nil { + go func() { + t := mqttx.PublishTopic{ + Name: "testtopic/325454756", + Qos: 0, + Retained: false, + Payload: `{"key1":0}`, + WaitTimeout: 5 * time.Second, + } log.WithFields(log.Fields{ - "topic": pubTopic.Name, - "error": err, - }).Error("mqtt: 话题发送失败") + "topic": t.Name, + "payload": t.Payload, + }).Debug("mqtt: 话题发布中...") + if err := c.Publish(t); err != nil { + log.WithFields(log.Fields{ + "topic": t.Name, + "payload": t.Payload, + "error": err, + }).Error("mqtt: 话题发布失败") + } + log.WithFields(log.Fields{ + "topic": t.Name, + "payload": t.Payload, + }).Debug("mqtt: 话题发布成功") + }() + + time.Sleep(keepAliveInterval) + + if sprayEquipment.IncreaseConnLostTime(keepAliveInterval) >= keepAliveTimeout && sprayEquipment.GetConnStatus() != equipment.ConnStatusOffline { + sprayEquipment.SetConnStatus(equipment.ConnStatusOffline) + log.Warning("mqtt: 喷雾设备已离线") } - time.Sleep(5 * time.Second) } }() - r := gin.Default() + if env := viper.GetString("app.env"); env == "production" || env == "prod" { + gin.SetMode(gin.ReleaseMode) + } + r := gin.New() + r.Use(gin.Logger(), gin.Recovery()) r.GET("/status", StatusHandler) r.POST("/mqtt", MqttHandler) - r.Run() + r.Run(fmt.Sprintf("%s:%d", viper.GetString("server.host"), viper.GetInt("server.port"))) } func StatusHandler(c *gin.Context) { @@ -176,29 +222,37 @@ func MqttHandler(c *gin.Context) { return } - payload := make(map[string]interface{}) - payload["KEY1"] = 2 - payload["Speed1"] = input.Speed1 - payload["Speed2"] = input.Speed2 - payload["YV1"] = input.YV1 - payload["YV2"] = input.YV2 + if env := viper.GetString("app.env"); env == "production" || env == "prod" { + // 消息体必须按 key 的先后顺序来: {"key1":2,"Speed1":0,"Speed2":0,"YV1":0,"YV2":0} + payload, _ := json.Marshal(struct { + Key1 uint `json:"key1"` + Speed1 uint8 `json:"Speed1"` + Speed2 uint8 `json:"Speed2"` + YV1 uint8 `json:"YV1"` + YV2 uint8 `json:"YV2"` + }{ + Key1: 2, + Speed1: *input.Speed1, + Speed2: *input.Speed2, + YV1: *input.YV1, + YV2: *input.YV2, + }) - buf, _ := json.Marshal(payload) + topic := mqttx.PublishTopic{ + Name: "testtopic/325454756", + Qos: 0, + Retained: false, + Payload: payload, + WaitTimeout: 5 * time.Second, + } - topic := mqttx.PublishTopic{ - Name: "testtopic/325454756", - Qos: 0, - Retained: false, - Payload: buf, - WaitTimeout: 5 * time.Second, - } - - if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)}) - return - } else if err := mc.Publish(topic); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)}) - return + if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)}) + return + } else if err := mc.Publish(topic); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)}) + return + } } c.JSON(http.StatusOK, gin.H{"msg": "ok"})