platform: Add group interface

This commit is contained in:
世界
2023-07-02 16:45:30 +08:00
parent 5ad0ea2b5a
commit 9c8565cf21
25 changed files with 576 additions and 95 deletions

View File

@@ -23,6 +23,7 @@ import (
E "github.com/sagernet/sing/common/exceptions"
F "github.com/sagernet/sing/common/format"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/sing/service"
"github.com/sagernet/sing/service/filemanager"
"github.com/sagernet/websocket"
@@ -68,13 +69,16 @@ func NewServer(ctx context.Context, router adapter.Router, logFactory log.Observ
Handler: chiRouter,
},
trafficManager: trafficManager,
urlTestHistory: urltest.NewHistoryStorage(),
mode: strings.ToLower(options.DefaultMode),
storeSelected: options.StoreSelected,
storeFakeIP: options.StoreFakeIP,
externalUIDownloadURL: options.ExternalUIDownloadURL,
externalUIDownloadDetour: options.ExternalUIDownloadDetour,
}
server.urlTestHistory = service.PtrFromContext[urltest.HistoryStorage](ctx)
if server.urlTestHistory == nil {
server.urlTestHistory = urltest.NewHistoryStorage()
}
if server.mode == "" {
server.mode = "rule"
}

View File

@@ -3,7 +3,9 @@ package libbox
const (
CommandLog int32 = iota
CommandStatus
CommandServiceStop
CommandServiceReload
CommandCloseConnections
CommandGroup
CommandSelectOutbound
CommandURLTest
)

View File

@@ -26,6 +26,13 @@ type CommandClientHandler interface {
Disconnected(message string)
WriteLog(message string)
WriteStatus(message *StatusMessage)
WriteGroups(message OutboundGroupIterator)
}
func NewStandaloneCommandClient(sharedDirectory string) *CommandClient {
return &CommandClient{
sharedDirectory: sharedDirectory,
}
}
func NewCommandClient(sharedDirectory string, handler CommandClientHandler, options *CommandClientOptions) *CommandClient {
@@ -36,16 +43,16 @@ func NewCommandClient(sharedDirectory string, handler CommandClientHandler, opti
}
}
func clientConnect(sharedDirectory string) (net.Conn, error) {
func (c *CommandClient) directConnect() (net.Conn, error) {
return net.DialUnix("unix", nil, &net.UnixAddr{
Name: filepath.Join(sharedDirectory, "command.sock"),
Name: filepath.Join(c.sharedDirectory, "command.sock"),
Net: "unix",
})
}
func (c *CommandClient) Connect() error {
common.Close(c.conn)
conn, err := clientConnect(c.sharedDirectory)
conn, err := c.directConnect()
if err != nil {
return err
}
@@ -65,6 +72,13 @@ func (c *CommandClient) Connect() error {
}
c.handler.Connected()
go c.handleStatusConn(conn)
case CommandGroup:
err = binary.Write(conn, binary.BigEndian, c.options.StatusInterval)
if err != nil {
return E.Cause(err, "write interval")
}
c.handler.Connected()
go c.handleGroupConn(conn)
}
return nil
}

View File

