Skip to content

Commit ee83506

Browse files
committed
Filter out by specific UUID.
1 parent ceb8fda commit ee83506

File tree

3 files changed

+84
-35
lines changed

3 files changed

+84
-35
lines changed

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"sync"
1515
"time"
1616

17-
"github.com/gofrs/uuid/v5"
1817
"go.opentelemetry.io/collector/component"
1918
"gopkg.in/yaml.v3"
2019

@@ -41,16 +40,12 @@ const (
4140

4241
// newSubprocessExecution creates a new execution which runs the otel collector in a subprocess. A metricsPort or
4342
// healthCheckPort of 0 will result in a random port being used.
44-
func newSubprocessExecution(logLevel logp.Level, collectorPath string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) {
45-
nsUUID, err := uuid.NewV4()
46-
if err != nil {
47-
return nil, fmt.Errorf("cannot generate UUID: %w", err)
48-
}
43+
func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) {
4944
componentType, err := component.NewType(healthCheckExtensionName)
5045
if err != nil {
5146
return nil, fmt.Errorf("cannot create component type: %w", err)
5247
}
53-
healthCheckExtensionID := component.NewIDWithName(componentType, nsUUID.String()).String()
48+
healthCheckExtensionID := component.NewIDWithName(componentType, uuid).String()
5449

5550
return &subprocessExecution{
5651
collectorPath: collectorPath,

internal/pkg/otel/manager/manager.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,22 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21+
"github.com/gofrs/uuid/v5"
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
23+
"go.opentelemetry.io/collector/confmap"
2124
"go.uber.org/zap"
2225

23-
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
24-
25-
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
26-
2726
"github.com/elastic/elastic-agent-client/v7/pkg/client"
27+
"github.com/elastic/elastic-agent-libs/logp"
28+
2829
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
30+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
2931
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
32+
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
3033
"github.com/elastic/elastic-agent/internal/pkg/otel/config"
3134
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
3235
"github.com/elastic/elastic-agent/pkg/component"
3336
"github.com/elastic/elastic-agent/pkg/component/runtime"
34-
35-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
36-
"go.opentelemetry.io/collector/confmap"
37-
38-
"github.com/elastic/elastic-agent-libs/logp"
39-
4037
"github.com/elastic/elastic-agent/pkg/core/logger"
4138
)
4239

@@ -79,8 +76,9 @@ type OTelManager struct {
7976
agentInfo info.Agent
8077
beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter
8178

82-
collectorCfg *confmap.Conf
83-
components []component.Component
79+
healthCheckExtID string
80+
collectorCfg *confmap.Conf
81+
components []component.Component
8482

8583
// The current configuration that the OTel collector is using. In the case that
8684
// the mergedCollectorCfg is nil then the collector is not running.
@@ -139,6 +137,12 @@ func NewOTelManager(
139137
var recoveryTimer collectorRecoveryTimer
140138
var err error
141139

140+
hcUUID, err := uuid.NewV4()
141+
if err != nil {
142+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
143+
}
144+
hcUUIDStr := hcUUID.String()
145+
142146
// determine the otel collector ports
143147
collectorMetricsPort, collectorHealthCheckPort := 0, 0
144148
if agentCollectorConfig != nil {
@@ -165,7 +169,7 @@ func NewOTelManager(
165169
return nil, fmt.Errorf("failed to get the path to the collector executable: %w", err)
166170
}
167171
recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute)
168-
exec, err = newSubprocessExecution(logLevel, executable, collectorMetricsPort, collectorHealthCheckPort)
172+
exec, err = newSubprocessExecution(logLevel, executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort)
169173
if err != nil {
170174
return nil, fmt.Errorf("failed to create subprocess execution: %w", err)
171175
}
@@ -180,6 +184,7 @@ func NewOTelManager(
180184
baseLogger: baseLogger,
181185
agentInfo: agentInfo,
182186
beatMonitoringConfigGetter: beatMonitoringConfigGetter,
187+
healthCheckExtID: fmt.Sprintf("extension:healthcheckv2/%s", hcUUIDStr),
183188
errCh: make(chan error, 1), // holds at most one error
184189
collectorStatusCh: make(chan *status.AggregateStatus, 1),
185190
// componentStateCh uses a buffer channel to ensure that no state transitions are missed and to prevent
@@ -531,7 +536,7 @@ func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus)
531536
delete(extensionsMap.ComponentStatusMap, extensionKey)
532537
case strings.HasPrefix(extensionKey, "extension:elastic_diagnostics"):
533538
delete(extensionsMap.ComponentStatusMap, extensionKey)
534-
case strings.HasPrefix(extensionKey, "extension:healthcheckv2"):
539+
case extensionKey == m.healthCheckExtID:
535540
delete(extensionsMap.ComponentStatusMap, extensionKey)
536541
}
537542
}

internal/pkg/otel/manager/manager_test.go

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,11 @@ func TestOTelManager_Run(t *testing.T) {
349349
{
350350
name: "subprocess collector config updates",
351351
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
352-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
352+
hcUUID, err := uuid.NewV4()
353+
if err != nil {
354+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
355+
}
356+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
353357
},
354358
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
355359
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -377,7 +381,11 @@ func TestOTelManager_Run(t *testing.T) {
377381
{
378382
name: "subprocess collector stopped gracefully outside manager",
379383
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
380-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
384+
hcUUID, err := uuid.NewV4()
385+
if err != nil {
386+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
387+
}
388+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
381389
},
382390
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
383391
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -406,7 +414,11 @@ func TestOTelManager_Run(t *testing.T) {
406414
{
407415
name: "subprocess collector killed outside manager",
408416
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
409-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
417+
hcUUID, err := uuid.NewV4()
418+
if err != nil {
419+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
420+
}
421+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
410422
},
411423
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
412424
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -449,7 +461,11 @@ func TestOTelManager_Run(t *testing.T) {
449461
{
450462
name: "subprocess collector panics restarts",
451463
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
452-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
464+
hcUUID, err := uuid.NewV4()
465+
if err != nil {
466+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
467+
}
468+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
453469
},
454470
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
455471
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -484,7 +500,11 @@ func TestOTelManager_Run(t *testing.T) {
484500
{
485501
name: "subprocess collector panics reports fatal",
486502
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
487-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
503+
hcUUID, err := uuid.NewV4()
504+
if err != nil {
505+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
506+
}
507+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
488508
},
489509
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
490510
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -525,7 +545,11 @@ func TestOTelManager_Run(t *testing.T) {
525545
{
526546
name: "subprocess collector killed if delayed and manager is stopped",
527547
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
528-
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
548+
hcUUID, err := uuid.NewV4()
549+
if err != nil {
550+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
551+
}
552+
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
529553
if err != nil {
530554
return nil, err
531555
}
@@ -578,7 +602,11 @@ func TestOTelManager_Run(t *testing.T) {
578602
{
579603
name: "subprocess collector gracefully exited if delayed a bit and manager is stopped",
580604
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
581-
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
605+
hcUUID, err := uuid.NewV4()
606+
if err != nil {
607+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
608+
}
609+
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
582610
if err != nil {
583611
return nil, err
584612
}
@@ -631,7 +659,11 @@ func TestOTelManager_Run(t *testing.T) {
631659
{
632660
name: "subprocess user has healthcheck extension",
633661
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
634-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
662+
hcUUID, err := uuid.NewV4()
663+
if err != nil {
664+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
665+
}
666+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
635667
},
636668
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
637669
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -666,7 +698,11 @@ func TestOTelManager_Run(t *testing.T) {
666698
{
667699
name: "subprocess collector empty config",
668700
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
669-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
701+
hcUUID, err := uuid.NewV4()
702+
if err != nil {
703+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
704+
}
705+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
670706
},
671707
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
672708
skipListeningErrors: true,
@@ -713,7 +749,11 @@ func TestOTelManager_Run(t *testing.T) {
713749
{
714750
name: "subprocess collector failed to start",
715751
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
716-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
752+
hcUUID, err := uuid.NewV4()
753+
if err != nil {
754+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
755+
}
756+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
717757
},
718758
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
719759
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
@@ -847,7 +887,11 @@ func TestOTelManager_Logging(t *testing.T) {
847887
{
848888
name: "subprocess execution",
849889
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
850-
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
890+
hcUUID, err := uuid.NewV4()
891+
if err != nil {
892+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
893+
}
894+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
851895
},
852896
},
853897
} {
@@ -920,7 +964,11 @@ func TestOTelManager_Ports(t *testing.T) {
920964
{
921965
name: "subprocess execution",
922966
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
923-
return newSubprocessExecution(logp.DebugLevel, testBinary, metricsPort, healthCheckPort)
967+
hcUUID, err := uuid.NewV4()
968+
if err != nil {
969+
return nil, fmt.Errorf("cannot generate UUID: %w", err)
970+
}
971+
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), metricsPort, healthCheckPort)
924972
},
925973
healthCheckEnabled: true,
926974
},
@@ -1072,7 +1120,7 @@ func TestOTelManager_PortConflict(t *testing.T) {
10721120
waitTimeForStop,
10731121
)
10741122
require.NoError(t, err, "could not create otel manager")
1075-
executionMode, err := newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
1123+
executionMode, err := newSubprocessExecution(logp.DebugLevel, testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0)
10761124
require.NoError(t, err, "could not create subprocess execution mode")
10771125
m.execution = executionMode
10781126

@@ -1289,7 +1337,7 @@ func TestOTelManager_handleOtelStatusUpdate(t *testing.T) {
12891337
"extension:elastic_diagnostics/test": {
12901338
Event: componentstatus.NewEvent(componentstatus.StatusOK),
12911339
},
1292-
"extension:healthcheckv2/test": {
1340+
"extension:healthcheckv2/uuid": {
12931341
Event: componentstatus.NewEvent(componentstatus.StatusOK),
12941342
},
12951343
},
@@ -1377,6 +1425,7 @@ func TestOTelManager_handleOtelStatusUpdate(t *testing.T) {
13771425
mgr := &OTelManager{
13781426
logger: newTestLogger(),
13791427
components: tt.components,
1428+
healthCheckExtID: "extension:healthcheckv2/uuid",
13801429
currentComponentStates: make(map[string]runtime.ComponentComponentState),
13811430
}
13821431

0 commit comments

Comments
 (0)