@akshraj-crest (Contributor) commented Dec 31, 2025

Proposed commit message

The initial release includes a vulnerability data stream and an associated dashboard.

Flashpoint fields are mapped to their corresponding ECS fields where possible.

Test samples were derived from live data samples, which were subsequently
sanitized.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.
  • I have verified that any added dashboard complies with Kibana's Dashboard good practices.

How to test this PR locally

To test the flashpoint package:

  • Clone the integrations repo.
  • Install elastic-package locally.
  • Start the Elastic Stack using elastic-package.
  • Move to the integrations/packages/ti_flashpoint directory.
  • Run the tests with the following command:

elastic-package test
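The steps above can be sketched as a shell sequence (command names follow elastic-package's documented CLI; exact flags may vary by version):

```shell
# Clone the integrations repo and install the elastic-package tool.
git clone https://github.com/elastic/integrations.git
go install github.com/elastic/elastic-package@latest

# Bring up a local Elastic Stack (Elasticsearch, Kibana, Fleet Server).
elastic-package stack up -d

# Run all test types for the ti_flashpoint package.
cd integrations/packages/ti_flashpoint
elastic-package test
```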

Run asset tests for the package
--- Test results for package: ti_flashpoint - START ---
╭───────────────┬───────────────┬───────────┬────────────────────────────────────────────────────────────────────────┬────────┬──────────────╮
│ PACKAGE       │ DATA STREAM   │ TEST TYPE │ TEST NAME                                                              │ RESULT │ TIME ELAPSED │
├───────────────┼───────────────┼───────────┼────────────────────────────────────────────────────────────────────────┼────────┼──────────────┤
│ ti_flashpoint │               │ asset     │ dashboard ti_flashpoint-cfe7739d-dce8-46e0-9f7e-4d077bc5c7bc is loaded │ PASS   │      1.063µs │
│ ti_flashpoint │               │ asset     │ search ti_flashpoint-02229216-8fcd-4a07-8c65-782f455fcfab is loaded    │ PASS   │        251ns │
│ ti_flashpoint │               │ asset     │ search ti_flashpoint-279eadc8-e6f2-4a00-a5cf-f01bd434eb6e is loaded    │ PASS   │        215ns │
│ ti_flashpoint │               │ asset     │ search ti_flashpoint-78e2de59-5a14-4a7b-9328-69b9b310c0b7 is loaded    │ PASS   │        237ns │
│ ti_flashpoint │ vulnerability │ asset     │ index_template logs-ti_flashpoint.vulnerability is loaded              │ PASS   │        151ns │
│ ti_flashpoint │ vulnerability │ asset     │ ingest_pipeline logs-ti_flashpoint.vulnerability-0.1.0 is loaded       │ PASS   │        149ns │
╰───────────────┴───────────────┴───────────┴────────────────────────────────────────────────────────────────────────┴────────┴──────────────╯
--- Test results for package: ti_flashpoint - END   ---
Done
Run pipeline tests for the package
--- Test results for package: ti_flashpoint - START ---
╭───────────────┬───────────────┬───────────┬───────────────────────────────────────────────────┬────────┬──────────────╮
│ PACKAGE       │ DATA STREAM   │ TEST TYPE │ TEST NAME                                         │ RESULT │ TIME ELAPSED │
├───────────────┼───────────────┼───────────┼───────────────────────────────────────────────────┼────────┼──────────────┤
│ ti_flashpoint │ vulnerability │ pipeline  │ (ingest pipeline warnings test-vulnerability.log) │ PASS   │ 511.002667ms │
│ ti_flashpoint │ vulnerability │ pipeline  │ test-vulnerability.log                            │ PASS   │ 204.086019ms │
╰───────────────┴───────────────┴───────────┴───────────────────────────────────────────────────┴────────┴──────────────╯
--- Test results for package: ti_flashpoint - END   ---
Done
Run policy tests for the package
--- Test results for package: ti_flashpoint - START ---
No test results
--- Test results for package: ti_flashpoint - END   ---
Done
Run script tests for the package
--- Test results for package: ti_flashpoint - START ---
PKG ti_flashpoint
[no test files]
--- Test results for package: ti_flashpoint - END ---
Done
Run static tests for the package
--- Test results for package: ti_flashpoint - START ---
╭───────────────┬───────────────┬───────────┬──────────────────────────┬────────┬──────────────╮
│ PACKAGE       │ DATA STREAM   │ TEST TYPE │ TEST NAME                │ RESULT │ TIME ELAPSED │
├───────────────┼───────────────┼───────────┼──────────────────────────┼────────┼──────────────┤
│ ti_flashpoint │ vulnerability │ static    │ Verify sample_event.json │ PASS   │ 111.144868ms │
╰───────────────┴───────────────┴───────────┴──────────────────────────┴────────┴──────────────╯
--- Test results for package: ti_flashpoint - END   ---
Done
Run system tests for the package
2025/12/31 17:26:48  INFO Installing package...
2025/12/31 17:27:00  INFO Running test for data_stream "vulnerability" with configuration 'default'
2025/12/31 17:27:08  INFO Setting up independent Elastic Agent...
2025/12/31 17:27:17  INFO Setting up service...
2025/12/31 17:27:39  INFO Validating test case...
2025/12/31 17:27:40  INFO Tearing down service...
2025/12/31 17:27:40  INFO Write container logs to file: /root/integrations/build/container-logs/ti_flashpoint-1767182260884325252.log
2025/12/31 17:27:43  INFO Tearing down agent...
2025/12/31 17:27:43  INFO Write container logs to file: /root/integrations/build/container-logs/elastic-agent-1767182263475260694.log
2025/12/31 17:27:50  INFO Uninstalling package...
--- Test results for package: ti_flashpoint - START ---
╭───────────────┬───────────────┬───────────┬───────────┬────────┬───────────────╮
│ PACKAGE       │ DATA STREAM   │ TEST TYPE │ TEST NAME │ RESULT │  TIME ELAPSED │
├───────────────┼───────────────┼───────────┼───────────┼────────┼───────────────┤
│ ti_flashpoint │ vulnerability │ system    │ default   │ PASS   │ 40.241326526s │
╰───────────────┴───────────────┴───────────┴───────────┴────────┴───────────────╯
--- Test results for package: ti_flashpoint - END   ---
Done

Screenshots

(two dashboard screenshots attached)

Go Code for Ingest Pipeline Generation

The vulnerability data stream pipeline is generated using Go code built on top of the Dispear library.
Below is the code used for generating the pipeline logic:

package main

import (
	"fmt"
	"strings"

	. "github.com/efd6/dispear"
)

const (
	ECSVersion = "9.2.0"
	PkgRoot    = "json"
)
const errorFormat = "Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}"

// removeErrorHandler creates a slice of Renderer objects that will remove the specified field
// from a document. If addErrorMessage is provided and set to false, only the removal operation
// will be performed. By default, or if addErrorMessage is true, the function will also append
// a predefined error message to the 'error.message' field. This is useful in scenarios such as
// an Ingest Pipeline where a problematic field needs to be removed from the document and an
// error message should be appended to indicate the removal.
//
// Usage:
//
//	renderers := removeErrorHandler("field_name") // Default behavior: removes field and appends error message
//	renderers := removeErrorHandler("field_name", false) // Removes field without appending error message
func removeErrorHandler(f string, addErrorMessage ...bool) []Renderer {
	// Default value for addErrorMessage is true
	addError := true
	// If addErrorMessage is provided, override the default value
	if len(addErrorMessage) > 0 {
		addError = addErrorMessage[0]
	}

	// Create a slice of Renderer to remove the specified field
	renderers := []Renderer{REMOVE(f)}

	// If addError is true, append the error message Renderer
	if addError {
		renderers = append(renderers, APPEND("error.message", errorFormat))
	}

	return renderers
}

// safeNavigateAndCheck converts a dot-separated field path to a Painless
// safe-navigation expression. The leading ctx reference always exists in an
// ingest pipeline, so only the hops after it use the null-safe operator.
//
// Example:
// "parent.child.grandchild" -> "ctx.parent?.child?.grandchild"
func safeNavigateAndCheck(field string) string {
	parts := strings.Split(field, ".")
	condition := "ctx"
	for i, part := range parts {
		if i > 0 { // Skip the first part which is already included in the condition
			condition += fmt.Sprintf("?.%s", part)
		} else {
			condition += fmt.Sprintf(".%s", part)
		}
	}
	return condition
}

func main() {

	// Initial processors of pipeline

	DESCRIPTION("Pipeline for processing vulnerability logs.")

	DROP("empty events placeholder").IF("ctx.message == 'empty_events_placeholder'")

	SET("ecs.version").
		VALUE(ECSVersion).
		TAG("set ecs.version to 9.2.0")

	TERMINATE("data collection error").
		IF("ctx.error?.message != null && ctx.message == null && ctx.event?.original == null").
		DESCRIPTION("error message set and no data to process.")

	BLANK()
	BLANK().COMMENT("remove agentless metadata")

	REMOVE(
		"organization",
		"division",
		"team",
	).
		IF("ctx.organization instanceof String && ctx.division instanceof String && ctx.team instanceof String").
		IGNORE_MISSING(true).
		TAG("remove_agentless_tags").
		DESCRIPTION("Removes the fields added by Agentless as metadata, as they can collide with ECS fields.")

	BLANK()
	BLANK().COMMENT("parse the event JSON")

	RENAME("message", "event.original").
		IF("ctx.event?.original == null").
		DESCRIPTION("Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.").
		IGNORE_MISSING(true)

	REMOVE("message").
		TAG("remove_message").
		IF("ctx.event?.original != null").
		DESCRIPTION("The `message` field is no longer required if the document has an `event.original` field.").
		IGNORE_MISSING(true)

	JSON(PkgRoot, "event.original")

	// Add fingerprint

	BLANK()
	BLANK().COMMENT("Add fingerprint")

	FINGERPRINT("_id", "json.id", "json.timelines.last_modified_at").IGNORE_MISSING(true)

	// Setting event.* fields

	BLANK()
	BLANK().COMMENT("Set event.* fields")

	SET("event.kind").
		VALUE("event").
		TAG("set event.kind to event")
	APPEND("event.type", "info").TAG("Append event.type to info")
	APPEND("event.category", "vulnerability").TAG("Append event.category to vulnerability")

	// Script to rename fields to snake_case (hyphen to underscore)

	BLANK()
	BLANK().COMMENT("rename fields to snake_case (hyphen to underscore)")

	SCRIPT().
		TAG("script_normalize_field_names").
		DESCRIPTION("Convert field names from hyphen to underscore.").
		LANG("painless").
		SOURCE(`
        // Replace '-' with '_' in field names
        String normalize(String str) {
          return str.replace('-', '_');
        }

        // Recursive function to process objects
        def normalizeFields(def obj) {
          if (obj instanceof Map) {
            def newObj = new HashMap();
            for (entry in obj.entrySet()) {
              String newKey = normalize(entry.getKey());
              newObj.put(newKey, normalizeFields(entry.getValue()));
            }
            return newObj;
          } else if (obj instanceof List) {
            def newList = new ArrayList();
            for (item in obj) {
              newList.add(normalizeFields(item));
            }
            return newList;
          }
          return obj;
        }

        // Apply transformation
        if (ctx.json != null) {
          ctx.ti_flashpoint = ctx.ti_flashpoint ?: [:];
          ctx.ti_flashpoint.vulnerability = normalizeFields(ctx.json);
          ctx.remove('json');
        }
		`)

	// Use Date processors

	BLANK()
	BLANK().COMMENT("Date processors")

	for _, field := range []string{
		"ti_flashpoint.vulnerability.timelines.disclosed_at",
		"ti_flashpoint.vulnerability.timelines.discovered_at",
		"ti_flashpoint.vulnerability.timelines.exploit_published_at",
		"ti_flashpoint.vulnerability.timelines.exploited_in_the_wild_at",
		"ti_flashpoint.vulnerability.timelines.last_modified_at",
		"ti_flashpoint.vulnerability.timelines.published_at",
		"ti_flashpoint.vulnerability.timelines.solution_provided_at",
		"ti_flashpoint.vulnerability.timelines.third_party_solution_provided_at",
		"ti_flashpoint.vulnerability.timelines.vendor_acknowledged_at",
		"ti_flashpoint.vulnerability.timelines.vendor_informed_at",
	} {
		DATE(field, field, "ISO8601").
			IF(safeNavigateAndCheck(field) + " != null" + " && " + "ctx." + field + " != ''").
			ON_FAILURE(removeErrorHandler(field)...)
	}

	for _, field := range []struct {
		arrayField, rootField string
	}{
		{arrayField: "ti_flashpoint.vulnerability.cvss_v2s", rootField: "generated_at"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v3s", rootField: "generated_at"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v3s", rootField: "updated_at"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v4s", rootField: "generated_at"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v4s", rootField: "updated_at"},
		{arrayField: "ti_flashpoint.vulnerability.ext_references", rootField: "created_at"},
	} {
		FOREACH(field.arrayField,
			DATE("_ingest._value."+field.rootField, "_ingest._value."+field.rootField, "ISO8601").
				ON_FAILURE(removeErrorHandler("_ingest._value."+field.rootField, false)...),
		).IF(safeNavigateAndCheck(field.arrayField) + " instanceof List")
	}

	// Convert to Long

	BLANK()
	BLANK().COMMENT("Convert to Long")

	for _, field := range []string{
		"ti_flashpoint.vulnerability.exploits_count",
	} {
		CONVERT("", field, "long").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler(field)...)
	}

	// Convert to Double

	BLANK()
	BLANK().COMMENT("Convert to Double")

	for _, field := range []string{
		"ti_flashpoint.vulnerability.scores.cvssv3_score",
		"ti_flashpoint.vulnerability.scores.epss_score",
		"ti_flashpoint.vulnerability.scores.epss_v1_score",
	} {
		CONVERT("", field, "double").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler(field)...)
	}

	for _, field := range []struct {
		arrayField, rootField string
	}{
		{arrayField: "ti_flashpoint.vulnerability.cvss_v2s", rootField: "calculated_cvss_base_score"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v3s", rootField: "calculated_cvss_base_score"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v3s", rootField: "temporal_score"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v4s", rootField: "threat_score"},
	} {
		FOREACH(field.arrayField,
			CONVERT("", "_ingest._value."+field.rootField, "double").
				IGNORE_MISSING(true).
				ON_FAILURE(removeErrorHandler("_ingest._value."+field.rootField)...),
		).IF(safeNavigateAndCheck(field.arrayField) + " instanceof List")
	}

	// Convert to Float

	BLANK()
	BLANK().COMMENT("Convert to Float")
	for _, field := range []struct {
		arrayField, rootField string
	}{
		{arrayField: "ti_flashpoint.vulnerability.cvss_v2s", rootField: "score"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v3s", rootField: "score"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v4s", rootField: "score"},
	} {
		FOREACH(field.arrayField,
			CONVERT("", "_ingest._value."+field.rootField, "float").
				IGNORE_MISSING(true).
				ON_FAILURE(removeErrorHandler("_ingest._value."+field.rootField)...),
		).IF(safeNavigateAndCheck(field.arrayField) + " instanceof List")
	}

	// Convert to String

	BLANK()
	BLANK().COMMENT("Convert to String")

	for _, field := range []string{
		"ti_flashpoint.vulnerability.id",
	} {
		CONVERT("", field, "string").
			IGNORE_MISSING(true)
	}

	for _, field := range []struct {
		arrayField, rootField string
	}{
		{arrayField: "ti_flashpoint.vulnerability.cwes", rootField: "cwe_id"},
		{arrayField: "ti_flashpoint.vulnerability.products", rootField: "id"},
		{arrayField: "ti_flashpoint.vulnerability.vendors", rootField: "id"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v2s", rootField: "version"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v3s", rootField: "version"},
		{arrayField: "ti_flashpoint.vulnerability.cvss_v4s", rootField: "version"},
	} {
		FOREACH(field.arrayField,
			CONVERT("", "_ingest._value."+field.rootField, "string").
				IGNORE_MISSING(true),
		).IF(safeNavigateAndCheck(field.arrayField) + " instanceof List")
	}
	// Set ECS Mapping

	BLANK()
	BLANK().COMMENT("Map custom fields to corresponding ECS and related fields.")

	// Map ECS mapping for top-level fields

	for _, mapping := range []struct {
		ecsField, customField string
	}{
		{ecsField: "event.created", customField: "ti_flashpoint.vulnerability.timelines.published_at"},
		{ecsField: "@timestamp", customField: "ti_flashpoint.vulnerability.timelines.last_modified_at"},
		{ecsField: "vulnerability.severity", customField: "ti_flashpoint.vulnerability.scores.severity"},
		{ecsField: "event.id", customField: "ti_flashpoint.vulnerability.id"},
		{ecsField: "message", customField: "ti_flashpoint.vulnerability.description"},
		{ecsField: "vulnerability.description", customField: "ti_flashpoint.vulnerability.description"},
	} {
		SET(mapping.ecsField).
			COPY_FROM(mapping.customField).
			IGNORE_EMPTY(true).
			TAG(fmt.Sprintf("set %s from %s", mapping.ecsField, mapping.customField))
	}

	for _, mapping := range []struct {
		ecsField, customField string
	}{
		{ecsField: "vulnerability.id", customField: "ti_flashpoint.vulnerability.id"},
	} {
		APPEND(mapping.ecsField, "{{{"+mapping.customField+"}}}").
			IF(safeNavigateAndCheck(mapping.customField) + " != null").
			ALLOW_DUPLICATES(false).
			TAG(fmt.Sprintf("append %s from %s", mapping.ecsField, mapping.customField))
	}

	// Map ECS mapping for array fields
	for _, mapping := range []struct {
		ecsField, customArrayField, customRootField string
	}{
		{ecsField: "vulnerability.id", customArrayField: "ti_flashpoint.vulnerability.cve_ids", customRootField: ""},
		{ecsField: "vulnerability.score.base", customArrayField: "ti_flashpoint.vulnerability.cvss_v2s", customRootField: "score"},
		{ecsField: "vulnerability.id", customArrayField: "ti_flashpoint.vulnerability.cvss_v2s", customRootField: "cve_id"},
		{ecsField: "vulnerability.score.base", customArrayField: "ti_flashpoint.vulnerability.cvss_v3s", customRootField: "score"},
		{ecsField: "vulnerability.score.base", customArrayField: "ti_flashpoint.vulnerability.cvss_v4s", customRootField: "score"},
		{ecsField: "vulnerability.id", customArrayField: "ti_flashpoint.vulnerability.cvss_v3s", customRootField: "cve_id"},
		{ecsField: "vulnerability.score.temporal", customArrayField: "ti_flashpoint.vulnerability.cvss_v3s", customRootField: "temporal_score"},
		{ecsField: "vulnerability.id", customArrayField: "ti_flashpoint.vulnerability.cvss_v4s", customRootField: "cve_id"},
		{ecsField: "vulnerability.score.version", customArrayField: "ti_flashpoint.vulnerability.cvss_v2s", customRootField: "version"},
		{ecsField: "vulnerability.score.version", customArrayField: "ti_flashpoint.vulnerability.cvss_v3s", customRootField: "version"},
		{ecsField: "vulnerability.score.version", customArrayField: "ti_flashpoint.vulnerability.cvss_v4s", customRootField: "version"},
	} {
		valueExpr := "{{{_ingest._value}}}"
		if mapping.customRootField != "" {
			valueExpr = "{{{_ingest._value." + mapping.customRootField + "}}}"
		}

		FOREACH(
			mapping.customArrayField,
			APPEND(mapping.ecsField, valueExpr).
				ALLOW_DUPLICATES(false),
		).IF(
			safeNavigateAndCheck(mapping.customArrayField) + " instanceof List",
		)
	}

	for _, field := range []string{
		"vulnerability.score.base",
		"vulnerability.score.temporal",
	} {

		FOREACH(field,
			CONVERT("", "_ingest._value", "float").
				IGNORE_MISSING(true),
		).IF(safeNavigateAndCheck(field) + " instanceof List").IGNORE_FAILURE(true)
	}

	SET("vulnerability.classification").
		VALUE("cvss").
		TAG("set vulnerability.classification to cvss")

	// Remove Duplicate Fields.

	BLANK()
	BLANK().COMMENT("Remove duplicate custom fields if preserve_duplicate_custom_fields are not enabled")

	FOREACH("ti_flashpoint.vulnerability.cvss_v3s",
		REMOVE(
			"_ingest._value.temporal_score",
		).
			TAG("remove_custom_duplicate_fields_for_cvss_v3s").
			IGNORE_MISSING(true),
	).IF(safeNavigateAndCheck("ti_flashpoint.vulnerability.cvss_v3s") + " instanceof List && (ctx.tags == null || !ctx.tags.contains('preserve_duplicate_custom_fields'))")

	REMOVE(
		"ti_flashpoint.vulnerability.timelines.published_at",
		"ti_flashpoint.vulnerability.timelines.last_modified_at",
		"ti_flashpoint.vulnerability.scores.severity",
		"ti_flashpoint.vulnerability.id",
		"ti_flashpoint.vulnerability.description",
	).
		IF("ctx.tags == null || !ctx.tags.contains('preserve_duplicate_custom_fields')").
		TAG("remove_custom_duplicate_fields").
		IGNORE_MISSING(true)

	// Clean up script

	BLANK()
	BLANK().COMMENT("Cleanup")

	SCRIPT().
		TAG("script_to_drop_null_values").
		DESCRIPTION("This script processor iterates over the whole document to remove fields with null values.").
		LANG("painless").
		SOURCE(`
		void handleMap(Map map) {
		map.values().removeIf(v -> {
			if (v instanceof Map) {
			handleMap(v);
			} else if (v instanceof List) {
			handleList(v);
			}
			return v == null || v == '' || (v instanceof Map && v.size() == 0) || (v instanceof List && v.size() == 0)
		});
		}
		void handleList(List list) {
		list.removeIf(v -> {
			if (v instanceof Map) {
			handleMap(v);
			} else if (v instanceof List) {
			handleList(v);
			}
			return v == null || v == '' || (v instanceof Map && v.size() == 0) || (v instanceof List && v.size() == 0)
		});
		}
		handleMap(ctx);
		`)

	// Set and Append processor on last

	SET("event.kind").
		VALUE("pipeline_error").
		IF("ctx.error?.message != null").
		TAG("set event.kind to pipeline_error")
	APPEND("tags", "preserve_original_event").
		IF("ctx.error?.message != null").
		ALLOW_DUPLICATES(false)

	// Global on failure processor

	ON_FAILURE(
		APPEND("error.message", errorFormat),
		SET("event.kind").VALUE("pipeline_error").TAG("set event.kind to pipeline_error"),
		APPEND("tags", "preserve_original_event").
			ALLOW_DUPLICATES(false),
	)

	// Generate the pipeline

	Generate()
}
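As a quick illustration of the safe-navigation helper used throughout the generator, here is a standalone sketch (renamed safeNavigate to keep it self-contained) showing the ctx-rooted Painless expression that ends up in the generated IF conditions:

```go
package main

import (
	"fmt"
	"strings"
)

// safeNavigate mirrors the generator's safeNavigateAndCheck helper: it turns
// a dot-separated field path into a Painless safe-navigation expression, so a
// generated IF condition does not throw when an intermediate object is
// missing from the document.
func safeNavigate(field string) string {
	parts := strings.Split(field, ".")
	condition := "ctx"
	for i, part := range parts {
		if i == 0 {
			// ctx always exists in an ingest pipeline, so the first hop
			// can use plain member access.
			condition += "." + part
		} else {
			// Every later hop uses the null-safe operator.
			condition += "?." + part
		}
	}
	return condition
}

func main() {
	fmt.Println(safeNavigate("ti_flashpoint.vulnerability.timelines.published_at"))
	// -> ctx.ti_flashpoint?.vulnerability?.timelines?.published_at
}
```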
