Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/api/pc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"net/http"
"strconv"
"sync"

"github.com/f1bonacc1/process-compose/src/types"

Expand All @@ -28,7 +27,6 @@ import (

type PcApi struct {
project app.IProject
wsMtx sync.Mutex
}

func NewPcApi(project app.IProject) *PcApi {
Expand Down
4 changes: 2 additions & 2 deletions src/api/state_ws_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/f1bonacc1/process-compose/src/app"
"github.com/f1bonacc1/process-compose/src/pclog"
Expand Down Expand Up @@ -136,9 +137,8 @@ func (api *PcApi) handleStateStream(ws *websocket.Conn, observer *stateWsObserve
var writeMu sync.Mutex
writeJSON := func(ev types.ProcessStateEvent) error {
writeMu.Lock()
api.wsMtx.Lock()
_ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
err := ws.WriteJSON(&ev)
api.wsMtx.Unlock()
writeMu.Unlock()
return err
}
Expand Down
75 changes: 58 additions & 17 deletions src/api/ws_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/f1bonacc1/process-compose/src/app"
"github.com/f1bonacc1/process-compose/src/pclog"
Expand Down Expand Up @@ -45,6 +47,7 @@
}

done := make(chan struct{})
wsWriteMtx := &sync.Mutex{}
if follow {
go handleIncoming(ws, done)
}
Expand All @@ -55,37 +58,57 @@
logChan := make(chan LogMessage, 256)
chanCloseMtx := &sync.Mutex{}
isChannelClosed := false
closeLogChan := func() {
chanCloseMtx.Lock()
defer chanCloseMtx.Unlock()
if !isChannelClosed {
close(logChan)
isChannelClosed = true
}
}
dropped := &atomic.Uint64{}
warnedOnce := &atomic.Bool{}
enqueue := func(msg LogMessage) bool {
chanCloseMtx.Lock()
defer chanCloseMtx.Unlock()
if isChannelClosed {
return false
}
select {
case logChan <- msg:
default:
dropped.Add(1)
if warnedOnce.CompareAndSwap(false, true) {
log.Warn().Str("process", processName).Msg("ws subscriber backpressured; dropping log lines")
}
}
return true
}
connector := pclog.NewConnector(
func(messages []string) {
for _, message := range messages {
msg := LogMessage{
Message: message,
ProcessName: processName,
}
logChan <- msg
enqueue(msg)
}
if !follow {
chanCloseMtx.Lock()
defer chanCloseMtx.Unlock()
close(logChan)
isChannelClosed = true
closeLogChan()
}
},
func(message string) (n int, err error) {
msg := LogMessage{
Message: message,
ProcessName: processName,
}
chanCloseMtx.Lock()
defer chanCloseMtx.Unlock()
if isChannelClosed {
if !enqueue(msg) {
return 0, nil
}
logChan <- msg
return len(message), nil
},
endOffset)
go api.handleLog(ws, processName, connector, logChan, done)
go api.handleLog(ws, processName, connector, logChan, done, wsWriteMtx, dropped, closeLogChan)

err = api.project.GetLogsAndSubscribe(processName, connector)
if err != nil {
Expand All @@ -96,7 +119,22 @@

}

func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) {
func (api *PcApi) handleLog(

Check warning on line 122 in src/api/ws_api.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 8 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=F1bonacc1_process-compose&issues=AZ5YOLXt6Ms3vxq6Kk64&open=AZ5YOLXt6Ms3vxq6Kk64&pullRequest=495
ws *websocket.Conn,
procName string,
connector *pclog.Connector,
logChan chan LogMessage,
done chan struct{},
wsWriteMtx *sync.Mutex,
dropped *atomic.Uint64,
closeLogChan func(),
) {
defer func() {
if count := dropped.Load(); count > 0 {
log.Warn().Str("process", procName).Uint64("dropped", count).
Msg("ws subscriber disconnected after dropped lines")
}
}()
defer func(project app.IProject, name string, observer pclog.LogObserver) {
err := project.UnSubscribeLogger(name, observer)
if err != nil {
Expand All @@ -107,22 +145,25 @@
for {
select {
case msg, open := <-logChan:
api.wsMtx.Lock()
Comment thread
F1bonacc1 marked this conversation as resolved.
if !open {
return
}
// Serialize writes per ws.Conn. Multiple process streams share the
// same connection when the request contains comma-separated names.
wsWriteMtx.Lock()
_ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
err := ws.WriteJSON(&msg)
api.wsMtx.Unlock()
wsWriteMtx.Unlock()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
log.Err(err).Msg("Failed to write to socket")
return
}
if !open {
return
}
case <-done:
log.Warn().Msg("Socket closed remotely")
close(logChan)
closeLogChan()
return
}

Expand Down
9 changes: 8 additions & 1 deletion src/pclog/process_log_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ func (b *ProcessLogBuffer) Write(message string) {
}
b.mxBuf.Unlock()

// Snapshot observers under the lock, then fan out without holding it.
// Holding mxObs across observer.WriteString lets a slow/blocked observer
// stall every other Write and any new GetLogsAndSubscribe on this buffer.
b.mxObs.Lock()
defer b.mxObs.Unlock()
snapshot := make([]LogObserver, 0, len(b.observers))
for _, observer := range b.observers {
snapshot = append(snapshot, observer)
}
b.mxObs.Unlock()
for _, observer := range snapshot {
_, _ = observer.WriteString(message)
}
}
Expand Down
Loading