Update
parent
98cbcb185c
commit
230bc9858a
|
|
@ -1,3 +1,10 @@
|
|||
app:
|
||||
env: local
|
||||
log:
|
||||
level: debug
|
||||
server:
|
||||
host: 127.0.0.1
|
||||
port: 8080
|
||||
mqtt:
|
||||
client_id: mqttx_go
|
||||
host: broker.emqx.io
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ type SprayEquipment struct {
|
|||
isRunning bool // 是否已启动
|
||||
errCode uint8 // 错误代码: 0 无, 1 急停, 2 低水位报警
|
||||
connStatus uint8 // 连接状态: 0 未知, 1 在线, 2 离线
|
||||
connLostTime time.Duration // 掉线时间
|
||||
connLostTime time.Duration // 连接丢失时间
|
||||
connLostTimeMu sync.RWMutex
|
||||
}
|
||||
|
||||
|
|
|
|||
162
main.go
162
main.go
|
|
@ -27,13 +27,24 @@ func init() {
|
|||
|
||||
buf, err := os.ReadFile(configFilePath)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
panic(err)
|
||||
}
|
||||
viper.SetConfigType("yaml")
|
||||
if err := viper.ReadConfig(bytes.NewBuffer(buf)); err != nil {
|
||||
log.Fatal(err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if lvl, err := log.ParseLevel(viper.GetString("log.level")); err != nil {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
} else {
|
||||
log.SetLevel(lvl)
|
||||
}
|
||||
log.SetOutput(os.Stdout)
|
||||
log.SetFormatter(&log.TextFormatter{
|
||||
TimestampFormat: "2006-01-02 15:04:05",
|
||||
FullTimestamp: true,
|
||||
})
|
||||
|
||||
c := mqttx.NewClient(&mqttx.ClientOptions{
|
||||
Host: viper.GetString("mqtt.host"),
|
||||
Port: viper.GetInt("mqtt.port"),
|
||||
|
|
@ -57,15 +68,19 @@ func init() {
|
|||
|
||||
func main() {
|
||||
go func() {
|
||||
c, err := mqttx.Get(viper.GetString("mqtt.client_id"))
|
||||
clientId := viper.GetString("mqtt.client_id")
|
||||
log.Debug("mqtt: 客户端获取中...")
|
||||
c, err := mqttx.Get(clientId)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
}).Error("mqtt: 客户端获取失败")
|
||||
return
|
||||
}
|
||||
log.Debug("mqtt: 客户端获取成功")
|
||||
|
||||
RetryConnect:
|
||||
log.Debug("mqtt: 服务器连接中...")
|
||||
if err := c.Connect(); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"error": err,
|
||||
|
|
@ -73,12 +88,18 @@ func main() {
|
|||
time.Sleep(200 * time.Millisecond)
|
||||
goto RetryConnect
|
||||
}
|
||||
log.Debug("mqtt: 服务器连接成功")
|
||||
|
||||
RetrySubscribe:
|
||||
subTopic := mqttx.SubscribeTopic{
|
||||
t := mqttx.SubscribeTopic{
|
||||
Name: "testtopic/PLC325454756",
|
||||
Qos: 0,
|
||||
MessageHandler: func(c mqtt.Client, msg mqtt.Message) {
|
||||
log.WithFields(log.Fields{
|
||||
"topic": msg.Topic(),
|
||||
"payload": string(msg.Payload()),
|
||||
}).Debug("mqtt: 订阅消息接收成功")
|
||||
|
||||
var payload struct {
|
||||
State uint8 `json:"state"`
|
||||
Speed1 uint8 `json:"Speed1"`
|
||||
|
|
@ -90,12 +111,15 @@ func main() {
|
|||
|
||||
if err := json.Unmarshal(msg.Payload(), &payload); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"topic": msg.Topic(),
|
||||
"payload": string(msg.Payload()),
|
||||
"error": err,
|
||||
}).Error("mqtt: 订阅内容解析失败")
|
||||
}).Error("mqtt: 订阅消息解析失败")
|
||||
return
|
||||
}
|
||||
|
||||
connStatus := sprayEquipment.GetConnStatus()
|
||||
|
||||
sprayEquipment.ResetConnLostTime()
|
||||
sprayEquipment.SetSpeed1(payload.Speed1)
|
||||
sprayEquipment.SetSpeed2(payload.Speed2)
|
||||
|
|
@ -103,52 +127,74 @@ func main() {
|
|||
sprayEquipment.SetYv2(payload.Yv2)
|
||||
sprayEquipment.SetIsRunning(payload.State == 1)
|
||||
sprayEquipment.SetErrCode(payload.Err)
|
||||
|
||||
if connStatus == equipment.ConnStatusOffline {
|
||||
log.Info("mqtt: 喷雾设备已上线")
|
||||
}
|
||||
},
|
||||
WaitTimeout: 5 * time.Second,
|
||||
}
|
||||
if err := c.Subscribe(subTopic); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"topic": t.Name,
|
||||
}).Debug("mqtt: 话题订阅中...")
|
||||
if err := c.Subscribe(t); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"topic": subTopic.Name,
|
||||
"topic": t.Name,
|
||||
"error": err,
|
||||
}).Error("mqtt: 话题订阅失败")
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
goto RetrySubscribe
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"topic": t.Name,
|
||||
}).Debug("mqtt: 话题订阅成功")
|
||||
|
||||
go func() {
|
||||
d := 5 * time.Second
|
||||
|
||||
for {
|
||||
time.Sleep(d)
|
||||
if sprayEquipment.IncreaseConnLostTime(d) >= 60*time.Second && sprayEquipment.GetConnStatus() != equipment.ConnStatusOffline {
|
||||
sprayEquipment.SetConnStatus(equipment.ConnStatusOffline)
|
||||
log.Warning("喷雾设备已离线")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
pubTopic := mqttx.PublishTopic{
|
||||
Name: "testtopic/325454756",
|
||||
Qos: 0,
|
||||
Retained: false,
|
||||
Payload: `{"KEY1":0}`,
|
||||
WaitTimeout: 5 * time.Second,
|
||||
}
|
||||
// 喷雾设备在线检查
|
||||
keepAliveTimeout := 60 * time.Second
|
||||
keepAliveInterval := 10 * time.Second
|
||||
for {
|
||||
if err := c.Publish(pubTopic); err != nil {
|
||||
go func() {
|
||||
t := mqttx.PublishTopic{
|
||||
Name: "testtopic/325454756",
|
||||
Qos: 0,
|
||||
Retained: false,
|
||||
Payload: `{"key1":0}`,
|
||||
WaitTimeout: 5 * time.Second,
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"topic": pubTopic.Name,
|
||||
"error": err,
|
||||
}).Error("mqtt: 话题发送失败")
|
||||
"topic": t.Name,
|
||||
"payload": t.Payload,
|
||||
}).Debug("mqtt: 话题发布中...")
|
||||
if err := c.Publish(t); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"topic": t.Name,
|
||||
"payload": t.Payload,
|
||||
"error": err,
|
||||
}).Error("mqtt: 话题发布失败")
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"topic": t.Name,
|
||||
"payload": t.Payload,
|
||||
}).Debug("mqtt: 话题发布成功")
|
||||
}()
|
||||
|
||||
time.Sleep(keepAliveInterval)
|
||||
|
||||
if sprayEquipment.IncreaseConnLostTime(keepAliveInterval) >= keepAliveTimeout && sprayEquipment.GetConnStatus() != equipment.ConnStatusOffline {
|
||||
sprayEquipment.SetConnStatus(equipment.ConnStatusOffline)
|
||||
log.Warning("mqtt: 喷雾设备已离线")
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
r := gin.Default()
|
||||
if env := viper.GetString("app.env"); env == "production" || env == "prod" {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
}
|
||||
r := gin.New()
|
||||
r.Use(gin.Logger(), gin.Recovery())
|
||||
r.GET("/status", StatusHandler)
|
||||
r.POST("/mqtt", MqttHandler)
|
||||
r.Run()
|
||||
r.Run(fmt.Sprintf("%s:%d", viper.GetString("server.host"), viper.GetInt("server.port")))
|
||||
}
|
||||
|
||||
func StatusHandler(c *gin.Context) {
|
||||
|
|
@ -176,29 +222,37 @@ func MqttHandler(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
payload := make(map[string]interface{})
|
||||
payload["KEY1"] = 2
|
||||
payload["Speed1"] = input.Speed1
|
||||
payload["Speed2"] = input.Speed2
|
||||
payload["YV1"] = input.YV1
|
||||
payload["YV2"] = input.YV2
|
||||
if env := viper.GetString("app.env"); env == "production" || env == "prod" {
|
||||
// 消息体必须按 key 的先后顺序来: {"key1":2,"Speed1":0,"Speed2":0,"YV1":0,"YV2":0}
|
||||
payload, _ := json.Marshal(struct {
|
||||
Key1 uint `json:"key1"`
|
||||
Speed1 uint8 `json:"Speed1"`
|
||||
Speed2 uint8 `json:"Speed2"`
|
||||
YV1 uint8 `json:"YV1"`
|
||||
YV2 uint8 `json:"YV2"`
|
||||
}{
|
||||
Key1: 2,
|
||||
Speed1: *input.Speed1,
|
||||
Speed2: *input.Speed2,
|
||||
YV1: *input.YV1,
|
||||
YV2: *input.YV2,
|
||||
})
|
||||
|
||||
buf, _ := json.Marshal(payload)
|
||||
topic := mqttx.PublishTopic{
|
||||
Name: "testtopic/325454756",
|
||||
Qos: 0,
|
||||
Retained: false,
|
||||
Payload: payload,
|
||||
WaitTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
topic := mqttx.PublishTopic{
|
||||
Name: "testtopic/325454756",
|
||||
Qos: 0,
|
||||
Retained: false,
|
||||
Payload: buf,
|
||||
WaitTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)})
|
||||
return
|
||||
} else if err := mc.Publish(topic); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)})
|
||||
return
|
||||
if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)})
|
||||
return
|
||||
} else if err := mc.Publish(topic); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"msg": "ok"})
|
||||
|
|
|
|||
Loading…
Reference in New Issue