Skip to content

Commit 62b9a8e

Browse files
committed
Collect SSVC trees
Signed-off-by: Tushar Goel <[email protected]>
1 parent be89117 commit 62b9a8e

File tree

5 files changed

+297
-114
lines changed

5 files changed

+297
-114
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Generated by Django 4.2.25 on 2025-11-26 13:31
2+
3+
from django.db import migrations, models
4+
import django.db.models.deletion
5+
6+
7+
class Migration(migrations.Migration):
8+
9+
dependencies = [
10+
("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"),
11+
]
12+
13+
operations = [
14+
migrations.CreateModel(
15+
name="SSVC",
16+
fields=[
17+
(
18+
"id",
19+
models.AutoField(
20+
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
21+
),
22+
),
23+
(
24+
"vector",
25+
models.CharField(
26+
help_text="The vector string representing the SSVC.", max_length=255
27+
),
28+
),
29+
(
30+
"options",
31+
models.JSONField(help_text="A JSON object containing the SSVC options."),
32+
),
33+
(
34+
"decision",
35+
models.CharField(help_text="The decision string for the SSVC.", max_length=255),
36+
),
37+
(
38+
"advisory",
39+
models.ForeignKey(
40+
help_text="The advisory associated with this SSVC.",
41+
on_delete=django.db.models.deletion.CASCADE,
42+
related_name="ssvc_entries",
43+
to="vulnerabilities.advisoryv2",
44+
),
45+
),
46+
],
47+
options={
48+
"unique_together": {("vector", "advisory", "decision")},
49+
},
50+
),
51+
]

vulnerabilities/models.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3414,3 +3414,21 @@ class CodeCommit(models.Model):
34143414

34153415
class Meta:
34163416
unique_together = ("commit_hash", "vcs_url")
3417+
3418+
3419+
class SSVC(models.Model):
3420+
vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.")
3421+
options = models.JSONField(help_text="A JSON object containing the SSVC options.")
3422+
advisory = models.ForeignKey(
3423+
AdvisoryV2,
3424+
on_delete=models.CASCADE,
3425+
related_name="ssvc_entries",
3426+
help_text="The advisory associated with this SSVC.",
3427+
)
3428+
decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.")
3429+
3430+
def __str__(self):
3431+
return f"SSVC for Advisory {self.advisory.advisory_id}: {self.decision}"
3432+
3433+
class Meta:
3434+
unique_together = ("vector", "advisory", "decision")

vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py

Lines changed: 1 addition & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from vulnerabilities.utils import get_advisory_url
1717
from vulnerabilities.utils import get_cwe_id
1818
from vulnerabilities.utils import get_reference_id
19+
from vulnerabilities.utils import ssvc_calculator
1920

2021
logger = logging.getLogger(__name__)
2122

@@ -210,117 +211,3 @@ def clean_downloads(self):
210211

