reality/cmd/grss/serv.go
2024-10-17 18:55:28 +08:00

176 lines
4.2 KiB
Go

package main
import (
"fmt"
"io"
"net"
"sync"
"github.com/hashicorp/yamux"
"github.com/howmp/reality"
"github.com/howmp/reality/cmd"
"github.com/sirupsen/logrus"
)
type serv struct {
ConfigPath string `short:"o" default:"config.json" description:"server config path"`
}
func (s *serv) Execute(args []string) error {
config, err := loadConfig(s.ConfigPath)
if err != nil {
return err
}
server := NewServer(config)
server.Serve()
return nil
}
type sessionManager struct {
logger logrus.FieldLogger
sessions [128]*yamux.Session
sessionsLock [128]sync.Mutex
}
func (s *sessionManager) createSession(conn net.Conn, id byte) {
if s.isSessionOpen(id) {
s.logger.Errorf("client(id:%d) session already open, close %s", id, conn.RemoteAddr())
conn.Close()
return
}
s.sessionsLock[id].Lock()
defer s.sessionsLock[id].Unlock()
session, err := yamux.Server(conn, nil)
if err != nil {
s.logger.Error(err)
conn.Close()
}
go s.checkSession(id, session)
s.sessions[id] = session
s.logger.Infof("client(id:%d) session opened %s", id, conn.RemoteAddr())
}
func (s *sessionManager) isSessionOpen(id byte) bool {
s.sessionsLock[id].Lock()
defer s.sessionsLock[id].Unlock()
session := s.sessions[id]
if session != nil {
return !session.IsClosed()
}
return false
}
func (s *sessionManager) openClientSessionStream(id byte) (*yamux.Stream, error) {
s.sessionsLock[id].Lock()
defer s.sessionsLock[id].Unlock()
session := s.sessions[id]
if session != nil {
stream, err := session.OpenStream()
if err != nil {
session.Close()
s.sessions[id] = nil
return nil, err
}
return stream, nil
}
return nil, fmt.Errorf("client(id:%d) session not open", id)
}
func (s *sessionManager) checkSession(id byte, session *yamux.Session) {
<-session.CloseChan()
s.logger.Infof("client session closed %s", session.RemoteAddr())
s.sessionsLock[id].Lock()
defer s.sessionsLock[id].Unlock()
if s.sessions[id] == session {
s.sessions[id] = nil
}
}
// Server 反向socks5代理服务端
type Server struct {
config *reality.ServerConfig
logger logrus.FieldLogger
sm *sessionManager
}
func NewServer(config *reality.ServerConfig) *Server {
logger := reality.GetLogger(config.Debug)
return &Server{
config: config,
logger: logger,
sm: &sessionManager{
logger: logger,
},
}
}
// Serve 监听端口,同时接收Reality客户端和用户连接
func (s *Server) Serve() {
_, port, err := net.SplitHostPort(s.config.ServerAddr)
if err != nil {
s.logger.Fatalf("split ServerAddr %s : %v", s.config.ServerAddr, err)
}
bindAddr := fmt.Sprintf(":%s", port)
l, err := reality.Listen(bindAddr, s.config)
if err != nil {
s.logger.Fatalf("reality listen: %v", err)
}
s.logger.Infof("reality listen %s", bindAddr)
for {
conn, err := l.Accept()
if err != nil {
s.logger.Errorf("reality accept: %v", err)
continue
}
if o, ok := conn.(reality.OverlayData); ok {
isGRSC, id := cmd.ParseShortID(o.OverlayData())
if isGRSC {
s.logger.Infof("accept client(id:%d) %s", id, conn.RemoteAddr())
go s.sm.createSession(conn, id)
continue
} else {
s.logger.Infof("accept user(id:%d) %s", id, conn.RemoteAddr())
go s.handleUser(conn, id)
continue
}
}
s.logger.Warnf("accept %s, but no overlay data", conn.RemoteAddr())
conn.Close()
}
}
func (s *Server) handleUser(conn net.Conn, id byte) {
defer conn.Close()
session, err := yamux.Client(conn, nil)
if err != nil {
s.logger.Errorf("user(id:%d) yamux: %v", id, err)
return
}
defer session.Close()
for {
stream, err := session.Accept()
if err != nil {
s.logger.Errorf("user(id:%d) session accept: %v", id, err)
return
}
s.logger.Infof("user(id:%d) stream accept %s", id, stream.RemoteAddr())
go s.handleUserStream(stream, id)
}
}
func (s *Server) handleUserStream(stream net.Conn, id byte) {
defer stream.Close()
conn, err := s.sm.openClientSessionStream(id)
if err != nil {
s.logger.Errorf("open client(id:%d) session stream: %v", id, err)
return
}
defer conn.Close()
go io.Copy(conn, stream)
io.Copy(stream, conn)
}