diff --git a/src/linkml_map/cli/cli.py b/src/linkml_map/cli/cli.py index 6908155..fcec6de 100644 --- a/src/linkml_map/cli/cli.py +++ b/src/linkml_map/cli/cli.py @@ -39,11 +39,18 @@ output_option = click.option("-o", "--output", help="Output file.") schema_option = click.option("-s", "--schema", help="Path to source schema.") transformer_specification_option = click.option( - "-T", "--transformer-specification", help="Path to transformer specification." + "-T", + "--transformer-specification", + multiple=True, + help="Path to transformer specification file or directory. Can be repeated.", ) target_schema_option = click.option( "--target-schema", help="Path to target schema (required for nested object_derivations)." ) +entity_option = click.option( + "--entity", + help="Only process class_derivations matching this class name.", +) logger = logging.getLogger(__name__) @@ -71,6 +78,7 @@ def main(verbose: int, quiet: bool) -> None: @transformer_specification_option @schema_option @target_schema_option +@entity_option @click.option("--source-type", help="Source type/class name for the input data.") @click.option( "--unrestricted-eval/--no-unrestricted-eval", @@ -104,18 +112,26 @@ def main(verbose: int, quiet: bool) -> None: default=False, help="Continue processing when a row fails to transform. 
def _load_specs(tr: ObjectTransformer, transformer_specification: tuple[str, ...]) -> None:
    """Populate ``tr.specification`` from the ``-T`` argument(s).

    A lone argument naming a regular file is handed to the single-file
    loader. Anything else (several arguments, a directory, an empty tuple,
    a path that is not a file) is handed to the merging loader.

    :param tr: Transformer that receives the loaded specification.
    :param transformer_specification: Paths collected from ``-T``.
    """
    lone_file = (
        len(transformer_specification) == 1
        and Path(transformer_specification[0]).is_file()
    )
    if lone_file:
        tr.load_transformer_specification(Path(transformer_specification[0]))
    else:
        tr.load_transformer_specifications(transformer_specification)


def _emit_spec_to_file(tr: ObjectTransformer, emit_spec: str, entity: str | None) -> None:
    """Dump ``tr.specification`` as YAML to ``emit_spec``.

    When ``entity`` is set, a deep copy of the specification is dumped with
    its ``class_derivations`` reduced to those whose ``name`` equals
    ``entity``; the transformer's own specification is left untouched.

    :param tr: Transformer holding the already-loaded specification.
    :param emit_spec: Destination file path (overwritten).
    :param entity: Optional class name to keep.
    """
    from linkml_runtime.dumpers import yaml_dumper

    resolved = tr.specification
    if entity:
        from copy import deepcopy

        # Filter a copy so the spec used for the actual run is not narrowed here.
        resolved = deepcopy(resolved)
        kept = []
        for derivation in resolved.class_derivations:
            if derivation.name == entity:
                kept.append(derivation)
        resolved.class_derivations = kept

    with open(emit_spec, "w", encoding="utf-8") as out:
        out.write(yaml_dumper.dumps(resolved))
    logger.info("Wrote resolved spec to %s", emit_spec)
None = None, + emit_spec: str | None = None, **kwargs: dict[str, Any], ) -> None: """Streaming transformation for tabular/directory input.""" # Initialize transformer tr = ObjectTransformer(**kwargs) tr.source_schemaview = SchemaView(schema) - tr.load_transformer_specification(transformer_specification) + _load_specs(tr, transformer_specification) if target_schema: tr.target_schemaview = SchemaView(target_schema) + if emit_spec: + _emit_spec_to_file(tr, emit_spec, entity) + # Initialize data loader data_loader = DataLoader(input_path) @@ -280,7 +338,7 @@ def _map_data_streaming( on_error = errors.append if continue_on_error else None # Create transform iterator and chunk it - transform_iter = transform_spec(tr, data_loader, source_type, on_error=on_error) + transform_iter = transform_spec(tr, data_loader, source_type, on_error=on_error, entity=entity) chunks = chunked(transform_iter, chunk_size) # Resolve output format @@ -339,7 +397,7 @@ def _map_data_streaming( @click.option("--target", default="python", show_default=True, help="Target representation.") def compile( schema: str, - transformer_specification: str, + transformer_specification: tuple[str, ...], target: str, output: str | None, **kwargs: Any, @@ -363,7 +421,7 @@ def compile( raise NotImplementedError(msg) tr = ObjectTransformer() tr.source_schemaview = sv - tr.load_transformer_specification(transformer_specification) + _load_specs(tr, transformer_specification) result = compiler.compile(tr.specification) # dump as-is, no encoding dump_output(result.serialization, None, output) @@ -375,7 +433,7 @@ def compile( @click.argument("schema") def derive_schema( schema: str, - transformer_specification: str, + transformer_specification: tuple[str, ...], output: str | None, **kwargs: Any, ) -> None: @@ -395,7 +453,7 @@ def derive_schema( """ logger.info(f"Transforming {schema} using {transformer_specification}") tr = ObjectTransformer() - tr.load_transformer_specification(transformer_specification) + 
_load_specs(tr, transformer_specification) mapper = SchemaMapper(transformer=tr) mapper.source_schemaview = SchemaView(schema) target_schema = mapper.derive_schema() @@ -409,7 +467,7 @@ def derive_schema( @click.argument("schema") def invert( schema: str, - transformer_specification: str, + transformer_specification: tuple[str, ...], output: str | None, **kwargs: Any, ) -> None: @@ -422,7 +480,7 @@ def invert( """ logger.info(f"Inverting {transformer_specification} using {schema} as source") tr = ObjectTransformer() - tr.load_transformer_specification(transformer_specification) + _load_specs(tr, transformer_specification) inverter = TransformationSpecificationInverter( source_schemaview=SchemaView(schema), **kwargs, @@ -432,9 +490,25 @@ def invert( @main.command(name="validate-spec") +@entity_option +@click.option( + "--merge", + is_flag=True, + default=False, + help="Merge all spec files into one before validating. Supports directories.", +) +@click.option( + "--emit-spec", + type=click.Path(dir_okay=False), + default=None, + help="Write the resolved (merged + filtered) spec to a file path. Use '-' for stdout.", +) @click.argument("spec_files", nargs=-1, required=True, type=click.Path(exists=True)) def validate_spec_cmd( spec_files: tuple[str, ...], + entity: str | None = None, + merge: bool = False, + emit_spec: str | None = None, ) -> None: """Validate transformation specification YAML files. 
def _validate_spec_merged(
    spec_files: tuple[str, ...],
    entity: str | None = None,
    emit_spec: str | None = None,
) -> None:
    """Validate the merge of ``spec_files`` and optionally write it out.

    The files are loaded and merged into one spec dict. If ``entity`` is
    given, ``class_derivations`` is narrowed to that class *before*
    validation, so only what ``map-data --entity`` would execute is checked.
    Validation errors are printed to stderr and the process exits with
    status 1; otherwise ``Merged spec: ok`` is printed.

    :param spec_files: Spec files and/or directories to merge.
    :param entity: Optional class name to keep in ``class_derivations``.
    :param emit_spec: Optional destination for the resolved spec as YAML;
        ``"-"`` prints it to stdout instead of writing a file.
    :raises SystemExit: With code 1 when the merged spec has validation errors.
    """
    from linkml_map.utils.spec_merge import load_and_merge_specs
    from linkml_map.validator import validate_spec

    merged = load_and_merge_specs(spec_files)

    if entity:
        derivations = merged.get("class_derivations")
        if isinstance(derivations, dict):
            merged["class_derivations"] = {
                name: body for name, body in derivations.items() if name == entity
            }
        elif isinstance(derivations, list):
            kept = []
            for entry in derivations:
                if not isinstance(entry, dict):
                    continue
                is_expanded = entry.get("name") == entity  # {"name": "X", ...}
                is_compact = len(entry) == 1 and entity in entry  # {"X": {...}}
                if is_expanded or is_compact:
                    kept.append(entry)
            merged["class_derivations"] = kept

    problems = validate_spec(merged)
    if problems:
        click.echo("Merged spec validation errors:", err=True)
        for problem in problems:
            click.echo(f" {problem}", err=True)
        raise SystemExit(1)

    click.echo("Merged spec: ok")

    if not emit_spec:
        return

    from linkml_runtime.dumpers import yaml_dumper

    from linkml_map.datamodel.transformer_model import TransformationSpecification
    from linkml_map.transformer.transformer import Transformer

    # Normalize the raw dict in place, then build the model that gets dumped.
    Transformer._normalize_spec_dict(merged)
    rendered = yaml_dumper.dumps(TransformationSpecification(**merged))

    if emit_spec == "-":
        click.echo(rendered)
        return

    with open(emit_spec, "w", encoding="utf-8") as out:
        out.write(rendered)
    click.echo(f"Wrote resolved spec to {emit_spec}")
When provided, only + top-level class_derivations whose ``name`` matches are processed. :returns: Iterator of transformed row dicts. """ spec = transformer.derived_specification @@ -52,6 +55,8 @@ def transform_spec( transformer.lookup_index = LookupIndex() for class_deriv in spec.class_derivations: + if entity and class_deriv.name != entity: + continue table_name = class_deriv.populated_from or class_deriv.name if table_name not in data_loader: logger.debug("Skipping class_derivation %s: no data found", class_deriv.name) diff --git a/src/linkml_map/transformer/transformer.py b/src/linkml_map/transformer/transformer.py index 04cf07e..40519fb 100644 --- a/src/linkml_map/transformer/transformer.py +++ b/src/linkml_map/transformer/transformer.py @@ -109,6 +109,22 @@ def load_transformer_specification(self, path: str | Path) -> None: self._normalize_spec_dict(obj) self.specification = TransformationSpecification(**obj) + def load_transformer_specifications(self, paths: tuple[str | Path, ...]) -> None: + """Load and merge multiple transformation spec files into a single specification. + + Accepts file paths and/or directories. Directories are recursively + searched for YAML files. All specs are merged (class_derivations + appended, enum/slot_derivations unioned by name) and the result is + set as ``self.specification``. + + :param paths: One or more file or directory paths. + """ + from linkml_map.utils.spec_merge import load_and_merge_specs + + obj = load_and_merge_specs(paths) + self._normalize_spec_dict(obj) + self.specification = TransformationSpecification(**obj) + @classmethod def normalize_transform_spec(cls, obj: dict[str, Any], normalizer: ReferenceValidator) -> dict: """ diff --git a/src/linkml_map/utils/spec_merge.py b/src/linkml_map/utils/spec_merge.py new file mode 100644 index 0000000..d7b9257 --- /dev/null +++ b/src/linkml_map/utils/spec_merge.py @@ -0,0 +1,170 @@ +"""Utilities for loading and merging multiple transformation specification files. 
def resolve_spec_paths(paths: tuple[str | Path, ...]) -> list[Path]:
    """Resolve a mix of file paths and directories to a flat list of YAML files.

    Directories are searched recursively for regular files matching
    ``*.yaml`` or ``*.yml``; a directory whose own name ends in one of those
    suffixes is searched but never returned itself. Explicit file paths are
    included as given.

    A file reached more than once (listed twice, or listed explicitly and
    also found inside a listed directory) is returned only at its first
    position, so its derivations are not merged twice.

    :param paths: File paths or directory paths.
    :returns: YAML file paths in input order, with the files discovered
        within each directory sorted before being appended.
    :raises FileNotFoundError: If a path does not exist.
    """
    resolved: list[Path] = []
    seen: set[Path] = set()
    for p in paths:
        path = Path(p)
        if not path.exists():
            msg = f"Spec path does not exist: {path}"
            raise FileNotFoundError(msg)
        if path.is_dir():
            candidates = sorted(
                match
                for pattern in ("*.yaml", "*.yml")
                for match in path.rglob(pattern)
                if match.is_file()
            )
        else:
            candidates = [path]
        for candidate in candidates:
            # Deduplicate on the canonical location but keep the caller's spelling.
            key = candidate.resolve()
            if key not in seen:
                seen.add(key)
                resolved.append(candidate)
    return resolved


def load_spec_file(path: Path) -> list[dict[str, Any]]:
    """Load a YAML spec file and return a list of spec dicts.

    Handles two formats:

    1. A single dict (standard ``TransformationSpecification``).
    2. A YAML list of partial spec dicts (compact sub-spec format).

    Anything else (including an empty file) yields an empty list, and
    non-dict items inside a list are dropped; both cases are logged.

    :param path: Path to the YAML file.
    :returns: A list of zero or more spec dicts.
    """
    # Read as UTF-8 explicitly rather than relying on the locale default.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if isinstance(data, dict):
        return [data]
    if isinstance(data, list):
        blocks = [item for item in data if isinstance(item, dict)]
        skipped = len(data) - len(blocks)
        if skipped:
            logger.warning("Ignoring %d non-dict item(s) in %s", skipped, path)
        return blocks

    logger.warning("Skipping %s: expected dict or list, got %s", path, type(data).__name__)
    return []


def _named_derivations(section: str, value: Any) -> list[tuple[str, Any]]:
    """Return ``(name, body)`` pairs for an enum/slot derivation collection.

    Accepts a name-keyed dict, or a list whose items are either expanded
    (``{"name": "X", ...}``) or compact single-key (``{"X": {...}}``) dicts.
    """
    if value is None:
        return []
    if isinstance(value, dict):
        return list(value.items())
    if isinstance(value, list):
        pairs: list[tuple[str, Any]] = []
        for item in value:
            if isinstance(item, dict) and "name" in item:
                pairs.append((item["name"], item))
            elif isinstance(item, dict) and len(item) == 1:
                pairs.extend(item.items())
            else:
                logger.warning("Ignoring %s entry without a name: %r", section, item)
        return pairs
    logger.warning("Ignoring %s: expected dict or list, got %s", section, type(value).__name__)
    return []


def _without_redundant_name(name: str, body: Any) -> Any:
    """Drop a ``name`` key that merely repeats the derivation's own name."""
    if isinstance(body, dict) and body.get("name") == name:
        return {key: val for key, val in body.items() if key != "name"}
    return body


def _merge_named(section: str, merged: dict[str, Any], value: Any) -> None:
    """Union the derivations in ``value`` into ``merged``, rejecting conflicts."""
    for name, body in _named_derivations(section, value):
        if name not in merged:
            merged[name] = body
        elif _without_redundant_name(name, merged[name]) != _without_redundant_name(name, body):
            msg = f"Conflicting {section} for '{name}'"
            raise ValueError(msg)


def merge_spec_dicts(spec_dicts: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge multiple spec dicts into a single TransformationSpecification dict.

    Merge strategy:

    - ``class_derivations``: appended in order. List items are kept as they
      are; dict entries become single-key ``{name: body}`` items, with an
      empty body replaced by ``{}``.
    - ``enum_derivations`` / ``slot_derivations``: unioned by name into a
      dict. Both the name-keyed dict form and the list form are accepted.
      The same name may recur only with an equal definition (a ``name`` key
      repeating the derivation's own name is ignored in that comparison).
    - Every other field (``title``, ``source_schema``, etc.): first non-None
      value wins.

    A single-element input is returned as the very same object, unmodified.

    :param spec_dicts: A list of raw spec dicts to merge.
    :returns: A single merged spec dict (``{}`` for an empty input).
    :raises ValueError: If enum or slot derivations conflict on the same name.
    """
    if not spec_dicts:
        return {}
    if len(spec_dicts) == 1:
        return spec_dicts[0]

    collection_keys = ("class_derivations", "enum_derivations", "slot_derivations")
    merged: dict[str, Any] = {}
    class_derivations: list = []
    enum_derivations: dict[str, Any] = {}
    slot_derivations: dict[str, Any] = {}

    for spec in spec_dicts:
        cd = spec.get("class_derivations")
        if isinstance(cd, list):
            class_derivations.extend(cd)
        elif isinstance(cd, dict):
            class_derivations.extend({name: body or {}} for name, body in cd.items())

        _merge_named("enum_derivations", enum_derivations, spec.get("enum_derivations"))
        _merge_named("slot_derivations", slot_derivations, spec.get("slot_derivations"))

        # Remaining fields: the first spec that supplies a non-None value wins.
        for key, value in spec.items():
            if key not in collection_keys and key not in merged and value is not None:
                merged[key] = value

    # Collections are only emitted when non-empty.
    if class_derivations:
        merged["class_derivations"] = class_derivations
    if enum_derivations:
        merged["enum_derivations"] = enum_derivations
    if slot_derivations:
        merged["slot_derivations"] = slot_derivations

    return merged


def load_and_merge_specs(paths: tuple[str | Path, ...]) -> dict[str, Any]:
    """Load spec files from paths/directories and merge into a single spec dict.

    :param paths: File paths or directory paths to load.
    :returns: A single merged spec dict.
    :raises FileNotFoundError: If a path does not exist.
    :raises ValueError: If no YAML files are found, no file yields a spec
        dict, or derivations conflict.
    """
    file_paths = resolve_spec_paths(paths)
    if not file_paths:
        msg = "No YAML files found in the provided paths"
        raise ValueError(msg)

    all_dicts = [spec for fp in file_paths for spec in load_spec_file(fp)]
    if not all_dicts:
        msg = "No valid spec dicts found in the provided files"
        raise ValueError(msg)

    return merge_spec_dicts(all_dicts)
def _class_spec_yaml(class_name: str, slot_names: tuple[str, ...]) -> str:
    """Render a one-class spec in which every listed slot has an empty derivation."""
    derivation = {
        "populated_from": class_name,
        "slot_derivations": {slot: {} for slot in slot_names},
    }
    return yaml.dump({"class_derivations": {class_name: derivation}})


def _run_cli(runner: CliRunner, *args: object):
    """Invoke the CLI entry point with every argument converted to ``str``."""
    return runner.invoke(main, [str(arg) for arg in args])


def _write_person_tsv(tmp_path: Path) -> Path:
    """Write a one-row ``Person.tsv`` directly under ``tmp_path``."""
    tsv = tmp_path / "Person.tsv"
    tsv.write_text("id\tname\nP:001\tAlice\n")
    return tsv


def _write_person_org_dir(tmp_path: Path) -> Path:
    """Create ``data/`` holding one-row ``Person.tsv`` and ``Org.tsv`` files."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    (data_dir / "Person.tsv").write_text("id\tname\nP:001\tAlice\n")
    (data_dir / "Org.tsv").write_text("id\ttitle\nO:001\tAcme\n")
    return data_dir


def _non_empty_lines(output: str) -> list[str]:
    """Split CLI output into its non-empty lines."""
    return [line for line in output.strip().split("\n") if line]


@pytest.fixture
def split_specs(tmp_path: Path) -> Path:
    """Create split spec files: one for Person, one for Org."""
    spec_dir = tmp_path / "specs"
    spec_dir.mkdir()
    (spec_dir / "person.yaml").write_text(_class_spec_yaml("Person", ("id", "name")))
    (spec_dir / "org.yaml").write_text(_class_spec_yaml("Org", ("id", "title")))
    return spec_dir


# --- Multi-T tests ---


def test_multi_t_single_file(runner: CliRunner, sample_tsv: Path) -> None:
    """Single -T still works."""
    result = _run_cli(
        runner,
        "map-data",
        "-T", TABULAR_TRANSFORM,
        "-s", TABULAR_SOURCE_SCHEMA,
        "--source-type", "Person",
        "-f", "yaml",
        sample_tsv,
    )
    assert result.exit_code == 0, result.stderr


def test_multi_t_directory(
    runner: CliRunner,
    split_specs: Path,
    simple_schema: Path,
    tmp_path: Path,
) -> None:
    """Passing a directory as -T loads and merges all specs."""
    tsv = _write_person_tsv(tmp_path)
    result = _run_cli(
        runner,
        "map-data",
        "-T", split_specs,
        "-s", simple_schema,
        "--source-type", "Person",
        "-f", "yaml",
        tsv,
    )
    assert result.exit_code == 0, result.stderr


def test_multi_t_multiple_files(
    runner: CliRunner,
    split_specs: Path,
    simple_schema: Path,
    tmp_path: Path,
) -> None:
    """Multiple -T flags merge specs."""
    tsv = _write_person_tsv(tmp_path)
    result = _run_cli(
        runner,
        "map-data",
        "-T", split_specs / "person.yaml",
        "-T", split_specs / "org.yaml",
        "-s", simple_schema,
        "--source-type", "Person",
        "-f", "yaml",
        tsv,
    )
    assert result.exit_code == 0, result.stderr


# --- --entity tests ---


def test_entity_filter_streaming(
    runner: CliRunner,
    split_specs: Path,
    simple_schema: Path,
    tmp_path: Path,
) -> None:
    """--entity filters to only the specified class_derivation."""
    data_dir = _write_person_org_dir(tmp_path)
    result = _run_cli(
        runner,
        "map-data",
        "-T", split_specs,
        "-s", simple_schema,
        "--entity", "Person",
        "-f", "jsonl",
        data_dir,
    )
    assert result.exit_code == 0, result.stderr
    lines = _non_empty_lines(result.output)
    assert len(lines) == 1
    import json

    record = json.loads(lines[0])
    assert record["name"] == "Alice"


def test_entity_filter_excludes_other(
    runner: CliRunner,
    split_specs: Path,
    simple_schema: Path,
    tmp_path: Path,
) -> None:
    """--entity Org should not produce Person records."""
    data_dir = _write_person_org_dir(tmp_path)
    result = _run_cli(
        runner,
        "map-data",
        "-T", split_specs,
        "-s", simple_schema,
        "--entity", "Org",
        "-f", "jsonl",
        data_dir,
    )
    assert result.exit_code == 0, result.stderr
    lines = _non_empty_lines(result.output)
    assert len(lines) == 1
    import json

    record = json.loads(lines[0])
    assert record["title"] == "Acme"


# --- --emit-spec tests ---


def test_emit_spec_on_map_data(
    runner: CliRunner,
    split_specs: Path,
    simple_schema: Path,
    tmp_path: Path,
) -> None:
    """--emit-spec writes the resolved spec to a file."""
    tsv = _write_person_tsv(tmp_path)
    emit_path = tmp_path / "resolved.yaml"
    result = _run_cli(
        runner,
        "map-data",
        "-T", split_specs,
        "-s", simple_schema,
        "--source-type", "Person",
        "--emit-spec", emit_path,
        "-f", "yaml",
        tsv,
    )
    assert result.exit_code == 0, result.stderr
    assert emit_path.exists()
    emitted = yaml.safe_load(emit_path.read_text())
    assert "class_derivations" in emitted


def test_emit_spec_with_entity_filter(
    runner: CliRunner,
    split_specs: Path,
    simple_schema: Path,
    tmp_path: Path,
) -> None:
    """--emit-spec + --entity only includes the filtered class."""
    tsv = _write_person_tsv(tmp_path)
    emit_path = tmp_path / "resolved.yaml"
    result = _run_cli(
        runner,
        "map-data",
        "-T", split_specs,
        "-s", simple_schema,
        "--entity", "Person",
        "--emit-spec", emit_path,
        "-f", "yaml",
        tsv,
    )
    assert result.exit_code == 0, result.stderr
    emitted = yaml.safe_load(emit_path.read_text())
    cd_names = [cd["name"] for cd in emitted["class_derivations"]]
    assert cd_names == ["Person"]


# --- validate-spec tests ---


def test_validate_spec_merge(runner: CliRunner, split_specs: Path) -> None:
    """--merge validates all specs as a combined spec."""
    result = _run_cli(
        runner,
        "validate-spec",
        "--merge",
        split_specs / "person.yaml",
        split_specs / "org.yaml",
    )
    assert result.exit_code == 0, result.stderr
    assert "ok" in result.output


def test_validate_spec_merge_emit_to_file(runner: CliRunner, split_specs: Path, tmp_path: Path) -> None:
    """--merge --emit-spec PATH writes the resolved spec to a file."""
    emit_path = tmp_path / "resolved.yaml"
    result = _run_cli(
        runner,
        "validate-spec",
        "--merge",
        "--emit-spec", emit_path,
        split_specs / "person.yaml",
        split_specs / "org.yaml",
    )
    assert result.exit_code == 0, result.stderr
    assert emit_path.exists()
    emitted = yaml.safe_load(emit_path.read_text())
    assert "class_derivations" in emitted


def test_validate_spec_merge_emit_stdout(runner: CliRunner, split_specs: Path) -> None:
    """--merge --emit-spec - writes the resolved spec to stdout."""
    result = _run_cli(
        runner,
        "validate-spec",
        "--merge",
        "--emit-spec", "-",
        split_specs / "person.yaml",
        split_specs / "org.yaml",
    )
    assert result.exit_code == 0, result.stderr
    # The status line precedes the YAML on stdout; parse only what follows it.
    output_after_ok = result.output.split("Merged spec: ok\n", 1)[1]
    emitted = yaml.safe_load(output_after_ok)
    assert "class_derivations" in emitted


def test_validate_spec_merge_emit_entity(runner: CliRunner, split_specs: Path, tmp_path: Path) -> None:
    """--merge --emit-spec --entity filters the emitted spec."""
    emit_path = tmp_path / "resolved.yaml"
    result = _run_cli(
        runner,
        "validate-spec",
        "--merge",
        "--emit-spec", emit_path,
        "--entity", "Person",
        split_specs / "person.yaml",
        split_specs / "org.yaml",
    )
    assert result.exit_code == 0, result.stderr
    emitted = yaml.safe_load(emit_path.read_text())
    cd_names = [cd["name"] for cd in emitted["class_derivations"]]
    assert cd_names == ["Person"]


def test_validate_spec_entity_without_merge_errors(runner: CliRunner, split_specs: Path) -> None:
    """--entity without --merge should error."""
    result = _run_cli(
        runner,
        "validate-spec",
        "--entity", "Person",
        split_specs / "person.yaml",
    )
    assert result.exit_code == 1
    assert "--merge" in result.stderr
def _write(path, text="{}"):
    """Write ``text`` to ``path`` and return the path."""
    path.write_text(text)
    return path


def _write_yaml(path, obj):
    """Serialize ``obj`` with ``yaml.dump`` into ``path`` and return the path."""
    return _write(path, yaml.dump(obj))


class TestResolveSpecPaths:
    """Path resolution: files pass through, directories are expanded."""

    def test_single_file(self, tmp_path):
        spec_file = _write(tmp_path / "spec.yaml", "class_derivations: {}")
        assert resolve_spec_paths((str(spec_file),)) == [spec_file]

    def test_directory(self, tmp_path):
        for filename in ("a.yaml", "b.yml", "c.txt"):
            _write(tmp_path / filename)
        found = resolve_spec_paths((str(tmp_path),))
        assert len(found) == 2
        assert {p.name for p in found} == {"a.yaml", "b.yml"}

    def test_recursive_directory(self, tmp_path):
        nested = tmp_path / "sub"
        nested.mkdir()
        _write(tmp_path / "a.yaml")
        _write(nested / "b.yaml")
        assert len(resolve_spec_paths((str(tmp_path),))) == 2

    def test_nonexistent_path_raises(self):
        with pytest.raises(FileNotFoundError):
            resolve_spec_paths(("/no/such/path",))

    def test_mixed_file_and_dir(self, tmp_path):
        spec_file = _write(tmp_path / "spec.yaml")
        nested = tmp_path / "sub"
        nested.mkdir()
        _write(nested / "other.yaml")
        found = resolve_spec_paths((str(spec_file), str(nested)))
        assert len(found) == 2


class TestLoadSpecFile:
    """Single-file loading for the dict, list and degenerate YAML shapes."""

    def test_dict_format(self, tmp_path):
        spec_file = _write_yaml(
            tmp_path / "spec.yaml",
            {"class_derivations": {"Foo": {"populated_from": "Bar"}}},
        )
        loaded = load_spec_file(spec_file)
        assert len(loaded) == 1
        assert "class_derivations" in loaded[0]

    def test_list_format(self, tmp_path):
        blocks = [
            {"class_derivations": {"A": {"populated_from": "X"}}},
            {"class_derivations": {"B": {"populated_from": "Y"}}},
        ]
        spec_file = _write_yaml(tmp_path / "spec.yaml", blocks)
        assert len(load_spec_file(spec_file)) == 2

    def test_non_dict_items_skipped(self, tmp_path):
        spec_file = _write(
            tmp_path / "spec.yaml",
            "- class_derivations:\n    A:\n      populated_from: X\n- just a string\n",
        )
        assert len(load_spec_file(spec_file)) == 1

    def test_scalar_yaml_returns_empty(self, tmp_path):
        spec_file = _write(tmp_path / "spec.yaml", "just a string\n")
        assert load_spec_file(spec_file) == []


class TestMergeSpecDicts:
    """Merging rules for class/enum/slot derivations and scalar fields."""

    def test_empty_list(self):
        assert merge_spec_dicts([]) == {}

    def test_single_spec(self):
        only = {"class_derivations": [{"name": "Foo"}]}
        # A single spec is handed back as the very same object.
        assert merge_spec_dicts([only]) is only

    def test_class_derivations_appended(self):
        first = {"class_derivations": [{"name": "A", "populated_from": "X"}]}
        second = {"class_derivations": [{"name": "B", "populated_from": "Y"}]}
        result = merge_spec_dicts([first, second])
        assert len(result["class_derivations"]) == 2

    def test_class_derivations_dict_format(self):
        first = {"class_derivations": {"A": {"populated_from": "X"}}}
        second = {"class_derivations": {"B": {"populated_from": "Y"}}}
        result = merge_spec_dicts([first, second])
        assert len(result["class_derivations"]) == 2

    def test_enum_derivations_unioned(self):
        first = {"enum_derivations": {"E1": {"populated_from": "SE1"}}}
        second = {"enum_derivations": {"E2": {"populated_from": "SE2"}}}
        enums = merge_spec_dicts([first, second])["enum_derivations"]
        assert "E1" in enums
        assert "E2" in enums

    def test_enum_derivations_duplicate_same_ok(self):
        body = {"populated_from": "SE1"}
        first = {"enum_derivations": {"E1": body}}
        second = {"enum_derivations": {"E1": body}}
        result = merge_spec_dicts([first, second])
        assert result["enum_derivations"]["E1"] == body

    def test_enum_derivations_conflict_raises(self):
        first = {"enum_derivations": {"E1": {"populated_from": "A"}}}
        second = {"enum_derivations": {"E1": {"populated_from": "B"}}}
        with pytest.raises(ValueError, match="Conflicting enum_derivations"):
            merge_spec_dicts([first, second])

    def test_slot_derivations_unioned(self):
        first = {"slot_derivations": {"s1": {"populated_from": "x"}}}
        second = {"slot_derivations": {"s2": {"populated_from": "y"}}}
        slots = merge_spec_dicts([first, second])["slot_derivations"]
        assert "s1" in slots
        assert "s2" in slots

    def test_slot_derivations_conflict_raises(self):
        first = {"slot_derivations": {"s1": {"populated_from": "x"}}}
        second = {"slot_derivations": {"s1": {"populated_from": "y"}}}
        with pytest.raises(ValueError, match="Conflicting slot_derivations"):
            merge_spec_dicts([first, second])

    def test_scalar_first_wins(self):
        first = {"title": "First", "class_derivations": []}
        second = {"title": "Second", "class_derivations": []}
        assert merge_spec_dicts([first, second])["title"] == "First"

    def test_scalar_none_skipped(self):
        first = {"class_derivations": []}
        second = {"title": "Second", "class_derivations": []}
        assert merge_spec_dicts([first, second])["title"] == "Second"

    def test_mixed_list_and_dict_class_derivations(self):
        first = {"class_derivations": [{"name": "A"}]}
        second = {"class_derivations": {"B": {"populated_from": "Y"}}}
        result = merge_spec_dicts([first, second])
        assert len(result["class_derivations"]) == 2


class TestLoadAndMergeSpecs:
    """End-to-end loading from disk followed by merging."""

    def test_single_file(self, tmp_path):
        spec_file = _write_yaml(
            tmp_path / "spec.yaml",
            {"class_derivations": {"Foo": {"populated_from": "Bar"}}},
        )
        assert "class_derivations" in load_and_merge_specs((str(spec_file),))

    def test_directory_of_sub_specs(self, tmp_path):
        """Simulate bdc-harmonized-variables sub-spec pattern."""
        _write_yaml(
            tmp_path / "measurement.yaml",
            [
                {"class_derivations": {"MeasurementObservation": {"populated_from": "t1"}}},
                {"class_derivations": {"MeasurementObservation": {"populated_from": "t2"}}},
            ],
        )
        _write_yaml(
            tmp_path / "drug.yaml",
            [
                {"class_derivations": {"DrugExposure": {"populated_from": "t3"}}},
            ],
        )
        result = load_and_merge_specs((str(tmp_path),))
        assert len(result["class_derivations"]) == 3

    def test_separate_enum_file(self, tmp_path):
        """Enum file merged with class derivation file."""
        _write_yaml(
            tmp_path / "enums.yaml",
            {"enum_derivations": {"StatusEnum": {"mirror_source": True}}},
        )
        _write_yaml(
            tmp_path / "classes.yaml",
            {"class_derivations": {"Person": {"populated_from": "people"}}},
        )
        result = load_and_merge_specs((str(tmp_path),))
        assert "StatusEnum" in result["enum_derivations"]
        assert len(result["class_derivations"]) == 1

    def test_no_yaml_files_raises(self, tmp_path):
        empty_dir = tmp_path / "empty"
        empty_dir.mkdir()
        with pytest.raises(ValueError, match="No YAML files"):
            load_and_merge_specs((str(empty_dir),))