diff --git a/src/api/pc_api.go b/src/api/pc_api.go index a7ce2ba7..ceb4f777 100644 --- a/src/api/pc_api.go +++ b/src/api/pc_api.go @@ -3,7 +3,6 @@ package api import ( "net/http" "strconv" - "sync" "github.com/f1bonacc1/process-compose/src/types" @@ -28,7 +27,6 @@ import ( type PcApi struct { project app.IProject - wsMtx sync.Mutex } func NewPcApi(project app.IProject) *PcApi { diff --git a/src/api/state_ws_api.go b/src/api/state_ws_api.go index 249c0a88..a111381b 100644 --- a/src/api/state_ws_api.go +++ b/src/api/state_ws_api.go @@ -7,6 +7,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/f1bonacc1/process-compose/src/app" "github.com/f1bonacc1/process-compose/src/pclog" @@ -136,9 +137,8 @@ func (api *PcApi) handleStateStream(ws *websocket.Conn, observer *stateWsObserve var writeMu sync.Mutex writeJSON := func(ev types.ProcessStateEvent) error { writeMu.Lock() - api.wsMtx.Lock() + _ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) err := ws.WriteJSON(&ev) - api.wsMtx.Unlock() writeMu.Unlock() return err } diff --git a/src/api/ws_api.go b/src/api/ws_api.go index 5ceba063..4abaa198 100644 --- a/src/api/ws_api.go +++ b/src/api/ws_api.go @@ -7,6 +7,8 @@ import ( "strconv" "strings" "sync" + "sync/atomic" + "time" "github.com/f1bonacc1/process-compose/src/app" "github.com/f1bonacc1/process-compose/src/pclog" @@ -45,6 +47,7 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { } done := make(chan struct{}) + wsWriteMtx := &sync.Mutex{} if follow { go handleIncoming(ws, done) } @@ -55,6 +58,32 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { logChan := make(chan LogMessage, 256) chanCloseMtx := &sync.Mutex{} isChannelClosed := false + closeLogChan := func() { + chanCloseMtx.Lock() + defer chanCloseMtx.Unlock() + if !isChannelClosed { + close(logChan) + isChannelClosed = true + } + } + dropped := &atomic.Uint64{} + warnedOnce := &atomic.Bool{} + enqueue := func(msg LogMessage) bool { + chanCloseMtx.Lock() + defer chanCloseMtx.Unlock() + if isChannelClosed { + return false + } + select { + case logChan <- msg: + default: + dropped.Add(1) + if warnedOnce.CompareAndSwap(false, true) { + log.Warn().Str("process", processName).Msg("ws subscriber backpressured; dropping log lines") + } + } + return true + } connector := pclog.NewConnector( func(messages []string) { for _, message := range messages { @@ -62,13 +91,10 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { Message: message, ProcessName: processName, } - logChan <- msg + enqueue(msg) } if !follow { - chanCloseMtx.Lock() - defer chanCloseMtx.Unlock() - close(logChan) - isChannelClosed = true + closeLogChan() } }, func(message string) (n int, err error) { @@ -76,16 +102,13 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { Message: message, ProcessName: processName, } - chanCloseMtx.Lock() - defer chanCloseMtx.Unlock() - if isChannelClosed { + if !enqueue(msg) { return 0, nil } - logChan <- msg return len(message), nil }, endOffset) - go api.handleLog(ws, processName, connector, logChan, done) + go api.handleLog(ws, processName, connector, logChan, done, wsWriteMtx, dropped, closeLogChan) err = api.project.GetLogsAndSubscribe(processName, connector) if err != nil { @@ -96,7 +119,22 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { } -func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) { +func (api *PcApi) handleLog( + ws *websocket.Conn, + procName string, + connector *pclog.Connector, + logChan chan LogMessage, + done chan struct{}, + wsWriteMtx *sync.Mutex, + dropped *atomic.Uint64, + closeLogChan func(), +) { + defer func() { + if count := dropped.Load(); count > 0 { + log.Warn().Str("process", procName).Uint64("dropped", count). + Msg("ws subscriber disconnected after dropped lines") + } + }() defer func(project app.IProject, name string, observer pclog.LogObserver) { err := project.UnSubscribeLogger(name, observer) if err != nil { @@ -107,9 +145,15 @@ func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclo for { select { case msg, open := <-logChan: - api.wsMtx.Lock() + if !open { + return + } + // Serialize writes per ws.Conn. Multiple process streams share the + // same connection when the request contains comma-separated names. + wsWriteMtx.Lock() + _ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) err := ws.WriteJSON(&msg) - api.wsMtx.Unlock() + wsWriteMtx.Unlock() if err != nil { if errors.Is(err, net.ErrClosed) { return @@ -117,12 +161,9 @@ func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclo log.Err(err).Msg("Failed to write to socket") return } - if !open { - return - } case <-done: log.Warn().Msg("Socket closed remotely") - close(logChan) + closeLogChan() return } diff --git a/src/pclog/process_log_buffer.go b/src/pclog/process_log_buffer.go index e356a141..1f4a94b1 100644 --- a/src/pclog/process_log_buffer.go +++ b/src/pclog/process_log_buffer.go @@ -50,9 +50,16 @@ func (b *ProcessLogBuffer) Write(message string) { } b.mxBuf.Unlock() + // Snapshot observers under the lock, then fan out without holding it. + // Holding mxObs across observer.WriteString lets a slow/blocked observer + // stall every other Write and any new GetLogsAndSubscribe on this buffer. b.mxObs.Lock() - defer b.mxObs.Unlock() + snapshot := make([]LogObserver, 0, len(b.observers)) for _, observer := range b.observers { + snapshot = append(snapshot, observer) + } + b.mxObs.Unlock() + for _, observer := range snapshot { _, _ = observer.WriteString(message) } }