mirror of
https://github.com/shtorm-7/sing-box-extended.git
synced 2026-05-14 00:51:12 +03:00
Update sing-box core
This commit is contained in:
@@ -283,6 +283,9 @@ func (c *Client) Exchange(ctx context.Context, transport adapter.DNSTransport, m
|
||||
if timeToLive == 0 {
|
||||
for _, recordList := range [][]dns.RR{response.Answer, response.Ns, response.Extra} {
|
||||
for _, record := range recordList {
|
||||
if record.Header().Rrtype == dns.TypeOPT {
|
||||
continue
|
||||
}
|
||||
if timeToLive == 0 || record.Header().Ttl > 0 && record.Header().Ttl < timeToLive {
|
||||
timeToLive = record.Header().Ttl
|
||||
}
|
||||
@@ -294,6 +297,9 @@ func (c *Client) Exchange(ctx context.Context, transport adapter.DNSTransport, m
|
||||
}
|
||||
for _, recordList := range [][]dns.RR{response.Answer, response.Ns, response.Extra} {
|
||||
for _, record := range recordList {
|
||||
if record.Header().Rrtype == dns.TypeOPT {
|
||||
continue
|
||||
}
|
||||
record.Header().Ttl = timeToLive
|
||||
}
|
||||
}
|
||||
@@ -324,16 +330,20 @@ func (c *Client) Lookup(ctx context.Context, transport adapter.DNSTransport, dom
|
||||
} else {
|
||||
strategy = options.Strategy
|
||||
}
|
||||
lookupOptions := options
|
||||
if options.LookupStrategy != C.DomainStrategyAsIS {
|
||||
lookupOptions.Strategy = strategy
|
||||
}
|
||||
if strategy == C.DomainStrategyIPv4Only {
|
||||
return c.lookupToExchange(ctx, transport, dnsName, dns.TypeA, options, responseChecker)
|
||||
return c.lookupToExchange(ctx, transport, dnsName, dns.TypeA, lookupOptions, responseChecker)
|
||||
} else if strategy == C.DomainStrategyIPv6Only {
|
||||
return c.lookupToExchange(ctx, transport, dnsName, dns.TypeAAAA, options, responseChecker)
|
||||
return c.lookupToExchange(ctx, transport, dnsName, dns.TypeAAAA, lookupOptions, responseChecker)
|
||||
}
|
||||
var response4 []netip.Addr
|
||||
var response6 []netip.Addr
|
||||
var group task.Group
|
||||
group.Append("exchange4", func(ctx context.Context) error {
|
||||
response, err := c.lookupToExchange(ctx, transport, dnsName, dns.TypeA, options, responseChecker)
|
||||
response, err := c.lookupToExchange(ctx, transport, dnsName, dns.TypeA, lookupOptions, responseChecker)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -341,7 +351,7 @@ func (c *Client) Lookup(ctx context.Context, transport adapter.DNSTransport, dom
|
||||
return nil
|
||||
})
|
||||
group.Append("exchange6", func(ctx context.Context) error {
|
||||
response, err := c.lookupToExchange(ctx, transport, dnsName, dns.TypeAAAA, options, responseChecker)
|
||||
response, err := c.lookupToExchange(ctx, transport, dnsName, dns.TypeAAAA, lookupOptions, responseChecker)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -377,21 +387,21 @@ func (c *Client) storeCache(transport adapter.DNSTransport, question dns.Questio
|
||||
}
|
||||
if c.disableExpire {
|
||||
if !c.independentCache {
|
||||
c.cache.Add(question, message)
|
||||
c.cache.Add(question, message.Copy())
|
||||
} else {
|
||||
c.transportCache.Add(transportCacheKey{
|
||||
Question: question,
|
||||
transportTag: transport.Tag(),
|
||||
}, message)
|
||||
}, message.Copy())
|
||||
}
|
||||
} else {
|
||||
if !c.independentCache {
|
||||
c.cache.AddWithLifetime(question, message, time.Second*time.Duration(timeToLive))
|
||||
c.cache.AddWithLifetime(question, message.Copy(), time.Second*time.Duration(timeToLive))
|
||||
} else {
|
||||
c.transportCache.AddWithLifetime(transportCacheKey{
|
||||
Question: question,
|
||||
transportTag: transport.Tag(),
|
||||
}, message, time.Second*time.Duration(timeToLive))
|
||||
}, message.Copy(), time.Second*time.Duration(timeToLive))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -482,6 +492,9 @@ func (c *Client) loadResponse(question dns.Question, transport adapter.DNSTransp
|
||||
var originTTL int
|
||||
for _, recordList := range [][]dns.RR{response.Answer, response.Ns, response.Extra} {
|
||||
for _, record := range recordList {
|
||||
if record.Header().Rrtype == dns.TypeOPT {
|
||||
continue
|
||||
}
|
||||
if originTTL == 0 || record.Header().Ttl > 0 && int(record.Header().Ttl) < originTTL {
|
||||
originTTL = int(record.Header().Ttl)
|
||||
}
|
||||
@@ -496,12 +509,18 @@ func (c *Client) loadResponse(question dns.Question, transport adapter.DNSTransp
|
||||
duration := uint32(originTTL - nowTTL)
|
||||
for _, recordList := range [][]dns.RR{response.Answer, response.Ns, response.Extra} {
|
||||
for _, record := range recordList {
|
||||
if record.Header().Rrtype == dns.TypeOPT {
|
||||
continue
|
||||
}
|
||||
record.Header().Ttl = record.Header().Ttl - duration
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for _, recordList := range [][]dns.RR{response.Answer, response.Ns, response.Extra} {
|
||||
for _, record := range recordList {
|
||||
if record.Header().Rrtype == dns.TypeOPT {
|
||||
continue
|
||||
}
|
||||
record.Header().Ttl = uint32(nowTTL)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,7 +195,16 @@ func (r *Router) matchDNS(ctx context.Context, allowFakeIP bool, ruleIndex int,
|
||||
}
|
||||
}
|
||||
}
|
||||
return r.transport.Default(), nil, -1
|
||||
transport := r.transport.Default()
|
||||
if legacyTransport, isLegacy := transport.(adapter.LegacyDNSTransport); isLegacy {
|
||||
if options.Strategy == C.DomainStrategyAsIS {
|
||||
options.Strategy = legacyTransport.LegacyStrategy()
|
||||
}
|
||||
if !options.ClientSubnet.IsValid() {
|
||||
options.ClientSubnet = legacyTransport.LegacyClientSubnet()
|
||||
}
|
||||
}
|
||||
return transport, nil, -1
|
||||
}
|
||||
|
||||
func (r *Router) Exchange(ctx context.Context, message *mDNS.Msg, options adapter.DNSQueryOptions) (*mDNS.Msg, error) {
|
||||
@@ -345,7 +354,7 @@ func (r *Router) Lookup(ctx context.Context, domain string, options adapter.DNSQ
|
||||
transport := options.Transport
|
||||
if legacyTransport, isLegacy := transport.(adapter.LegacyDNSTransport); isLegacy {
|
||||
if options.Strategy == C.DomainStrategyAsIS {
|
||||
options.Strategy = r.defaultDomainStrategy
|
||||
options.Strategy = legacyTransport.LegacyStrategy()
|
||||
}
|
||||
if !options.ClientSubnet.IsValid() {
|
||||
options.ClientSubnet = legacyTransport.LegacyClientSubnet()
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
C "github.com/sagernet/sing-box/constant"
|
||||
"github.com/sagernet/sing-box/dns"
|
||||
E "github.com/sagernet/sing/common/exceptions"
|
||||
"github.com/sagernet/sing/common/logger"
|
||||
)
|
||||
|
||||
type TransportState int
|
||||
|
||||
const (
|
||||
StateNew TransportState = iota
|
||||
StateStarted
|
||||
StateClosing
|
||||
StateClosed
|
||||
)
|
||||
|
||||
var (
|
||||
ErrTransportClosed = os.ErrClosed
|
||||
ErrConnectionReset = E.New("connection reset")
|
||||
)
|
||||
|
||||
type BaseTransport struct {
|
||||
dns.TransportAdapter
|
||||
Logger logger.ContextLogger
|
||||
|
||||
mutex sync.Mutex
|
||||
state TransportState
|
||||
inFlight int32
|
||||
queriesComplete chan struct{}
|
||||
closeCtx context.Context
|
||||
closeCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewBaseTransport(adapter dns.TransportAdapter, logger logger.ContextLogger) *BaseTransport {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &BaseTransport{
|
||||
TransportAdapter: adapter,
|
||||
Logger: logger,
|
||||
state: StateNew,
|
||||
closeCtx: ctx,
|
||||
closeCancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *BaseTransport) State() TransportState {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
return t.state
|
||||
}
|
||||
|
||||
func (t *BaseTransport) SetStarted() error {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
switch t.state {
|
||||
case StateNew:
|
||||
t.state = StateStarted
|
||||
return nil
|
||||
case StateStarted:
|
||||
return nil
|
||||
default:
|
||||
return ErrTransportClosed
|
||||
}
|
||||
}
|
||||
|
||||
func (t *BaseTransport) BeginQuery() bool {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
if t.state != StateStarted {
|
||||
return false
|
||||
}
|
||||
t.inFlight++
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *BaseTransport) EndQuery() {
|
||||
t.mutex.Lock()
|
||||
if t.inFlight > 0 {
|
||||
t.inFlight--
|
||||
}
|
||||
if t.inFlight == 0 && t.queriesComplete != nil {
|
||||
close(t.queriesComplete)
|
||||
t.queriesComplete = nil
|
||||
}
|
||||
t.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (t *BaseTransport) CloseContext() context.Context {
|
||||
return t.closeCtx
|
||||
}
|
||||
|
||||
func (t *BaseTransport) Shutdown(ctx context.Context) error {
|
||||
t.mutex.Lock()
|
||||
|
||||
if t.state >= StateClosing {
|
||||
t.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
if t.state == StateNew {
|
||||
t.state = StateClosed
|
||||
t.mutex.Unlock()
|
||||
t.closeCancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
t.state = StateClosing
|
||||
|
||||
if t.inFlight == 0 {
|
||||
t.state = StateClosed
|
||||
t.mutex.Unlock()
|
||||
t.closeCancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
t.queriesComplete = make(chan struct{})
|
||||
queriesComplete := t.queriesComplete
|
||||
t.mutex.Unlock()
|
||||
|
||||
t.closeCancel()
|
||||
|
||||
select {
|
||||
case <-queriesComplete:
|
||||
t.mutex.Lock()
|
||||
t.state = StateClosed
|
||||
t.mutex.Unlock()
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
t.mutex.Lock()
|
||||
t.state = StateClosed
|
||||
t.mutex.Unlock()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *BaseTransport) Close() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), C.TCPTimeout)
|
||||
defer cancel()
|
||||
return t.Shutdown(ctx)
|
||||
}
|
||||
547
dns/transport/conn_pool.go
Normal file
547
dns/transport/conn_pool.go
Normal file
@@ -0,0 +1,547 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sagernet/sing/common/x/list"
|
||||
)
|
||||
|
||||
type ConnPoolMode int
|
||||
|
||||
const (
|
||||
ConnPoolSingle ConnPoolMode = iota
|
||||
ConnPoolOrdered
|
||||
)
|
||||
|
||||
type ConnPoolOptions[T comparable] struct {
|
||||
Mode ConnPoolMode
|
||||
IsAlive func(T) bool
|
||||
Close func(T, error)
|
||||
}
|
||||
|
||||
type ConnPool[T comparable] struct {
|
||||
options ConnPoolOptions[T]
|
||||
|
||||
access sync.Mutex
|
||||
closed bool
|
||||
state *connPoolState[T]
|
||||
}
|
||||
|
||||
type connPoolState[T comparable] struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelCauseFunc
|
||||
|
||||
all map[T]struct{}
|
||||
|
||||
idle list.List[T]
|
||||
idleElements map[T]*list.Element[T]
|
||||
|
||||
shared T
|
||||
hasShared bool
|
||||
sharedClaimed bool
|
||||
sharedCtx context.Context
|
||||
sharedCancel context.CancelCauseFunc
|
||||
|
||||
connecting *connPoolConnect[T]
|
||||
}
|
||||
|
||||
type connPoolConnect[T comparable] struct {
|
||||
done chan struct{}
|
||||
err error
|
||||
}
|
||||
|
||||
type connPoolDialContext struct {
|
||||
context.Context
|
||||
parent context.Context
|
||||
}
|
||||
|
||||
func (c connPoolDialContext) Deadline() (time.Time, bool) {
|
||||
return c.parent.Deadline()
|
||||
}
|
||||
|
||||
func (c connPoolDialContext) Value(key any) any {
|
||||
return c.parent.Value(key)
|
||||
}
|
||||
|
||||
func NewConnPool[T comparable](options ConnPoolOptions[T]) *ConnPool[T] {
|
||||
return &ConnPool[T]{
|
||||
options: options,
|
||||
state: newConnPoolState[T](options.Mode),
|
||||
}
|
||||
}
|
||||
|
||||
func newConnPoolState[T comparable](mode ConnPoolMode) *connPoolState[T] {
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
state := &connPoolState[T]{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
all: make(map[T]struct{}),
|
||||
}
|
||||
if mode == ConnPoolOrdered {
|
||||
state.idleElements = make(map[T]*list.Element[T])
|
||||
}
|
||||
return state
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) Acquire(ctx context.Context, dial func(context.Context) (T, error)) (T, bool, error) {
|
||||
switch p.options.Mode {
|
||||
case ConnPoolSingle:
|
||||
conn, _, created, err := p.acquireShared(ctx, dial)
|
||||
return conn, created, err
|
||||
case ConnPoolOrdered:
|
||||
return p.acquireOrdered(ctx, dial)
|
||||
default:
|
||||
var zero T
|
||||
return zero, false, net.ErrClosed
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) AcquireShared(ctx context.Context, dial func(context.Context) (T, error)) (T, context.Context, bool, error) {
|
||||
if p.options.Mode != ConnPoolSingle {
|
||||
var zero T
|
||||
return zero, nil, false, net.ErrClosed
|
||||
}
|
||||
return p.acquireShared(ctx, dial)
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) Release(conn T, reuse bool) {
|
||||
var (
|
||||
closeConn bool
|
||||
closeErr error
|
||||
)
|
||||
|
||||
p.access.Lock()
|
||||
if p.closed || p.state == nil {
|
||||
closeConn = true
|
||||
closeErr = net.ErrClosed
|
||||
p.access.Unlock()
|
||||
if closeConn {
|
||||
p.options.Close(conn, closeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
currentState := p.state
|
||||
_, tracked := currentState.all[conn]
|
||||
if !tracked {
|
||||
closeConn = true
|
||||
closeErr = p.closeCause(currentState)
|
||||
p.access.Unlock()
|
||||
if closeConn {
|
||||
p.options.Close(conn, closeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !reuse || !p.options.IsAlive(conn) {
|
||||
delete(currentState.all, conn)
|
||||
switch p.options.Mode {
|
||||
case ConnPoolSingle:
|
||||
if currentState.hasShared && currentState.shared == conn {
|
||||
var zero T
|
||||
currentState.shared = zero
|
||||
currentState.hasShared = false
|
||||
currentState.sharedClaimed = false
|
||||
currentState.sharedCtx = nil
|
||||
if currentState.sharedCancel != nil {
|
||||
currentState.sharedCancel(net.ErrClosed)
|
||||
currentState.sharedCancel = nil
|
||||
}
|
||||
}
|
||||
case ConnPoolOrdered:
|
||||
if element, loaded := currentState.idleElements[conn]; loaded {
|
||||
currentState.idle.Remove(element)
|
||||
delete(currentState.idleElements, conn)
|
||||
}
|
||||
}
|
||||
closeConn = true
|
||||
closeErr = net.ErrClosed
|
||||
p.access.Unlock()
|
||||
if closeConn {
|
||||
p.options.Close(conn, closeErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if p.options.Mode == ConnPoolOrdered {
|
||||
if _, loaded := currentState.idleElements[conn]; !loaded {
|
||||
currentState.idleElements[conn] = currentState.idle.PushBack(conn)
|
||||
}
|
||||
}
|
||||
p.access.Unlock()
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) Invalidate(conn T, cause error) {
|
||||
p.access.Lock()
|
||||
if p.closed || p.state == nil {
|
||||
p.access.Unlock()
|
||||
p.options.Close(conn, cause)
|
||||
return
|
||||
}
|
||||
|
||||
currentState := p.state
|
||||
_, tracked := currentState.all[conn]
|
||||
if !tracked {
|
||||
p.access.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
delete(currentState.all, conn)
|
||||
switch p.options.Mode {
|
||||
case ConnPoolSingle:
|
||||
if currentState.hasShared && currentState.shared == conn {
|
||||
var zero T
|
||||
currentState.shared = zero
|
||||
currentState.hasShared = false
|
||||
currentState.sharedClaimed = false
|
||||
currentState.sharedCtx = nil
|
||||
if currentState.sharedCancel != nil {
|
||||
currentState.sharedCancel(cause)
|
||||
currentState.sharedCancel = nil
|
||||
}
|
||||
}
|
||||
case ConnPoolOrdered:
|
||||
if element, loaded := currentState.idleElements[conn]; loaded {
|
||||
currentState.idle.Remove(element)
|
||||
delete(currentState.idleElements, conn)
|
||||
}
|
||||
}
|
||||
p.access.Unlock()
|
||||
|
||||
p.options.Close(conn, cause)
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) Reset() {
|
||||
p.access.Lock()
|
||||
if p.closed {
|
||||
p.access.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
oldState := p.state
|
||||
p.state = newConnPoolState[T](p.options.Mode)
|
||||
p.access.Unlock()
|
||||
|
||||
p.closeState(oldState, net.ErrClosed)
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) Close() error {
|
||||
p.access.Lock()
|
||||
if p.closed {
|
||||
p.access.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
p.closed = true
|
||||
oldState := p.state
|
||||
p.state = nil
|
||||
p.access.Unlock()
|
||||
|
||||
p.closeState(oldState, net.ErrClosed)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) acquireOrdered(ctx context.Context, dial func(context.Context) (T, error)) (T, bool, error) {
|
||||
var zero T
|
||||
for {
|
||||
var (
|
||||
staleConn T
|
||||
hasStale bool
|
||||
)
|
||||
|
||||
p.access.Lock()
|
||||
if p.closed {
|
||||
p.access.Unlock()
|
||||
return zero, false, net.ErrClosed
|
||||
}
|
||||
|
||||
currentState := p.state
|
||||
if element := currentState.idle.Front(); element != nil {
|
||||
conn := currentState.idle.Remove(element)
|
||||
delete(currentState.idleElements, conn)
|
||||
if p.options.IsAlive(conn) {
|
||||
p.access.Unlock()
|
||||
return conn, false, nil
|
||||
}
|
||||
delete(currentState.all, conn)
|
||||
staleConn = conn
|
||||
hasStale = true
|
||||
}
|
||||
p.access.Unlock()
|
||||
|
||||
if hasStale {
|
||||
p.options.Close(staleConn, net.ErrClosed)
|
||||
continue
|
||||
}
|
||||
|
||||
conn, err := p.dial(ctx, currentState, dial)
|
||||
if err != nil {
|
||||
return zero, false, err
|
||||
}
|
||||
|
||||
p.access.Lock()
|
||||
if p.closed {
|
||||
p.access.Unlock()
|
||||
p.options.Close(conn, net.ErrClosed)
|
||||
return zero, false, net.ErrClosed
|
||||
}
|
||||
if p.state != currentState {
|
||||
cause := p.closeCause(currentState)
|
||||
p.access.Unlock()
|
||||
p.options.Close(conn, cause)
|
||||
return zero, false, cause
|
||||
}
|
||||
currentState.all[conn] = struct{}{}
|
||||
p.access.Unlock()
|
||||
return conn, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) acquireShared(ctx context.Context, dial func(context.Context) (T, error)) (T, context.Context, bool, error) {
|
||||
var zero T
|
||||
for {
|
||||
var (
|
||||
staleConn T
|
||||
hasStale bool
|
||||
state *connPoolConnect[T]
|
||||
current *connPoolState[T]
|
||||
startDial bool
|
||||
)
|
||||
|
||||
p.access.Lock()
|
||||
if p.closed {
|
||||
p.access.Unlock()
|
||||
return zero, nil, false, net.ErrClosed
|
||||
}
|
||||
|
||||
current = p.state
|
||||
if current.hasShared {
|
||||
conn := current.shared
|
||||
if p.options.IsAlive(conn) {
|
||||
created := !current.sharedClaimed
|
||||
current.sharedClaimed = true
|
||||
connCtx := current.sharedCtx
|
||||
p.access.Unlock()
|
||||
return conn, connCtx, created, nil
|
||||
}
|
||||
delete(current.all, conn)
|
||||
var zeroConn T
|
||||
current.shared = zeroConn
|
||||
current.hasShared = false
|
||||
current.sharedClaimed = false
|
||||
current.sharedCtx = nil
|
||||
if current.sharedCancel != nil {
|
||||
current.sharedCancel(net.ErrClosed)
|
||||
current.sharedCancel = nil
|
||||
}
|
||||
staleConn = conn
|
||||
hasStale = true
|
||||
p.access.Unlock()
|
||||
p.options.Close(staleConn, net.ErrClosed)
|
||||
continue
|
||||
}
|
||||
|
||||
if current.connecting == nil {
|
||||
current.connecting = &connPoolConnect[T]{
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
startDial = true
|
||||
}
|
||||
state = current.connecting
|
||||
p.access.Unlock()
|
||||
|
||||
if hasStale {
|
||||
continue
|
||||
}
|
||||
if startDial {
|
||||
go p.connectSingle(current, state, ctx, dial)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-state.done:
|
||||
conn, connCtx, created, retry, err := p.collectShared(current, state, startDial)
|
||||
if retry {
|
||||
continue
|
||||
}
|
||||
return conn, connCtx, created, err
|
||||
case <-ctx.Done():
|
||||
return zero, nil, false, ctx.Err()
|
||||
case <-current.ctx.Done():
|
||||
p.access.Lock()
|
||||
closed := p.closed
|
||||
p.access.Unlock()
|
||||
if closed {
|
||||
return zero, nil, false, net.ErrClosed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) connectSingle(current *connPoolState[T], state *connPoolConnect[T], ctx context.Context, dial func(context.Context) (T, error)) {
|
||||
conn, err := p.dial(ctx, current, dial)
|
||||
if err != nil {
|
||||
p.access.Lock()
|
||||
if current.connecting == state {
|
||||
current.connecting = nil
|
||||
}
|
||||
state.err = err
|
||||
p.access.Unlock()
|
||||
close(state.done)
|
||||
return
|
||||
}
|
||||
|
||||
var closeErr error
|
||||
|
||||
p.access.Lock()
|
||||
if current.connecting == state {
|
||||
current.connecting = nil
|
||||
}
|
||||
if p.closed {
|
||||
closeErr = net.ErrClosed
|
||||
state.err = closeErr
|
||||
} else if p.state != current {
|
||||
closeErr = p.closeCause(current)
|
||||
state.err = closeErr
|
||||
} else {
|
||||
sharedCtx, sharedCancel := context.WithCancelCause(current.ctx)
|
||||
current.shared = conn
|
||||
current.hasShared = true
|
||||
current.sharedClaimed = false
|
||||
current.sharedCtx = sharedCtx
|
||||
current.sharedCancel = sharedCancel
|
||||
current.all[conn] = struct{}{}
|
||||
}
|
||||
p.access.Unlock()
|
||||
|
||||
if closeErr != nil {
|
||||
p.options.Close(conn, closeErr)
|
||||
}
|
||||
close(state.done)
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) collectShared(current *connPoolState[T], state *connPoolConnect[T], startDial bool) (T, context.Context, bool, bool, error) {
|
||||
var zero T
|
||||
|
||||
p.access.Lock()
|
||||
if state.err != nil {
|
||||
err := state.err
|
||||
p.access.Unlock()
|
||||
if startDial {
|
||||
return zero, nil, false, false, err
|
||||
}
|
||||
return zero, nil, false, true, nil
|
||||
}
|
||||
if p.closed {
|
||||
p.access.Unlock()
|
||||
return zero, nil, false, false, net.ErrClosed
|
||||
}
|
||||
if p.state != current {
|
||||
cause := p.closeCause(current)
|
||||
p.access.Unlock()
|
||||
return zero, nil, false, false, cause
|
||||
}
|
||||
if !current.hasShared {
|
||||
p.access.Unlock()
|
||||
return zero, nil, false, true, nil
|
||||
}
|
||||
|
||||
conn := current.shared
|
||||
if !p.options.IsAlive(conn) {
|
||||
delete(current.all, conn)
|
||||
var zeroConn T
|
||||
current.shared = zeroConn
|
||||
current.hasShared = false
|
||||
current.sharedClaimed = false
|
||||
current.sharedCtx = nil
|
||||
if current.sharedCancel != nil {
|
||||
current.sharedCancel(net.ErrClosed)
|
||||
current.sharedCancel = nil
|
||||
}
|
||||
p.access.Unlock()
|
||||
p.options.Close(conn, net.ErrClosed)
|
||||
return zero, nil, false, true, nil
|
||||
}
|
||||
|
||||
created := !current.sharedClaimed
|
||||
current.sharedClaimed = true
|
||||
connCtx := current.sharedCtx
|
||||
p.access.Unlock()
|
||||
return conn, connCtx, created, false, nil
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) dial(ctx context.Context, current *connPoolState[T], dial func(context.Context) (T, error)) (T, error) {
|
||||
var zero T
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return zero, err
|
||||
}
|
||||
if cause := context.Cause(current.ctx); cause != nil {
|
||||
return zero, cause
|
||||
}
|
||||
|
||||
dialCtx, cancel := context.WithCancelCause(current.ctx)
|
||||
var (
|
||||
stateAccess sync.Mutex
|
||||
dialComplete bool
|
||||
)
|
||||
stopCancel := context.AfterFunc(ctx, func() {
|
||||
stateAccess.Lock()
|
||||
if !dialComplete {
|
||||
cancel(context.Cause(ctx))
|
||||
}
|
||||
stateAccess.Unlock()
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
stateAccess.Lock()
|
||||
dialComplete = true
|
||||
stateAccess.Unlock()
|
||||
stopCancel()
|
||||
cancel(context.Cause(ctx))
|
||||
return zero, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
conn, err := dial(connPoolDialContext{
|
||||
Context: dialCtx,
|
||||
parent: ctx,
|
||||
})
|
||||
stateAccess.Lock()
|
||||
dialComplete = true
|
||||
stateAccess.Unlock()
|
||||
stopCancel()
|
||||
if err != nil {
|
||||
if cause := context.Cause(dialCtx); cause != nil {
|
||||
return zero, cause
|
||||
}
|
||||
return zero, err
|
||||
}
|
||||
if cause := context.Cause(dialCtx); cause != nil {
|
||||
p.options.Close(conn, cause)
|
||||
return zero, cause
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) closeState(state *connPoolState[T], cause error) {
|
||||
if state == nil {
|
||||
return
|
||||
}
|
||||
|
||||
state.cancel(cause)
|
||||
if state.sharedCancel != nil {
|
||||
state.sharedCancel(cause)
|
||||
}
|
||||
for conn := range state.all {
|
||||
p.options.Close(conn, cause)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ConnPool[T]) closeCause(state *connPoolState[T]) error {
|
||||
_ = state
|
||||
return net.ErrClosed
|
||||
}
|
||||
@@ -1,287 +0,0 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
E "github.com/sagernet/sing/common/exceptions"
|
||||
)
|
||||
|
||||
type ConnectorCallbacks[T any] struct {
|
||||
IsClosed func(connection T) bool
|
||||
Close func(connection T)
|
||||
Reset func(connection T)
|
||||
}
|
||||
|
||||
type Connector[T any] struct {
|
||||
dial func(ctx context.Context) (T, error)
|
||||
callbacks ConnectorCallbacks[T]
|
||||
|
||||
access sync.Mutex
|
||||
connection T
|
||||
hasConnection bool
|
||||
connectionCancel context.CancelFunc
|
||||
connecting chan struct{}
|
||||
|
||||
closeCtx context.Context
|
||||
closed bool
|
||||
}
|
||||
|
||||
func NewConnector[T any](closeCtx context.Context, dial func(context.Context) (T, error), callbacks ConnectorCallbacks[T]) *Connector[T] {
|
||||
return &Connector[T]{
|
||||
dial: dial,
|
||||
callbacks: callbacks,
|
||||
closeCtx: closeCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func NewSingleflightConnector(closeCtx context.Context, dial func(context.Context) (*Connection, error)) *Connector[*Connection] {
|
||||
return NewConnector(closeCtx, dial, ConnectorCallbacks[*Connection]{
|
||||
IsClosed: func(connection *Connection) bool {
|
||||
return connection.IsClosed()
|
||||
},
|
||||
Close: func(connection *Connection) {
|
||||
connection.CloseWithError(ErrTransportClosed)
|
||||
},
|
||||
Reset: func(connection *Connection) {
|
||||
connection.CloseWithError(ErrConnectionReset)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type contextKeyConnecting struct{}
|
||||
|
||||
var errRecursiveConnectorDial = E.New("recursive connector dial")
|
||||
|
||||
func (c *Connector[T]) Get(ctx context.Context) (T, error) {
|
||||
var zero T
|
||||
for {
|
||||
c.access.Lock()
|
||||
|
||||
if c.closed {
|
||||
c.access.Unlock()
|
||||
return zero, ErrTransportClosed
|
||||
}
|
||||
|
||||
if c.hasConnection && !c.callbacks.IsClosed(c.connection) {
|
||||
connection := c.connection
|
||||
c.access.Unlock()
|
||||
return connection, nil
|
||||
}
|
||||
|
||||
c.hasConnection = false
|
||||
if c.connectionCancel != nil {
|
||||
c.connectionCancel()
|
||||
c.connectionCancel = nil
|
||||
}
|
||||
if isRecursiveConnectorDial(ctx, c) {
|
||||
c.access.Unlock()
|
||||
return zero, errRecursiveConnectorDial
|
||||
}
|
||||
|
||||
if c.connecting != nil {
|
||||
connecting := c.connecting
|
||||
c.access.Unlock()
|
||||
|
||||
select {
|
||||
case <-connecting:
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return zero, ctx.Err()
|
||||
case <-c.closeCtx.Done():
|
||||
return zero, ErrTransportClosed
|
||||
}
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
c.access.Unlock()
|
||||
return zero, err
|
||||
}
|
||||
|
||||
c.connecting = make(chan struct{})
|
||||
c.access.Unlock()
|
||||
|
||||
dialContext := context.WithValue(ctx, contextKeyConnecting{}, c)
|
||||
connection, cancel, err := c.dialWithCancellation(dialContext)
|
||||
|
||||
c.access.Lock()
|
||||
close(c.connecting)
|
||||
c.connecting = nil
|
||||
|
||||
if err != nil {
|
||||
c.access.Unlock()
|
||||
return zero, err
|
||||
}
|
||||
|
||||
if c.closed {
|
||||
cancel()
|
||||
c.callbacks.Close(connection)
|
||||
c.access.Unlock()
|
||||
return zero, ErrTransportClosed
|
||||
}
|
||||
if err = ctx.Err(); err != nil {
|
||||
cancel()
|
||||
c.callbacks.Close(connection)
|
||||
c.access.Unlock()
|
||||
return zero, err
|
||||
}
|
||||
|
||||
c.connection = connection
|
||||
c.hasConnection = true
|
||||
c.connectionCancel = cancel
|
||||
result := c.connection
|
||||
c.access.Unlock()
|
||||
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
|
||||
func isRecursiveConnectorDial[T any](ctx context.Context, connector *Connector[T]) bool {
|
||||
dialConnector, loaded := ctx.Value(contextKeyConnecting{}).(*Connector[T])
|
||||
return loaded && dialConnector == connector
|
||||
}
|
||||
|
||||
func (c *Connector[T]) dialWithCancellation(ctx context.Context) (T, context.CancelFunc, error) {
|
||||
var zero T
|
||||
if err := ctx.Err(); err != nil {
|
||||
return zero, nil, err
|
||||
}
|
||||
connCtx, cancel := context.WithCancel(c.closeCtx)
|
||||
|
||||
var (
|
||||
stateAccess sync.Mutex
|
||||
dialComplete bool
|
||||
)
|
||||
stopCancel := context.AfterFunc(ctx, func() {
|
||||
stateAccess.Lock()
|
||||
if !dialComplete {
|
||||
cancel()
|
||||
}
|
||||
stateAccess.Unlock()
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
stateAccess.Lock()
|
||||
dialComplete = true
|
||||
stateAccess.Unlock()
|
||||
stopCancel()
|
||||
cancel()
|
||||
return zero, nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
connection, err := c.dial(valueContext{connCtx, ctx})
|
||||
stateAccess.Lock()
|
||||
dialComplete = true
|
||||
stateAccess.Unlock()
|
||||
stopCancel()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return zero, nil, err
|
||||
}
|
||||
return connection, cancel, nil
|
||||
}
|
||||
|
||||
type valueContext struct {
|
||||
context.Context
|
||||
parent context.Context
|
||||
}
|
||||
|
||||
func (v valueContext) Value(key any) any {
|
||||
return v.parent.Value(key)
|
||||
}
|
||||
|
||||
func (v valueContext) Deadline() (time.Time, bool) {
|
||||
return v.parent.Deadline()
|
||||
}
|
||||
|
||||
func (c *Connector[T]) Close() error {
|
||||
c.access.Lock()
|
||||
defer c.access.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
c.closed = true
|
||||
|
||||
if c.connectionCancel != nil {
|
||||
c.connectionCancel()
|
||||
c.connectionCancel = nil
|
||||
}
|
||||
if c.hasConnection {
|
||||
c.callbacks.Close(c.connection)
|
||||
c.hasConnection = false
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Connector[T]) Reset() {
|
||||
c.access.Lock()
|
||||
defer c.access.Unlock()
|
||||
|
||||
if c.connectionCancel != nil {
|
||||
c.connectionCancel()
|
||||
c.connectionCancel = nil
|
||||
}
|
||||
if c.hasConnection {
|
||||
c.callbacks.Reset(c.connection)
|
||||
c.hasConnection = false
|
||||
}
|
||||
}
|
||||
|
||||
type Connection struct {
|
||||
net.Conn
|
||||
|
||||
closeOnce sync.Once
|
||||
done chan struct{}
|
||||
closeError error
|
||||
}
|
||||
|
||||
func WrapConnection(conn net.Conn) *Connection {
|
||||
return &Connection{
|
||||
Conn: conn,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) Done() <-chan struct{} {
|
||||
return c.done
|
||||
}
|
||||
|
||||
func (c *Connection) IsClosed() bool {
|
||||
select {
|
||||
case <-c.done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) CloseError() error {
|
||||
select {
|
||||
case <-c.done:
|
||||
if c.closeError != nil {
|
||||
return c.closeError
|
||||
}
|
||||
return ErrTransportClosed
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) Close() error {
|
||||
return c.CloseWithError(ErrTransportClosed)
|
||||
}
|
||||
|
||||
func (c *Connection) CloseWithError(err error) error {
|
||||
var returnError error
|
||||
c.closeOnce.Do(func() {
|
||||
c.closeError = err
|
||||
returnError = c.Conn.Close()
|
||||
close(c.done)
|
||||
})
|
||||
return returnError
|
||||
}
|
||||
@@ -1,263 +0,0 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testConnectorConnection struct{}
|
||||
|
||||
func TestConnectorRecursiveGetFailsFast(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
dialCount atomic.Int32
|
||||
closeCount atomic.Int32
|
||||
connector *Connector[*testConnectorConnection]
|
||||
)
|
||||
|
||||
dial := func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
dialCount.Add(1)
|
||||
_, err := connector.Get(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &testConnectorConnection{}, nil
|
||||
}
|
||||
|
||||
connector = NewConnector(context.Background(), dial, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {
|
||||
closeCount.Add(1)
|
||||
},
|
||||
Reset: func(connection *testConnectorConnection) {
|
||||
closeCount.Add(1)
|
||||
},
|
||||
})
|
||||
|
||||
_, err := connector.Get(context.Background())
|
||||
require.ErrorIs(t, err, errRecursiveConnectorDial)
|
||||
require.EqualValues(t, 1, dialCount.Load())
|
||||
require.EqualValues(t, 0, closeCount.Load())
|
||||
}
|
||||
|
||||
func TestConnectorRecursiveGetAcrossConnectorsAllowed(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
outerDialCount atomic.Int32
|
||||
innerDialCount atomic.Int32
|
||||
outerConnector *Connector[*testConnectorConnection]
|
||||
innerConnector *Connector[*testConnectorConnection]
|
||||
)
|
||||
|
||||
innerConnector = NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
innerDialCount.Add(1)
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
outerConnector = NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
outerDialCount.Add(1)
|
||||
_, err := innerConnector.Get(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
_, err := outerConnector.Get(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, outerDialCount.Load())
|
||||
require.EqualValues(t, 1, innerDialCount.Load())
|
||||
}
|
||||
|
||||
func TestConnectorDialContextPreservesValueAndDeadline(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
type contextKey struct{}
|
||||
|
||||
var (
|
||||
dialValue any
|
||||
dialDeadline time.Time
|
||||
dialHasDeadline bool
|
||||
)
|
||||
|
||||
connector := NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
dialValue = ctx.Value(contextKey{})
|
||||
dialDeadline, dialHasDeadline = ctx.Deadline()
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
deadline := time.Now().Add(time.Minute)
|
||||
requestContext, cancel := context.WithDeadline(context.WithValue(context.Background(), contextKey{}, "test-value"), deadline)
|
||||
defer cancel()
|
||||
|
||||
_, err := connector.Get(requestContext)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "test-value", dialValue)
|
||||
require.True(t, dialHasDeadline)
|
||||
require.WithinDuration(t, deadline, dialDeadline, time.Second)
|
||||
}
|
||||
|
||||
func TestConnectorDialSkipsCanceledRequest(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var dialCount atomic.Int32
|
||||
connector := NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
dialCount.Add(1)
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
requestContext, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
_, err := connector.Get(requestContext)
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
require.EqualValues(t, 0, dialCount.Load())
|
||||
}
|
||||
|
||||
func TestConnectorCanceledRequestDoesNotCacheConnection(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
dialCount atomic.Int32
|
||||
closeCount atomic.Int32
|
||||
)
|
||||
dialStarted := make(chan struct{}, 1)
|
||||
releaseDial := make(chan struct{})
|
||||
|
||||
connector := NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
dialCount.Add(1)
|
||||
select {
|
||||
case dialStarted <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
<-releaseDial
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {
|
||||
closeCount.Add(1)
|
||||
},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
requestContext, cancel := context.WithCancel(context.Background())
|
||||
result := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := connector.Get(requestContext)
|
||||
result <- err
|
||||
}()
|
||||
|
||||
<-dialStarted
|
||||
cancel()
|
||||
close(releaseDial)
|
||||
|
||||
err := <-result
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
require.EqualValues(t, 1, dialCount.Load())
|
||||
require.EqualValues(t, 1, closeCount.Load())
|
||||
|
||||
_, err = connector.Get(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 2, dialCount.Load())
|
||||
}
|
||||
|
||||
func TestConnectorDialContextNotCanceledByRequestContextAfterDial(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var dialContext context.Context
|
||||
connector := NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
dialContext = ctx
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
requestContext, cancel := context.WithCancel(context.Background())
|
||||
_, err := connector.Get(requestContext)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, dialContext)
|
||||
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-dialContext.Done():
|
||||
t.Fatal("dial context canceled by request context after successful dial")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
err = connector.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestConnectorDialContextCanceledOnClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var dialContext context.Context
|
||||
connector := NewConnector(context.Background(), func(ctx context.Context) (*testConnectorConnection, error) {
|
||||
dialContext = ctx
|
||||
return &testConnectorConnection{}, nil
|
||||
}, ConnectorCallbacks[*testConnectorConnection]{
|
||||
IsClosed: func(connection *testConnectorConnection) bool {
|
||||
return false
|
||||
},
|
||||
Close: func(connection *testConnectorConnection) {},
|
||||
Reset: func(connection *testConnectorConnection) {},
|
||||
})
|
||||
|
||||
_, err := connector.Get(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, dialContext)
|
||||
|
||||
select {
|
||||
case <-dialContext.Done():
|
||||
t.Fatal("dial context canceled before connector close")
|
||||
default:
|
||||
}
|
||||
|
||||
err = connector.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case <-dialContext.Done():
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("dial context not canceled after connector close")
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/sagernet/sing-box/dns"
|
||||
"github.com/sagernet/sing-box/dns/transport"
|
||||
"github.com/sagernet/sing/common/buf"
|
||||
E "github.com/sagernet/sing/common/exceptions"
|
||||
@@ -40,13 +39,6 @@ func (t *Transport) exchangeParallel(ctx context.Context, servers []M.Socksaddr,
|
||||
results := make(chan queryResult)
|
||||
startRacer := func(ctx context.Context, fqdn string) {
|
||||
response, err := t.tryOneName(ctx, servers, fqdn, message)
|
||||
if err == nil {
|
||||
if response.Rcode != mDNS.RcodeSuccess {
|
||||
err = dns.RcodeError(response.Rcode)
|
||||
} else if len(dns.MessageToAddresses(response)) == 0 {
|
||||
err = dns.RcodeSuccess
|
||||
}
|
||||
}
|
||||
select {
|
||||
case results <- queryResult{response, err}:
|
||||
case <-returned:
|
||||
|
||||
@@ -23,16 +23,25 @@ var _ adapter.FakeIPTransport = (*Transport)(nil)
|
||||
|
||||
type Transport struct {
|
||||
dns.TransportAdapter
|
||||
logger logger.ContextLogger
|
||||
store adapter.FakeIPStore
|
||||
logger logger.ContextLogger
|
||||
store adapter.FakeIPStore
|
||||
inet4Enabled bool
|
||||
inet6Enabled bool
|
||||
}
|
||||
|
||||
func NewTransport(ctx context.Context, logger log.ContextLogger, tag string, options option.FakeIPDNSServerOptions) (adapter.DNSTransport, error) {
|
||||
store := NewStore(ctx, logger, options.Inet4Range.Build(netip.Prefix{}), options.Inet6Range.Build(netip.Prefix{}))
|
||||
inet4Range := options.Inet4Range.Build(netip.Prefix{})
|
||||
inet6Range := options.Inet6Range.Build(netip.Prefix{})
|
||||
if !inet4Range.IsValid() && !inet6Range.IsValid() {
|
||||
return nil, E.New("at least one of inet4_range or inet6_range must be set")
|
||||
}
|
||||
store := NewStore(ctx, logger, inet4Range, inet6Range)
|
||||
return &Transport{
|
||||
TransportAdapter: dns.NewTransportAdapter(C.DNSTypeFakeIP, tag, nil),
|
||||
logger: logger,
|
||||
store: store,
|
||||
inet4Enabled: inet4Range.IsValid(),
|
||||
inet6Enabled: inet6Range.IsValid(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -55,6 +64,9 @@ func (t *Transport) Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg,
|
||||
if question.Qtype != mDNS.TypeA && question.Qtype != mDNS.TypeAAAA {
|
||||
return nil, E.New("only IP queries are supported by fakeip")
|
||||
}
|
||||
if question.Qtype == mDNS.TypeA && !t.inet4Enabled || question.Qtype == mDNS.TypeAAAA && !t.inet6Enabled {
|
||||
return dns.FixedResponseStatus(message, mDNS.RcodeSuccess), nil
|
||||
}
|
||||
address, err := t.store.Create(dns.FqdnToDomain(question.Name), question.Qtype == mDNS.TypeAAAA)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -81,10 +81,7 @@ func (t *Transport) Reset() {
|
||||
|
||||
func (t *Transport) Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error) {
|
||||
if t.resolved != nil {
|
||||
resolverObject := t.resolved.Object()
|
||||
if resolverObject != nil {
|
||||
return t.resolved.Exchange(resolverObject, ctx, message)
|
||||
}
|
||||
return t.resolved.Exchange(ctx, message)
|
||||
}
|
||||
question := message.Question[0]
|
||||
if question.Qtype == mDNS.TypeA || question.Qtype == mDNS.TypeAAAA {
|
||||
|
||||
@@ -9,6 +9,5 @@ import (
|
||||
type ResolvedResolver interface {
|
||||
Start() error
|
||||
Close() error
|
||||
Object() any
|
||||
Exchange(object any, ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error)
|
||||
Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error)
|
||||
}
|
||||
|
||||
@@ -4,19 +4,26 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/sagernet/sing-box/adapter"
|
||||
"github.com/sagernet/sing-box/common/dialer"
|
||||
"github.com/sagernet/sing-box/common/tls"
|
||||
C "github.com/sagernet/sing-box/constant"
|
||||
"github.com/sagernet/sing-box/dns"
|
||||
dnsTransport "github.com/sagernet/sing-box/dns/transport"
|
||||
"github.com/sagernet/sing-box/option"
|
||||
"github.com/sagernet/sing-box/service/resolved"
|
||||
"github.com/sagernet/sing-tun"
|
||||
"github.com/sagernet/sing/common"
|
||||
"github.com/sagernet/sing/common/control"
|
||||
E "github.com/sagernet/sing/common/exceptions"
|
||||
"github.com/sagernet/sing/common/logger"
|
||||
M "github.com/sagernet/sing/common/metadata"
|
||||
N "github.com/sagernet/sing/common/network"
|
||||
"github.com/sagernet/sing/common/x/list"
|
||||
"github.com/sagernet/sing/service"
|
||||
|
||||
@@ -49,13 +56,23 @@ type DBusResolvedResolver struct {
|
||||
interfaceMonitor tun.DefaultInterfaceMonitor
|
||||
interfaceCallback *list.Element[tun.DefaultInterfaceUpdateCallback]
|
||||
systemBus *dbus.Conn
|
||||
resoledObject atomic.Pointer[ResolvedObject]
|
||||
savedServerSet atomic.Pointer[resolvedServerSet]
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
type ResolvedObject struct {
|
||||
dbus.BusObject
|
||||
InterfaceIndex int32
|
||||
type resolvedServerSet struct {
|
||||
servers []resolvedServer
|
||||
}
|
||||
|
||||
type resolvedServer struct {
|
||||
primaryTransport adapter.DNSTransport
|
||||
fallbackTransport adapter.DNSTransport
|
||||
}
|
||||
|
||||
type resolvedServerSpecification struct {
|
||||
address netip.Addr
|
||||
port uint16
|
||||
serverName string
|
||||
}
|
||||
|
||||
func NewResolvedResolver(ctx context.Context, logger logger.ContextLogger) (ResolvedResolver, error) {
|
||||
@@ -82,17 +99,31 @@ func (t *DBusResolvedResolver) Start() error {
|
||||
"org.freedesktop.DBus",
|
||||
"NameOwnerChanged",
|
||||
dbus.WithMatchSender("org.freedesktop.DBus"),
|
||||
dbus.WithMatchArg(0, "org.freedesktop.resolve1.Manager"),
|
||||
dbus.WithMatchArg(0, "org.freedesktop.resolve1"),
|
||||
).Err
|
||||
if err != nil {
|
||||
return E.Cause(err, "configure resolved restart listener")
|
||||
}
|
||||
err = t.systemBus.BusObject().AddMatchSignal(
|
||||
"org.freedesktop.DBus.Properties",
|
||||
"PropertiesChanged",
|
||||
dbus.WithMatchSender("org.freedesktop.resolve1"),
|
||||
dbus.WithMatchArg(0, "org.freedesktop.resolve1.Manager"),
|
||||
).Err
|
||||
if err != nil {
|
||||
return E.Cause(err, "configure resolved properties listener")
|
||||
}
|
||||
go t.loopUpdateStatus()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) Close() error {
|
||||
var closeErr error
|
||||
t.closeOnce.Do(func() {
|
||||
serverSet := t.savedServerSet.Swap(nil)
|
||||
if serverSet != nil {
|
||||
closeErr = serverSet.Close()
|
||||
}
|
||||
if t.interfaceCallback != nil {
|
||||
t.interfaceMonitor.UnregisterCallback(t.interfaceCallback)
|
||||
}
|
||||
@@ -100,99 +131,97 @@ func (t *DBusResolvedResolver) Close() error {
|
||||
_ = t.systemBus.Close()
|
||||
}
|
||||
})
|
||||
return nil
|
||||
return closeErr
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) Object() any {
|
||||
return common.PtrOrNil(t.resoledObject.Load())
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) Exchange(object any, ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error) {
|
||||
question := message.Question[0]
|
||||
resolvedObject := object.(*ResolvedObject)
|
||||
call := resolvedObject.CallWithContext(
|
||||
ctx,
|
||||
"org.freedesktop.resolve1.Manager.ResolveRecord",
|
||||
0,
|
||||
resolvedObject.InterfaceIndex,
|
||||
question.Name,
|
||||
question.Qclass,
|
||||
question.Qtype,
|
||||
uint64(0),
|
||||
)
|
||||
if call.Err != nil {
|
||||
var dbusError dbus.Error
|
||||
if errors.As(call.Err, &dbusError) && dbusError.Name == "org.freedesktop.resolve1.NoNameServers" {
|
||||
t.updateStatus()
|
||||
func (t *DBusResolvedResolver) Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error) {
|
||||
serverSet := t.savedServerSet.Load()
|
||||
if serverSet == nil {
|
||||
var err error
|
||||
serverSet, err = t.checkResolved(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
previousServerSet := t.savedServerSet.Swap(serverSet)
|
||||
if previousServerSet != nil {
|
||||
_ = previousServerSet.Close()
|
||||
}
|
||||
return nil, E.Cause(call.Err, " resolve record via resolved")
|
||||
}
|
||||
var (
|
||||
records []resolved.ResourceRecord
|
||||
outflags uint64
|
||||
)
|
||||
err := call.Store(&records, &outflags)
|
||||
if err != nil {
|
||||
response, err := t.exchangeServerSet(ctx, message, serverSet)
|
||||
if err == nil {
|
||||
return response, nil
|
||||
}
|
||||
t.updateStatus()
|
||||
refreshedServerSet := t.savedServerSet.Load()
|
||||
if refreshedServerSet == nil || refreshedServerSet == serverSet {
|
||||
return nil, err
|
||||
}
|
||||
response := &mDNS.Msg{
|
||||
MsgHdr: mDNS.MsgHdr{
|
||||
Id: message.Id,
|
||||
Response: true,
|
||||
Authoritative: true,
|
||||
RecursionDesired: true,
|
||||
RecursionAvailable: true,
|
||||
Rcode: mDNS.RcodeSuccess,
|
||||
},
|
||||
Question: []mDNS.Question{question},
|
||||
}
|
||||
for _, record := range records {
|
||||
var rr mDNS.RR
|
||||
rr, _, err = mDNS.UnpackRR(record.Data, 0)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "unpack resource record")
|
||||
}
|
||||
response.Answer = append(response.Answer, rr)
|
||||
}
|
||||
return response, nil
|
||||
return t.exchangeServerSet(ctx, message, refreshedServerSet)
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) loopUpdateStatus() {
|
||||
signalChan := make(chan *dbus.Signal, 1)
|
||||
t.systemBus.Signal(signalChan)
|
||||
for signal := range signalChan {
|
||||
var restarted bool
|
||||
if signal.Name == "org.freedesktop.DBus.NameOwnerChanged" {
|
||||
if len(signal.Body) != 3 || signal.Body[2].(string) == "" {
|
||||
switch signal.Name {
|
||||
case "org.freedesktop.DBus.NameOwnerChanged":
|
||||
if len(signal.Body) != 3 {
|
||||
continue
|
||||
}
|
||||
newOwner, loaded := signal.Body[2].(string)
|
||||
if !loaded || newOwner == "" {
|
||||
continue
|
||||
}
|
||||
t.updateStatus()
|
||||
case "org.freedesktop.DBus.Properties.PropertiesChanged":
|
||||
if !shouldUpdateResolvedServerSet(signal) {
|
||||
continue
|
||||
} else {
|
||||
restarted = true
|
||||
}
|
||||
}
|
||||
if restarted {
|
||||
t.updateStatus()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) updateStatus() {
|
||||
dbusObject, err := t.checkResolved(context.Background())
|
||||
oldValue := t.resoledObject.Swap(dbusObject)
|
||||
serverSet, err := t.checkResolved(context.Background())
|
||||
oldServerSet := t.savedServerSet.Swap(serverSet)
|
||||
if oldServerSet != nil {
|
||||
_ = oldServerSet.Close()
|
||||
}
|
||||
if err != nil {
|
||||
var dbusErr dbus.Error
|
||||
if !errors.As(err, &dbusErr) || dbusErr.Name != "org.freedesktop.DBus.Error.NameHasNoOwnerCould" {
|
||||
if !errors.As(err, &dbusErr) || dbusErr.Name != "org.freedesktop.DBus.Error.NameHasNoOwner" {
|
||||
t.logger.Debug(E.Cause(err, "systemd-resolved service unavailable"))
|
||||
}
|
||||
if oldValue != nil {
|
||||
if oldServerSet != nil {
|
||||
t.logger.Debug("systemd-resolved service is gone")
|
||||
}
|
||||
return
|
||||
} else if oldValue == nil {
|
||||
} else if oldServerSet == nil {
|
||||
t.logger.Debug("using systemd-resolved service as resolver")
|
||||
}
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) checkResolved(ctx context.Context) (*ResolvedObject, error) {
|
||||
func (t *DBusResolvedResolver) exchangeServerSet(ctx context.Context, message *mDNS.Msg, serverSet *resolvedServerSet) (*mDNS.Msg, error) {
|
||||
if serverSet == nil || len(serverSet.servers) == 0 {
|
||||
return nil, E.New("link has no DNS servers configured")
|
||||
}
|
||||
var lastError error
|
||||
for _, server := range serverSet.servers {
|
||||
response, err := server.primaryTransport.Exchange(ctx, message)
|
||||
if err != nil && server.fallbackTransport != nil {
|
||||
response, err = server.fallbackTransport.Exchange(ctx, message)
|
||||
}
|
||||
if err != nil {
|
||||
lastError = err
|
||||
continue
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
return nil, lastError
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) checkResolved(ctx context.Context) (*resolvedServerSet, error) {
|
||||
dbusObject := t.systemBus.Object("org.freedesktop.resolve1", "/org/freedesktop/resolve1")
|
||||
err := dbusObject.Call("org.freedesktop.DBus.Peer.Ping", 0).Err
|
||||
if err != nil {
|
||||
@@ -220,16 +249,19 @@ func (t *DBusResolvedResolver) checkResolved(ctx context.Context) (*ResolvedObje
|
||||
if linkObject == nil {
|
||||
return nil, E.New("missing link object for default interface")
|
||||
}
|
||||
dnsProp, err := linkObject.GetProperty("org.freedesktop.resolve1.Link.DNS")
|
||||
dnsOverTLSMode, err := loadResolvedLinkDNSOverTLS(linkObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var linkDNS []resolved.LinkDNS
|
||||
err = dnsProp.Store(&linkDNS)
|
||||
linkDNSEx, err := loadResolvedLinkDNSEx(linkObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(linkDNS) == 0 {
|
||||
linkDNS, err := loadResolvedLinkDNS(linkObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(linkDNSEx) == 0 && len(linkDNS) == 0 {
|
||||
for _, inbound := range service.FromContext[adapter.InboundManager](t.ctx).Inbounds() {
|
||||
if inbound.Type() == C.TypeTun {
|
||||
return nil, E.New("No appropriate name servers or networks for name found")
|
||||
@@ -237,12 +269,233 @@ func (t *DBusResolvedResolver) checkResolved(ctx context.Context) (*ResolvedObje
|
||||
}
|
||||
return nil, E.New("link has no DNS servers configured")
|
||||
}
|
||||
return &ResolvedObject{
|
||||
BusObject: dbusObject,
|
||||
InterfaceIndex: int32(defaultInterface.Index),
|
||||
serverDialer, err := dialer.NewDefault(t.ctx, option.DialerOptions{
|
||||
BindInterface: defaultInterface.Name,
|
||||
UDPFragmentDefault: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var serverSpecifications []resolvedServerSpecification
|
||||
if len(linkDNSEx) > 0 {
|
||||
for _, entry := range linkDNSEx {
|
||||
serverSpecification, loaded := buildResolvedServerSpecification(defaultInterface.Name, entry.Address, entry.Port, entry.Name)
|
||||
if !loaded {
|
||||
continue
|
||||
}
|
||||
serverSpecifications = append(serverSpecifications, serverSpecification)
|
||||
}
|
||||
} else {
|
||||
for _, entry := range linkDNS {
|
||||
serverSpecification, loaded := buildResolvedServerSpecification(defaultInterface.Name, entry.Address, 0, "")
|
||||
if !loaded {
|
||||
continue
|
||||
}
|
||||
serverSpecifications = append(serverSpecifications, serverSpecification)
|
||||
}
|
||||
}
|
||||
if len(serverSpecifications) == 0 {
|
||||
return nil, E.New("no valid DNS servers on link")
|
||||
}
|
||||
serverSet := &resolvedServerSet{
|
||||
servers: make([]resolvedServer, 0, len(serverSpecifications)),
|
||||
}
|
||||
for _, serverSpecification := range serverSpecifications {
|
||||
server, createErr := t.createResolvedServer(serverDialer, dnsOverTLSMode, serverSpecification)
|
||||
if createErr != nil {
|
||||
_ = serverSet.Close()
|
||||
return nil, createErr
|
||||
}
|
||||
serverSet.servers = append(serverSet.servers, server)
|
||||
}
|
||||
return serverSet, nil
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) createResolvedServer(serverDialer N.Dialer, dnsOverTLSMode string, serverSpecification resolvedServerSpecification) (resolvedServer, error) {
|
||||
if dnsOverTLSMode == "yes" {
|
||||
primaryTransport, err := t.createResolvedTransport(serverDialer, serverSpecification, true)
|
||||
if err != nil {
|
||||
return resolvedServer{}, err
|
||||
}
|
||||
return resolvedServer{
|
||||
primaryTransport: primaryTransport,
|
||||
}, nil
|
||||
}
|
||||
if dnsOverTLSMode == "opportunistic" {
|
||||
primaryTransport, err := t.createResolvedTransport(serverDialer, serverSpecification, true)
|
||||
if err != nil {
|
||||
return resolvedServer{}, err
|
||||
}
|
||||
fallbackTransport, err := t.createResolvedTransport(serverDialer, serverSpecification, false)
|
||||
if err != nil {
|
||||
_ = primaryTransport.Close()
|
||||
return resolvedServer{}, err
|
||||
}
|
||||
return resolvedServer{
|
||||
primaryTransport: primaryTransport,
|
||||
fallbackTransport: fallbackTransport,
|
||||
}, nil
|
||||
}
|
||||
primaryTransport, err := t.createResolvedTransport(serverDialer, serverSpecification, false)
|
||||
if err != nil {
|
||||
return resolvedServer{}, err
|
||||
}
|
||||
return resolvedServer{
|
||||
primaryTransport: primaryTransport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) createResolvedTransport(serverDialer N.Dialer, serverSpecification resolvedServerSpecification, useTLS bool) (adapter.DNSTransport, error) {
|
||||
serverAddress := M.SocksaddrFrom(serverSpecification.address, resolvedServerPort(serverSpecification.port, useTLS))
|
||||
if useTLS {
|
||||
tlsAddress := serverSpecification.address
|
||||
if tlsAddress.Zone() != "" {
|
||||
tlsAddress = tlsAddress.WithZone("")
|
||||
}
|
||||
serverName := serverSpecification.serverName
|
||||
if serverName == "" {
|
||||
serverName = tlsAddress.String()
|
||||
}
|
||||
tlsConfig, err := tls.NewClient(t.ctx, t.logger, tlsAddress.String(), option.OutboundTLSOptions{
|
||||
Enabled: true,
|
||||
ServerName: serverName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serverTransport := dnsTransport.NewTLSRaw(t.logger, dns.NewTransportAdapter(C.DNSTypeTLS, "", nil), serverDialer, serverAddress, tlsConfig)
|
||||
err = serverTransport.Start(adapter.StartStateStart)
|
||||
if err != nil {
|
||||
_ = serverTransport.Close()
|
||||
return nil, err
|
||||
}
|
||||
return serverTransport, nil
|
||||
}
|
||||
serverTransport := dnsTransport.NewUDPRaw(t.logger, dns.NewTransportAdapter(C.DNSTypeUDP, "", nil), serverDialer, serverAddress)
|
||||
err := serverTransport.Start(adapter.StartStateStart)
|
||||
if err != nil {
|
||||
_ = serverTransport.Close()
|
||||
return nil, err
|
||||
}
|
||||
return serverTransport, nil
|
||||
}
|
||||
|
||||
func (s *resolvedServerSet) Close() error {
|
||||
var errors []error
|
||||
for _, server := range s.servers {
|
||||
errors = append(errors, server.primaryTransport.Close())
|
||||
if server.fallbackTransport != nil {
|
||||
errors = append(errors, server.fallbackTransport.Close())
|
||||
}
|
||||
}
|
||||
return E.Errors(errors...)
|
||||
}
|
||||
|
||||
func buildResolvedServerSpecification(interfaceName string, rawAddress []byte, port uint16, serverName string) (resolvedServerSpecification, bool) {
|
||||
address, loaded := netip.AddrFromSlice(rawAddress)
|
||||
if !loaded {
|
||||
return resolvedServerSpecification{}, false
|
||||
}
|
||||
if address.Is6() && address.IsLinkLocalUnicast() && address.Zone() == "" {
|
||||
address = address.WithZone(interfaceName)
|
||||
}
|
||||
return resolvedServerSpecification{
|
||||
address: address,
|
||||
port: port,
|
||||
serverName: serverName,
|
||||
}, true
|
||||
}
|
||||
|
||||
func resolvedServerPort(port uint16, useTLS bool) uint16 {
|
||||
if port > 0 {
|
||||
return port
|
||||
}
|
||||
if useTLS {
|
||||
return 853
|
||||
}
|
||||
return 53
|
||||
}
|
||||
|
||||
func loadResolvedLinkDNS(linkObject dbus.BusObject) ([]resolved.LinkDNS, error) {
|
||||
dnsProperty, err := linkObject.GetProperty("org.freedesktop.resolve1.Link.DNS")
|
||||
if err != nil {
|
||||
if isResolvedUnknownPropertyError(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
var linkDNS []resolved.LinkDNS
|
||||
err = dnsProperty.Store(&linkDNS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return linkDNS, nil
|
||||
}
|
||||
|
||||
func loadResolvedLinkDNSEx(linkObject dbus.BusObject) ([]resolved.LinkDNSEx, error) {
|
||||
dnsProperty, err := linkObject.GetProperty("org.freedesktop.resolve1.Link.DNSEx")
|
||||
if err != nil {
|
||||
if isResolvedUnknownPropertyError(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
var linkDNSEx []resolved.LinkDNSEx
|
||||
err = dnsProperty.Store(&linkDNSEx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return linkDNSEx, nil
|
||||
}
|
||||
|
||||
func loadResolvedLinkDNSOverTLS(linkObject dbus.BusObject) (string, error) {
|
||||
dnsOverTLSProperty, err := linkObject.GetProperty("org.freedesktop.resolve1.Link.DNSOverTLS")
|
||||
if err != nil {
|
||||
if isResolvedUnknownPropertyError(err) {
|
||||
return "", nil
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
var dnsOverTLSMode string
|
||||
err = dnsOverTLSProperty.Store(&dnsOverTLSMode)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return dnsOverTLSMode, nil
|
||||
}
|
||||
|
||||
func isResolvedUnknownPropertyError(err error) bool {
|
||||
var dbusError dbus.Error
|
||||
return errors.As(err, &dbusError) && dbusError.Name == "org.freedesktop.DBus.Error.UnknownProperty"
|
||||
}
|
||||
|
||||
func shouldUpdateResolvedServerSet(signal *dbus.Signal) bool {
|
||||
if len(signal.Body) != 3 {
|
||||
return true
|
||||
}
|
||||
changedProperties, loaded := signal.Body[1].(map[string]dbus.Variant)
|
||||
if !loaded {
|
||||
return true
|
||||
}
|
||||
for propertyName := range changedProperties {
|
||||
switch propertyName {
|
||||
case "DNS", "DNSEx", "DNSOverTLS":
|
||||
return true
|
||||
}
|
||||
}
|
||||
invalidatedProperties, loaded := signal.Body[2].([]string)
|
||||
if !loaded {
|
||||
return true
|
||||
}
|
||||
for _, propertyName := range invalidatedProperties {
|
||||
switch propertyName {
|
||||
case "DNS", "DNSEx", "DNSOverTLS":
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *DBusResolvedResolver) updateDefaultInterface(defaultInterface *control.Interface, flags int) {
|
||||
t.updateStatus()
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/sagernet/sing-box/dns"
|
||||
"github.com/sagernet/sing-box/dns/transport"
|
||||
"github.com/sagernet/sing/common/buf"
|
||||
E "github.com/sagernet/sing/common/exceptions"
|
||||
@@ -49,13 +48,6 @@ func (t *Transport) exchangeParallel(ctx context.Context, systemConfig *dnsConfi
|
||||
results := make(chan queryResult)
|
||||
startRacer := func(ctx context.Context, fqdn string) {
|
||||
response, err := t.tryOneName(ctx, systemConfig, fqdn, message)
|
||||
if err == nil {
|
||||
if response.Rcode != mDNS.RcodeSuccess {
|
||||
err = dns.RcodeError(response.Rcode)
|
||||
} else if len(dns.MessageToAddresses(response)) == 0 {
|
||||
err = E.New(fqdn, ": empty result")
|
||||
}
|
||||
}
|
||||
select {
|
||||
case results <- queryResult{response, err}:
|
||||
case <-returned:
|
||||
|
||||
@@ -31,14 +31,13 @@ func RegisterTransport(registry *dns.TransportRegistry) {
|
||||
}
|
||||
|
||||
type Transport struct {
|
||||
*transport.BaseTransport
|
||||
dns.TransportAdapter
|
||||
|
||||
ctx context.Context
|
||||
dialer N.Dialer
|
||||
serverAddr M.Socksaddr
|
||||
tlsConfig tls.Config
|
||||
|
||||
connector *transport.Connector[*quic.Conn]
|
||||
connection *transport.ConnPool[*quic.Conn]
|
||||
}
|
||||
|
||||
func NewQUIC(ctx context.Context, logger log.ContextLogger, tag string, options option.RemoteTLSDNSServerOptions) (adapter.DNSTransport, error) {
|
||||
@@ -63,93 +62,76 @@ func NewQUIC(ctx context.Context, logger log.ContextLogger, tag string, options
|
||||
return nil, E.New("invalid server address: ", serverAddr)
|
||||
}
|
||||
|
||||
t := &Transport{
|
||||
BaseTransport: transport.NewBaseTransport(
|
||||
dns.NewTransportAdapterWithRemoteOptions(C.DNSTypeQUIC, tag, options.RemoteDNSServerOptions),
|
||||
logger,
|
||||
),
|
||||
ctx: ctx,
|
||||
dialer: transportDialer,
|
||||
serverAddr: serverAddr,
|
||||
tlsConfig: tlsConfig,
|
||||
}
|
||||
|
||||
t.connector = transport.NewConnector(t.CloseContext(), t.dial, transport.ConnectorCallbacks[*quic.Conn]{
|
||||
IsClosed: func(connection *quic.Conn) bool {
|
||||
return common.Done(connection.Context())
|
||||
},
|
||||
Close: func(connection *quic.Conn) {
|
||||
connection.CloseWithError(0, "")
|
||||
},
|
||||
Reset: func(connection *quic.Conn) {
|
||||
connection.CloseWithError(0, "")
|
||||
},
|
||||
})
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *Transport) dial(ctx context.Context) (*quic.Conn, error) {
|
||||
conn, err := t.dialer.DialContext(ctx, N.NetworkUDP, t.serverAddr)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "dial UDP connection")
|
||||
}
|
||||
earlyConnection, err := sQUIC.DialEarly(
|
||||
ctx,
|
||||
bufio.NewUnbindPacketConn(conn),
|
||||
t.serverAddr.UDPAddr(),
|
||||
t.tlsConfig,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, E.Cause(err, "establish QUIC connection")
|
||||
}
|
||||
return earlyConnection, nil
|
||||
return &Transport{
|
||||
TransportAdapter: dns.NewTransportAdapterWithRemoteOptions(C.DNSTypeQUIC, tag, options.RemoteDNSServerOptions),
|
||||
dialer: transportDialer,
|
||||
serverAddr: serverAddr,
|
||||
tlsConfig: tlsConfig,
|
||||
connection: transport.NewConnPool(transport.ConnPoolOptions[*quic.Conn]{
|
||||
Mode: transport.ConnPoolSingle,
|
||||
IsAlive: func(conn *quic.Conn) bool {
|
||||
return conn != nil && !common.Done(conn.Context())
|
||||
},
|
||||
Close: func(conn *quic.Conn, _ error) {
|
||||
conn.CloseWithError(0, "")
|
||||
},
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *Transport) Start(stage adapter.StartStage) error {
|
||||
if stage != adapter.StartStateStart {
|
||||
return nil
|
||||
}
|
||||
err := t.SetStarted()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return dialer.InitializeDetour(t.dialer)
|
||||
}
|
||||
|
||||
func (t *Transport) Close() error {
|
||||
return E.Errors(t.BaseTransport.Close(), t.connector.Close())
|
||||
return t.connection.Close()
|
||||
}
|
||||
|
||||
func (t *Transport) Reset() {
|
||||
t.connector.Reset()
|
||||
t.connection.Reset()
|
||||
}
|
||||
|
||||
func (t *Transport) Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error) {
|
||||
if !t.BeginQuery() {
|
||||
return nil, transport.ErrTransportClosed
|
||||
}
|
||||
defer t.EndQuery()
|
||||
|
||||
var (
|
||||
conn *quic.Conn
|
||||
err error
|
||||
response *mDNS.Msg
|
||||
)
|
||||
for i := 0; i < 2; i++ {
|
||||
conn, err = t.connector.Get(ctx)
|
||||
conn, _, err = t.connection.Acquire(ctx, func(ctx context.Context) (*quic.Conn, error) {
|
||||
rawConn, err := t.dialer.DialContext(ctx, N.NetworkUDP, t.serverAddr)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "dial UDP connection")
|
||||
}
|
||||
earlyConnection, err := sQUIC.DialEarly(
|
||||
ctx,
|
||||
bufio.NewUnbindPacketConn(rawConn),
|
||||
t.serverAddr.UDPAddr(),
|
||||
t.tlsConfig,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
rawConn.Close()
|
||||
return nil, E.Cause(err, "establish QUIC connection")
|
||||
}
|
||||
return earlyConnection, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response, err = t.exchange(ctx, message, conn)
|
||||
if err == nil {
|
||||
t.connection.Release(conn, true)
|
||||
return response, nil
|
||||
} else if !isQUICRetryError(err) {
|
||||
t.connection.Release(conn, true)
|
||||
return nil, err
|
||||
} else {
|
||||
t.connector.Reset()
|
||||
t.connection.Release(conn, true)
|
||||
t.Reset()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sagernet/sing-box/adapter"
|
||||
@@ -17,7 +16,6 @@ import (
|
||||
"github.com/sagernet/sing/common/logger"
|
||||
M "github.com/sagernet/sing/common/metadata"
|
||||
N "github.com/sagernet/sing/common/network"
|
||||
"github.com/sagernet/sing/common/x/list"
|
||||
|
||||
mDNS "github.com/miekg/dns"
|
||||
)
|
||||
@@ -29,13 +27,13 @@ func RegisterTLS(registry *dns.TransportRegistry) {
|
||||
}
|
||||
|
||||
type TLSTransport struct {
|
||||
*BaseTransport
|
||||
dns.TransportAdapter
|
||||
logger logger.ContextLogger
|
||||
|
||||
dialer tls.Dialer
|
||||
serverAddr M.Socksaddr
|
||||
tlsConfig tls.Config
|
||||
access sync.Mutex
|
||||
connections list.List[*tlsDNSConn]
|
||||
connections *ConnPool[*tlsDNSConn]
|
||||
}
|
||||
|
||||
type tlsDNSConn struct {
|
||||
@@ -66,10 +64,20 @@ func NewTLS(ctx context.Context, logger log.ContextLogger, tag string, options o
|
||||
|
||||
func NewTLSRaw(logger logger.ContextLogger, adapter dns.TransportAdapter, dialer N.Dialer, serverAddr M.Socksaddr, tlsConfig tls.Config) *TLSTransport {
|
||||
return &TLSTransport{
|
||||
BaseTransport: NewBaseTransport(adapter, logger),
|
||||
dialer: tls.NewDialer(dialer, tlsConfig),
|
||||
serverAddr: serverAddr,
|
||||
tlsConfig: tlsConfig,
|
||||
TransportAdapter: adapter,
|
||||
logger: logger,
|
||||
dialer: tls.NewDialer(dialer, tlsConfig),
|
||||
serverAddr: serverAddr,
|
||||
tlsConfig: tlsConfig,
|
||||
connections: NewConnPool(ConnPoolOptions[*tlsDNSConn]{
|
||||
Mode: ConnPoolOrdered,
|
||||
IsAlive: func(conn *tlsDNSConn) bool {
|
||||
return conn != nil
|
||||
},
|
||||
Close: func(conn *tlsDNSConn, _ error) {
|
||||
conn.Close()
|
||||
},
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,53 +85,43 @@ func (t *TLSTransport) Start(stage adapter.StartStage) error {
|
||||
if stage != adapter.StartStateStart {
|
||||
return nil
|
||||
}
|
||||
err := t.SetStarted()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return dialer.InitializeDetour(t.dialer)
|
||||
}
|
||||
|
||||
func (t *TLSTransport) Close() error {
|
||||
t.access.Lock()
|
||||
for connection := t.connections.Front(); connection != nil; connection = connection.Next() {
|
||||
connection.Value.Close()
|
||||
}
|
||||
t.connections.Init()
|
||||
t.access.Unlock()
|
||||
return t.BaseTransport.Close()
|
||||
return t.connections.Close()
|
||||
}
|
||||
|
||||
func (t *TLSTransport) Reset() {
|
||||
t.access.Lock()
|
||||
defer t.access.Unlock()
|
||||
for connection := t.connections.Front(); connection != nil; connection = connection.Next() {
|
||||
connection.Value.Close()
|
||||
}
|
||||
t.connections.Init()
|
||||
t.connections.Reset()
|
||||
}
|
||||
|
||||
func (t *TLSTransport) Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error) {
|
||||
if !t.BeginQuery() {
|
||||
return nil, ErrTransportClosed
|
||||
}
|
||||
defer t.EndQuery()
|
||||
|
||||
t.access.Lock()
|
||||
conn := t.connections.PopFront()
|
||||
t.access.Unlock()
|
||||
if conn != nil {
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < 2; attempt++ {
|
||||
conn, created, err := t.connections.Acquire(ctx, func(ctx context.Context) (*tlsDNSConn, error) {
|
||||
tlsConn, err := t.dialer.DialTLSContext(ctx, t.serverAddr)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "dial TLS connection")
|
||||
}
|
||||
return &tlsDNSConn{Conn: tlsConn}, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response, err := t.exchange(ctx, message, conn)
|
||||
if err == nil {
|
||||
t.connections.Release(conn, true)
|
||||
return response, nil
|
||||
}
|
||||
t.Logger.DebugContext(ctx, "discarded pooled connection: ", err)
|
||||
lastErr = err
|
||||
t.logger.DebugContext(ctx, "discarded pooled connection: ", err)
|
||||
t.connections.Release(conn, false)
|
||||
if created {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
tlsConn, err := t.dialer.DialTLSContext(ctx, t.serverAddr)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "dial TLS connection")
|
||||
}
|
||||
return t.exchange(ctx, message, &tlsDNSConn{Conn: tlsConn})
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
func (t *TLSTransport) exchange(ctx context.Context, message *mDNS.Msg, conn *tlsDNSConn) (*mDNS.Msg, error) {
|
||||
@@ -133,22 +131,12 @@ func (t *TLSTransport) exchange(ctx context.Context, message *mDNS.Msg, conn *tl
|
||||
conn.queryId++
|
||||
err := WriteMessage(conn, conn.queryId, message)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, E.Cause(err, "write request")
|
||||
}
|
||||
response, err := ReadMessage(conn)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, E.Cause(err, "read response")
|
||||
}
|
||||
t.access.Lock()
|
||||
if t.State() >= StateClosing {
|
||||
t.access.Unlock()
|
||||
conn.Close()
|
||||
return response, nil
|
||||
}
|
||||
conn.SetDeadline(time.Time{})
|
||||
t.connections.PushBack(conn)
|
||||
t.access.Unlock()
|
||||
return response, nil
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -27,13 +28,14 @@ func RegisterUDP(registry *dns.TransportRegistry) {
|
||||
}
|
||||
|
||||
type UDPTransport struct {
|
||||
*BaseTransport
|
||||
dns.TransportAdapter
|
||||
logger logger.ContextLogger
|
||||
|
||||
dialer N.Dialer
|
||||
serverAddr M.Socksaddr
|
||||
udpSize atomic.Int32
|
||||
|
||||
connector *Connector[*Connection]
|
||||
connection *ConnPool[net.Conn]
|
||||
|
||||
callbackAccess sync.RWMutex
|
||||
queryId uint16
|
||||
@@ -63,43 +65,38 @@ func NewUDP(ctx context.Context, logger log.ContextLogger, tag string, options o
|
||||
|
||||
func NewUDPRaw(logger logger.ContextLogger, adapter dns.TransportAdapter, dialerInstance N.Dialer, serverAddr M.Socksaddr) *UDPTransport {
|
||||
t := &UDPTransport{
|
||||
BaseTransport: NewBaseTransport(adapter, logger),
|
||||
dialer: dialerInstance,
|
||||
serverAddr: serverAddr,
|
||||
callbacks: make(map[uint16]*udpCallback),
|
||||
TransportAdapter: adapter,
|
||||
logger: logger,
|
||||
dialer: dialerInstance,
|
||||
serverAddr: serverAddr,
|
||||
callbacks: make(map[uint16]*udpCallback),
|
||||
connection: NewConnPool(ConnPoolOptions[net.Conn]{
|
||||
Mode: ConnPoolSingle,
|
||||
IsAlive: func(conn net.Conn) bool {
|
||||
return conn != nil
|
||||
},
|
||||
Close: func(conn net.Conn, cause error) {
|
||||
conn.Close()
|
||||
},
|
||||
}),
|
||||
}
|
||||
t.udpSize.Store(2048)
|
||||
t.connector = NewSingleflightConnector(t.CloseContext(), t.dial)
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *UDPTransport) dial(ctx context.Context) (*Connection, error) {
|
||||
rawConn, err := t.dialer.DialContext(ctx, N.NetworkUDP, t.serverAddr)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "dial UDP connection")
|
||||
}
|
||||
conn := WrapConnection(rawConn)
|
||||
go t.recvLoop(conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (t *UDPTransport) Start(stage adapter.StartStage) error {
|
||||
if stage != adapter.StartStateStart {
|
||||
return nil
|
||||
}
|
||||
err := t.SetStarted()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return dialer.InitializeDetour(t.dialer)
|
||||
}
|
||||
|
||||
func (t *UDPTransport) Close() error {
|
||||
return E.Errors(t.BaseTransport.Close(), t.connector.Close())
|
||||
return t.connection.Close()
|
||||
}
|
||||
|
||||
func (t *UDPTransport) Reset() {
|
||||
t.connector.Reset()
|
||||
t.connection.Reset()
|
||||
}
|
||||
|
||||
func (t *UDPTransport) nextAvailableQueryId() (uint16, error) {
|
||||
@@ -116,17 +113,12 @@ func (t *UDPTransport) nextAvailableQueryId() (uint16, error) {
|
||||
}
|
||||
|
||||
func (t *UDPTransport) Exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.Msg, error) {
|
||||
if !t.BeginQuery() {
|
||||
return nil, ErrTransportClosed
|
||||
}
|
||||
defer t.EndQuery()
|
||||
|
||||
response, err := t.exchange(ctx, message)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if response.Truncated {
|
||||
t.Logger.InfoContext(ctx, "response truncated, retrying with TCP")
|
||||
t.logger.InfoContext(ctx, "response truncated, retrying with TCP")
|
||||
return t.exchangeTCP(ctx, message)
|
||||
}
|
||||
return response, nil
|
||||
@@ -158,16 +150,25 @@ func (t *UDPTransport) exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.M
|
||||
break
|
||||
}
|
||||
if t.udpSize.CompareAndSwap(current, udpSize) {
|
||||
t.connector.Reset()
|
||||
t.Reset()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := t.connector.Get(ctx)
|
||||
conn, connCtx, created, err := t.connection.AcquireShared(ctx, func(ctx context.Context) (net.Conn, error) {
|
||||
rawConn, err := t.dialer.DialContext(ctx, N.NetworkUDP, t.serverAddr)
|
||||
if err != nil {
|
||||
return nil, E.Cause(err, "dial UDP connection")
|
||||
}
|
||||
return rawConn, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if created {
|
||||
go t.recvLoop(conn)
|
||||
}
|
||||
|
||||
callback := &udpCallback{
|
||||
done: make(chan struct{}),
|
||||
@@ -177,6 +178,7 @@ func (t *UDPTransport) exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.M
|
||||
queryId, err := t.nextAvailableQueryId()
|
||||
if err != nil {
|
||||
t.callbackAccess.Unlock()
|
||||
t.connection.Release(conn, true)
|
||||
return nil, err
|
||||
}
|
||||
t.callbacks[queryId] = callback
|
||||
@@ -203,30 +205,30 @@ func (t *UDPTransport) exchange(ctx context.Context, message *mDNS.Msg) (*mDNS.M
|
||||
|
||||
_, err = conn.Write(rawMessage)
|
||||
if err != nil {
|
||||
conn.CloseWithError(err)
|
||||
t.connection.Invalidate(conn, err)
|
||||
return nil, E.Cause(err, "write request")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-callback.done:
|
||||
t.connection.Release(conn, true)
|
||||
callback.response.Id = originalId
|
||||
return callback.response, nil
|
||||
case <-conn.Done():
|
||||
return nil, conn.CloseError()
|
||||
case <-t.CloseContext().Done():
|
||||
return nil, ErrTransportClosed
|
||||
case <-connCtx.Done():
|
||||
return nil, context.Cause(connCtx)
|
||||
case <-ctx.Done():
|
||||
t.connection.Release(conn, true)
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *UDPTransport) recvLoop(conn *Connection) {
|
||||
func (t *UDPTransport) recvLoop(conn net.Conn) {
|
||||
for {
|
||||
buffer := buf.NewSize(int(t.udpSize.Load()))
|
||||
_, err := buffer.ReadOnceFrom(conn)
|
||||
if err != nil {
|
||||
buffer.Release()
|
||||
conn.CloseWithError(err)
|
||||
t.connection.Invalidate(conn, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -234,7 +236,7 @@ func (t *UDPTransport) recvLoop(conn *Connection) {
|
||||
err = message.Unpack(buffer.Bytes())
|
||||
buffer.Release()
|
||||
if err != nil {
|
||||
t.Logger.Debug("discarded malformed UDP response: ", err)
|
||||
t.logger.Debug("discarded malformed UDP response: ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user