优化 mqtt 连接

master
李静 2023-07-06 18:06:10 +08:00
parent e9e4d575eb
commit 4300caaaaa
5 changed files with 226 additions and 73 deletions

2
go.mod
View File

@ -5,6 +5,7 @@ go 1.20
require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/gin-gonic/gin v1.9.1
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.16.0
)
@ -29,7 +30,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect

152
main.go
View File

@ -3,9 +3,9 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"flag"
"fmt"
"iot-spray-equipment/mqttx"
"net/http"
"os"
"time"
@ -17,19 +17,9 @@ import (
)
var (
pubTopic = "testtopic/325454756"
subTopic = "testtopic/PLC325454756"
mc mqtt.Client
configFilePath string
)
type SprayEquipment struct {
Speed1 uint8 // 电磁阀1对应的喷雾量(百分比): 0-100
Speed2 uint8 // 电磁阀2对应的喷雾量(百分比): 0-100
YV1 uint8 // 电磁阀1的状态: 0 停止, 1 启动
YV2 uint8 // 电磁阀2的状态: 0 停止, 1 启动
}
func init() {
flag.StringVar(&configFilePath, "config", "config.yml", "配置文件")
@ -42,108 +32,126 @@ func init() {
log.Fatal(err)
}
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", viper.GetString("mqtt.host"), viper.GetInt("mqtt.port")))
opts.SetClientID(viper.GetString("mqtt.client_id"))
if username := viper.GetString("mqtt.username"); username != "" {
opts.SetUsername(username)
}
if password := viper.GetString("mqtt.password"); password != "" {
opts.SetPassword(password)
}
opts.OnConnect = func(c mqtt.Client) {
log.Info("mqtt: 连接成功")
}
opts.OnConnectionLost = func(c mqtt.Client, err error) {
log.Warningf("mqtt: 连接断开: %v", err.Error())
}
opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) {
log.Info("mqtt: 重新连接...")
}
mc = mqtt.NewClient(opts)
c := mqttx.NewClient(&mqttx.ClientOptions{
Host: viper.GetString("mqtt.host"),
Port: viper.GetInt("mqtt.port"),
ClientOptions: mqtt.ClientOptions{
ClientID: viper.GetString("mqtt.client_id"),
Username: viper.GetString("mqtt.username"),
Password: viper.GetString("mqtt.password"),
OnConnect: func(c mqtt.Client) {
log.Info("mqtt: 服务器连接成功")
},
OnConnectionLost: func(c mqtt.Client, err error) {
log.Warningf("mqtt: 服务器连接断开: %v", err.Error())
},
OnReconnecting: func(c mqtt.Client, co *mqtt.ClientOptions) {
log.Info("mqtt: 重连服务器...")
},
},
})
mqttx.Push(c)
}
func main() {
go func() {
RetryConnect:
if t := mc.Connect(); t.Wait() && t.Error() != nil {
c, err := mqttx.Get(viper.GetString("mqtt.client_id"))
if err != nil {
log.WithFields(log.Fields{
"error": t.Error(),
}).Error("mqtt: 连接失败")
"error": err,
}).Error("mqtt: 客户端获取失败")
return
}
RetryConnect:
if err := c.Connect(); err != nil {
log.WithFields(log.Fields{
"error": err,
}).Error("mqtt: 服务器连接失败")
time.Sleep(200 * time.Millisecond)
goto RetryConnect
}
RetrySubscribe:
if t := mc.Subscribe(subTopic, 0, func(c mqtt.Client, m mqtt.Message) {
}); t.Wait() && t.Error() != nil {
subTopic := mqttx.SubscribeTopic{
Name: "testtopic/PLC325454756",
Qos: 0,
MessageHandler: func(c mqtt.Client, m mqtt.Message) {},
WaitTimeout: 5 * time.Second,
}
if err := c.Subscribe(subTopic); err != nil {
log.WithFields(log.Fields{
"topic": subTopic,
"error": t.Error(),
"topic": subTopic.Name,
"error": err,
}).Error("mqtt: 话题订阅失败")
time.Sleep(200 * time.Millisecond)
goto RetrySubscribe
}
log.WithFields(log.Fields{
"topic": subTopic,
}).Info("mqtt: 话题订阅成功")
pubTopic := mqttx.PublishTopic{
Name: "testtopic/325454756",
Qos: 0,
Retained: false,
Payload: `{"KEY1":0}`,
WaitTimeout: 5 * time.Second,
}
for {
<-time.After(5 * time.Second)
if t := mc.Publish(pubTopic, 0, false, `{"KEY1":0}`); t.Wait() && t.Error() != nil {
if err := c.Publish(pubTopic); err != nil {
log.WithFields(log.Fields{
"topic": pubTopic,
"error": t.Error(),
}).Error("mqtt: 话题送失败")
"topic": pubTopic.Name,
"error": err,
}).Error("mqtt: 话题送失败")
}
<-time.After(5 * time.Second)
}
}()
r := gin.Default()
r.POST("/mqtt", PostMqtt)
r.GET("/status", StatusHandler)
r.POST("/mqtt", MqttHandler)
r.Run()
}
func PostMqtt(c *gin.Context) {
func StatusHandler(c *gin.Context) {
//
}
func MqttHandler(c *gin.Context) {
var input struct {
Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100
Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100
YV1 *uint8 `json:"yv1" binding:"required"` // 电磁阀1: 0 关闭, 1 启动
YV2 *uint8 `json:"yv2" binding:"required"` // 电磁阀2: 0 关闭, 1 启动
}
if err := c.ShouldBindJSON(&input); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errcode": 10000, "errmsg": err.Error()})
c.JSON(http.StatusBadRequest, gin.H{"errcode": 400, "errmsg": err.Error()})
return
}
payload := map[string]interface{}{
"KEY1": 2,
"Speed1": input.Speed1,
"Speed2": input.Speed2,
"YV1": input.YV1,
"YV2": input.YV2,
}
payload := make(map[string]interface{})
payload["KEY1"] = 2
payload["Speed1"] = input.Speed1
payload["Speed2"] = input.Speed2
payload["YV1"] = input.YV1
payload["YV2"] = input.YV2
buf, _ := json.Marshal(payload)
t := mc.Publish(pubTopic, 0, false, buf)
if !t.WaitTimeout(5 * time.Second) {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": "MQTT: 消息上报超时"})
return
topic := mqttx.PublishTopic{
Name: "testtopic/325454756",
Qos: 0,
Retained: false,
Payload: buf,
WaitTimeout: 5 * time.Second,
}
if err := t.Error(); err != nil {
if errors.Is(t.Error(), mqtt.ErrNotConnected) {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": "MQTT: 服务未连接"})
} else {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10003, "errmsg": fmt.Sprintf("MQTT: %v", t.Error())})
}
if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)})
return
} else if err := mc.Publish(topic); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)})
return
}
c.JSON(http.StatusOK, gin.H{"message": "ok"})
c.JSON(http.StatusOK, gin.H{"msg": "ok"})
}

