Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ FLAG_RETRIEVE_DEFAULT_JSON=0
# AWS_S3_ENDPOINT_URL=http://localhost:9000

# In production: set real AWS credentials and do not set AWS_S3_ENDPOINT_URL.

# Unpaywall API (optional) — required to use the Unpaywall metadata parser.
# Register any email at https://unpaywall.org/products/api
UNPAYWALL_EMAIL=eu_fact_force@gmail.com
84 changes: 0 additions & 84 deletions eu_fact_force/ingestion/data_collection/README.md

This file was deleted.

50 changes: 0 additions & 50 deletions eu_fact_force/ingestion/data_collection/__main__.py

This file was deleted.

84 changes: 75 additions & 9 deletions eu_fact_force/ingestion/data_collection/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,85 @@

from eu_fact_force.ingestion.data_collection.parsers import PARSERS

# Canonical key order of the dict returned by fetch_all: these keys are
# emitted first, in this order (missing ones as None); any other keys a
# parser produced are appended after them.
FIELD_ORDER = [
"found",
"sources",
"title",
"authors",
"journal",
"publication date",
"status",
"doi",
"link",
"document type",
"document subtypes",
"cited by count",
"open access",
"language",
"abstract",
"keywords",
"cited articles",
]

logger = logging.getLogger(__name__)


def _better(new, current):
"""Return True if new is a longer list or string than current."""
if isinstance(new, (list, str)) and isinstance(new, type(current)):
def _better(new, current) -> bool:
"""Return True if new is more complete than current."""
if current is None:
return True
if isinstance(new, list) and isinstance(current, list):
return sum(v is not None for v in new) > sum(v is not None for v in current)
if isinstance(new, str) and isinstance(current, str):
return len(new) > len(current)
return False


def _merge_authors(current: list, update: list) -> list:
"""Merge two author lists by name, preserving orcid associations."""
orcid_by_name = {a["name"]: a["orcid"] for a in update if a.get("orcid")}
base = current if len(current) >= len(update) else update
return [{"name": a["name"], "orcid": orcid_by_name.get(a["name"]) or a.get("orcid")} for a in base]


def _is_pmc_link(url: str) -> bool:
return "ncbi.nlm.nih.gov/pmc" in url


def _is_doi_link(url: str) -> bool:
return "doi.org" in url


def _doi_count(refs: list) -> int:
return sum(1 for r in refs if isinstance(r, str) and r.startswith("10."))


def _merge(merged: dict, update: dict) -> None:
    """Fold ``update`` into ``merged`` in place, keeping the most complete value per field.

    ``None`` values in ``update`` are ignored. A few fields have their own rule,
    applied only when both sides hold a value of the expected type:

    - ``"authors"`` (two lists): merged by name, preserving orcid associations.
    - ``"link"`` (two strings): a PMC link is replaced by a non-PMC one;
      otherwise a DOI link replaces a non-DOI one; otherwise the current link stays.
    - ``"cited articles"`` (two lists): the list containing more DOIs wins.

    Two dict values are merged recursively; everything else is decided by ``_better``.
    """
    for field, incoming in update.items():
        if incoming is None:
            continue

        existing = merged.get(field)
        both_lists = isinstance(incoming, list) and isinstance(existing, list)
        both_strings = isinstance(incoming, str) and isinstance(existing, str)

        if field == "authors" and both_lists:
            merged[field] = _merge_authors(existing, incoming)
            continue

        if field == "link" and both_strings:
            drops_pmc = _is_pmc_link(existing) and not _is_pmc_link(incoming)
            if drops_pmc or (_is_doi_link(incoming) and not _is_doi_link(existing)):
                merged[field] = incoming
            continue

        if field == "cited articles" and both_lists:
            if _doi_count(incoming) > _doi_count(existing):
                merged[field] = incoming
            continue

        if isinstance(incoming, dict) and isinstance(existing, dict):
            # ``existing`` is the very object stored in merged[field].
            _merge(existing, incoming)
            continue

        if _better(incoming, existing):
            merged[field] = incoming


def fetch_all(doi: str) -> dict:
"""Query all parsers for a DOI and merge results, keeping the most complete value per field."""
merged = {}
Expand All @@ -26,9 +95,6 @@ def fetch_all(doi: str) -> dict:
if not result.get("found"):
continue
sources.append(parser.__class__.__name__)
for field, value in result.items():
if field == "found" or value is None:
continue
if field not in merged or _better(value, merged[field]):
merged[field] = value
return {"found": bool(sources), "sources": sources} | merged
_merge(merged, result)
result = {"found": bool(sources), "sources": sources} | merged
return {k: result.get(k) for k in FIELD_ORDER} | {k: v for k, v in result.items() if k not in FIELD_ORDER}
2 changes: 2 additions & 0 deletions eu_fact_force/ingestion/data_collection/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from eu_fact_force.ingestion.data_collection.parsers.hal import HALMetadataParser
from eu_fact_force.ingestion.data_collection.parsers.openalex import OpenAlexMetadataParser
from eu_fact_force.ingestion.data_collection.parsers.pubmed import PubMedMetadataParser
from eu_fact_force.ingestion.data_collection.parsers.unpaywall import UnpaywallMetadataParser

