From 5f07f5eac7c2aac3bf4837ad1a987106a5af38da Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Sat, 25 Jan 2025 16:58:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20keepalive?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/client/main.go | 5 ++++ pkg/handler/keepalive/keepalive.go | 30 +++++++++++++++++++++++ pkg/manscdp/keepalive.go | 33 ++++++++++++++++++++++++++ pkg/services/device/keepalive.go | 38 ++++++++++++++++++++++++++++++ 4 files changed, 106 insertions(+) create mode 100644 pkg/handler/keepalive/keepalive.go create mode 100644 pkg/services/device/keepalive.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 16c918d..b7135ac 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -2,8 +2,10 @@ package main import ( "git.skcks.cn/Shikong/go-gb28181/pkg/config" + "git.skcks.cn/Shikong/go-gb28181/pkg/handler/keepalive" "git.skcks.cn/Shikong/go-gb28181/pkg/handler/message" "git.skcks.cn/Shikong/go-gb28181/pkg/log" + "git.skcks.cn/Shikong/go-gb28181/pkg/services/device" "git.skcks.cn/Shikong/go-gb28181/pkg/services/zlmediakit" "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" @@ -59,6 +61,9 @@ func main() { srv, _ := sipgo.NewServer(ua, sipgo.WithServerLogger(logger)) message.SetupMessageHandler(srv, client, clientConfig) + keepalive.SetupKeepalive(client, clientConfig) + device.StartKeepAlive(client) + defer device.StopKeepAlive() quit := make(chan os.Signal, 1) go func() { diff --git a/pkg/handler/keepalive/keepalive.go b/pkg/handler/keepalive/keepalive.go new file mode 100644 index 0000000..6dc81ec --- /dev/null +++ b/pkg/handler/keepalive/keepalive.go @@ -0,0 +1,30 @@ +package keepalive + +import ( + "git.skcks.cn/Shikong/go-gb28181/pkg/config" + "git.skcks.cn/Shikong/go-gb28181/pkg/manscdp" + "git.skcks.cn/Shikong/go-gb28181/pkg/services/device" + "git.skcks.cn/Shikong/go-gb28181/pkg/utils" + "git.skcks.cn/Shikong/go-gb28181/pkg/utils/charset" + "github.com/duke-git/lancet/v2/random" + "github.com/emiago/sipgo" + "github.com/emiago/sipgo/sip" +) + +func SetupKeepalive(client *sipgo.Client, clientConfig *config.ClientConfig) { + device.AddKeepaliveSender(clientConfig.DeviceId, func(client *sipgo.Client) { + sn := random.RandNumeral(6) + data := manscdp.NewKeepAliveReqWithOK(sn, clientConfig.DeviceId) + target := sip.Uri{ + User: clientConfig.ServerId, + Host: clientConfig.ServerIp, + Port: clientConfig.ServerPort, + Headers: sip.NewParams(), + } + + req := sip.NewRequest(sip.MESSAGE, target) + marshal, _ := utils.XMLMarshal(data, charset.GBK) + req.SetBody(marshal) + _ = client.WriteRequest(req) + }) +} diff --git a/pkg/manscdp/keepalive.go b/pkg/manscdp/keepalive.go index f2f6601..cd95a33 100644 --- a/pkg/manscdp/keepalive.go +++ b/pkg/manscdp/keepalive.go @@ -1 +1,34 @@ package manscdp + +import ( + "encoding/xml" + "git.skcks.cn/Shikong/go-gb28181/pkg/manscdp/cmdtype" +) + +type KeepAliveReq struct { + XMLName xml.Name `xml:"Notify"` + CmdType string `xml:"CmdType"` + SN string `xml:"SN"` + DeviceID string `xml:"DeviceID"` + Status string `xml:"Status"` +} + +func NewKeepAliveReqWithOK(sn, deviceID string) *KeepAliveReq { + return &KeepAliveReq{ + XMLName: xml.Name{Local: "Notify"}, + CmdType: cmdtype.Keepalive, + SN: sn, + DeviceID: deviceID, + Status: "OK", + } +} + +func NewKeepAliveReq(sn, deviceID, status string) *KeepAliveReq { + return &KeepAliveReq{ + XMLName: xml.Name{Local: "Notify"}, + CmdType: cmdtype.Keepalive, + SN: sn, + DeviceID: deviceID, + Status: status, + } +} diff --git a/pkg/services/device/keepalive.go b/pkg/services/device/keepalive.go new file mode 100644 index 0000000..e2406f1 --- /dev/null +++ b/pkg/services/device/keepalive.go @@ -0,0 +1,38 @@ +package device + +import ( + "git.skcks.cn/Shikong/go-gb28181/pkg/log" + "github.com/emiago/sipgo" + "time" +) + +var keepaliveTimer *time.Ticker + +type SendKeepAlive = func(client *sipgo.Client) + +var senders = make(map[string]SendKeepAlive) + +func AddKeepaliveSender(deviceId string, sender SendKeepAlive) { + log.Log().Info().Msgf("添加 keealive 发送器, deviceId: %s", deviceId) + senders[deviceId] = sender +} + +// StartKeepAlive 启动 keepalive 定时器 +// 每 30 秒发送一次 keepalive 消息到服务器 +func StartKeepAlive(client *sipgo.Client) { + keepaliveTimer = time.NewTicker(time.Second * 30) + go func() { + for range keepaliveTimer.C { + // 遍历所有注册的设备,发送 keepalive 消息 + for deviceId, sender := range senders { + log.Log().Debug().Msgf("执行 发送 keealive 消息, deviceId: %s", deviceId) + go sender(client) + } + } + }() +} + +// StopKeepAlive 停止 keepalive 定时器 +func StopKeepAlive() { + keepaliveTimer.Stop() +}