Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; a description of the change, roughly 80 characters long.
summary: hide healthcheckv2 from status output

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change,
# this field accommodates a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link to the PR that added the changeset.
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
9 changes: 2 additions & 7 deletions internal/pkg/otel/manager/execution_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"
"time"

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/component"
"gopkg.in/yaml.v3"

Expand All @@ -41,16 +40,12 @@ const (

// newSubprocessExecution creates a new execution which runs the otel collector in a subprocess. A metricsPort or
// healthCheckPort of 0 will result in a random port being used.
func newSubprocessExecution(logLevel logp.Level, collectorPath string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) {
nsUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
func newSubprocessExecution(logLevel logp.Level, collectorPath string, uuid string, metricsPort int, healthCheckPort int) (*subprocessExecution, error) {
componentType, err := component.NewType(healthCheckExtensionName)
if err != nil {
return nil, fmt.Errorf("cannot create component type: %w", err)
}
healthCheckExtensionID := component.NewIDWithName(componentType, nsUUID.String()).String()
healthCheckExtensionID := component.NewIDWithName(componentType, uuid).String()

return &subprocessExecution{
collectorPath: collectorPath,
Expand Down
33 changes: 20 additions & 13 deletions internal/pkg/otel/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,22 @@ import (
"sync/atomic"
"time"

"github.com/gofrs/uuid/v5"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/otel/config"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/confmap"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/pkg/core/logger"
)

Expand Down Expand Up @@ -79,8 +76,9 @@ type OTelManager struct {
agentInfo info.Agent
beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter

collectorCfg *confmap.Conf
components []component.Component
healthCheckExtID string
collectorCfg *confmap.Conf
components []component.Component

// The current configuration that the OTel collector is using. In the case that
// the mergedCollectorCfg is nil then the collector is not running.
Expand Down Expand Up @@ -139,6 +137,12 @@ func NewOTelManager(
var recoveryTimer collectorRecoveryTimer
var err error

hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
hcUUIDStr := hcUUID.String()

// determine the otel collector ports
collectorMetricsPort, collectorHealthCheckPort := 0, 0
if agentCollectorConfig != nil {
Expand All @@ -165,7 +169,7 @@ func NewOTelManager(
return nil, fmt.Errorf("failed to get the path to the collector executable: %w", err)
}
recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute)
exec, err = newSubprocessExecution(logLevel, executable, collectorMetricsPort, collectorHealthCheckPort)
exec, err = newSubprocessExecution(logLevel, executable, hcUUIDStr, collectorMetricsPort, collectorHealthCheckPort)
if err != nil {
return nil, fmt.Errorf("failed to create subprocess execution: %w", err)
}
Expand All @@ -180,6 +184,7 @@ func NewOTelManager(
baseLogger: baseLogger,
agentInfo: agentInfo,
beatMonitoringConfigGetter: beatMonitoringConfigGetter,
healthCheckExtID: fmt.Sprintf("extension:healthcheckv2/%s", hcUUIDStr),
errCh: make(chan error, 1), // holds at most one error
collectorStatusCh: make(chan *status.AggregateStatus, 1),
// componentStateCh uses a buffer channel to ensure that no state transitions are missed and to prevent
Expand Down Expand Up @@ -531,6 +536,8 @@ func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus)
delete(extensionsMap.ComponentStatusMap, extensionKey)
case strings.HasPrefix(extensionKey, "extension:elastic_diagnostics"):
delete(extensionsMap.ComponentStatusMap, extensionKey)
case extensionKey == m.healthCheckExtID:
delete(extensionsMap.ComponentStatusMap, extensionKey)
}
}

Expand Down
78 changes: 65 additions & 13 deletions internal/pkg/otel/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector config updates",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -377,7 +381,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector stopped gracefully outside manager",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -406,7 +414,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector killed outside manager",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -449,7 +461,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector panics restarts",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -484,7 +500,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector panics reports fatal",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -525,7 +545,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector killed if delayed and manager is stopped",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -578,7 +602,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector gracefully exited if delayed a bit and manager is stopped",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
subprocessExec, err := newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -631,7 +659,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess user has healthcheck extension",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -666,7 +698,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector empty config",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
skipListeningErrors: true,
Expand Down Expand Up @@ -713,7 +749,11 @@ func TestOTelManager_Run(t *testing.T) {
{
name: "subprocess collector failed to start",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute),
testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) {
Expand Down Expand Up @@ -847,7 +887,11 @@ func TestOTelManager_Logging(t *testing.T) {
{
name: "subprocess execution",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), 0, 0)
},
},
} {
Expand Down Expand Up @@ -920,7 +964,11 @@ func TestOTelManager_Ports(t *testing.T) {
{
name: "subprocess execution",
execModeFn: func(collectorRunErr chan error) (collectorExecution, error) {
return newSubprocessExecution(logp.DebugLevel, testBinary, metricsPort, healthCheckPort)
hcUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("cannot generate UUID: %w", err)
}
return newSubprocessExecution(logp.DebugLevel, testBinary, hcUUID.String(), metricsPort, healthCheckPort)
},
healthCheckEnabled: true,
},
Expand Down Expand Up @@ -1072,7 +1120,7 @@ func TestOTelManager_PortConflict(t *testing.T) {
waitTimeForStop,
)
require.NoError(t, err, "could not create otel manager")
executionMode, err := newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0)
executionMode, err := newSubprocessExecution(logp.DebugLevel, testBinary, strings.TrimPrefix(m.healthCheckExtID, "extension:healthcheckv2/"), 0, 0)
require.NoError(t, err, "could not create subprocess execution mode")
m.execution = executionMode

Expand Down Expand Up @@ -1289,6 +1337,9 @@ func TestOTelManager_handleOtelStatusUpdate(t *testing.T) {
"extension:elastic_diagnostics/test": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
"extension:healthcheckv2/uuid": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
},
},
},
Expand Down Expand Up @@ -1374,6 +1425,7 @@ func TestOTelManager_handleOtelStatusUpdate(t *testing.T) {
mgr := &OTelManager{
logger: newTestLogger(),
components: tt.components,
healthCheckExtID: "extension:healthcheckv2/uuid",
currentComponentStates: make(map[string]runtime.ComponentComponentState),
}

Expand Down