package session import ( "io" "net" "os" "sync" "time" "github.com/metacubex/mihomo/transport/anytls/pipe" ) // Stream implements net.Conn type Stream struct { id uint32 sess *Session pipeR *pipe.PipeReader pipeW *pipe.PipeWriter writeDeadline pipe.PipeDeadline dieOnce sync.Once dieHook func() dieErr error reportOnce sync.Once } // newStream initiates a Stream struct func newStream(id uint32, sess *Session) *Stream { s := new(Stream) s.id = id s.sess = sess s.pipeR, s.pipeW = pipe.Pipe() s.writeDeadline = pipe.MakePipeDeadline() return s } // Read implements net.Conn func (s *Stream) Read(b []byte) (n int, err error) { n, err = s.pipeR.Read(b) if n == 0 && s.dieErr != nil { err = s.dieErr } return } // Write implements net.Conn func (s *Stream) Write(b []byte) (n int, err error) { select { case <-s.writeDeadline.Wait(): return 0, os.ErrDeadlineExceeded default: } f := newFrame(cmdPSH, s.id) f.data = b n, err = s.sess.writeFrame(f) return } // Close implements net.Conn func (s *Stream) Close() error { return s.CloseWithError(io.ErrClosedPipe) } func (s *Stream) CloseWithError(err error) error { // if err != io.ErrClosedPipe { // logrus.Debugln(err) // } var once bool s.dieOnce.Do(func() { s.dieErr = err s.pipeR.Close() once = true }) if once { if s.dieHook != nil { s.dieHook() s.dieHook = nil } return s.sess.streamClosed(s.id) } else { return s.dieErr } } func (s *Stream) SetReadDeadline(t time.Time) error { return s.pipeR.SetReadDeadline(t) } func (s *Stream) SetWriteDeadline(t time.Time) error { s.writeDeadline.Set(t) return nil } func (s *Stream) SetDeadline(t time.Time) error { s.SetWriteDeadline(t) return s.SetReadDeadline(t) } // LocalAddr satisfies net.Conn interface func (s *Stream) LocalAddr() net.Addr { if ts, ok := s.sess.conn.(interface { LocalAddr() net.Addr }); ok { return ts.LocalAddr() } return nil } // RemoteAddr satisfies net.Conn interface func (s *Stream) RemoteAddr() net.Addr { if ts, ok := s.sess.conn.(interface { RemoteAddr() net.Addr }); ok { return ts.RemoteAddr() } return nil } // HandshakeFailure should be called when Server fail to create outbound proxy func (s *Stream) HandshakeFailure(err error) error { var once bool s.reportOnce.Do(func() { once = true }) if once && err != nil && s.sess.peerVersion >= 2 { f := newFrame(cmdSYNACK, s.id) f.data = []byte(err.Error()) if _, err := s.sess.writeFrame(f); err != nil { return err } } return nil } // HandshakeSuccess should be called when Server success to create outbound proxy func (s *Stream) HandshakeSuccess() error { var once bool s.reportOnce.Do(func() { once = true }) if once && s.sess.peerVersion >= 2 { if _, err := s.sess.writeFrame(newFrame(cmdSYNACK, s.id)); err != nil { return err } } return nil }