From 14837e20bbfaf3e754014851937a788b70cfc906 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 24 Jan 2025 18:26:20 +0800 Subject: [PATCH] =?UTF-8?q?catalog=20=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 突破 sipgo udp MTU 1500 限制 --- cmd/client/main.go | 151 +++++++++++++++++---------------- pkg/config/loader.go | 4 +- pkg/handler/message/catalog.go | 78 +++++++++++++++++ pkg/handler/message/message.go | 34 ++++++++ pkg/logger/logger.go | 54 ------------ pkg/manscdp/message.go | 10 +++ 6 files changed, 200 insertions(+), 131 deletions(-) create mode 100644 pkg/handler/message/catalog.go create mode 100644 pkg/handler/message/message.go delete mode 100644 pkg/logger/logger.go create mode 100644 pkg/manscdp/message.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 7c6293f..6617bb4 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -1,13 +1,13 @@ package main import ( - "encoding/xml" "git.skcks.cn/Shikong/go-gb28181/pkg/config" - "git.skcks.cn/Shikong/go-gb28181/pkg/manscdp" - "git.skcks.cn/Shikong/go-gb28181/pkg/utils" + "git.skcks.cn/Shikong/go-gb28181/pkg/handler/message" + "git.skcks.cn/Shikong/go-gb28181/pkg/log" "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/rs/zerolog" + "math" "time" "context" @@ -18,6 +18,8 @@ import ( ) func main() { + // 解决 sip go udp 包 > 1500 报错 + sip.UDPMTUSize = math.MaxInt output := zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { w.Out = os.Stdout w.TimeFormat = time.RFC3339 @@ -38,87 +40,86 @@ func main() { logger = logger.Level(zerolog.InfoLevel) } - ctx := context.Background() + log.SetLogger(&logger) + ctx := context.Background() addr := fmt.Sprintf("%s:%d", clientConfig.ListenIp, clientConfig.ListenPort) ua, _ := sipgo.NewUA( sipgo.WithUserAgent(clientConfig.DeviceId), sipgo.WithUserAgentHostname(addr)) + client, _ := sipgo.NewClient(ua, + sipgo.WithClientHostname(clientConfig.ListenIp), + sipgo.WithClientPort(clientConfig.ListenPort)) + srv, _ := sipgo.NewServer(ua, sipgo.WithServerLogger(logger)) - srv.OnMessage(func(req *sip.Request, tx sip.ServerTransaction) { - query := new(manscdp.CatalogReq) - _ = utils.XMLUnmarshal(req.Body(), query) - logger.Info().Msgf("收到查询指令: %s\n%+v\n", query.CmdType, query) + message.SetupMessageHandler(srv, client, clientConfig) - //response := sip.NewResponse(sip.StatusOK, "OK") - //err = tx.Respond(response) - //if err != nil { - // logger.Error().Err(err) - //} - tx.Done() - - go func() { - client, _ := sipgo.NewClient(ua, - sipgo.WithClientHostname(clientConfig.ListenIp), - sipgo.WithClientPort(clientConfig.ListenPort)) - - resp := new(manscdp.CatalogResp) - resp.XMLName = xml.Name{Local: "Response"} - resp.DeviceID = clientConfig.DeviceId - resp.CmdType = "Catalog" - resp.SumNum = "1" - resp.DeviceList = new(manscdp.CateLogDeviceList) - resp.DeviceList.XMLName = xml.Name{Local: "DeviceList"} - resp.DeviceList.Num = "1" - resp.DeviceList.Item = make([]manscdp.CateLogDevice, 1) - resp.DeviceList.Item[0].DeviceID = "44050100002000000002" - resp.DeviceList.Item[0].Name = "设备名称" - resp.DeviceList.Item[0].Manufacturer = "设备厂商" - resp.DeviceList.Item[0].ErrCode = "0" - resp.DeviceList.Item[0].Port = fmt.Sprintf("%d", clientConfig.ListenPort) - - resp.SN = query.SN - - marshal, _ := utils.XMLMarshal(resp, "gbk") - logger.Info().Msgf("回复查询指令: %s\n%+v\n", query.CmdType, resp) - - target := sip.Uri{ - User: clientConfig.ServerId, - Host: clientConfig.ServerIp, - Port: clientConfig.ServerPort, - Headers: sip.NewParams(), - } - - time.Sleep(time.Second * 1) - //uri := sip.Uri{User: "44050100002000000002", Host: "10.10.10.20", Port: 5099} - nReq := sip.NewRequest(sip.MESSAGE, target) - nReq.SetTransport("UDP") - to := sip.NewHeader("To", req.GetHeader("From").Value()) - from := sip.NewHeader("From", req.GetHeader("To").Value()) - nReq.AppendHeader(to) - nReq.AppendHeader(from) - //nReq.AppendHeader(req.GetHeader("Call-ID")) - nReq.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml")) - - nReq.SetBody(marshal) - err := sipgo.ClientRequestBuild(client, nReq) - if err != nil { - logger.Error().Err(err) - } - - logger.Info().Msgf("向服务器发送查询指令: %s\n%+v\n", query.CmdType, nReq) - - err = client.WriteRequest(nReq) - if err != nil { - logger.Error().Err(err) - return - } - _ = client.Close() - logger.Info().Msgf("向服务器发送查询指令完成") - }() - }) + //srv.OnMessage(func(req *sip.Request, tx sip.ServerTransaction) { + // query := new(manscdp.CatalogReq) + // _ = utils.XMLUnmarshal(req.Body(), query) + // logger.Info().Msgf("收到查询指令: %s\n%+v\n", query.CmdType, query) + // + // tx.Done() + // + // go func() { + // resp := new(manscdp.CatalogResp) + // resp.XMLName = xml.Name{Local: "Response"} + // resp.DeviceID = clientConfig.DeviceId + // resp.CmdType = "Catalog" + // resp.SumNum = "1" + // resp.DeviceList = new(manscdp.CateLogDeviceList) + // resp.DeviceList.XMLName = xml.Name{Local: "DeviceList"} + // resp.DeviceList.Num = "1" + // resp.DeviceList.Item = make([]manscdp.CateLogDevice, 0) + // + // device := manscdp.CateLogDevice{} + // device.DeviceID = clientConfig.DeviceId + // device.Name = "设备名称" + // device.Manufacturer = "设备厂商" + // device.ErrCode = "0" + // device.Port = fmt.Sprintf("%d", clientConfig.ListenPort) + // + // resp.DeviceList.Item = append(resp.DeviceList.Item, device) + // + // resp.SN = query.SN + // + // marshal, _ := utils.XMLMarshal(resp, "gbk") + // logger.Info().Msgf("回复查询指令: %s\n%+v\n", query.CmdType, resp) + // + // target := sip.Uri{ + // User: clientConfig.ServerId, + // Host: clientConfig.ServerIp, + // Port: clientConfig.ServerPort, + // Headers: sip.NewParams(), + // } + // + // //uri := sip.Uri{User: "44050100002000000002", Host: "10.10.10.20", Port: 5099} + // nReq := sip.NewRequest(sip.MESSAGE, target) + // nReq.SetTransport("UDP") + // to := sip.NewHeader("To", req.GetHeader("From").Value()) + // from := sip.NewHeader("From", req.GetHeader("To").Value()) + // nReq.AppendHeader(to) + // nReq.AppendHeader(from) + // //nReq.AppendHeader(req.GetHeader("Call-ID")) + // nReq.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml")) + // + // nReq.SetBody(marshal) + // err := sipgo.ClientRequestBuild(client, nReq) + // if err != nil { + // logger.Error().Msgf("向服务器发送查询指令失败: %s", err) + // } + // + // logger.Debug().Msgf("向服务器发送查询指令: %s\n%+v\n", query.CmdType, nReq) + // + // err = client.WriteRequest(nReq) + // if err != nil { + // logger.Error().Msgf("向服务器发送查询指令失败: %s", err) + // return + // } + // }() + //}) quit := make(chan os.Signal, 1) go func() { diff --git a/pkg/config/loader.go b/pkg/config/loader.go index f0e76ee..c73f00e 100644 --- a/pkg/config/loader.go +++ b/pkg/config/loader.go @@ -2,8 +2,8 @@ package config import ( "errors" + "fmt" "git.skcks.cn/Shikong/go-gb28181/pkg/constants" - "git.skcks.cn/Shikong/go-gb28181/pkg/logger" "github.com/pelletier/go-toml/v2" "github.com/spf13/viper" "os" @@ -45,7 +45,7 @@ func ReadClientConfig() (*ClientConfig, error) { var configFileNotFoundError viper.ConfigFileNotFoundError if errors.As(err, &configFileNotFoundError) { _ = GenerateConfig() - logger.Log().Fatalf("未找到配置文件, 已生成示例配置文件于运行路径下") + fmt.Println("未找到配置文件, 已生成示例配置文件于运行路径下") } } diff --git a/pkg/handler/message/catalog.go b/pkg/handler/message/catalog.go new file mode 100644 index 0000000..1cf77c2 --- /dev/null +++ b/pkg/handler/message/catalog.go @@ -0,0 +1,78 @@ +package message + +import ( + "encoding/xml" + "fmt" + "git.skcks.cn/Shikong/go-gb28181/pkg/config" + "git.skcks.cn/Shikong/go-gb28181/pkg/log" + "git.skcks.cn/Shikong/go-gb28181/pkg/manscdp" + "git.skcks.cn/Shikong/go-gb28181/pkg/utils" + "github.com/emiago/sipgo" + "github.com/emiago/sipgo/sip" +) + +func init() { + handlers["Catalog"] = CatalogHandler +} + +func CatalogHandler(client *sipgo.Client, clientConfig *config.ClientConfig, req *sip.Request, tx sip.ServerTransaction) { + query := new(manscdp.CatalogReq) + _ = utils.XMLUnmarshal(req.Body(), query) + log.Log().Info().Msgf("收到查询指令: %s\n%+v\n", query.CmdType, query) + tx.Done() + + resp := new(manscdp.CatalogResp) + resp.XMLName = xml.Name{Local: "Response"} + resp.DeviceID = clientConfig.DeviceId + resp.CmdType = "Catalog" + resp.SumNum = "1" + resp.DeviceList = new(manscdp.CateLogDeviceList) + resp.DeviceList.XMLName = xml.Name{Local: "DeviceList"} + resp.DeviceList.Num = "1" + resp.DeviceList.Item = make([]manscdp.CateLogDevice, 0) + + device := manscdp.CateLogDevice{} + device.DeviceID = clientConfig.DeviceId + device.Name = "设备名称" + device.Manufacturer = "设备厂商" + device.ErrCode = "0" + device.Port = fmt.Sprintf("%d", clientConfig.ListenPort) + + resp.DeviceList.Item = append(resp.DeviceList.Item, device) + + resp.SN = query.SN + + marshal, _ := utils.XMLMarshal(resp, "gbk") + log.Log().Info().Msgf("回复查询指令: %s\n%+v\n", query.CmdType, resp) + + target := sip.Uri{ + User: clientConfig.ServerId, + Host: clientConfig.ServerIp, + Port: clientConfig.ServerPort, + Headers: sip.NewParams(), + } + + //uri := sip.Uri{User: "44050100002000000002", Host: "10.10.10.20", Port: 5099} + nReq := sip.NewRequest(sip.MESSAGE, target) + nReq.SetTransport("UDP") + to := sip.NewHeader("To", req.GetHeader("From").Value()) + from := sip.NewHeader("From", req.GetHeader("To").Value()) + nReq.AppendHeader(to) + nReq.AppendHeader(from) + //nReq.AppendHeader(req.GetHeader("Call-ID")) + nReq.AppendHeader(sip.NewHeader("Content-Type", "Application/MANSCDP+xml")) + + nReq.SetBody(marshal) + err := sipgo.ClientRequestBuild(client, nReq) + if err != nil { + log.Log().Error().Msgf("向服务器发送查询指令失败: %s", err) + } + + log.Log().Debug().Msgf("向服务器发送查询指令: %s\n%+v\n", query.CmdType, nReq) + + err = client.WriteRequest(nReq) + if err != nil { + log.Log().Error().Msgf("向服务器发送查询指令失败: %s", err) + return + } +} diff --git a/pkg/handler/message/message.go b/pkg/handler/message/message.go new file mode 100644 index 0000000..b56512b --- /dev/null +++ b/pkg/handler/message/message.go @@ -0,0 +1,34 @@ +package message + +import ( + "git.skcks.cn/Shikong/go-gb28181/pkg/config" + "git.skcks.cn/Shikong/go-gb28181/pkg/log" + "git.skcks.cn/Shikong/go-gb28181/pkg/manscdp" + "git.skcks.cn/Shikong/go-gb28181/pkg/utils" + "github.com/emiago/sipgo" + "github.com/emiago/sipgo/sip" +) + +type messageHandler = func(client *sipgo.Client, clientConfig *config.ClientConfig, req *sip.Request, tx sip.ServerTransaction) + +var handlers = make(map[string]messageHandler) + +func SetupMessageHandler(srv *sipgo.Server, client *sipgo.Client, clientConfig *config.ClientConfig) { + srv.OnMessage(func(req *sip.Request, tx sip.ServerTransaction) { + body := req.Body() + message := new(manscdp.MessageReq) + err := utils.XMLUnmarshal(body, message) + if err != nil { + log.Log().Error().Msgf("XML Unmarshal error: %v", err) + } + + handlers, ok := handlers[message.CmdType] + if ok { + handlers(client, clientConfig, req, tx) + } else { + log.Log().Warn().Msgf("未找到可用的消息处理器: %s", message.CmdType) + } + + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil)) + }) +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go deleted file mode 100644 index 1b74810..0000000 --- a/pkg/logger/logger.go +++ /dev/null @@ -1,54 +0,0 @@ -package logger - -import ( - "go.uber.org/atomic" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "os" -) - -var logger *zap.Logger -var sugarLogger *zap.SugaredLogger -var level = &atomic.String{} - -func init() { - SetLevel(zapcore.DebugLevel) - encoder := zapcore.NewConsoleEncoder(DefaultEncoderConfig()) - multiWriteSyncer := zapcore.NewMultiWriteSyncer(DefaultConsoleSyncer()) - core := zapcore.NewCore(encoder, multiWriteSyncer, DefaultLevelEnabler()) - logger = zap.New(core, zap.AddCaller()) - _ = logger.Sync() - sugarLogger = logger.Sugar() - _ = sugarLogger.Sync() -} - -func SetLevel(l zapcore.Level) { - level.Store(l.String()) -} - -func DefaultLevelEnabler() zap.LevelEnablerFunc { - return func(z zapcore.Level) bool { - l, _ := zapcore.ParseLevel(level.Load()) - return z >= l - } -} - -func DefaultTimeEncoder() (timeEncoder zapcore.TimeEncoder) { - timeEncoder = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000") - return -} - -func DefaultEncoderConfig() (encoderConfig zapcore.EncoderConfig) { - encoderConfig = zap.NewProductionEncoderConfig() - encoderConfig.EncodeTime = DefaultTimeEncoder() - encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder - return -} - -func DefaultConsoleSyncer() zapcore.WriteSyncer { - return zapcore.AddSync(os.Stdout) -} - -func Log() *zap.SugaredLogger { - return sugarLogger -} diff --git a/pkg/manscdp/message.go b/pkg/manscdp/message.go new file mode 100644 index 0000000..5161c04 --- /dev/null +++ b/pkg/manscdp/message.go @@ -0,0 +1,10 @@ +package manscdp + +import "encoding/xml" + +type MessageReq struct { + XMLName xml.Name `xml:"Query"` + CmdType string `xml:"CmdType"` + SN string `xml:"SN"` + DeviceID string `xml:"DeviceID"` +}