Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .github/workflows/sync.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Daily sync of EUVD catalog

on:
workflow_dispatch:
schedule:
- cron: '0 0 * * *'

permissions:
contents: write

jobs:
scheduled:
runs-on: ubuntu-latest

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'

- name: Install required packages
run: pip install requests==2.32.5 aboutcode.pipeline==0.2.1

- name: Run sync (daily)
run: python sync_catalog.py daily

- name: Commit and push if it changed
run: |-
git config user.name "AboutCode Automation"
git config user.email "[email protected]"
git add -A
timestamp=$(date -u)
git commit -m "$(echo -e "Sync EUVD catalog: $timestamp\n\nSigned-off-by: AboutCode Automation <[email protected]>")" || exit 0
git push
21 changes: 21 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Various junk and temp files
.DS_Store
*~
.*.sw[po]
.build
.ve
*.bak
var
share
selenium
local
/dist/
/.*cache/
/.venv/
/.python-version
/.pytest_cache/
/scancodeio.egg-info/
*.rdb
*.aof
.vscode
.ipynb_checkpoints
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
requests==2.32.5
aboutcode.pipeline==0.2.1
222 changes: 222 additions & 0 deletions sync_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import argparse
import json
import sys
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict

import requests

from aboutcode.pipeline import BasePipeline, LoopProgress

ROOT_PATH = Path(__file__).parent
CATALOG_PATH = ROOT_PATH / "catalog"
PAGE_DIRECTORY = CATALOG_PATH / "pages"

API_URL = "https://euvdservices.enisa.europa.eu/api/search"

HEADERS = {
"User-Agent": "Vulnerablecode",
"Accept": "application/json",
}

PAGE_SIZE = 100
DEFAULT_START_YEAR = 1970
REQUEST_TIMEOUT = 10


class EuvdCatalogMirror(BasePipeline):
def __init__(self) -> None:
super().__init__()
self.session = requests.Session()

@classmethod
def steps(cls):
return (cls.collect_catalog,)

def collect_catalog(self) -> None:
if getattr(self, "mode", "backfill") == "daily":
return self.sync_yesterday()
self.backfill_from_year(DEFAULT_START_YEAR)

def backfill_from_year(self, start_year: int) -> None:
today = date.today()
backfill_start = date(start_year, 1, 1)
backfill_end = today

months: list[tuple[int, int]] = []
current = backfill_start

while current <= backfill_end:
months.append((current.year, current.month))
if current.month == 12:
current = date(current.year + 1, 1, 1)
continue

current = date(current.year, current.month + 1, 1)

progress = LoopProgress(total_iterations=len(months), logger=self.log)

for year, month in progress.iter(months):
month_start = date(year, month, 1)
day_token = month_start.isoformat()

if year == backfill_end.year and month == backfill_end.month:
self.log(f"backfill {year}-{month:02d}: {month_start} to {backfill_end}")
self._collect_paginated(
start=month_start,
end=backfill_end,
year=year,
month=month,
day_token=day_token,
)
continue

if month == 12:
next_month = date(year + 1, 1, 1)
else:
next_month = date(year, month + 1, 1)
month_end = next_month - timedelta(days=1)

self.log(f"month {year}-{month:02d}: {month_start} to {month_end}")
self._collect_paginated(
start=month_start,
end=month_end,
year=year,
month=month,
day_token=day_token,
)

def sync_yesterday(self) -> None:
target_date = date.today() - timedelta(days=1)
self.log(f"day {target_date}: updated_on={target_date}")
day_token = target_date.isoformat()

self._collect_paginated(
start=target_date,
end=target_date,
year=target_date.year,
month=target_date.month,
day_token=day_token,
)

def _collect_paginated(
self,
start: date,
end: date,
year: int,
month: int,
day_token: str,
) -> None:
"""
Fetches all results from the EUVD API for the given date range, handling pagination.Each page of results is saved as a separate JSON file under the appropriate year/month directory. Fetching stops when a page returns fewer results than PAGE_SIZE, meaning there are no more results to fetch.
"""
page = 0

while True:
params = {
"fromUpdatedDate": start.isoformat(),
"toUpdatedDate": end.isoformat(),
"size": PAGE_SIZE,
"page": page,
}

data = self.fetch_page(params)
items = data.get("items") or []
count = len(items)

if count == 0:
self.log(f"no results for {start}–{end} on page {page}, stopping")
break

page_number = page + 1
self.write_page_file(
year=year,
month=month,
day_token=day_token,
page_number=page_number,
payload=data,
)

if count < PAGE_SIZE:
self.log(f"finished {start}–{end} at page {page} ({count} items)")
break

page += 1

def write_page_file(
self,
year: int,
month: int,
day_token: str,
page_number: int,
payload: Dict[str, Any],
) -> None:
year_str = f"{year:04d}"
month_str = f"{month:02d}"
page_str = f"{page_number:04d}"

dir_path = PAGE_DIRECTORY / year_str / month_str
dir_path.mkdir(parents=True, exist_ok=True)

filename = f"page{day_token}-{page_str}.json"
path = dir_path / filename

if path.exists():
self.log(f"skip existing file: {path}")
return
Comment on lines +172 to +174

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we skip updating the existing file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We skip existing files to make the sync idempotent and resumable. If the script is interrupted or re-run, it won't overwrite data that was already successfully saved. Since the data is fetched using the dateUpdated param and updates are forward-only, which makes skipping previously written files safe.


with path.open("w", encoding="utf-8") as output:
json.dump(payload, output, indent=2)

self.log(f"saved {path}")

def fetch_page(self, params: Dict[str, Any]) -> Dict[str, Any]:
self.log(f"GET {API_URL} {params}")
try:
response = self.session.get(
API_URL,
params=params,
headers=HEADERS,
timeout=REQUEST_TIMEOUT,
)
response.raise_for_status()
data: Any = response.json()
if not isinstance(data, dict):
return {}
return data
except requests.exceptions.RequestException as e:
self.log(f"Request failed: {e}")
raise

def log(self, message: str) -> None:
now = datetime.now(timezone.utc).astimezone()
stamp = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(f"{stamp} {message}")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="EUVD Catalog Mirror")
parser.add_argument(
"mode",
nargs="?",
default="daily",
choices=["backfill", "daily"],
help="Sync mode: 'backfill' for full history, 'daily' for yesterday only",
)
args = parser.parse_args()

mirror = EuvdCatalogMirror()
mirror.mode = args.mode

status_code, error_message = mirror.execute()
if error_message:
print(error_message)
sys.exit(status_code)