From 8dcb6e1b8fd33e40db3c32df07c7a5f1f806e562 Mon Sep 17 00:00:00 2001 From: Bo He Date: Sat, 23 May 2026 11:14:59 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=97=A5=E5=BF=97ws?= =?UTF-8?q?=E6=AD=BB=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/state_ws_api.go | 4 ++-- src/api/ws_api.go | 16 +++++++++++++--- src/pclog/process_log_buffer.go | 9 ++++++++- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/api/state_ws_api.go b/src/api/state_ws_api.go index 249c0a88..a111381b 100644 --- a/src/api/state_ws_api.go +++ b/src/api/state_ws_api.go @@ -7,6 +7,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/f1bonacc1/process-compose/src/app" "github.com/f1bonacc1/process-compose/src/pclog" @@ -136,9 +137,8 @@ func (api *PcApi) handleStateStream(ws *websocket.Conn, observer *stateWsObserve var writeMu sync.Mutex writeJSON := func(ev types.ProcessStateEvent) error { writeMu.Lock() - api.wsMtx.Lock() + _ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) err := ws.WriteJSON(&ev) - api.wsMtx.Unlock() writeMu.Unlock() return err } diff --git a/src/api/ws_api.go b/src/api/ws_api.go index 5ceba063..78adfa1a 100644 --- a/src/api/ws_api.go +++ b/src/api/ws_api.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/f1bonacc1/process-compose/src/app" "github.com/f1bonacc1/process-compose/src/pclog" @@ -81,7 +82,14 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { if isChannelClosed { return 0, nil } - logChan <- msg + // Non-blocking send: if the subscriber's WriteJSON is stuck + // (e.g. TCP send buffer full because the client stopped + // reading), dropping the line here is far better than + // blocking the producer and deadlocking the buffer. + select { + case logChan <- msg: + default: + } return len(message), nil }, endOffset) @@ -107,9 +115,11 @@ func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclo for { select { case msg, open := <-logChan: - api.wsMtx.Lock() + // handleLog is the sole writer on this ws.Conn; no global + // mutex needed. Setting a write deadline guards against a + // half-dead peer whose TCP send buffer never drains. + _ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) err := ws.WriteJSON(&msg) - api.wsMtx.Unlock() if err != nil { if errors.Is(err, net.ErrClosed) { return diff --git a/src/pclog/process_log_buffer.go b/src/pclog/process_log_buffer.go index e356a141..1f4a94b1 100644 --- a/src/pclog/process_log_buffer.go +++ b/src/pclog/process_log_buffer.go @@ -50,9 +50,16 @@ func (b *ProcessLogBuffer) Write(message string) { } b.mxBuf.Unlock() + // Snapshot observers under the lock, then fan out without holding it. + // Holding mxObs across observer.WriteString lets a slow/blocked observer + // stall every other Write and any new GetLogsAndSubscribe on this buffer. b.mxObs.Lock() - defer b.mxObs.Unlock() + snapshot := make([]LogObserver, 0, len(b.observers)) for _, observer := range b.observers { + snapshot = append(snapshot, observer) + } + b.mxObs.Unlock() + for _, observer := range snapshot { _, _ = observer.WriteString(message) } } From 0be8ab48f793a88c4ebb09a8c7a2200b88516a26 Mon Sep 17 00:00:00 2001 From: Bo He Date: Sun, 24 May 2026 12:21:57 +0800 Subject: [PATCH 2/2] Fix log websocket backpressure --- src/api/pc_api.go | 2 -- src/api/ws_api.go | 81 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 56 insertions(+), 27 deletions(-) diff --git a/src/api/pc_api.go b/src/api/pc_api.go index a7ce2ba7..ceb4f777 100644 --- a/src/api/pc_api.go +++ b/src/api/pc_api.go @@ -3,7 +3,6 @@ package api import ( "net/http" "strconv" - "sync" "github.com/f1bonacc1/process-compose/src/types" @@ -28,7 +27,6 @@ import ( type PcApi struct { project app.IProject - wsMtx sync.Mutex } func NewPcApi(project app.IProject) *PcApi { diff --git a/src/api/ws_api.go b/src/api/ws_api.go index 78adfa1a..4abaa198 100644 --- a/src/api/ws_api.go +++ b/src/api/ws_api.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/f1bonacc1/process-compose/src/app" @@ -46,6 +47,7 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { } done := make(chan struct{}) + wsWriteMtx := &sync.Mutex{} if follow { go handleIncoming(ws, done) } @@ -56,6 +58,32 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { logChan := make(chan LogMessage, 256) chanCloseMtx := &sync.Mutex{} isChannelClosed := false + closeLogChan := func() { + chanCloseMtx.Lock() + defer chanCloseMtx.Unlock() + if !isChannelClosed { + close(logChan) + isChannelClosed = true + } + } + dropped := &atomic.Uint64{} + warnedOnce := &atomic.Bool{} + enqueue := func(msg LogMessage) bool { + chanCloseMtx.Lock() + defer chanCloseMtx.Unlock() + if isChannelClosed { + return false + } + select { + case logChan <- msg: + default: + dropped.Add(1) + if warnedOnce.CompareAndSwap(false, true) { + log.Warn().Str("process", processName).Msg("ws subscriber backpressured; dropping log lines") + } + } + return true + } connector := pclog.NewConnector( func(messages []string) { for _, message := range messages { @@ -63,13 +91,10 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { Message: message, ProcessName: processName, } - logChan <- msg + enqueue(msg) } if !follow { - chanCloseMtx.Lock() - defer chanCloseMtx.Unlock() - close(logChan) - isChannelClosed = true + closeLogChan() } }, func(message string) (n int, err error) { @@ -77,23 +102,13 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { Message: message, ProcessName: processName, } - chanCloseMtx.Lock() - defer chanCloseMtx.Unlock() - if isChannelClosed { + if !enqueue(msg) { return 0, nil } - // Non-blocking send: if the subscriber's WriteJSON is stuck - // (e.g. TCP send buffer full because the client stopped - // reading), dropping the line here is far better than - // blocking the producer and deadlocking the buffer. - select { - case logChan <- msg: - default: - } return len(message), nil }, endOffset) - go api.handleLog(ws, processName, connector, logChan, done) + go api.handleLog(ws, processName, connector, logChan, done, wsWriteMtx, dropped, closeLogChan) err = api.project.GetLogsAndSubscribe(processName, connector) if err != nil { @@ -104,7 +119,22 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { } -func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) { +func (api *PcApi) handleLog( + ws *websocket.Conn, + procName string, + connector *pclog.Connector, + logChan chan LogMessage, + done chan struct{}, + wsWriteMtx *sync.Mutex, + dropped *atomic.Uint64, + closeLogChan func(), +) { + defer func() { + if count := dropped.Load(); count > 0 { + log.Warn().Str("process", procName).Uint64("dropped", count). + Msg("ws subscriber disconnected after dropped lines") + } + }() defer func(project app.IProject, name string, observer pclog.LogObserver) { err := project.UnSubscribeLogger(name, observer) if err != nil { @@ -115,11 +145,15 @@ func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclo for { select { case msg, open := <-logChan: - // handleLog is the sole writer on this ws.Conn; no global - // mutex needed. Setting a write deadline guards against a - // half-dead peer whose TCP send buffer never drains. + if !open { + return + } + // Serialize writes per ws.Conn. Multiple process streams share the + // same connection when the request contains comma-separated names. + wsWriteMtx.Lock() _ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) err := ws.WriteJSON(&msg) + wsWriteMtx.Unlock() if err != nil { if errors.Is(err, net.ErrClosed) { return @@ -127,12 +161,9 @@ func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclo log.Err(err).Msg("Failed to write to socket") return } - if !open { - return - } case <-done: log.Warn().Msg("Socket closed remotely") - close(logChan) + closeLogChan() return }