Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions vulnerabilities/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ class AdvisoryV2Serializer(serializers.ModelSerializer):
references = AdvisoryReferenceSerializer(many=True)
severities = AdvisorySeveritySerializer(many=True)
advisory_id = serializers.CharField(source="avid", read_only=True)
related_ssvc_trees = serializers.SerializerMethodField()

def get_related_ssvc_trees(self, obj):
related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory")
source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory")

seen = set()
result = []

for ssvc in list(related_ssvcs) + list(source_ssvcs):
key = (ssvc.vector, ssvc.source_advisory_id)
if key in seen:
continue
seen.add(key)

result.append(
{
"vector": ssvc.vector,
"decision": ssvc.decision,
"options": ssvc.options,
"source_url": ssvc.source_advisory.url,
}
)

return result

class Meta:
model = AdvisoryV2
Expand All @@ -160,6 +185,7 @@ class Meta:
"exploitability",
"weighted_severity",
"risk_score",
"related_ssvc_trees",
]

def get_aliases(self, obj):
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from vulnerabilities.pipelines import flag_ghost_packages
from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline
from vulnerabilities.pipelines import remove_duplicate_advisories
from vulnerabilities.pipelines.v2_improvers import collect_ssvc_trees
from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2
from vulnerabilities.pipelines.v2_improvers import compute_package_risk as compute_package_risk_v2
from vulnerabilities.pipelines.v2_improvers import (
Expand Down Expand Up @@ -70,5 +71,6 @@
compute_advisory_todo_v2.ComputeToDo,
unfurl_version_range_v2.UnfurlVersionRangePipeline,
compute_advisory_todo.ComputeToDo,
collect_ssvc_trees.CollectSSVCPipeline,
]
)
59 changes: 59 additions & 0 deletions vulnerabilities/migrations/0104_ssvc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Generated by Django 4.2.25 on 2025-12-15 15:15

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"),
]

operations = [
migrations.CreateModel(
name="SSVC",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
(
"vector",
models.CharField(
help_text="The vector string representing the SSVC.", max_length=255
),
),
(
"options",
models.JSONField(help_text="A JSON object containing the SSVC options."),
),
(
"decision",
models.CharField(help_text="The decision string for the SSVC.", max_length=255),
),
(
"related_advisories",
models.ManyToManyField(
help_text="Advisories associated with this SSVC.",
related_name="related_ssvcs",
to="vulnerabilities.advisoryv2",
),
),
(
"source_advisory",
models.ForeignKey(
help_text="The advisory that was used to generate this SSVC decision.",
on_delete=django.db.models.deletion.CASCADE,
related_name="source_ssvcs",
to="vulnerabilities.advisoryv2",
),
),
],
options={
"unique_together": {("vector", "source_advisory")},
},
),
]
23 changes: 23 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3414,3 +3414,26 @@ class CodeCommit(models.Model):

class Meta:
unique_together = ("commit_hash", "vcs_url")


class SSVC(models.Model):
vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.")
options = models.JSONField(help_text="A JSON object containing the SSVC options.")
decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.")
related_advisories = models.ManyToManyField(
AdvisoryV2,
related_name="related_ssvcs",
help_text="Advisories associated with this SSVC.",
)
source_advisory = models.ForeignKey(
AdvisoryV2,
on_delete=models.CASCADE,
related_name="source_ssvcs",
help_text="The advisory that was used to generate this SSVC decision.",
)

def __str__(self):
return f"SSVC Decision: {self.vector} -> {self.decision}"

class Meta:
unique_together = ("vector", "source_advisory")
115 changes: 1 addition & 114 deletions vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_reference_id
from vulnerabilities.utils import ssvc_calculator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -210,117 +211,3 @@ def clean_downloads(self):

def on_failure(self):
self.clean_downloads()


def ssvc_calculator(ssvc_data):
"""
Return the ssvc vector and the decision value
"""
options = ssvc_data.get("options", [])
timestamp = ssvc_data.get("timestamp")

# Extract the options into a dictionary
options_dict = {k: v.lower() for option in options for k, v in option.items()}

# We copied the table value from this link.
# https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf

# Determining Mission and Well-Being Impact Value
mission_well_being_table = {
# (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being"
("minimal", "minimal"): "low",
("minimal", "material"): "medium",
("minimal", "irreversible"): "high",
("support", "minimal"): "medium",
("support", "material"): "medium",
("support", "irreversible"): "high",
("essential", "minimal"): "high",
("essential", "material"): "high",
("essential", "irreversible"): "high",
}

if "Mission Prevalence" not in options_dict:
options_dict["Mission Prevalence"] = "minimal"

if "Public Well-being Impact" not in options_dict:
options_dict["Public Well-being Impact"] = "material"

options_dict["Mission & Well-being"] = mission_well_being_table[
(options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"])
]

decision_key = (
options_dict.get("Exploitation"),
options_dict.get("Automatable"),
options_dict.get("Technical Impact"),
options_dict.get("Mission & Well-being"),
)

decision_points = {
"Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}},
"Automatable": {"A": {"no": "N", "yes": "Y"}},
"Technical Impact": {"T": {"partial": "P", "total": "T"}},
"Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}},
"Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}},
"Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}},
}

# Create the SSVC vector
ssvc_vector = "SSVCv2/"
for key, value_map in options_dict.items():
options_key = decision_points.get(key)
for lhs, rhs_map in options_key.items():
ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/"

# "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}},
decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}

decision_lookup = {
("none", "no", "partial", "low"): "Track",
("none", "no", "partial", "medium"): "Track",
("none", "no", "partial", "high"): "Track",
("none", "no", "total", "low"): "Track",
("none", "no", "total", "medium"): "Track",
("none", "no", "total", "high"): "Track*",
("none", "yes", "partial", "low"): "Track",
("none", "yes", "partial", "medium"): "Track",
("none", "yes", "partial", "high"): "Attend",
("none", "yes", "total", "low"): "Track",
("none", "yes", "total", "medium"): "Track",
("none", "yes", "total", "high"): "Attend",
("poc", "no", "partial", "low"): "Track",
("poc", "no", "partial", "medium"): "Track",
("poc", "no", "partial", "high"): "Track*",
("poc", "no", "total", "low"): "Track",
("poc", "no", "total", "medium"): "Track*",
("poc", "no", "total", "high"): "Attend",
("poc", "yes", "partial", "low"): "Track",
("poc", "yes", "partial", "medium"): "Track",
("poc", "yes", "partial", "high"): "Attend",
("poc", "yes", "total", "low"): "Track",
("poc", "yes", "total", "medium"): "Track*",
("poc", "yes", "total", "high"): "Attend",
("active", "no", "partial", "low"): "Track",
("active", "no", "partial", "medium"): "Track",
("active", "no", "partial", "high"): "Attend",
("active", "no", "total", "low"): "Track",
("active", "no", "total", "medium"): "Attend",
("active", "no", "total", "high"): "Act",
("active", "yes", "partial", "low"): "Attend",
("active", "yes", "partial", "medium"): "Attend",
("active", "yes", "partial", "high"): "Act",
("active", "yes", "total", "low"): "Attend",
("active", "yes", "total", "medium"): "Act",
("active", "yes", "total", "high"): "Act",
}

decision = decision_lookup.get(decision_key, "")

if decision:
ssvc_vector += f"D:{decision_values.get(decision)}/"

if timestamp:
timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")

ssvc_vector += f"{timestamp_formatted}/"
return ssvc_vector, decision
Loading
Loading