Skip to content

Commit 9822981

Browse files
Merge branch 'main' into feat/allow-fleet-config-override
2 parents a2709d9 + cb08f8b commit 9822981

File tree

13 files changed

+1284
-68
lines changed

13 files changed

+1284
-68
lines changed

.buildkite/pipeline.agentless-app-release.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ steps:
5757

5858
- label: ":docker: Validate docker image is built for all architectures"
5959
command: ".buildkite/scripts/steps/validate-agentless-docker-image.sh"
60+
env:
61+
SERVICE_VERSION: "${VERSION}"
6062
agents:
6163
image: "docker.elastic.co/ci-agent-images/observability/oci-image-tools-agent:latest@sha256:a4ababd1347111759babc05c9ad5a680f4af48892784951358488b7e7fc94af9"
6264
plugins:

.buildkite/pipeline.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ env:
99
IMAGE_UBUNTU_2204_ARM_64: "platform-ingest-elastic-agent-ubuntu-2204-aarch64-1762801856"
1010
IMAGE_WIN_2016: "platform-ingest-elastic-agent-windows-2016-1762801856"
1111
IMAGE_WIN_2022: "platform-ingest-elastic-agent-windows-2022-1762801856"
12-
IMAGE_WIN_10: "platform-ingest-elastic-agent-windows-10-1762801856"
13-
IMAGE_WIN_11: "platform-ingest-elastic-agent-windows-11-1762801856"
12+
IMAGE_WIN_10: "platform-ingest-elastic-agent-windows-10-1764775167"
13+
IMAGE_WIN_11: "platform-ingest-elastic-agent-windows-11-1764775167"
1414

1515
steps:
1616
- label: "check-ci"
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: report crashing otel process cleanly with proper status reporting
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: elastic-agent
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: enhancement
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: make otel default runtime for system/metrics input
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: elastic-agent
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
pr: https://github.com/elastic/elastic-agent/pull/11613
40+
41+
42+
# AUTOMATED
43+
# OPTIONAL to manually add other issue URLs
44+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
45+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
46+
# issue: https://github.com/owner/repo/1234

internal/pkg/otel/manager/common.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,14 @@ import (
99
"errors"
1010
"fmt"
1111
"net"
12+
"strings"
1213

1314
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
15+
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/component/componentstatus"
17+
"go.opentelemetry.io/collector/confmap"
18+
"go.opentelemetry.io/collector/otelcol"
19+
"go.opentelemetry.io/collector/pipeline"
1420
)
1521

