catalog 实现

突破 sipgo udp MTU 1500 限制
This commit is contained in:
shikong 2025-01-24 18:26:20 +08:00
parent eb1219c2fe
commit 14837e20bb
Signed by: Shikong
GPG Key ID: BD85FF18B373C341
6 changed files with 200 additions and 131 deletions

View File

@ -1,13 +1,13 @@
package main package main
import ( import (
"encoding/xml"
"git.skcks.cn/Shikong/go-gb28181/pkg/config" "git.skcks.cn/Shikong/go-gb28181/pkg/config"
"git.skcks.cn/Shikong/go-gb28181/pkg/manscdp" "git.skcks.cn/Shikong/go-gb28181/pkg/handler/message"
"git.skcks.cn/Shikong/go-gb28181/pkg/utils" "git.skcks.cn/Shikong/go-gb28181/pkg/log"
"github.com/emiago/sipgo" "github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip" "github.com/emiago/sipgo/sip"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"math"
"time" "time"
"context" "context"
@ -18,6 +18,8 @@ import (
) )
func main() { func main() {
// 解决 sip go udp 包 > 1500 报错
sip.UDPMTUSize = math.MaxInt
output := zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { output := zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) {
w.Out = os.Stdout w.Out = os.Stdout
w.TimeFormat = time.RFC3339 w.TimeFormat = time.RFC3339
@ -38,87 +40,86 @@ func main() {
logger = logger.Level(zerolog.InfoLevel) logger = logger.Level(zerolog.InfoLevel)
} }
ctx := context.Background() log.SetLogger(&logger)
ctx := context.Background()
addr := fmt.Sprintf("%s:%d", clientConfig.ListenIp, clientConfig.ListenPort) addr := fmt.Sprintf("%s:%d", clientConfig.ListenIp, clientConfig.ListenPort)
ua, _ := sipgo.NewUA( ua, _ := sipgo.NewUA(
sipgo.WithUserAgent(clientConfig.DeviceId), sipgo.WithUserAgent(clientConfig.DeviceId),
sipgo.WithUserAgentHostname(addr)) sipgo.WithUserAgentHostname(addr))
srv, _ := sipgo.NewServer(ua, sipgo.WithServerLogger(logger))
srv.OnMessage(func(req *sip.Request, tx sip.ServerTransaction) {
query := new(manscdp.CatalogReq)
_ = utils.XMLUnmarshal(req.Body(), query)
logger.Info().Msgf("收到查询指令: %s\n%+v\n", query.CmdType, query)
//response := sip.NewResponse(sip.StatusOK, "OK")
//err = tx.Respond(response)
//if err != nil {
// logger.Error().Err(err)
//}
tx.Done()
go func() {
client, _ := sipgo.NewClient(ua, client, _ := sipgo.NewClient(ua,
sipgo.WithClientHostname(clientConfig.ListenIp), sipgo.WithClientHostname(clientConfig.ListenIp),
sipgo.WithClientPort(clientConfig.ListenPort)) sipgo.WithClientPort(clientConfig.ListenPort))
resp := new(manscdp.CatalogResp) srv, _ := sipgo.NewServer(ua, sipgo.WithServerLogger(logger))
resp.XMLName = xml.Name{Local: "Response"}
resp.DeviceID = clientConfig.DeviceId
resp.CmdType = "Catalog"
resp.SumNum = "1"
resp.DeviceList = new(manscdp.CateLogDeviceList)
resp.DeviceList.XMLName = xml.Name{Local: "DeviceList"}
resp.DeviceList.Num = "1"
resp.DeviceList.Item = make([]manscdp.CateLogDevice, 1)
resp.DeviceList.Item[0].DeviceID = "44050100002000000002"
resp.DeviceList.Item[0].Name = "设备名称"
resp.DeviceList.Item[0].Manufacturer = "设备厂商"
resp.DeviceList.Item[0].ErrCode = "0"
resp.DeviceList.Item[0].Port = fmt.Sprintf("%d", clientConfig.ListenPort)
resp.SN = query.SN message.SetupMessageHandler(srv, client, clientConfig)
marshal, _ := utils.XMLMarshal(resp, "gbk") //srv.OnMessage(func(req *sip.Request, tx sip.ServerTransaction) {
logger.Info().Msgf("回复查询指令: %s\n%+v\n", query.CmdType, resp) // query := new(manscdp.CatalogReq)
// _ = utils.XMLUnmarshal(req.Body(), query)
target := sip.Uri{ // logger.Info().Msgf("收到查询指令: %s\n%+v\n", query.CmdType, query)
User: clientConfig.ServerId, //
Host: clientConfig.ServerIp, // tx.Done()
Port: clientConfig.ServerPort, //
Headers: sip.NewParams(), // go func() {
} // resp := new(manscdp.CatalogResp)
// resp.XMLName = xml.Name{Local: "Response"}
time.Sleep(time.Second * 1) // resp.DeviceID = clientConfig.DeviceId
//uri := sip.Uri{User: "44050100002000000002", Host: "10.10.10.20", Port: 5099} // resp.CmdType = "Catalog"
nReq := sip.NewRequest(sip.MESSAGE, target) // resp.SumNum = "1"
nReq.SetTransport("UDP") // resp.DeviceList = new(manscdp.CateLogDeviceList)
to := sip.NewHeader("To", req.GetHeader("From").Value()) // resp.DeviceList.XMLName = xml.Name{Local: "DeviceList"}
from := sip.NewHeader("From", req.GetHeader("To").Value()) // resp.DeviceList.Num = "1"
nReq.AppendHeader(to) // resp.DeviceList.Item = make([]manscdp.CateLogDevice, 0)
nReq.AppendHeader(from) //
//nReq.AppendHeader(req.GetHeader("Call-ID")) // device := manscdp.CateLogDevice{}
nReq.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml")) // device.DeviceID = clientConfig.DeviceId
// device.Name = "设备名称"
nReq.SetBody(marshal) // device.Manufacturer = "设备厂商"
err := sipgo.ClientRequestBuild(client, nReq) // device.ErrCode = "0"
if err != nil { // device.Port = fmt.Sprintf("%d", clientConfig.ListenPort)
logger.Error().Err(err) //
} // resp.DeviceList.Item = append(resp.DeviceList.Item, device)
//
logger.Info().Msgf("向服务器发送查询指令: %s\n%+v\n", query.CmdType, nReq) // resp.SN = query.SN
//
err = client.WriteRequest(nReq) // marshal, _ := utils.XMLMarshal(resp, "gbk")
if err != nil { // logger.Info().Msgf("回复查询指令: %s\n%+v\n", query.CmdType, resp)
logger.Error().Err(err) //
return // target := sip.Uri{
} // User: clientConfig.ServerId,
_ = client.Close() // Host: clientConfig.ServerIp,
logger.Info().Msgf("向服务器发送查询指令完成") // Port: clientConfig.ServerPort,
}() // Headers: sip.NewParams(),
}) // }
//
// //uri := sip.Uri{User: "44050100002000000002", Host: "10.10.10.20", Port: 5099}
// nReq := sip.NewRequest(sip.MESSAGE, target)
// nReq.SetTransport("UDP")
// to := sip.NewHeader("To", req.GetHeader("From").Value())
// from := sip.NewHeader("From", req.GetHeader("To").Value())
// nReq.AppendHeader(to)
// nReq.AppendHeader(from)
// //nReq.AppendHeader(req.GetHeader("Call-ID"))
// nReq.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml"))
//
// nReq.SetBody(marshal)
// err := sipgo.ClientRequestBuild(client, nReq)
// if err != nil {
// logger.Error().Msgf("向服务器发送查询指令失败: %s", err)
// }
//
// logger.Debug().Msgf("向服务器发送查询指令: %s\n%+v\n", query.CmdType, nReq)
//
// err = client.WriteRequest(nReq)
// if err != nil {
// logger.Error().Msgf("向服务器发送查询指令失败: %s", err)
// return
// }
// }()
//})
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
go func() { go func() {

View File

@ -2,8 +2,8 @@ package config
import ( import (
"errors" "errors"
"fmt"
"git.skcks.cn/Shikong/go-gb28181/pkg/constants" "git.skcks.cn/Shikong/go-gb28181/pkg/constants"
"git.skcks.cn/Shikong/go-gb28181/pkg/logger"
"github.com/pelletier/go-toml/v2" "github.com/pelletier/go-toml/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"os" "os"
@ -45,7 +45,7 @@ func ReadClientConfig() (*ClientConfig, error) {
var configFileNotFoundError viper.ConfigFileNotFoundError var configFileNotFoundError viper.ConfigFileNotFoundError
if errors.As(err, &configFileNotFoundError) { if errors.As(err, &configFileNotFoundError) {
_ = GenerateConfig() _ = GenerateConfig()
logger.Log().Fatalf("未找到配置文件, 已生成示例配置文件于运行路径下") fmt.Println("未找到配置文件, 已生成示例配置文件于运行路径下")
} }
} }

View File

@ -0,0 +1,78 @@
package message
import (
"encoding/xml"
"fmt"
"git.skcks.cn/Shikong/go-gb28181/pkg/config"
"git.skcks.cn/Shikong/go-gb28181/pkg/log"
"git.skcks.cn/Shikong/go-gb28181/pkg/manscdp"
"git.skcks.cn/Shikong/go-gb28181/pkg/utils"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
)
func init() {
handlers["Catalog"] = CatalogHandler
}
func CatalogHandler(client *sipgo.Client, clientConfig *config.ClientConfig, req *sip.Request, tx sip.ServerTransaction) {
query := new(manscdp.CatalogReq)
_ = utils.XMLUnmarshal(req.Body(), query)
log.Log().Info().Msgf("收到查询指令: %s\n%+v\n", query.CmdType, query)
tx.Done()
resp := new(manscdp.CatalogResp)
resp.XMLName = xml.Name{Local: "Response"}
resp.DeviceID = clientConfig.DeviceId
resp.CmdType = "Catalog"
resp.SumNum = "1"
resp.DeviceList = new(manscdp.CateLogDeviceList)
resp.DeviceList.XMLName = xml.Name{Local: "DeviceList"}
resp.DeviceList.Num = "1"
resp.DeviceList.Item = make([]manscdp.CateLogDevice, 0)
device := manscdp.CateLogDevice{}
device.DeviceID = clientConfig.DeviceId
device.Name = "设备名称"
device.Manufacturer = "设备厂商"
device.ErrCode = "0"
device.Port = fmt.Sprintf("%d", clientConfig.ListenPort)
resp.DeviceList.Item = append(resp.DeviceList.Item, device)
resp.SN = query.SN
marshal, _ := utils.XMLMarshal(resp, "gbk")
log.Log().Info().Msgf("回复查询指令: %s\n%+v\n", query.CmdType, resp)
target := sip.Uri{
User: clientConfig.ServerId,
Host: clientConfig.ServerIp,
Port: clientConfig.ServerPort,
Headers: sip.NewParams(),
}
//uri := sip.Uri{User: "44050100002000000002", Host: "10.10.10.20", Port: 5099}
nReq := sip.NewRequest(sip.MESSAGE, target)
nReq.SetTransport("UDP")
to := sip.NewHeader("To", req.GetHeader("From").Value())
from := sip.NewHeader("From", req.GetHeader("To").Value())
nReq.AppendHeader(to)
nReq.AppendHeader(from)
//nReq.AppendHeader(req.GetHeader("Call-ID"))
nReq.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml"))
nReq.SetBody(marshal)
err := sipgo.ClientRequestBuild(client, nReq)
if err != nil {
log.Log().Error().Msgf("向服务器发送查询指令失败: %s", err)
}
log.Log().Debug().Msgf("向服务器发送查询指令: %s\n%+v\n", query.CmdType, nReq)
err = client.WriteRequest(nReq)
if err != nil {
log.Log().Error().Msgf("向服务器发送查询指令失败: %s", err)
return
}
}

View File

@ -0,0 +1,34 @@
package message
import (
"git.skcks.cn/Shikong/go-gb28181/pkg/config"
"git.skcks.cn/Shikong/go-gb28181/pkg/log"
"git.skcks.cn/Shikong/go-gb28181/pkg/manscdp"
"git.skcks.cn/Shikong/go-gb28181/pkg/utils"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
)
type messageHandler = func(client *sipgo.Client, clientConfig *config.ClientConfig, req *sip.Request, tx sip.ServerTransaction)
var handlers = make(map[string]messageHandler)
func SetupMessageHandler(srv *sipgo.Server, client *sipgo.Client, clientConfig *config.ClientConfig) {
srv.OnMessage(func(req *sip.Request, tx sip.ServerTransaction) {
body := req.Body()
message := new(manscdp.MessageReq)
err := utils.XMLUnmarshal(body, message)
if err != nil {
log.Log().Error().Msgf("XML Unmarshal error: %v", err)
}
handlers, ok := handlers[message.CmdType]
if ok {
handlers(client, clientConfig, req, tx)
} else {
log.Log().Warn().Msgf("未找到可用的消息处理器: %s", message.CmdType)
}
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil))
})
}

View File

@ -1,54 +0,0 @@
package logger
import (
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
)
var logger *zap.Logger
var sugarLogger *zap.SugaredLogger
var level = &atomic.String{}
func init() {
SetLevel(zapcore.DebugLevel)
encoder := zapcore.NewConsoleEncoder(DefaultEncoderConfig())
multiWriteSyncer := zapcore.NewMultiWriteSyncer(DefaultConsoleSyncer())
core := zapcore.NewCore(encoder, multiWriteSyncer, DefaultLevelEnabler())
logger = zap.New(core, zap.AddCaller())
_ = logger.Sync()
sugarLogger = logger.Sugar()
_ = sugarLogger.Sync()
}
func SetLevel(l zapcore.Level) {
level.Store(l.String())
}
func DefaultLevelEnabler() zap.LevelEnablerFunc {
return func(z zapcore.Level) bool {
l, _ := zapcore.ParseLevel(level.Load())
return z >= l
}
}
func DefaultTimeEncoder() (timeEncoder zapcore.TimeEncoder) {
timeEncoder = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000")
return
}
func DefaultEncoderConfig() (encoderConfig zapcore.EncoderConfig) {
encoderConfig = zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = DefaultTimeEncoder()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
return
}
func DefaultConsoleSyncer() zapcore.WriteSyncer {
return zapcore.AddSync(os.Stdout)
}
func Log() *zap.SugaredLogger {
return sugarLogger
}

10
pkg/manscdp/message.go Normal file
View File

@ -0,0 +1,10 @@
package manscdp
import "encoding/xml"
type MessageReq struct {
XMLName xml.Name `xml:"Query"`
CmdType string `xml:"CmdType"`
SN string `xml:"SN"`
DeviceID string `xml:"DeviceID"`
}