1- import json
2- import logging
3- from pathlib import Path
4- from typing import Iterable , List
1+ #
2+ # Copyright (c) nexB Inc. and others. All rights reserved.
3+ # VulnerableCode is a trademark of nexB Inc.
4+ # SPDX-License-Identifier: Apache-2.0
5+ # See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+ # See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+ # See https://aboutcode.org for more information about nexB OSS projects.
8+ #
59
6- from fetchcode . vcs import fetch_via_vcs
10+ import logging
711
8- from vulnerabilities .importer import AdvisoryData
9- from vulnerabilities .models import SSVC , AdvisoryV2
12+ from django .db .models import Q
13+ from vulnerabilities .models import SSVC
14+ from vulnerabilities .models import AdvisoryV2
1015from vulnerabilities .pipelines import VulnerableCodePipeline
16+ from vulnerabilities .pipelines .v2_importers .vulnrichment_importer import VulnrichImporterPipeline
1117from vulnerabilities .severity_systems import SCORING_SYSTEMS
12- from vulnerabilities .utils import ssvc_calculator
1318
1419logger = logging .getLogger (__name__ )
1520
@@ -21,92 +26,97 @@ class CollectSSVCPipeline(VulnerableCodePipeline):
2126 This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories.
2227 """
2328
24- pipeline_id = "collect_ssvc "
29+ pipeline_id = "collect_ssvc_tree_v2 "
2530 spdx_license_expression = "CC0-1.0"
26- license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE"
27- repo_url = "git+https://github.com/cisagov/vulnrichment.git"
2831
2932 @classmethod
3033 def steps (cls ):
3134 return (
32- cls .clone ,
3335 cls .collect_ssvc_data ,
34- cls .clean_downloads ,
3536 )
3637
37- def clone (self ):
38- self .log (f"Cloning `{ self .repo_url } `" )
39- self .vcs_response = fetch_via_vcs (self .repo_url )
40-
4138 def collect_ssvc_data (self ):
42- self .log (self .vcs_response .dest_dir )
43- base_path = Path (self .vcs_response .dest_dir )
44- for file_path in base_path .glob ("**/**/*.json" ):
45- self .log (f"Processing file: { file_path } " )
46- if not file_path .name .startswith ("CVE-" ):
47- continue
48- with open (file_path ) as f :
49- raw_data = json .load (f )
50- file_name = file_path .name
51- # strip .json from file name
52- cve_id = file_name [:- 5 ]
53- advisories = list (AdvisoryV2 .objects .filter (advisory_id = cve_id ))
54- if not advisories :
55- self .log (f"No advisories found for CVE ID: { cve_id } " )
56- continue
57- self .parse_cve_advisory (raw_data , advisories )
58-
59- def parse_cve_advisory (self , raw_data , advisories : List [AdvisoryV2 ]):
60- self .log (f"Processing CVE data" )
61- cve_metadata = raw_data .get ("cveMetadata" , {})
62- cve_id = cve_metadata .get ("cveId" )
63-
64- containers = raw_data .get ("containers" , {})
65- adp_data = containers .get ("adp" , {})
66- self .log (f"Processing ADP" )
67-
68- metrics = [
69- adp_metrics for data in adp_data for adp_metrics in data .get ("metrics" , [])
70- ]
71-
72- vulnrichment_scoring_system = {
73- "other" : {
74- "ssvc" : SCORING_SYSTEMS ["ssvc" ],
75- }, # ignore kev
76- }
77-
78- for metric in metrics :
79- self .log (metric )
80- self .log (f"Processing metric" )
81- for metric_type , metric_value in metric .items ():
82- if metric_type not in vulnrichment_scoring_system :
83- continue
84-
85- if metric_type == "other" :
86- other_types = metric_value .get ("type" )
87- self .log (f"Processing SSVC" )
88- if other_types == "ssvc" :
89- content = metric_value .get ("content" , {})
90- options = content .get ("options" , {})
91- vector_string , decision = ssvc_calculator (content )
92- advisories = list (AdvisoryV2 .objects .filter (advisory_id = cve_id ))
93- if not advisories :
94- continue
95- ssvc_trees = []
96- for advisory in advisories :
97- obj = SSVC (
98- advisory = advisory ,
99- options = options ,
100- decision = decision ,
101- vector = vector_string ,
102- )
103- ssvc_trees .append (obj )
104- SSVC .objects .bulk_create (ssvc_trees , ignore_conflicts = True , batch_size = 1000 )
105-
106- def clean_downloads (self ):
107- if self .vcs_response :
108- self .log ("Removing cloned repository" )
109- self .vcs_response .delete ()
110-
111- def on_failure (self ):
112- self .clean_downloads ()
39+ vulnrichment_advisories = AdvisoryV2 .objects .filter (
40+ datasource_id = VulnrichImporterPipeline .pipeline_id ,
41+ )
42+ for advisory in vulnrichment_advisories :
43+ severities = advisory .severities .filter (scoring_system = SCORING_SYSTEMS ["ssvc" ])
44+ for severity in severities :
45+ ssvc_vector = severity .scoring_elements
46+ try :
47+ ssvc_tree , decision = convert_vector_to_tree_and_decision (ssvc_vector )
48+ self .log (f"Advisory: { advisory .advisory_id } , SSVC Tree: { ssvc_tree } , Decision: { decision } , vector: { ssvc_vector } " )
49+ ssvc_obj , _ = SSVC .objects .get_or_create (
50+ source_advisory = advisory ,
51+ defaults = {
52+ "options" : ssvc_tree ,
53+ "decision" : decision ,
54+ },
55+ )
56+ # All advisories that have advisory.advisory_id in their aliases or advisory_id same as advisory.advisory_id
57+ related_advisories = AdvisoryV2 .objects .filter (
58+ Q (advisory_id = advisory .advisory_id ) |
59+ Q (aliases__alias = advisory .advisory_id )
60+ ).distinct ()
61+ # remove the current advisory from related advisories
62+ related_advisories = related_advisories .exclude (id = advisory .id )
63+ ssvc_obj .related_advisories .set (related_advisories )
64+ except ValueError as e :
65+ logger .error (f"Failed to parse SSVC vector '{ ssvc_vector } ' for advisory '{ advisory } ': { e } " )
66+
67+ REVERSE_POINTS = {
68+ "E" : ("Exploitation" , {"N" : "none" , "P" : "poc" , "A" : "active" }),
69+ "A" : ("Automatable" , {"N" : "no" , "Y" : "yes" }),
70+ "T" : ("Technical Impact" , {"P" : "partial" , "T" : "total" }),
71+ "P" : ("Mission Prevalence" , {"M" : "minimal" , "S" : "support" , "E" : "essential" }),
72+ "B" : ("Public Well-being Impact" , {"M" : "minimal" , "A" : "material" , "I" : "irreversible" }),
73+ "M" : ("Mission & Well-being" , {"L" : "low" , "M" : "medium" , "H" : "high" }),
74+ }
75+
76+ REVERSE_DECISION = {
77+ "T" : "Track" ,
78+ "R" : "Track*" ,
79+ "A" : "Attend" ,
80+ "C" : "Act" ,
81+ }
82+
83+ VECTOR_ORDER = ["E" , "A" , "T" , "P" , "B" , "M" ]
84+
85+ def convert_vector_to_tree_and_decision (vector : str ):
86+ """
87+ Convert a given SSVC vector string into a structured tree and decision.
88+
89+ Args:
90+ vector (str): The SSVC vector string.
91+
92+ Returns:
93+ tuple: A tuple containing the SSVC tree (dict) and decision (str).
94+ """
95+ if not vector .startswith ("SSVCv2/" ):
96+ raise ValueError ("Invalid SSVC vector" )
97+
98+ parts = [p for p in vector .replace ("SSVCv2/" , "" ).split ("/" ) if p ]
99+
100+ options = []
101+ decision = None
102+
103+ for part in parts :
104+ if ":" not in part :
105+ continue
106+
107+ key , value = part .split (":" , 1 )
108+
109+ if key == "D" :
110+ decision = REVERSE_DECISION .get (value )
111+ continue
112+
113+ if key in REVERSE_POINTS :
114+ name , mapping = REVERSE_POINTS [key ]
115+ options .append ({name : mapping [value ]})
116+
117+ # Preserve canonical SSVC order
118+ options .sort (key = lambda o : VECTOR_ORDER .index (
119+ next (k for k , _ in REVERSE_POINTS .values () if k == next (iter (o )))
120+ ) if False else 0 )
121+
122+ return options , decision
0 commit comments