# One instance of every available metadata parser, created when this package
# is imported; the collector module imports this list.
# NOTE(review): list order presumably decides which source wins when two
# values are equally complete — confirm against the loop in fetch_all.
PARSERS = [
CrossrefMetadataParser(),
OpenAlexMetadataParser(),
PubMedMetadataParser(),
HALMetadataParser(),
ArxivMetadataParser(),
UnpaywallMetadataParser(),
]
39 changes: 18 additions & 21 deletions eu_fact_force/ingestion/data_collection/parsers/arxiv.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import arxiv

from eu_fact_force.ingestion.data_collection.parsers.base import MetadataParser

ARXIV_DOI_PREFIX = "10.48550/arXiv."
Expand All @@ -10,16 +9,20 @@ class ArxivMetadataParser(MetadataParser):

def __init__(self):
super().__init__()
self.api_name = "arxiv"
self.client = arxiv.Client()
self._cache = {}

def _search(self, doi: str):
if doi in self._cache:
return self._cache[doi]

if doi.startswith(ARXIV_DOI_PREFIX):
search = arxiv.Search(id_list=[doi[len(ARXIV_DOI_PREFIX):]])
else:
search = arxiv.Search(query=f"doi:{doi}", max_results=1)
results = list(self.client.results(search))
return results[0] if results else None
self._cache[doi] = results[0] if results else None
return self._cache[doi]

def get_metadata(self, doi: str) -> dict:
article = self._search(doi)
Expand All @@ -29,29 +32,23 @@ def get_metadata(self, doi: str) -> dict:
"found": True,
"title": article.title,
"authors": [{"name": str(a), "orcid": None} for a in article.authors],
"journal": article.journal_ref,
"publish date": str(article.published)[:10],
"link": next(
(link.href for link in article.links if link.rel == "alternate"), None
),
"keywords": None,
"cited articles": None,
"doi": doi,
"document type": None,
"open access": True,
"journal": {"name": article.journal_ref, "issn": None},
"publication date": str(article.published)[:10],
"status": f"updated on {str(article.updated)[:10]}"
if article.updated != article.published
else "published",
"doi": doi,
"link": next((link.href for link in article.links if link.rel == "alternate"), None),
"document type": None,
"document subtypes": None,
"open access": True,
"language": None,
"cited by count": None,
"abstract": article.summary,
"keywords": None,
"cited articles": None,
}

def get_pdf_url(self, doi: str) -> list[str]:
    """Return the PDF URL of the article found for ``doi`` as a one-item list.

    An empty list is returned when the search yields no article.
    """
    match = self._search(doi)
    if not match:
        return []
    return [match.pdf_url]


if __name__ == "__main__":
import json

parser = ArxivMetadataParser()
metadata = parser.get_metadata("10.48550/arXiv.2603.06740")
print(json.dumps(metadata, indent=2, ensure_ascii=False))
10 changes: 3 additions & 7 deletions eu_fact_force/ingestion/data_collection/parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class MetadataParser(ABC):
"""Base class for all metadata parsers."""

def __init__(self):
self.api_name = None
self.logger = logging.getLogger(self.__class__.__name__)

@abstractmethod
Expand All @@ -39,9 +38,7 @@ def _fetch_pdf_content(self, url: str) -> bytes | None:
response = requests.get(url, timeout=30)
response.raise_for_status()
if not response.content.startswith(b"%PDF"):
self.logger.warning(
f"Content at {url} is not a valid PDF (possibly a paywall page)."
)
self.logger.warning(f"Content at {url} is not a valid PDF (possibly a paywall page).")
return None
return response.content

Expand All @@ -57,12 +54,11 @@ def _save_pdf(self, path: str, content: bytes) -> None:
with open(path, "wb") as f:
f.write(content)
else:
self.logger.info(
f"Skipping {path}: existing file is already as large or larger."
)
self.logger.info(f"Skipping {path}: existing file is already as large or larger.")

def download_pdf(self, doi: str, output_dir: str = "pdf") -> bool:
"""Download the first valid PDF found and save it to output_dir. Returns True on success."""
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"{doi_to_id(doi)}.pdf")
for url in self.get_pdf_url(doi):
try:
Expand Down
Loading
Loading