75
mqttx/client.go 100644
View File

@ -0,0 +1,75 @@
package mqttx
import (
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var (
ErrPublishWaitTimeout = errors.New("publish wait timeout")
ErrSubscribeWaitTimeout = errors.New("subscribe wait timeout")
)
func NewClient(o *ClientOptions) *Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", o.Host, o.Port))
opts.SetClientID(o.ClientID)
opts.SetUsername(o.Username)
opts.SetPassword(o.Password)
opts.OnConnect = o.OnConnect
opts.OnConnectionLost = o.OnConnectionLost
opts.OnReconnecting = o.OnReconnecting
return &Client{
id: o.ClientID,
Client: mqtt.NewClient(opts),
}
}
type Client struct {
id string
mqtt.Client
}
func (c *Client) GetId() string {
return c.id
}
func (c *Client) Connect() error {
t := c.Client.Connect()
t.Wait()
return t.Error()
}
func (c *Client) Publish(topic PublishTopic) error {
t := c.Client.Publish(topic.Name, topic.Qos, topic.Retained, topic.Payload)
if topic.WaitTimeout > 0 {
if !t.WaitTimeout(topic.WaitTimeout) {
return ErrPublishWaitTimeout
}
} else {
t.Wait()
}
return t.Error()
}
func (c *Client) Subscribe(topic SubscribeTopic) error {
t := c.Client.Subscribe(topic.Name, topic.Qos, topic.MessageHandler)
if topic.WaitTimeout > 0 {
if !t.WaitTimeout(topic.WaitTimeout) {
return ErrSubscribeWaitTimeout
}
} else {
t.Wait()
}
return t.Error()
}
type ClientOptions struct {
Host string
Port int
mqtt.ClientOptions
}

48
mqttx/manager.go 100644
View File

@ -0,0 +1,48 @@
package mqttx
import (
"errors"
"sync"
)
var (
ErrClientNotFound = errors.New("client not found")
ErrClientRequired = errors.New("client required")
manager = &clientManager{
pool: make(map[string]*Client),
}
)
func Get(clientId string) (*Client, error) {
return manager.Get(clientId)
}
func Push(c *Client) error {
return manager.Push(c)
}
type clientManager struct {
pool map[string]*Client
mu sync.RWMutex
}
func (m *clientManager) Get(clientId string) (*Client, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if c, ok := m.pool[clientId]; ok {
return c, nil
}
return nil, ErrClientNotFound
}
func (m *clientManager) Push(c *Client) error {
m.mu.Lock()
defer m.mu.Unlock()
if c == nil {
return ErrClientRequired
}
m.pool[c.GetId()] = c
return nil
}

22
mqttx/topic.go 100644
View File

@ -0,0 +1,22 @@
package mqttx
import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type PublishTopic struct {
Name string
Qos byte
Retained bool
Payload interface{}
WaitTimeout time.Duration
}
type SubscribeTopic struct {
Name string
Qos byte
MessageHandler mqtt.MessageHandler
WaitTimeout time.Duration
}