package main import ( "bytes" "encoding/json" "errors" "flag" "fmt" "net/http" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" "github.com/spf13/viper" ) var ( pubTopic = "testtopic/325454756" subTopic = "testtopic/PLC325454756" mc mqtt.Client configFilePath string ) type SprayEquipment struct { Speed1 uint8 // 电磁阀1对应的喷雾量(百分比): 0-100 Speed2 uint8 // 电磁阀2对应的喷雾量(百分比): 0-100 YV1 uint8 // 电磁阀1的状态: 0 停止, 1 启动 YV2 uint8 // 电磁阀2的状态: 0 停止, 1 启动 } func init() { flag.StringVar(&configFilePath, "config", "config.yml", "配置文件") buf, err := os.ReadFile(configFilePath) if err != nil { log.Fatal(err) } viper.SetConfigType("yaml") if err := viper.ReadConfig(bytes.NewBuffer(buf)); err != nil { log.Fatal(err) } opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", viper.GetString("mqtt.host"), viper.GetInt("mqtt.port"))) opts.SetClientID(viper.GetString("mqtt.client_id")) if username := viper.GetString("mqtt.username"); username != "" { opts.SetUsername(username) } if password := viper.GetString("mqtt.password"); password != "" { opts.SetPassword(password) } opts.OnConnect = func(c mqtt.Client) { log.Info("mqtt: 连接成功") } opts.OnConnectionLost = func(c mqtt.Client, err error) { log.Warningf("mqtt: 连接断开: %v", err.Error()) } opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) { log.Info("mqtt: 重新连接...") } mc = mqtt.NewClient(opts) } func main() { go func() { RetryConnect: if t := mc.Connect(); t.Wait() && t.Error() != nil { log.WithFields(log.Fields{ "error": t.Error(), }).Error("mqtt: 连接失败") time.Sleep(200 * time.Millisecond) goto RetryConnect } RetrySubscribe: if t := mc.Subscribe(subTopic, 0, func(c mqtt.Client, m mqtt.Message) { }); t.Wait() && t.Error() != nil { log.WithFields(log.Fields{ "topic": subTopic, "error": t.Error(), }).Error("mqtt: 话题订阅失败") time.Sleep(200 * time.Millisecond) goto RetrySubscribe } log.WithFields(log.Fields{ "topic": subTopic, }).Info("mqtt: 话题订阅成功") for { <-time.After(5 * time.Second) if t := mc.Publish(pubTopic, 0, false, `{"KEY1":0}`); t.Wait() && t.Error() != nil { log.WithFields(log.Fields{ "topic": pubTopic, "error": t.Error(), }).Error("mqtt: 话题推送失败") } } }() r := gin.Default() r.POST("/mqtt", PostMqtt) r.Run() } func PostMqtt(c *gin.Context) { var input struct { Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 YV1 *uint8 `json:"yv1" binding:"required"` // 电磁阀1: 0 关闭, 1 启动 YV2 *uint8 `json:"yv2" binding:"required"` // 电磁阀2: 0 关闭, 1 启动 } if err := c.ShouldBindJSON(&input); err != nil { c.JSON(http.StatusBadRequest, gin.H{"errcode": 10000, "errmsg": err.Error()}) return } payload := map[string]interface{}{ "KEY1": 2, "Speed1": input.Speed1, "Speed2": input.Speed2, "YV1": input.YV1, "YV2": input.YV2, } buf, _ := json.Marshal(payload) t := mc.Publish(pubTopic, 0, false, buf) if !t.WaitTimeout(5 * time.Second) { c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": "MQTT: 消息上报超时"}) return } if err := t.Error(); err != nil { if errors.Is(t.Error(), mqtt.ErrNotConnected) { c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": "MQTT: 服务未连接"}) } else { c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10003, "errmsg": fmt.Sprintf("MQTT: %v", t.Error())}) } return } c.JSON(http.StatusOK, gin.H{"message": "ok"}) }