From 0238c54261c578ab50954e858d118b9bd36108c7 Mon Sep 17 00:00:00 2001 From: Sergei Maklagin Date: Sun, 8 Jun 2025 19:35:59 +0300 Subject: [PATCH] Add xhttp transport --- common/xray/buf/buffer.go | 337 ++++++++++++++++++ common/xray/buf/copy.go | 124 +++++++ common/xray/buf/io.go | 126 +++++++ common/xray/buf/multi_buffer.go | 310 +++++++++++++++++ common/xray/buf/override.go | 38 ++ common/xray/buf/reader.go | 175 ++++++++++ common/xray/buf/writer.go | 270 +++++++++++++++ common/xray/bytespool/pool.go | 72 ++++ common/xray/common.go | 19 + common/xray/crypto/crypto.go | 14 + common/xray/errors/errors.go | 25 ++ common/xray/interfaces.go | 52 +++ common/xray/json/badoption/range.go | 53 +++ common/xray/net/address.go | 181 ++++++++++ common/xray/net/destination.go | 146 ++++++++ common/xray/net/net.go | 13 + common/xray/net/network.go | 33 ++ common/xray/net/port.go | 55 +++ common/xray/net/system.go | 84 +++++ common/xray/pipe/impl.go | 215 ++++++++++++ common/xray/pipe/pipe.go | 53 +++ common/xray/pipe/reader.go | 41 +++ common/xray/pipe/writer.go | 29 ++ common/xray/serial/serial.go | 29 ++ common/xray/serial/string.go | 35 ++ common/xray/signal/done/done.go | 49 +++ common/xray/signal/notifier.go | 26 ++ common/xray/signal/pubsub/pubsub.go | 105 ++++++ common/xray/signal/semaphore/semaphore.go | 27 ++ common/xray/signal/timer.go | 82 +++++ common/xray/stat/connection.go | 34 ++ common/xray/stats/stats.go | 13 + common/xray/task/common.go | 10 + common/xray/task/periodic.go | 85 +++++ common/xray/task/task.go | 64 ++++ common/xray/uuid/uuid.go | 101 ++++++ constant/v2ray.go | 1 + go.mod | 8 +- go.sum | 8 +- option/v2ray_transport.go | 152 ++++++++ route/conn.go | 5 +- transport/v2ray/transport.go | 5 + transport/v2rayxhttp/client.go | 400 ++++++++++++++++++++++ transport/v2rayxhttp/conn.go | 108 ++++++ transport/v2rayxhttp/dialer.go | 192 +++++++++++ transport/v2rayxhttp/http.go | 41 +++ transport/v2rayxhttp/mux.go | 104 ++++++ transport/v2rayxhttp/server.go | 
354 +++++++++++++++++++ transport/v2rayxhttp/upload_queue.go | 157 +++++++++ transport/v2rayxhttp/writer.go | 38 ++ 50 files changed, 4691 insertions(+), 7 deletions(-) create mode 100644 common/xray/buf/buffer.go create mode 100644 common/xray/buf/copy.go create mode 100644 common/xray/buf/io.go create mode 100644 common/xray/buf/multi_buffer.go create mode 100644 common/xray/buf/override.go create mode 100644 common/xray/buf/reader.go create mode 100644 common/xray/buf/writer.go create mode 100644 common/xray/bytespool/pool.go create mode 100644 common/xray/common.go create mode 100644 common/xray/crypto/crypto.go create mode 100644 common/xray/errors/errors.go create mode 100644 common/xray/interfaces.go create mode 100644 common/xray/json/badoption/range.go create mode 100644 common/xray/net/address.go create mode 100644 common/xray/net/destination.go create mode 100644 common/xray/net/net.go create mode 100644 common/xray/net/network.go create mode 100644 common/xray/net/port.go create mode 100644 common/xray/net/system.go create mode 100644 common/xray/pipe/impl.go create mode 100644 common/xray/pipe/pipe.go create mode 100644 common/xray/pipe/reader.go create mode 100644 common/xray/pipe/writer.go create mode 100644 common/xray/serial/serial.go create mode 100644 common/xray/serial/string.go create mode 100644 common/xray/signal/done/done.go create mode 100644 common/xray/signal/notifier.go create mode 100644 common/xray/signal/pubsub/pubsub.go create mode 100644 common/xray/signal/semaphore/semaphore.go create mode 100644 common/xray/signal/timer.go create mode 100644 common/xray/stat/connection.go create mode 100644 common/xray/stats/stats.go create mode 100644 common/xray/task/common.go create mode 100644 common/xray/task/periodic.go create mode 100644 common/xray/task/task.go create mode 100644 common/xray/uuid/uuid.go create mode 100644 transport/v2rayxhttp/client.go create mode 100644 transport/v2rayxhttp/conn.go create mode 100644 
transport/v2rayxhttp/dialer.go create mode 100644 transport/v2rayxhttp/http.go create mode 100644 transport/v2rayxhttp/mux.go create mode 100644 transport/v2rayxhttp/server.go create mode 100644 transport/v2rayxhttp/upload_queue.go create mode 100644 transport/v2rayxhttp/writer.go diff --git a/common/xray/buf/buffer.go b/common/xray/buf/buffer.go new file mode 100644 index 00000000..1981fa5c --- /dev/null +++ b/common/xray/buf/buffer.go @@ -0,0 +1,337 @@ +package buf + +import ( + "io" + + "github.com/sagernet/sing-box/common/xray/bytespool" + "github.com/sagernet/sing-box/common/xray/net" + E "github.com/sagernet/sing/common/exceptions" +) + +const ( + // Size of a regular buffer. + Size = 8192 +) + +var zero = [Size * 10]byte{0} + +var pool = bytespool.GetPool(Size) + +// ownership represents the data owner of the buffer. +type ownership uint8 + +const ( + managed ownership = iota + unmanaged + bytespools +) + +// Buffer is a recyclable allocation of a byte array. Buffer.Release() recycles +// the buffer into an internal buffer pool, in order to recreate a buffer more +// quickly. +type Buffer struct { + v []byte + start int32 + end int32 + ownership ownership + UDP *net.Destination +} + +// New creates a Buffer with 0 length and 8K capacity, managed. +func New() *Buffer { + buf := pool.Get().([]byte) + if cap(buf) >= Size { + buf = buf[:Size] + } else { + buf = make([]byte, Size) + } + + return &Buffer{ + v: buf, + } +} + +// NewExisted creates a standard size Buffer with an existed bytearray, managed. +func NewExisted(b []byte) *Buffer { + if cap(b) < Size { + panic("Invalid buffer") + } + + oLen := len(b) + if oLen < Size { + b = b[:Size] + } + + return &Buffer{ + v: b, + end: int32(oLen), + } +} + +// FromBytes creates a Buffer with an existed bytearray, unmanaged. +func FromBytes(b []byte) *Buffer { + return &Buffer{ + v: b, + end: int32(len(b)), + ownership: unmanaged, + } +} + +// StackNew creates a new Buffer object on stack, managed. 
+// This method is for buffers that is released in the same function. +func StackNew() Buffer { + buf := pool.Get().([]byte) + if cap(buf) >= Size { + buf = buf[:Size] + } else { + buf = make([]byte, Size) + } + + return Buffer{ + v: buf, + } +} + +// NewWithSize creates a Buffer with 0 length and capacity with at least the given size, bytespool's. +func NewWithSize(size int32) *Buffer { + return &Buffer{ + v: bytespool.Alloc(size), + ownership: bytespools, + } +} + +// Release recycles the buffer into an internal buffer pool. +func (b *Buffer) Release() { + if b == nil || b.v == nil || b.ownership == unmanaged { + return + } + + p := b.v + b.v = nil + b.Clear() + + switch b.ownership { + case managed: + if cap(p) == Size { + pool.Put(p) + } + case bytespools: + bytespool.Free(p) + } + b.UDP = nil +} + +// Clear clears the content of the buffer, results an empty buffer with +// Len() = 0. +func (b *Buffer) Clear() { + b.start = 0 + b.end = 0 +} + +// Byte returns the bytes at index. +func (b *Buffer) Byte(index int32) byte { + return b.v[b.start+index] +} + +// SetByte sets the byte value at index. +func (b *Buffer) SetByte(index int32, value byte) { + b.v[b.start+index] = value +} + +// Bytes returns the content bytes of this Buffer. +func (b *Buffer) Bytes() []byte { + return b.v[b.start:b.end] +} + +// Extend increases the buffer size by n bytes, and returns the extended part. +// It panics if result size is larger than buf.Size. +func (b *Buffer) Extend(n int32) []byte { + end := b.end + n + if end > int32(len(b.v)) { + panic("extending out of bound") + } + ext := b.v[b.end:end] + b.end = end + copy(ext, zero[:]) + return ext +} + +// BytesRange returns a slice of this buffer with given from and to boundary. +func (b *Buffer) BytesRange(from, to int32) []byte { + if from < 0 { + from += b.Len() + } + if to < 0 { + to += b.Len() + } + return b.v[b.start+from : b.start+to] +} + +// BytesFrom returns a slice of this Buffer starting from the given position. 
+func (b *Buffer) BytesFrom(from int32) []byte { + if from < 0 { + from += b.Len() + } + return b.v[b.start+from : b.end] +} + +// BytesTo returns a slice of this Buffer from start to the given position. +func (b *Buffer) BytesTo(to int32) []byte { + if to < 0 { + to += b.Len() + } + if to < 0 { + to = 0 + } + return b.v[b.start : b.start+to] +} + +// Check makes sure that 0 <= b.start <= b.end. +func (b *Buffer) Check() { + if b.start < 0 { + b.start = 0 + } + if b.end < 0 { + b.end = 0 + } + if b.start > b.end { + b.start = b.end + } +} + +// Resize cuts the buffer at the given position. +func (b *Buffer) Resize(from, to int32) { + oldEnd := b.end + if from < 0 { + from += b.Len() + } + if to < 0 { + to += b.Len() + } + if to < from { + panic("Invalid slice") + } + b.end = b.start + to + b.start += from + b.Check() + if b.end > oldEnd { + copy(b.v[oldEnd:b.end], zero[:]) + } +} + +// Advance cuts the buffer at the given position. +func (b *Buffer) Advance(from int32) { + if from < 0 { + from += b.Len() + } + b.start += from + b.Check() +} + +// Len returns the length of the buffer content. +func (b *Buffer) Len() int32 { + if b == nil { + return 0 + } + return b.end - b.start +} + +// Cap returns the capacity of the buffer content. +func (b *Buffer) Cap() int32 { + if b == nil { + return 0 + } + return int32(len(b.v)) +} + +// IsEmpty returns true if the buffer is empty. +func (b *Buffer) IsEmpty() bool { + return b.Len() == 0 +} + +// IsFull returns true if the buffer has no more room to grow. +func (b *Buffer) IsFull() bool { + return b != nil && b.end == int32(len(b.v)) +} + +// Write implements Write method in io.Writer. +func (b *Buffer) Write(data []byte) (int, error) { + nBytes := copy(b.v[b.end:], data) + b.end += int32(nBytes) + return nBytes, nil +} + +// WriteByte writes a single byte into the buffer. 
+func (b *Buffer) WriteByte(v byte) error { + if b.IsFull() { + return E.New("buffer full") + } + b.v[b.end] = v + b.end++ + return nil +} + +// WriteString implements io.StringWriter. +func (b *Buffer) WriteString(s string) (int, error) { + return b.Write([]byte(s)) +} + +// ReadByte implements io.ByteReader +func (b *Buffer) ReadByte() (byte, error) { + if b.start == b.end { + return 0, io.EOF + } + + nb := b.v[b.start] + b.start++ + return nb, nil +} + +// ReadBytes implements bufio.Reader.ReadBytes +func (b *Buffer) ReadBytes(length int32) ([]byte, error) { + if b.end-b.start < length { + return nil, io.EOF + } + + nb := b.v[b.start : b.start+length] + b.start += length + return nb, nil +} + +// Read implements io.Reader.Read(). +func (b *Buffer) Read(data []byte) (int, error) { + if b.Len() == 0 { + return 0, io.EOF + } + nBytes := copy(data, b.v[b.start:b.end]) + if int32(nBytes) == b.Len() { + b.Clear() + } else { + b.start += int32(nBytes) + } + return nBytes, nil +} + +// ReadFrom implements io.ReaderFrom. +func (b *Buffer) ReadFrom(reader io.Reader) (int64, error) { + n, err := reader.Read(b.v[b.end:]) + b.end += int32(n) + return int64(n), err +} + +// ReadFullFrom reads exact size of bytes from given reader, or until error occurs. +func (b *Buffer) ReadFullFrom(reader io.Reader, size int32) (int64, error) { + end := b.end + size + if end > int32(len(b.v)) { + v := end + return 0, E.New("out of bound: ", v) + } + n, err := io.ReadFull(reader, b.v[b.end:end]) + b.end += int32(n) + return int64(n), err +} + +// String returns the string form of this Buffer. 
+func (b *Buffer) String() string { + return string(b.Bytes()) +} diff --git a/common/xray/buf/copy.go b/common/xray/buf/copy.go new file mode 100644 index 00000000..efab0e17 --- /dev/null +++ b/common/xray/buf/copy.go @@ -0,0 +1,124 @@ +package buf + +import ( + "io" + "time" + + "github.com/sagernet/sing-box/common/xray/errors" + "github.com/sagernet/sing-box/common/xray/signal" + E "github.com/sagernet/sing/common/exceptions" +) + +type dataHandler func(MultiBuffer) + +type copyHandler struct { + onData []dataHandler +} + +// SizeCounter is for counting bytes copied by Copy(). +type SizeCounter struct { + Size int64 +} + +// CopyOption is an option for copying data. +type CopyOption func(*copyHandler) + +// UpdateActivity is a CopyOption to update activity on each data copy operation. +func UpdateActivity(timer signal.ActivityUpdater) CopyOption { + return func(handler *copyHandler) { + handler.onData = append(handler.onData, func(MultiBuffer) { + timer.Update() + }) + } +} + +// CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter. +func CountSize(sc *SizeCounter) CopyOption { + return func(handler *copyHandler) { + handler.onData = append(handler.onData, func(b MultiBuffer) { + sc.Size += int64(b.Len()) + }) + } +} + +type readError struct { + error +} + +func (e readError) Error() string { + return e.error.Error() +} + +func (e readError) Unwrap() error { + return e.error +} + +// IsReadError returns true if the error in Copy() comes from reading. +func IsReadError(err error) bool { + _, ok := err.(readError) + return ok +} + +type writeError struct { + error +} + +func (e writeError) Error() string { + return e.error.Error() +} + +func (e writeError) Unwrap() error { + return e.error +} + +// IsWriteError returns true if the error in Copy() comes from writing. 
+func IsWriteError(err error) bool { + _, ok := err.(writeError) + return ok +} + +func copyInternal(reader Reader, writer Writer, handler *copyHandler) error { + for { + buffer, err := reader.ReadMultiBuffer() + if !buffer.IsEmpty() { + for _, handler := range handler.onData { + handler(buffer) + } + + if werr := writer.WriteMultiBuffer(buffer); werr != nil { + return writeError{werr} + } + } + + if err != nil { + return readError{err} + } + } +} + +// Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF. +func Copy(reader Reader, writer Writer, options ...CopyOption) error { + var handler copyHandler + for _, option := range options { + option(&handler) + } + err := copyInternal(reader, writer, &handler) + if err != nil && errors.Cause(err) != io.EOF { + return err + } + return nil +} + +var ErrNotTimeoutReader = E.New("not a TimeoutReader") + +func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error { + timeoutReader, ok := reader.(TimeoutReader) + if !ok { + return ErrNotTimeoutReader + } + mb, err := timeoutReader.ReadMultiBufferTimeout(timeout) + if err != nil { + return err + } + return writer.WriteMultiBuffer(mb) +} diff --git a/common/xray/buf/io.go b/common/xray/buf/io.go new file mode 100644 index 00000000..a745c1b5 --- /dev/null +++ b/common/xray/buf/io.go @@ -0,0 +1,126 @@ +package buf + +import ( + "io" + "net" + "syscall" + "time" + + "github.com/sagernet/sing-box/common/xray/stat" + "github.com/sagernet/sing-box/common/xray/stats" + E "github.com/sagernet/sing/common/exceptions" +) + +// Reader extends io.Reader with MultiBuffer. +type Reader interface { + // ReadMultiBuffer reads content from underlying reader, and put it into a MultiBuffer. + ReadMultiBuffer() (MultiBuffer, error) +} + +// ErrReadTimeout is an error that happens with IO timeout. 
+var ErrReadTimeout = E.New("IO timeout") + +// TimeoutReader is a reader that returns error if Read() operation takes longer than the given timeout. +type TimeoutReader interface { + ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error) +} + +// Writer extends io.Writer with MultiBuffer. +type Writer interface { + // WriteMultiBuffer writes a MultiBuffer into underlying writer. + WriteMultiBuffer(MultiBuffer) error +} + +// WriteAllBytes ensures all bytes are written into the given writer. +func WriteAllBytes(writer io.Writer, payload []byte, c stats.Counter) error { + wc := 0 + defer func() { + if c != nil { + c.Add(int64(wc)) + } + }() + + for len(payload) > 0 { + n, err := writer.Write(payload) + wc += n + if err != nil { + return err + } + payload = payload[n:] + } + return nil +} + +func isPacketReader(reader io.Reader) bool { + _, ok := reader.(net.PacketConn) + return ok +} + +// NewReader creates a new Reader. +// The Reader instance doesn't take the ownership of reader. +func NewReader(reader io.Reader) Reader { + if mr, ok := reader.(Reader); ok { + return mr + } + + if isPacketReader(reader) { + return &PacketReader{ + Reader: reader, + } + } + + return &SingleReader{ + Reader: reader, + } +} + +// NewPacketReader creates a new PacketReader based on the given reader. +func NewPacketReader(reader io.Reader) Reader { + if mr, ok := reader.(Reader); ok { + return mr + } + + return &PacketReader{ + Reader: reader, + } +} + +func isPacketWriter(writer io.Writer) bool { + if _, ok := writer.(net.PacketConn); ok { + return true + } + + // If the writer doesn't implement syscall.Conn, it is probably not a TCP connection. + if _, ok := writer.(syscall.Conn); !ok { + return true + } + return false +} + +// NewWriter creates a new Writer. 
+func NewWriter(writer io.Writer) Writer { + if mw, ok := writer.(Writer); ok { + return mw + } + + iConn := writer + if statConn, ok := writer.(*stat.CounterConnection); ok { + iConn = statConn.Connection + } + + if isPacketWriter(iConn) { + return &SequentialWriter{ + Writer: writer, + } + } + + var counter stats.Counter + + if statConn, ok := writer.(*stat.CounterConnection); ok { + counter = statConn.WriteCounter + } + return &BufferToBytesWriter{ + Writer: iConn, + counter: counter, + } +} diff --git a/common/xray/buf/multi_buffer.go b/common/xray/buf/multi_buffer.go new file mode 100644 index 00000000..4d00843a --- /dev/null +++ b/common/xray/buf/multi_buffer.go @@ -0,0 +1,310 @@ +package buf + +import ( + "io" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/errors" + "github.com/sagernet/sing-box/common/xray/serial" +) + +// ReadAllToBytes reads all content from the reader into a byte array, until EOF. +func ReadAllToBytes(reader io.Reader) ([]byte, error) { + mb, err := ReadFrom(reader) + if err != nil { + return nil, err + } + if mb.Len() == 0 { + return nil, nil + } + b := make([]byte, mb.Len()) + mb, _ = SplitBytes(mb, b) + ReleaseMulti(mb) + return b, nil +} + +// MultiBuffer is a list of Buffers. The order of Buffer matters. +type MultiBuffer []*Buffer + +// MergeMulti merges content from src to dest, and returns the new address of dest and src +func MergeMulti(dest MultiBuffer, src MultiBuffer) (MultiBuffer, MultiBuffer) { + dest = append(dest, src...) + for idx := range src { + src[idx] = nil + } + return dest, src[:0] +} + +// MergeBytes merges the given bytes into MultiBuffer and return the new address of the merged MultiBuffer. 
+func MergeBytes(dest MultiBuffer, src []byte) MultiBuffer { + n := len(dest) + if n > 0 && !(dest)[n-1].IsFull() { + nBytes, _ := (dest)[n-1].Write(src) + src = src[nBytes:] + } + + for len(src) > 0 { + b := New() + nBytes, _ := b.Write(src) + src = src[nBytes:] + dest = append(dest, b) + } + + return dest +} + +// ReleaseMulti releases all content of the MultiBuffer, and returns an empty MultiBuffer. +func ReleaseMulti(mb MultiBuffer) MultiBuffer { + for i := range mb { + mb[i].Release() + mb[i] = nil + } + return mb[:0] +} + +// Copy copied the beginning part of the MultiBuffer into the given byte array. +func (mb MultiBuffer) Copy(b []byte) int { + total := 0 + for _, bb := range mb { + nBytes := copy(b[total:], bb.Bytes()) + total += nBytes + if int32(nBytes) < bb.Len() { + break + } + } + return total +} + +// ReadFrom reads all content from reader until EOF. +func ReadFrom(reader io.Reader) (MultiBuffer, error) { + mb := make(MultiBuffer, 0, 16) + for { + b := New() + _, err := b.ReadFullFrom(reader, Size) + if b.IsEmpty() { + b.Release() + } else { + mb = append(mb, b) + } + if err != nil { + if errors.Cause(err) == io.EOF || errors.Cause(err) == io.ErrUnexpectedEOF { + return mb, nil + } + return mb, err + } + } +} + +// SplitBytes splits the given amount of bytes from the beginning of the MultiBuffer. +// It returns the new address of MultiBuffer leftover, and number of bytes written into the input byte slice. +func SplitBytes(mb MultiBuffer, b []byte) (MultiBuffer, int) { + totalBytes := 0 + endIndex := -1 + for i := range mb { + pBuffer := mb[i] + nBytes, _ := pBuffer.Read(b) + totalBytes += nBytes + b = b[nBytes:] + if !pBuffer.IsEmpty() { + endIndex = i + break + } + pBuffer.Release() + mb[i] = nil + } + + if endIndex == -1 { + mb = mb[:0] + } else { + mb = mb[endIndex:] + } + + return mb, totalBytes +} + +// SplitFirstBytes splits the first buffer from MultiBuffer, and then copy its content into the given slice. 
+func SplitFirstBytes(mb MultiBuffer, p []byte) (MultiBuffer, int) { + mb, b := SplitFirst(mb) + if b == nil { + return mb, 0 + } + n := copy(p, b.Bytes()) + b.Release() + return mb, n +} + +// Compact returns another MultiBuffer by merging all content of the given one together. +func Compact(mb MultiBuffer) MultiBuffer { + if len(mb) == 0 { + return mb + } + + mb2 := make(MultiBuffer, 0, len(mb)) + last := mb[0] + + for i := 1; i < len(mb); i++ { + curr := mb[i] + if last.Len()+curr.Len() > Size { + mb2 = append(mb2, last) + last = curr + } else { + common.Must2(last.ReadFrom(curr)) + curr.Release() + } + } + + mb2 = append(mb2, last) + return mb2 +} + +// SplitFirst splits the first Buffer from the beginning of the MultiBuffer. +func SplitFirst(mb MultiBuffer) (MultiBuffer, *Buffer) { + if len(mb) == 0 { + return mb, nil + } + + b := mb[0] + mb[0] = nil + mb = mb[1:] + return mb, b +} + +// SplitSize splits the beginning of the MultiBuffer into another one, for at most size bytes. +func SplitSize(mb MultiBuffer, size int32) (MultiBuffer, MultiBuffer) { + if len(mb) == 0 { + return mb, nil + } + + if mb[0].Len() > size { + b := New() + copy(b.Extend(size), mb[0].BytesTo(size)) + mb[0].Advance(size) + return mb, MultiBuffer{b} + } + + totalBytes := int32(0) + var r MultiBuffer + endIndex := -1 + for i := range mb { + if totalBytes+mb[i].Len() > size { + endIndex = i + break + } + totalBytes += mb[i].Len() + r = append(r, mb[i]) + mb[i] = nil + } + if endIndex == -1 { + // To reuse mb array + mb = mb[:0] + } else { + mb = mb[endIndex:] + } + return mb, r +} + +// SplitMulti splits the beginning of the MultiBuffer into first one, the index i and after into second one +func SplitMulti(mb MultiBuffer, i int) (MultiBuffer, MultiBuffer) { + mb2 := make(MultiBuffer, 0, len(mb)) + if i < len(mb) && i >= 0 { + mb2 = append(mb2, mb[i:]...) 
+ for j := i; j < len(mb); j++ { + mb[j] = nil + } + mb = mb[:i] + } + return mb, mb2 +} + +// WriteMultiBuffer writes all buffers from the MultiBuffer to the Writer one by one, and return error if any, with leftover MultiBuffer. +func WriteMultiBuffer(writer io.Writer, mb MultiBuffer) (MultiBuffer, error) { + for { + mb2, b := SplitFirst(mb) + mb = mb2 + if b == nil { + break + } + + _, err := writer.Write(b.Bytes()) + b.Release() + if err != nil { + return mb, err + } + } + + return nil, nil +} + +// Len returns the total number of bytes in the MultiBuffer. +func (mb MultiBuffer) Len() int32 { + if mb == nil { + return 0 + } + + size := int32(0) + for _, b := range mb { + size += b.Len() + } + return size +} + +// IsEmpty returns true if the MultiBuffer has no content. +func (mb MultiBuffer) IsEmpty() bool { + for _, b := range mb { + if !b.IsEmpty() { + return false + } + } + return true +} + +// String returns the content of the MultiBuffer in string. +func (mb MultiBuffer) String() string { + v := make([]interface{}, len(mb)) + for i, b := range mb { + v[i] = b + } + return serial.Concat(v...) +} + +// MultiBufferContainer is a ReadWriteCloser wrapper over MultiBuffer. +type MultiBufferContainer struct { + MultiBuffer +} + +// Read implements io.Reader. +func (c *MultiBufferContainer) Read(b []byte) (int, error) { + if c.MultiBuffer.IsEmpty() { + return 0, io.EOF + } + + mb, nBytes := SplitBytes(c.MultiBuffer, b) + c.MultiBuffer = mb + return nBytes, nil +} + +// ReadMultiBuffer implements Reader. +func (c *MultiBufferContainer) ReadMultiBuffer() (MultiBuffer, error) { + mb := c.MultiBuffer + c.MultiBuffer = nil + return mb, nil +} + +// Write implements io.Writer. +func (c *MultiBufferContainer) Write(b []byte) (int, error) { + c.MultiBuffer = MergeBytes(c.MultiBuffer, b) + return len(b), nil +} + +// WriteMultiBuffer implements Writer. 
+func (c *MultiBufferContainer) WriteMultiBuffer(b MultiBuffer) error { + mb, _ := MergeMulti(c.MultiBuffer, b) + c.MultiBuffer = mb + return nil +} + +// Close implements io.Closer. +func (c *MultiBufferContainer) Close() error { + c.MultiBuffer = ReleaseMulti(c.MultiBuffer) + return nil +} diff --git a/common/xray/buf/override.go b/common/xray/buf/override.go new file mode 100644 index 00000000..2032854f --- /dev/null +++ b/common/xray/buf/override.go @@ -0,0 +1,38 @@ +package buf + +import ( + "github.com/sagernet/sing-box/common/xray/net" +) + +type EndpointOverrideReader struct { + Reader + Dest net.Address + OriginalDest net.Address +} + +func (r *EndpointOverrideReader) ReadMultiBuffer() (MultiBuffer, error) { + mb, err := r.Reader.ReadMultiBuffer() + if err == nil { + for _, b := range mb { + if b.UDP != nil && b.UDP.Address == r.OriginalDest { + b.UDP.Address = r.Dest + } + } + } + return mb, err +} + +type EndpointOverrideWriter struct { + Writer + Dest net.Address + OriginalDest net.Address +} + +func (w *EndpointOverrideWriter) WriteMultiBuffer(mb MultiBuffer) error { + for _, b := range mb { + if b.UDP != nil && b.UDP.Address == w.Dest { + b.UDP.Address = w.OriginalDest + } + } + return w.Writer.WriteMultiBuffer(mb) +} diff --git a/common/xray/buf/reader.go b/common/xray/buf/reader.go new file mode 100644 index 00000000..17b2e491 --- /dev/null +++ b/common/xray/buf/reader.go @@ -0,0 +1,175 @@ +package buf + +import ( + "io" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/errors" + E "github.com/sagernet/sing/common/exceptions" +) + +func readOneUDP(r io.Reader) (*Buffer, error) { + b := New() + for i := 0; i < 64; i++ { + _, err := b.ReadFrom(r) + if !b.IsEmpty() { + return b, nil + } + if err != nil { + b.Release() + return nil, err + } + } + + b.Release() + return nil, E.New("Reader returns too many empty payloads.") +} + +// ReadBuffer reads a Buffer from the given reader. 
+func ReadBuffer(r io.Reader) (*Buffer, error) { + b := New() + n, err := b.ReadFrom(r) + if n > 0 { + return b, err + } + b.Release() + return nil, err +} + +// BufferedReader is a Reader that keeps its internal buffer. +type BufferedReader struct { + // Reader is the underlying reader to be read from + Reader Reader + // Buffer is the internal buffer to be read from first + Buffer MultiBuffer + // Splitter is a function to read bytes from MultiBuffer + Splitter func(MultiBuffer, []byte) (MultiBuffer, int) +} + +// BufferedBytes returns the number of bytes that is cached in this reader. +func (r *BufferedReader) BufferedBytes() int32 { + return r.Buffer.Len() +} + +// ReadByte implements io.ByteReader. +func (r *BufferedReader) ReadByte() (byte, error) { + var b [1]byte + _, err := r.Read(b[:]) + return b[0], err +} + +// Read implements io.Reader. It reads from internal buffer first (if available) and then reads from the underlying reader. +func (r *BufferedReader) Read(b []byte) (int, error) { + spliter := r.Splitter + if spliter == nil { + spliter = SplitBytes + } + + if !r.Buffer.IsEmpty() { + buffer, nBytes := spliter(r.Buffer, b) + r.Buffer = buffer + if r.Buffer.IsEmpty() { + r.Buffer = nil + } + return nBytes, nil + } + + mb, err := r.Reader.ReadMultiBuffer() + if err != nil { + return 0, err + } + + mb, nBytes := spliter(mb, b) + if !mb.IsEmpty() { + r.Buffer = mb + } + return nBytes, nil +} + +// ReadMultiBuffer implements Reader. +func (r *BufferedReader) ReadMultiBuffer() (MultiBuffer, error) { + if !r.Buffer.IsEmpty() { + mb := r.Buffer + r.Buffer = nil + return mb, nil + } + + return r.Reader.ReadMultiBuffer() +} + +// ReadAtMost returns a MultiBuffer with at most size. 
+func (r *BufferedReader) ReadAtMost(size int32) (MultiBuffer, error) { + if r.Buffer.IsEmpty() { + mb, err := r.Reader.ReadMultiBuffer() + if mb.IsEmpty() && err != nil { + return nil, err + } + r.Buffer = mb + } + + rb, mb := SplitSize(r.Buffer, size) + r.Buffer = rb + if r.Buffer.IsEmpty() { + r.Buffer = nil + } + return mb, nil +} + +func (r *BufferedReader) writeToInternal(writer io.Writer) (int64, error) { + mbWriter := NewWriter(writer) + var sc SizeCounter + if r.Buffer != nil { + sc.Size = int64(r.Buffer.Len()) + if err := mbWriter.WriteMultiBuffer(r.Buffer); err != nil { + return 0, err + } + r.Buffer = nil + } + + err := Copy(r.Reader, mbWriter, CountSize(&sc)) + return sc.Size, err +} + +// WriteTo implements io.WriterTo. +func (r *BufferedReader) WriteTo(writer io.Writer) (int64, error) { + nBytes, err := r.writeToInternal(writer) + if errors.Cause(err) == io.EOF { + return nBytes, nil + } + return nBytes, err +} + +// Interrupt implements common.Interruptible. +func (r *BufferedReader) Interrupt() { + common.Interrupt(r.Reader) +} + +// Close implements io.Closer. +func (r *BufferedReader) Close() error { + return common.Close(r.Reader) +} + +// SingleReader is a Reader that read one Buffer every time. +type SingleReader struct { + io.Reader +} + +// ReadMultiBuffer implements Reader. +func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) { + b, err := ReadBuffer(r.Reader) + return MultiBuffer{b}, err +} + +// PacketReader is a Reader that read one Buffer every time. +type PacketReader struct { + io.Reader +} + +// ReadMultiBuffer implements Reader. 
+func (r *PacketReader) ReadMultiBuffer() (MultiBuffer, error) { + b, err := readOneUDP(r.Reader) + if err != nil { + return nil, err + } + return MultiBuffer{b}, nil +} diff --git a/common/xray/buf/writer.go b/common/xray/buf/writer.go new file mode 100644 index 00000000..96aea75f --- /dev/null +++ b/common/xray/buf/writer.go @@ -0,0 +1,270 @@ +package buf + +import ( + "io" + "net" + "sync" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/errors" + "github.com/sagernet/sing-box/common/xray/stats" +) + +// BufferToBytesWriter is a Writer that writes alloc.Buffer into underlying writer. +type BufferToBytesWriter struct { + io.Writer + + counter stats.Counter + cache [][]byte +} + +// WriteMultiBuffer implements Writer. This method takes ownership of the given buffer. +func (w *BufferToBytesWriter) WriteMultiBuffer(mb MultiBuffer) error { + defer ReleaseMulti(mb) + + size := mb.Len() + if size == 0 { + return nil + } + + if len(mb) == 1 { + return WriteAllBytes(w.Writer, mb[0].Bytes(), w.counter) + } + + if cap(w.cache) < len(mb) { + w.cache = make([][]byte, 0, len(mb)) + } + + bs := w.cache + for _, b := range mb { + bs = append(bs, b.Bytes()) + } + + defer func() { + for idx := range bs { + bs[idx] = nil + } + }() + + nb := net.Buffers(bs) + wc := int64(0) + defer func() { + if w.counter != nil { + w.counter.Add(wc) + } + }() + for size > 0 { + n, err := nb.WriteTo(w.Writer) + wc += n + if err != nil { + return err + } + size -= int32(n) + } + + return nil +} + +// ReadFrom implements io.ReaderFrom. +func (w *BufferToBytesWriter) ReadFrom(reader io.Reader) (int64, error) { + var sc SizeCounter + err := Copy(NewReader(reader), w, CountSize(&sc)) + return sc.Size, err +} + +// BufferedWriter is a Writer with internal buffer. +type BufferedWriter struct { + sync.Mutex + writer Writer + buffer *Buffer + buffered bool +} + +// NewBufferedWriter creates a new BufferedWriter. 
+func NewBufferedWriter(writer Writer) *BufferedWriter { + return &BufferedWriter{ + writer: writer, + buffer: New(), + buffered: true, + } +} + +// WriteByte implements io.ByteWriter. +func (w *BufferedWriter) WriteByte(c byte) error { + return common.Error2(w.Write([]byte{c})) +} + +// Write implements io.Writer. +func (w *BufferedWriter) Write(b []byte) (int, error) { + if len(b) == 0 { + return 0, nil + } + + w.Lock() + defer w.Unlock() + + if !w.buffered { + if writer, ok := w.writer.(io.Writer); ok { + return writer.Write(b) + } + } + + totalBytes := 0 + for len(b) > 0 { + if w.buffer == nil { + w.buffer = New() + } + + nBytes, err := w.buffer.Write(b) + totalBytes += nBytes + if err != nil { + return totalBytes, err + } + if !w.buffered || w.buffer.IsFull() { + if err := w.flushInternal(); err != nil { + return totalBytes, err + } + } + b = b[nBytes:] + } + + return totalBytes, nil +} + +// WriteMultiBuffer implements Writer. It takes ownership of the given MultiBuffer. +func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error { + if b.IsEmpty() { + return nil + } + + w.Lock() + defer w.Unlock() + + if !w.buffered { + return w.writer.WriteMultiBuffer(b) + } + + reader := MultiBufferContainer{ + MultiBuffer: b, + } + defer reader.Close() + + for !reader.MultiBuffer.IsEmpty() { + if w.buffer == nil { + w.buffer = New() + } + common.Must2(w.buffer.ReadFrom(&reader)) + if w.buffer.IsFull() { + if err := w.flushInternal(); err != nil { + return err + } + } + } + + return nil +} + +// Flush flushes buffered content into underlying writer. 
+func (w *BufferedWriter) Flush() error { + w.Lock() + defer w.Unlock() + + return w.flushInternal() +} + +func (w *BufferedWriter) flushInternal() error { + if w.buffer.IsEmpty() { + return nil + } + + b := w.buffer + w.buffer = nil + + if writer, ok := w.writer.(io.Writer); ok { + err := WriteAllBytes(writer, b.Bytes(), nil) + b.Release() + return err + } + + return w.writer.WriteMultiBuffer(MultiBuffer{b}) +} + +// SetBuffered sets whether the internal buffer is used. If set to false, Flush() will be called to clear the buffer. +func (w *BufferedWriter) SetBuffered(f bool) error { + w.Lock() + defer w.Unlock() + + w.buffered = f + if !f { + return w.flushInternal() + } + return nil +} + +// ReadFrom implements io.ReaderFrom. +func (w *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) { + if err := w.SetBuffered(false); err != nil { + return 0, err + } + + var sc SizeCounter + err := Copy(NewReader(reader), w, CountSize(&sc)) + return sc.Size, err +} + +// Close implements io.Closable. +func (w *BufferedWriter) Close() error { + if err := w.Flush(); err != nil { + return err + } + return common.Close(w.writer) +} + +// SequentialWriter is a Writer that writes MultiBuffer sequentially into the underlying io.Writer. +type SequentialWriter struct { + io.Writer +} + +// WriteMultiBuffer implements Writer. 
+func (w *SequentialWriter) WriteMultiBuffer(mb MultiBuffer) error { + mb, err := WriteMultiBuffer(w.Writer, mb) + ReleaseMulti(mb) + return err +} + +type noOpWriter byte + +func (noOpWriter) WriteMultiBuffer(b MultiBuffer) error { + ReleaseMulti(b) + return nil +} + +func (noOpWriter) Write(b []byte) (int, error) { + return len(b), nil +} + +func (noOpWriter) ReadFrom(reader io.Reader) (int64, error) { + b := New() + defer b.Release() + + totalBytes := int64(0) + for { + b.Clear() + _, err := b.ReadFrom(reader) + totalBytes += int64(b.Len()) + if err != nil { + if errors.Cause(err) == io.EOF { + return totalBytes, nil + } + return totalBytes, err + } + } +} + +var ( + // Discard is a Writer that swallows all contents written in. + Discard Writer = noOpWriter(0) + + // DiscardBytes is an io.Writer that swallows all contents written in. + DiscardBytes io.Writer = noOpWriter(0) +) diff --git a/common/xray/bytespool/pool.go b/common/xray/bytespool/pool.go new file mode 100644 index 00000000..6f632d52 --- /dev/null +++ b/common/xray/bytespool/pool.go @@ -0,0 +1,72 @@ +package bytespool + +import "sync" + +func createAllocFunc(size int32) func() interface{} { + return func() interface{} { + return make([]byte, size) + } +} + +// The following parameters controls the size of buffer pools. +// There are numPools pools. Starting from 2k size, the size of each pool is sizeMulti of the previous one. +// Package buf is guaranteed to not use buffers larger than the largest pool. +// Other packets may use larger buffers. +const ( + numPools = 4 + sizeMulti = 4 +) + +var ( + pool [numPools]sync.Pool + poolSize [numPools]int32 +) + +func init() { + size := int32(2048) + for i := 0; i < numPools; i++ { + pool[i] = sync.Pool{ + New: createAllocFunc(size), + } + poolSize[i] = size + size *= sizeMulti + } +} + +// GetPool returns a sync.Pool that generates bytes array with at least the given size. +// It may return nil if no such pool exists. 
+// +// xray:api:stable +func GetPool(size int32) *sync.Pool { + for idx, ps := range poolSize { + if size <= ps { + return &pool[idx] + } + } + return nil +} + +// Alloc returns a byte slice with at least the given size. Minimum size of returned slice is 2048. +// +// xray:api:stable +func Alloc(size int32) []byte { + pool := GetPool(size) + if pool != nil { + return pool.Get().([]byte) + } + return make([]byte, size) +} + +// Free puts a byte slice into the internal pool. +// +// xray:api:stable +func Free(b []byte) { + size := int32(cap(b)) + b = b[0:cap(b)] + for i := numPools - 1; i >= 0; i-- { + if size >= poolSize[i] { + pool[i].Put(b) + return + } + } +} diff --git a/common/xray/common.go b/common/xray/common.go new file mode 100644 index 00000000..8d5c858d --- /dev/null +++ b/common/xray/common.go @@ -0,0 +1,19 @@ +package common + +// Must panics if err is not nil. +func Must(err error) { + if err != nil { + panic(err) + } +} + +// Must2 panics if the second parameter is not nil, otherwise returns the first parameter. +func Must2(v interface{}, err error) interface{} { + Must(err) + return v +} + +// Error2 returns the err from the 2nd parameter. +func Error2(v interface{}, err error) error { + return err +} diff --git a/common/xray/crypto/crypto.go b/common/xray/crypto/crypto.go new file mode 100644 index 00000000..24c5adb8 --- /dev/null +++ b/common/xray/crypto/crypto.go @@ -0,0 +1,14 @@ +package crypto + +import ( + "crypto/rand" + "math/big" +) + +func RandBetween(from int64, to int64) int64 { + if from == to { + return from + } + bigInt, _ := rand.Int(rand.Reader, big.NewInt(to-from)) + return from + bigInt.Int64() +} diff --git a/common/xray/errors/errors.go b/common/xray/errors/errors.go new file mode 100644 index 00000000..54af7cbe --- /dev/null +++ b/common/xray/errors/errors.go @@ -0,0 +1,25 @@ +package errors + +type hasInnerError interface { + // Unwrap returns the underlying error of this one. 
+ Unwrap() error +} + +func Cause(err error) error { + if err == nil { + return nil + } +L: + for { + switch inner := err.(type) { + case hasInnerError: + if inner.Unwrap() == nil { + break L + } + err = inner.Unwrap() + default: + break L + } + } + return err +} diff --git a/common/xray/interfaces.go b/common/xray/interfaces.go new file mode 100644 index 00000000..428c3388 --- /dev/null +++ b/common/xray/interfaces.go @@ -0,0 +1,52 @@ +package common + +// Closable is the interface for objects that can release its resources. +// +// xray:api:beta +type Closable interface { + // Close release all resources used by this object, including goroutines. + Close() error +} + +// Interruptible is an interface for objects that can be stopped before its completion. +// +// xray:api:beta +type Interruptible interface { + Interrupt() +} + +// Close closes the obj if it is a Closable. +// +// xray:api:beta +func Close(obj interface{}) error { + if c, ok := obj.(Closable); ok { + return c.Close() + } + return nil +} + +// Interrupt calls Interrupt() if object implements Interruptible interface, or Close() if the object implements Closable interface. +// +// xray:api:beta +func Interrupt(obj interface{}) error { + if c, ok := obj.(Interruptible); ok { + c.Interrupt() + return nil + } + return Close(obj) +} + +// Runnable is the interface for objects that can start to work and stop on demand. +type Runnable interface { + // Start starts the runnable object. Upon the method returning nil, the object begins to function properly. + Start() error + + Closable +} + +// HasType is the interface for objects that knows its type. +type HasType interface { + // Type returns the type of the object. + // Usually it returns (*Type)(nil) of the object. 
+ Type() interface{} +} diff --git a/common/xray/json/badoption/range.go b/common/xray/json/badoption/range.go new file mode 100644 index 00000000..6390a158 --- /dev/null +++ b/common/xray/json/badoption/range.go @@ -0,0 +1,53 @@ +package badoption + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/sagernet/sing-box/common/xray/crypto" + E "github.com/sagernet/sing/common/exceptions" +) + +type Range struct { + From int32 `json:"from"` + To int32 `json:"to"` +} + +func (c *Range) Build() *Range { + return (*Range)(c) +} + +func (c *Range) MarshalJSON() ([]byte, error) { + return json.Marshal(fmt.Sprintf("%d-%d", c.From, c.To)) +} + +func (c *Range) UnmarshalJSON(content []byte) error { + var stringValue string + err := json.Unmarshal(content, &stringValue) + if err != nil { + return err + } + parts := strings.Split(stringValue, "-") + if len(parts) != 2 { + return E.New("invalid length of range parts") + } + from, err := strconv.ParseInt(parts[0], 10, 32) + if err != nil { + return err + } + to, err := strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return err + } + if from > to { + return E.New("invalid range") + } + *c = Range{int32(from), int32(to)} + return nil +} + +func (c Range) Rand() int32 { + return int32(crypto.RandBetween(int64(c.From), int64(c.To))) +} diff --git a/common/xray/net/address.go b/common/xray/net/address.go new file mode 100644 index 00000000..7a99c8ad --- /dev/null +++ b/common/xray/net/address.go @@ -0,0 +1,181 @@ +package net + +import ( + "bytes" + "net" + "strings" +) + +var ( + // LocalHostIP is a constant value for localhost IP in IPv4. + LocalHostIP = IPAddress([]byte{127, 0, 0, 1}) + + // AnyIP is a constant value for any IP in IPv4. + AnyIP = IPAddress([]byte{0, 0, 0, 0}) + + // LocalHostDomain is a constant value for localhost domain. + LocalHostDomain = DomainAddress("localhost") + + // LocalHostIPv6 is a constant value for localhost IP in IPv6. 
+ LocalHostIPv6 = IPAddress([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) + + // AnyIPv6 is a constant value for any IP in IPv6. + AnyIPv6 = IPAddress([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) +) + +// AddressFamily is the type of address. +type AddressFamily byte + +const ( + // AddressFamilyIPv4 represents address as IPv4 + AddressFamilyIPv4 = AddressFamily(0) + + // AddressFamilyIPv6 represents address as IPv6 + AddressFamilyIPv6 = AddressFamily(1) + + // AddressFamilyDomain represents address as Domain + AddressFamilyDomain = AddressFamily(2) +) + +// IsIPv4 returns true if current AddressFamily is IPv4. +func (af AddressFamily) IsIPv4() bool { + return af == AddressFamilyIPv4 +} + +// IsIPv6 returns true if current AddressFamily is IPv6. +func (af AddressFamily) IsIPv6() bool { + return af == AddressFamilyIPv6 +} + +// IsIP returns true if current AddressFamily is IPv6 or IPv4. +func (af AddressFamily) IsIP() bool { + return af == AddressFamilyIPv4 || af == AddressFamilyIPv6 +} + +// IsDomain returns true if current AddressFamily is Domain. +func (af AddressFamily) IsDomain() bool { + return af == AddressFamilyDomain +} + +// Address represents a network address to be communicated with. It may be an IP address or domain +// address, not both. This interface doesn't resolve IP address for a given domain. +type Address interface { + IP() net.IP // IP of this Address + Domain() string // Domain of this Address + Family() AddressFamily + + String() string // String representation of this Address +} + +func isAlphaNum(c byte) bool { + return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') +} + +// ParseAddress parses a string into an Address. The return value will be an IPAddress when +// the string is in the form of IPv4 or IPv6 address, or a DomainAddress otherwise. 
+func ParseAddress(addr string) Address { + // Handle IPv6 address in form as "[2001:4860:0:2001::68]" + lenAddr := len(addr) + if lenAddr > 0 && addr[0] == '[' && addr[lenAddr-1] == ']' { + addr = addr[1 : lenAddr-1] + lenAddr -= 2 + } + + if lenAddr > 0 && (!isAlphaNum(addr[0]) || !isAlphaNum(addr[len(addr)-1])) { + addr = strings.TrimSpace(addr) + } + + ip := net.ParseIP(addr) + if ip != nil { + return IPAddress(ip) + } + return DomainAddress(addr) +} + +var bytes0 = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + +// IPAddress creates an Address with given IP. +func IPAddress(ip []byte) Address { + switch len(ip) { + case net.IPv4len: + var addr ipv4Address = [4]byte{ip[0], ip[1], ip[2], ip[3]} + return addr + case net.IPv6len: + if bytes.Equal(ip[:10], bytes0) && ip[10] == 0xff && ip[11] == 0xff { + return IPAddress(ip[12:16]) + } + var addr ipv6Address = [16]byte{ + ip[0], ip[1], ip[2], ip[3], + ip[4], ip[5], ip[6], ip[7], + ip[8], ip[9], ip[10], ip[11], + ip[12], ip[13], ip[14], ip[15], + } + return addr + default: + return nil + } +} + +// DomainAddress creates an Address with given domain. +// This is an internal function that forcibly converts a string to domain. +// It's mainly used in test files and mux. +// Unless you have a specific reason, use net.ParseAddress instead, +// as this function does not check whether the input is an IP address. 
+// Otherwise, you will get strange results like domain: 1.1.1.1 +func DomainAddress(domain string) Address { + return domainAddress(domain) +} + +type ipv4Address [4]byte + +func (a ipv4Address) IP() net.IP { + return net.IP(a[:]) +} + +func (ipv4Address) Domain() string { + panic("Calling Domain() on an IPv4Address.") +} + +func (ipv4Address) Family() AddressFamily { + return AddressFamilyIPv4 +} + +func (a ipv4Address) String() string { + return a.IP().String() +} + +type ipv6Address [16]byte + +func (a ipv6Address) IP() net.IP { + return net.IP(a[:]) +} + +func (ipv6Address) Domain() string { + panic("Calling Domain() on an IPv6Address.") +} + +func (ipv6Address) Family() AddressFamily { + return AddressFamilyIPv6 +} + +func (a ipv6Address) String() string { + return "[" + a.IP().String() + "]" +} + +type domainAddress string + +func (domainAddress) IP() net.IP { + panic("Calling IP() on a DomainAddress.") +} + +func (a domainAddress) Domain() string { + return string(a) +} + +func (domainAddress) Family() AddressFamily { + return AddressFamilyDomain +} + +func (a domainAddress) String() string { + return a.Domain() +} diff --git a/common/xray/net/destination.go b/common/xray/net/destination.go new file mode 100644 index 00000000..561c7726 --- /dev/null +++ b/common/xray/net/destination.go @@ -0,0 +1,146 @@ +package net + +import ( + "net" + "strings" +) + +// Destination represents a network destination including address and protocol (tcp / udp). +type Destination struct { + Address Address + Port Port + Network Network +} + +// DestinationFromAddr generates a Destination from a net address. 
+func DestinationFromAddr(addr net.Addr) Destination { + switch addr := addr.(type) { + case *net.TCPAddr: + return TCPDestination(IPAddress(addr.IP), Port(addr.Port)) + case *net.UDPAddr: + return UDPDestination(IPAddress(addr.IP), Port(addr.Port)) + case *net.UnixAddr: + return UnixDestination(DomainAddress(addr.Name)) + default: + panic("Net: Unknown address type.") + } +} + +// ParseDestination converts a destination from its string presentation. +func ParseDestination(dest string) (Destination, error) { + d := Destination{ + Address: AnyIP, + Port: Port(0), + } + if strings.HasPrefix(dest, "tcp:") { + d.Network = Network_TCP + dest = dest[4:] + } else if strings.HasPrefix(dest, "udp:") { + d.Network = Network_UDP + dest = dest[4:] + } else if strings.HasPrefix(dest, "unix:") { + d = UnixDestination(DomainAddress(dest[5:])) + return d, nil + } + + hstr, pstr, err := SplitHostPort(dest) + if err != nil { + return d, err + } + if len(hstr) > 0 { + d.Address = ParseAddress(hstr) + } + if len(pstr) > 0 { + port, err := PortFromString(pstr) + if err != nil { + return d, err + } + d.Port = port + } + return d, nil +} + +// TCPDestination creates a TCP destination with given address +func TCPDestination(address Address, port Port) Destination { + return Destination{ + Network: Network_TCP, + Address: address, + Port: port, + } +} + +// UDPDestination creates a UDP destination with given address +func UDPDestination(address Address, port Port) Destination { + return Destination{ + Network: Network_UDP, + Address: address, + Port: port, + } +} + +// UnixDestination creates a Unix destination with given address +func UnixDestination(address Address) Destination { + return Destination{ + Network: Network_UNIX, + Address: address, + } +} + +// NetAddr returns the network address in this Destination in string form. 
+func (d Destination) NetAddr() string { + addr := "" + if d.Network == Network_TCP || d.Network == Network_UDP { + addr = d.Address.String() + ":" + d.Port.String() + } else if d.Network == Network_UNIX { + addr = d.Address.String() + } + return addr +} + +// RawNetAddr converts a net.Addr from its Destination presentation. +func (d Destination) RawNetAddr() net.Addr { + var addr net.Addr + switch d.Network { + case Network_TCP: + if d.Address.Family().IsIP() { + addr = &net.TCPAddr{ + IP: d.Address.IP(), + Port: int(d.Port), + } + } + case Network_UDP: + if d.Address.Family().IsIP() { + addr = &net.UDPAddr{ + IP: d.Address.IP(), + Port: int(d.Port), + } + } + case Network_UNIX: + if d.Address.Family().IsDomain() { + addr = &net.UnixAddr{ + Name: d.Address.String(), + Net: d.Network.SystemString(), + } + } + } + return addr +} + +// String returns the strings form of this Destination. +func (d Destination) String() string { + prefix := "unknown:" + switch d.Network { + case Network_TCP: + prefix = "tcp:" + case Network_UDP: + prefix = "udp:" + case Network_UNIX: + prefix = "unix:" + } + return prefix + d.NetAddr() +} + +// IsValid returns true if this Destination is valid. +func (d Destination) IsValid() bool { + return d.Network != Network_Unknown +} diff --git a/common/xray/net/net.go b/common/xray/net/net.go new file mode 100644 index 00000000..9bc049d9 --- /dev/null +++ b/common/xray/net/net.go @@ -0,0 +1,13 @@ +package net + +import "time" + +// defines the maximum time an idle TCP session can survive in the tunnel, so +// it should be consistent across HTTP versions and with other transports. 
+const ConnIdleTimeout = 300 * time.Second + +// consistent with quic-go +const QuicgoH3KeepAlivePeriod = 10 * time.Second + +// consistent with chrome +const ChromeH2KeepAlivePeriod = 45 * time.Second diff --git a/common/xray/net/network.go b/common/xray/net/network.go new file mode 100644 index 00000000..356d7fda --- /dev/null +++ b/common/xray/net/network.go @@ -0,0 +1,33 @@ +package net + +type Network int32 + +const ( + Network_Unknown Network = 0 + Network_TCP Network = 2 + Network_UDP Network = 3 + Network_UNIX Network = 4 +) + +func (n Network) SystemString() string { + switch n { + case Network_TCP: + return "tcp" + case Network_UDP: + return "udp" + case Network_UNIX: + return "unix" + default: + return "unknown" + } +} + +// HasNetwork returns true if the network list has a certain network. +func HasNetwork(list []Network, network Network) bool { + for _, value := range list { + if value == network { + return true + } + } + return false +} diff --git a/common/xray/net/port.go b/common/xray/net/port.go new file mode 100644 index 00000000..6d42c40b --- /dev/null +++ b/common/xray/net/port.go @@ -0,0 +1,55 @@ +package net + +import ( + "encoding/binary" + "strconv" + + E "github.com/sagernet/sing/common/exceptions" +) + +// Port represents a network port in TCP and UDP protocol. +type Port uint16 + +// PortFromBytes converts a byte array to a Port, assuming bytes are in big endian order. +// @unsafe Caller must ensure that the byte array has at least 2 elements. +func PortFromBytes(port []byte) Port { + return Port(binary.BigEndian.Uint16(port)) +} + +// PortFromInt converts an integer to a Port. +// @error when the integer is not positive or larger then 65535 +func PortFromInt(val uint32) (Port, error) { + if val > 65535 { + return Port(0), E.New("invalid port range: ", val) + } + return Port(val), nil +} + +// PortFromString converts a string to a Port. +// @error when the string is not an integer or the integral value is a not a valid Port. 
+func PortFromString(s string) (Port, error) { + val, err := strconv.ParseUint(s, 10, 32) + if err != nil { + return Port(0), E.New("invalid port range: ", s) + } + return PortFromInt(uint32(val)) +} + +// Value return the corresponding uint16 value of a Port. +func (p Port) Value() uint16 { + return uint16(p) +} + +// String returns the string presentation of a Port. +func (p Port) String() string { + return strconv.Itoa(int(p)) +} + +type MemoryPortRange struct { + From Port + To Port +} + +func (r MemoryPortRange) Contains(port Port) bool { + return r.From <= port && port <= r.To +} diff --git a/common/xray/net/system.go b/common/xray/net/system.go new file mode 100644 index 00000000..7e1c4b01 --- /dev/null +++ b/common/xray/net/system.go @@ -0,0 +1,84 @@ +package net + +import "net" + +// DialTCP is an alias of net.DialTCP. +var ( + DialTCP = net.DialTCP + DialUDP = net.DialUDP + DialUnix = net.DialUnix + Dial = net.Dial +) + +type ListenConfig = net.ListenConfig + +var ( + Listen = net.Listen + ListenTCP = net.ListenTCP + ListenUDP = net.ListenUDP + ListenUnix = net.ListenUnix +) + +var LookupIP = net.LookupIP + +var FileConn = net.FileConn + +// ParseIP is an alias of net.ParseIP +var ParseIP = net.ParseIP + +var SplitHostPort = net.SplitHostPort + +var CIDRMask = net.CIDRMask + +type ( + Addr = net.Addr + Conn = net.Conn + PacketConn = net.PacketConn +) + +type ( + TCPAddr = net.TCPAddr + TCPConn = net.TCPConn +) + +type ( + UDPAddr = net.UDPAddr + UDPConn = net.UDPConn +) + +type ( + UnixAddr = net.UnixAddr + UnixConn = net.UnixConn +) + +// IP is an alias for net.IP. 
+type ( + IP = net.IP + IPMask = net.IPMask + IPNet = net.IPNet +) + +const ( + IPv4len = net.IPv4len + IPv6len = net.IPv6len +) + +type ( + Error = net.Error + AddrError = net.AddrError +) + +type ( + Dialer = net.Dialer + Listener = net.Listener + TCPListener = net.TCPListener + UnixListener = net.UnixListener +) + +var ( + ResolveTCPAddr = net.ResolveTCPAddr + ResolveUDPAddr = net.ResolveUDPAddr + ResolveUnixAddr = net.ResolveUnixAddr +) + +type Resolver = net.Resolver diff --git a/common/xray/pipe/impl.go b/common/xray/pipe/impl.go new file mode 100644 index 00000000..f36d4d36 --- /dev/null +++ b/common/xray/pipe/impl.go @@ -0,0 +1,215 @@ +package pipe + +import ( + "errors" + "io" + "runtime" + "sync" + "time" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/buf" + "github.com/sagernet/sing-box/common/xray/signal" + "github.com/sagernet/sing-box/common/xray/signal/done" +) + +type state byte + +const ( + open state = iota + closed + errord +) + +type pipeOption struct { + limit int32 // maximum buffer size in bytes + discardOverflow bool +} + +func (o *pipeOption) isFull(curSize int32) bool { + return o.limit >= 0 && curSize > o.limit +} + +type pipe struct { + sync.Mutex + data buf.MultiBuffer + readSignal *signal.Notifier + writeSignal *signal.Notifier + done *done.Instance + errChan chan error + option pipeOption + state state +} + +var ( + errBufferFull = errors.New("buffer full") + errSlowDown = errors.New("slow down") +) + +func (p *pipe) Len() int32 { + data := p.data + if data == nil { + return 0 + } + return data.Len() +} + +func (p *pipe) getState(forRead bool) error { + switch p.state { + case open: + if !forRead && p.option.isFull(p.data.Len()) { + return errBufferFull + } + return nil + case closed: + if !forRead { + return io.ErrClosedPipe + } + if !p.data.IsEmpty() { + return nil + } + return io.EOF + case errord: + return io.ErrClosedPipe + default: + panic("impossible case") + } +} + +func (p *pipe) 
readMultiBufferInternal() (buf.MultiBuffer, error) { + p.Lock() + defer p.Unlock() + + if err := p.getState(true); err != nil { + return nil, err + } + + data := p.data + p.data = nil + return data, nil +} + +func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) { + for { + data, err := p.readMultiBufferInternal() + if data != nil || err != nil { + p.writeSignal.Signal() + return data, err + } + + select { + case <-p.readSignal.Wait(): + case <-p.done.Wait(): + case err = <-p.errChan: + return nil, err + } + } +} + +func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) { + timer := time.NewTimer(d) + defer timer.Stop() + + for { + data, err := p.readMultiBufferInternal() + if data != nil || err != nil { + p.writeSignal.Signal() + return data, err + } + + select { + case <-p.readSignal.Wait(): + case <-p.done.Wait(): + case <-timer.C: + return nil, buf.ErrReadTimeout + } + } +} + +func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error { + p.Lock() + defer p.Unlock() + + if err := p.getState(false); err != nil { + return err + } + + if p.data == nil { + p.data = mb + return nil + } + + p.data, _ = buf.MergeMulti(p.data, mb) + return errSlowDown +} + +func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { + if mb.IsEmpty() { + return nil + } + + for { + err := p.writeMultiBufferInternal(mb) + if err == nil { + p.readSignal.Signal() + return nil + } + + if err == errSlowDown { + p.readSignal.Signal() + + // Yield current goroutine. Hopefully the reading counterpart can pick up the payload. 
+ runtime.Gosched() + return nil + } + + if err == errBufferFull && p.option.discardOverflow { + buf.ReleaseMulti(mb) + return nil + } + + if err != errBufferFull { + buf.ReleaseMulti(mb) + p.readSignal.Signal() + return err + } + + select { + case <-p.writeSignal.Wait(): + case <-p.done.Wait(): + return io.ErrClosedPipe + } + } +} + +func (p *pipe) Close() error { + p.Lock() + defer p.Unlock() + + if p.state == closed || p.state == errord { + return nil + } + + p.state = closed + common.Must(p.done.Close()) + return nil +} + +// Interrupt implements common.Interruptible. +func (p *pipe) Interrupt() { + p.Lock() + defer p.Unlock() + + if p.state == closed || p.state == errord { + return + } + + p.state = errord + + if !p.data.IsEmpty() { + buf.ReleaseMulti(p.data) + p.data = nil + } + + common.Must(p.done.Close()) +} diff --git a/common/xray/pipe/pipe.go b/common/xray/pipe/pipe.go new file mode 100644 index 00000000..dad26678 --- /dev/null +++ b/common/xray/pipe/pipe.go @@ -0,0 +1,53 @@ +package pipe + +import ( + "github.com/sagernet/sing-box/common/xray/signal" + "github.com/sagernet/sing-box/common/xray/signal/done" +) + +// Option for creating new Pipes. +type Option func(*pipeOption) + +// WithoutSizeLimit returns an Option for Pipe to have no size limit. +func WithoutSizeLimit() Option { + return func(opt *pipeOption) { + opt.limit = -1 + } +} + +// WithSizeLimit returns an Option for Pipe to have the given size limit. +func WithSizeLimit(limit int32) Option { + return func(opt *pipeOption) { + opt.limit = limit + } +} + +// DiscardOverflow returns an Option for Pipe to discard writes if full. +func DiscardOverflow() Option { + return func(opt *pipeOption) { + opt.discardOverflow = true + } +} + +// New creates a new Reader and Writer that connects to each other. 
+func New(opts ...Option) (*Reader, *Writer) { + p := &pipe{ + readSignal: signal.NewNotifier(), + writeSignal: signal.NewNotifier(), + done: done.New(), + errChan: make(chan error, 1), + option: pipeOption{ + limit: -1, + }, + } + + for _, opt := range opts { + opt(&(p.option)) + } + + return &Reader{ + pipe: p, + }, &Writer{ + pipe: p, + } +} diff --git a/common/xray/pipe/reader.go b/common/xray/pipe/reader.go new file mode 100644 index 00000000..cafe444a --- /dev/null +++ b/common/xray/pipe/reader.go @@ -0,0 +1,41 @@ +package pipe + +import ( + "time" + + "github.com/sagernet/sing-box/common/xray/buf" +) + +// Reader is a buf.Reader that reads content from a pipe. +type Reader struct { + pipe *pipe +} + +// ReadMultiBuffer implements buf.Reader. +func (r *Reader) ReadMultiBuffer() (buf.MultiBuffer, error) { + return r.pipe.ReadMultiBuffer() +} + +// ReadMultiBufferTimeout reads content from a pipe within the given duration, or returns buf.ErrTimeout otherwise. +func (r *Reader) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) { + return r.pipe.ReadMultiBufferTimeout(d) +} + +// Interrupt implements common.Interruptible. +func (r *Reader) Interrupt() { + r.pipe.Interrupt() +} + +// ReturnAnError makes ReadMultiBuffer return an error, only once. +func (r *Reader) ReturnAnError(err error) { + r.pipe.errChan <- err +} + +// Recover catches an error set by ReturnAnError, if exists. +func (r *Reader) Recover() (err error) { + select { + case err = <-r.pipe.errChan: + default: + } + return +} diff --git a/common/xray/pipe/writer.go b/common/xray/pipe/writer.go new file mode 100644 index 00000000..15312635 --- /dev/null +++ b/common/xray/pipe/writer.go @@ -0,0 +1,29 @@ +package pipe + +import ( + "github.com/sagernet/sing-box/common/xray/buf" +) + +// Writer is a buf.Writer that writes data into a pipe. +type Writer struct { + pipe *pipe +} + +// WriteMultiBuffer implements buf.Writer. 
+func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error { + return w.pipe.WriteMultiBuffer(mb) +} + +// Close implements io.Closer. After the pipe is closed, writing to the pipe will return io.ErrClosedPipe, while reading will return io.EOF. +func (w *Writer) Close() error { + return w.pipe.Close() +} + +func (w *Writer) Len() int32 { + return w.pipe.Len() +} + +// Interrupt implements common.Interruptible. +func (w *Writer) Interrupt() { + w.pipe.Interrupt() +} diff --git a/common/xray/serial/serial.go b/common/xray/serial/serial.go new file mode 100644 index 00000000..9d8cb4ca --- /dev/null +++ b/common/xray/serial/serial.go @@ -0,0 +1,29 @@ +package serial + +import ( + "encoding/binary" + "io" +) + +// ReadUint16 reads first two bytes from the reader, and then converts them to an uint16 value. +func ReadUint16(reader io.Reader) (uint16, error) { + var b [2]byte + if _, err := io.ReadFull(reader, b[:]); err != nil { + return 0, err + } + return binary.BigEndian.Uint16(b[:]), nil +} + +// WriteUint16 writes an uint16 value into writer. +func WriteUint16(writer io.Writer, value uint16) (int, error) { + var b [2]byte + binary.BigEndian.PutUint16(b[:], value) + return writer.Write(b[:]) +} + +// WriteUint64 writes an uint64 value into writer. +func WriteUint64(writer io.Writer, value uint64) (int, error) { + var b [8]byte + binary.BigEndian.PutUint64(b[:], value) + return writer.Write(b[:]) +} diff --git a/common/xray/serial/string.go b/common/xray/serial/string.go new file mode 100644 index 00000000..8fdcd7f6 --- /dev/null +++ b/common/xray/serial/string.go @@ -0,0 +1,35 @@ +package serial + +import ( + "fmt" + "strings" +) + +// ToString serializes an arbitrary value into string. 
+func ToString(v interface{}) string { + if v == nil { + return "" + } + + switch value := v.(type) { + case string: + return value + case *string: + return *value + case fmt.Stringer: + return value.String() + case error: + return value.Error() + default: + return fmt.Sprintf("%+v", value) + } +} + +// Concat concatenates all input into a single string. +func Concat(v ...interface{}) string { + builder := strings.Builder{} + for _, value := range v { + builder.WriteString(ToString(value)) + } + return builder.String() +} diff --git a/common/xray/signal/done/done.go b/common/xray/signal/done/done.go new file mode 100644 index 00000000..189a8cf3 --- /dev/null +++ b/common/xray/signal/done/done.go @@ -0,0 +1,49 @@ +package done + +import ( + "sync" +) + +// Instance is a utility for notifications of something being done. +type Instance struct { + access sync.Mutex + c chan struct{} + closed bool +} + +// New returns a new Done. +func New() *Instance { + return &Instance{ + c: make(chan struct{}), + } +} + +// Done returns true if Close() is called. +func (d *Instance) Done() bool { + select { + case <-d.Wait(): + return true + default: + return false + } +} + +// Wait returns a channel for waiting for done. +func (d *Instance) Wait() <-chan struct{} { + return d.c +} + +// Close marks this Done 'done'. This method may be called multiple times. All calls after first call will have no effect on its status. +func (d *Instance) Close() error { + d.access.Lock() + defer d.access.Unlock() + + if d.closed { + return nil + } + + d.closed = true + close(d.c) + + return nil +} diff --git a/common/xray/signal/notifier.go b/common/xray/signal/notifier.go new file mode 100644 index 00000000..19836e54 --- /dev/null +++ b/common/xray/signal/notifier.go @@ -0,0 +1,26 @@ +package signal + +// Notifier is a utility for notifying changes. The change producer may notify changes multiple time, and the consumer may get notified asynchronously. 
+type Notifier struct { + c chan struct{} +} + +// NewNotifier creates a new Notifier. +func NewNotifier() *Notifier { + return &Notifier{ + c: make(chan struct{}, 1), + } +} + +// Signal signals a change, usually by producer. This method never blocks. +func (n *Notifier) Signal() { + select { + case n.c <- struct{}{}: + default: + } +} + +// Wait returns a channel for waiting for changes. The returned channel never gets closed. +func (n *Notifier) Wait() <-chan struct{} { + return n.c +} diff --git a/common/xray/signal/pubsub/pubsub.go b/common/xray/signal/pubsub/pubsub.go new file mode 100644 index 00000000..6de453e8 --- /dev/null +++ b/common/xray/signal/pubsub/pubsub.go @@ -0,0 +1,105 @@ +package pubsub + +import ( + "errors" + "sync" + "time" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/signal/done" + "github.com/sagernet/sing-box/common/xray/task" +) + +type Subscriber struct { + buffer chan interface{} + done *done.Instance +} + +func (s *Subscriber) push(msg interface{}) { + select { + case s.buffer <- msg: + default: + } +} + +func (s *Subscriber) Wait() <-chan interface{} { + return s.buffer +} + +func (s *Subscriber) Close() error { + return s.done.Close() +} + +func (s *Subscriber) IsClosed() bool { + return s.done.Done() +} + +type Service struct { + sync.RWMutex + subs map[string][]*Subscriber + ctask *task.Periodic +} + +func NewService() *Service { + s := &Service{ + subs: make(map[string][]*Subscriber), + } + s.ctask = &task.Periodic{ + Execute: s.Cleanup, + Interval: time.Second * 30, + } + return s +} + +// Cleanup cleans up internal caches of subscribers. +// Visible for testing only. 
+func (s *Service) Cleanup() error { + s.Lock() + defer s.Unlock() + + if len(s.subs) == 0 { + return errors.New("nothing to do") + } + + for name, subs := range s.subs { + newSub := make([]*Subscriber, 0, len(s.subs)) + for _, sub := range subs { + if !sub.IsClosed() { + newSub = append(newSub, sub) + } + } + if len(newSub) == 0 { + delete(s.subs, name) + } else { + s.subs[name] = newSub + } + } + + if len(s.subs) == 0 { + s.subs = make(map[string][]*Subscriber) + } + return nil +} + +func (s *Service) Subscribe(name string) *Subscriber { + sub := &Subscriber{ + buffer: make(chan interface{}, 16), + done: done.New(), + } + s.Lock() + s.subs[name] = append(s.subs[name], sub) + s.Unlock() + common.Must(s.ctask.Start()) + return sub +} + +func (s *Service) Publish(name string, message interface{}) { + s.RLock() + defer s.RUnlock() + + for _, sub := range s.subs[name] { + if !sub.IsClosed() { + sub.push(message) + } + } +} diff --git a/common/xray/signal/semaphore/semaphore.go b/common/xray/signal/semaphore/semaphore.go new file mode 100644 index 00000000..8696b148 --- /dev/null +++ b/common/xray/signal/semaphore/semaphore.go @@ -0,0 +1,27 @@ +package semaphore + +// Instance is an implementation of semaphore. +type Instance struct { + token chan struct{} +} + +// New create a new Semaphore with n permits. +func New(n int) *Instance { + s := &Instance{ + token: make(chan struct{}, n), + } + for i := 0; i < n; i++ { + s.token <- struct{}{} + } + return s +} + +// Wait returns a channel for acquiring a permit. +func (s *Instance) Wait() <-chan struct{} { + return s.token +} + +// Signal releases a permit into the semaphore. 
+func (s *Instance) Signal() { + s.token <- struct{}{} +} diff --git a/common/xray/signal/timer.go b/common/xray/signal/timer.go new file mode 100644 index 00000000..822350a1 --- /dev/null +++ b/common/xray/signal/timer.go @@ -0,0 +1,82 @@ +package signal + +import ( + "context" + "sync" + "time" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/task" +) + +type ActivityUpdater interface { + Update() +} + +type ActivityTimer struct { + sync.RWMutex + updated chan struct{} + checkTask *task.Periodic + onTimeout func() +} + +func (t *ActivityTimer) Update() { + select { + case t.updated <- struct{}{}: + default: + } +} + +func (t *ActivityTimer) check() error { + select { + case <-t.updated: + default: + t.finish() + } + return nil +} + +func (t *ActivityTimer) finish() { + t.Lock() + defer t.Unlock() + + if t.onTimeout != nil { + t.onTimeout() + t.onTimeout = nil + } + if t.checkTask != nil { + t.checkTask.Close() + t.checkTask = nil + } +} + +func (t *ActivityTimer) SetTimeout(timeout time.Duration) { + if timeout == 0 { + t.finish() + return + } + + checkTask := &task.Periodic{ + Interval: timeout, + Execute: t.check, + } + + t.Lock() + + if t.checkTask != nil { + t.checkTask.Close() + } + t.checkTask = checkTask + t.Unlock() + t.Update() + common.Must(checkTask.Start()) +} + +func CancelAfterInactivity(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) *ActivityTimer { + timer := &ActivityTimer{ + updated: make(chan struct{}, 1), + onTimeout: cancel, + } + timer.SetTimeout(timeout) + return timer +} diff --git a/common/xray/stat/connection.go b/common/xray/stat/connection.go new file mode 100644 index 00000000..463db62d --- /dev/null +++ b/common/xray/stat/connection.go @@ -0,0 +1,34 @@ +package stat + +import ( + "net" + + "github.com/sagernet/sing-box/common/xray/stats" +) + +type Connection interface { + net.Conn +} + +type CounterConnection struct { + Connection + ReadCounter stats.Counter + 
WriteCounter stats.Counter +} + +func (c *CounterConnection) Read(b []byte) (int, error) { + nBytes, err := c.Connection.Read(b) + if c.ReadCounter != nil { + c.ReadCounter.Add(int64(nBytes)) + } + + return nBytes, err +} + +func (c *CounterConnection) Write(b []byte) (int, error) { + nBytes, err := c.Connection.Write(b) + if c.WriteCounter != nil { + c.WriteCounter.Add(int64(nBytes)) + } + return nBytes, err +} diff --git a/common/xray/stats/stats.go b/common/xray/stats/stats.go new file mode 100644 index 00000000..70bbd026 --- /dev/null +++ b/common/xray/stats/stats.go @@ -0,0 +1,13 @@ +package stats + +// Counter is the interface for stats counters. +// +// xray:api:stable +type Counter interface { + // Value is the current value of the counter. + Value() int64 + // Set sets a new value to the counter, and returns the previous one. + Set(int64) int64 + // Add adds a value to the current counter value, and returns the previous value. + Add(int64) int64 +} diff --git a/common/xray/task/common.go b/common/xray/task/common.go new file mode 100644 index 00000000..11078fb9 --- /dev/null +++ b/common/xray/task/common.go @@ -0,0 +1,10 @@ +package task + +import "github.com/sagernet/sing-box/common/xray" + +// Close returns a func() that closes v. +func Close(v interface{}) func() error { + return func() error { + return common.Close(v) + } +} diff --git a/common/xray/task/periodic.go b/common/xray/task/periodic.go new file mode 100644 index 00000000..6abe41ae --- /dev/null +++ b/common/xray/task/periodic.go @@ -0,0 +1,85 @@ +package task + +import ( + "sync" + "time" +) + +// Periodic is a task that runs periodically. 
+type Periodic struct { + // Interval of the task being run + Interval time.Duration + // Execute is the task function + Execute func() error + + access sync.Mutex + timer *time.Timer + running bool +} + +func (t *Periodic) hasClosed() bool { + t.access.Lock() + defer t.access.Unlock() + + return !t.running +} + +func (t *Periodic) checkedExecute() error { + if t.hasClosed() { + return nil + } + + if err := t.Execute(); err != nil { + t.access.Lock() + t.running = false + t.access.Unlock() + return err + } + + t.access.Lock() + defer t.access.Unlock() + + if !t.running { + return nil + } + + t.timer = time.AfterFunc(t.Interval, func() { + t.checkedExecute() + }) + + return nil +} + +// Start implements common.Runnable. +func (t *Periodic) Start() error { + t.access.Lock() + if t.running { + t.access.Unlock() + return nil + } + t.running = true + t.access.Unlock() + + if err := t.checkedExecute(); err != nil { + t.access.Lock() + t.running = false + t.access.Unlock() + return err + } + + return nil +} + +// Close implements common.Closable. +func (t *Periodic) Close() error { + t.access.Lock() + defer t.access.Unlock() + + t.running = false + if t.timer != nil { + t.timer.Stop() + t.timer = nil + } + + return nil +} diff --git a/common/xray/task/task.go b/common/xray/task/task.go new file mode 100644 index 00000000..a1538392 --- /dev/null +++ b/common/xray/task/task.go @@ -0,0 +1,64 @@ +package task + +import ( + "context" + + "github.com/sagernet/sing-box/common/xray/signal/semaphore" +) + +// OnSuccess executes g() after f() returns nil. +func OnSuccess(f func() error, g func() error) func() error { + return func() error { + if err := f(); err != nil { + return err + } + return g() + } +} + +// Run executes a list of tasks in parallel, returns the first error encountered or nil if all tasks pass. 
+func Run(ctx context.Context, tasks ...func() error) error { + n := len(tasks) + s := semaphore.New(n) + done := make(chan error, 1) + + for _, task := range tasks { + <-s.Wait() + go func(f func() error) { + err := f() + if err == nil { + s.Signal() + return + } + + select { + case done <- err: + default: + } + }(task) + } + + /* + if altctx := ctx.Value("altctx"); altctx != nil { + ctx = altctx.(context.Context) + } + */ + + for i := 0; i < n; i++ { + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + case <-s.Wait(): + } + } + + /* + if cancel := ctx.Value("cancel"); cancel != nil { + cancel.(context.CancelFunc)() + } + */ + + return nil +} diff --git a/common/xray/uuid/uuid.go b/common/xray/uuid/uuid.go new file mode 100644 index 00000000..a0219036 --- /dev/null +++ b/common/xray/uuid/uuid.go @@ -0,0 +1,101 @@ +package uuid + +import ( + "bytes" + "crypto/rand" + "crypto/sha1" + "encoding/hex" + + "github.com/sagernet/sing-box/common/xray" + E "github.com/sagernet/sing/common/exceptions" +) + +var byteGroups = []int{8, 4, 4, 4, 12} + +type UUID [16]byte + +// String returns the string representation of this UUID. +func (u *UUID) String() string { + bytes := u.Bytes() + result := hex.EncodeToString(bytes[0 : byteGroups[0]/2]) + start := byteGroups[0] / 2 + for i := 1; i < len(byteGroups); i++ { + nBytes := byteGroups[i] / 2 + result += "-" + result += hex.EncodeToString(bytes[start : start+nBytes]) + start += nBytes + } + return result +} + +// Bytes returns the bytes representation of this UUID. +func (u *UUID) Bytes() []byte { + return u[:] +} + +// Equals returns true if this UUID equals another UUID by value. +func (u *UUID) Equals(another *UUID) bool { + if u == nil && another == nil { + return true + } + if u == nil || another == nil { + return false + } + return bytes.Equal(u.Bytes(), another.Bytes()) +} + +// New creates a UUID with random value. 
+func New() UUID { + var uuid UUID + common.Must2(rand.Read(uuid.Bytes())) + uuid[6] = (uuid[6] & 0x0f) | (4 << 4) + uuid[8] = (uuid[8]&(0xff>>2) | (0x02 << 6)) + return uuid +} + +// ParseBytes converts a UUID in byte form to object. +func ParseBytes(b []byte) (UUID, error) { + var uuid UUID + if len(b) != 16 { + return uuid, E.New("invalid UUID: ", b) + } + copy(uuid[:], b) + return uuid, nil +} + +// ParseString converts a UUID in string form to object. +func ParseString(str string) (UUID, error) { + var uuid UUID + + text := []byte(str) + if l := len(text); l < 32 || l > 36 { + if l == 0 || l > 30 { + return uuid, E.New("invalid UUID: ", str) + } + h := sha1.New() + h.Write(uuid[:]) + h.Write(text) + u := h.Sum(nil)[:16] + u[6] = (u[6] & 0x0f) | (5 << 4) + u[8] = (u[8]&(0xff>>2) | (0x02 << 6)) + copy(uuid[:], u) + return uuid, nil + } + + b := uuid.Bytes() + + for _, byteGroup := range byteGroups { + if text[0] == '-' { + text = text[1:] + } + + if _, err := hex.Decode(b[:byteGroup/2], text[:byteGroup]); err != nil { + return uuid, err + } + + text = text[byteGroup:] + b = b[byteGroup/2:] + } + + return uuid, nil +} diff --git a/constant/v2ray.go b/constant/v2ray.go index c3089a6c..1811df5f 100644 --- a/constant/v2ray.go +++ b/constant/v2ray.go @@ -6,4 +6,5 @@ const ( V2RayTransportTypeQUIC = "quic" V2RayTransportTypeGRPC = "grpc" V2RayTransportTypeHTTPUpgrade = "httpupgrade" + V2RayTransportTypeXHTTP = "xhttp" ) diff --git a/go.mod b/go.mod index 4daa087a..631abc3d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/sagernet/sing-box -go 1.23.6 +go 1.24 toolchain go1.24.3 @@ -19,6 +19,7 @@ require ( github.com/mholt/acmez v1.2.0 github.com/miekg/dns v1.1.63 github.com/oschwald/maxminddb-golang v1.12.0 + github.com/quic-go/quic-go v0.52.0 github.com/sagernet/asc-go v0.0.0-20241217030726-d563060fe4e1 github.com/sagernet/bbolt v0.0.0-20231014093535-ea5cb2fe9f0a github.com/sagernet/cloudflare-tls v0.0.0-20231208171750-a4483c1b7cd1 @@ -59,6 +60,7 @@ 
require ( require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + go.uber.org/mock v0.5.0 // indirect ) //replace github.com/sagernet/sing => ../sing @@ -90,7 +92,7 @@ require ( github.com/onsi/ginkgo/v2 v2.9.7 // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/quic-go/qpack v0.4.0 // indirect + github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/qtls-go1-20 v0.4.1 // indirect github.com/sagernet/netlink v0.0.0-20240612041022-b9a21c07ac6a // indirect github.com/sagernet/nftables v0.3.0-beta.4 // indirect @@ -103,7 +105,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/sync v0.10.0 // indirect golang.org/x/text v0.21.0 // indirect - golang.org/x/time v0.7.0 // indirect + golang.org/x/time v0.7.0 golang.org/x/tools v0.24.0 // indirect golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect diff --git a/go.sum b/go.sum index 09c49c61..4c8a8db0 100644 --- a/go.sum +++ b/go.sum @@ -97,10 +97,12 @@ github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= -github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= +github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= +github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5nfFs= github.com/quic-go/qtls-go1-20 v0.4.1/go.mod 
h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k= +github.com/quic-go/quic-go v0.52.0 h1:/SlHrCRElyaU6MaEPKqKr9z83sBg2v4FLLvWM+Z47pA= +github.com/quic-go/quic-go v0.52.0/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sagernet/asc-go v0.0.0-20241217030726-d563060fe4e1 h1:qi+ijeREa0yfAaO+NOcZ81gv4uzOfALUIdhkiIFvmG4= github.com/sagernet/asc-go v0.0.0-20241217030726-d563060fe4e1/go.mod h1:JULDuzTMn2gyZFcjpTVZP4/UuwAdbHJ0bum2RdjXojU= @@ -184,6 +186,8 @@ github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo= github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= diff --git a/option/v2ray_transport.go b/option/v2ray_transport.go index 68c23858..7b5287b7 100644 --- a/option/v2ray_transport.go +++ b/option/v2ray_transport.go @@ -1,6 +1,11 @@ package option import ( + "net/http" + "net/url" + "strings" + + Xbadoption "github.com/sagernet/sing-box/common/xray/json/badoption" C "github.com/sagernet/sing-box/constant" E "github.com/sagernet/sing/common/exceptions" "github.com/sagernet/sing/common/json" @@ -15,6 +20,7 @@ type _V2RayTransportOptions struct { QUICOptions V2RayQUICOptions `json:"-"` GRPCOptions V2RayGRPCOptions `json:"-"` HTTPUpgradeOptions V2RayHTTPUpgradeOptions `json:"-"` + XHTTPOptions V2RayXHTTPOptions `json:"-"` } type V2RayTransportOptions _V2RayTransportOptions @@ 
-32,6 +38,8 @@ func (o V2RayTransportOptions) MarshalJSON() ([]byte, error) { v = o.GRPCOptions case C.V2RayTransportTypeHTTPUpgrade: v = o.HTTPUpgradeOptions + case C.V2RayTransportTypeXHTTP: + v = o.XHTTPOptions case "": return nil, E.New("missing transport type") default: @@ -57,6 +65,8 @@ func (o *V2RayTransportOptions) UnmarshalJSON(bytes []byte) error { v = &o.GRPCOptions case C.V2RayTransportTypeHTTPUpgrade: v = &o.HTTPUpgradeOptions + case C.V2RayTransportTypeXHTTP: + v = &o.XHTTPOptions default: return E.New("unknown transport type: " + o.Type) } @@ -98,3 +108,145 @@ type V2RayHTTPUpgradeOptions struct { Path string `json:"path,omitempty"` Headers badoption.HTTPHeader `json:"headers,omitempty"` } + +type V2RayXHTTPBaseOptions struct { + Mode string `json:"mode"` + Host string `json:"host,omitempty"` + Path string `json:"path,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + DomainStrategy DomainStrategy `json:"domain_strategy,omitempty"` + XPaddingBytes Xbadoption.Range `json:"x_padding_bytes"` + NoGRPCHeader bool `json:"no_grpc_header,omitempty"` + NoSSEHeader bool `json:"no_sse_header,omitempty"` + ScMaxEachPostBytes Xbadoption.Range `json:"sc_max_each_post_bytes"` + ScMinPostsIntervalMs Xbadoption.Range `json:"sc_min_posts_interval_ms"` + ScMaxBufferedPosts int64 `json:"sc_max_buffered_posts,omitempty"` + ScStreamUpServerSecs Xbadoption.Range `json:"sc_stream_up_server_secs"` + Xmux *V2RayXHTTPXmuxOptions `json:"xmux"` +} + +type V2RayXHTTPOptions struct { + V2RayXHTTPBaseOptions + Download *V2RayXHTTPDownloadOptions `json:"download"` +} + +type V2RayXHTTPDownloadOptions struct { + V2RayXHTTPBaseOptions + ServerOptions + OutboundTLSOptionsContainer + Detour string `json:"detour,omitempty"` +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedPath() string { + pathAndQuery := strings.SplitN(c.Path, "?", 2) + path := pathAndQuery[0] + if path == "" || path[0] != '/' { + path = "/" + path + } + if path[len(path)-1] != '/' { + path = path + 
"/" + } + return path +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedQuery() string { + pathAndQuery := strings.SplitN(c.Path, "?", 2) + query := "" + if len(pathAndQuery) > 1 { + query = pathAndQuery[1] + } + return query +} + +func (c *V2RayXHTTPBaseOptions) GetRequestHeader(rawURL string) http.Header { + header := http.Header{} + for k, v := range c.Headers { + header.Add(k, v) + } + u, _ := url.Parse(rawURL) + // https://www.rfc-editor.org/rfc/rfc7541.html#appendix-B + // h2's HPACK Header Compression feature employs a huffman encoding using a static table. + // 'X' is assigned an 8 bit code, so HPACK compression won't change actual padding length on the wire. + // https://www.rfc-editor.org/rfc/rfc9204.html#section-4.1.2-2 + // h3's similar QPACK feature uses the same huffman table. + u.RawQuery = "x_padding=" + strings.Repeat("X", int(c.GetNormalizedXPaddingBytes().Rand())) + header.Set("Referer", u.String()) + return header +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedXPaddingBytes() Xbadoption.Range { + if c.XPaddingBytes.To == 0 { + return Xbadoption.Range{ + From: 100, + To: 1000, + } + } + return c.XPaddingBytes +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedScMaxEachPostBytes() Xbadoption.Range { + if c.ScMaxEachPostBytes.To == 0 { + return Xbadoption.Range{ + From: 1000000, + To: 1000000, + } + } + return c.ScMaxEachPostBytes +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedScMinPostsIntervalMs() Xbadoption.Range { + if c.ScMinPostsIntervalMs.To == 0 { + return Xbadoption.Range{ + From: 30, + To: 30, + } + } + return c.ScMinPostsIntervalMs +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedScMaxBufferedPosts() int { + if c.ScMaxBufferedPosts == 0 { + return 30 + } + + return int(c.ScMaxBufferedPosts) +} + +func (c *V2RayXHTTPBaseOptions) GetNormalizedScStreamUpServerSecs() Xbadoption.Range { + if c.ScStreamUpServerSecs.To == 0 { + return Xbadoption.Range{ + From: 20, + To: 80, + } + } + return c.ScStreamUpServerSecs +} + +type 
V2RayXHTTPXmuxOptions struct { + MaxConcurrency Xbadoption.Range `json:"max_concurrency"` + MaxConnections Xbadoption.Range `json:"max_connections"` + CMaxReuseTimes Xbadoption.Range `json:"c_max_reuse_times"` + HMaxRequestTimes Xbadoption.Range `json:"h_max_request_times"` + HMaxReusableSecs Xbadoption.Range `json:"h_max_reusable_secs"` + HKeepAlivePeriod int64 `json:"h_keep_alive_period"` +} + +func (m *V2RayXHTTPXmuxOptions) GetNormalizedMaxConcurrency() Xbadoption.Range { + return m.MaxConcurrency +} + +func (m *V2RayXHTTPXmuxOptions) GetNormalizedMaxConnections() Xbadoption.Range { + return m.MaxConnections +} + +func (m *V2RayXHTTPXmuxOptions) GetNormalizedCMaxReuseTimes() Xbadoption.Range { + return m.CMaxReuseTimes +} + +func (m *V2RayXHTTPXmuxOptions) GetNormalizedHMaxRequestTimes() Xbadoption.Range { + return m.HMaxRequestTimes +} + +func (m *V2RayXHTTPXmuxOptions) GetNormalizedHMaxReusableSecs() Xbadoption.Range { + return m.HMaxReusableSecs +} diff --git a/route/conn.go b/route/conn.go index 319e463c..ffa5311f 100644 --- a/route/conn.go +++ b/route/conn.go @@ -7,6 +7,7 @@ import ( "net" "net/netip" "os" + "strings" "sync" "sync/atomic" "time" @@ -264,7 +265,7 @@ func (m *ConnectionManager) connectionCopy(ctx context.Context, source net.Conn, if !direction { if err == nil { m.logger.DebugContext(ctx, "connection upload finished") - } else if !E.IsClosedOrCanceled(err) { + } else if !E.IsClosedOrCanceled(err) && !strings.Contains(err.Error(), "NO_ERROR") { m.logger.ErrorContext(ctx, "connection upload closed: ", err) } else { m.logger.TraceContext(ctx, "connection upload closed") @@ -272,7 +273,7 @@ func (m *ConnectionManager) connectionCopy(ctx context.Context, source net.Conn, } else { if err == nil { m.logger.DebugContext(ctx, "connection download finished") - } else if !E.IsClosedOrCanceled(err) { + } else if !E.IsClosedOrCanceled(err) && !strings.Contains(err.Error(), "NO_ERROR") { m.logger.ErrorContext(ctx, "connection download closed: ", err) } 
else { m.logger.TraceContext(ctx, "connection download closed") diff --git a/transport/v2ray/transport.go b/transport/v2ray/transport.go index ab52f55e..e739fe3f 100644 --- a/transport/v2ray/transport.go +++ b/transport/v2ray/transport.go @@ -10,6 +10,7 @@ import ( "github.com/sagernet/sing-box/transport/v2rayhttp" "github.com/sagernet/sing-box/transport/v2rayhttpupgrade" "github.com/sagernet/sing-box/transport/v2raywebsocket" + xhttp "github.com/sagernet/sing-box/transport/v2rayxhttp" E "github.com/sagernet/sing/common/exceptions" "github.com/sagernet/sing/common/logger" M "github.com/sagernet/sing/common/metadata" @@ -39,6 +40,8 @@ func NewServerTransport(ctx context.Context, logger logger.ContextLogger, option return NewGRPCServer(ctx, logger, options.GRPCOptions, tlsConfig, handler) case C.V2RayTransportTypeHTTPUpgrade: return v2rayhttpupgrade.NewServer(ctx, logger, options.HTTPUpgradeOptions, tlsConfig, handler) + case C.V2RayTransportTypeXHTTP: + return xhttp.NewServer(ctx, logger, options.XHTTPOptions, tlsConfig, handler) default: return nil, E.New("unknown transport type: " + options.Type) } @@ -62,6 +65,8 @@ func NewClientTransport(ctx context.Context, dialer N.Dialer, serverAddr M.Socks return NewQUICClient(ctx, dialer, serverAddr, options.QUICOptions, tlsConfig) case C.V2RayTransportTypeHTTPUpgrade: return v2rayhttpupgrade.NewClient(ctx, dialer, serverAddr, options.HTTPUpgradeOptions, tlsConfig) + case C.V2RayTransportTypeXHTTP: + return xhttp.NewClient(ctx, dialer, serverAddr, options.XHTTPOptions, tlsConfig) default: return nil, E.New("unknown transport type: " + options.Type) } diff --git a/transport/v2rayxhttp/client.go b/transport/v2rayxhttp/client.go new file mode 100644 index 00000000..a0582bb5 --- /dev/null +++ b/transport/v2rayxhttp/client.go @@ -0,0 +1,400 @@ +package xhttp + +import ( + "context" + gotls "crypto/tls" + "io" + "net/http" + "net/http/httptrace" + "net/url" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + 
"github.com/quic-go/quic-go" + "github.com/quic-go/quic-go/http3" + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/common/tls" + "github.com/sagernet/sing-box/common/xray/buf" + "github.com/sagernet/sing-box/common/xray/net" + "github.com/sagernet/sing-box/common/xray/pipe" + "github.com/sagernet/sing-box/common/xray/signal/done" + "github.com/sagernet/sing-box/common/xray/uuid" + "github.com/sagernet/sing-box/option" + dns "github.com/sagernet/sing-dns" + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/bufio" + E "github.com/sagernet/sing/common/exceptions" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + sHTTP "github.com/sagernet/sing/protocol/http" + "github.com/sagernet/sing/service" + "golang.org/x/net/http2" +) + +type Client struct { + ctx context.Context + options *option.V2RayXHTTPOptions + getRequestURL func(sessionId string) url.URL + getRequestURL2 func(sessionId string) url.URL + getHTTPClient func() (DialerClient, *XmuxClient) + getHTTPClient2 func() (DialerClient, *XmuxClient) +} + +func NewClient(ctx context.Context, dialer N.Dialer, serverAddr M.Socksaddr, options option.V2RayXHTTPOptions, tlsConfig tls.Config) (adapter.V2RayClientTransport, error) { + if options.Mode == "" { + return nil, E.New("mode is not set") + } + router := service.FromContext[adapter.Router](ctx) + dest := serverAddr + var gotlsConfig *gotls.Config + if tlsConfig != nil { + var err error + gotlsConfig, err = tlsConfig.Config() + if err != nil { + return nil, err + } + } + baseRequestURL, err := getBaseRequestURL( + &options.V2RayXHTTPBaseOptions, dest, tlsConfig, + ) + if err != nil { + return nil, err + } + getRequestURL := func(sessionId string) url.URL { + requestURL := baseRequestURL + requestURL.Path += sessionId + return requestURL + } + if dest.IsFqdn() { + addresses, err := router.Lookup(ctx, dest.AddrString(), dns.DomainStrategy(options.DomainStrategy)) + if err != nil { + 
return nil, err + } + dest.Addr = addresses[0] + } + var xmuxOptions option.V2RayXHTTPXmuxOptions + if options.Xmux != nil { + xmuxOptions = *options.Xmux + } + xmuxManager := NewXmuxManager(xmuxOptions, func() XmuxConn { + return createHTTPClient(dest, dialer, &options.V2RayXHTTPBaseOptions, tlsConfig, gotlsConfig) + }) + getHTTPClient := func() (DialerClient, *XmuxClient) { + xmuxClient := xmuxManager.GetXmuxClient(ctx) + return xmuxClient.XmuxConn.(DialerClient), xmuxClient + } + getRequestURL2 := getRequestURL + getHTTPClient2 := func() (DialerClient, *XmuxClient) { + return nil, nil + } + if options.Download != nil { + options2 := options.Download + dialer2 := dialer + if options2.Detour != "" { + var ok bool + dialer2, ok = service.FromContext[adapter.OutboundManager](ctx).Outbound(options2.Detour) + if !ok { + return nil, E.New("outbound detour not found: ", options2.Detour) + } + } + dest2 := options2.ServerOptions.Build() + var tlsConfig2 tls.Config + var gotlsConfig2 *gotls.Config + if options2.TLS != nil { + tlsConfig2, err = tls.NewClient(ctx, options2.Server, common.PtrValueOrDefault(options2.TLS)) + if err != nil { + return nil, err + } + gotlsConfig2, err = tlsConfig2.Config() + if err != nil { + return nil, err + } + } + baseRequestURL2, err := getBaseRequestURL(&options2.V2RayXHTTPBaseOptions, dest2, tlsConfig2) + if err != nil { + return nil, err + } + getRequestURL2 = func(sessionId string) url.URL { + requestURL2 := baseRequestURL2 + requestURL2.Path += sessionId + return requestURL2 + } + if dest2.IsFqdn() { + addresses2, err := router.Lookup(ctx, dest2.AddrString(), dns.DomainStrategy(options2.DomainStrategy)) + if err != nil { + return nil, err + } + dest2.Addr = addresses2[0] + } + var xmuxOptions2 option.V2RayXHTTPXmuxOptions + if options2.Xmux != nil { + xmuxOptions2 = *options2.Xmux + } + xmuxManager2 := NewXmuxManager(xmuxOptions2, func() XmuxConn { + return createHTTPClient(dest2, dialer2, &options2.V2RayXHTTPBaseOptions, tlsConfig2, 
gotlsConfig2) + }) + getHTTPClient2 = func() (DialerClient, *XmuxClient) { + xmuxClient2 := xmuxManager2.GetXmuxClient(ctx) + return xmuxClient2.XmuxConn.(DialerClient), xmuxClient2 + } + } + return &Client{ + ctx: ctx, + options: &options, + getHTTPClient: getHTTPClient, + getHTTPClient2: getHTTPClient2, + getRequestURL: getRequestURL, + getRequestURL2: getRequestURL2, + }, nil +} + +func (c *Client) DialContext(ctx context.Context) (net.Conn, error) { + options := c.options + mode := c.options.Mode + sessionIdUuid := uuid.New() + requestURL := c.getRequestURL(sessionIdUuid.String()) + requestURL2 := c.getRequestURL(sessionIdUuid.String()) + httpClient, xmuxClient := c.getHTTPClient() + httpClient2, xmuxClient2 := c.getHTTPClient2() + if xmuxClient != nil { + xmuxClient.OpenUsage.Add(1) + } + if xmuxClient2 != nil && xmuxClient2 != xmuxClient { + xmuxClient2.OpenUsage.Add(1) + } + var closed atomic.Int32 + reader, writer := io.Pipe() + conn := splitConn{ + writer: writer, + onClose: func() { + if closed.Add(1) > 1 { + return + } + if xmuxClient != nil { + xmuxClient.OpenUsage.Add(-1) + } + if xmuxClient2 != nil && xmuxClient2 != xmuxClient { + xmuxClient2.OpenUsage.Add(-1) + } + }, + } + var err error + if mode == "stream-one" { + requestURL.Path = options.GetNormalizedPath() + if xmuxClient != nil { + xmuxClient.LeftRequests.Add(-1) + } + conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient.OpenStream(ctx, requestURL.String(), reader, false) + if err != nil { // browser dialer only + return nil, err + } + return &conn, nil + } else { // stream-down + if xmuxClient2 != nil { + xmuxClient2.LeftRequests.Add(-1) + } + conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(ctx, requestURL2.String(), nil, false) + if err != nil { // browser dialer only + return nil, err + } + } + if mode == "stream-up" { + if xmuxClient != nil { + xmuxClient.LeftRequests.Add(-1) + } + _, _, _, err = httpClient.OpenStream(ctx, requestURL.String(), reader, 
true) + if err != nil { // browser dialer only + return nil, err + } + return &conn, nil + } + scMaxEachPostBytes := options.GetNormalizedScMaxEachPostBytes() + scMinPostsIntervalMs := options.GetNormalizedScMinPostsIntervalMs() + if scMaxEachPostBytes.From <= buf.Size { + panic("`scMaxEachPostBytes` should be bigger than " + strconv.Itoa(buf.Size)) + } + maxUploadSize := scMaxEachPostBytes.Rand() + // WithSizeLimit(0) will still allow single bytes to pass, and a lot of + // code relies on this behavior. Subtract 1 so that together with + // uploadWriter wrapper, exact size limits can be enforced + // uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1)) + uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - buf.Size)) + conn.writer = uploadWriter{ + uploadPipeWriter, + maxUploadSize, + } + go func() { + var seq int64 + var lastWrite time.Time + for { + wroteRequest := done.New() + ctx := httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{ + WroteRequest: func(httptrace.WroteRequestInfo) { + wroteRequest.Close() + }, + }) + // this intentionally makes a shallow-copy of the struct so we + // can reassign Path (potentially concurrently) + url := requestURL + url.Path += "/" + strconv.FormatInt(seq, 10) + seq += 1 + if scMinPostsIntervalMs.From > 0 { + time.Sleep(time.Duration(scMinPostsIntervalMs.Rand())*time.Millisecond - time.Since(lastWrite)) + } + // by offloading the uploads into a buffered pipe, multiple conn.Write + // calls get automatically batched together into larger POST requests. + // without batching, bandwidth is extremely limited. 
+ chunk, err := uploadPipeReader.ReadMultiBuffer() + if err != nil { + break + } + lastWrite = time.Now() + if xmuxClient != nil && (xmuxClient.LeftRequests.Add(-1) <= 0 || + (xmuxClient.UnreusableAt != time.Time{} && lastWrite.After(xmuxClient.UnreusableAt))) { + httpClient, xmuxClient = c.getHTTPClient() + } + go func() { + err := httpClient.PostPacket( + ctx, + url.String(), + &buf.MultiBufferContainer{MultiBuffer: chunk}, + int64(chunk.Len()), + ) + wroteRequest.Close() + if err != nil { + uploadPipeReader.Interrupt() + } + }() + if _, ok := httpClient.(*DefaultDialerClient); ok { + <-wroteRequest.Wait() + } + } + }() + return &conn, nil +} + +func (c *Client) Close() error { + return nil +} + +func decideHTTPVersion(gotlsConfig *gotls.Config) string { + if gotlsConfig == nil || len(gotlsConfig.NextProtos) == 0 || gotlsConfig.NextProtos[0] == "http/1.1" { + return "1.1" + } + if gotlsConfig.NextProtos[0] == "h3" { + return "3" + } + return "2" +} + +func getBaseRequestURL(options *option.V2RayXHTTPBaseOptions, dest M.Socksaddr, tlsConfig tls.Config) (url.URL, error) { + var requestURL url.URL + if tlsConfig == nil { + requestURL.Scheme = "http" + } else { + requestURL.Scheme = "https" + } + requestURL.Host = options.Host + if requestURL.Host == "" && tlsConfig != nil { + requestURL.Host = tlsConfig.ServerName() + } + if requestURL.Host == "" { + requestURL.Host = dest.AddrString() + } + requestURL.Path = options.Path + if err := sHTTP.URLSetPath(&requestURL, options.Path); err != nil { + return requestURL, E.New(err, "parse path") + } + if !strings.HasPrefix(requestURL.Path, "/") { + requestURL.Path = "/" + requestURL.Path + } + requestURL.Path = options.GetNormalizedPath() + requestURL.RawQuery = options.GetNormalizedQuery() + return requestURL, nil +} + +func createHTTPClient(dest M.Socksaddr, dialer N.Dialer, options *option.V2RayXHTTPBaseOptions, tlsConfig tls.Config, gotlsConfig *gotls.Config) DialerClient { + httpVersion := decideHTTPVersion(gotlsConfig) 
+ dialContext := func(ctxInner context.Context) (net.Conn, error) { + conn, err := dialer.DialContext(ctxInner, "tcp", dest) + if err != nil { + return nil, err + } + if httpVersion == "2" { + return tls.ClientHandshake(ctxInner, conn, tlsConfig) + } + return conn, nil + } + var keepAlivePeriod time.Duration + if options.Xmux != nil { + keepAlivePeriod = time.Duration(options.Xmux.HKeepAlivePeriod) * time.Second + } + var transport http.RoundTripper + switch httpVersion { + case "3": + if keepAlivePeriod == 0 { + keepAlivePeriod = net.QuicgoH3KeepAlivePeriod + } + if keepAlivePeriod < 0 { + keepAlivePeriod = 0 + } + quicConfig := &quic.Config{ + MaxIdleTimeout: net.ConnIdleTimeout, + // these two are defaults of quic-go/http3. the default of quic-go (no + // http3) is different, so it is hardcoded here for clarity. + // https://github.com/quic-go/quic-go/blob/b8ea5c798155950fb5bbfdd06cad1939c9355878/http3/client.go#L36-L39 + MaxIncomingStreams: -1, + KeepAlivePeriod: keepAlivePeriod, + } + transport = &http3.Transport{ + QUICConfig: quicConfig, + TLSClientConfig: gotlsConfig.Clone(), + Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (quic.EarlyConnection, error) { + udpConn, dErr := dialer.DialContext(ctx, N.NetworkUDP, dest) + if dErr != nil { + return nil, dErr + } + return quic.DialEarly(ctx, bufio.NewUnbindPacketConn(udpConn), udpConn.RemoteAddr(), tlsCfg, cfg) + }, + } + case "2": + if keepAlivePeriod == 0 { + keepAlivePeriod = net.ChromeH2KeepAlivePeriod + } + if keepAlivePeriod < 0 { + keepAlivePeriod = 0 + } + transport = &http2.Transport{ + DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) { + return dialContext(ctxInner) + }, + IdleConnTimeout: net.ConnIdleTimeout, + ReadIdleTimeout: keepAlivePeriod, + } + default: + httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) { + return dialContext(ctxInner) + } + 
transport = &http.Transport{ + DialTLSContext: httpDialContext, + DialContext: httpDialContext, + IdleConnTimeout: net.ConnIdleTimeout, + // chunked transfer download with KeepAlives is buggy with + // http.Client and our custom dial context. + DisableKeepAlives: true, + } + } + client := &DefaultDialerClient{ + options: options, + client: &http.Client{ + Transport: transport, + }, + httpVersion: httpVersion, + uploadRawPool: &sync.Pool{}, + dialUploadConn: dialContext, + } + return client +} diff --git a/transport/v2rayxhttp/conn.go b/transport/v2rayxhttp/conn.go new file mode 100644 index 00000000..7da8e916 --- /dev/null +++ b/transport/v2rayxhttp/conn.go @@ -0,0 +1,108 @@ +package xhttp + +import ( + "bufio" + "io" + "net" + "net/http" + "sync" + "time" + + "github.com/sagernet/sing-box/common/xray/signal/done" +) + +type splitConn struct { + writer io.WriteCloser + reader io.ReadCloser + remoteAddr net.Addr + localAddr net.Addr + onClose func() +} + +func (c *splitConn) Write(b []byte) (int, error) { + return c.writer.Write(b) +} + +func (c *splitConn) Read(b []byte) (int, error) { + return c.reader.Read(b) +} + +func (c *splitConn) Close() error { + if c.onClose != nil { + c.onClose() + } + + err := c.writer.Close() + err2 := c.reader.Close() + if err != nil { + return err + } + + if err2 != nil { + return err2 + } + + return nil +} + +func (c *splitConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *splitConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +func (c *splitConn) SetDeadline(t time.Time) error { + // TODO cannot do anything useful + return nil +} + +func (c *splitConn) SetReadDeadline(t time.Time) error { + // TODO cannot do anything useful + return nil +} + +func (c *splitConn) SetWriteDeadline(t time.Time) error { + // TODO cannot do anything useful + return nil +} + +type H1Conn struct { + UnreadedResponsesCount int + RespBufReader *bufio.Reader + net.Conn +} + +func NewH1Conn(conn net.Conn) *H1Conn { + return &H1Conn{ + 
RespBufReader: bufio.NewReader(conn), + Conn: conn, + } +} + +type httpServerConn struct { + sync.Mutex + *done.Instance + io.Reader // no need to Close request.Body + http.ResponseWriter +} + +func (c *httpServerConn) Write(b []byte) (int, error) { + c.Lock() + defer c.Unlock() + if c.Done() { + return 0, io.ErrClosedPipe + } + n, err := c.ResponseWriter.Write(b) + if err == nil { + c.ResponseWriter.(http.Flusher).Flush() + } + return n, err +} + +func (c *httpServerConn) Close() error { + c.Lock() + defer c.Unlock() + return c.Instance.Close() +} diff --git a/transport/v2rayxhttp/dialer.go b/transport/v2rayxhttp/dialer.go new file mode 100644 index 00000000..13dcdd9a --- /dev/null +++ b/transport/v2rayxhttp/dialer.go @@ -0,0 +1,192 @@ +package xhttp + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/http" + "net/http/httptrace" + "sync" + + "github.com/sagernet/sing-box/common/xray" + "github.com/sagernet/sing-box/common/xray/signal/done" + "github.com/sagernet/sing-box/option" +) + +// interface to abstract between use of browser dialer, vs net/http +type DialerClient interface { + IsClosed() bool + + // ctx, url, body, uploadOnly + OpenStream(context.Context, string, io.Reader, bool) (io.ReadCloser, net.Addr, net.Addr, error) + + // ctx, url, body, contentLength + PostPacket(context.Context, string, io.Reader, int64) error +} + +// implements xhttp.DialerClient in terms of direct network connections +type DefaultDialerClient struct { + options *option.V2RayXHTTPBaseOptions + client *http.Client + closed bool + httpVersion string + // pool of net.Conn, created using dialUploadConn + uploadRawPool *sync.Pool + dialUploadConn func(ctxInner context.Context) (net.Conn, error) +} + +func (c *DefaultDialerClient) IsClosed() bool { + return c.closed +} + +func (c *DefaultDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (wrc io.ReadCloser, remoteAddr, localAddr net.Addr, err error) { + // this is done when the 
TCP/UDP connection to the server was established, + // and we can unblock the Dial function and print correct net addresses in + // logs + gotConn := done.New() + ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + remoteAddr = connInfo.Conn.RemoteAddr() + localAddr = connInfo.Conn.LocalAddr() + gotConn.Close() + }, + }) + method := "GET" // stream-down + if body != nil { + method = "POST" // stream-up/one + } + req, _ := http.NewRequestWithContext(context.WithoutCancel(ctx), method, url, body) + req.Header = c.options.GetRequestHeader(url) + if method == "POST" && !c.options.NoGRPCHeader { + req.Header.Set("Content-Type", "application/grpc") + } + wrc = &WaitReadCloser{Wait: make(chan struct{})} + go func() { + resp, err := c.client.Do(req) + if err != nil { + if !uploadOnly { // stream-down is enough + c.closed = true + } + gotConn.Close() + wrc.Close() + return + } + if resp.StatusCode != 200 || uploadOnly { // stream-up + io.Copy(io.Discard, resp.Body) + resp.Body.Close() // if it is called immediately, the upload will be interrupted also + wrc.Close() + return + } + wrc.(*WaitReadCloser).Set(resp.Body) + }() + <-gotConn.Wait() + return +} + +func (c *DefaultDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error { + req, err := http.NewRequestWithContext(context.WithoutCancel(ctx), "POST", url, body) + if err != nil { + return err + } + req.ContentLength = contentLength + req.Header = c.options.GetRequestHeader(url) + if c.httpVersion != "1.1" { + resp, err := c.client.Do(req) + if err != nil { + c.closed = true + return err + } + io.Copy(io.Discard, resp.Body) + defer resp.Body.Close() + } else { + // stringify the entire HTTP/1.1 request so it can be + // safely retried. 
if instead req.Write is called multiple + // times, the body is already drained after the first + // request + requestBuff := new(bytes.Buffer) + common.Must(req.Write(requestBuff)) + var uploadConn any + var h1UploadConn *H1Conn + for { + uploadConn = c.uploadRawPool.Get() + newConnection := uploadConn == nil + if newConnection { + newConn, err := c.dialUploadConn(context.WithoutCancel(ctx)) + if err != nil { + return err + } + h1UploadConn = NewH1Conn(newConn) + uploadConn = h1UploadConn + } else { + h1UploadConn = uploadConn.(*H1Conn) + + // TODO: Replace 0 here with a config value later + // Or add some other condition for optimization purposes + if h1UploadConn.UnreadedResponsesCount > 0 { + resp, err := http.ReadResponse(h1UploadConn.RespBufReader, req) + if err != nil { + c.closed = true + return fmt.Errorf("error while reading response: %s", err.Error()) + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("got non-200 error response code: %d", resp.StatusCode) + } + } + } + _, err := h1UploadConn.Write(requestBuff.Bytes()) + // if the write failed, we try another connection from + // the pool, until the write on a new connection fails. + // failed writes to a pooled connection are normal when + // the connection has been closed in the meantime. 
+ if err == nil { + break + } else if newConnection { + return err + } + } + c.uploadRawPool.Put(uploadConn) + } + + return nil +} + +type WaitReadCloser struct { + Wait chan struct{} + io.ReadCloser +} + +func (w *WaitReadCloser) Set(rc io.ReadCloser) { + w.ReadCloser = rc + defer func() { + if recover() != nil { + rc.Close() + } + }() + close(w.Wait) +} + +func (w *WaitReadCloser) Read(b []byte) (int, error) { + if w.ReadCloser == nil { + if <-w.Wait; w.ReadCloser == nil { + return 0, io.ErrClosedPipe + } + } + return w.ReadCloser.Read(b) +} + +func (w *WaitReadCloser) Close() error { + if w.ReadCloser != nil { + return w.ReadCloser.Close() + } + defer func() { + if recover() != nil && w.ReadCloser != nil { + w.ReadCloser.Close() + } + }() + close(w.Wait) + return nil +} diff --git a/transport/v2rayxhttp/http.go b/transport/v2rayxhttp/http.go new file mode 100644 index 00000000..03a47da7 --- /dev/null +++ b/transport/v2rayxhttp/http.go @@ -0,0 +1,41 @@ +package xhttp + +import ( + "net/http" + "strings" + + "github.com/sagernet/sing-box/common/xray/net" + "github.com/sagernet/sing-box/common/xray/signal/done" +) + +type httpSession struct { + uploadQueue *uploadQueue + // for as long as the GET request is not opened by the client, this will be + // open ("undone"), and the session may be expired within a certain TTL. + // after the client connects, this becomes "done" and the session lives as + // long as the GET request. 
+ isFullyConnected *done.Instance +} + +func parseXForwardedFor(header http.Header) []net.Address { + xff := header.Get("X-Forwarded-For") + if xff == "" { + return nil + } + list := strings.Split(xff, ",") + addrs := make([]net.Address, 0, len(list)) + for _, proxy := range list { + addrs = append(addrs, net.ParseAddress(proxy)) + } + return addrs +} + +func isValidHTTPHost(request string, config string) bool { + r := strings.ToLower(request) + c := strings.ToLower(config) + if strings.Contains(r, ":") { + h, _, _ := net.SplitHostPort(r) + return h == c + } + return r == c +} diff --git a/transport/v2rayxhttp/mux.go b/transport/v2rayxhttp/mux.go new file mode 100644 index 00000000..f753aed5 --- /dev/null +++ b/transport/v2rayxhttp/mux.go @@ -0,0 +1,104 @@ +package xhttp + +import ( + "context" + "crypto/rand" + "math" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/sagernet/sing-box/option" +) + +type XmuxConn interface { + IsClosed() bool +} + +type XmuxClient struct { + XmuxConn XmuxConn + OpenUsage atomic.Int32 + leftUsage int32 + LeftRequests atomic.Int32 + UnreusableAt time.Time +} + +type XmuxManager struct { + options option.V2RayXHTTPXmuxOptions + concurrency int32 + connections int32 + newConnFunc func() XmuxConn + xmuxClients []*XmuxClient + mtx sync.Mutex +} + +func NewXmuxManager(options option.V2RayXHTTPXmuxOptions, newConnFunc func() XmuxConn) *XmuxManager { + return &XmuxManager{ + options: options, + concurrency: options.GetNormalizedMaxConcurrency().Rand(), + connections: options.GetNormalizedMaxConnections().Rand(), + newConnFunc: newConnFunc, + xmuxClients: make([]*XmuxClient, 0), + } +} + +func (m *XmuxManager) newXmuxClient() *XmuxClient { + xmuxClient := &XmuxClient{ + XmuxConn: m.newConnFunc(), + leftUsage: -1, + } + if x := m.options.GetNormalizedCMaxReuseTimes().Rand(); x > 0 { + xmuxClient.leftUsage = x - 1 + } + xmuxClient.LeftRequests.Store(math.MaxInt32) + if x := m.options.GetNormalizedHMaxRequestTimes().Rand(); x > 0 { 
+ xmuxClient.LeftRequests.Store(x) + } + if x := m.options.GetNormalizedHMaxReusableSecs().Rand(); x > 0 { + xmuxClient.UnreusableAt = time.Now().Add(time.Duration(x) * time.Second) + } + m.xmuxClients = append(m.xmuxClients, xmuxClient) + return xmuxClient +} + +func (m *XmuxManager) GetXmuxClient(ctx context.Context) *XmuxClient { + m.mtx.Lock() + defer m.mtx.Unlock() + for i := 0; i < len(m.xmuxClients); { + xmuxClient := m.xmuxClients[i] + if xmuxClient.XmuxConn.IsClosed() || + xmuxClient.leftUsage == 0 || + xmuxClient.LeftRequests.Load() <= 0 || + (xmuxClient.UnreusableAt != time.Time{} && time.Now().After(xmuxClient.UnreusableAt)) { + m.xmuxClients = append(m.xmuxClients[:i], m.xmuxClients[i+1:]...) + } else { + i++ + } + } + if len(m.xmuxClients) == 0 { + return m.newXmuxClient() + } + if m.connections > 0 && len(m.xmuxClients) < int(m.connections) { + return m.newXmuxClient() + } + xmuxClients := make([]*XmuxClient, 0) + if m.concurrency > 0 { + for _, xmuxClient := range m.xmuxClients { + if xmuxClient.OpenUsage.Load() < m.concurrency { + xmuxClients = append(xmuxClients, xmuxClient) + } + } + } else { + xmuxClients = m.xmuxClients + } + if len(xmuxClients) == 0 { + return m.newXmuxClient() + } + i, _ := rand.Int(rand.Reader, big.NewInt(int64(len(xmuxClients)))) + xmuxClient := xmuxClients[i.Int64()] + if xmuxClient.leftUsage > 0 { + xmuxClient.leftUsage -= 1 + } + return xmuxClient +} diff --git a/transport/v2rayxhttp/server.go b/transport/v2rayxhttp/server.go new file mode 100644 index 00000000..1454539c --- /dev/null +++ b/transport/v2rayxhttp/server.go @@ -0,0 +1,354 @@ +package xhttp + +import ( + "bytes" + "context" + "io" + "net" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/sagernet/quic-go" + "github.com/sagernet/quic-go/http3" + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/common/tls" + C "github.com/sagernet/sing-box/constant" + "github.com/sagernet/sing-box/log" + 
"github.com/sagernet/sing-box/option" + qtls "github.com/sagernet/sing-quic" + + // qtls "github.com/sagernet/sing-quic" + "github.com/sagernet/sing-box/common/xray/signal/done" + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/logger" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + aTLS "github.com/sagernet/sing/common/tls" + sHttp "github.com/sagernet/sing/protocol/http" +) + +var _ adapter.V2RayServerTransport = (*Server)(nil) + +type Server struct { + ctx context.Context + logger logger.ContextLogger + tlsConfig tls.ServerConfig + quicConfig *quic.Config + handler adapter.V2RayServerTransportHandler + httpServer *http.Server + http3Server *http3.Server + localAddr net.Addr + options *option.V2RayXHTTPOptions + host string + path string + sessionMu sync.Mutex + sessions sync.Map +} + +func NewServer(ctx context.Context, logger logger.ContextLogger, options option.V2RayXHTTPOptions, tlsConfig tls.ServerConfig, handler adapter.V2RayServerTransportHandler) (*Server, error) { + server := &Server{ + ctx: ctx, + logger: logger, + tlsConfig: tlsConfig, + handler: handler, + options: &options, + host: options.Host, + path: options.GetNormalizedPath(), + } + if server.network() == N.NetworkTCP { + protocols := new(http.Protocols) + protocols.SetHTTP1(true) + protocols.SetUnencryptedHTTP2(true) + server.httpServer = &http.Server{ + Handler: server, + ReadHeaderTimeout: time.Second * 4, + MaxHeaderBytes: 8192, + Protocols: protocols, + BaseContext: func(net.Listener) context.Context { + return ctx + }, + ConnContext: func(ctx context.Context, c net.Conn) context.Context { + return log.ContextWithNewID(ctx) + }, + } + } else { + server.quicConfig = &quic.Config{ + DisablePathMTUDiscovery: !C.IsLinux && !C.IsWindows, + } + server.http3Server = &http3.Server{ + Handler: server, + } + } + return server, nil +} + +func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + if len(s.host) > 
0 && !isValidHTTPHost(request.Host, s.host) { + s.logger.ErrorContext(request.Context(), "failed to validate host, request:", request.Host, ", config:", s.host) + writer.WriteHeader(http.StatusNotFound) + return + } + if !strings.HasPrefix(request.URL.Path, s.path) { + s.logger.ErrorContext(request.Context(), "failed to validate path, request:", request.URL.Path, ", config:", s.path) + writer.WriteHeader(http.StatusNotFound) + return + } + writer.Header().Set("Access-Control-Allow-Origin", "*") + writer.Header().Set("Access-Control-Allow-Methods", "GET, POST") + writer.Header().Set("X-Padding", strings.Repeat("X", int(s.options.GetNormalizedXPaddingBytes().Rand()))) + validRange := s.options.GetNormalizedXPaddingBytes() + paddingLength := 0 + referrer := request.Header.Get("Referer") + if referrer != "" { + if referrerURL, err := url.Parse(referrer); err == nil { + // Browser dialer cannot control the host part of referrer header, so only check the query + paddingLength = len(referrerURL.Query().Get("x_padding")) + } + } else { + paddingLength = len(request.URL.Query().Get("x_padding")) + } + if int32(paddingLength) < validRange.From || int32(paddingLength) > validRange.To { + s.logger.ErrorContext(request.Context(), "invalid x_padding length:", int32(paddingLength)) + writer.WriteHeader(http.StatusBadRequest) + return + } + sessionId := "" + subpath := strings.Split(request.URL.Path[len(s.path):], "/") + if len(subpath) > 0 { + sessionId = subpath[0] + } + if sessionId == "" && s.options.Mode != "" && s.options.Mode != "auto" && s.options.Mode != "stream-one" && s.options.Mode != "stream-up" { + s.logger.ErrorContext(request.Context(), "stream-one mode is not allowed") + writer.WriteHeader(http.StatusBadRequest) + return + } + forwardedAddrs := parseXForwardedFor(request.Header) + var remoteAddr net.Addr + var err error + remoteAddr, err = net.ResolveTCPAddr("tcp", request.RemoteAddr) + if err != nil { + remoteAddr = &net.TCPAddr{ + IP: []byte{0, 0, 0, 0}, + Port: 
0, + } + } + if request.ProtoMajor == 3 { + remoteAddr = &net.UDPAddr{ + IP: remoteAddr.(*net.TCPAddr).IP, + Port: remoteAddr.(*net.TCPAddr).Port, + } + } + if len(forwardedAddrs) > 0 && forwardedAddrs[0].Family().IsIP() { + remoteAddr = &net.TCPAddr{ + IP: forwardedAddrs[0].IP(), + Port: 0, + } + } + var currentSession *httpSession + if sessionId != "" { + currentSession = s.upsertSession(sessionId) + } + scMaxEachPostBytes := int(s.options.GetNormalizedScMaxEachPostBytes().To) + if request.Method == "POST" && sessionId != "" { // stream-up, packet-up + seq := "" + if len(subpath) > 1 { + seq = subpath[1] + } + if seq == "" { + if s.options.Mode != "" && s.options.Mode != "auto" && s.options.Mode != "stream-up" { + s.logger.ErrorContext(request.Context(), "stream-up mode is not allowed") + writer.WriteHeader(http.StatusBadRequest) + return + } + httpSC := &httpServerConn{ + Instance: done.New(), + Reader: request.Body, + ResponseWriter: writer, + } + err = currentSession.uploadQueue.Push(Packet{ + Reader: httpSC, + }) + if err != nil { + s.logger.InfoContext(request.Context(), err, "failed to upload (PushReader)") + writer.WriteHeader(http.StatusConflict) + } else { + writer.Header().Set("X-Accel-Buffering", "no") + writer.Header().Set("Cache-Control", "no-store") + writer.WriteHeader(http.StatusOK) + scStreamUpServerSecs := s.options.GetNormalizedScStreamUpServerSecs() + if referrer != "" && scStreamUpServerSecs.To > 0 { + go func() { + for { + _, err := httpSC.Write(bytes.Repeat([]byte{'X'}, int(s.options.GetNormalizedXPaddingBytes().Rand()))) + if err != nil { + break + } + time.Sleep(time.Duration(scStreamUpServerSecs.Rand()) * time.Second) + } + }() + } + select { + case <-request.Context().Done(): + case <-httpSC.Wait(): + } + } + httpSC.Close() + return + } + if s.options.Mode != "" && s.options.Mode != "auto" && s.options.Mode != "packet-up" { + s.logger.ErrorContext(request.Context(), "packet-up mode is not allowed") + 
writer.WriteHeader(http.StatusBadRequest) + return + } + payload, err := io.ReadAll(io.LimitReader(request.Body, int64(scMaxEachPostBytes)+1)) + if len(payload) > scMaxEachPostBytes { + s.logger.ErrorContext(request.Context(), "Too large upload. scMaxEachPostBytes is set to ", scMaxEachPostBytes, "but request size exceed it. Adjust scMaxEachPostBytes on the server to be at least as large as client.") + writer.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + if err != nil { + s.logger.InfoContext(request.Context(), err, "failed to upload (ReadAll)") + writer.WriteHeader(http.StatusInternalServerError) + return + } + seqInt, err := strconv.ParseUint(seq, 10, 64) + if err != nil { + s.logger.InfoContext(request.Context(), err, "failed to upload (ParseUint)") + writer.WriteHeader(http.StatusInternalServerError) + return + } + err = currentSession.uploadQueue.Push(Packet{ + Payload: payload, + Seq: seqInt, + }) + if err != nil { + s.logger.InfoContext(request.Context(), err, "failed to upload (PushPayload)") + writer.WriteHeader(http.StatusInternalServerError) + return + } + writer.WriteHeader(http.StatusOK) + } else if request.Method == "GET" || sessionId == "" { // stream-down, stream-one + if sessionId != "" { + // after GET is done, the connection is finished. disable automatic + // session reaping, and handle it in defer + currentSession.isFullyConnected.Close() + defer s.sessions.Delete(sessionId) + } + // magic header instructs nginx + apache to not buffer response body + writer.Header().Set("X-Accel-Buffering", "no") + // A web-compliant header telling all middleboxes to disable caching. + // Should be able to prevent overloading the cache, or stop CDNs from + // teeing the response stream into their cache, causing slowdowns. 
+ writer.Header().Set("Cache-Control", "no-store") + if !s.options.NoSSEHeader { + // magic header to make the HTTP middle box consider this as SSE to disable buffer + writer.Header().Set("Content-Type", "text/event-stream") + } + writer.WriteHeader(http.StatusOK) + writer.(http.Flusher).Flush() + httpSC := &httpServerConn{ + Instance: done.New(), + Reader: request.Body, + ResponseWriter: writer, + } + conn := splitConn{ + writer: httpSC, + reader: httpSC, + remoteAddr: remoteAddr, + localAddr: s.localAddr, + } + if sessionId != "" { // if not stream-one + conn.reader = currentSession.uploadQueue + } + s.handler.NewConnectionEx(request.Context(), &conn, sHttp.SourceAddress(request), M.Socksaddr{}, func(it error) {}) + // "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned." + select { + case <-request.Context().Done(): + case <-httpSC.Wait(): + } + conn.Close() + } else { + s.logger.ErrorContext(request.Context(), "unsupported method: ", request.Method) + writer.WriteHeader(http.StatusMethodNotAllowed) + } +} + +func (s *Server) Network() []string { + return []string{s.network()} +} + +func (s *Server) Serve(listener net.Listener) error { + if s.network() == N.NetworkTCP { + if s.tlsConfig != nil { + listener = aTLS.NewListener(listener, s.tlsConfig) + } + s.localAddr = listener.Addr() + return s.httpServer.Serve(listener) + } + return os.ErrInvalid +} + +func (s *Server) ServePacket(listener net.PacketConn) error { + if s.network() == N.NetworkUDP { + quicListener, err := qtls.ListenEarly(listener, s.tlsConfig, s.quicConfig) + if err != nil { + return err + } + s.localAddr = quicListener.Addr() + return s.http3Server.ServeListener(quicListener) + } + return os.ErrInvalid +} + +func (s *Server) Close() error { + if s.network() == N.NetworkTCP { + return common.Close(s.httpServer) + } + return common.Close(s.http3Server) +} + +func (s *Server) network() string { + if s.tlsConfig != nil && len(s.tlsConfig.NextProtos()) == 1 && 
s.tlsConfig.NextProtos()[0] == "h3" { + return N.NetworkUDP + } + return N.NetworkTCP +} + +func (s *Server) upsertSession(sessionId string) *httpSession { + // fast path + currentSessionAny, ok := s.sessions.Load(sessionId) + if ok { + return currentSessionAny.(*httpSession) + } + // slow path + s.sessionMu.Lock() + defer s.sessionMu.Unlock() + currentSessionAny, ok = s.sessions.Load(sessionId) + if ok { + return currentSessionAny.(*httpSession) + } + session := &httpSession{ + uploadQueue: NewUploadQueue(s.options.GetNormalizedScMaxBufferedPosts()), + isFullyConnected: done.New(), + } + s.sessions.Store(sessionId, session) + shouldReap := done.New() + go func() { + time.Sleep(30 * time.Second) + shouldReap.Close() + }() + go func() { + select { + case <-shouldReap.Wait(): + s.sessions.Delete(sessionId) + session.uploadQueue.Close() + case <-session.isFullyConnected.Wait(): + } + }() + return session +} diff --git a/transport/v2rayxhttp/upload_queue.go b/transport/v2rayxhttp/upload_queue.go new file mode 100644 index 00000000..32259a73 --- /dev/null +++ b/transport/v2rayxhttp/upload_queue.go @@ -0,0 +1,157 @@ +package xhttp + +// upload_queue is a specialized priorityqueue + channel to reorder generic +// packets by a sequence number + +import ( + "container/heap" + "io" + "runtime" + "sync" + + E "github.com/sagernet/sing/common/exceptions" +) + +type Packet struct { + Reader io.ReadCloser + Payload []byte + Seq uint64 +} + +type uploadQueue struct { + reader io.ReadCloser + nomore bool + pushedPackets chan Packet + writeCloseMutex sync.Mutex + heap uploadHeap + nextSeq uint64 + closed bool + maxPackets int +} + +func NewUploadQueue(maxPackets int) *uploadQueue { + return &uploadQueue{ + pushedPackets: make(chan Packet, maxPackets), + heap: uploadHeap{}, + nextSeq: 0, + closed: false, + maxPackets: maxPackets, + } +} + +func (h *uploadQueue) Push(p Packet) error { + h.writeCloseMutex.Lock() + defer h.writeCloseMutex.Unlock() + if h.closed { + return E.New("packet 
queue closed") + } + if h.nomore { + return E.New("h.reader already exists") + } + if p.Reader != nil { + h.nomore = true + } + h.pushedPackets <- p + return nil +} + +func (h *uploadQueue) Close() error { + h.writeCloseMutex.Lock() + defer h.writeCloseMutex.Unlock() + if !h.closed { + h.closed = true + runtime.Gosched() // hope Read() gets the packet + f: + for { + select { + case p := <-h.pushedPackets: + if p.Reader != nil { + h.reader = p.Reader + } + default: + break f + } + } + close(h.pushedPackets) + } + if h.reader != nil { + return h.reader.Close() + } + return nil +} + +func (h *uploadQueue) Read(b []byte) (int, error) { + if h.reader != nil { + return h.reader.Read(b) + } + if h.closed { + return 0, io.EOF + } + if len(h.heap) == 0 { + packet, more := <-h.pushedPackets + if !more { + return 0, io.EOF + } + if packet.Reader != nil { + h.reader = packet.Reader + return h.reader.Read(b) + } + heap.Push(&h.heap, packet) + } + for len(h.heap) > 0 { + packet := heap.Pop(&h.heap).(Packet) + n := 0 + + if packet.Seq == h.nextSeq { + copy(b, packet.Payload) + n = min(len(b), len(packet.Payload)) + + if n < len(packet.Payload) { + // partial read + packet.Payload = packet.Payload[n:] + heap.Push(&h.heap, packet) + } else { + h.nextSeq = packet.Seq + 1 + } + + return n, nil + } + // misordered packet + if packet.Seq > h.nextSeq { + if len(h.heap) > h.maxPackets { + // the "reassembly buffer" is too large, and we want to + // constrain memory usage somehow. let's tear down the + // connection, and hope the application retries. 
+ return 0, E.New("packet queue is too large") + } + heap.Push(&h.heap, packet) + packet2, more := <-h.pushedPackets + if !more { + return 0, io.EOF + } + heap.Push(&h.heap, packet2) + } + } + return 0, nil +} + +// heap code directly taken from https://pkg.go.dev/container/heap +type uploadHeap []Packet + +func (h uploadHeap) Len() int { return len(h) } +func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq } +func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *uploadHeap) Push(x any) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(Packet)) +} + +func (h *uploadHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/transport/v2rayxhttp/writer.go b/transport/v2rayxhttp/writer.go new file mode 100644 index 00000000..e39139e3 --- /dev/null +++ b/transport/v2rayxhttp/writer.go @@ -0,0 +1,38 @@ +package xhttp + +import ( + "github.com/sagernet/sing-box/common/xray/buf" + "github.com/sagernet/sing-box/common/xray/pipe" +) + +// A wrapper around pipe that ensures the size limit is exactly honored. +// +// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that +// single MultiBuffer exceeds the size limit, and then starts blocking on the +// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more +// bytes than the size limit. We work around this by splitting a potentially +// too large write up into multiple. +type uploadWriter struct { + *pipe.Writer + maxLen int32 +} + +func (w uploadWriter) Write(b []byte) (int, error) { + /* + capacity := int(w.maxLen - w.Len()) + if capacity > 0 && capacity < len(b) { + b = b[:capacity] + } + */ + buffer := buf.New() + n, err := buffer.Write(b) + if err != nil { + return 0, err + } + + err = w.WriteMultiBuffer([]*buf.Buffer{buffer}) + if err != nil { + return 0, err + } + return n, nil +}