Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions api/connector/Connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"time"

"github.com/devtron-labs/common-lib/async"
"github.com/devtron-labs/devtron/api/bean"
"github.com/gogo/protobuf/proto"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -45,19 +46,22 @@ type Pump interface {
}

// PumpImpl streams responses to HTTP clients.
type PumpImpl struct {
	// logger records streaming errors.
	logger *zap.SugaredLogger
	// asyncRunnable runs the background heartbeat loop of
	// StartK8sStreamWithHeartBeat.
	asyncRunnable *async.Runnable
}

func NewPumpImpl(logger *zap.SugaredLogger) *PumpImpl {
func NewPumpImpl(logger *zap.SugaredLogger, asyncRunnable *async.Runnable) *PumpImpl {
return &PumpImpl{
logger: logger,
logger: logger,
asyncRunnable: asyncRunnable,
}
}

func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) {
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
return // was missing: f is nil past this point
}

w.Header().Set("Transfer-Encoding", "chunked")
Expand All @@ -81,18 +85,33 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.Res
return
}
}
// heartbeat start
// sync.Once ensures stream.Close is idempotent: the goroutine may call it
// via the ctx.Done path, and the deferred cleanup calls it too.
var closeOnce sync.Once
closeStream := func() { closeOnce.Do(func() { stream.Close() }) }

ticker := time.NewTicker(30 * time.Second)
done := make(chan struct{}) // close(done) never blocks, so no buffer needed
done := make(chan struct{})
var mux sync.Mutex

go func() {
// WaitGroup restores the happens-before edge that the old `done <- true`
// blocking send provided. wg.Wait() in the defer ensures the goroutine has
// fully exited before this function returns, preventing f.Flush() from being
// called after net/http.finishRequest() recycles the ResponseWriter.
var wg sync.WaitGroup
wg.Add(1)

// asyncRunnable.Execute wraps fn in a panic-recovering goroutine with metrics.
// wg.Done() is deferred inside fn so it fires even when Execute's recover()
// catches a panic (Go runs inner defers before outer recover).
impl.asyncRunnable.Execute(func() {
defer wg.Done()
for {
select {
case <-done:
return
case <-ctx.Done():
stream.Close() // unblocks the blocking bufReader.ReadString below
closeStream() // unblocks bufReader.ReadString in the main loop
return
case t := <-ticker.C:
mux.Lock()
Expand All @@ -107,17 +126,20 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.Res
}
}
}
}()
})

defer func() {
close(done) // unblock goroutine's select
ticker.Stop()
stream.Close() // idempotent: safe to call after goroutine already closed it
close(done) // signals goroutine to exit if still running
wg.Wait() // block until heartbeat goroutine has fully exited;
// only after this does the handler return and net/http
// reclaim the ResponseWriter
closeStream() // safe: goroutine no longer touches stream after wg.Wait()
}()

bufReader := bufio.NewReader(stream)
eof := false
for !eof {
// fast-exit: if ctx expired between reads, return immediately
select {
case <-ctx.Done():
return
Expand All @@ -132,40 +154,36 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.Res
}
} else if err != nil {
if ctx.Err() != nil {
// stream was closed because ctx expired — not an application error
return
}
impl.logger.Errorw("error in reading buffer string, StartK8sStreamWithHeartBeat", "err", err)
return
}
log = strings.TrimSpace(log)
if log == "" {
continue // blank line mid-stream: skip without aborting
continue
}
splitLog := strings.SplitN(log, " ", 2)
if len(splitLog) < 2 {
continue // no space separator: not a valid log line, skip
continue
}
parsedTime, err := time.Parse(time.RFC3339, splitLog[0])
if err != nil {
impl.logger.Errorw("error in writing data over sse", "err", err)
return
impl.logger.Errorw("error parsing log timestamp, skipping line", "err", err)
continue
}
eventId := strconv.FormatInt(parsedTime.UnixNano(), 10)
mux.Lock()
if len(splitLog) == 2 {
err = impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w)
}
if err == nil {
sendErr := impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w)
if sendErr == nil {
f.Flush()
}
mux.Unlock()
if err != nil {
impl.logger.Errorw("error in writing data over sse", "err", err)
if sendErr != nil {
impl.logger.Errorw("error in writing data over sse", "err", sendErr)
return
}
}
// heartbeat end
}

