From 287fe834db61cb63b88280c04be03c966b414251 Mon Sep 17 00:00:00 2001 From: Sergei Maklagin Date: Thu, 11 Dec 2025 02:46:57 +0300 Subject: [PATCH] Update XHTTP --- common/xray/buf/buffer.go | 21 ++++++++++++---- common/xray/buf/io.go | 1 + common/xray/buf/multi_buffer.go | 2 +- common/xray/buf/writer.go | 20 ++++++++++++--- common/xray/common.go | 13 ++++++++++ common/xray/crypto/crypto.go | 3 +++ common/xray/pipe/impl.go | 13 ++++++---- common/xray/signal/timer.go | 43 ++++++++++++++++++--------------- transport/v2rayxhttp/writer.go | 21 +++++++++------- 9 files changed, 94 insertions(+), 43 deletions(-) diff --git a/common/xray/buf/buffer.go b/common/xray/buf/buffer.go index 1981fa5c..38bbc9c4 100644 --- a/common/xray/buf/buffer.go +++ b/common/xray/buf/buffer.go @@ -13,7 +13,7 @@ const ( Size = 8192 ) -var zero = [Size * 10]byte{0} +var ErrBufferFull = E.New("buffer is full") var pool = bytespool.GetPool(Size) @@ -144,7 +144,7 @@ func (b *Buffer) Bytes() []byte { } // Extend increases the buffer size by n bytes, and returns the extended part. -// It panics if result size is larger than buf.Size. +// It panics if result size is larger than size of this buffer. func (b *Buffer) Extend(n int32) []byte { end := b.end + n if end > int32(len(b.v)) { @@ -152,7 +152,7 @@ func (b *Buffer) Extend(n int32) []byte { } ext := b.v[b.end:end] b.end = end - copy(ext, zero[:]) + clear(ext) return ext } @@ -215,7 +215,7 @@ func (b *Buffer) Resize(from, to int32) { b.start += from b.Check() if b.end > oldEnd { - copy(b.v[oldEnd:b.end], zero[:]) + clear(b.v[oldEnd:b.end]) } } @@ -244,6 +244,14 @@ func (b *Buffer) Cap() int32 { return int32(len(b.v)) } +// Available returns the available capacity of the buffer content. +func (b *Buffer) Available() int32 { + if b == nil { + return 0 + } + return int32(len(b.v)) - b.end +} + // IsEmpty returns true if the buffer is empty. func (b *Buffer) IsEmpty() bool { return b.Len() == 0 @@ -258,13 +266,16 @@ func (b *Buffer) IsFull() bool { func (b *Buffer) Write(data []byte) (int, error) { nBytes := copy(b.v[b.end:], data) b.end += int32(nBytes) + if nBytes < len(data) { + return nBytes, ErrBufferFull + } return nBytes, nil } // WriteByte writes a single byte into the buffer. func (b *Buffer) WriteByte(v byte) error { if b.IsFull() { - return E.New("buffer full") + return ErrBufferFull } b.v[b.end] = v b.end++ diff --git a/common/xray/buf/io.go b/common/xray/buf/io.go index a745c1b5..5b7826e0 100644 --- a/common/xray/buf/io.go +++ b/common/xray/buf/io.go @@ -22,6 +22,7 @@ var ErrReadTimeout = E.New("IO timeout") // TimeoutReader is a reader that returns error if Read() operation takes longer than the given timeout. type TimeoutReader interface { + Reader ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error) } diff --git a/common/xray/buf/multi_buffer.go b/common/xray/buf/multi_buffer.go index 4d00843a..44dd499c 100644 --- a/common/xray/buf/multi_buffer.go +++ b/common/xray/buf/multi_buffer.go @@ -144,7 +144,7 @@ func Compact(mb MultiBuffer) MultiBuffer { for i := 1; i < len(mb); i++ { curr := mb[i] - if last.Len()+curr.Len() > Size { + if curr.Len() > last.Available() { mb2 = append(mb2, last) last = curr } else { diff --git a/common/xray/buf/writer.go b/common/xray/buf/writer.go index 96aea75f..c76316c5 100644 --- a/common/xray/buf/writer.go +++ b/common/xray/buf/writer.go @@ -75,9 +75,10 @@ func (w *BufferToBytesWriter) ReadFrom(reader io.Reader) (int64, error) { // BufferedWriter is a Writer with internal buffer. type BufferedWriter struct { sync.Mutex - writer Writer - buffer *Buffer - buffered bool + writer Writer + buffer *Buffer + buffered bool + flushNext bool } // NewBufferedWriter creates a new BufferedWriter. @@ -161,6 +162,12 @@ func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error { } } + if w.flushNext { + w.buffered = false + w.flushNext = false + return w.flushInternal() + } + return nil } @@ -201,6 +208,13 @@ func (w *BufferedWriter) SetBuffered(f bool) error { return nil } +// SetFlushNext will wait the next WriteMultiBuffer to flush and set buffered = false +func (w *BufferedWriter) SetFlushNext() { + w.Lock() + defer w.Unlock() + w.flushNext = true +} + // ReadFrom implements io.ReaderFrom. func (w *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) { if err := w.SetBuffered(false); err != nil { diff --git a/common/xray/common.go b/common/xray/common.go index 8d5c858d..266f57ef 100644 --- a/common/xray/common.go +++ b/common/xray/common.go @@ -1,5 +1,7 @@ package common +import "reflect" + // Must panics if err is not nil. func Must(err error) { if err != nil { @@ -17,3 +19,14 @@ func Must2(v interface{}, err error) interface{} { func Error2(v interface{}, err error) error { return err } + +// CloseIfExists call obj.Close() if obj is not nil. +func CloseIfExists(obj any) error { + if obj != nil { + v := reflect.ValueOf(obj) + if !v.IsNil() { + return Close(obj) + } + } + return nil +} diff --git a/common/xray/crypto/crypto.go b/common/xray/crypto/crypto.go index 24c5adb8..4356d412 100644 --- a/common/xray/crypto/crypto.go +++ b/common/xray/crypto/crypto.go @@ -9,6 +9,9 @@ func RandBetween(from int64, to int64) int64 { if from == to { return from } + if from > to { + from, to = to, from + } bigInt, _ := rand.Int(rand.Reader, big.NewInt(to-from)) return from + bigInt.Int64() } diff --git a/common/xray/pipe/impl.go b/common/xray/pipe/impl.go index f36d4d36..b24305e4 100644 --- a/common/xray/pipe/impl.go +++ b/common/xray/pipe/impl.go @@ -200,16 +200,19 @@ func (p *pipe) Interrupt() { p.Lock() defer p.Unlock() + if !p.data.IsEmpty() { + buf.ReleaseMulti(p.data) + p.data = nil + if p.state == closed { + p.state = errord + } + } + if p.state == closed || p.state == errord { return } p.state = errord - if !p.data.IsEmpty() { - buf.ReleaseMulti(p.data) - p.data = nil - } - common.Must(p.done.Close()) } diff --git a/common/xray/signal/timer.go b/common/xray/signal/timer.go index 822350a1..52961bf5 100644 --- a/common/xray/signal/timer.go +++ b/common/xray/signal/timer.go @@ -3,6 +3,7 @@ package signal import ( "context" "sync" + "sync/atomic" "time" "github.com/sagernet/sing-box/common/xray" @@ -14,10 +15,12 @@ type ActivityUpdater interface { } type ActivityTimer struct { - sync.RWMutex + mu sync.RWMutex updated chan struct{} checkTask *task.Periodic onTimeout func() + consumed atomic.Bool + once sync.Once } func (t *ActivityTimer) Update() { @@ -37,39 +40,39 @@ func (t *ActivityTimer) check() error { } func (t *ActivityTimer) finish() { - t.Lock() - defer t.Unlock() + t.once.Do(func() { + t.consumed.Store(true) + t.mu.Lock() + defer t.mu.Unlock() - if t.onTimeout != nil { + common.CloseIfExists(t.checkTask) t.onTimeout() - t.onTimeout = nil - } - if t.checkTask != nil { - t.checkTask.Close() - t.checkTask = nil - } + }) } func (t *ActivityTimer) SetTimeout(timeout time.Duration) { + if t.consumed.Load() { + return + } if timeout == 0 { t.finish() return } - checkTask := &task.Periodic{ + t.mu.Lock() + defer t.mu.Unlock() + // double check, just in case + if t.consumed.Load() { + return + } + newCheckTask := &task.Periodic{ Interval: timeout, Execute: t.check, } - - t.Lock() - - if t.checkTask != nil { - t.checkTask.Close() - } - t.checkTask = checkTask - t.Unlock() + common.CloseIfExists(t.checkTask) + t.checkTask = newCheckTask t.Update() - common.Must(checkTask.Start()) + common.Must(newCheckTask.Start()) } func CancelAfterInactivity(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) *ActivityTimer { diff --git a/transport/v2rayxhttp/writer.go b/transport/v2rayxhttp/writer.go index e39139e3..3c11e3b3 100644 --- a/transport/v2rayxhttp/writer.go +++ b/transport/v2rayxhttp/writer.go @@ -1,6 +1,7 @@ package xhttp import ( + common "github.com/sagernet/sing-box/common/xray" "github.com/sagernet/sing-box/common/xray/buf" "github.com/sagernet/sing-box/common/xray/pipe" ) @@ -24,15 +25,17 @@ func (w uploadWriter) Write(b []byte) (int, error) { b = b[:capacity] } */ - buffer := buf.New() - n, err := buffer.Write(b) - if err != nil { - return 0, err - } - err = w.WriteMultiBuffer([]*buf.Buffer{buffer}) - if err != nil { - return 0, err + buffer := buf.MultiBufferContainer{} + common.Must2(buffer.Write(b)) + + var writed int + for _, buff := range buffer.MultiBuffer { + err := w.WriteMultiBuffer(buf.MultiBuffer{buff}) + if err != nil { + return writed, err + } + writed += int(buff.Len()) } - return n, nil + return writed, nil }