1622
// for testing purposes
@@ -80,3 +86,131 @@ func findRandomTCPPorts(count int) (ports []int, err error) {
8086

8187
return ports, err
8288
}
89+
90+
// otelConfigToStatus converts the `cfg` to `status.AggregateStatus` using the reported error.
91+
//
92+
// The flow of this function comes from https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/internal/graph/graph.go
93+
// It's a much simpler version, but follows the same for loop ordering and building of connectors of the internal
94+
// graph system that OTEL uses to build its component graph.
95+
func otelConfigToStatus(cfg *confmap.Conf, err error) (*status.AggregateStatus, error) {
96+
// marshall into config
97+
var c otelcol.Config
98+
if unmarshalErr := cfg.Unmarshal(&c); unmarshalErr != nil {
99+
return nil, fmt.Errorf("could not unmarshal config: %w", unmarshalErr)
100+
}
101+
102+
// should at least define a single pipeline
103+
if len(c.Service.Pipelines) == 0 {
104+
return nil, fmt.Errorf("no pipelines defined")
105+
}
106+
107+
// aggregators are used to create the overall status structure
108+
// aggGeneric is used to for a generic aggregator status where all instances get the same error
109+
// aggSpecific is used to provide status to the specific instance that caused the error
110+
// aggSpecific is only used if matchOccurred is true
111+
aggGeneric := status.NewAggregator(status.PriorityPermanent)
112+
aggSpecific := status.NewAggregator(status.PriorityPermanent)
113+
matchOccurred := false
114+
115+
// extensions
116+
for _, id := range c.Service.Extensions {
117+
instanceID := componentstatus.NewInstanceID(id, component.KindExtension)
118+
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
119+
if recordSpecificErr(aggSpecific, instanceID, err) {
120+
matchOccurred = true
121+
}
122+
}
123+
124+
// track connectors
125+
connectors := make(map[component.ID]struct{})
126+
connectorsAsReceiver := make(map[component.ID][]pipeline.ID)
127+
connectorsAsExporter := make(map[component.ID][]pipeline.ID)
128+
129+
// pipelines
130+
for pipelineID, pipelineCfg := range c.Service.Pipelines {
131+
for _, recvID := range pipelineCfg.Receivers {
132+
// upstream graph creates a single component instance for a set of pipelines, then status reporting
133+
// copies the instance for each pipeline. creating a unique instance per-pipeline provides the same
134+
// behavior.
135+
instanceID := componentstatus.NewInstanceID(recvID, component.KindReceiver, pipelineID)
136+
_, isConnector := c.Connectors[recvID]
137+
if isConnector {
138+
connectors[recvID] = struct{}{}
139+
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
140+
}
141+
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
142+
if recordSpecificErr(aggSpecific, instanceID, err) {
143+
matchOccurred = true
144+
}
145+
}
146+
for _, procID := range pipelineCfg.Processors {
147+
instanceID := componentstatus.NewInstanceID(procID, component.KindProcessor, pipelineID)
148+
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
149+
if recordSpecificErr(aggSpecific, instanceID, err) {
150+
matchOccurred = true
151+
}
152+
}
153+
for _, exporterID := range pipelineCfg.Exporters {
154+
instanceID := componentstatus.NewInstanceID(exporterID, component.KindExporter, pipelineID)
155+
_, isConnector := c.Connectors[exporterID]
156+
if isConnector {
157+
connectors[exporterID] = struct{}{}
158+
connectorsAsExporter[exporterID] = append(connectorsAsExporter[exporterID], pipelineID)
159+
}
160+
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
161+
if recordSpecificErr(aggSpecific, instanceID, err) {
162+
matchOccurred = true
163+
}
164+
}
165+
}
166+
167+
// connectors
168+
for connID := range connectors {
169+
extraMatchStr := fmt.Sprintf("connector %q used as", connID)
170+
for _, eID := range connectorsAsExporter[connID] {
171+
for _, rID := range connectorsAsReceiver[connID] {
172+
instanceID := componentstatus.NewInstanceID(
173+
connID, component.KindConnector, eID, rID,
174+
)
175+
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
176+
if recordSpecificErr(aggSpecific, instanceID, err, extraMatchStr) {
177+
matchOccurred = true
178+
}
179+
}
180+
}
181+
}
182+
183+
if matchOccurred {
184+
// specific for the matched error
185+
aggStatus, _ := aggSpecific.AggregateStatus(status.ScopeAll, status.Verbose)
186+
return aggStatus, nil
187+
}
188+
// no match found so generic failed on all instances
189+
aggStatus, _ := aggGeneric.AggregateStatus(status.ScopeAll, status.Verbose)
190+
return aggStatus, nil
191+
}
192+
193+
func recordSpecificErr(agg *status.Aggregator, instanceID *componentstatus.InstanceID, err error, extraMatchStrs ...string) bool {
194+
// matches configuration errors for a specific component
195+
forIDStr := fmt.Sprintf("for id: %q", instanceID.ComponentID().String())
196+
// occurs when a specific component fails to start
197+
failedMatchStr := fmt.Sprintf("failed to start %q %s:", instanceID.ComponentID().String(), strings.ToLower(instanceID.Kind().String()))
198+
// occurs when a component factory is not available (unknown component type)
199+
factoryNotAvailableStr := fmt.Sprintf("factory not available for: %q", instanceID.ComponentID().String())
200+
if strings.Contains(err.Error(), forIDStr) || strings.Contains(err.Error(), failedMatchStr) || strings.Contains(err.Error(), factoryNotAvailableStr) {
201+
// specific so this instance gets the reported error
202+
agg.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
203+
return true
204+
}
205+
// extra matchers
206+
for _, matchStr := range extraMatchStrs {
207+
if strings.Contains(err.Error(), matchStr) {
208+
// specific so this instance gets the reported error
209+
agg.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
210+
return true
211+
}
212+
}
213+
// not specific to this instance, so we record this one as starting
214+
agg.RecordStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStarting))
215+
return false
216+
}

0 commit comments

Comments
 (0)