func (impl PumpImpl) StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{}) {
Expand Down
83 changes: 82 additions & 1 deletion api/connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@ import (
"context"
"io"
"net/http/httptest"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/devtron-labs/common-lib/async"
"github.com/devtron-labs/common-lib/constants"
"go.uber.org/zap"
)

// newTestPump builds a PumpImpl for tests, wired with a development logger and
// an async runnable so that code paths calling impl.asyncRunnable.Execute have
// a non-nil runner to call.
func newTestPump() PumpImpl {
	// The construction error is deliberately ignored in this test helper.
	logger, _ := zap.NewDevelopment()
	sugar := logger.Sugar()
	runnable := async.NewAsyncRunnable(sugar, constants.ServiceName("test"))
	return PumpImpl{logger: sugar, asyncRunnable: runnable}
}

// blockingReader blocks on Read until Close is called, then returns io.EOF
Expand Down Expand Up @@ -133,3 +138,79 @@ func TestStartK8sStreamWithHeartBeat_MalformedLines_DoNotAbortStream(t *testing.
})
}
}

// sentinelFlusher wraps an httptest.ResponseRecorder and reports a test error
// whenever Flush is invoked after markReturned has been called.
type sentinelFlusher struct {
	*httptest.ResponseRecorder

	// t receives the violation report.
	t *testing.T

	// mu guards afterReturn.
	mu          sync.Mutex
	afterReturn bool
}

// markReturned flags that the function under test has returned; every Flush
// from this point on counts as a violation.
func (sf *sentinelFlusher) markReturned() {
	sf.mu.Lock()
	defer sf.mu.Unlock()
	sf.afterReturn = true
}

// Flush reports a violation if markReturned was already called, then forwards
// the call to the embedded recorder in either case.
func (sf *sentinelFlusher) Flush() {
	// Read the flag under the lock, but release it before reporting and
	// delegating so neither happens while mu is held.
	late := func() bool {
		sf.mu.Lock()
		defer sf.mu.Unlock()
		return sf.afterReturn
	}()

	if late {
		sf.t.Error("Flush called after StartK8sStreamWithHeartBeat returned — heartbeat goroutine leak")
	}
	sf.ResponseRecorder.Flush()
}

// TestHeartbeatGoroutineExitsBeforeFunctionReturns checks that the
// ResponseWriter receives no Flush() once StartK8sStreamWithHeartBeat has
// returned to its caller.
func TestHeartbeatGoroutineExitsBeforeFunctionReturns(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pump := newTestPump()
	recorder := &sentinelFlusher{ResponseRecorder: httptest.NewRecorder(), t: t}
	stream := newFakeStream("2024-01-01T00:00:00Z hello world\n")

	returned := make(chan struct{})
	go func() {
		defer close(returned)
		pump.StartK8sStreamWithHeartBeat(ctx, recorder, false, stream, nil)
		// From here on, any Flush on recorder is reported as a violation.
		recorder.markReturned()
	}()

	select {
	case <-time.After(3 * time.Second):
		t.Fatal("StartK8sStreamWithHeartBeat did not return within 3s")
	case <-returned:
	}

	// Leave the scheduler a short window in which a leaked goroutine could
	// still reach Flush and trip the sentinel.
	time.Sleep(50 * time.Millisecond)
}

// TestNoGoroutineLeakAfterStreamEOF verifies no goroutine is left running
// after the stream finishes normally.
//
// The goroutine count is polled until it drops back to the baseline or a
// deadline passes, instead of being sampled once after a fixed sleep. A single
// sample can report a false leak when a goroutine that is already exiting has
// not been descheduled yet.
func TestNoGoroutineLeakAfterStreamEOF(t *testing.T) {
	// Not parallel: runtime.NumGoroutine() counts all goroutines in the process;
	// parallel sibling tests contaminate the before/after samples.
	const (
		settleTimeout = time.Second
		pollInterval  = 5 * time.Millisecond
	)

	pump := newTestPump()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	runtime.GC()
	before := runtime.NumGoroutine()

	stream := newFakeStream("2024-01-01T00:00:00Z line one\n")
	pump.StartK8sStreamWithHeartBeat(ctx, httptest.NewRecorder(), false, stream, nil)

	// Wait for the count to settle, but never longer than settleTimeout.
	deadline := time.Now().Add(settleTimeout)
	after := runtime.NumGoroutine()
	for after > before && time.Now().Before(deadline) {
		runtime.GC()
		time.Sleep(pollInterval)
		after = runtime.NumGoroutine()
	}

	if after > before {
		t.Errorf("goroutine leak: %d before, %d after", before, after)
	}
}
2 changes: 1 addition & 1 deletion cmd/external-app/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading