Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions internal/syncer/measurement_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package syncer

import (
"runtime"
"testing"
"time"
)

// The measurement closure must stop its ticker goroutine when invoked, and be
// safe to invoke more than once — finish() now calls it on every path
// (including error paths) in addition to the happy-path Result builder, so a
// double-call must neither panic (double close) nor change the reported value.
func TestStartMeasurementStopsGoroutineAndIsIdempotent(t *testing.T) {
before := runtime.NumGoroutine()

done := startMeasurement(true)

m1 := done()
if !m1.Enabled {
t.Fatalf("expected an enabled measurement, got %+v", m1)
}
m2 := done() // second call (the finish() path) must be a safe no-op
if m1 != m2 {
t.Fatalf("measurement changed across calls: %+v vs %+v", m1, m2)
}

// The ticker goroutine must have exited; poll to avoid races with the
// scheduler tearing it down.
deadline := time.Now().Add(2 * time.Second)
for runtime.NumGoroutine() > before {
if time.Now().After(deadline) {
t.Fatalf("measurement goroutine leaked: %d goroutines, baseline %d",
runtime.NumGoroutine(), before)
}
time.Sleep(10 * time.Millisecond)
}
}

// When disabled, no goroutine is started and the closure is still safe to call.
func TestStartMeasurementDisabledIsInert(t *testing.T) {
done := startMeasurement(false)
if m := done(); m.Enabled {
t.Fatalf("disabled measurement should not be enabled: %+v", m)
}
_ = done() // idempotent
}
43 changes: 30 additions & 13 deletions internal/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,13 +586,19 @@ type syncSession struct {
rejections map[plumbing.ReferenceName]string
}

// finish releases any resources owned by the session — currently the live
// progress ticker. Idempotent and safe to call from defer in callers that
// also produce results in the happy path.
// finish releases the resources owned by the session: the live progress
// ticker, the memory-measurement ticker goroutine, and the source/target
// transports (SSH transports spawn processes). Idempotent and safe to call
// from defer in callers that also produce results in the happy path —
// measurementDone is sync.Once-guarded, so an error path that never built a
// Result still stops its goroutine without disturbing the happy-path value.
func (s *syncSession) finish() {
if s.progress != nil {
s.progress.terminate()
}
if s.measurementDone != nil {
_ = s.measurementDone()
}
if s.sourceConn != nil {
_ = s.sourceConn.Close()
}
Expand Down Expand Up @@ -657,6 +663,16 @@ func newSession(ctx context.Context, cfg Config, needTarget bool) (*syncSession,
stats: newStats(cfg.ShowStats),
measurementDone: startMeasurement(cfg.MeasureMemory),
}
// startMeasurement spawned a ticker goroutine and the steps below open
// transports (SSH spawns a process). If we return an error partway through
// setup the caller has no session to finish(), so release everything here
// unless we hand the session back.
success := false
defer func() {
if !success {
s.finish()
}
}()
var warnedSSHStats bool
warnSSHStats := func(sourceConn, targetConn gitproto.Conn) {
if warnedSSHStats {
Expand Down Expand Up @@ -700,6 +716,9 @@ func newSession(ctx context.Context, cfg Config, needTarget bool) (*syncSession,
if err != nil {
return nil, fmt.Errorf("create target transport: %w", err)
}
// Hand the conn to the session immediately so the deferred cleanup
// closes it even if a ref-listing step below fails.
s.target = &targetSession{conn: targetConn}
targetConn.SetProgressWriter(&sessionStderr{s: s})
warnSSHStats(s.sourceConn, targetConn)
targetAdv, err := gitproto.AdvertisedRefsV1(ctx, targetConn, transport.ReceivePackService)
Expand All @@ -712,17 +731,14 @@ func newSession(ctx context.Context, cfg Config, needTarget bool) (*syncSession,
}
targetRefMap := gitproto.RefHashMap(targetRefSlice)
targetFeatures := gitproto.TargetFeaturesFromAdvRefs(targetAdv)
s.target = &targetSession{
conn: targetConn,
adv: targetAdv,
refMap: targetRefMap,
features: targetFeatures,
policy: planner.RelayTargetPolicy{
CapabilitiesKnown: targetFeatures.Known,
NoThin: targetFeatures.NoThin,
},
pusher: gitproto.NewPusher(targetConn, targetAdv, cfg.Verbose),
s.target.adv = targetAdv
s.target.refMap = targetRefMap
s.target.features = targetFeatures
s.target.policy = planner.RelayTargetPolicy{
CapabilitiesKnown: targetFeatures.Known,
NoThin: targetFeatures.NoThin,
}
s.target.pusher = gitproto.NewPusher(targetConn, targetAdv, cfg.Verbose)
if cfg.BestEffort {
s.rejections = make(map[plumbing.ReferenceName]string)
s.target.pusher.OnRejection = func(name plumbing.ReferenceName, status string) {
Expand Down Expand Up @@ -752,6 +768,7 @@ func newSession(ctx context.Context, cfg Config, needTarget bool) (*syncSession,
}
}

success = true
return s, nil
}

Expand Down
Loading