iot-spray-equipment/main.go

275 lines
7.7 KiB
Go

package main
import (
"encoding/json"
"flag"
"fmt"
"iot-spray-equipment/database"
"iot-spray-equipment/equipment"
"iot-spray-equipment/mqttx"
"net/http"
"os"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// configFile is the path of the configuration file. init binds it to the
// -config command-line flag, whose default is "config.yml".
var configFile string

// sprayEquipment is the single spray device instance shared by the MQTT
// worker started in main and by the HTTP handlers.
// NOTE(review): the constructor argument is presumably the device id that
// GetId() returns and that is matched against devices.sn — confirm in the
// equipment package.
var sprayEquipment = equipment.NewSprayEquipment("DC73AEE4CD274074BD9D01BCEE379A98")
// init prepares process-wide state before main runs: it parses the -config
// flag, loads that file through viper, configures logrus and creates the MQTT
// client, which is handed to mqttx.Push.
//
// It panics when the configuration file cannot be read.
func init() {
	flag.StringVar(&configFile, "config", "config.yml", "配置文件")
	flag.Parse()
	fmt.Println(configFile)

	viper.SetConfigFile(configFile)
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}

	// A missing or unparsable log.level falls back to debug logging.
	level, err := log.ParseLevel(viper.GetString("log.level"))
	if err != nil {
		level = log.DebugLevel
	}
	log.SetLevel(level)
	log.SetOutput(os.Stdout)
	log.SetFormatter(&log.TextFormatter{
		TimestampFormat: "2006-01-02 15:04:05",
		FullTimestamp:   true,
	})

	// Connection lifecycle callbacks only log; they take no other action.
	onConnect := func(c mqtt.Client) {
		log.Info("mqtt: 服务器连接成功")
	}
	onConnectionLost := func(c mqtt.Client, err error) {
		log.Warningf("mqtt: 服务器连接断开: %v", err.Error())
	}
	onReconnecting := func(c mqtt.Client, co *mqtt.ClientOptions) {
		log.Info("mqtt: 重连服务器...")
	}

	client := mqttx.NewClient(&mqttx.ClientOptions{
		Host: viper.GetString("mqtt.host"),
		Port: viper.GetInt("mqtt.port"),
		ClientOptions: mqtt.ClientOptions{
			ClientID:         viper.GetString("mqtt.client_id"),
			Username:         viper.GetString("mqtt.username"),
			Password:         viper.GetString("mqtt.password"),
			OnConnect:        onConnect,
			OnConnectionLost: onConnectionLost,
			OnReconnecting:   onReconnecting,
		},
	})
	// NOTE(review): Push presumably registers the client so that
	// mqttx.Get(client_id) can return it later — confirm in mqttx.
	mqttx.Push(client)
}
// main opens the database, starts the background MQTT worker and then serves
// the HTTP API (GET /status, POST /mqtt) until the server stops.
//
// The worker goroutine fetches the MQTT client by its configured client id,
// connects and subscribes (retrying every 200ms until each step succeeds) and
// then loops forever: it publishes a keep-alive request, waits
// keepAliveInterval and adds that interval to the device's connection-lost
// time. Once that time reaches keepAliveTimeout the device is marked offline
// in memory and in the devices table (state = 2).
//
// The process exits through log.Fatal when the database cannot be opened or
// when the HTTP server returns an error.
func main() {
	db, err := database.New(viper.GetStringMap("database"))
	if err != nil {
		log.Fatal("gorm 创建失败: ", err)
	}

	go func() {
		const retryDelay = 200 * time.Millisecond

		clientID := viper.GetString("mqtt.client_id")
		log.Debug("mqtt: 客户端获取中...")
		c, err := mqttx.Get(clientID)
		if err != nil {
			log.WithFields(log.Fields{
				"error": err,
			}).Error("mqtt: 客户端获取失败")
			return
		}
		log.Debug("mqtt: 客户端获取成功")

		// Keep trying until the broker connection is established.
		for {
			log.Debug("mqtt: 服务器连接中...")
			err := c.Connect()
			if err == nil {
				break
			}
			log.WithFields(log.Fields{
				"error": err,
			}).Error("mqtt: 服务器连接失败")
			time.Sleep(retryDelay)
		}
		log.Debug("mqtt: 服务器连接成功")

		statusTopic := mqttx.SubscribeTopic{
			Name: "testtopic/PLC325454756",
			Qos:  0,
			// Every status report refreshes the in-memory device snapshot.
			MessageHandler: func(_ mqtt.Client, msg mqtt.Message) {
				log.WithFields(log.Fields{
					"topic":   msg.Topic(),
					"payload": string(msg.Payload()),
				}).Debug("mqtt: 订阅消息接收成功")

				var payload struct {
					State  uint8 `json:"state"`
					Speed1 uint8 `json:"Speed1"`
					Speed2 uint8 `json:"Speed2"`
					Yv1    uint8 `json:"YV1"`
					Yv2    uint8 `json:"YV2"`
					Err    uint8 `json:"ERROR"`
				}
				if err := json.Unmarshal(msg.Payload(), &payload); err != nil {
					log.WithFields(log.Fields{
						"topic":   msg.Topic(),
						"payload": string(msg.Payload()),
						"error":   err,
					}).Error("mqtt: 订阅消息解析失败")
					return
				}

				// Read the status before resetting the lost-time counter so
				// the transition checks below see the previous state.
				// NOTE(review): nothing in this file sets the status back to
				// online; presumably ResetConnLostTime does — confirm,
				// otherwise the UPDATE below runs on every message.
				connStatus := sprayEquipment.GetConnStatus()
				sprayEquipment.ResetConnLostTime()
				sprayEquipment.SetSpeed1(payload.Speed1)
				sprayEquipment.SetSpeed2(payload.Speed2)
				sprayEquipment.SetYv1(payload.Yv1)
				sprayEquipment.SetYv2(payload.Yv2)
				sprayEquipment.SetIsRunning(payload.State == 1)
				sprayEquipment.SetErrCode(payload.Err)

				if connStatus == equipment.ConnStatusOffline {
					log.Info("mqtt: 喷雾设备已上线")
				}
				if connStatus != equipment.ConnStatusOnline {
					result := db.Exec("UPDATE devices SET state = 1 WHERE sn = ?", sprayEquipment.GetId())
					if result.Error != nil {
						log.Warning("mqtt: 喷雾设备状态更新失败: ", result.Error)
					}
				}
			},
			WaitTimeout: 5 * time.Second,
		}

		// Keep trying until the subscription is accepted.
		for {
			log.WithFields(log.Fields{
				"topic": statusTopic.Name,
			}).Debug("mqtt: 话题订阅中...")
			err := c.Subscribe(statusTopic)
			if err == nil {
				break
			}
			log.WithFields(log.Fields{
				"topic": statusTopic.Name,
				"error": err,
			}).Error("mqtt: 话题订阅失败")
			time.Sleep(retryDelay)
		}
		log.WithFields(log.Fields{
			"topic": statusTopic.Name,
		}).Debug("mqtt: 话题订阅成功")

		// Spray device online check.
		keepAliveTimeout := 60 * time.Second
		keepAliveInterval := 10 * time.Second
		for {
			// Publish in its own goroutine so a slow publish (up to its
			// WaitTimeout) does not delay the keep-alive interval.
			go func() {
				t := mqttx.PublishTopic{
					Name:        "testtopic/325454756",
					Qos:         0,
					Retained:    false,
					Payload:     `{"key1":0}`,
					WaitTimeout: 5 * time.Second,
				}
				log.WithFields(log.Fields{
					"topic":   t.Name,
					"payload": t.Payload,
				}).Debug("mqtt: 话题发布中...")
				if err := c.Publish(t); err != nil {
					log.WithFields(log.Fields{
						"topic":   t.Name,
						"payload": t.Payload,
						"error":   err,
					}).Error("mqtt: 话题发布失败")
					// Do not fall through to the success log below.
					return
				}
				log.WithFields(log.Fields{
					"topic":   t.Name,
					"payload": t.Payload,
				}).Debug("mqtt: 话题发布成功")
			}()

			time.Sleep(keepAliveInterval)
			if sprayEquipment.IncreaseConnLostTime(keepAliveInterval) >= keepAliveTimeout && sprayEquipment.GetConnStatus() != equipment.ConnStatusOffline {
				sprayEquipment.SetConnStatus(equipment.ConnStatusOffline)
				log.Warning("mqtt: 喷雾设备已离线")
				result := db.Exec("UPDATE devices SET state = 2 WHERE sn = ?", sprayEquipment.GetId())
				if result.Error != nil {
					log.Warning("mqtt: 喷雾设备状态更新失败: ", result.Error)
				}
			}
		}
	}()

	if env := viper.GetString("app.env"); env == "production" || env == "prod" {
		gin.SetMode(gin.ReleaseMode)
	}
	r := gin.New()
	r.Use(gin.Logger(), gin.Recovery())
	r.GET("/status", StatusHandler)
	r.POST("/mqtt", MqttHandler)

	addr := fmt.Sprintf("%s:%d", viper.GetString("server.host"), viper.GetInt("server.port"))
	// Run only returns on failure (e.g. the address is already in use);
	// surface that instead of exiting silently with status 0.
	if err := r.Run(addr); err != nil {
		log.Fatal("http: 服务器启动失败: ", err)
	}
}
// StatusHandler answers with HTTP 200 and a JSON object holding the values
// currently reported by sprayEquipment: id, both speeds, both valve values,
// the running flag, the error code and the connection status.
func StatusHandler(c *gin.Context) {
	snapshot := gin.H{
		"id":         sprayEquipment.GetId(),
		"speed1":     sprayEquipment.GetSpeed1(),
		"speed2":     sprayEquipment.GetSpeed2(),
		"yv1":        sprayEquipment.GetYv1(),
		"yv2":        sprayEquipment.GetYv2(),
		"is_running": sprayEquipment.GetIsRunning(),
		"error":      sprayEquipment.GetErrCode(),
		"status":     sprayEquipment.GetConnStatus(),
	}
	c.JSON(http.StatusOK, snapshot)
}
// MqttHandler validates a JSON control request and, when app.env is
// "production" or "prod", publishes it to the device command topic.
//
// Responses:
//   - 400 with errcode 400 when the body fails binding/validation.
//   - 500 with errcode 10001 when the MQTT client cannot be obtained.
//   - 500 with errcode 10002 when publishing fails.
//   - 200 {"msg":"ok"} otherwise; outside production nothing is published.
func MqttHandler(c *gin.Context) {
	// Pointer fields let "required" distinguish an explicit 0 from a missing key.
	var req struct {
		Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // spray volume (percent): 0-100
		Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // spray volume (percent): 0-100
		YV1    *uint8 `json:"yv1" binding:"required"`                  // solenoid valve 1: 0 closed, 1 open
		YV2    *uint8 `json:"yv2" binding:"required"`                  // solenoid valve 2: 0 closed, 1 open
	}
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"errcode": 400, "errmsg": err.Error()})
		return
	}

	env := viper.GetString("app.env")
	if env != "production" && env != "prod" {
		c.JSON(http.StatusOK, gin.H{"msg": "ok"})
		return
	}

	// The message body must list its keys in this exact order:
	// {"key1":2,"Speed1":0,"Speed2":0,"YV1":0,"YV2":0}
	// A struct is marshalled (rather than a map) so the field order is fixed.
	command := struct {
		Key1   uint  `json:"key1"`
		Speed1 uint8 `json:"Speed1"`
		Speed2 uint8 `json:"Speed2"`
		YV1    uint8 `json:"YV1"`
		YV2    uint8 `json:"YV2"`
	}{
		Key1:   2,
		Speed1: *req.Speed1,
		Speed2: *req.Speed2,
		YV1:    *req.YV1,
		YV2:    *req.YV2,
	}
	// The error is ignored: a struct of unsigned integers always marshals.
	body, _ := json.Marshal(command)

	message := mqttx.PublishTopic{
		Name:        "testtopic/325454756",
		Qos:         0,
		Retained:    false,
		Payload:     body,
		WaitTimeout: 5 * time.Second,
	}

	mc, err := mqttx.Get(viper.GetString("mqtt.client_id"))
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)})
		return
	}
	if err := mc.Publish(message); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)})
		return
	}

	c.JSON(http.StatusOK, gin.H{"msg": "ok"})
}