211212
def on_failure(self):
212213
self.clean_downloads()
213-
214-
215-
def ssvc_calculator(ssvc_data):
216-
"""
217-
Return the ssvc vector and the decision value
218-
"""
219-
options = ssvc_data.get("options", [])
220-
timestamp = ssvc_data.get("timestamp")
221-
222-
# Extract the options into a dictionary
223-
options_dict = {k: v.lower() for option in options for k, v in option.items()}
224-
225-
# We copied the table value from this link.
226-
# https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf
227-
228-
# Determining Mission and Well-Being Impact Value
229-
mission_well_being_table = {
230-
# (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being"
231-
("minimal", "minimal"): "low",
232-
("minimal", "material"): "medium",
233-
("minimal", "irreversible"): "high",
234-
("support", "minimal"): "medium",
235-
("support", "material"): "medium",
236-
("support", "irreversible"): "high",
237-
("essential", "minimal"): "high",
238-
("essential", "material"): "high",
239-
("essential", "irreversible"): "high",
240-
}
241-
242-
if "Mission Prevalence" not in options_dict:
243-
options_dict["Mission Prevalence"] = "minimal"
244-
245-
if "Public Well-being Impact" not in options_dict:
246-
options_dict["Public Well-being Impact"] = "material"
247-
248-
options_dict["Mission & Well-being"] = mission_well_being_table[
249-
(options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"])
250-
]
251-
252-
decision_key = (
253-
options_dict.get("Exploitation"),
254-
options_dict.get("Automatable"),
255-
options_dict.get("Technical Impact"),
256-
options_dict.get("Mission & Well-being"),
257-
)
258-
259-
decision_points = {
260-
"Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}},
261-
"Automatable": {"A": {"no": "N", "yes": "Y"}},
262-
"Technical Impact": {"T": {"partial": "P", "total": "T"}},
263-
"Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}},
264-
"Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}},
265-
"Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}},
266-
}
267-
268-
# Create the SSVC vector
269-
ssvc_vector = "SSVCv2/"
270-
for key, value_map in options_dict.items():
271-
options_key = decision_points.get(key)
272-
for lhs, rhs_map in options_key.items():
273-
ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/"
274-
275-
# "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}},
276-
decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}
277-
278-
decision_lookup = {
279-
("none", "no", "partial", "low"): "Track",
280-
("none", "no", "partial", "medium"): "Track",
281-
("none", "no", "partial", "high"): "Track",
282-
("none", "no", "total", "low"): "Track",
283-
("none", "no", "total", "medium"): "Track",
284-
("none", "no", "total", "high"): "Track*",
285-
("none", "yes", "partial", "low"): "Track",
286-
("none", "yes", "partial", "medium"): "Track",
287-
("none", "yes", "partial", "high"): "Attend",
288-
("none", "yes", "total", "low"): "Track",
289-
("none", "yes", "total", "medium"): "Track",
290-
("none", "yes", "total", "high"): "Attend",
291-
("poc", "no", "partial", "low"): "Track",
292-
("poc", "no", "partial", "medium"): "Track",
293-
("poc", "no", "partial", "high"): "Track*",
294-
("poc", "no", "total", "low"): "Track",
295-
("poc", "no", "total", "medium"): "Track*",
296-
("poc", "no", "total", "high"): "Attend",
297-
("poc", "yes", "partial", "low"): "Track",
298-
("poc", "yes", "partial", "medium"): "Track",
299-
("poc", "yes", "partial", "high"): "Attend",
300-
("poc", "yes", "total", "low"): "Track",
301-
("poc", "yes", "total", "medium"): "Track*",
302-
("poc", "yes", "total", "high"): "Attend",
303-
("active", "no", "partial", "low"): "Track",
304-
("active", "no", "partial", "medium"): "Track",
305-
("active", "no", "partial", "high"): "Attend",
306-
("active", "no", "total", "low"): "Track",
307-
("active", "no", "total", "medium"): "Attend",
308-
("active", "no", "total", "high"): "Act",
309-
("active", "yes", "partial", "low"): "Attend",
310-
("active", "yes", "partial", "medium"): "Attend",
311-
("active", "yes", "partial", "high"): "Act",
312-
("active", "yes", "total", "low"): "Attend",
313-
("active", "yes", "total", "medium"): "Act",
314-
("active", "yes", "total", "high"): "Act",
315-
}
316-
317-
decision = decision_lookup.get(decision_key, "")
318-
319-
if decision:
320-
ssvc_vector += f"D:{decision_values.get(decision)}/"
321-
322-
if timestamp:
323-
timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")
324-
325-
ssvc_vector += f"{timestamp_formatted}/"
326-
return ssvc_vector, decision
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import json
2+
import logging
3+
from pathlib import Path
4+
from typing import Iterable, List
5+
6+
from fetchcode.vcs import fetch_via_vcs
7+
8+
from vulnerabilities.importer import AdvisoryData
9+
from vulnerabilities.models import SSVC, AdvisoryV2
10+
from vulnerabilities.pipelines import VulnerableCodePipeline
11+
from vulnerabilities.severity_systems import SCORING_SYSTEMS
12+
from vulnerabilities.utils import ssvc_calculator
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class CollectSSVCPipeline(VulnerableCodePipeline):
18+
"""
19+
Collect SSVC Pipeline
20+
21+
This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories.
22+
"""
23+
24+
pipeline_id = "collect_ssvc"
25+
spdx_license_expression = "CC0-1.0"
26+
license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE"
27+
repo_url = "git+https://github.com/cisagov/vulnrichment.git"
28+
29+
@classmethod
30+
def steps(cls):
31+
return (
32+
cls.clone,
33+
cls.collect_ssvc_data,
34+
cls.clean_downloads,
35+
)
36+
37+
def clone(self):
38+
self.log(f"Cloning `{self.repo_url}`")
39+
self.vcs_response = fetch_via_vcs(self.repo_url)
40+
41+
def collect_ssvc_data(self):
42+
self.log(self.vcs_response.dest_dir)
43+
base_path = Path(self.vcs_response.dest_dir)
44+
for file_path in base_path.glob("**/**/*.json"):
45+
self.log(f"Processing file: {file_path}")
46+
if not file_path.name.startswith("CVE-"):
47+
continue
48+
with open(file_path) as f:
49+
raw_data = json.load(f)
50+
file_name = file_path.name
51+
# strip .json from file name
52+
cve_id = file_name[:-5]
53+
advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id))
54+
if not advisories:
55+
self.log(f"No advisories found for CVE ID: {cve_id}")
56+
continue
57+
self.parse_cve_advisory(raw_data, advisories)
58+
59+
def parse_cve_advisory(self, raw_data, advisories: List[AdvisoryV2]):
60+
self.log(f"Processing CVE data")
61+
cve_metadata = raw_data.get("cveMetadata", {})
62+
cve_id = cve_metadata.get("cveId")
63+
64+
containers = raw_data.get("containers", {})
65+
adp_data = containers.get("adp", {})
66+
self.log(f"Processing ADP")
67+
68+
metrics = [
69+
adp_metrics for data in adp_data for adp_metrics in data.get("metrics", [])
70+
]
71+
72+
vulnrichment_scoring_system = {
73+
"other": {
74+
"ssvc": SCORING_SYSTEMS["ssvc"],
75+
}, # ignore kev
76+
}
77+
78+
for metric in metrics:
79+
self.log(metric)
80+
self.log(f"Processing metric")
81+
for metric_type, metric_value in metric.items():
82+
if metric_type not in vulnrichment_scoring_system:
83+
continue
84+
85+
if metric_type == "other":
86+
other_types = metric_value.get("type")
87+
self.log(f"Processing SSVC")
88+
if other_types == "ssvc":
89+
content = metric_value.get("content", {})
90+
options = content.get("options", {})
91+
vector_string, decision = ssvc_calculator(content)
92+
advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id))
93+
if not advisories:
94+
continue
95+
ssvc_trees = []
96+
for advisory in advisories:
97+
obj = SSVC(
98+
advisory=advisory,
99+
options=options,
100+
decision=decision,
101+
vector=vector_string,
102+
)
103+
ssvc_trees.append(obj)
104+
SSVC.objects.bulk_create(ssvc_trees, ignore_conflicts=True, batch_size=1000)
105+
106+
def clean_downloads(self):
107+
if self.vcs_response:
108+
self.log("Removing cloned repository")
109+
self.vcs_response.delete()
110+
111+
def on_failure(self):
112+
self.clean_downloads()

0 commit comments

Comments
 (0)