Skip to content

Commit 4f4b140

Browse files
committed
Refactor the polling
1 parent eff14e2 commit 4f4b140

File tree

8 files changed

+107
-42
lines changed

8 files changed

+107
-42
lines changed

chain/bridge.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ func (b *Bridge) bridgeStream(ctx context.Context, listenConfig bridge.ListenCon
181181
raw = idle.NewIdleConn(raw, idleTimeout)
182182
}
183183
backoff = time.Second / 10
184-
b.logger.Info("Connect", "remote_address", raw.RemoteAddr().String())
185184
go b.stepIgnoreErr(ctx, dialer, raw, dials)
186185
}
187186
}(i, l)
@@ -289,7 +288,6 @@ func (b *Bridge) bridgeProxy(ctx context.Context, listenConfig bridge.ListenConf
289288
raw = idle.NewIdleConn(raw, idleTimeout)
290289
}
291290
backoff = time.Second / 10
292-
b.logger.Info("Connect", "remote_address", raw.RemoteAddr().String())
293291
go h.ServeConn(raw)
294292
}
295293
}(i, host)

chain/chain.go

Lines changed: 99 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,36 @@ package chain
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
6-
"math/rand"
7+
"math"
8+
"net"
79
"strings"
10+
"sync"
11+
"time"
812

913
"github.com/wzshiming/bridge"
1014
"github.com/wzshiming/bridge/config"
1115
"github.com/wzshiming/bridge/internal/scheme"
12-
"github.com/wzshiming/schedialer"
13-
"github.com/wzshiming/schedialer/plugins/probe"
14-
"github.com/wzshiming/schedialer/plugins/roundrobin"
16+
"github.com/wzshiming/bridge/logger"
1517
)
1618

1719
// BridgeChain is a bridger that supports multiple crossing of bridger.
1820
type BridgeChain struct {
1921
DialerFunc func(dialer bridge.Dialer) bridge.Dialer
2022
proto map[string]bridge.Bridger
2123
defaultProto bridge.Bridger
24+
25+
backoffCount map[string]uint64
26+
mutex sync.Mutex
2227
}
2328

2429
// NewBridgeChain create a new BridgeChain.
2530
func NewBridgeChain() *BridgeChain {
2631
return &BridgeChain{
27-
proto: map[string]bridge.Bridger{},
28-
DialerFunc: NewEnvDialer,
32+
proto: map[string]bridge.Bridger{},
33+
DialerFunc: NewEnvDialer,
34+
backoffCount: map[string]uint64{},
2935
}
3036
}
3137

@@ -35,7 +41,7 @@ func (b *BridgeChain) BridgeChain(ctx context.Context, dialer bridge.Dialer, add
3541
return dialer, nil
3642
}
3743
address := addresses[len(addresses)-1]
38-
d, err := b.Dial(ctx, dialer, strings.Split(address, "|"), "")
44+
d, err := b.multiDial(ctx, dialer, strings.Split(address, "|"))
3945
if err != nil {
4046
return nil, err
4147
}
@@ -65,7 +71,7 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
6571
return dialer, nil
6672
}
6773
address := addresses[len(addresses)-1]
68-
d, err := b.Dial(ctx, dialer, address.LB, address.Probe)
74+
d, err := b.multiDial(ctx, dialer, address.LB)
6975
if err != nil {
7076
return nil, err
7177
}
@@ -76,32 +82,16 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
7682
return b.bridgeChainWithConfig(ctx, d, addresses...)
7783
}
7884

