3636from vulnerabilities .severity_systems import SCORING_SYSTEMS
3737from vulnerabilities .severity_systems import ScoringSystem
3838from vulnerabilities .utils import classproperty
39+ from vulnerabilities .utils import compute_patch_checksum
3940from vulnerabilities .utils import get_reference_id
41+ from vulnerabilities .utils import is_commit
4042from vulnerabilities .utils import is_cve
4143from vulnerabilities .utils import nearest_patched_package
4244from vulnerabilities .utils import purl_to_dict
@@ -194,6 +196,103 @@ def from_url(cls, url):
194196 return cls (url = url )
195197
196198
199+ @dataclasses .dataclass (eq = True )
200+ @functools .total_ordering
201+ class PackageCommitPatchData :
202+ vcs_url : str
203+ commit_hash : str
204+ patch_text : Optional [str ] = None
205+ patch_checksum : Optional [str ] = dataclasses .field (init = False , default = None )
206+
207+ def __post_init__ (self ):
208+ if not self .commit_hash :
209+ raise ValueError ("Commit must have a non-empty commit_hash." )
210+
211+ if not is_commit (self .commit_hash ):
212+ raise ValueError (f"Commit must be a valid a commit_hash: { self .commit_hash } ." )
213+
214+ if not self .vcs_url :
215+ raise ValueError ("Commit must have a non-empty vcs_url." )
216+
217+ if self .patch_text :
218+ self .patch_checksum = compute_patch_checksum (self .patch_text )
219+
220+ def __lt__ (self , other ):
221+ if not isinstance (other , PackageCommitPatchData ):
222+ return NotImplemented
223+ return self ._cmp_key () < other ._cmp_key ()
224+
225+ # TODO: Add cache
226+ def _cmp_key (self ):
227+ return (
228+ self .vcs_url ,
229+ self .commit_hash ,
230+ self .patch_text ,
231+ self .patch_checksum ,
232+ )
233+
234+ def to_dict (self ) -> dict :
235+ """Return a normalized dictionary representation of the commit."""
236+ return {
237+ "vcs_url" : self .vcs_url ,
238+ "commit_hash" : self .commit_hash ,
239+ "patch_text" : self .patch_text ,
240+ "patch_checksum" : self .patch_checksum ,
241+ }
242+
243+ @classmethod
244+ def from_dict (cls , data : dict ):
245+ """Create a PackageCommitPatchData instance from a dictionary."""
246+ return cls (
247+ vcs_url = data .get ("vcs_url" ),
248+ commit_hash = data .get ("commit_hash" ),
249+ patch_text = data .get ("patch_text" ),
250+ )
251+
252+
253+ @dataclasses .dataclass (eq = True )
254+ @functools .total_ordering
255+ class PatchData :
256+ patch_url : Optional [str ] = None
257+ patch_text : Optional [str ] = None
258+ patch_checksum : Optional [str ] = dataclasses .field (init = False , default = None )
259+
260+ def __post_init__ (self ):
261+ if not self .patch_url and not self .patch_text :
262+ raise ValueError ("A patch must include either patch_url or patch_text" )
263+
264+ if self .patch_text :
265+ self .patch_checksum = compute_patch_checksum (self .patch_text )
266+
267+ def __lt__ (self , other ):
268+ if not isinstance (other , PatchData ):
269+ return NotImplemented
270+ return self ._cmp_key () < other ._cmp_key ()
271+
272+ def _cmp_key (self ):
273+ return (
274+ self .patch_url ,
275+ self .patch_text ,
276+ self .patch_checksum ,
277+ )
278+
279+ def to_dict (self ) -> dict :
280+ """Return a normalized dictionary representation of the commit."""
281+ return {
282+ "patch_url" : self .patch_url ,
283+ "patch_text" : self .patch_text ,
284+ "patch_checksum" : self .patch_checksum ,
285+ }
286+
287+ @classmethod
288+ def from_dict (cls , data : dict ):
289+ """Create a PatchData instance from a dictionary."""
290+ return cls (
291+ patch_url = data .get ("patch_url" ),
292+ patch_text = data .get ("patch_text" ),
293+ )
294+
295+
197296class UnMergeablePackageError (Exception ):
198297 """
199298 Raised when a package cannot be merged with another one.
@@ -344,21 +443,30 @@ class AffectedPackageV2:
344443 """
345444 Relate a Package URL with a range of affected versions and fixed versions.
346445 The Package URL must *not* have a version.
347- AffectedPackage must contain either ``affected_version_range`` or ``fixed_version_range``.
446+ AffectedPackage must contain either ``affected_version_range`` or ``fixed_version_range`` or ``introduced_by_commits`` or ``fixed_by_commits`` .
348447 """
349448
350449 package : PackageURL
351450 affected_version_range : Optional [VersionRange ] = None
352451 fixed_version_range : Optional [VersionRange ] = None
452+ introduced_by_commit_patches : List [PackageCommitPatchData ] = dataclasses .field (
453+ default_factory = list
454+ )
455+ fixed_by_commit_patches : List [PackageCommitPatchData ] = dataclasses .field (default_factory = list )
353456
354457 def __post_init__ (self ):
355458 if self .package .version :
356459 raise ValueError (f"Affected Package URL { self .package !r} cannot have a version." )
357460
358- if not (self .affected_version_range or self .fixed_version_range ):
461+ if not (
462+ self .affected_version_range
463+ or self .fixed_version_range
464+ or self .introduced_by_commit_patches
465+ or self .fixed_by_commit_patches
466+ ):
359467 raise ValueError (
360- f"Affected Package { self .package !r} should have either fixed version range or an "
361- "affected version range."
468+ f"Affected package { self .package !r} must have either a fixed version range, "
469+ "an affected version range, introduced commit patches, or fixed commit patches ."
362470 )
363471
364472 def __lt__ (self , other ):
@@ -372,6 +480,8 @@ def _cmp_key(self):
372480 str (self .package ),
373481 str (self .affected_version_range or "" ),
374482 str (self .fixed_version_range or "" ),
483+ str (self .introduced_by_commit_patches or []),
484+ str (self .fixed_by_commit_patches or []),
375485 )
376486
377487 def to_dict (self ):
@@ -385,6 +495,12 @@ def to_dict(self):
385495 "package" : purl_to_dict (self .package ),
386496 "affected_version_range" : affected_version_range ,
387497 "fixed_version_range" : fixed_version_range ,
498+ "introduced_by_commit_patches" : [
499+ commit .to_dict () for commit in self .introduced_by_commit_patches
500+ ],
501+ "fixed_by_commit_patches" : [
502+ commit .to_dict () for commit in self .fixed_by_commit_patches
503+ ],
388504 }
389505
390506 @classmethod
@@ -396,6 +512,10 @@ def from_dict(cls, affected_pkg: dict):
396512 fixed_version_range = None
397513 affected_range = affected_pkg ["affected_version_range" ]
398514 fixed_range = affected_pkg ["fixed_version_range" ]
515+ introduced_by_commit_patches = (
516+ affected_pkg .get ("introduced_by_package_commit_patches" ) or []
517+ )
518+ fixed_by_commit_patches = affected_pkg .get ("fixed_by_package_commit_patches" ) or []
399519
400520 try :
401521 affected_version_range = VersionRange .from_string (affected_range )
@@ -417,6 +537,12 @@ def from_dict(cls, affected_pkg: dict):
417537 package = package ,
418538 affected_version_range = affected_version_range ,
419539 fixed_version_range = fixed_version_range ,
540+ introduced_by_commit_patches = [
541+ PackageCommitPatchData .from_dict (commit ) for commit in introduced_by_commit_patches
542+ ],
543+ fixed_by_commit_patches = [
544+ PackageCommitPatchData .from_dict (commit ) for commit in fixed_by_commit_patches
545+ ],
420546 )
421547
422548
@@ -441,6 +567,7 @@ class AdvisoryData:
441567 )
442568 references : List [Reference ] = dataclasses .field (default_factory = list )
443569 references_v2 : List [ReferenceV2 ] = dataclasses .field (default_factory = list )
570+ patches : List [PatchData ] = dataclasses .field (default_factory = list )
444571 date_published : Optional [datetime .datetime ] = None
445572 weaknesses : List [int ] = dataclasses .field (default_factory = list )
446573 severities : List [VulnerabilitySeverity ] = dataclasses .field (default_factory = list )
@@ -473,6 +600,7 @@ def to_dict(self):
473600 "summary" : self .summary ,
474601 "affected_packages" : [pkg .to_dict () for pkg in self .affected_packages ],
475602 "references_v2" : [ref .to_dict () for ref in self .references_v2 ],
603+ "patches" : [patch .to_dict () for patch in self .patches ],
476604 "severities" : [sev .to_dict () for sev in self .severities ],
477605 "date_published" : self .date_published .isoformat () if self .date_published else None ,
478606 "weaknesses" : self .weaknesses ,
@@ -505,74 +633,7 @@ def from_dict(cls, advisory_data):
505633 "affected_packages" : [
506634 affected_package_cls .from_dict (pkg ) for pkg in affected_packages if pkg is not None
507635 ],
508- "references" : [Reference .from_dict (ref ) for ref in advisory_data ["references" ]],
509- "date_published" : datetime .datetime .fromisoformat (date_published )
510- if date_published
511- else None ,
512- "weaknesses" : advisory_data ["weaknesses" ],
513- "url" : advisory_data .get ("url" ) or None ,
514- }
515- return cls (** transformed )
516-
517-
518- @dataclasses .dataclass (order = True )
519- class AdvisoryDataV2 :
520- """
521- This data class expresses the contract between data sources and the import runner.
522-
523- If a vulnerability_id is present then:
524- summary or affected_packages or references must be present
525- otherwise
526- either affected_package or references should be present
527-
528- date_published must be aware datetime
529- """
530-
531- advisory_id : str = ""
532- aliases : List [str ] = dataclasses .field (default_factory = list )
533- summary : Optional [str ] = ""
534- affected_packages : List [AffectedPackage ] = dataclasses .field (default_factory = list )
535- references : List [ReferenceV2 ] = dataclasses .field (default_factory = list )
536- date_published : Optional [datetime .datetime ] = None
537- weaknesses : List [int ] = dataclasses .field (default_factory = list )
538- url : Optional [str ] = None
539-
540- def __post_init__ (self ):
541- if self .date_published and not self .date_published .tzinfo :
542- logger .warning (f"AdvisoryData with no tzinfo: { self !r} " )
543- if self .summary :
544- self .summary = self .clean_summary (self .summary )
545-
546- def clean_summary (self , summary ):
547- # https://nvd.nist.gov/vuln/detail/CVE-2013-4314
548- # https://github.com/cms-dev/cms/issues/888#issuecomment-516977572
549- summary = summary .strip ()
550- if summary :
551- summary = summary .replace ("\x00 " , "\uFFFD " )
552- return summary
553-
554- def to_dict (self ):
555- return {
556- "aliases" : self .aliases ,
557- "summary" : self .summary ,
558- "affected_packages" : [pkg .to_dict () for pkg in self .affected_packages ],
559- "references" : [ref .to_dict () for ref in self .references ],
560- "date_published" : self .date_published .isoformat () if self .date_published else None ,
561- "weaknesses" : self .weaknesses ,
562- "url" : self .url if self .url else "" ,
563- }
564-
565- @classmethod
566- def from_dict (cls , advisory_data ):
567- date_published = advisory_data ["date_published" ]
568- transformed = {
569- "aliases" : advisory_data ["aliases" ],
570- "summary" : advisory_data ["summary" ],
571- "affected_packages" : [
572- AffectedPackage .from_dict (pkg )
573- for pkg in advisory_data ["affected_packages" ]
574- if pkg is not None
575- ],
636+ "patches" : [PatchData .from_dict (patch ) for patch in advisory_data .get ("patches" , [])],
576637 "references" : [Reference .from_dict (ref ) for ref in advisory_data ["references" ]],
577638 "date_published" : datetime .datetime .fromisoformat (date_published )
578639 if date_published
0 commit comments