package main import ( "encoding/json" "flag" "fmt" "iot-spray-equipment/database" "iot-spray-equipment/equipment" "iot-spray-equipment/mqttx" "net/http" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" "github.com/spf13/viper" ) var ( configFile string sprayEquipment = equipment.NewSprayEquipment("DC73AEE4CD274074BD9D01BCEE379A98") ) func init() { flag.StringVar(&configFile, "config", "config.yml", "配置文件") flag.Parse() fmt.Println(configFile) viper.SetConfigFile(configFile) if err := viper.ReadInConfig(); err != nil { panic(err) } if lvl, err := log.ParseLevel(viper.GetString("log.level")); err != nil { log.SetLevel(log.DebugLevel) } else { log.SetLevel(lvl) } log.SetOutput(os.Stdout) log.SetFormatter(&log.TextFormatter{ TimestampFormat: "2006-01-02 15:04:05", FullTimestamp: true, }) c := mqttx.NewClient(&mqttx.ClientOptions{ Host: viper.GetString("mqtt.host"), Port: viper.GetInt("mqtt.port"), ClientOptions: mqtt.ClientOptions{ ClientID: viper.GetString("mqtt.client_id"), Username: viper.GetString("mqtt.username"), Password: viper.GetString("mqtt.password"), OnConnect: func(c mqtt.Client) { log.Info("mqtt: 服务器连接成功") }, OnConnectionLost: func(c mqtt.Client, err error) { log.Warningf("mqtt: 服务器连接断开: %v", err.Error()) }, OnReconnecting: func(c mqtt.Client, co *mqtt.ClientOptions) { log.Info("mqtt: 重连服务器...") }, }, }) mqttx.Push(c) } func main() { db, err := database.New(viper.GetStringMap("database")) if err != nil { log.Fatal("gorm 创建失败: ", err) } go func() { clientId := viper.GetString("mqtt.client_id") log.Debug("mqtt: 客户端获取中...") c, err := mqttx.Get(clientId) if err != nil { log.WithFields(log.Fields{ "error": err, }).Error("mqtt: 客户端获取失败") return } log.Debug("mqtt: 客户端获取成功") RetryConnect: log.Debug("mqtt: 服务器连接中...") if err := c.Connect(); err != nil { log.WithFields(log.Fields{ "error": err, }).Error("mqtt: 服务器连接失败") time.Sleep(200 * time.Millisecond) goto RetryConnect } log.Debug("mqtt: 服务器连接成功") RetrySubscribe: t := mqttx.SubscribeTopic{ Name: "testtopic/PLC325454756", Qos: 0, MessageHandler: func(c mqtt.Client, msg mqtt.Message) { log.WithFields(log.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), }).Debug("mqtt: 订阅消息接收成功") var payload struct { State uint8 `json:"state"` Speed1 uint8 `json:"Speed1"` Speed2 uint8 `json:"Speed2"` Yv1 uint8 `json:"YV1"` Yv2 uint8 `json:"YV2"` Err uint8 `json:"ERROR"` } if err := json.Unmarshal(msg.Payload(), &payload); err != nil { log.WithFields(log.Fields{ "topic": msg.Topic(), "payload": string(msg.Payload()), "error": err, }).Error("mqtt: 订阅消息解析失败") return } connStatus := sprayEquipment.GetConnStatus() sprayEquipment.ResetConnLostTime() sprayEquipment.SetSpeed1(payload.Speed1) sprayEquipment.SetSpeed2(payload.Speed2) sprayEquipment.SetYv1(payload.Yv1) sprayEquipment.SetYv2(payload.Yv2) sprayEquipment.SetIsRunning(payload.State == 1) sprayEquipment.SetErrCode(payload.Err) if connStatus == equipment.ConnStatusOffline { log.Info("mqtt: 喷雾设备已上线") } if connStatus != equipment.ConnStatusOnline { result := db.Exec("UPDATE devices SET state = 1 WHERE sn = ?", sprayEquipment.GetId()) if result.Error != nil { log.Warning("mqtt: 喷雾设备状态更新失败: ", result.Error) } } }, WaitTimeout: 5 * time.Second, } log.WithFields(log.Fields{ "topic": t.Name, }).Debug("mqtt: 话题订阅中...") if err := c.Subscribe(t); err != nil { log.WithFields(log.Fields{ "topic": t.Name, "error": err, }).Error("mqtt: 话题订阅失败") time.Sleep(200 * time.Millisecond) goto RetrySubscribe } log.WithFields(log.Fields{ "topic": t.Name, }).Debug("mqtt: 话题订阅成功") // 喷雾设备在线检查 keepAliveTimeout := 60 * time.Second keepAliveInterval := 10 * time.Second for { go func() { t := mqttx.PublishTopic{ Name: "testtopic/325454756", Qos: 0, Retained: false, Payload: `{"key1":0}`, WaitTimeout: 5 * time.Second, } log.WithFields(log.Fields{ "topic": t.Name, "payload": t.Payload, }).Debug("mqtt: 话题发布中...") if err := c.Publish(t); err != nil { log.WithFields(log.Fields{ "topic": t.Name, "payload": t.Payload, "error": err, }).Error("mqtt: 话题发布失败") } log.WithFields(log.Fields{ "topic": t.Name, "payload": t.Payload, }).Debug("mqtt: 话题发布成功") }() time.Sleep(keepAliveInterval) if sprayEquipment.IncreaseConnLostTime(keepAliveInterval) >= keepAliveTimeout && sprayEquipment.GetConnStatus() != equipment.ConnStatusOffline { sprayEquipment.SetConnStatus(equipment.ConnStatusOffline) log.Warning("mqtt: 喷雾设备已离线") result := db.Exec("UPDATE devices SET state = 2 WHERE sn = ?", sprayEquipment.GetId()) if result.Error != nil { log.Warning("mqtt: 喷雾设备状态更新失败: ", result.Error) } } } }() if env := viper.GetString("app.env"); env == "production" || env == "prod" { gin.SetMode(gin.ReleaseMode) } r := gin.New() r.Use(gin.Logger(), gin.Recovery()) r.GET("/status", StatusHandler) r.POST("/mqtt", MqttHandler) r.Run(fmt.Sprintf("%s:%d", viper.GetString("server.host"), viper.GetInt("server.port"))) } func StatusHandler(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "id": sprayEquipment.GetId(), "speed1": sprayEquipment.GetSpeed1(), "speed2": sprayEquipment.GetSpeed2(), "yv1": sprayEquipment.GetYv1(), "yv2": sprayEquipment.GetYv2(), "is_running": sprayEquipment.GetIsRunning(), "error": sprayEquipment.GetErrCode(), "status": sprayEquipment.GetConnStatus(), }) } func MqttHandler(c *gin.Context) { var input struct { Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 YV1 *uint8 `json:"yv1" binding:"required"` // 电磁阀1: 0 关闭, 1 启动 YV2 *uint8 `json:"yv2" binding:"required"` // 电磁阀2: 0 关闭, 1 启动 } if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"errcode": 400, "errmsg": err.Error()}) return } if env := viper.GetString("app.env"); env == "production" || env == "prod" { // 消息体必须按 key 的先后顺序来: {"key1":2,"Speed1":0,"Speed2":0,"YV1":0,"YV2":0} payload, _ := json.Marshal(struct { Key1 uint `json:"key1"` Speed1 uint8 `json:"Speed1"` Speed2 uint8 `json:"Speed2"` YV1 uint8 `json:"YV1"` YV2 uint8 `json:"YV2"` }{ Key1: 2, Speed1: *input.Speed1, Speed2: *input.Speed2, YV1: *input.YV1, YV2: *input.YV2, }) topic := mqttx.PublishTopic{ Name: "testtopic/325454756", Qos: 0, Retained: false, Payload: payload, WaitTimeout: 5 * time.Second, } if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)}) return } else if err := mc.Publish(topic); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)}) return } } c.JSON(http.StatusOK, gin.H{"msg": "ok"}) }