From 4300caaaaabafa13d14b199ffb2841be43425f42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=9D=99?= Date: Thu, 6 Jul 2023 18:06:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20mqtt=20=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- main.go | 152 +++++++++++++++++++++++++---------------------- mqttx/client.go | 75 +++++++++++++++++++++++ mqttx/manager.go | 48 +++++++++++++++ mqttx/topic.go | 22 +++++++ 5 files changed, 226 insertions(+), 73 deletions(-) create mode 100644 mqttx/client.go create mode 100644 mqttx/manager.go create mode 100644 mqttx/topic.go diff --git a/go.mod b/go.mod index 299aad9..7267374 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/gin-gonic/gin v1.9.1 + github.com/sirupsen/logrus v1.9.3 github.com/spf13/viper v1.16.0 ) @@ -29,7 +30,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/main.go b/main.go index 4af19c4..353d6e8 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,9 @@ package main import ( "bytes" "encoding/json" - "errors" "flag" "fmt" + "iot-spray-equipment/mqttx" "net/http" "os" "time" @@ -17,19 +17,9 @@ import ( ) var ( - pubTopic = "testtopic/325454756" - subTopic = "testtopic/PLC325454756" - mc mqtt.Client configFilePath string ) -type SprayEquipment struct { - Speed1 uint8 // 电磁阀1对应的喷雾量(百分比): 0-100 - Speed2 uint8 // 电磁阀2对应的喷雾量(百分比): 0-100 - YV1 uint8 // 电磁阀1的状态: 0 停止, 1 启动 - YV2 uint8 // 电磁阀2的状态: 0 停止, 1 启动 -} - func init() { flag.StringVar(&configFilePath, "config", "config.yml", "配置文件") @@ -42,108 +32,126 @@ func init() { log.Fatal(err) } - opts := mqtt.NewClientOptions() - opts.AddBroker(fmt.Sprintf("tcp://%s:%d", viper.GetString("mqtt.host"), viper.GetInt("mqtt.port"))) - opts.SetClientID(viper.GetString("mqtt.client_id")) - if username := viper.GetString("mqtt.username"); username != "" { - opts.SetUsername(username) - } - if password := viper.GetString("mqtt.password"); password != "" { - opts.SetPassword(password) - } - opts.OnConnect = func(c mqtt.Client) { - log.Info("mqtt: 连接成功") - } - opts.OnConnectionLost = func(c mqtt.Client, err error) { - log.Warningf("mqtt: 连接断开: %v", err.Error()) - } - opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) { - log.Info("mqtt: 重新连接...") - } - - mc = mqtt.NewClient(opts) + c := mqttx.NewClient(&mqttx.ClientOptions{ + Host: viper.GetString("mqtt.host"), + Port: viper.GetInt("mqtt.port"), + ClientOptions: mqtt.ClientOptions{ + ClientID: viper.GetString("mqtt.client_id"), + Username: viper.GetString("mqtt.username"), + Password: viper.GetString("mqtt.password"), + OnConnect: func(c mqtt.Client) { + log.Info("mqtt: 服务器连接成功") + }, + OnConnectionLost: func(c mqtt.Client, err error) { + log.Warningf("mqtt: 服务器连接断开: %v", err.Error()) + }, + OnReconnecting: func(c mqtt.Client, co *mqtt.ClientOptions) { + log.Info("mqtt: 重连服务器...") + }, + }, + }) + mqttx.Push(c) } func main() { go func() { - RetryConnect: - if t := mc.Connect(); t.Wait() && t.Error() != nil { + c, err := mqttx.Get(viper.GetString("mqtt.client_id")) + if err != nil { log.WithFields(log.Fields{ - "error": t.Error(), - }).Error("mqtt: 连接失败") + "error": err, + }).Error("mqtt: 客户端获取失败") + return + } + + RetryConnect: + if err := c.Connect(); err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Error("mqtt: 服务器连接失败") time.Sleep(200 * time.Millisecond) goto RetryConnect } RetrySubscribe: - if t := mc.Subscribe(subTopic, 0, func(c mqtt.Client, m mqtt.Message) { - - }); t.Wait() && t.Error() != nil { + subTopic := mqttx.SubscribeTopic{ + Name: "testtopic/PLC325454756", + Qos: 0, + MessageHandler: func(c mqtt.Client, m mqtt.Message) {}, + WaitTimeout: 5 * time.Second, + } + if err := c.Subscribe(subTopic); err != nil { log.WithFields(log.Fields{ - "topic": subTopic, - "error": t.Error(), + "topic": subTopic.Name, + "error": err, }).Error("mqtt: 话题订阅失败") time.Sleep(200 * time.Millisecond) goto RetrySubscribe } - log.WithFields(log.Fields{ - "topic": subTopic, - }).Info("mqtt: 话题订阅成功") - + pubTopic := mqttx.PublishTopic{ + Name: "testtopic/325454756", + Qos: 0, + Retained: false, + Payload: `{"KEY1":0}`, + WaitTimeout: 5 * time.Second, + } for { - <-time.After(5 * time.Second) - - if t := mc.Publish(pubTopic, 0, false, `{"KEY1":0}`); t.Wait() && t.Error() != nil { + if err := c.Publish(pubTopic); err != nil { log.WithFields(log.Fields{ - "topic": pubTopic, - "error": t.Error(), - }).Error("mqtt: 话题推送失败") + "topic": pubTopic.Name, + "error": err, + }).Error("mqtt: 话题发送失败") } + <-time.After(5 * time.Second) } }() r := gin.Default() - r.POST("/mqtt", PostMqtt) + r.GET("/status", StatusHandler) + r.POST("/mqtt", MqttHandler) r.Run() } -func PostMqtt(c *gin.Context) { +func StatusHandler(c *gin.Context) { + // +} + +func MqttHandler(c *gin.Context) { var input struct { Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 YV1 *uint8 `json:"yv1" binding:"required"` // 电磁阀1: 0 关闭, 1 启动 YV2 *uint8 `json:"yv2" binding:"required"` // 电磁阀2: 0 关闭, 1 启动 } - if err := c.ShouldBindJSON(&input); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"errcode": 10000, "errmsg": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"errcode": 400, "errmsg": err.Error()}) return } - payload := map[string]interface{}{ - "KEY1": 2, - "Speed1": input.Speed1, - "Speed2": input.Speed2, - "YV1": input.YV1, - "YV2": input.YV2, - } + payload := make(map[string]interface{}) + payload["KEY1"] = 2 + payload["Speed1"] = input.Speed1 + payload["Speed2"] = input.Speed2 + payload["YV1"] = input.YV1 + payload["YV2"] = input.YV2 buf, _ := json.Marshal(payload) - t := mc.Publish(pubTopic, 0, false, buf) - if !t.WaitTimeout(5 * time.Second) { - c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": "MQTT: 消息上报超时"}) - return + topic := mqttx.PublishTopic{ + Name: "testtopic/325454756", + Qos: 0, + Retained: false, + Payload: buf, + WaitTimeout: 5 * time.Second, } - if err := t.Error(); err != nil { - if errors.Is(t.Error(), mqtt.ErrNotConnected) { - c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": "MQTT: 服务未连接"}) - } else { - c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10003, "errmsg": fmt.Sprintf("MQTT: %v", t.Error())}) - } + + if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)}) + return + } else if err := mc.Publish(topic); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)}) return } - c.JSON(http.StatusOK, gin.H{"message": "ok"}) + c.JSON(http.StatusOK, gin.H{"msg": "ok"}) } diff --git a/mqttx/client.go b/mqttx/client.go new file mode 100644 index 0000000..e74b4a0 --- /dev/null +++ b/mqttx/client.go @@ -0,0 +1,75 @@ +package mqttx + +import ( + "errors" + "fmt" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +var ( + ErrPublishWaitTimeout = errors.New("publish wait timeout") + ErrSubscribeWaitTimeout = errors.New("subscribe wait timeout") +) + +func NewClient(o *ClientOptions) *Client { + opts := mqtt.NewClientOptions() + opts.AddBroker(fmt.Sprintf("tcp://%s:%d", o.Host, o.Port)) + opts.SetClientID(o.ClientID) + opts.SetUsername(o.Username) + opts.SetPassword(o.Password) + opts.OnConnect = o.OnConnect + opts.OnConnectionLost = o.OnConnectionLost + opts.OnReconnecting = o.OnReconnecting + + return &Client{ + id: o.ClientID, + Client: mqtt.NewClient(opts), + } +} + +type Client struct { + id string + mqtt.Client +} + +func (c *Client) GetId() string { + return c.id +} + +func (c *Client) Connect() error { + t := c.Client.Connect() + t.Wait() + return t.Error() +} + +func (c *Client) Publish(topic PublishTopic) error { + t := c.Client.Publish(topic.Name, topic.Qos, topic.Retained, topic.Payload) + if topic.WaitTimeout > 0 { + if !t.WaitTimeout(topic.WaitTimeout) { + return ErrPublishWaitTimeout + } + } else { + t.Wait() + } + return t.Error() +} + +func (c *Client) Subscribe(topic SubscribeTopic) error { + t := c.Client.Subscribe(topic.Name, topic.Qos, topic.MessageHandler) + if topic.WaitTimeout > 0 { + if !t.WaitTimeout(topic.WaitTimeout) { + return ErrSubscribeWaitTimeout + } + } else { + t.Wait() + } + return t.Error() +} + +type ClientOptions struct { + Host string + Port int + + mqtt.ClientOptions +} diff --git a/mqttx/manager.go b/mqttx/manager.go new file mode 100644 index 0000000..c7c9a71 --- /dev/null +++ b/mqttx/manager.go @@ -0,0 +1,48 @@ +package mqttx + +import ( + "errors" + "sync" +) + +var ( + ErrClientNotFound = errors.New("client not found") + ErrClientRequired = errors.New("client required") + manager = &clientManager{ + pool: make(map[string]*Client), + } +) + +func Get(clientId string) (*Client, error) { + return manager.Get(clientId) +} + +func Push(c *Client) error { + return manager.Push(c) +} + +type clientManager struct { + pool map[string]*Client + mu sync.RWMutex +} + +func (m *clientManager) Get(clientId string) (*Client, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if c, ok := m.pool[clientId]; ok { + return c, nil + } + return nil, ErrClientNotFound +} + +func (m *clientManager) Push(c *Client) error { + m.mu.Lock() + defer m.mu.Unlock() + + if c == nil { + return ErrClientRequired + } + m.pool[c.GetId()] = c + return nil +} diff --git a/mqttx/topic.go b/mqttx/topic.go new file mode 100644 index 0000000..297d9e9 --- /dev/null +++ b/mqttx/topic.go @@ -0,0 +1,22 @@ +package mqttx + +import ( + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type PublishTopic struct { + Name string + Qos byte + Retained bool + Payload interface{} + WaitTimeout time.Duration +} + +type SubscribeTopic struct { + Name string + Qos byte + MessageHandler mqtt.MessageHandler + WaitTimeout time.Duration +}