Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 170 additions & 19 deletions src/linkml_map/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@
output_option = click.option("-o", "--output", help="Output file.")
schema_option = click.option("-s", "--schema", help="Path to source schema.")
transformer_specification_option = click.option(
"-T", "--transformer-specification", help="Path to transformer specification."
"-T",
"--transformer-specification",
multiple=True,
help="Path to transformer specification file or directory. Can be repeated.",
)
target_schema_option = click.option(
"--target-schema", help="Path to target schema (required for nested object_derivations)."
)
entity_option = click.option(
"--entity",
help="Only process class_derivations matching this class name.",
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,6 +78,7 @@ def main(verbose: int, quiet: bool) -> None:
@transformer_specification_option
@schema_option
@target_schema_option
@entity_option
@click.option("--source-type", help="Source type/class name for the input data.")
@click.option(
"--unrestricted-eval/--no-unrestricted-eval",
Expand Down Expand Up @@ -104,18 +112,26 @@ def main(verbose: int, quiet: bool) -> None:
default=False,
help="Continue processing when a row fails to transform. Report errors at end.",
)
@click.option(
"--emit-spec",
type=click.Path(dir_okay=False, writable=True),
default=None,
help="Write the resolved (merged + filtered) spec to this file path as a side-effect.",
)
@click.argument("input_data")
def map_data(
input_data: str,
schema: str,
source_type: str | None,
transformer_specification: str,
transformer_specification: tuple[str, ...],
output: str | None,
output_format: str | None,
chunk_size: int,
additional_output: tuple,
continue_on_error: bool = False,
target_schema: str | None = None,
entity: str | None = None,
emit_spec: str | None = None,
**kwargs: dict[str, Any],
) -> None:
"""
Expand All @@ -133,11 +149,14 @@ def map_data(
# Single YAML file (original behavior)
linkml-map map-data -T transform.yaml -s schema.yaml data.yaml

# Single TSV file
linkml-map map-data -T transform.yaml -s schema.yaml --source-type Person people.tsv
# Multiple spec files merged at load time
linkml-map map-data -T enums.yaml -T classes.yaml -s schema.yaml data/

# Directory of spec files
linkml-map map-data -T specs/ -s schema.yaml data/

# Directory of TSV files with streaming output
linkml-map map-data -T transform.yaml -s schema.yaml -f jsonl -o output.jsonl ./data/
# Filter to a single entity class
linkml-map map-data -T specs/ --entity Person -s schema.yaml data/

# Multi-output: write TSV, JSON, and JSONL simultaneously
linkml-map map-data -T transform.yaml -s schema.yaml -f jsonl -O out.tsv -O out.json input.tsv
Expand Down Expand Up @@ -173,6 +192,8 @@ def map_data(
additional_output=additional_output,
target_schema=target_schema,
continue_on_error=continue_on_error,
entity=entity,
emit_spec=emit_spec,
**kwargs,
)
else:
Expand All @@ -186,28 +207,60 @@ def map_data(
output_format=output_format,
target_schema=target_schema,
continue_on_error=continue_on_error,
entity=entity,
emit_spec=emit_spec,
**kwargs,
)


def _load_specs(tr: ObjectTransformer, transformer_specification: tuple[str, ...]) -> None:
    """Load one or more transformer specification files into the transformer.

    A single argument that names a plain file takes the original single-spec
    loading path; anything else (several paths, or a directory) is handed to
    the multi-spec loader, which merges them.

    :param tr: transformer to populate.
    :param transformer_specification: one or more spec file/directory paths.
    """
    specs = transformer_specification
    # Preserve the legacy single-file behavior exactly; directories fall
    # through to the multi-spec loader even when only one path was given.
    if len(specs) == 1 and Path(specs[0]).is_file():
        tr.load_transformer_specification(Path(specs[0]))
    else:
        tr.load_transformer_specifications(specs)


def _emit_spec_to_file(tr: ObjectTransformer, emit_spec: str, entity: str | None) -> None:
    """Dump the transformer's resolved specification to *emit_spec* as YAML.

    When *entity* is given, the spec is deep-copied and its class_derivations
    are restricted to derivations named *entity* before dumping, so the
    transformer's own specification is never mutated.

    :param tr: transformer whose ``specification`` is written out.
    :param emit_spec: destination file path.
    :param entity: optional class name used to filter class_derivations.
    """
    from linkml_runtime.dumpers import yaml_dumper

    spec = tr.specification
    if entity:
        from copy import deepcopy

        # NOTE(review): assumes class_derivations iterates derivation objects
        # carrying a ``name`` attribute — confirm against the spec model.
        filtered = deepcopy(spec)
        filtered.class_derivations = [
            cd for cd in filtered.class_derivations if cd.name == entity
        ]
        spec = filtered
    Path(emit_spec).write_text(yaml_dumper.dumps(spec), encoding="utf-8")
    logger.info("Wrote resolved spec to %s", emit_spec)


def _map_data_single(
input_data: str,
schema: str,
source_type: str | None,
transformer_specification: str,
transformer_specification: tuple[str, ...],
output: str | None,
output_format: str,
target_schema: str | None = None,
continue_on_error: bool = False,
entity: str | None = None,
emit_spec: str | None = None,
**kwargs: dict[str, Any],
) -> None:
"""Original single-object transformation logic."""
tr = ObjectTransformer(**kwargs)
tr.source_schemaview = SchemaView(schema)
tr.load_transformer_specification(transformer_specification)
_load_specs(tr, transformer_specification)
if target_schema:
tr.target_schemaview = SchemaView(target_schema)

if emit_spec:
_emit_spec_to_file(tr, emit_spec, entity)

# Load input data (YAML or JSON)
with open(input_data) as file:
content = file.read()
Expand Down Expand Up @@ -255,23 +308,28 @@ def _map_data_streaming(
input_path: Path,
schema: str,
source_type: str | None,
transformer_specification: str,
transformer_specification: tuple[str, ...],
output: str | None,
output_format: str,
chunk_size: int,
additional_output: tuple = (),
target_schema: str | None = None,
continue_on_error: bool = False,
entity: str | None = None,
emit_spec: str | None = None,
**kwargs: dict[str, Any],
) -> None:
"""Streaming transformation for tabular/directory input."""
# Initialize transformer
tr = ObjectTransformer(**kwargs)
tr.source_schemaview = SchemaView(schema)
tr.load_transformer_specification(transformer_specification)
_load_specs(tr, transformer_specification)
if target_schema:
tr.target_schemaview = SchemaView(target_schema)

if emit_spec:
_emit_spec_to_file(tr, emit_spec, entity)

# Initialize data loader
data_loader = DataLoader(input_path)

Expand All @@ -280,7 +338,7 @@ def _map_data_streaming(
on_error = errors.append if continue_on_error else None

# Create transform iterator and chunk it
transform_iter = transform_spec(tr, data_loader, source_type, on_error=on_error)
transform_iter = transform_spec(tr, data_loader, source_type, on_error=on_error, entity=entity)
chunks = chunked(transform_iter, chunk_size)

# Resolve output format
Expand Down Expand Up @@ -339,7 +397,7 @@ def _map_data_streaming(
@click.option("--target", default="python", show_default=True, help="Target representation.")
def compile(
schema: str,
transformer_specification: str,
transformer_specification: tuple[str, ...],
target: str,
output: str | None,
**kwargs: Any,
Expand All @@ -363,7 +421,7 @@ def compile(
raise NotImplementedError(msg)
tr = ObjectTransformer()
tr.source_schemaview = sv
tr.load_transformer_specification(transformer_specification)
_load_specs(tr, transformer_specification)
result = compiler.compile(tr.specification)
# dump as-is, no encoding
dump_output(result.serialization, None, output)
Expand All @@ -375,7 +433,7 @@ def compile(
@click.argument("schema")
def derive_schema(
schema: str,
transformer_specification: str,
transformer_specification: tuple[str, ...],
output: str | None,
**kwargs: Any,
) -> None:
Expand All @@ -395,7 +453,7 @@ def derive_schema(
"""
logger.info(f"Transforming {schema} using {transformer_specification}")
tr = ObjectTransformer()
tr.load_transformer_specification(transformer_specification)
_load_specs(tr, transformer_specification)
mapper = SchemaMapper(transformer=tr)
mapper.source_schemaview = SchemaView(schema)
target_schema = mapper.derive_schema()
Expand All @@ -409,7 +467,7 @@ def derive_schema(
@click.argument("schema")
def invert(
schema: str,
transformer_specification: str,
transformer_specification: tuple[str, ...],
output: str | None,
**kwargs: Any,
) -> None:
Expand All @@ -422,7 +480,7 @@ def invert(
"""
logger.info(f"Inverting {transformer_specification} using {schema} as source")
tr = ObjectTransformer()
tr.load_transformer_specification(transformer_specification)
_load_specs(tr, transformer_specification)
inverter = TransformationSpecificationInverter(
source_schemaview=SchemaView(schema),
**kwargs,
Expand All @@ -432,9 +490,25 @@ def invert(


@main.command(name="validate-spec")
@entity_option
@click.option(
"--merge",
is_flag=True,
default=False,
help="Merge all spec files into one before validating. Supports directories.",
)
@click.option(
"--emit-spec",
type=click.Path(dir_okay=False),
default=None,
help="Write the resolved (merged + filtered) spec to a file path. Use '-' for stdout.",
)
@click.argument("spec_files", nargs=-1, required=True, type=click.Path(exists=True))
def validate_spec_cmd(
spec_files: tuple[str, ...],
entity: str | None = None,
merge: bool = False,
emit_spec: str | None = None,
) -> None:
"""Validate transformation specification YAML files.

Expand All @@ -446,12 +520,34 @@ def validate_spec_cmd(
linkml-map validate-spec my-transform.yaml

linkml-map validate-spec specs/*.yaml

linkml-map validate-spec --merge --entity Person --emit-spec resolved.yaml specs/
"""
if merge:
_validate_spec_merged(spec_files, entity=entity, emit_spec=emit_spec)
else:
if entity or emit_spec:
click.echo("--entity and --emit-spec require --merge", err=True)
raise SystemExit(1)
_validate_spec_individual(spec_files)


def _validate_spec_individual(spec_files: tuple[str, ...]) -> None:
"""Validate each spec file independently.

Directories are expanded to their contained YAML files.
"""
from linkml_map.utils.spec_merge import resolve_spec_paths
from linkml_map.validator import validate_spec_file

resolved = resolve_spec_paths(spec_files)
if not resolved:
click.echo("No YAML files found in the provided paths", err=True)
raise SystemExit(1)

has_errors = False
for path in spec_files:
errors = validate_spec_file(path)
for path in resolved:
errors = validate_spec_file(str(path))
if errors:
has_errors = True
click.echo(f"{path}:", err=True)
Expand All @@ -464,6 +560,61 @@ def validate_spec_cmd(
raise SystemExit(1)


def _validate_spec_merged(
    spec_files: tuple[str, ...],
    entity: str | None = None,
    emit_spec: str | None = None,
) -> None:
    """Merge spec files into one dict, validate it, and optionally emit it.

    :param spec_files: spec file/directory paths to merge.
    :param entity: optional class name; when set, class_derivations are
        narrowed to that entity *before* validation so we only validate what
        ``map-data --entity`` would actually execute.
    :param emit_spec: optional output path for the resolved spec
        (``-`` writes to stdout).
    :raises SystemExit: when the merged spec fails validation.
    """
    from linkml_map.utils.spec_merge import load_and_merge_specs
    from linkml_map.validator import validate_spec

    merged = load_and_merge_specs(spec_files)

    if entity:
        derivations = merged.get("class_derivations")
        if isinstance(derivations, dict):
            merged["class_derivations"] = {
                name: deriv for name, deriv in derivations.items() if name == entity
            }
        elif isinstance(derivations, list):
            kept = []
            for item in derivations:
                if not isinstance(item, dict):
                    continue
                expanded_match = item.get("name") == entity  # expanded format
                compact_match = len(item) == 1 and entity in item  # compact-key format
                if expanded_match or compact_match:
                    kept.append(item)
            merged["class_derivations"] = kept

    errors = validate_spec(merged)
    if errors:
        click.echo("Merged spec validation errors:", err=True)
        for error in errors:
            click.echo(f"  {error}", err=True)
        raise SystemExit(1)

    click.echo("Merged spec: ok")

    if not emit_spec:
        return

    from linkml_map.datamodel.transformer_model import TransformationSpecification
    from linkml_map.transformer.transformer import Transformer
    from linkml_runtime.dumpers import yaml_dumper

    # NOTE(review): relies on a private hook of Transformer — consider a
    # public normalization entry point.
    Transformer._normalize_spec_dict(merged)
    spec_yaml = yaml_dumper.dumps(TransformationSpecification(**merged))
    if emit_spec == "-":
        click.echo(spec_yaml)
    else:
        with open(emit_spec, "w", encoding="utf-8") as f:
            f.write(spec_yaml)
        click.echo(f"Wrote resolved spec to {emit_spec}")


def dump_output(
output_data: dict[str, Any] | list[Any] | str,
output_format: str | None = None,
Expand Down
5 changes: 5 additions & 0 deletions src/linkml_map/transformer/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def transform_spec(
data_loader: DataLoader,
source_type: str | None = None,
on_error: Callable[[TransformationError], None] | None = None,
entity: str | None = None,
) -> Iterator[dict[str, Any]]:
"""
Iterate class_derivation blocks and stream transformed rows.
Expand All @@ -42,6 +43,8 @@ def transform_spec(
:class:`TransformationError` is caught, enriched with row context,
and passed to the callback. When ``None`` (default), errors propagate
immediately (fail-fast).
:param entity: Optional class name filter. When provided, only
top-level class_derivations whose ``name`` matches are processed.
:returns: Iterator of transformed row dicts.
"""
spec = transformer.derived_specification
Expand All @@ -52,6 +55,8 @@ def transform_spec(
transformer.lookup_index = LookupIndex()

for class_deriv in spec.class_derivations:
if entity and class_deriv.name != entity:
continue
table_name = class_deriv.populated_from or class_deriv.name
if table_name not in data_loader:
logger.debug("Skipping class_derivation %s: no data found", class_deriv.name)
Expand Down
Loading
Loading