Compare commits

..

No commits in common. "37cd9382791965641c1cf773591954d803d6d0be" and "e9e4d575eb3e286c8d168b6320274b3046001e1f" have entirely different histories.

6 changed files with 73 additions and 307 deletions

View File

@ -1,71 +0,0 @@
package equipment
func NewSprayEquipment(id string) *SprayEquipment {
return &SprayEquipment{
id: id,
speed1: 0,
speed2: 0,
yv1: 0,
yv2: 0,
isRunning: false,
status: 0,
}
}
type SprayEquipment struct {
id string // 设备ID
speed1 uint8 // 电磁阀1对应的喷雾量(百分比): 0-100
speed2 uint8 // 电磁阀2对应的喷雾量(百分比): 0-100
yv1 uint8 // 电磁阀1的状态: 0 停止, 1 启动
yv2 uint8 // 电磁阀2的状态: 0 停止, 1 启动
isRunning bool // 是否运行中
status uint8 // 连接状态: 0 未知, 1 在线, 2 离线
}
func (s *SprayEquipment) GetId() string {
return s.id
}
func (s *SprayEquipment) SetSpeed1(speed uint8) {
s.speed1 = speed
}
func (s *SprayEquipment) GetSpeed1() uint8 {
return s.speed1
}
func (s *SprayEquipment) SetSpeed2(speed uint8) {
s.speed2 = speed
}
func (s *SprayEquipment) GetSpeed2() uint8 {
return s.speed2
}
func (s *SprayEquipment) SetYv1(yv1 uint8) {
s.yv1 = yv1
}
func (s *SprayEquipment) GetYv1() uint8 {
return s.yv1
}
func (s *SprayEquipment) SetYv2(yv2 uint8) {
s.yv2 = yv2
}
func (s *SprayEquipment) GetYv2() uint8 {
return s.yv2
}
func (s *SprayEquipment) SetIsRunning(isRunning bool) {
s.isRunning = isRunning
}
func (s *SprayEquipment) GetIsRunning() bool {
return s.isRunning
}
func (s *SprayEquipment) GetStatus() uint8 {
return s.status
}

2
go.mod
View File

@ -5,7 +5,6 @@ go 1.20
require ( require (
github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.16.0 github.com/spf13/viper v1.16.0
) )
@ -30,6 +29,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.5 // indirect github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect

162
main.go
View File