79-
func (b *BridgeChain) Dial(ctx context.Context, dialer bridge.Dialer, addresses []string, probeUrl string) (bridge.Dialer, error) {
80-
if len(addresses) == 1 {
81-
return b.dialOne(ctx, dialer, addresses[0])
82-
}
83-
plugins := []schedialer.Plugin{
84-
roundrobin.NewRoundRobinWithIndex(100, rand.Uint64()%uint64(len(addresses))),
85-
}
86-
if probeUrl != "" {
87-
plugins = append(plugins, probe.NewProbe(100, probeUrl))
88-
}
89-
plugin := schedialer.NewPlugins(plugins...)
90-
for _, address := range addresses {
91-
dial, err := b.dialOne(ctx, dialer, address)
92-
if err != nil {
93-
return nil, err
94-
}
95-
proxy := schedialer.Proxy{
96-
Name: address,
97-
Dialer: dial,
98-
}
99-
plugin.AddProxy(ctx, &proxy)
85+
func (b *BridgeChain) multiDial(ctx context.Context, dialer bridge.Dialer, addresses []string) (bridge.Dialer, error) {
86+
useCount := &backoffManager{
87+
addresses: addresses,
88+
dialer: dialer,
89+
bc: b,
10090
}
101-
return schedialer.NewSchedialer(plugin), nil
91+
return useCount, nil
10292
}
10393

104-
func (b *BridgeChain) dialOne(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
94+
func (b *BridgeChain) singleDial(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
10595
sch, _, ok := scheme.SplitSchemeAddr(address)
10696
if !ok {
10797
return nil, fmt.Errorf("unsupported protocol format %q", address)
@@ -126,3 +116,81 @@ func (b *BridgeChain) Register(name string, bridger bridge.Bridger) error {
126116
func (b *BridgeChain) RegisterDefault(bridger bridge.Bridger) {
127117
b.defaultProto = bridger
128118
}
119+
120+
type backoffManager struct {
121+
addresses []string
122+
dialer bridge.Dialer
123+
124+
bc *BridgeChain
125+
}
126+
127+
func (u *backoffManager) useLeastAndCount(addresses []string) string {
128+
if len(addresses) == 1 {
129+
return addresses[0]
130+
}
131+
min := uint64(math.MaxUint64)
132+
133+
u.bc.mutex.Lock()
134+
defer u.bc.mutex.Unlock()
135+
136+
var minAddress string
137+
for _, address := range addresses {
138+
if u.bc.backoffCount[address] < min {
139+
min = u.bc.backoffCount[address]
140+
minAddress = address
141+
}
142+
}
143+
u.bc.backoffCount[minAddress]++
144+
return minAddress
145+
}
146+
147+
func (u *backoffManager) backoff(address string, count uint64) {
148+
u.bc.mutex.Lock()
149+
defer u.bc.mutex.Unlock()
150+
u.bc.backoffCount[address] += count
151+
}
152+
153+
func (u *backoffManager) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
154+
var errs []error
155+
156+
timeout := time.Second * 10
157+
dl, ok := ctx.Deadline()
158+
if ok {
159+
timeout = time.Until(dl)
160+
}
161+
162+
period := timeout / time.Duration(len(u.addresses))
163+
if period < time.Second {
164+
period = time.Second
165+
} else if period > 5*time.Second {
166+
period = 5 * time.Second
167+
}
168+
169+
for i := 0; i < len(u.addresses); i++ {
170+
addr := u.useLeastAndCount(u.addresses)
171+
172+
ctx, cancel := context.WithTimeout(ctx, period)
173+
174+
dialer, err := u.bc.singleDial(ctx, u.dialer, addr)
175+
if err != nil {
176+
errs = append(errs, err)
177+
logger.Std.Warn("failed dial", "err", err, "previous", addr)
178+
cancel()
179+
u.backoff(addr, 10)
180+
continue
181+
}
182+
conn, err := dialer.DialContext(ctx, network, address)
183+
if err != nil {
184+
errs = append(errs, err)
185+
logger.Std.Warn("failed dial target", "err", err, "previous", addr, "target", address)
186+
cancel()
187+
u.backoff(addr, 5)
188+
continue
189+
}
190+
191+
logger.Std.Info("success dial target", "previous", addr, "target", address)
192+
cancel()
193+
return conn, nil
194+
}
195+
return nil, fmt.Errorf("all addresses are failed: %w", errors.Join(errs...))
196+
}

cmd/bridge/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ func run(ctx context.Context, log *slog.Logger, tasks []config.Chain) {
136136
}
137137
go func(task config.Chain) {
138138
defer wg.Done()
139-
log := log.With("chains", task)
140139
log.Info(chain.ShowChainWithConfig(task))
141140
b := chain.NewBridge(log, dump)
142141
err := b.BridgeWithConfig(ctx, task)

cmd/bridge/main_other.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ func runWithReload(ctx context.Context, log *slog.Logger, tasks []config.Chain,
6363
wg.Add(1)
6464
go func(ctx context.Context, task config.Chain) {
6565
defer wg.Done()
66-
log := log.With("chains", task)
6766
log.Info(chain.ShowChainWithConfig(task))
6867
for ctx.Err() == nil {
6968
b := chain.NewBridge(log, dump)

config/config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ func (c Chain) Unique() string {
100100
}
101101

102102
type Node struct {
103-
Probe string `json:"probe"`
104-
LB []string `json:"lb"`
103+
LB []string `json:"lb"`
105104
}
106105

107106
func (m Node) MarshalJSON() ([]byte, error) {

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ require (
1414
github.com/wzshiming/httpproxy v0.5.6
1515
github.com/wzshiming/notify v0.1.1
1616
github.com/wzshiming/permuteproxy v0.0.2
17-
github.com/wzshiming/schedialer v0.6.1
1817
github.com/wzshiming/shadowsocks v0.4.1
1918
github.com/wzshiming/socks4 v0.3.2
2019
github.com/wzshiming/socks5 v0.5.1

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ github.com/wzshiming/notify v0.1.1 h1:rJXoszrkNglhCVyn/IfW500f5cW03q1q7YzL8hsLch
2626
github.com/wzshiming/notify v0.1.1/go.mod h1:SFhsQKZJznzsDcj/Qfo9A65k5IRcpUrpgbLRzZEa/DI=
2727
github.com/wzshiming/permuteproxy v0.0.2 h1:svedMueotlxJk9oJfA0gs8WzRYOdgd0DER9XvKpjwlY=
2828
github.com/wzshiming/permuteproxy v0.0.2/go.mod h1:Ny08A1JbuljB8FeJAOiB7dfvRGCVD8PB9hwrALIvYI8=
29-
github.com/wzshiming/schedialer v0.6.1 h1:4VwtIjVF3uMoWqjbyw3oqYi7WGOEYvDu3L9OYT8sbGY=
30-
github.com/wzshiming/schedialer v0.6.1/go.mod h1:TvVxg4QZIBTJzRfmL/G7g6CzynFQKPmtXtSeJ2c4Lus=
3129
github.com/wzshiming/shadowsocks v0.4.1 h1:tyLYtLSQs90jpMIkD+T8KuZH5foXwOH0ZjxSOb45orI=
3230
github.com/wzshiming/shadowsocks v0.4.1/go.mod h1:CfKm/Keclli2sPGfjskGVH+F3gpF0YPVdcf5t4krypY=
3331
github.com/wzshiming/socks4 v0.3.2 h1:w87nwfgRWteVwIH39nqTur8c+2dcODeLgLrWspcUkSc=

logger/logger.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package logger
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
7+
"os"
68
)
79

8-
var Std = slog.Default()
10+
var Std = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
11+
Level: slog.LevelInfo,
12+
AddSource: true,
13+
}))
914

1015
func Wrap(logger *slog.Logger, name string) *wrap {
1116
return &wrap{
@@ -18,5 +23,5 @@ type wrap struct {
1823
}
1924

2025
func (w wrap) Println(v ...interface{}) {
21-
w.Logger.Info(fmt.Sprintln(v...))
26+
w.Logger.Log(context.Background(), slog.LevelInfo, fmt.Sprintln(v...))
2227
}

0 commit comments

Comments
 (0)