@@ -9,8 +9,8 @@ import (
"github.com/sagernet/sing-box/common/dialer/conntrack"
)
func ClientCloseConnections(sharedDirectory string) error {
conn, err := clientConnect(sharedDirectory)
func (c *CommandClient) CloseConnections() error {
conn, err := c.directConnect()
if err != nil {
return err
}

View File

@@ -0,0 +1,228 @@
package libbox
import (
"encoding/binary"
"io"
"net"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/urltest"
"github.com/sagernet/sing-box/outbound"
"github.com/sagernet/sing/common/rw"
"github.com/sagernet/sing/service"
)
type OutboundGroup struct {
Tag string
Type string
Selectable bool
Selected string
items []*OutboundGroupItem
}
func (g *OutboundGroup) GetItems() OutboundGroupItemIterator {
return newIterator(g.items)
}
type OutboundGroupIterator interface {
Next() *OutboundGroup
HasNext() bool
}
type OutboundGroupItem struct {
Tag string
Type string
URLTestTime int64
URLTestDelay int32
}
type OutboundGroupItemIterator interface {
Next() *OutboundGroupItem
HasNext() bool
}
func (c *CommandClient) handleGroupConn(conn net.Conn) {
defer conn.Close()
for {
groups, err := readGroups(conn)
if err != nil {
c.handler.Disconnected(err.Error())
return
}
c.handler.WriteGroups(groups)
}
}
func (s *CommandServer) handleGroupConn(conn net.Conn) error {
defer conn.Close()
ctx := connKeepAlive(conn)
for {
service := s.service
if service != nil {
err := writeGroups(conn, service)
if err != nil {
return err
}
} else {
err := binary.Write(conn, binary.BigEndian, uint16(0))
if err != nil {
return err
}
}
select {
case <-ctx.Done():
return ctx.Err()
case <-s.urlTestUpdate:
}
}
}
func readGroups(reader io.Reader) (OutboundGroupIterator, error) {
var groupLength uint16
err := binary.Read(reader, binary.BigEndian, &groupLength)
if err != nil {
return nil, err
}
groups := make([]*OutboundGroup, 0, groupLength)
for i := 0; i < int(groupLength); i++ {
var group OutboundGroup
group.Tag, err = rw.ReadVString(reader)
if err != nil {
return nil, err
}
group.Type, err = rw.ReadVString(reader)
if err != nil {
return nil, err
}
err = binary.Read(reader, binary.BigEndian, &group.Selectable)
if err != nil {
return nil, err
}
group.Selected, err = rw.ReadVString(reader)
if err != nil {
return nil, err
}
var itemLength uint16
err = binary.Read(reader, binary.BigEndian, &itemLength)
if err != nil {
return nil, err
}
group.items = make([]*OutboundGroupItem, itemLength)
for j := 0; j < int(itemLength); j++ {
var item OutboundGroupItem
item.Tag, err = rw.ReadVString(reader)
if err != nil {
return nil, err
}
item.Type, err = rw.ReadVString(reader)
if err != nil {
return nil, err
}
err = binary.Read(reader, binary.BigEndian, &item.URLTestTime)
if err != nil {
return nil, err
}
err = binary.Read(reader, binary.BigEndian, &item.URLTestDelay)
if err != nil {
return nil, err
}
group.items[j] = &item
}
groups = append(groups, &group)
}
return newIterator(groups), nil
}
func writeGroups(writer io.Writer, boxService *BoxService) error {
historyStorage := service.PtrFromContext[urltest.HistoryStorage](boxService.ctx)
outbounds := boxService.instance.Router().Outbounds()
var iGroups []adapter.OutboundGroup
for _, it := range outbounds {
if group, isGroup := it.(adapter.OutboundGroup); isGroup {
iGroups = append(iGroups, group)
}
}
var groups []OutboundGroup
for _, iGroup := range iGroups {
var group OutboundGroup
group.Tag = iGroup.Tag()
group.Type = iGroup.Type()
_, group.Selectable = iGroup.(*outbound.Selector)
group.Selected = iGroup.Now()
for _, itemTag := range iGroup.All() {
itemOutbound, isLoaded := boxService.instance.Router().Outbound(itemTag)
if !isLoaded {
continue
}
var item OutboundGroupItem
item.Tag = itemTag
item.Type = itemOutbound.Type()
if history := historyStorage.LoadURLTestHistory(adapter.OutboundTag(itemOutbound)); history != nil {
item.URLTestTime = history.Time.Unix()
item.URLTestDelay = int32(history.Delay)
}
group.items = append(group.items, &item)
}
groups = append(groups, group)
}
err := binary.Write(writer, binary.BigEndian, uint16(len(groups)))
if err != nil {
return err
}
for _, group := range groups {
err = rw.WriteVString(writer, group.Tag)
if err != nil {
return err
}
err = rw.WriteVString(writer, group.Type)
if err != nil {
return err
}
err = binary.Write(writer, binary.BigEndian, group.Selectable)
if err != nil {
return err
}
err = rw.WriteVString(writer, group.Selected)
if err != nil {
return err
}
err = binary.Write(writer, binary.BigEndian, uint16(len(group.items)))
if err != nil {
return err
}
for _, item := range group.items {
err = rw.WriteVString(writer, item.Tag)
if err != nil {
return err
}
err = rw.WriteVString(writer, item.Type)
if err != nil {
return err
}
err = binary.Write(writer, binary.BigEndian, item.URLTestTime)
if err != nil {
return err
}
err = binary.Write(writer, binary.BigEndian, item.URLTestDelay)
if err != nil {
return err
}
}
}
return nil
}

View File

@@ -11,7 +11,7 @@ func (s *CommandServer) WriteMessage(message string) {
s.subscriber.Emit(message)
s.access.Lock()
s.savedLines.PushBack(message)
if s.savedLines.Len() > 100 {
if s.savedLines.Len() > s.maxLines {
s.savedLines.Remove(s.savedLines.Front())
}
s.access.Unlock()

View File

@@ -8,8 +8,8 @@ import (
"github.com/sagernet/sing/common/rw"
)
func ClientServiceReload(sharedDirectory string) error {
conn, err := clientConnect(sharedDirectory)
func (c *CommandClient) ServiceReload() error {
conn, err := c.directConnect()
if err != nil {
return err
}

View File

@@ -0,0 +1,59 @@
package libbox
import (
"encoding/binary"
"net"
"github.com/sagernet/sing-box/outbound"
E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/rw"
)
func (c *CommandClient) SelectOutbound(groupTag string, outboundTag string) error {
conn, err := c.directConnect()
if err != nil {
return err
}
defer conn.Close()
err = binary.Write(conn, binary.BigEndian, uint8(CommandSelectOutbound))
if err != nil {
return err
}
err = rw.WriteVString(conn, groupTag)
if err != nil {
return err
}
err = rw.WriteVString(conn, outboundTag)
if err != nil {
return err
}
return readError(conn)
}
func (s *CommandServer) handleSelectOutbound(conn net.Conn) error {
defer conn.Close()
groupTag, err := rw.ReadVString(conn)
if err != nil {
return err
}
outboundTag, err := rw.ReadVString(conn)
if err != nil {
return err
}
service := s.service
if service == nil {
return writeError(conn, E.New("service not ready"))
}
outboundGroup, isLoaded := service.instance.Router().Outbound(groupTag)
if !isLoaded {
return writeError(conn, E.New("selector not found: ", groupTag))
}
selector, isSelector := outboundGroup.(*outbound.Selector)
if !isSelector {
return writeError(conn, E.New("outbound is not a selector: ", groupTag))
}
if !selector.SelectOutbound(outboundTag) {
return writeError(conn, E.New("outbound not found in selector: ", outboundTag))
}
return writeError(conn, nil)
}

View File

@@ -7,12 +7,14 @@ import (
"path/filepath"
"sync"
"github.com/sagernet/sing-box/common/urltest"
"github.com/sagernet/sing-box/log"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/debug"
E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/observable"
"github.com/sagernet/sing/common/x/list"
"github.com/sagernet/sing/service"
)
type CommandServer struct {
@@ -22,26 +24,51 @@ type CommandServer struct {
access sync.Mutex
savedLines *list.List[string]
maxLines int
subscriber *observable.Subscriber[string]
observer *observable.Observer[string]
service *BoxService
urlTestListener *list.Element[func()]
urlTestUpdate chan struct{}
}
type CommandServerHandler interface {
ServiceStop() error
ServiceReload() error
}
func NewCommandServer(sharedDirectory string, handler CommandServerHandler) *CommandServer {
func NewCommandServer(sharedDirectory string, handler CommandServerHandler, maxLines int32) *CommandServer {
server := &CommandServer{
sockPath: filepath.Join(sharedDirectory, "command.sock"),
handler: handler,
savedLines: new(list.List[string]),
subscriber: observable.NewSubscriber[string](128),
sockPath: filepath.Join(sharedDirectory, "command.sock"),
handler: handler,
savedLines: new(list.List[string]),
maxLines: int(maxLines),
subscriber: observable.NewSubscriber[string](128),
urlTestUpdate: make(chan struct{}, 1),
}
server.observer = observable.NewObserver[string](server.subscriber, 64)
return server
}
func (s *CommandServer) SetService(newService *BoxService) {
if s.service != nil && s.listener != nil {
service.PtrFromContext[urltest.HistoryStorage](s.service.ctx).RemoveListener(s.urlTestListener)
s.urlTestListener = nil
}
s.service = newService
if newService != nil {
s.urlTestListener = service.PtrFromContext[urltest.HistoryStorage](newService.ctx).AddListener(s.notifyURLTestUpdate)
}
s.notifyURLTestUpdate()
}
func (s *CommandServer) notifyURLTestUpdate() {
select {
case s.urlTestUpdate <- struct{}{}:
default:
}
}
func (s *CommandServer) Start() error {
os.Remove(s.sockPath)
listener, err := net.ListenUnix("unix", &net.UnixAddr{
@@ -92,12 +119,16 @@ func (s *CommandServer) handleConnection(conn net.Conn) error {
return s.handleLogConn(conn)
case CommandStatus:
return s.handleStatusConn(conn)
case CommandServiceStop:
return s.handleServiceStop(conn)
case CommandServiceReload:
return s.handleServiceReload(conn)
case CommandCloseConnections:
return s.handleCloseConnections(conn)
case CommandGroup:
return s.handleGroupConn(conn)
case CommandSelectOutbound:
return s.handleSelectOutbound(conn)
case CommandURLTest:
return s.handleURLTest(conn)
default:
return E.New("unknown command: ", command)
}

View File

@@ -0,0 +1,39 @@
package libbox
import (
"encoding/binary"
"io"
E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/rw"
)
func readError(reader io.Reader) error {
var hasError bool
err := binary.Read(reader, binary.BigEndian, &hasError)
if err != nil {
return err
}
if hasError {
errorMessage, err := rw.ReadVString(reader)
if err != nil {
return err
}
return E.New(errorMessage)
}
return nil
}
func writeError(writer io.Writer, wErr error) error {
err := binary.Write(writer, binary.BigEndian, wErr != nil)
if err != nil {
return err
}
if wErr != nil {
err = rw.WriteVString(writer, wErr.Error())
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,48 +0,0 @@
package libbox
import (
"encoding/binary"
"net"
"runtime/debug"
E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/rw"
)
func ClientServiceStop(sharedDirectory string) error {
conn, err := clientConnect(sharedDirectory)
if err != nil {
return err
}
defer conn.Close()
err = binary.Write(conn, binary.BigEndian, uint8(CommandServiceStop))
if err != nil {
return err
}
var hasError bool
err = binary.Read(conn, binary.BigEndian, &hasError)
if err != nil {
return err
}
if hasError {
errorMessage, err := rw.ReadVString(conn)
if err != nil {
return err
}
return E.New(errorMessage)
}
return nil
}
func (s *CommandServer) handleServiceStop(conn net.Conn) error {
rErr := s.handler.ServiceStop()
err := binary.Write(conn, binary.BigEndian, rErr != nil)
if err != nil {
return err
}
if rErr != nil {
return rw.WriteVString(conn, rErr.Error())
}
debug.FreeOSMemory()
return nil
}

View File

@@ -0,0 +1,95 @@
package libbox
import (
"encoding/binary"
"net"
"time"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/urltest"
"github.com/sagernet/sing-box/outbound"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/batch"
E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/rw"
)
func (c *CommandClient) URLTest(groupTag string) error {
conn, err := c.directConnect()
if err != nil {
return err
}
defer conn.Close()
err = binary.Write(conn, binary.BigEndian, uint8(CommandURLTest))
if err != nil {
return err
}
err = rw.WriteVString(conn, groupTag)
if err != nil {
return err
}
return readError(conn)
}
func (s *CommandServer) handleURLTest(conn net.Conn) error {
defer conn.Close()
groupTag, err := rw.ReadVString(conn)
if err != nil {
return err
}
service := s.service
if service == nil {
return nil
}
abstractOutboundGroup, isLoaded := service.instance.Router().Outbound(groupTag)
if !isLoaded {
return writeError(conn, E.New("outbound group not found: ", groupTag))
}
outboundGroup, isOutboundGroup := abstractOutboundGroup.(adapter.OutboundGroup)
if !isOutboundGroup {
return writeError(conn, E.New("outbound is not a group: ", groupTag))
}
urlTest, isURLTest := abstractOutboundGroup.(*outbound.URLTest)
if isURLTest {
go urlTest.CheckOutbounds()
} else {
var historyStorage *urltest.HistoryStorage
if clashServer := service.instance.Router().ClashServer(); clashServer != nil {
historyStorage = clashServer.HistoryStorage()
} else {
return writeError(conn, E.New("Clash API is required for URLTest on non-URLTest group"))
}
outbounds := common.Filter(common.Map(outboundGroup.All(), func(it string) adapter.Outbound {
itOutbound, _ := service.instance.Router().Outbound(it)
return itOutbound
}), func(it adapter.Outbound) bool {
if it == nil {
return false
}
_, isGroup := it.(adapter.OutboundGroup)
if isGroup {
return false
}
return true
})
b, _ := batch.New(service.ctx, batch.WithConcurrencyNum[any](10))
for _, detour := range outbounds {
outboundToTest := detour
outboundTag := outboundToTest.Tag()
b.Go(outboundTag, func() (any, error) {
t, err := urltest.URLTest(service.ctx, "", outboundToTest)
if err != nil {
historyStorage.DeleteURLTestHistory(outboundTag)
} else {
historyStorage.StoreURLTestHistory(outboundTag, &urltest.History{
Time: time.Now(),
Delay: t,
})
}
return nil, nil
})
}
}
return writeError(conn, nil)
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/sagernet/sing-box"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/process"
"github.com/sagernet/sing-box/common/urltest"
"github.com/sagernet/sing-box/experimental/libbox/internal/procfs"
"github.com/sagernet/sing-box/experimental/libbox/platform"
"github.com/sagernet/sing-box/option"
@@ -16,6 +17,7 @@ import (
"github.com/sagernet/sing/common/control"
E "github.com/sagernet/sing/common/exceptions"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/sing/service"
"github.com/sagernet/sing/service/filemanager"
)
@@ -32,6 +34,7 @@ func NewService(configContent string, platformInterface PlatformInterface) (*Box
}
ctx, cancel := context.WithCancel(context.Background())
ctx = filemanager.WithDefault(ctx, sBasePath, sTempPath, sUserID, sGroupID)
ctx = service.ContextWithPtr(ctx, urltest.NewHistoryStorage())
instance, err := box.New(box.Options{
Context: ctx,
Options: options,