@ -3,10 +3,9 @@ package main
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"flag" "flag"
"fmt" "fmt"
"iot-spray-equipment/equipment"
"iot-spray-equipment/mqttx"
"net/http" "net/http"
"os" "os"
"time" "time"
@ -18,10 +17,19 @@ import (
) )
var ( var (
pubTopic = "testtopic/325454756"
subTopic = "testtopic/PLC325454756"
mc mqtt.Client
configFilePath string configFilePath string
sprayEquipment = equipment.NewSprayEquipment("DC73AEE4CD274074BD9D01BCEE379A98")
) )
type SprayEquipment struct {
Speed1 uint8 // 电磁阀1对应的喷雾量(百分比): 0-100
Speed2 uint8 // 电磁阀2对应的喷雾量(百分比): 0-100
YV1 uint8 // 电磁阀1的状态: 0 停止, 1 启动
YV2 uint8 // 电磁阀2的状态: 0 停止, 1 启动
}
func init() { func init() {
flag.StringVar(&configFilePath, "config", "config.yml", "配置文件") flag.StringVar(&configFilePath, "config", "config.yml", "配置文件")
@ -34,134 +42,108 @@ func init() {
log.Fatal(err) log.Fatal(err)
} }
c := mqttx.NewClient(&mqttx.ClientOptions{ opts := mqtt.NewClientOptions()
Host: viper.GetString("mqtt.host"), opts.AddBroker(fmt.Sprintf("tcp://%s:%d", viper.GetString("mqtt.host"), viper.GetInt("mqtt.port")))
Port: viper.GetInt("mqtt.port"), opts.SetClientID(viper.GetString("mqtt.client_id"))
ClientOptions: mqtt.ClientOptions{ if username := viper.GetString("mqtt.username"); username != "" {
ClientID: viper.GetString("mqtt.client_id"), opts.SetUsername(username)
Username: viper.GetString("mqtt.username"), }
Password: viper.GetString("mqtt.password"), if password := viper.GetString("mqtt.password"); password != "" {
OnConnect: func(c mqtt.Client) { opts.SetPassword(password)
log.Info("mqtt: 服务器连接成功") }
}, opts.OnConnect = func(c mqtt.Client) {
OnConnectionLost: func(c mqtt.Client, err error) { log.Info("mqtt: 连接成功")
log.Warningf("mqtt: 服务器连接断开: %v", err.Error()) }
}, opts.OnConnectionLost = func(c mqtt.Client, err error) {
OnReconnecting: func(c mqtt.Client, co *mqtt.ClientOptions) { log.Warningf("mqtt: 连接断开: %v", err.Error())
log.Info("mqtt: 重连服务器...") }
}, opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) {
}, log.Info("mqtt: 重新连接...")
}) }
mqttx.Push(c)
mc = mqtt.NewClient(opts)
} }
func main() { func main() {
go func() { go func() {
c, err := mqttx.Get(viper.GetString("mqtt.client_id"))
if err != nil {
log.WithFields(log.Fields{
"error": err,
}).Error("mqtt: 客户端获取失败")
return
}
RetryConnect: RetryConnect:
if err := c.Connect(); err != nil { if t := mc.Connect(); t.Wait() && t.Error() != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"error": err, "error": t.Error(),
}).Error("mqtt: 服务器连接失败") }).Error("mqtt: 连接失败")
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
goto RetryConnect goto RetryConnect
} }
RetrySubscribe: RetrySubscribe:
subTopic := mqttx.SubscribeTopic{ if t := mc.Subscribe(subTopic, 0, func(c mqtt.Client, m mqtt.Message) {
Name: "testtopic/PLC325454756",
Qos: 0, }); t.Wait() && t.Error() != nil {
MessageHandler: func(c mqtt.Client, m mqtt.Message) {},
WaitTimeout: 5 * time.Second,
}
if err := c.Subscribe(subTopic); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"topic": subTopic.Name, "topic": subTopic,
"error": err, "error": t.Error(),
}).Error("mqtt: 话题订阅失败") }).Error("mqtt: 话题订阅失败")
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
goto RetrySubscribe goto RetrySubscribe
} }
pubTopic := mqttx.PublishTopic{
Name: "testtopic/325454756",
Qos: 0,
Retained: false,
Payload: `{"KEY1":0}`,
WaitTimeout: 5 * time.Second,
}
for {
if err := c.Publish(pubTopic); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"topic": pubTopic.Name, "topic": subTopic,
"error": err, }).Info("mqtt: 话题订阅成功")
}).Error("mqtt: 话题发送失败")
} for {
<-time.After(5 * time.Second) <-time.After(5 * time.Second)
if t := mc.Publish(pubTopic, 0, false, `{"KEY1":0}`); t.Wait() && t.Error() != nil {
log.WithFields(log.Fields{
"topic": pubTopic,
"error": t.Error(),
}).Error("mqtt: 话题推送失败")
}
} }
}() }()
r := gin.Default() r := gin.Default()
r.GET("/status", StatusHandler) r.POST("/mqtt", PostMqtt)
r.POST("/mqtt", MqttHandler)
r.Run() r.Run()
} }
func StatusHandler(c *gin.Context) { func PostMqtt(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"id": sprayEquipment.GetId(),
"speed1": sprayEquipment.GetSpeed1(),
"speed2": sprayEquipment.GetSpeed2(),
"yv1": sprayEquipment.GetYv1(),
"yv2": sprayEquipment.GetYv2(),
"is_running": sprayEquipment.GetIsRunning(),
"status": sprayEquipment.GetStatus(),
})
}
func MqttHandler(c *gin.Context) {
var input struct { var input struct {
Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 Speed1 *uint8 `json:"speed1" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100
Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100 Speed2 *uint8 `json:"speed2" binding:"required,gte=0,lte=100"` // 喷雾量(百分比): 0-100
YV1 *uint8 `json:"yv1" binding:"required"` // 电磁阀1: 0 关闭, 1 启动 YV1 *uint8 `json:"yv1" binding:"required"` // 电磁阀1: 0 关闭, 1 启动
YV2 *uint8 `json:"yv2" binding:"required"` // 电磁阀2: 0 关闭, 1 启动 YV2 *uint8 `json:"yv2" binding:"required"` // 电磁阀2: 0 关闭, 1 启动
} }
if err := c.ShouldBindJSON(&input); err != nil { if err := c.ShouldBindJSON(&input); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errcode": 400, "errmsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errcode": 10000, "errmsg": err.Error()})
return return
} }
payload := make(map[string]interface{}) payload := map[string]interface{}{
payload["KEY1"] = 2 "KEY1": 2,
payload["Speed1"] = input.Speed1 "Speed1": input.Speed1,
payload["Speed2"] = input.Speed2 "Speed2": input.Speed2,
payload["YV1"] = input.YV1 "YV1": input.YV1,
payload["YV2"] = input.YV2 "YV2": input.YV2,
}
buf, _ := json.Marshal(payload) buf, _ := json.Marshal(payload)
topic := mqttx.PublishTopic{ t := mc.Publish(pubTopic, 0, false, buf)
Name: "testtopic/325454756", if !t.WaitTimeout(5 * time.Second) {
Qos: 0, c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": "MQTT: 消息上报超时"})
Retained: false,
Payload: buf,
WaitTimeout: 5 * time.Second,
}
if mc, err := mqttx.Get(viper.GetString("mqtt.client_id")); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10001, "errmsg": fmt.Sprintf("mqtt: %v", err)})
return return
} else if err := mc.Publish(topic); err != nil { }
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": fmt.Sprintf("mqtt: %v", err)}) if err := t.Error(); err != nil {
if errors.Is(t.Error(), mqtt.ErrNotConnected) {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10002, "errmsg": "MQTT: 服务未连接"})
} else {
c.JSON(http.StatusInternalServerError, gin.H{"errcode": 10003, "errmsg": fmt.Sprintf("MQTT: %v", t.Error())})
}
return return
} }
c.JSON(http.StatusOK, gin.H{"msg": "ok"}) c.JSON(http.StatusOK, gin.H{"message": "ok"})
} }

View File

@ -1,75 +0,0 @@
package mqttx
import (
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var (
ErrPublishWaitTimeout = errors.New("publish wait timeout")
ErrSubscribeWaitTimeout = errors.New("subscribe wait timeout")
)
func NewClient(o *ClientOptions) *Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", o.Host, o.Port))
opts.SetClientID(o.ClientID)
opts.SetUsername(o.Username)
opts.SetPassword(o.Password)
opts.OnConnect = o.OnConnect
opts.OnConnectionLost = o.OnConnectionLost
opts.OnReconnecting = o.OnReconnecting
return &Client{
id: o.ClientID,
Client: mqtt.NewClient(opts),
}
}
type Client struct {
id string
mqtt.Client
}
func (c *Client) GetId() string {
return c.id
}
func (c *Client) Connect() error {
t := c.Client.Connect()
t.Wait()
return t.Error()
}
func (c *Client) Publish(topic PublishTopic) error {
t := c.Client.Publish(topic.Name, topic.Qos, topic.Retained, topic.Payload)
if topic.WaitTimeout > 0 {
if !t.WaitTimeout(topic.WaitTimeout) {
return ErrPublishWaitTimeout
}
} else {
t.Wait()
}
return t.Error()
}
func (c *Client) Subscribe(topic SubscribeTopic) error {
t := c.Client.Subscribe(topic.Name, topic.Qos, topic.MessageHandler)
if topic.WaitTimeout > 0 {
if !t.WaitTimeout(topic.WaitTimeout) {
return ErrSubscribeWaitTimeout
}
} else {
t.Wait()
}
return t.Error()
}
type ClientOptions struct {
Host string
Port int
mqtt.ClientOptions
}

View File

@ -1,48 +0,0 @@
package mqttx
import (
"errors"
"sync"
)
var (
ErrClientNotFound = errors.New("client not found")
ErrClientRequired = errors.New("client required")
manager = &clientManager{
pool: make(map[string]*Client),
}
)
func Get(clientId string) (*Client, error) {
return manager.Get(clientId)
}
func Push(c *Client) error {
return manager.Push(c)
}
type clientManager struct {
pool map[string]*Client
mu sync.RWMutex
}
func (m *clientManager) Get(clientId string) (*Client, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if c, ok := m.pool[clientId]; ok {
return c, nil
}
return nil, ErrClientNotFound
}
func (m *clientManager) Push(c *Client) error {
m.mu.Lock()
defer m.mu.Unlock()
if c == nil {
return ErrClientRequired
}
m.pool[c.GetId()] = c
return nil
}

View File

@ -1,22 +0,0 @@
package mqttx
import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type PublishTopic struct {
Name string
Qos byte
Retained bool
Payload interface{}
WaitTimeout time.Duration
}
type SubscribeTopic struct {
Name string
Qos byte
MessageHandler mqtt.MessageHandler
WaitTimeout time.Duration
}