diff --git a/docs/src/compute/autopopulate2.0-spec.md b/docs/src/compute/autopopulate2.0-spec.md new file mode 100644 index 000000000..03382b06b --- /dev/null +++ b/docs/src/compute/autopopulate2.0-spec.md @@ -0,0 +1,842 @@ +# Autopopulate 2.0 Specification + +## Overview + +This specification redesigns the DataJoint job handling system to provide better visibility, control, and scalability for distributed computing workflows. The new system replaces the schema-level `~jobs` table with per-table job tables that offer richer status tracking, proper referential integrity, and dashboard-friendly monitoring. + +## Problem Statement + +### Current Jobs Table Limitations + +The existing `~jobs` table has significant limitations: + +1. **Limited status tracking**: Only supports `reserved`, `error`, and `ignore` statuses +2. **Functions as an error log**: Cannot efficiently track pending or completed jobs +3. **Poor dashboard visibility**: No way to monitor pipeline progress without querying multiple tables +4. **Key hashing obscures data**: Primary keys are stored as hashes, making debugging difficult +5. **No referential integrity**: Jobs table is independent of computed tables; orphaned jobs can accumulate + +### Key Source Limitations + +1. **Frequent manual modifications**: Subset operations require modifying `key_source` property +2. **Local visibility only**: Custom key sources are not accessible database-wide +3. **Performance bottleneck**: Multiple workers querying `key_source` simultaneously creates contention +4. **Codebase dependency**: Requires full pipeline codebase to determine pending work + +## Proposed Solution + +### Terminology + +- **Stale job**: A job (any status) whose key no longer exists in `key_source`. The upstream records have been deleted. Stale jobs are cleaned up by `refresh()` based on the `stale_timeout` parameter. + +- **Orphaned job**: A `reserved` job whose worker is no longer running. The process that reserved the job crashed, was terminated, or lost connection. The job remains `reserved` indefinitely. Orphaned jobs can be cleaned up by `refresh(orphan_timeout=...)` or manually deleted. + +- **Completed job**: A job with status `success`. Only exists when `keep_completed=True`. Represents historical record of successful computation. + +### Core Design Principles + +1. **Per-table jobs**: Each computed table gets its own hidden jobs table +2. **FK-only primary keys**: Auto-populated tables must have primary keys composed entirely of foreign key references. Non-FK primary key attributes are prohibited in new tables (legacy tables are supported with degraded granularity) +3. **No FK constraints on jobs**: Jobs tables omit foreign key constraints for performance; stale jobs are cleaned by `refresh()` +4. **Rich status tracking**: Extended status values for full lifecycle visibility +5. **Automatic refresh**: `populate()` automatically refreshes the jobs queue (adding new jobs, removing stale ones) +6. **Backward compatible**: When `reserve_jobs=False` (default), 1.0 behavior is preserved + +## Architecture + +### Jobs Table Structure + +Each `dj.Imported` or `dj.Computed` table `MyTable` will have an associated hidden jobs table `~~my_table` with the following structure: + +``` +# Job queue for MyTable +subject_id : int +session_id : int +... 
# Only FK-derived primary key attributes (NO foreign key constraints) +--- +status : enum('pending', 'reserved', 'success', 'error', 'ignore') +priority : uint8 # Lower = more urgent (0 = highest), set by refresh() +created_time=CURRENT_TIMESTAMP : timestamp # When job was added to queue +scheduled_time=CURRENT_TIMESTAMP : timestamp # Process on or after this time +reserved_time=null : timestamp # When job was reserved +completed_time=null : timestamp # When job completed +duration=null : float64 # Execution duration in seconds +error_message="" : varchar(2047) # Truncated error message +error_stack=null : # Full error traceback +user="" : varchar(255) # Database user who reserved/completed job +host="" : varchar(255) # Hostname of worker +pid=0 : uint32 # Process ID of worker +connection_id=0 : uint64 # MySQL connection ID +version="" : varchar(255) # Code version (git hash, package version, etc.) +``` + +**Important**: The jobs table primary key includes only those attributes that come through foreign keys in the target table's primary key. Additional primary key attributes (if any) are excluded. This means: +- If a target table has primary key `(-> Subject, -> Session, method)`, the jobs table has primary key `(subject_id, session_id)` only +- Multiple target rows may map to a single job entry when additional PK attributes exist +- Jobs tables have **no foreign key constraints** for performance (stale jobs handled by `refresh()`) + +### Access Pattern + +Jobs are accessed as a property of the computed table: + +```python +# Current pattern (schema-level) +schema.jobs + +# New pattern (per-table) +MyTable.jobs + +# Examples +FilteredImage.jobs # Access jobs table +FilteredImage.jobs & 'status="error"' # Query errors +FilteredImage.jobs.refresh() # Refresh job queue +``` + +### Status Values + +| Status | Description | +|--------|-------------| +| `pending` | Job is queued and ready to be processed | +| `reserved` | Job is currently being processed by a worker | +| `success` | Job completed successfully (optional, depends on settings) | +| `error` | Job failed with an error | +| `ignore` | Job should be skipped (manually set, not part of automatic transitions) | + +### Status Transitions + +```mermaid +stateDiagram-v2 + state "(none)" as none1 + state "(none)" as none2 + none1 --> pending : refresh() + none1 --> ignore : ignore() + pending --> reserved : reserve() + reserved --> none2 : complete() + reserved --> success : complete()* + reserved --> error : error() + success --> pending : refresh()* + error --> none2 : delete() + success --> none2 : delete() + ignore --> none2 : delete() +``` + +- `complete()` deletes the job entry (default when `jobs.keep_completed=False`) +- `complete()*` keeps the job as `success` (when `jobs.keep_completed=True`) +- `refresh()*` re-pends a `success` job if its key is in `key_source` but not in target + +**Transition methods:** +- `refresh()` — Adds new jobs as `pending`; also re-pends `success` jobs if key is in `key_source` but not in target +- `ignore()` — Marks a key as `ignore` (can be called on keys not yet in jobs table) +- `reserve()` — Marks a pending job as `reserved` before calling `make()` +- `complete()` — Marks reserved job as `success`, or deletes it (based on `jobs.keep_completed` setting) +- `error()` — Marks reserved job as `error` with message and stack trace +- `delete()` — Inherited from `delete_quick()`; use `(jobs & condition).delete()` pattern + +**Manual status control:** +- `ignore` is set manually via `jobs.ignore(key)` and is 
not part of automatic transitions +- Jobs with `status='ignore'` are skipped by `populate()` and `refresh()` +- To reset an ignored job, delete it and call `refresh()`: `jobs.ignored.delete(); jobs.refresh()` + +## API Design + +### JobsTable Class + +```python +class JobsTable(Table): + """Hidden table managing job queue for a computed table.""" + + @property + def definition(self) -> str: + """Dynamically generated based on parent table's primary key.""" + ... + + def refresh( + self, + *restrictions, + delay: float = 0, + priority: int = None, + stale_timeout: float = None, + orphan_timeout: float = None + ) -> dict: + """ + Refresh the jobs queue: add new jobs and clean up stale/orphaned jobs. + + Operations performed: + 1. Add new jobs: (key_source & restrictions) - target - jobs → insert as 'pending' + 2. Re-pend success jobs: if keep_completed=True and key in key_source but not in target + 3. Remove stale jobs: jobs older than stale_timeout whose keys no longer in key_source + 4. Remove orphaned jobs: reserved jobs older than orphan_timeout (if specified) + + Args: + restrictions: Conditions to filter key_source (for adding new jobs) + delay: Seconds from now until new jobs become available for processing. + Default: 0 (immediately available). Uses database server time. + priority: Priority for new jobs (lower = more urgent). + Default from config: jobs.default_priority (5) + stale_timeout: Seconds after which jobs are checked for staleness. + Jobs older than this are removed if key not in key_source. + Default from config: jobs.stale_timeout (3600s) + Set to 0 to skip stale cleanup. + orphan_timeout: Seconds after which reserved jobs are considered orphaned. + Reserved jobs older than this are deleted and re-added as pending. + Default: None (no orphan cleanup - must be explicit). + Typical value: 3600 (1 hour) or based on expected job duration. + + Returns: + { + 'added': int, # New pending jobs added + 'removed': int, # Stale jobs removed + 'orphaned': int, # Orphaned jobs reset to pending + 're_pended': int # Success jobs re-pended (keep_completed mode) + } + """ + ... + + def reserve(self, key: dict) -> bool: + """ + Attempt to reserve a job for processing. + + Updates status to 'reserved' if currently 'pending' and scheduled_time <= now. + No locking is used; rare conflicts are resolved by the make() transaction. + + Returns: + True if reservation successful, False if job not found or not pending. + """ + ... + + def complete(self, key: dict, duration: float = None) -> None: + """ + Mark a job as successfully completed. + + Updates status to 'success', records duration and completion time. + """ + ... + + def error(self, key: dict, error_message: str, error_stack: str = None) -> None: + """ + Mark a job as failed with error details. + + Updates status to 'error', records error message and stack trace. + """ + ... + + def ignore(self, key: dict) -> None: + """ + Mark a job to be ignored (skipped during populate). + + To reset an ignored job, delete it and call refresh(). + """ + ... 
+ + # delete() is inherited from delete_quick() - no confirmation required + # Usage: (jobs & condition).delete() or jobs.errors.delete() + + @property + def pending(self) -> QueryExpression: + """Return query for pending jobs.""" + return self & 'status="pending"' + + @property + def reserved(self) -> QueryExpression: + """Return query for reserved jobs.""" + return self & 'status="reserved"' + + @property + def errors(self) -> QueryExpression: + """Return query for error jobs.""" + return self & 'status="error"' + + @property + def ignored(self) -> QueryExpression: + """Return query for ignored jobs.""" + return self & 'status="ignore"' + + @property + def completed(self) -> QueryExpression: + """Return query for completed jobs.""" + return self & 'status="success"' + + def progress(self) -> dict: + """ + Return job status breakdown. + + Returns: + { + 'pending': int, # Jobs waiting to be processed + 'reserved': int, # Jobs currently being processed + 'success': int, # Completed jobs (if keep_completed=True) + 'error': int, # Failed jobs + 'ignore': int, # Ignored jobs + 'total': int # Total jobs in table + } + """ + ... +``` + +### AutoPopulate Integration + +The `populate()` method is updated to use the new jobs table: + +```python +def populate( + self, + *restrictions, + suppress_errors: bool = False, + return_exception_objects: bool = False, + reserve_jobs: bool = False, + max_calls: int = None, + display_progress: bool = False, + processes: int = 1, + make_kwargs: dict = None, + # New parameters + priority: int = None, # Only process jobs at this priority or more urgent (lower values) + refresh: bool = None, # Refresh jobs queue before processing (default from config) +) -> dict: + """ + Populate the table by calling make() for each missing entry. + + Behavior depends on reserve_jobs parameter: + + When reserve_jobs=False (default, 1.0 compatibility mode): + - Jobs table is NOT used + - Keys computed directly from: (key_source & restrictions) - target + - No job reservation, no status tracking + - Suitable for single-worker scenarios + + When reserve_jobs=True (distributed mode): + 1. If refresh=True (or config['jobs.auto_refresh'] when refresh=None): + Call self.jobs.refresh(*restrictions) to sync jobs queue + 2. Fetch pending jobs ordered by (priority ASC, scheduled_time ASC) + Apply max_calls limit to fetched keys (total across all processes) + 3. For each pending job where scheduled_time <= now: + a. Mark job as 'reserved' + b. Call make(key) + c. On success: mark job as 'success' or delete (based on keep_completed) + d. On error: mark job as 'error' with message/stack + 4. Continue until all fetched jobs processed + + Args: + restrictions: Conditions to filter key_source + suppress_errors: If True, collect errors instead of raising + return_exception_objects: Return exception objects vs strings + reserve_jobs: Enable job reservation for distributed processing + max_calls: Maximum number of make() calls (total across all processes) + display_progress: Show progress bar + processes: Number of worker processes + make_kwargs: Non-computation kwargs passed to make() + priority: Only process jobs at this priority or more urgent (lower values) + refresh: Refresh jobs queue before processing. Default from config['jobs.auto_refresh'] + + Deprecated parameters (removed in 2.0): + - 'order': Job ordering now controlled by priority. Use refresh(priority=N). + - 'limit': Use max_calls instead. The distinction was confusing (see #1203). + - 'keys': Use restrictions instead. 
Direct key specification bypassed job tracking. + """ + ... +``` + +### Progress and Monitoring + +```python +# Current progress reporting +remaining, total = MyTable.progress() + +# Enhanced progress with jobs table +MyTable.jobs.progress() # Returns detailed status breakdown + +# Example output: +# { +# 'pending': 150, +# 'reserved': 3, +# 'success': 847, +# 'error': 12, +# 'ignore': 5, +# 'total': 1017 +# } +``` + +### Priority and Scheduling + +Priority and scheduling are handled via `refresh()` parameters. Lower priority values are more urgent (0 = highest priority). Scheduling uses relative time (seconds from now) based on database server time. + +```python +# Add urgent jobs (priority=0 is most urgent) +MyTable.jobs.refresh(priority=0) + +# Add normal jobs (default priority=5) +MyTable.jobs.refresh() + +# Add low-priority background jobs +MyTable.jobs.refresh(priority=10) + +# Schedule jobs for future processing (2 hours from now) +MyTable.jobs.refresh(delay=2*60*60) # 7200 seconds + +# Schedule jobs for tomorrow (24 hours from now) +MyTable.jobs.refresh(delay=24*60*60) + +# Combine: urgent jobs with 1-hour delay +MyTable.jobs.refresh(priority=0, delay=3600) + +# Add urgent jobs for specific subjects +MyTable.jobs.refresh(Subject & 'priority="urgent"', priority=0) +``` + +## Implementation Details + +### Table Naming Convention + +Jobs tables use the `~~` prefix (double tilde): +- Table `FilteredImage` (stored as `__filtered_image`) +- Jobs table: `~~filtered_image` (stored as `~~filtered_image`) + +The `~~` prefix distinguishes jobs tables from other hidden tables (`~jobs`, `~lineage`) while keeping names short. + +### Primary Key Constraint + +**New tables**: Auto-populated tables (`dj.Computed`, `dj.Imported`) must have primary keys composed entirely of foreign key references. Non-FK primary key attributes are prohibited. + +```python +# ALLOWED - all PK attributes come from foreign keys +@schema +class FilteredImage(dj.Computed): + definition = """ + -> Image + --- + filtered_image : + """ + +# ALLOWED - multiple FKs in primary key +@schema +class Analysis(dj.Computed): + definition = """ + -> Recording + -> AnalysisMethod # method comes from FK to lookup table + --- + result : float64 + """ + +# NOT ALLOWED - raises error on table declaration +@schema +class Analysis(dj.Computed): + definition = """ + -> Recording + method : varchar(32) # ERROR: non-FK primary key attribute + --- + result : float64 + """ +``` + +**Rationale**: This constraint ensures 1:1 correspondence between jobs and target rows, simplifying job status tracking and eliminating ambiguity. + +**Legacy table support**: Existing tables with non-FK primary key attributes continue to work. The jobs table uses only the FK-derived attributes, treating additional PK attributes as if they were secondary attributes. This means: +- One job entry may correspond to multiple target rows +- Job marked `success` when ANY matching target row exists +- Job marked `pending` only when NO matching target rows exist + +```python +# Legacy table (created before 2.0) +# Jobs table primary key: (recording_id) only +# One job covers all 'method' values for a given recording +@schema +class LegacyAnalysis(dj.Computed): + definition = """ + -> Recording + method : varchar(32) # Non-FK attribute (legacy, not recommended) + --- + result : float64 + """ +``` + +The jobs table has **no foreign key constraints** for performance reasons. 
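To make the FK-derived key rule concrete, the sketch below shows one way the jobs-table primary key could be derived from a target table's heading. The helper name `fk_primary_attributes` is hypothetical and not part of the proposed API; it reuses the `parents(primary=True, as_objects=True, foreign_key_info=True)` call that the 1.0 key-source implementation already relies on.

```python
def fk_primary_attributes(target):
    """Primary-key attributes of `target` that arrive through foreign keys (sketch only)."""
    fk_attrs = set()
    for _parent, props in target.parents(primary=True, as_objects=True, foreign_key_info=True):
        # attr_map keys are the child-side attribute names introduced by this foreign key
        fk_attrs.update(props["attr_map"])
    # keep the target's primary-key order, dropping non-FK attributes such as `method`
    return [attr for attr in target.primary_key if attr in fk_attrs]

# For LegacyAnalysis above: ['recording_id'] -- 'method' is excluded, so one job row
# covers every method value for a given recording.
```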
+ +### Stale Job Handling + +Stale jobs are jobs (any status except `ignore`) whose keys no longer exist in `key_source`. Since there are no FK constraints on jobs tables, these jobs remain until cleaned up by `refresh()`: + +```python +# refresh() handles stale jobs automatically +result = FilteredImage.jobs.refresh() +# Returns: {'added': 10, 'removed': 3, 'orphaned': 0, 're_pended': 0} + +# Stale detection logic: +# 1. Find jobs where created_time < (now - stale_timeout) +# 2. Check if their keys still exist in key_source +# 3. Remove jobs (pending, reserved, success, error) whose keys no longer exist +# 4. Jobs with status='ignore' are never removed (permanent until manual delete) +``` + +**Why not use foreign key cascading deletes?** +- FK constraints add overhead on every insert/update/delete operation +- Jobs tables are high-traffic (frequent reservations and status updates) +- Stale jobs are harmless until refresh—they simply won't match key_source +- The `refresh()` approach is more efficient for batch cleanup + +### Orphaned Job Handling + +Orphaned jobs are `reserved` jobs whose worker is no longer running. Unlike stale jobs, orphaned jobs reference valid keys—only the worker has disappeared. + +```python +# Automatic orphan cleanup (use with caution) +result = FilteredImage.jobs.refresh(orphan_timeout=3600) # 1 hour +# Jobs reserved more than 1 hour ago are deleted and re-added as pending +# Returns: {'added': 0, 'removed': 0, 'orphaned': 5, 're_pended': 0} + +# Manual orphan cleanup (more control) +(FilteredImage.jobs.reserved & 'reserved_time < NOW() - INTERVAL 2 HOUR').delete() +FilteredImage.jobs.refresh() # Re-adds as pending if key still in key_source +``` + +**When to use orphan_timeout**: +- In automated pipelines where job duration is predictable +- When workers are known to have failed (cluster node died) +- Set timeout > expected max job duration to avoid killing active jobs + +**When NOT to use orphan_timeout**: +- When job durations are highly variable +- When you need to coordinate with external orchestration +- Default is None (disabled) for safety + +### Table Drop and Alter Behavior + +When an auto-populated table is **dropped**, its associated jobs table is automatically dropped: + +```python +# Dropping FilteredImage also drops ~~filtered_image +FilteredImage.drop() +``` + +When an auto-populated table is **altered** (e.g., primary key changes), the jobs table is dropped and can be recreated via `refresh()`: + +```python +# Alter that changes primary key structure +# Jobs table is dropped since its structure no longer matches +FilteredImage.alter() + +# Recreate jobs table with new structure +FilteredImage.jobs.refresh() +``` + +### Lazy Table Creation + +Jobs tables are created automatically on first use: + +```python +# First call to populate with reserve_jobs=True creates the jobs table +FilteredImage.populate(reserve_jobs=True) +# Creates ~~filtered_image if it doesn't exist, then populates + +# Alternatively, explicitly create/refresh the jobs table +FilteredImage.jobs.refresh() +``` + +The jobs table is created with a primary key derived from the target table's foreign key attributes. + +### Conflict Resolution + +Conflict resolution relies on the transaction surrounding each `make()` call: + +- With `reserve_jobs=False`: Workers query `key_source` directly and may attempt the same key +- With `reserve_jobs=True`: Job reservation reduces conflicts but doesn't eliminate them entirely + +When two workers attempt to populate the same key: +1. 
Both workers attempt to reserve the same job (near-simultaneous) +2. Both reservation attempts succeed (no locking used) +3. Both call `make()` for the same key +4. First worker's `make()` transaction commits successfully +5. Second worker's `make()` transaction fails with duplicate key error +6. Second worker silently moves to next job (no status update) +7. First worker marks job `success` or deletes it + +**Important**: Only errors inside `make()` are logged with `error` status. Duplicate key errors from collisions are coordination artifacts handled silently—the first worker's completion takes precedence. + +**Edge case - first worker crashes after insert**: +- Job stays `reserved` (orphaned) +- Row exists in table (insert succeeded) +- Resolution: `refresh(orphan_timeout=...)` sees key exists in table, removes orphaned job + +**Why this is acceptable**: +- The `make()` transaction guarantees data integrity +- Duplicate key error is a clean, expected signal (not a real error) +- With `reserve_jobs=True`, conflicts are rare +- Wasted computation is minimal compared to locking complexity + +### Job Reservation vs Pre-Partitioning + +The job reservation mechanism (`reserve_jobs=True`) allows workers to dynamically claim jobs from a shared queue. However, some orchestration systems may prefer to **pre-partition** jobs before distributing them to workers: + +```python +# Pre-partitioning example: orchestrator divides work explicitly +all_pending = FilteredImage.jobs.pending.fetch("KEY") + +# Split jobs among workers (e.g., by worker index) +n_workers = 4 +for worker_id in range(n_workers): + worker_keys = all_pending[worker_id::n_workers] # Round-robin assignment + # Send worker_keys to worker via orchestration system (Slurm, K8s, etc.) + +# Worker receives its assigned keys and processes them directly +# Pass keys as restrictions to filter key_source +for key in assigned_keys: + FilteredImage.populate(key) # key acts as restriction, reserve_jobs=False by default +``` + +**When to use each approach**: + +| Approach | Use Case | +|----------|----------| +| **Dynamic reservation** (`reserve_jobs=True`) | Simple setups, variable job durations, workers that start/stop dynamically | +| **Pre-partitioning** | Batch schedulers (Slurm, PBS), predictable job counts, avoiding reservation overhead | + +Both approaches benefit from the same transaction-based conflict resolution as a safety net. + +## Configuration Options + +New configuration settings for job management: + +```python +# In datajoint config +dj.config['jobs.auto_refresh'] = True # Auto-refresh on populate (default: True) +dj.config['jobs.keep_completed'] = False # Keep success records (default: False) +dj.config['jobs.stale_timeout'] = 3600 # Seconds before pending job is considered stale (default: 3600) +dj.config['jobs.default_priority'] = 5 # Default priority for new jobs (lower = more urgent) +dj.config['jobs.version'] = None # Version string for jobs (default: None) + # Special values: 'git' = auto-detect git hash +``` + +### Config vs Parameter Precedence + +When both config and method parameters are available, **explicit parameters override config values**: + +```python +# Config sets defaults +dj.config['jobs.auto_refresh'] = True +dj.config['jobs.default_priority'] = 5 + +# Parameter overrides config +MyTable.populate(reserve_jobs=True, refresh=False) # refresh=False wins +MyTable.jobs.refresh(priority=0) # priority=0 wins +``` + +Parameters set to `None` use the config default. 
This allows per-call customization while maintaining global defaults. + +## Usage Examples + +### Basic Distributed Computing + +```python +# Worker 1 +FilteredImage.populate(reserve_jobs=True) + +# Worker 2 (can run simultaneously) +FilteredImage.populate(reserve_jobs=True) + +# Monitor progress +print(FilteredImage.jobs.progress()) +``` + +### Priority-Based Processing + +```python +# Add urgent jobs (priority=0 is most urgent) +urgent_subjects = Subject & 'priority="urgent"' +FilteredImage.jobs.refresh(urgent_subjects, priority=0) + +# Workers will process lowest-priority-value jobs first +FilteredImage.populate(reserve_jobs=True) +``` + +### Scheduled Processing + +```python +# Schedule jobs for overnight processing (8 hours from now) +FilteredImage.jobs.refresh('subject_id > 100', delay=8*60*60) + +# Only jobs whose scheduled_time <= now will be processed +FilteredImage.populate(reserve_jobs=True) +``` + +### Error Recovery + +```python +# View errors +errors = FilteredImage.jobs.errors.fetch(as_dict=True) +for err in errors: + print(f"Key: {err['subject_id']}, Error: {err['error_message']}") + +# Delete specific error jobs after fixing the issue +(FilteredImage.jobs & 'subject_id=42').delete() + +# Delete all error jobs +FilteredImage.jobs.errors.delete() + +# Re-add deleted jobs as pending (if keys still in key_source) +FilteredImage.jobs.refresh() +``` + +### Dashboard Queries + +```python +# Get pipeline-wide status using schema.jobs +def pipeline_status(schema): + return { + jt.table_name: jt.progress() + for jt in schema.jobs + } + +# Example output: +# { +# 'FilteredImage': {'pending': 150, 'reserved': 3, 'success': 847, 'error': 12}, +# 'Analysis': {'pending': 500, 'reserved': 0, 'success': 0, 'error': 0}, +# } + +# Refresh all jobs tables in the schema +for jobs_table in schema.jobs: + jobs_table.refresh() + +# Get all errors across the pipeline +all_errors = [] +for jt in schema.jobs: + errors = jt.errors.fetch(as_dict=True) + for err in errors: + err['_table'] = jt.table_name + all_errors.append(err) +``` + +## Backward Compatibility + +### Migration + +This is a major release. The legacy schema-level `~jobs` table is replaced by per-table jobs tables: + +- **Legacy `~jobs` table**: No longer used; can be dropped manually if present +- **New jobs tables**: Created automatically on first `populate(reserve_jobs=True)` call +- **No parallel support**: Teams should migrate cleanly to the new system + +### API Compatibility + +The `schema.jobs` property returns a list of all jobs table objects for auto-populated tables in the schema: + +```python +# Returns list of JobsTable objects +schema.jobs +# [FilteredImage.jobs, Analysis.jobs, ...] + +# Iterate over all jobs tables +for jobs_table in schema.jobs: + print(f"{jobs_table.table_name}: {jobs_table.progress()}") + +# Query all errors across the schema +all_errors = [job for jt in schema.jobs for job in jt.errors.fetch(as_dict=True)] + +# Refresh all jobs tables +for jobs_table in schema.jobs: + jobs_table.refresh() +``` + +This replaces the legacy single `~jobs` table with direct access to per-table jobs. + +## Hazard Analysis + +This section identifies potential hazards and their mitigations. 
+ +### Race Conditions + +| Hazard | Description | Mitigation | +|--------|-------------|------------| +| **Simultaneous reservation** | Two workers reserve the same pending job at nearly the same time | Acceptable: duplicate `make()` calls are resolved by transaction—second worker gets duplicate key error | +| **Reserve during refresh** | Worker reserves a job while another process is running `refresh()` | No conflict: `refresh()` adds new jobs and removes stale ones; reservation updates existing rows | +| **Concurrent refresh calls** | Multiple processes call `refresh()` simultaneously | Acceptable: may result in duplicate insert attempts, but primary key constraint prevents duplicates | +| **Complete vs delete race** | One process completes a job while another deletes it | Acceptable: one operation succeeds, other becomes no-op (row not found) | + +### State Transitions + +| Hazard | Description | Mitigation | +|--------|-------------|------------| +| **Invalid state transition** | Code attempts illegal transition (e.g., pending → success) | Implementation enforces valid transitions; invalid attempts raise error | +| **Stuck in reserved** | Worker crashes while job is reserved (orphaned job) | Manual intervention required: `jobs.reserved.delete()` (see Orphaned Job Handling) | +| **Success re-pended unexpectedly** | `refresh()` re-pends a success job when user expected it to stay | Only occurs if `keep_completed=True` AND key exists in `key_source` but not in target; document clearly | +| **Ignore not respected** | Ignored jobs get processed anyway | Implementation must skip `status='ignore'` in `populate()` job fetching | + +### Data Integrity + +| Hazard | Description | Mitigation | +|--------|-------------|------------| +| **Stale job processed** | Job references deleted upstream data | `make()` will fail or produce invalid results; `refresh()` cleans stale jobs before processing | +| **Jobs table out of sync** | Jobs table doesn't match `key_source` | `refresh()` synchronizes; call periodically or rely on `populate(refresh=True)` | +| **Partial make failure** | `make()` partially succeeds then fails | DataJoint transaction rollback ensures atomicity; job marked as error | +| **Error message truncation** | Error details exceed `varchar(2047)` | Full stack stored in `error_stack` (mediumblob); `error_message` is summary only | + +### Performance + +| Hazard | Description | Mitigation | +|--------|-------------|------------| +| **Large jobs table** | Jobs table grows very large with `keep_completed=True` | Default is `keep_completed=False`; provide guidance on periodic cleanup | +| **Slow refresh on large key_source** | `refresh()` queries entire `key_source` | Can restrict refresh to subsets: `jobs.refresh(Subject & 'lab="smith"')` | +| **Many jobs tables per schema** | Schema with many computed tables has many jobs tables | Jobs tables are lightweight; only created on first use | + +### Operational + +| Hazard | Description | Mitigation | +|--------|-------------|------------| +| **Accidental job deletion** | User runs `jobs.delete()` without restriction | `delete()` inherits from `delete_quick()` (no confirmation); users must apply restrictions carefully | +| **Clearing active jobs** | User clears reserved jobs while workers are still running | May cause duplicated work if job is refreshed and picked up again; coordinate with orchestrator | +| **Priority confusion** | User expects higher number = higher priority | Document clearly: lower values are more urgent (0 = highest priority) 
| + +### Migration + +| Hazard | Description | Mitigation | +|--------|-------------|------------| +| **Legacy ~jobs table conflict** | Old `~jobs` table exists alongside new per-table jobs | Systems are independent; legacy table can be dropped manually | +| **Mixed version workers** | Some workers use old system, some use new | Major release; do not support mixed operation—require full migration | +| **Lost error history** | Migrating loses error records from legacy table | Document migration procedure; users can export legacy errors before migration | + +## Future Extensions + +- [ ] Web-based dashboard for job monitoring +- [ ] Webhook notifications for job completion/failure +- [ ] Job dependencies (job B waits for job A) +- [ ] Resource tagging (GPU required, high memory, etc.) +- [ ] Retry policies (max retries, exponential backoff) +- [ ] Job grouping/batching for efficiency +- [ ] Integration with external schedulers (Slurm, PBS, etc.) + +## Rationale + +### Why Not External Orchestration? + +The team considered integrating external tools like Airflow or Flyte but rejected this approach because: + +1. **Deployment complexity**: External orchestrators require significant infrastructure +2. **Maintenance burden**: Additional systems to maintain and monitor +3. **Accessibility**: Not all DataJoint users have access to orchestration platforms +4. **Tight integration**: DataJoint's transaction model requires close coordination + +The built-in jobs system provides 80% of the value with minimal additional complexity. + +### Why Per-Table Jobs? + +Per-table jobs tables provide: + +1. **Better isolation**: Jobs for one table don't affect others +2. **Simpler queries**: No need to filter by table_name +3. **Native keys**: Primary keys are readable, not hashed +4. **High performance**: No FK constraints means minimal overhead on job operations +5. **Scalability**: Each table's jobs can be indexed independently + +### Why Remove Key Hashing? + +The current system hashes primary keys to support arbitrary key types. The new system uses native keys because: + +1. **Readability**: Debugging is much easier with readable keys +2. **Query efficiency**: Native keys can use table indexes +3. **Foreign keys**: Hash-based keys cannot participate in foreign key relationships +4. **Simplicity**: No need for hash computation and comparison + +### Why FK-Derived Primary Keys Only? + +The jobs table primary key includes only attributes derived from foreign keys in the target table's primary key. This design: + +1. **Aligns with key_source**: The `key_source` query naturally produces keys matching the FK-derived attributes +2. **Simplifies job identity**: A job's identity is determined by its upstream dependencies +3. **Handles additional PK attributes**: When targets have additional PK attributes (e.g., `method`), one job covers all values for that attribute diff --git a/docs/src/design/hidden-job-metadata-spec.md b/docs/src/design/hidden-job-metadata-spec.md new file mode 100644 index 000000000..a33a8d51d --- /dev/null +++ b/docs/src/design/hidden-job-metadata-spec.md @@ -0,0 +1,355 @@ +# Hidden Job Metadata in Computed Tables + +## Overview + +Job execution metadata (start time, duration, code version) should be persisted in computed tables themselves, not just in ephemeral job entries. This is accomplished using hidden attributes. + +## Motivation + +The current job table (`~~table_name`) tracks execution metadata, but: +1. Job entries are deleted after completion (unless `keep_completed=True`) +2. 
Users often need to know when and with what code version each row was computed +3. This metadata should be transparent - not cluttering the user-facing schema + +Hidden attributes (prefixed with `_`) provide the solution: stored in the database but filtered from user-facing APIs. + +## Hidden Job Metadata Attributes + +| Attribute | Type | Description | +|-----------|------|-------------| +| `_job_start_time` | datetime(3) | When computation began | +| `_job_duration` | float32 | Computation duration in seconds | +| `_job_version` | varchar(64) | Code version (e.g., git commit hash) | + +**Design notes:** +- `_job_duration` (elapsed time) rather than `_job_completed_time` because duration is more informative for performance analysis +- `varchar(64)` for version is sufficient for git hashes (40 chars for SHA-1, 7-8 for short hash) +- `datetime(3)` provides millisecond precision + +## Configuration + +### Settings Structure + +Job metadata is controlled via `config.jobs` settings: + +```python +class JobsSettings(BaseSettings): + """Job queue configuration for AutoPopulate 2.0.""" + + model_config = SettingsConfigDict( + env_prefix="DJ_JOBS_", + case_sensitive=False, + extra="forbid", + validate_assignment=True, + ) + + # Existing settings + auto_refresh: bool = Field(default=True, ...) + keep_completed: bool = Field(default=False, ...) + stale_timeout: int = Field(default=3600, ...) + default_priority: int = Field(default=5, ...) + version_method: Literal["git", "none"] | None = Field(default=None, ...) + allow_new_pk_fields_in_computed_tables: bool = Field(default=False, ...) + + # New setting for hidden job metadata + add_job_metadata: bool = Field( + default=False, + description="Add hidden job metadata attributes (_job_start_time, _job_duration, _job_version) " + "to Computed and Imported tables during declaration. Tables created without this setting " + "will not receive metadata updates during populate." + ) +``` + +### Access Patterns + +```python +import datajoint as dj + +# Read setting +dj.config.jobs.add_job_metadata # False (default) + +# Enable programmatically +dj.config.jobs.add_job_metadata = True + +# Enable via environment variable +# DJ_JOBS_ADD_JOB_METADATA=true + +# Enable in config file (dj_config.yaml) +# jobs: +# add_job_metadata: true + +# Temporary override +with dj.config.override(jobs={"add_job_metadata": True}): + schema(MyComputedTable) # Declared with metadata columns +``` + +### Setting Interactions + +| Setting | Effect on Job Metadata | +|---------|----------------------| +| `add_job_metadata=True` | New Computed/Imported tables get hidden metadata columns | +| `add_job_metadata=False` | Tables declared without metadata columns (default) | +| `version_method="git"` | `_job_version` populated with git short hash | +| `version_method="none"` | `_job_version` left empty | +| `version_method=None` | `_job_version` left empty (same as "none") | + +### Behavior at Declaration vs Populate + +| `add_job_metadata` at declare | `add_job_metadata` at populate | Result | +|------------------------------|-------------------------------|--------| +| True | True | Metadata columns created and populated | +| True | False | Metadata columns exist but not populated | +| False | True | No metadata columns, populate skips silently | +| False | False | No metadata columns, normal behavior | + +### Retrofitting Existing Tables + +Tables created before enabling `add_job_metadata` do not have the hidden metadata columns. 
+To add metadata columns to existing tables, use the migration utility (not automatic): + +```python +from datajoint.migrate import add_job_metadata_columns + +# Add hidden metadata columns to specific table +add_job_metadata_columns(MyComputedTable) + +# Add to all Computed/Imported tables in a schema +add_job_metadata_columns(schema) +``` + +This utility: +- ALTERs the table to add the three hidden columns +- Does NOT populate existing rows (metadata remains NULL) +- Future `populate()` calls will populate metadata for new rows + +## Behavior + +### Declaration-time + +When `config.jobs.add_job_metadata=True` and a Computed/Imported table is declared: +- Hidden metadata columns are added to the table definition +- Only master tables receive metadata columns; Part tables never get them + +### Population-time + +After `make()` completes successfully: +1. Check if the table has hidden metadata columns +2. If yes: UPDATE the just-inserted rows with start_time, duration, version +3. If no: Silently skip (no error, no ALTER) + +This applies to both: +- **Direct mode** (`reserve_jobs=False`): Single-process populate +- **Distributed mode** (`reserve_jobs=True`): Multi-worker with job table coordination + +## Excluding Hidden Attributes from Binary Operators + +### Problem Statement + +If two tables have hidden attributes with the same name (e.g., both have `_job_start_time`), SQL's NATURAL JOIN would incorrectly match on them: + +```sql +-- NATURAL JOIN matches ALL common attributes including hidden +SELECT * FROM table_a NATURAL JOIN table_b +-- Would incorrectly match on _job_start_time! +``` + +### Solution: Replace NATURAL JOIN with USING Clause + +Hidden attributes must be excluded from all binary operator considerations. The result of a join does not preserve hidden attributes from its operands. + +**Current implementation:** +```python +def from_clause(self): + clause = next(support) + for s, left in zip(support, self._left): + clause += " NATURAL{left} JOIN {clause}".format(...) +``` + +**Proposed implementation:** +```python +def from_clause(self): + clause = next(support) + for s, (left, using_attrs) in zip(support, self._joins): + if using_attrs: + using = "USING ({})".format(", ".join(f"`{a}`" for a in using_attrs)) + clause += " {left}JOIN {s} {using}".format( + left="LEFT " if left else "", + s=s, + using=using + ) + else: + # Cross join (no common non-hidden attributes) + clause += " CROSS JOIN " + s if not left else " LEFT JOIN " + s + " ON TRUE" + return clause +``` + +### Changes Required + +#### 1. `QueryExpression._left` → `QueryExpression._joins` + +Replace `_left: List[bool]` with `_joins: List[Tuple[bool, List[str]]]` + +Each join stores: +- `left`: Whether it's a left join +- `using_attrs`: Non-hidden common attributes to join on + +```python +# Before +result._left = self._left + [left] + other._left + +# After +join_attributes = [n for n in self.heading.names if n in other.heading.names] +result._joins = self._joins + [(left, join_attributes)] + other._joins +``` + +#### 2. `heading.names` (existing behavior) + +Already filters out hidden attributes: +```python +@property +def names(self): + return [k for k in self.attributes] # attributes excludes is_hidden=True +``` + +This ensures join attribute computation automatically excludes hidden attributes. 
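As an illustration (the attribute and table names here are made up), two computed tables that both carry the hidden `_job_start_time` column still join only on their visible common attributes:

```python
# A.heading.names and B.heading.names already exclude hidden attributes
a_names = ["session_id", "roi_id", "trace"]
b_names = ["session_id", "stim_id", "response"]

join_attributes = [n for n in a_names if n in b_names]
# -> ['session_id']; `_job_start_time` can never appear here because
# heading.names filters hidden attributes before the join is computed

using = "USING ({})".format(", ".join(f"`{a}`" for a in join_attributes))
print(f"SELECT * FROM `lab`.`__a` JOIN `lab`.`__b` {using}")
# SELECT * FROM `lab`.`__a` JOIN `lab`.`__b` USING (`session_id`)
```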
+ +### Behavior Summary + +| Scenario | Hidden Attributes | Result | +|----------|-------------------|--------| +| `A * B` (join) | Same hidden attr in both | NOT matched - excluded from USING | +| `A & B` (semijoin) | Same hidden attr in both | NOT matched | +| `A - B` (antijoin) | Same hidden attr in both | NOT matched | +| `A.proj()` | Hidden attrs in A | NOT projected (unless explicitly named) | +| `A.fetch()` | Hidden attrs in A | NOT returned by default | + +## Implementation Details + +### 1. Declaration (declare.py) + +```python +def declare(full_table_name, definition, context): + # ... existing code ... + + # Add hidden job metadata for auto-populated tables + if config.jobs.add_job_metadata and table_tier in (TableTier.COMPUTED, TableTier.IMPORTED): + # Only for master tables, not parts + if not is_part_table: + job_metadata_sql = [ + "`_job_start_time` datetime(3) DEFAULT NULL", + "`_job_duration` float DEFAULT NULL", + "`_job_version` varchar(64) DEFAULT ''", + ] + attribute_sql.extend(job_metadata_sql) +``` + +### 2. Population (autopopulate.py) + +```python +def _populate1(self, key, callback, use_jobs, jobs): + start_time = datetime.now() + version = _get_job_version() + + # ... call make() ... + + duration = time.time() - start_time.timestamp() + + # Update job metadata if table has the hidden attributes + if self._has_job_metadata_attrs(): + self._update_job_metadata( + key, + start_time=start_time, + duration=duration, + version=version + ) + +def _has_job_metadata_attrs(self): + """Check if table has hidden job metadata columns.""" + hidden_attrs = self.heading._attributes # includes hidden + return '_job_start_time' in hidden_attrs + +def _update_job_metadata(self, key, start_time, duration, version): + """Update hidden job metadata for the given key.""" + # UPDATE using primary key + pk_condition = make_condition(self, key, set()) + self.connection.query( + f"UPDATE {self.full_table_name} SET " + f"`_job_start_time`=%s, `_job_duration`=%s, `_job_version`=%s " + f"WHERE {pk_condition}", + args=(start_time, duration, version[:64]) + ) +``` + +### 3. Job table (jobs.py) + +Update version field length: +```python +version="" : varchar(64) +``` + +### 4. 
Version helper + +```python +def _get_job_version() -> str: + """Get version string, truncated to 64 chars.""" + from .settings import config + + method = config.jobs.version_method + if method is None or method == "none": + return "" + elif method == "git": + try: + result = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout.strip()[:64] if result.returncode == 0 else "" + except Exception: + return "" + return "" +``` + +## Example Usage + +```python +# Enable job metadata for new tables +dj.config.jobs.add_job_metadata = True + +@schema +class ProcessedData(dj.Computed): + definition = """ + -> RawData + --- + result : float + """ + + def make(self, key): + # User code - unaware of hidden attributes + self.insert1({**key, 'result': compute(key)}) + +# Job metadata automatically added and populated: +# _job_start_time, _job_duration, _job_version + +# User-facing API unaffected: +ProcessedData().heading.names # ['raw_data_id', 'result'] +ProcessedData().fetch() # Returns only visible attributes + +# Access hidden attributes explicitly if needed: +ProcessedData().fetch('_job_start_time', '_job_duration', '_job_version') +``` + +## Summary of Design Decisions + +| Decision | Resolution | +|----------|------------| +| Configuration | `config.jobs.add_job_metadata` (default False) | +| Environment variable | `DJ_JOBS_ADD_JOB_METADATA` | +| Existing tables | No automatic ALTER - silently skip metadata if columns absent | +| Retrofitting | Manual via `datajoint.migrate.add_job_metadata_columns()` utility | +| Populate modes | Record metadata in both direct and distributed modes | +| Part tables | No metadata columns - only master tables | +| Version length | varchar(64) in both jobs table and computed tables | +| Binary operators | Hidden attributes excluded via USING clause instead of NATURAL JOIN | +| Failed makes | N/A - transaction rolls back, no rows to update | diff --git a/specs/autopopulate-1.0.md b/specs/autopopulate-1.0.md new file mode 100644 index 000000000..44d5a0b1e --- /dev/null +++ b/specs/autopopulate-1.0.md @@ -0,0 +1,622 @@ +# AutoPopulate 1.0 Specification + +This document describes the legacy AutoPopulate system in DataJoint Python, documenting how automated computation pipelines work. This specification serves as a reference for the system being replaced by AutoPopulate 2.0. + +## Overview + +AutoPopulate is a mixin class that adds the `populate()` method to a Table class. Auto-populated tables inherit from both `Table` and `AutoPopulate`, define the `key_source` property, and implement the `make` callback method. 
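For orientation, a minimal auto-populated table looks roughly like this (the upstream `Image` table, the `longblob` type choice, and `my_filter` are illustrative, not part of this spec):

```python
import datajoint as dj

schema = dj.schema("demo_pipeline")  # illustrative schema name

@schema
class FilteredImage(dj.Computed):
    definition = """
    -> Image
    ---
    filtered_image : longblob
    """

    def make(self, key):
        image = (Image & key).fetch1("image")              # 1. fetch from parents
        filtered = my_filter(image)                         # 2. compute
        self.insert1(dict(key, filtered_image=filtered))    # 3. insert

# default key_source is Image.proj(); populate() calls make() once per missing key
FilteredImage.populate()
```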
+ +**Source Files:** +- `src/datajoint/autopopulate.py` - Main AutoPopulate mixin +- `src/datajoint/jobs.py` - Job reservation table +- `src/datajoint/schemas.py` - Schema class with jobs property + +## Key Characteristics (1.0 vs 2.0) + +| Aspect | AutoPopulate 1.0 | AutoPopulate 2.0 | +|--------|------------------|------------------| +| **Jobs table scope** | Schema-level (`~jobs`) | Per-table (`~table__jobs`) | +| **Primary key** | `(table_name, key_hash)` | FK-derived attributes only | +| **Key storage** | MD5 hash + pickled blob | Native column values | +| **Status values** | `reserved`, `error`, `ignore` | `pending`, `reserved`, `success`, `error`, `ignore` | +| **Pending tracking** | None (computed on-the-fly) | Explicit `pending` status | +| **Priority** | None | Integer priority (lower = more urgent) | +| **Scheduling** | None | `scheduled_time` for delayed execution | +| **Duration tracking** | None | `duration` in seconds | +| **Code version** | None | `version` field | +| **`schema.jobs`** | Single `JobTable` | List of per-table `JobsTable` objects | +| **Job refresh** | None | `refresh()` syncs with `key_source` | + +## 1. Key Source Generation + +### Default Behavior + +The `key_source` property returns a `QueryExpression` yielding primary key values to be passed to `make()`. + +**Default implementation** (`autopopulate.py:59-83`): +1. Fetch all primary parent tables via `self.target.parents(primary=True, as_objects=True, foreign_key_info=True)` +2. Handle aliased attributes by projecting with renamed columns +3. Join all parent tables using the `*` operator (natural join) + +```python +@property +def key_source(self): + def _rename_attributes(table, props): + return ( + table.proj(**{attr: ref for attr, ref in props["attr_map"].items() if attr != ref}) + if props["aliased"] + else table.proj() + ) + + if self._key_source is None: + parents = self.target.parents(primary=True, as_objects=True, foreign_key_info=True) + if not parents: + raise DataJointError( + "A table must have dependencies from its primary key for auto-populate to work" + ) + self._key_source = _rename_attributes(*parents[0]) + for q in parents[1:]: + self._key_source *= _rename_attributes(*q) + return self._key_source +``` + +### Custom Key Source + +Subclasses may override `key_source` to change the scope or granularity of `make()` calls. + +### Jobs To Do Computation + +The `_jobs_to_do()` method (`autopopulate.py:171-197`): +1. Validates `key_source` is a `QueryExpression` +2. Verifies target table has all primary key attributes from `key_source` +3. Applies restrictions via `AndList` +4. Projects to primary key attributes only + +```python +def _jobs_to_do(self, restrictions): + todo = self.key_source + # ... validation ... + return (todo & AndList(restrictions)).proj() +``` + +The actual keys to populate are computed as: +```python +keys = (self._jobs_to_do(restrictions) - self.target).fetch("KEY", limit=limit) +``` + +This subtracts already-populated keys from the todo list. + +## 2. Job Table Creation and Management + +### Schema-Level Job Tables + +Each schema has its own job reservation table named `~jobs`. The job table is created lazily when first accessed. 
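Typical interactive use of the legacy table looks like this (the table name `__filtered_image` is illustrative); the property that exposes it is shown next:

```python
# Inspect failed jobs for one computed table
errors = (schema.jobs & 'status="error"'
          & {"table_name": "__filtered_image"}).fetch("key", "error_message")

# Clear the error entries so the keys are re-attempted on the next populate()
(schema.jobs & 'status="error"' & {"table_name": "__filtered_image"}).delete()
```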
+ +**Schema.jobs property** (`schemas.py:367-377`): +```python +@property +def jobs(self): + """ + schema.jobs provides a view of the job reservation table for the schema + """ + self._assert_exists() + if self._jobs is None: + self._jobs = JobTable(self.connection, self.database) + return self._jobs +``` + +### JobTable Initialization + +**JobTable.__init__** (`jobs.py:18-40`): +```python +def __init__(self, conn, database): + self.database = database + self._connection = conn + self._heading = Heading(table_info=dict( + conn=conn, database=database, table_name=self.table_name, context=None + )) + self._support = [self.full_table_name] + + self._definition = """ # job reservation table for `{database}` + table_name :varchar(255) # className of the table + key_hash :char(32) # key hash + --- + status :enum('reserved','error','ignore') + key=null : # structure containing the key + error_message="" :varchar({error_message_length}) + error_stack=null : # error stack if failed + user="" :varchar(255) + host="" :varchar(255) + pid=0 :int unsigned + connection_id = 0 : bigint unsigned + timestamp=CURRENT_TIMESTAMP :timestamp + """.format(database=database, error_message_length=ERROR_MESSAGE_LENGTH) + if not self.is_declared: + self.declare() + self._user = self.connection.get_user() +``` + +The `~jobs` table is automatically declared (created) if it doesn't exist when the `JobTable` is instantiated. + +### Schema Registration + +When a schema is activated, it registers itself with the connection (`schemas.py:136`): +```python +self.connection.register(self) +``` + +**Connection.register** (`connection.py:222-224`): +```python +def register(self, schema): + self.schemas[schema.database] = schema + self.dependencies.clear() +``` + +This allows `populate()` to access the jobs table via: +```python +jobs = self.connection.schemas[self.target.database].jobs +``` + +### Job Table Name + +The job table uses a special name prefixed with `~` (`jobs.py:47-48`): +```python +@property +def table_name(self): + return "~jobs" +``` + +Tables prefixed with `~` are system tables excluded from `schema.list_tables()`. + +## 3. Job Reservation System + +### Job Table Structure + +The `~jobs` table (`jobs.py:24-37`) stores job reservations: + +| Attribute | Type | Description | +|-----------|------|-------------| +| `table_name` | varchar(255) | Full table name (`database.table_name`) | +| `key_hash` | char(32) | MD5 hash of primary key dict | +| `status` | enum | `'reserved'`, `'error'`, or `'ignore'` | +| `key` | blob | Pickled key dict | +| `error_message` | varchar(2047) | Truncated error message | +| `error_stack` | blob | Full stack trace | +| `user` | varchar(255) | Database user | +| `host` | varchar(255) | System hostname | +| `pid` | int unsigned | Process ID | +| `connection_id` | bigint unsigned | MySQL connection ID | +| `timestamp` | timestamp | Automatic timestamp | + +### Reservation Flow + +**Reserve** (`jobs.py:58-81`): +```python +def reserve(self, table_name, key): + job = dict( + table_name=table_name, + key_hash=key_hash(key), + status="reserved", + host=platform.node(), + pid=os.getpid(), + connection_id=self.connection.connection_id, + key=key, + user=self._user, + ) + try: + self.insert1(job, ignore_extra_fields=True) + except DuplicateError: + return False + return True +``` + +Atomicity is guaranteed by MySQL's unique constraint on `(table_name, key_hash)`. 
+ +**Complete** (`jobs.py:113-121`): +```python +def complete(self, table_name, key): + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + (self & job_key).delete_quick() +``` + +**Error** (`jobs.py:123-150`): +```python +def error(self, table_name, key, error_message, error_stack=None): + if len(error_message) > ERROR_MESSAGE_LENGTH: + error_message = error_message[:ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX + self.insert1( + dict( + table_name=table_name, + key_hash=key_hash(key), + status="error", + # ... metadata ... + error_message=error_message, + error_stack=error_stack, + ), + replace=True, + ) +``` + +**Ignore** (`jobs.py:83-111`): +```python +def ignore(self, table_name, key): + job = dict( + table_name=table_name, + key_hash=key_hash(key), + status="ignore", + # ... metadata ... + ) + try: + self.insert1(job, ignore_extra_fields=True) + except DuplicateError: + return False + return True +``` + +### Job Filtering in Populate + +Before populating, keys with existing job entries are excluded (`autopopulate.py:257-261`): +```python +if reserve_jobs: + exclude_key_hashes = ( + jobs & {"table_name": self.target.table_name} & 'status in ("error", "ignore", "reserved")' + ).fetch("key_hash") + keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] +``` + +### Job Table Maintenance + +The `JobTable` class provides simplified `delete()` and `drop()` methods (`jobs.py:50-56`): +```python +def delete(self): + """bypass interactive prompts and dependencies""" + self.delete_quick() + +def drop(self): + """bypass interactive prompts and dependencies""" + self.drop_quick() +``` + +These bypass normal safety prompts since the jobs table is a system table. + +## 4. Make Method Invocation + +### Make Method Contract + +The `make(key)` method must perform three steps: +1. **Fetch**: Retrieve data from parent tables, restricted by key +2. **Compute**: Calculate secondary attributes from fetched data +3. **Insert**: Insert new tuple(s) into the target table + +### Two Implementation Patterns + +#### Pattern A: Regular Method + +All three steps execute within a single database transaction. + +**Execution flow** (`autopopulate.py:340-355`): +```python +if not is_generator: + self.connection.start_transaction() + # ... key existence check ... + make(dict(key), **(make_kwargs or {})) +``` + +#### Pattern B: Generator (Tripartite) Method + +Separates computation from transaction to allow long-running computation outside the transaction window. 
+ +**Required methods**: +- `make_fetch(key)` - All database queries +- `make_compute(key, *fetched_data)` - All computation +- `make_insert(key, *computed_result)` - All inserts + +**Default generator implementation** (`autopopulate.py:140-152`): +```python +def make(self, key): + fetched_data = self.make_fetch(key) + computed_result = yield fetched_data + + if computed_result is None: + computed_result = self.make_compute(key, *fetched_data) + yield computed_result + + self.make_insert(key, *computed_result) + yield +``` + +**Execution flow** (`autopopulate.py:356-370`): +```python +# Phase 1: Fetch and compute OUTSIDE transaction +gen = make(dict(key), **(make_kwargs or {})) +fetched_data = next(gen) +fetch_hash = deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[fetched_data] +computed_result = next(gen) + +# Phase 2: Verify and insert INSIDE transaction +self.connection.start_transaction() +gen = make(dict(key), **(make_kwargs or {})) # restart +fetched_data = next(gen) +if fetch_hash != deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[fetched_data]: + raise DataJointError("Referential integrity failed! The `make_fetch` data has changed") +gen.send(computed_result) # insert +``` + +The deep hash comparison ensures data integrity by detecting concurrent modifications. + +### Legacy Support + +The legacy `_make_tuples` method name is supported (`autopopulate.py:333`): +```python +make = self._make_tuples if hasattr(self, "_make_tuples") else self.make +``` + +### Insert Protection + +Direct inserts into auto-populated tables are blocked outside `make()` (`autopopulate.py:351, 402`): +```python +self.__class__._allow_insert = True +try: + # ... make() execution ... +finally: + self.__class__._allow_insert = False +``` + +The `Table.insert()` method checks this flag and raises `DataJointError` if insert is attempted outside the populate context (unless `allow_direct_insert=True`). + +## 5. Transaction Management + +### Transaction Lifecycle + +**Start** (`connection.py:322-327`): +```python +def start_transaction(self): + if self.in_transaction: + raise DataJointError("Nested transactions are not supported.") + self.query("START TRANSACTION WITH CONSISTENT SNAPSHOT") + self._in_transaction = True +``` + +Uses MySQL's `WITH CONSISTENT SNAPSHOT` for repeatable read isolation. + +**Commit** (`connection.py:337-343`): +```python +def commit_transaction(self): + self.query("COMMIT") + self._in_transaction = False +``` + +**Cancel/Rollback** (`connection.py:329-335`): +```python +def cancel_transaction(self): + self.query("ROLLBACK") + self._in_transaction = False +``` + +### Transaction Rules + +1. **No nested transactions** - `populate()` cannot be called during an existing transaction (`autopopulate.py:237-238`) +2. **Regular make**: Transaction spans entire `make()` execution +3. **Generator make**: Transaction spans only the final fetch verification and insert phase + +## 6. Error Management + +### Error Handling Flow + +(`autopopulate.py:372-402`): + +```python +try: + # ... make() execution ... 
+except (KeyboardInterrupt, SystemExit, Exception) as error: + try: + self.connection.cancel_transaction() + except LostConnectionError: + pass # Connection lost during rollback + + error_message = "{exception}{msg}".format( + exception=error.__class__.__name__, + msg=": " + str(error) if str(error) else "", + ) + + if jobs is not None: + jobs.error( + self.target.table_name, + self._job_key(key), + error_message=error_message, + error_stack=traceback.format_exc(), + ) + + if not suppress_errors or isinstance(error, SystemExit): + raise + else: + logger.error(error) + return key, error if return_exception_objects else error_message +else: + self.connection.commit_transaction() + if jobs is not None: + jobs.complete(self.target.table_name, self._job_key(key)) + return True +``` + +### Error Suppression + +When `suppress_errors=True`: +- Errors are logged to the jobs table +- Errors are collected and returned instead of raised +- `SystemExit` is never suppressed (for graceful SIGTERM handling) + +### SIGTERM Handling + +When `reserve_jobs=True`, a SIGTERM handler is installed (`autopopulate.py:245-251`): +```python +def handler(signum, frame): + logger.info("Populate terminated by SIGTERM") + raise SystemExit("SIGTERM received") + +old_handler = signal.signal(signal.SIGTERM, handler) +``` + +This allows graceful termination of long-running populate jobs. + +## 7. Populate Method Interface + +### Full Signature + +```python +def populate( + self, + *restrictions, + keys=None, + suppress_errors=False, + return_exception_objects=False, + reserve_jobs=False, + order="original", + limit=None, + max_calls=None, + display_progress=False, + processes=1, + make_kwargs=None, +): +``` + +### Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `*restrictions` | various | - | Restrictions AND-ed to filter `key_source` | +| `keys` | list[dict] | None | Explicit keys to populate (bypasses `key_source`) | +| `suppress_errors` | bool | False | Collect errors instead of raising | +| `return_exception_objects` | bool | False | Return exception objects vs. strings | +| `reserve_jobs` | bool | False | Enable job reservation for distributed processing | +| `order` | str | "original" | Key order: "original", "reverse", "random" | +| `limit` | int | None | Max keys to fetch from `key_source` | +| `max_calls` | int | None | Max `make()` calls to execute | +| `display_progress` | bool | False | Show progress bar | +| `processes` | int | 1 | Number of worker processes | +| `make_kwargs` | dict | None | Non-computation kwargs passed to `make()` | + +### Return Value + +```python +{ + "success_count": int, # Number of successful make() calls + "error_list": list, # List of (key, error) tuples if suppress_errors=True +} +``` + +## 8. 
Multiprocessing Support + +### Process Initialization + +(`autopopulate.py:27-36`): +```python +def _initialize_populate(table, jobs, populate_kwargs): + process = mp.current_process() + process.table = table + process.jobs = jobs + process.populate_kwargs = populate_kwargs + table.connection.connect() # reconnect +``` + +### Connection Handling + +Before forking (`autopopulate.py:296-297`): +```python +self.connection.close() # Disconnect parent +del self.connection._conn.ctx # SSLContext not pickleable +``` + +After workers complete (`autopopulate.py:311`): +```python +self.connection.connect() # Reconnect parent +``` + +### Worker Execution + +```python +def _call_populate1(key): + process = mp.current_process() + return process.table._populate1(key, process.jobs, **process.populate_kwargs) +``` + +Uses `Pool.imap()` with `chunksize=1` for ordered execution with progress tracking. + +## 9. Return Values from _populate1 + +| Value | Meaning | +|-------|---------| +| `True` | Successfully completed `make()` and inserted data | +| `False` | Key already exists in target OR job reservation failed | +| `(key, error)` | Error occurred (when `suppress_errors=True`) | + +## 10. Key Observations + +### Strengths + +1. **Atomic job reservation** via MySQL unique constraints +2. **Generator pattern** allows long computation outside transactions +3. **Deep hash verification** ensures data consistency +4. **Graceful shutdown** via SIGTERM handling +5. **Error persistence** in jobs table for debugging +6. **Per-schema job tables** allow independent job management + +### Limitations (Addressed in 2.0) + +The following limitations are documented in GitHub issue [#1258](https://github.com/datajoint/datajoint-python/issues/1258) and related issues. + +#### Job Table Design Issues + +1. **Limited status tracking**: Only `reserved`, `error`, and `ignore` statuses. No explicit tracking of pending jobs or successful completions. + +2. **Functions as error log**: Cannot track pending or completed jobs efficiently. Finding pending jobs requires computing `key_source - target - jobs` each time. + +3. **Poor dashboard visibility**: No way to monitor pipeline progress without querying multiple tables and computing set differences. See [#873](https://github.com/datajoint/datajoint-python/issues/873). + +4. **Key hashing obscures data**: Primary keys stored as 32-character MD5 hashes. Actual keys stored as pickled blobs requiring deserialization to inspect. + +5. **No referential integrity**: Jobs table is independent of computed tables. Orphaned jobs accumulate when upstream data is deleted. + +6. **Schema-level scope**: All computed tables share one jobs table. Filtering by `table_name` required for all operations. + +#### Key Source Issues + +1. **Frequent manual modifications**: Subset operations require modifying `key_source` in Python code. No database-level persistence. + +2. **Local visibility only**: Custom key sources not accessible database-wide. See discussion in [#1258](https://github.com/datajoint/datajoint-python/issues/1258). + +3. **Performance bottleneck**: Multiple workers querying `key_source` simultaneously strains database. See [#749](https://github.com/datajoint/datajoint-python/issues/749). + +4. **Codebase dependency**: Requires full pipeline codebase to determine pending work. Cannot query job status from SQL alone. + +#### Missing Features + +1. **No priority system**: Jobs processed in fetch order only (original, reverse, random). + +2. 
**No scheduling**: Cannot delay job execution to a future time. + +3. **No duration tracking**: No record of how long jobs take to complete. + +4. **No version tracking**: No record of which code version processed a job. + +5. **Simple retry logic**: Failed jobs stay in `error` status until manually cleared. + +6. **No stale job cleanup**: Jobs referencing deleted upstream data remain indefinitely. + +7. **No orphaned job handling**: Reserved jobs from crashed workers remain forever. See [#665](https://github.com/datajoint/datajoint-python/issues/665). + +#### Populate Parameter Confusion + +The `limit` vs `max_calls` parameters have confusing behavior. See [#1203](https://github.com/datajoint/datajoint-python/issues/1203): +- `limit`: Applied before excluding reserved/error jobs (can result in no work even when jobs available) +- `max_calls`: Applied after excluding reserved/error jobs (usually what users expect) + +## 11. Related GitHub Issues + +| Issue | Title | Status | +|-------|-------|--------| +| [#1258](https://github.com/datajoint/datajoint-python/issues/1258) | FEAT: Autopopulate 2.0 | Open | +| [#1203](https://github.com/datajoint/datajoint-python/issues/1203) | Unexpected behaviour of `limit` in populate() | Open | +| [#749](https://github.com/datajoint/datajoint-python/issues/749) | Strain on MySQL with expensive key-source | Closed | +| [#873](https://github.com/datajoint/datajoint-python/issues/873) | Provide way to list specific jobs | Closed | +| [#665](https://github.com/datajoint/datajoint-python/issues/665) | Cluster support - machine failures | Closed | diff --git a/specs/autopopulate-2.0-implementation.md b/specs/autopopulate-2.0-implementation.md new file mode 100644 index 000000000..15960a202 --- /dev/null +++ b/specs/autopopulate-2.0-implementation.md @@ -0,0 +1,458 @@ +# AutoPopulate 2.0 Implementation Plan + +This document outlines the implementation steps for AutoPopulate 2.0 based on the specification in `docs/src/compute/autopopulate2.0-spec.md`. 
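As a reference point for the phases below, the end state this plan targets looks roughly like the following from a pipeline author's perspective. This is a sketch based on the spec's API (`populate(reserve_jobs=True, ...)`, `jobs.refresh()`, `jobs.progress()`, `jobs.errors`); `Recording` and `Analysis` are hypothetical tables and exact signatures may shift during implementation.

```python
import datajoint as dj

schema = dj.Schema("pipeline")  # hypothetical schema name


@schema
class Recording(dj.Manual):  # hypothetical upstream table
    definition = """
    recording_id : int
    """


@schema
class Analysis(dj.Computed):  # hypothetical computed table
    definition = """
    -> Recording
    ---
    result : float
    """

    def make(self, key):
        ...  # fetch, compute, insert


# Distributed mode: workers coordinate through the per-table job queue (~~analysis)
Analysis.populate(reserve_jobs=True, processes=4)

# Monitoring and maintenance via the per-table jobs property
Analysis.jobs.refresh(orphan_timeout=3600)      # re-pend jobs left by crashed workers
print(Analysis.jobs.progress())                 # e.g. {'pending': ..., 'error': ..., 'total': ...}
Analysis.jobs.errors.fetch("error_message")     # inspect failures
```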
+ +## Overview + +The implementation involves changes to these files: +- `src/datajoint/jobs.py` - New `JobsTable` class (per-table jobs) +- `src/datajoint/autopopulate.py` - Updated `AutoPopulate` mixin +- `src/datajoint/user_tables.py` - FK-only PK constraint for Computed/Imported +- `src/datajoint/schemas.py` - Updated `schema.jobs` property +- `src/datajoint/settings.py` - New configuration options + +## Table Naming Convention + +Jobs tables use the `~~` prefix (double tilde): + +| Table Type | Example Class | MySQL Table Name | +|------------|---------------|------------------| +| Manual | `Subject` | `subject` | +| Lookup | `#Method` | `#method` | +| Imported | `_Recording` | `_recording` | +| Computed | `__Analysis` | `__analysis` | +| Hidden | `~jobs` | `~jobs` | +| **Jobs (new)** | N/A | `~~analysis` | + +The `~~` prefix: +- Distinguishes from single-tilde hidden tables (`~jobs`, `~lineage`) +- Shorter than suffix-based naming +- Excluded from `list_tables()` (tables starting with `~`) + +## Execution Modes + +AutoPopulate 2.0 supports two execution modes, both equally valid: + +### Direct Mode (`reserve_jobs=False`, default) + +Best for: +- Early development and debugging +- Single-worker execution +- Simple pipelines without distributed computing +- Interactive exploration + +Behavior: +- Computes `(key_source & restrictions) - self` directly +- No jobs table involvement +- No coordination overhead + +### Distributed Mode (`reserve_jobs=True`) + +Best for: +- Multi-worker parallel processing +- Production pipelines with monitoring +- Job prioritization and scheduling +- Error tracking and retry workflows + +Behavior: +- Uses per-table jobs table for coordination +- Supports priority, scheduling, status tracking +- Enables dashboard monitoring + +## Phase 1: JobsTable Class + +### 1.1 Create JobsTable Class + +**File**: `src/datajoint/jobs.py` + +```python +class JobsTable(Table): + """Hidden table managing job queue for an auto-populated table.""" + + _prefix = "~~" + + def __init__(self, target_table): + """ + Initialize jobs table for an auto-populated table. + + Args: + target_table: The Computed/Imported table instance + """ + self._target_class = target_table.__class__ + self._connection = target_table.connection + self.database = target_table.database + self._definition = self._generate_definition(target_table) + + @property + def table_name(self): + """Jobs table name: ~~base_name""" + target_name = self._target_class.table_name + base_name = target_name.lstrip('_') + return f"~~{base_name}" +``` + +### 1.2 Core Methods + +```python +def refresh( + self, + *restrictions, + delay: float = 0, + priority: int = None, + stale_timeout: float = None, + orphan_timeout: float = None +) -> dict: + """ + Refresh jobs queue: add new, remove stale, handle orphans. + + Args: + restrictions: Filter key_source when adding new jobs + delay: Seconds until new jobs become available + priority: Priority for new jobs (lower = more urgent) + stale_timeout: Remove jobs older than this if key not in key_source + orphan_timeout: Reset reserved jobs older than this to pending + + Returns: + {'added': int, 'removed': int, 'orphaned': int, 're_pended': int} + """ + +def reserve(self, key: dict) -> bool: + """ + Reserve a pending job for processing. + + Returns True if reservation successful, False if job not available. 
+ """ + +def complete(self, key: dict, duration: float = None) -> None: + """Mark job as completed (success or delete based on config).""" + +def error(self, key: dict, error_message: str, error_stack: str = None) -> None: + """Mark job as failed with error details.""" + +def ignore(self, key: dict) -> None: + """Mark job to be skipped during populate.""" + +def progress(self) -> dict: + """Return job status breakdown.""" +``` + +### 1.3 Status Properties + +```python +@property +def pending(self) -> QueryExpression: + return self & 'status="pending"' + +@property +def reserved(self) -> QueryExpression: + return self & 'status="reserved"' + +@property +def errors(self) -> QueryExpression: + return self & 'status="error"' + +@property +def ignored(self) -> QueryExpression: + return self & 'status="ignore"' + +@property +def completed(self) -> QueryExpression: + return self & 'status="success"' +``` + +### 1.4 Definition Generation + +```python +def _generate_definition(self, target_table): + """Build jobs table definition from target's FK-derived primary key.""" + fk_attrs = self._get_fk_derived_pk_attrs(target_table) + pk_lines = "\n ".join(f"{name} : {dtype}" for name, dtype in fk_attrs) + + return f""" + # Job queue for {target_table.full_table_name} + {pk_lines} + --- + status : enum('pending', 'reserved', 'success', 'error', 'ignore') + priority : uint8 # Set by refresh(), default from config + created_time=CURRENT_TIMESTAMP : timestamp + scheduled_time=CURRENT_TIMESTAMP : timestamp + reserved_time=null : timestamp + completed_time=null : timestamp + duration=null : float64 + error_message="" : varchar(2047) + error_stack=null : + user="" : varchar(255) + host="" : varchar(255) + pid=0 : uint32 + connection_id=0 : uint64 + version="" : varchar(255) + """ +``` + +## Phase 2: FK-Only Primary Key Constraint + +### 2.1 Validation for New Tables + +**File**: `src/datajoint/user_tables.py` + +New auto-populated tables must have FK-only primary keys: + +```python +@classmethod +def _validate_pk_constraint(cls): + """Enforce FK-only PK for new auto-populated tables.""" + if cls.is_declared: + return # Skip validation for existing tables + + heading = cls.heading + non_fk_pk = [ + name for name in heading.primary_key + if not heading[name].is_foreign_key + ] + if non_fk_pk: + raise DataJointError( + f"Auto-populated table {cls.__name__} has non-FK primary key " + f"attributes: {non_fk_pk}. Move these to secondary attributes " + f"or reference a lookup table." 
+ ) +``` + +### 2.2 Legacy Table Support + +Existing tables with non-FK PK attributes continue to work: +- Jobs table uses only FK-derived attributes +- Warning logged about degraded granularity +- One job may cover multiple target rows + +## Phase 3: AutoPopulate Mixin Updates + +### 3.1 Add `jobs` Property + +**File**: `src/datajoint/autopopulate.py` + +```python +class AutoPopulate: + _jobs_table = None + + @property + def jobs(self): + """Access the jobs table for this auto-populated table.""" + if self._jobs_table is None: + self._jobs_table = JobsTable(self) + if not self._jobs_table.is_declared: + self._jobs_table.declare() + return self._jobs_table +``` + +### 3.2 Update `populate()` Signature + +```python +def populate( + self, + *restrictions, + suppress_errors: bool = False, + return_exception_objects: bool = False, + reserve_jobs: bool = False, + max_calls: int = None, + display_progress: bool = False, + processes: int = 1, + make_kwargs: dict = None, + priority: int = None, + refresh: bool = None, +) -> dict: +``` + +### 3.3 Execution Path Selection + +```python +def populate(self, *restrictions, reserve_jobs=False, **kwargs): + if self.connection.in_transaction: + raise DataJointError("Populate cannot be called during a transaction.") + + if reserve_jobs: + return self._populate_distributed(*restrictions, **kwargs) + else: + return self._populate_direct(*restrictions, **kwargs) +``` + +### 3.4 Direct Mode Implementation + +```python +def _populate_direct(self, *restrictions, max_calls=None, suppress_errors=False, ...): + """ + Populate without jobs table coordination. + + Computes keys directly from key_source, suitable for single-worker + execution, development, and debugging. + """ + keys = (self.key_source & AndList(restrictions)) - self + keys = keys.fetch('KEY', limit=max_calls) + + success_count = 0 + error_list = [] + + for key in tqdm(keys, disable=not display_progress): + result = self._populate1(key, jobs=None, suppress_errors=suppress_errors, ...) + # ... handle result +``` + +### 3.5 Distributed Mode Implementation + +```python +def _populate_distributed(self, *restrictions, refresh=None, priority=None, max_calls=None, ...): + """ + Populate with jobs table coordination. + + Uses jobs table for multi-worker coordination, priority scheduling, + and status tracking. + """ + # Refresh if configured + if refresh is None: + refresh = config['jobs.auto_refresh'] + if refresh: + self.jobs.refresh(*restrictions, priority=priority) + + # Fetch pending jobs + pending = ( + self.jobs.pending & 'scheduled_time <= NOW()' + ).fetch('KEY', order_by='priority ASC, scheduled_time ASC', limit=max_calls) + + success_count = 0 + error_list = [] + + for key in tqdm(pending, disable=not display_progress): + if not self.jobs.reserve(key): + continue # Already reserved by another worker + + start_time = time.time() + try: + self._call_make(key, ...) + duration = time.time() - start_time + self.jobs.complete(key, duration=duration) + success_count += 1 + except Exception as e: + self.connection.cancel_transaction() + self.jobs.error(key, str(e), traceback.format_exc()) + if not suppress_errors: + raise + error_list.append((key, e)) + + return {'success_count': success_count, 'error_list': error_list} +``` + +## Phase 4: Schema Updates + +### 4.1 Update `schema.jobs` Property + +**File**: `src/datajoint/schemas.py` + +```python +@property +def jobs(self): + """ + Return list of JobsTable objects for all auto-populated tables. 
+ + Returns: + List[JobsTable]: Jobs tables for Computed/Imported tables in schema + """ + from .jobs import JobsTable + + jobs_tables = [] + for table_name in self.list_tables(): + table_class = self(table_name) + if hasattr(table_class, 'jobs'): + jobs_tables.append(table_class.jobs) + return jobs_tables +``` + +### 4.2 Exclude `~~` from `list_tables()` + +Already handled - tables starting with `~` are excluded. + +## Phase 5: Configuration + +### 5.1 Add Config Options + +**File**: `src/datajoint/settings.py` + +```python +DEFAULTS = { + 'jobs.auto_refresh': True, + 'jobs.keep_completed': False, + 'jobs.stale_timeout': 3600, + 'jobs.default_priority': 5, + 'jobs.version': None, +} +``` + +### 5.2 Version Helper + +```python +def get_job_version() -> str: + """Get version string based on config.""" + version = config['jobs.version'] + if version == 'git': + try: + result = subprocess.run( + ['git', 'rev-parse', '--short', 'HEAD'], + capture_output=True, text=True, timeout=5 + ) + return result.stdout.strip() if result.returncode == 0 else '' + except Exception: + return '' + return version or '' +``` + +## Phase 6: Table Lifecycle + +### 6.1 Drop Jobs Table with Target + +When an auto-populated table is dropped, its jobs table is also dropped: + +```python +def drop(self): + if hasattr(self, '_jobs_table') and self._jobs_table is not None: + if self._jobs_table.is_declared: + self._jobs_table.drop_quick() + # ... existing drop logic +``` + +## Phase 7: Update Spec + +Update `docs/src/compute/autopopulate2.0-spec.md`: +- Change `~table__jobs` references to `~~table` +- Update table naming section + +## Implementation Order + +1. **Phase 5**: Configuration (foundation) +2. **Phase 1**: JobsTable class +3. **Phase 2**: FK-only PK constraint +4. **Phase 3**: AutoPopulate updates +5. **Phase 4**: Schema.jobs property +6. **Phase 6**: Table lifecycle +7. **Phase 7**: Spec update +8. **Testing**: Throughout + +## Testing Strategy + +### Unit Tests +- `test_jobs_table_naming` - `~~` prefix +- `test_jobs_definition_generation` - FK-derived PK +- `test_refresh_operations` - add/remove/orphan/repend +- `test_reserve_complete_error_flow` - job lifecycle +- `test_progress_counts` - status aggregation + +### Integration Tests +- `test_populate_direct_mode` - without jobs table +- `test_populate_distributed_mode` - with jobs table +- `test_multiprocess_populate` - concurrent workers +- `test_legacy_table_support` - non-FK PK tables +- `test_schema_jobs_property` - list of jobs tables + +## Migration Notes + +- Legacy `~jobs` table is NOT auto-deleted +- New `~~` tables created on first access to `.jobs` +- Both can coexist during transition +- Manual cleanup of legacy `~jobs` when ready diff --git a/src/datajoint/autopopulate.py b/src/datajoint/autopopulate.py index 677a8113c..c8fd58a2c 100644 --- a/src/datajoint/autopopulate.py +++ b/src/datajoint/autopopulate.py @@ -5,7 +5,6 @@ import inspect import logging import multiprocessing as mp -import random import signal import traceback @@ -14,7 +13,6 @@ from .errors import DataJointError, LostConnectionError from .expression import AndList, QueryExpression -from .hash import key_hash # noinspection PyExceptionInherit,PyCallingNonCallable @@ -55,6 +53,61 @@ class AutoPopulate: _key_source = None _allow_insert = False + _jobs = None + + @property + def jobs(self): + """ + Access the job table for this auto-populated table. + + The job table (~~table_name) is created lazily on first access. 
+ It tracks job status, priority, scheduling, and error information + for distributed populate operations. + + :return: Job object for this table + """ + if self._jobs is None: + from .jobs import Job + + self._jobs = Job(self) + if not self._jobs.is_declared: + self._jobs.declare() + return self._jobs + + def _declare_check(self, primary_key, fk_attribute_map): + """ + Validate FK-only primary key constraint for auto-populated tables. + + Auto-populated tables (Computed/Imported) must derive all primary key + attributes from foreign key references. This ensures proper job granularity + for distributed populate operations. + + This validation can be bypassed by setting: + dj.config.jobs.allow_new_pk_fields_in_computed_tables = True + + :param primary_key: list of primary key attribute names + :param fk_attribute_map: dict mapping child_attr -> (parent_table, parent_attr) + :raises DataJointError: if native PK attributes are found (unless bypassed) + """ + from .settings import config + + # Check if validation is bypassed + if config.jobs.allow_new_pk_fields_in_computed_tables: + return + + # Check for native (non-FK) primary key attributes + native_pk_attrs = [attr for attr in primary_key if attr not in fk_attribute_map] + + if native_pk_attrs: + raise DataJointError( + f"Auto-populated table `{self.full_table_name}` has non-FK primary key " + f"attribute(s): {', '.join(native_pk_attrs)}. " + f"Computed and Imported tables must derive all primary key attributes " + f"from foreign key references. The make() method is called once per entity " + f"(row) in the table. If you need to compute multiple entities per job, " + f"define a Part table to store them. " + f"To bypass this restriction, set: dj.config.jobs.allow_new_pk_fields_in_computed_tables = True" + ) @property def key_source(self): @@ -74,7 +127,7 @@ def _rename_attributes(table, props): ) if self._key_source is None: - parents = self.target.parents(primary=True, as_objects=True, foreign_key_info=True) + parents = self.parents(primary=True, as_objects=True, foreign_key_info=True) if not parents: raise DataJointError("A table must have dependencies from its primary key for auto-populate to work") self._key_source = _rename_attributes(*parents[0]) @@ -151,23 +204,6 @@ def make(self, key): self.make_insert(key, *computed_result) yield - @property - def target(self): - """ - :return: table to be populated. - In the typical case, dj.AutoPopulate is mixed into a dj.Table class by - inheritance and the target is self. - """ - return self - - def _job_key(self, key): - """ - :param key: they key returned for the job from the key source - :return: the dict to use to generate the job reservation hash - This method allows subclasses to control the job reservation granularity. 
- """ - return key - def _jobs_to_do(self, restrictions): """ :return: the query yielding the keys to be computed (derived from self.key_source) @@ -190,7 +226,7 @@ def _jobs_to_do(self, restrictions): raise DataJointError( "The populate target lacks attribute %s " "from the primary key of key_source" - % next(name for name in todo.heading.primary_key if name not in self.target.heading) + % next(name for name in todo.heading.primary_key if name not in self.heading) ) except StopIteration: pass @@ -199,71 +235,87 @@ def _jobs_to_do(self, restrictions): def populate( self, *restrictions, - keys=None, suppress_errors=False, return_exception_objects=False, reserve_jobs=False, - order="original", - limit=None, max_calls=None, display_progress=False, processes=1, make_kwargs=None, + priority=None, + refresh=None, ): """ ``table.populate()`` calls ``table.make(key)`` for every primary key in ``self.key_source`` for which there is not already a tuple in table. - :param restrictions: a list of restrictions each restrict - (table.key_source - target.proj()) - :param keys: The list of keys (dicts) to send to self.make(). - If None (default), then use self.key_source to query they keys. - :param suppress_errors: if True, do not terminate execution. + Two execution modes: + + **Direct mode** (reserve_jobs=False, default): + Keys computed directly from: (key_source & restrictions) - target + No job table involvement. Suitable for single-worker scenarios, + development, and debugging. + + **Distributed mode** (reserve_jobs=True): + Uses the job table (~~table_name) for multi-worker coordination. + Supports priority, scheduling, and status tracking. + + :param restrictions: conditions to filter key_source + :param suppress_errors: if True, collect errors instead of raising :param return_exception_objects: return error objects instead of just error messages - :param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion - :param order: "original"|"reverse"|"random" - the order of execution - :param limit: if not None, check at most this many keys - :param max_calls: if not None, populate at most this many keys - :param display_progress: if True, report progress_bar - :param processes: number of processes to use. Set to None to use all cores - :param make_kwargs: Keyword arguments which do not affect the result of computation - to be passed down to each ``make()`` call. Computation arguments should be - specified within the pipeline e.g. using a `dj.Lookup` table. - :type make_kwargs: dict, optional - :return: a dict with two keys - "success_count": the count of successful ``make()`` calls in this ``populate()`` call - "error_list": the error list that is filled if `suppress_errors` is True + :param reserve_jobs: if True, use job table for distributed processing + :param max_calls: maximum number of make() calls (total across all processes) + :param display_progress: if True, show progress bar + :param processes: number of worker processes + :param make_kwargs: keyword arguments passed to each make() call + :param priority: (reserve_jobs only) only process jobs at this priority or more urgent + :param refresh: (reserve_jobs only) refresh job queue before processing. 
+ Default from config.jobs.auto_refresh + :return: dict with "success_count" and "error_list" """ if self.connection.in_transaction: raise DataJointError("Populate cannot be called during a transaction.") - valid_order = ["original", "reverse", "random"] - if order not in valid_order: - raise DataJointError("The order argument must be one of %s" % str(valid_order)) - jobs = self.connection.schemas[self.target.database].jobs if reserve_jobs else None - if reserve_jobs: - # Define a signal handler for SIGTERM - def handler(signum, frame): - logger.info("Populate terminated by SIGTERM") - raise SystemExit("SIGTERM received") - - old_handler = signal.signal(signal.SIGTERM, handler) - - if keys is None: - keys = (self._jobs_to_do(restrictions) - self.target).fetch("KEY", limit=limit) + return self._populate_distributed( + *restrictions, + suppress_errors=suppress_errors, + return_exception_objects=return_exception_objects, + max_calls=max_calls, + display_progress=display_progress, + processes=processes, + make_kwargs=make_kwargs, + priority=priority, + refresh=refresh, + ) + else: + return self._populate_direct( + *restrictions, + suppress_errors=suppress_errors, + return_exception_objects=return_exception_objects, + max_calls=max_calls, + display_progress=display_progress, + processes=processes, + make_kwargs=make_kwargs, + ) - # exclude "error", "ignore" or "reserved" jobs - if reserve_jobs: - exclude_key_hashes = ( - jobs & {"table_name": self.target.table_name} & 'status in ("error", "ignore", "reserved")' - ).fetch("key_hash") - keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] + def _populate_direct( + self, + *restrictions, + suppress_errors, + return_exception_objects, + max_calls, + display_progress, + processes, + make_kwargs, + ): + """ + Populate without job table coordination. - if order == "reverse": - keys.reverse() - elif order == "random": - random.shuffle(keys) + Computes keys directly from key_source, suitable for single-worker + execution, development, and debugging. 
+ """ + keys = (self._jobs_to_do(restrictions) - self).fetch("KEY") logger.debug("Found %d keys to populate" % len(keys)) @@ -284,7 +336,7 @@ def handler(signum, frame): if processes == 1: for key in tqdm(keys, desc=self.__class__.__name__) if display_progress else keys: - status = self._populate1(key, jobs, **populate_kwargs) + status = self._populate1(key, jobs=None, **populate_kwargs) if status is True: success_list.append(1) elif isinstance(status, tuple): @@ -293,10 +345,10 @@ def handler(signum, frame): assert status is False else: # spawn multiple processes - self.connection.close() # disconnect parent process from MySQL server + self.connection.close() del self.connection._conn.ctx # SSLContext is not pickleable with ( - mp.Pool(processes, _initialize_populate, (self, jobs, populate_kwargs)) as pool, + mp.Pool(processes, _initialize_populate, (self, None, populate_kwargs)) as pool, tqdm(desc="Processes: ", total=nkeys) if display_progress else contextlib.nullcontext() as progress_bar, ): for status in pool.imap(_call_populate1, keys, chunksize=1): @@ -308,46 +360,138 @@ def handler(signum, frame): assert status is False if display_progress: progress_bar.update() - self.connection.connect() # reconnect parent process to MySQL server - - # restore original signal handler: - if reserve_jobs: - signal.signal(signal.SIGTERM, old_handler) + self.connection.connect() return { "success_count": sum(success_list), "error_list": error_list, } + def _populate_distributed( + self, + *restrictions, + suppress_errors, + return_exception_objects, + max_calls, + display_progress, + processes, + make_kwargs, + priority, + refresh, + ): + """ + Populate with job table coordination. + + Uses job table for multi-worker coordination, priority scheduling, + and status tracking. 
+ """ + from .settings import config + + # Define a signal handler for SIGTERM + def handler(signum, frame): + logger.info("Populate terminated by SIGTERM") + raise SystemExit("SIGTERM received") + + old_handler = signal.signal(signal.SIGTERM, handler) + + try: + # Refresh job queue if configured + if refresh is None: + refresh = config.jobs.auto_refresh + if refresh: + self.jobs.refresh(*restrictions, priority=priority) + + # Fetch pending jobs ordered by priority + pending_query = self.jobs.pending & "scheduled_time <= NOW()" + if priority is not None: + pending_query = pending_query & f"priority <= {priority}" + + keys = pending_query.fetch("KEY", order_by="priority ASC, scheduled_time ASC", limit=max_calls) + + logger.debug("Found %d pending jobs to populate" % len(keys)) + + nkeys = len(keys) + error_list = [] + success_list = [] + + if nkeys: + processes = min(_ for _ in (processes, nkeys, mp.cpu_count()) if _) + + populate_kwargs = dict( + suppress_errors=suppress_errors, + return_exception_objects=return_exception_objects, + make_kwargs=make_kwargs, + ) + + if processes == 1: + for key in tqdm(keys, desc=self.__class__.__name__) if display_progress else keys: + status = self._populate1(key, jobs=self.jobs, **populate_kwargs) + if status is True: + success_list.append(1) + elif isinstance(status, tuple): + error_list.append(status) + # status is False means job was already reserved + else: + # spawn multiple processes + self.connection.close() + del self.connection._conn.ctx # SSLContext is not pickleable + with ( + mp.Pool(processes, _initialize_populate, (self, self.jobs, populate_kwargs)) as pool, + tqdm(desc="Processes: ", total=nkeys) + if display_progress + else contextlib.nullcontext() as progress_bar, + ): + for status in pool.imap(_call_populate1, keys, chunksize=1): + if status is True: + success_list.append(1) + elif isinstance(status, tuple): + error_list.append(status) + if display_progress: + progress_bar.update() + self.connection.connect() + + return { + "success_count": sum(success_list), + "error_list": error_list, + } + finally: + signal.signal(signal.SIGTERM, old_handler) + def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None): """ - populates table for one source key, calling self.make inside a transaction. - :param jobs: the jobs table or None if not reserve_jobs + Populate table for one source key, calling self.make inside a transaction. + :param key: dict specifying job to populate - :param suppress_errors: bool if errors should be suppressed and returned - :param return_exception_objects: if True, errors must be returned as objects + :param jobs: the Job object or None if not reserve_jobs + :param suppress_errors: if True, errors are suppressed and returned + :param return_exception_objects: if True, errors returned as objects :return: (key, error) when suppress_errors=True, - True if successfully invoke one `make()` call, otherwise False + True if successfully invoke one make() call, otherwise False """ + import time + # use the legacy `_make_tuples` callback. 
make = self._make_tuples if hasattr(self, "_make_tuples") else self.make - if jobs is not None and not jobs.reserve(self.target.table_name, self._job_key(key)): + # Try to reserve the job (distributed mode only) + if jobs is not None and not jobs.reserve(key): return False - # if make is a generator, it transaction can be delayed until the final stage + start_time = time.time() + + # if make is a generator, transaction can be delayed until the final stage is_generator = inspect.isgeneratorfunction(make) if not is_generator: self.connection.start_transaction() - if key in self.target: # already populated + if key in self: # already populated if not is_generator: self.connection.cancel_transaction() if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) + jobs.complete(key) return False - logger.debug(f"Making {key} -> {self.target.full_table_name}") + logger.debug(f"Making {key} -> {self.full_table_name}") self.__class__._allow_insert = True try: @@ -378,15 +522,9 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_ exception=error.__class__.__name__, msg=": " + str(error) if str(error) else "", ) - logger.debug(f"Error making {key} -> {self.target.full_table_name} - {error_message}") + logger.debug(f"Error making {key} -> {self.full_table_name} - {error_message}") if jobs is not None: - # show error name and error message (if any) - jobs.error( - self.target.table_name, - self._job_key(key), - error_message=error_message, - error_stack=traceback.format_exc(), - ) + jobs.error(key, error_message=error_message, error_stack=traceback.format_exc()) if not suppress_errors or isinstance(error, SystemExit): raise else: @@ -394,9 +532,22 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_ return key, error if return_exception_objects else error_message else: self.connection.commit_transaction() - logger.debug(f"Success making {key} -> {self.target.full_table_name}") + duration = time.time() - start_time + logger.debug(f"Success making {key} -> {self.full_table_name}") + + # Update hidden job metadata if table has the columns + if self._has_job_metadata_attrs(): + from .jobs import _get_job_version + + self._update_job_metadata( + key, + start_time=datetime.datetime.fromtimestamp(start_time), + duration=duration, + version=_get_job_version(), + ) + if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) + jobs.complete(key, duration=duration) return True finally: self.__class__._allow_insert = False @@ -404,11 +555,61 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_ def progress(self, *restrictions, display=False): """ Report the progress of populating the table. + + Uses a single aggregation query to efficiently compute both total and + remaining counts. 
+ + :param restrictions: conditions to restrict key_source + :param display: if True, log the progress :return: (remaining, total) -- numbers of tuples to be populated """ todo = self._jobs_to_do(restrictions) - total = len(todo) - remaining = len(todo - self.target) + + # Get primary key attributes from key_source for join condition + # These are the "job keys" - the granularity at which populate() works + pk_attrs = todo.primary_key + assert pk_attrs, "key_source must have a primary key" + + # Find common attributes between key_source and self for the join + # This handles cases where self has additional PK attributes + common_attrs = [attr for attr in pk_attrs if attr in self.heading.names] + + if not common_attrs: + # No common attributes - fall back to two-query method + total = len(todo) + remaining = len(todo - self) + else: + # Build a single query that computes both total and remaining + # Using LEFT JOIN with COUNT(DISTINCT) to handle 1:many relationships + todo_sql = todo.make_sql() + target_sql = self.make_sql() + + # Build join condition on common attributes + join_cond = " AND ".join(f"`$ks`.`{attr}` = `$tgt`.`{attr}`" for attr in common_attrs) + + # Build DISTINCT key expression for counting unique jobs + # Use CONCAT for composite keys to create a single distinct value + if len(pk_attrs) == 1: + distinct_key = f"`$ks`.`{pk_attrs[0]}`" + null_check = f"`$tgt`.`{common_attrs[0]}`" + else: + distinct_key = "CONCAT_WS('|', {})".format(", ".join(f"`$ks`.`{attr}`" for attr in pk_attrs)) + null_check = f"`$tgt`.`{common_attrs[0]}`" + + # Single aggregation query: + # - COUNT(DISTINCT key) gives total unique jobs in key_source + # - Remaining = jobs where no matching target row exists + sql = f""" + SELECT + COUNT(DISTINCT {distinct_key}) AS total, + COUNT(DISTINCT CASE WHEN {null_check} IS NULL THEN {distinct_key} END) AS remaining + FROM ({todo_sql}) AS `$ks` + LEFT JOIN ({target_sql}) AS `$tgt` ON {join_cond} + """ + + result = self.connection.query(sql).fetchone() + total, remaining = result + if display: logger.info( "%-20s" % self.__class__.__name__ @@ -421,3 +622,29 @@ def progress(self, *restrictions, display=False): ), ) return remaining, total + + def _has_job_metadata_attrs(self): + """Check if table has hidden job metadata columns.""" + # Access _attributes directly to include hidden attributes + all_attrs = self.heading._attributes + return all_attrs is not None and "_job_start_time" in all_attrs + + def _update_job_metadata(self, key, start_time, duration, version): + """ + Update hidden job metadata for the given key. 
+ + Args: + key: Primary key dict identifying the row(s) to update + start_time: datetime when computation started + duration: float seconds elapsed + version: str code version (truncated to 64 chars) + """ + from .condition import make_condition + + pk_condition = make_condition(self, key, set()) + self.connection.query( + f"UPDATE {self.full_table_name} SET " + "`_job_start_time`=%s, `_job_duration`=%s, `_job_version`=%s " + f"WHERE {pk_condition}", + args=(start_time, duration, version[:64] if version else ""), + ) diff --git a/src/datajoint/condition.py b/src/datajoint/condition.py index 085fb3d89..24c898112 100644 --- a/src/datajoint/condition.py +++ b/src/datajoint/condition.py @@ -264,7 +264,7 @@ def combine_conditions(negate, conditions): # restrict by another expression (aka semijoin and antijoin) if isinstance(condition, QueryExpression): assert_join_compatibility(query_expression, condition, semantic_check=semantic_check) - # Always match on all namesakes (natural join semantics) + # Match on all non-hidden namesakes (hidden attributes excluded) common_attributes = [q for q in condition.heading.names if q in query_expression.heading.names] columns.update(common_attributes) if isinstance(condition, Aggregation): diff --git a/src/datajoint/declare.py b/src/datajoint/declare.py index 05c0fab64..77638d4f7 100644 --- a/src/datajoint/declare.py +++ b/src/datajoint/declare.py @@ -5,7 +5,6 @@ import logging import re -from hashlib import sha1 import pyparsing as pp @@ -38,7 +37,7 @@ "bytes": (r"bytes$", "longblob"), # Temporal "date": (r"date$", None), - "datetime": (r"datetime$", None), + "datetime": (r"datetime(\s*\(\d+\))?$", None), # datetime with optional fractional seconds precision # String types (with parameters) "char": (r"char\s*\(\d+\)$", None), "varchar": (r"varchar\s*\(\d+\)$", None), @@ -297,12 +296,20 @@ def declare(full_table_name, definition, context): fk_attribute_map, ) = prepare_declare(definition, context) - if config.get("add_hidden_timestamp", False): - metadata_attr_sql = ["`_{full_table_name}_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP"] - attribute_sql.extend( - attr.format(full_table_name=sha1(full_table_name.replace("`", "").encode("utf-8")).hexdigest()) - for attr in metadata_attr_sql - ) + # Add hidden job metadata for Computed/Imported tables (not parts) + # Note: table_name may still have backticks, strip them for prefix checking + clean_table_name = table_name.strip("`") + if config.jobs.add_job_metadata: + # Check if this is a Computed (__) or Imported (_) table, but not a Part (contains __ in middle) + is_computed = clean_table_name.startswith("__") and "__" not in clean_table_name[2:] + is_imported = clean_table_name.startswith("_") and not clean_table_name.startswith("__") + if is_computed or is_imported: + job_metadata_sql = [ + "`_job_start_time` datetime(3) DEFAULT NULL", + "`_job_duration` float DEFAULT NULL", + "`_job_version` varchar(64) DEFAULT ''", + ] + attribute_sql.extend(job_metadata_sql) if not primary_key: raise DataJointError("Table must have a primary key") diff --git a/src/datajoint/expression.py b/src/datajoint/expression.py index 2724fcb86..f885c77d3 100644 --- a/src/datajoint/expression.py +++ b/src/datajoint/expression.py @@ -46,7 +46,7 @@ class QueryExpression: _restriction = None _restriction_attributes = None - _left = [] # list of booleans True for left joins, False for inner joins + _joins = [] # list of (is_left: bool, using_attrs: list[str]) for each join _original_heading = None # heading before projections # 
subclasses or instantiators must provide values @@ -110,8 +110,17 @@ def from_clause(self): for src in self.support ) clause = next(support) - for s, left in zip(support, self._left): - clause += " NATURAL{left} JOIN {clause}".format(left=" LEFT" if left else "", clause=s) + for s, (is_left, using_attrs) in zip(support, self._joins): + left_kw = "LEFT " if is_left else "" + if using_attrs: + using = "USING ({})".format(", ".join(f"`{a}`" for a in using_attrs)) + clause += f" {left_kw}JOIN {s} {using}" + else: + # Cross join (no common non-hidden attributes) + if is_left: + clause += f" LEFT JOIN {s} ON TRUE" + else: + clause += f" CROSS JOIN {s}" return clause def where_clause(self): @@ -326,7 +335,7 @@ def join(self, other, semantic_check=True, left=False, allow_nullable_pk=False): "Use an inner join or restructure the query." ) - # Always natural join on all namesakes + # Always join on all non-hidden namesakes join_attributes = set(n for n in self.heading.names if n in other.heading.names) # needs subquery if self's FROM clause has common attributes with other's FROM clause need_subquery1 = need_subquery2 = bool( @@ -345,6 +354,12 @@ def join(self, other, semantic_check=True, left=False, allow_nullable_pk=False): or any(n in other.heading.new_attributes for n in join_attributes) or isinstance(self, Union) ) + # With USING clause (instead of NATURAL JOIN), we need subqueries when + # joining with multi-table expressions to ensure correct column matching + if len(self.support) > 1 and join_attributes: + need_subquery1 = True + if len(other.support) > 1 and join_attributes: + need_subquery2 = True if need_subquery1: self = self.make_subquery() if need_subquery2: @@ -352,12 +367,14 @@ def join(self, other, semantic_check=True, left=False, allow_nullable_pk=False): result = QueryExpression() result._connection = self.connection result._support = self.support + other.support - result._left = self._left + [left] + other._left + # Store join info: (is_left, using_attrs) - using_attrs excludes hidden attributes + using_attrs = [n for n in self.heading.names if n in other.heading.names] + result._joins = self._joins + [(left, using_attrs)] + other._joins result._heading = self.heading.join(other.heading, nullable_pk=nullable_pk) result._restriction = AndList(self.restriction) result._restriction.append(other.restriction) result._original_heading = self.original_heading.join(other.original_heading, nullable_pk=nullable_pk) - assert len(result.support) == len(result._left) + 1 + assert len(result.support) == len(result._joins) + 1 return result def extend(self, other, semantic_check=True): @@ -571,11 +588,12 @@ def tail(self, limit=25, **fetch_kwargs): def __len__(self): """:return: number of elements in the result set e.g. 
``len(q1)``.""" result = self.make_subquery() if self._top else copy.copy(self) + has_left_join = any(is_left for is_left, _ in result._joins) return result.connection.query( "SELECT {select_} FROM {from_}{where}".format( select_=( "count(*)" - if any(result._left) + if has_left_join else "count(DISTINCT {fields})".format( fields=result.heading.as_sql(result.primary_key, include_aliases=False) ) @@ -718,7 +736,7 @@ def create(cls, groupby, group, keep_all_rows=False): result._connection = join.connection result._heading = join.heading.set_primary_key(groupby.primary_key) result._support = join.support - result._left = join._left + result._joins = join._joins result._left_restrict = join.restriction # WHERE clause applied before GROUP BY result._grouping_attributes = result.primary_key @@ -930,7 +948,7 @@ def aggr(self, group, **named_attributes): result._connection = group.connection result._heading = group.heading.set_primary_key(list(self.primary_key)) result._support = group.support - result._left = group._left + result._joins = group._joins result._left_restrict = group.restriction result._grouping_attributes = list(self.primary_key) diff --git a/src/datajoint/hash.py b/src/datajoint/hash.py index 88a737fb7..3c67af4d1 100644 --- a/src/datajoint/hash.py +++ b/src/datajoint/hash.py @@ -6,7 +6,6 @@ def key_hash(mapping): """ 32-byte hash of the mapping's key values sorted by the key name. This is often used to convert a long primary key value into a shorter hash. - For example, the JobTable in datajoint.jobs uses this function to hash the primary key of autopopulated tables. """ hashed = hashlib.md5() for k, v in sorted(mapping.items()): diff --git a/src/datajoint/jobs.py b/src/datajoint/jobs.py index b542f9364..82815a878 100644 --- a/src/datajoint/jobs.py +++ b/src/datajoint/jobs.py @@ -1,150 +1,499 @@ +""" +Job queue management for AutoPopulate 2.0. + +Each auto-populated table (Computed/Imported) has an associated jobs table +with the naming pattern ~~table_name. The jobs table tracks job status, +priority, scheduling, and error information. +""" + +import logging import os import platform +import subprocess -from .errors import DuplicateError -from .hash import key_hash +from .condition import AndList +from .errors import DataJointError, DuplicateError from .heading import Heading from .table import Table ERROR_MESSAGE_LENGTH = 2047 TRUNCATION_APPENDIX = "...truncated" +logger = logging.getLogger(__name__.split(".")[0]) -class JobTable(Table): + +def _get_job_version() -> str: """ - A base table with no definition. Allows reserving jobs + Get version string based on config settings. + + Returns: + Version string, or empty string if version tracking disabled. """ + from .settings import config - def __init__(self, conn, database): - self.database = database - self._connection = conn - self._heading = Heading(table_info=dict(conn=conn, database=database, table_name=self.table_name, context=None)) + method = config.jobs.version_method + if method is None or method == "none": + return "" + elif method == "git": + try: + result = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + capture_output=True, + text=True, + timeout=5, + ) + return result.stdout.strip() if result.returncode == 0 else "" + except Exception: + return "" + return "" + + +class Job(Table): + """ + Per-table job queue for AutoPopulate 2.0. + + Each auto-populated table (Computed/Imported) has an associated job table + with the naming pattern ~~table_name. 
The job table tracks job status, + priority, scheduling, and error information. + + Access via the `jobs` property on any auto-populated table: + MyTable.jobs.refresh() + MyTable.jobs.pending + MyTable.jobs.errors + """ + + def __init__(self, target_table): + """ + Initialize jobs table for an auto-populated table. + + Args: + target_table: The Computed/Imported table instance this jobs table manages. + """ + self._target = target_table + self._connection = target_table.connection + self.database = target_table.database + + # Compute table name: ~~base_name + target_name = target_table.table_name + base_name = target_name.lstrip("_") + self._table_name = f"~~{base_name}" + + # Generate definition from target's FK-derived primary key + self._definition = self._generate_definition() + + # Initialize heading and support + self._heading = Heading( + table_info=dict( + conn=self._connection, + database=self.database, + table_name=self._table_name, + context=None, + ) + ) self._support = [self.full_table_name] - self._definition = """ # job reservation table for `{database}` - table_name :varchar(255) # className of the table - key_hash :char(32) # key hash - --- - status :enum('reserved','error','ignore') # if tuple is missing, the job is available - key=null : # structure containing the key - error_message="" :varchar({error_message_length}) # error message returned if failed - error_stack=null : # error stack if failed - user="" :varchar(255) # database user - host="" :varchar(255) # system hostname - pid=0 :int unsigned # system process id - connection_id = 0 : bigint unsigned # connection_id() - timestamp=CURRENT_TIMESTAMP :timestamp # automatic timestamp - """.format(database=database, error_message_length=ERROR_MESSAGE_LENGTH) - if not self.is_declared: - self.declare() - self._user = self.connection.get_user() + @property + def table_name(self): + return self._table_name @property def definition(self): return self._definition @property - def table_name(self): - return "~jobs" + def target(self): + """The auto-populated table this jobs table manages.""" + return self._target + + def _generate_definition(self) -> str: + """ + Generate jobs table definition from target's FK-derived primary key. + + Returns: + DataJoint table definition string. + """ + pk_attrs = self._get_fk_derived_pk_attrs() + + if not pk_attrs: + raise DataJointError( + f"Cannot create jobs table for {self._target.full_table_name}: " "no FK-derived primary key attributes found." + ) + + pk_lines = "\n ".join(f"{name} : {dtype}" for name, dtype in pk_attrs) + + return f""" + # Job queue for {self._target.full_table_name} + {pk_lines} + --- + status : enum('pending', 'reserved', 'success', 'error', 'ignore') + priority : uint8 + created_time=CURRENT_TIMESTAMP(3) : datetime(3) + scheduled_time=CURRENT_TIMESTAMP(3) : datetime(3) + reserved_time=null : datetime(3) + completed_time=null : datetime(3) + duration=null : float64 + error_message="" : varchar({ERROR_MESSAGE_LENGTH}) + error_stack=null : + user="" : varchar(255) + host="" : varchar(255) + pid=0 : uint32 + connection_id=0 : uint64 + version="" : varchar(64) + """ + + def _get_fk_derived_pk_attrs(self) -> list[tuple[str, str]]: + """ + Extract FK-derived primary key attributes using the dependency graph. + + FK-derived attributes are those that come from primary FK references. + Uses connection.dependencies to identify FK relationships. + + Returns: + List of (attribute_name, datatype) tuples in target PK order. 
+ """ + heading = self._target.heading + target_pk = heading.primary_key + + # Load dependency graph if not already loaded + self._connection.dependencies.load() + + # Get primary FK parents and collect their attribute mappings + # parents(primary=True) returns FKs that contribute to primary key + parents = self._target.parents(primary=True, foreign_key_info=True) + fk_derived_attrs = set() + for _parent_name, props in parents: + # attr_map: child_attr -> parent_attr + fk_derived_attrs.update(props.get("attr_map", {}).keys()) + + fk_attrs = [] + for name in target_pk: + if name in fk_derived_attrs: + # FK-derived: comes from a primary FK parent + attr = heading[name] + fk_attrs.append((name, attr.type)) + else: + # Native PK attribute - not from FK + logger.warning( + f"Ignoring non-FK primary key attribute '{name}' in jobs table " + f"for {self._target.full_table_name}. Job granularity will be degraded." + ) + + return fk_attrs + + def _get_pk(self, key: dict) -> dict: + """Extract primary key values from a key dict.""" + return {k: key[k] for k in self.primary_key if k in key} def delete(self): - """bypass interactive prompts and dependencies""" + """Bypass interactive prompts and dependencies.""" self.delete_quick() def drop(self): - """bypass interactive prompts and dependencies""" + """Bypass interactive prompts and dependencies.""" self.drop_quick() - def reserve(self, table_name, key): + # ------------------------------------------------------------------------- + # Status filter properties + # ------------------------------------------------------------------------- + + @property + def pending(self): + """Return query for pending jobs.""" + return self & 'status="pending"' + + @property + def reserved(self): + """Return query for reserved jobs.""" + return self & 'status="reserved"' + + @property + def errors(self): + """Return query for error jobs.""" + return self & 'status="error"' + + @property + def ignored(self): + """Return query for ignored jobs.""" + return self & 'status="ignore"' + + @property + def completed(self): + """Return query for completed (success) jobs.""" + return self & 'status="success"' + + # ------------------------------------------------------------------------- + # Core job management methods + # ------------------------------------------------------------------------- + + def refresh( + self, + *restrictions, + delay: float = 0, + priority: int | None = None, + stale_timeout: float | None = None, + orphan_timeout: float | None = None, + ) -> dict: """ - Reserve a job for computation. When a job is reserved, the job table contains an entry for the - job key, identified by its hash. When jobs are completed, the entry is removed. + Refresh the jobs queue: add new jobs and clean up stale/orphaned jobs. - :param table_name: `database`.`table_name` - :param key: the dict of the job's primary key - :return: True if reserved job successfully. False = the jobs is already taken + Operations performed: + 1. Add new jobs: (key_source & restrictions) - target - jobs -> insert as 'pending' + 2. Re-pend success jobs: if keep_completed=True and key in key_source but not in target + 3. Remove stale jobs: jobs older than stale_timeout whose keys not in key_source + 4. Remove orphaned jobs: reserved jobs older than orphan_timeout (if specified) + + Args: + restrictions: Conditions to filter key_source (for adding new jobs). + delay: Seconds from now until new jobs become available for processing. + Default: 0 (immediately available). Uses database server time. 
+ priority: Priority for new jobs (lower = more urgent). + Default from config.jobs.default_priority. + stale_timeout: Seconds after which jobs are checked for staleness. + Jobs older than this are removed if key not in key_source. + Default from config.jobs.stale_timeout. + Set to 0 to skip stale cleanup. + orphan_timeout: Seconds after which reserved jobs are considered orphaned. + Reserved jobs older than this are deleted and re-added as pending. + Default: None (no orphan cleanup - must be explicit). + + Returns: + { + 'added': int, # New pending jobs added + 'removed': int, # Stale jobs removed + 'orphaned': int, # Orphaned jobs reset to pending + 're_pended': int # Success jobs re-pended (keep_completed mode) + } """ - job = dict( - table_name=table_name, - key_hash=key_hash(key), - status="reserved", - host=platform.node(), - pid=os.getpid(), - connection_id=self.connection.connection_id, - key=key, - user=self._user, - ) - try: - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False - return True + from datetime import datetime, timedelta + + from .settings import config + + # Ensure jobs table exists + if not self.is_declared: + self.declare() + + # Get defaults from config + if priority is None: + priority = config.jobs.default_priority + if stale_timeout is None: + stale_timeout = config.jobs.stale_timeout + + result = {"added": 0, "removed": 0, "orphaned": 0, "re_pended": 0} + now = datetime.now() + + # 1. Add new jobs + key_source = self._target.key_source + if restrictions: + key_source = key_source & AndList(restrictions) + + # Keys that need jobs: in key_source, not in target, not in jobs + # Disable semantic_check for Job table (self) because its attributes may not have matching lineage + from .condition import Not - def ignore(self, table_name, key): + new_keys = (key_source - self._target).restrict(Not(self), semantic_check=False).proj() + new_key_list = new_keys.fetch("KEY") + + if new_key_list: + scheduled_time = now + timedelta(seconds=delay) if delay > 0 else now + for key in new_key_list: + job_entry = { + **key, + "status": "pending", + "priority": priority, + "scheduled_time": scheduled_time, + } + try: + self.insert1(job_entry, ignore_extra_fields=True) + result["added"] += 1 + except DuplicateError: + pass # Job already exists + + # 2. Re-pend success jobs if keep_completed=True + if config.jobs.keep_completed: + # Success jobs whose keys are in key_source but not in target + # Disable semantic_check for Job table operations + success_to_repend = self.completed.restrict(key_source, semantic_check=False) - self._target + repend_keys = success_to_repend.fetch("KEY") + for key in repend_keys: + (self & key).delete_quick() + self.insert1({**key, "status": "pending", "priority": priority}) + result["re_pended"] += 1 + + # 3. Remove stale jobs (not ignore status) + if stale_timeout > 0: + stale_cutoff = now - timedelta(seconds=stale_timeout) + old_jobs = self & f'created_time < "{stale_cutoff}"' & 'status != "ignore"' + + for key in old_jobs.fetch("KEY"): + # Check if key still in key_source + if not (key_source & key): + (self & key).delete_quick() + result["removed"] += 1 + + # 4. 
Handle orphaned reserved jobs + if orphan_timeout is not None and orphan_timeout > 0: + orphan_cutoff = now - timedelta(seconds=orphan_timeout) + orphaned_jobs = self.reserved & f'reserved_time < "{orphan_cutoff}"' + + for key in orphaned_jobs.fetch("KEY"): + (self & key).delete_quick() + self.insert1({**key, "status": "pending", "priority": priority}) + result["orphaned"] += 1 + + return result + + def reserve(self, key: dict) -> bool: """ - Set a job to be ignored for computation. When a job is ignored, the job table contains an entry for the - job key, identified by its hash, with status "ignore". + Attempt to reserve a pending job for processing. + + Updates status to 'reserved' if currently 'pending' and scheduled_time <= now. Args: - table_name: - Table name (str) - `database`.`table_name` - key: - The dict of the job's primary key + key: Primary key dict of the job to reserve. Returns: - True if ignore job successfully. False = the jobs is already taken - """ - job = dict( - table_name=table_name, - key_hash=key_hash(key), - status="ignore", - host=platform.node(), - pid=os.getpid(), - connection_id=self.connection.connection_id, - key=key, - user=self._user, - ) + True if reservation successful, False if job not available. + """ + from datetime import datetime + + # Check if job is pending and scheduled + now = datetime.now() + job = (self & key & 'status="pending"' & f'scheduled_time <= "{now}"').fetch(as_dict=True) + + if not job: + return False + + # Build update row with primary key and new values + pk = self._get_pk(key) + update_row = { + **pk, + "status": "reserved", + "reserved_time": now, + "host": platform.node(), + "pid": os.getpid(), + "connection_id": self.connection.connection_id, + "user": self.connection.get_user(), + "version": _get_job_version(), + } + try: - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: + self.update1(update_row) + return True + except Exception: return False - return True - def complete(self, table_name, key): + def complete(self, key: dict, duration: float | None = None) -> None: """ - Log a completed job. When a job is completed, its reservation entry is deleted. + Mark a job as successfully completed. - :param table_name: `database`.`table_name` - :param key: the dict of the job's primary key + Based on config.jobs.keep_completed: + - If True: updates status to 'success' with completion time and duration + - If False: deletes the job entry + + Args: + key: Primary key dict of the job. + duration: Execution duration in seconds. """ - job_key = dict(table_name=table_name, key_hash=key_hash(key)) - (self & job_key).delete_quick() + from datetime import datetime - def error(self, table_name, key, error_message, error_stack=None): + from .settings import config + + if config.jobs.keep_completed: + pk = self._get_pk(key) + update_row = { + **pk, + "status": "success", + "completed_time": datetime.now(), + } + if duration is not None: + update_row["duration"] = duration + self.update1(update_row) + else: + (self & key).delete_quick() + + def error(self, key: dict, error_message: str, error_stack: str | None = None) -> None: """ - Log an error message. The job reservation is replaced with an error entry. - if an error occurs, leave an entry describing the problem + Mark a job as failed with error details. - :param table_name: `database`.`table_name` - :param key: the dict of the job's primary key - :param error_message: string error message - :param error_stack: stack trace + Args: + key: Primary key dict of the job. 
+ error_message: Error message (truncated to 2047 chars). + error_stack: Full stack trace. """ + from datetime import datetime + if len(error_message) > ERROR_MESSAGE_LENGTH: error_message = error_message[: ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX - self.insert1( - dict( - table_name=table_name, - key_hash=key_hash(key), - status="error", - host=platform.node(), - pid=os.getpid(), - connection_id=self.connection.connection_id, - user=self._user, - key=key, - error_message=error_message, - error_stack=error_stack, - ), - replace=True, - ignore_extra_fields=True, - ) + + pk = self._get_pk(key) + update_row = { + **pk, + "status": "error", + "completed_time": datetime.now(), + "error_message": error_message, + } + if error_stack is not None: + update_row["error_stack"] = error_stack + + self.update1(update_row) + + def ignore(self, key: dict) -> None: + """ + Mark a job to be ignored (skipped during populate). + + If the key doesn't exist in the jobs table, inserts it with status='ignore'. + If it exists, updates the status to 'ignore'. + + Args: + key: Primary key dict of the job. + """ + from .settings import config + + pk = self._get_pk(key) + if pk in self: + self.update1({**pk, "status": "ignore"}) + else: + priority = config.jobs.default_priority + self.insert1({**pk, "status": "ignore", "priority": priority}) + + def progress(self) -> dict: + """ + Return job status breakdown. + + Returns: + { + 'pending': int, + 'reserved': int, + 'success': int, + 'error': int, + 'ignore': int, + 'total': int + } + """ + if not self.is_declared: + return { + "pending": 0, + "reserved": 0, + "success": 0, + "error": 0, + "ignore": 0, + "total": 0, + } + + # Query status counts + result = self.connection.query(f"SELECT status, COUNT(*) as n FROM {self.full_table_name} GROUP BY status").fetchall() + + counts = { + "pending": 0, + "reserved": 0, + "success": 0, + "error": 0, + "ignore": 0, + } + + for row in result: + status, n = row + counts[status] = n + + counts["total"] = sum(counts.values()) + return counts diff --git a/src/datajoint/migrate.py b/src/datajoint/migrate.py index 1948cbe06..7429cc938 100644 --- a/src/datajoint/migrate.py +++ b/src/datajoint/migrate.py @@ -248,3 +248,167 @@ def check_migration_status(schema: Schema) -> dict: "pending": sum(1 for c in columns if c["needs_migration"]), "columns": columns, } + + +# ============================================================================= +# Job Metadata Migration +# ============================================================================= + +# Hidden job metadata columns added by config.jobs.add_job_metadata +JOB_METADATA_COLUMNS = [ + ("_job_start_time", "datetime(3) DEFAULT NULL"), + ("_job_duration", "float DEFAULT NULL"), + ("_job_version", "varchar(64) DEFAULT ''"), +] + + +def _get_existing_columns(connection, database: str, table_name: str) -> set[str]: + """Get set of existing column names for a table.""" + result = connection.query( + """ + SELECT COLUMN_NAME + FROM information_schema.COLUMNS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + """, + args=(database, table_name), + ) + return {row[0] for row in result.fetchall()} + + +def _is_autopopulated_table(table_name: str) -> bool: + """Check if a table name indicates a Computed or Imported table.""" + # Computed tables start with __ (but not part tables which have __ in middle) + # Imported tables start with _ (but not __) + if table_name.startswith("__"): + # Computed table if no __ after the prefix + return "__" not in table_name[2:] + 
elif table_name.startswith("_"): + # Imported table + return True + return False + + +def add_job_metadata_columns(target, dry_run: bool = True) -> dict: + """ + Add hidden job metadata columns to existing Computed/Imported tables. + + This migration utility adds the hidden columns (_job_start_time, _job_duration, + _job_version) to tables that were created before config.jobs.add_job_metadata + was enabled. + + Args: + target: Either a table class/instance (dj.Computed or dj.Imported) or + a Schema object. If a Schema, all Computed/Imported tables in + the schema will be processed. + dry_run: If True (default), only preview changes without applying. + + Returns: + Dict with keys: + - tables_analyzed: Number of tables checked + - tables_modified: Number of tables that were/would be modified + - columns_added: Total columns added across all tables + - details: List of dicts with per-table information + + Example: + >>> import datajoint as dj + >>> from datajoint.migrate import add_job_metadata_columns + >>> + >>> # Preview migration for a single table + >>> result = add_job_metadata_columns(MyComputedTable, dry_run=True) + >>> print(f"Would add {result['columns_added']} columns") + >>> + >>> # Apply migration to all tables in a schema + >>> result = add_job_metadata_columns(schema, dry_run=False) + >>> print(f"Modified {result['tables_modified']} tables") + + Note: + - Only Computed and Imported tables are modified (not Manual, Lookup, or Part tables) + - Existing rows will have NULL values for _job_start_time and _job_duration + - Future populate() calls will fill in metadata for new rows + - This does NOT retroactively populate metadata for existing rows + """ + from .schemas import Schema + from .table import Table + + result = { + "tables_analyzed": 0, + "tables_modified": 0, + "columns_added": 0, + "details": [], + } + + # Determine tables to process + if isinstance(target, Schema): + schema = target + # Get all user tables in the schema + tables_query = """ + SELECT TABLE_NAME + FROM information_schema.TABLES + WHERE TABLE_SCHEMA = %s + AND TABLE_TYPE = 'BASE TABLE' + AND TABLE_NAME NOT LIKE '~%%' + """ + table_names = [row[0] for row in schema.connection.query(tables_query, args=(schema.database,)).fetchall()] + tables_to_process = [ + (schema.database, name, schema.connection) for name in table_names if _is_autopopulated_table(name) + ] + elif isinstance(target, type) and issubclass(target, Table): + # Table class + instance = target() + tables_to_process = [(instance.database, instance.table_name, instance.connection)] + elif isinstance(target, Table): + # Table instance + tables_to_process = [(target.database, target.table_name, target.connection)] + else: + raise DataJointError(f"target must be a Table class, Table instance, or Schema, got {type(target)}") + + for database, table_name, connection in tables_to_process: + result["tables_analyzed"] += 1 + + # Skip non-autopopulated tables + if not _is_autopopulated_table(table_name): + continue + + # Check which columns need to be added + existing_columns = _get_existing_columns(connection, database, table_name) + columns_to_add = [(name, definition) for name, definition in JOB_METADATA_COLUMNS if name not in existing_columns] + + if not columns_to_add: + result["details"].append( + { + "table": f"{database}.{table_name}", + "status": "already_migrated", + "columns_added": 0, + } + ) + continue + + # Generate and optionally execute ALTER statements + table_detail = { + "table": f"{database}.{table_name}", + "status": "migrated" if 
not dry_run else "pending", + "columns_added": len(columns_to_add), + "sql_statements": [], + } + + for col_name, col_definition in columns_to_add: + sql = f"ALTER TABLE `{database}`.`{table_name}` ADD COLUMN `{col_name}` {col_definition}" + table_detail["sql_statements"].append(sql) + + if not dry_run: + try: + connection.query(sql) + logger.info(f"Added column {col_name} to {database}.{table_name}") + except Exception as e: + logger.error(f"Failed to add column {col_name} to {database}.{table_name}: {e}") + table_detail["status"] = "error" + table_detail["error"] = str(e) + raise DataJointError(f"Migration failed: {e}") from e + else: + logger.info(f"Would add column {col_name} to {database}.{table_name}") + + result["tables_modified"] += 1 + result["columns_added"] += len(columns_to_add) + result["details"].append(table_detail) + + return result diff --git a/src/datajoint/schemas.py b/src/datajoint/schemas.py index ae03d328c..f9c925440 100644 --- a/src/datajoint/schemas.py +++ b/src/datajoint/schemas.py @@ -9,7 +9,7 @@ from .connection import conn from .errors import AccessError, DataJointError from .heading import Heading -from .jobs import JobTable +from .jobs import Job from .settings import config from .table import FreeTable, lookup_class_name from .user_tables import Computed, Imported, Lookup, Manual, Part, _get_tier @@ -68,7 +68,6 @@ def __init__( self.context = context self.create_schema = create_schema self.create_tables = create_tables - self._jobs = None self.add_objects = add_objects self.declare_list = [] if schema_name: @@ -367,14 +366,34 @@ def rebuild_lineage(self): @property def jobs(self): """ - schema.jobs provides a view of the job reservation table for the schema + Return list of Job objects for auto-populated tables that have job tables. - :return: jobs table + Only returns Job objects when both the target table and its ~~table_name + job table exist in the database. Job tables are created lazily on first + access to table.jobs or populate(reserve_jobs=True). 
+ + :return: list of Job objects for existing job tables """ self._assert_exists() - if self._jobs is None: - self._jobs = JobTable(self.connection, self.database) - return self._jobs + jobs_list = [] + + # Get all existing job tables (~~prefix) + # Note: %% escapes the % in pymysql + result = self.connection.query(f"SHOW TABLES IN `{self.database}` LIKE '~~%%'").fetchall() + existing_job_tables = {row[0] for row in result} + + # Iterate over auto-populated tables and check if their job table exists + for table_name in self.list_tables(): + table = FreeTable(self.connection, f"`{self.database}`.`{table_name}`") + tier = _get_tier(table.full_table_name) + if tier in (Computed, Imported): + # Compute expected job table name: ~~base_name + base_name = table_name.lstrip("_") + job_table_name = f"~~{base_name}" + if job_table_name in existing_job_tables: + jobs_list.append(Job(table)) + + return jobs_list @property def code(self): diff --git a/src/datajoint/settings.py b/src/datajoint/settings.py index fc8af197e..99e7feeab 100644 --- a/src/datajoint/settings.py +++ b/src/datajoint/settings.py @@ -201,6 +201,36 @@ class ExternalSettings(BaseSettings): aws_secret_access_key: SecretStr | None = Field(default=None, validation_alias="DJ_AWS_SECRET_ACCESS_KEY") +class JobsSettings(BaseSettings): + """Job queue configuration for AutoPopulate 2.0.""" + + model_config = SettingsConfigDict( + env_prefix="DJ_JOBS_", + case_sensitive=False, + extra="forbid", + validate_assignment=True, + ) + + auto_refresh: bool = Field(default=True, description="Auto-refresh jobs queue on populate") + keep_completed: bool = Field(default=False, description="Keep success records in jobs table") + stale_timeout: int = Field(default=3600, ge=0, description="Seconds before pending job is checked for staleness") + default_priority: int = Field(default=5, ge=0, le=255, description="Default priority for new jobs (lower = more urgent)") + version_method: Literal["git", "none"] | None = Field( + default=None, description="Method to obtain version: 'git' (commit hash), 'none' (empty), or None (disabled)" + ) + allow_new_pk_fields_in_computed_tables: bool = Field( + default=False, + description="Allow native (non-FK) primary key fields in Computed/Imported tables. " + "When True, bypasses the FK-only PK validation. Job granularity will be degraded for such tables.", + ) + add_job_metadata: bool = Field( + default=False, + description="Add hidden job metadata attributes (_job_start_time, _job_duration, _job_version) " + "to Computed and Imported tables during declaration. 
Tables created without this setting " + "will not receive metadata updates during populate.", + ) + + class ObjectStorageSettings(BaseSettings): """Object storage configuration for the object type.""" @@ -264,6 +294,7 @@ class Config(BaseSettings): connection: ConnectionSettings = Field(default_factory=ConnectionSettings) display: DisplaySettings = Field(default_factory=DisplaySettings) external: ExternalSettings = Field(default_factory=ExternalSettings) + jobs: JobsSettings = Field(default_factory=JobsSettings) object_storage: ObjectStorageSettings = Field(default_factory=ObjectStorageSettings) # Top-level settings @@ -271,7 +302,6 @@ class Config(BaseSettings): safemode: bool = True fetch_format: Literal["array", "frame"] = "array" enable_python_native_blobs: bool = True - add_hidden_timestamp: bool = False filepath_checksum_size_limit: int | None = None # External stores configuration diff --git a/src/datajoint/table.py b/src/datajoint/table.py index 00d1e8de8..b5647ffa0 100644 --- a/src/datajoint/table.py +++ b/src/datajoint/table.py @@ -101,6 +101,10 @@ def declare(self, context=None): + "Classes defining tables should be formatted in strict CamelCase." ) sql, _external_stores, primary_key, fk_attribute_map = declare(self.full_table_name, self.definition, context) + + # Call declaration hook for validation (subclasses like AutoPopulate can override) + self._declare_check(primary_key, fk_attribute_map) + sql = sql.format(database=self.database) try: self.connection.query(sql) @@ -111,6 +115,18 @@ def declare(self, context=None): # Populate lineage table for this table's attributes self._populate_lineage(primary_key, fk_attribute_map) + def _declare_check(self, primary_key, fk_attribute_map): + """ + Hook for declaration-time validation. Subclasses can override. + + Called before the table is created in the database. Override this method + to add validation logic (e.g., AutoPopulate validates FK-only primary keys). + + :param primary_key: list of primary key attribute names + :param fk_attribute_map: dict mapping child_attr -> (parent_table, parent_attr) + """ + pass # Default: no validation + def _populate_lineage(self, primary_key, fk_attribute_map): """ Populate the ~lineage table with lineage information for this table's attributes. diff --git a/src/datajoint/user_tables.py b/src/datajoint/user_tables.py index d06bc5371..384d908c8 100644 --- a/src/datajoint/user_tables.py +++ b/src/datajoint/user_tables.py @@ -205,12 +205,21 @@ class Part(UserTable, metaclass=PartMeta): + ")" ) - def delete(self, force=False): + def delete(self, force=False, **kwargs): """ - unless force is True, prohibits direct deletes from parts. + Delete from a Part table. + + Args: + force: If True, allow direct deletion from Part table. + If False (default), raise an error. + **kwargs: Additional arguments passed to Table.delete() + (transaction, safemode, force_masters) + + Raises: + DataJointError: If force is False (direct Part deletes are prohibited) """ if force: - super().delete(force_parts=True) + super().delete(force_parts=True, **kwargs) else: raise DataJointError("Cannot delete from a Part directly. 
Delete from master instead") diff --git a/src/datajoint/version.py b/src/datajoint/version.py index 4684015ad..da2a4c956 100644 --- a/src/datajoint/version.py +++ b/src/datajoint/version.py @@ -1,4 +1,4 @@ # version bump auto managed by Github Actions: # label_prs.yaml(prep), release.yaml(bump), post_release.yaml(edit) # manually set this version will be eventually overwritten by the above actions -__version__ = "2.0.0a9" +__version__ = "2.0.0a12" diff --git a/tests/conftest.py b/tests/conftest.py index 14b848d4b..6d03dece7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -454,29 +454,6 @@ def minio_client(s3_creds, s3fs_client, teardown=False): pass -# ============================================================================= -# Utility Fixtures -# ============================================================================= - - -@pytest.fixture(scope="session") -def monkeysession(): - with pytest.MonkeyPatch.context() as mp: - yield mp - - -@pytest.fixture(scope="module") -def monkeymodule(): - with pytest.MonkeyPatch.context() as mp: - yield mp - - -@pytest.fixture -def enable_adapted_types(): - """Deprecated - custom attribute types no longer require a feature flag.""" - yield - - # ============================================================================= # Cleanup Fixtures # ============================================================================= @@ -494,10 +471,12 @@ def clean_autopopulate(experiment, trial, ephys): @pytest.fixture def clean_jobs(schema_any): """Cleanup fixture for jobs tests.""" - try: - schema_any.jobs.delete() - except DataJointError: - pass + # schema.jobs returns a list of Job objects for existing job tables + for job in schema_any.jobs: + try: + job.delete() + except DataJointError: + pass yield @@ -522,10 +501,15 @@ def clean_test_tables(test, test_extra, test_no_extra): def schema_any(connection_test, prefix): schema_any = dj.Schema(prefix + "_test1", schema.LOCALS_ANY, connection=connection_test) assert schema.LOCALS_ANY, "LOCALS_ANY is empty" - try: - schema_any.jobs.delete() - except DataJointError: - pass + # Clean up any existing job tables (schema.jobs returns a list) + for job in schema_any.jobs: + try: + job.delete() + except DataJointError: + pass + # Allow native PK fields for legacy test tables (Experiment, Trial) + original_value = dj.config.jobs.allow_new_pk_fields_in_computed_tables + dj.config.jobs.allow_new_pk_fields_in_computed_tables = True schema_any(schema.TTest) schema_any(schema.TTest2) schema_any(schema.TTest3) @@ -564,11 +548,16 @@ def schema_any(connection_test, prefix): schema_any(schema.SessionDateA) schema_any(schema.Stimulus) schema_any(schema.Longblob) + # Restore original config value after all tables are declared + dj.config.jobs.allow_new_pk_fields_in_computed_tables = original_value yield schema_any - try: - schema_any.jobs.delete() - except DataJointError: - pass + # Clean up job tables before dropping schema (if schema still exists) + if schema_any.exists: + for job in schema_any.jobs: + try: + job.delete() + except DataJointError: + pass schema_any.drop() @@ -577,10 +566,15 @@ def schema_any_fresh(connection_test, prefix): """Function-scoped schema_any for tests that need fresh schema state.""" schema_any = dj.Schema(prefix + "_test1_fresh", schema.LOCALS_ANY, connection=connection_test) assert schema.LOCALS_ANY, "LOCALS_ANY is empty" - try: - schema_any.jobs.delete() - except DataJointError: - pass + # Clean up any existing job tables + for job in schema_any.jobs: + try: + job.delete() + except 
DataJointError: + pass + # Allow native PK fields for legacy test tables (Experiment, Trial) + original_value = dj.config.jobs.allow_new_pk_fields_in_computed_tables + dj.config.jobs.allow_new_pk_fields_in_computed_tables = True schema_any(schema.TTest) schema_any(schema.TTest2) schema_any(schema.TTest3) @@ -619,11 +613,16 @@ def schema_any_fresh(connection_test, prefix): schema_any(schema.SessionDateA) schema_any(schema.Stimulus) schema_any(schema.Longblob) + # Restore original config value after all tables are declared + dj.config.jobs.allow_new_pk_fields_in_computed_tables = original_value yield schema_any - try: - schema_any.jobs.delete() - except DataJointError: - pass + # Clean up job tables before dropping schema (if schema still exists) + if schema_any.exists: + for job in schema_any.jobs: + try: + job.delete() + except DataJointError: + pass schema_any.drop() diff --git a/tests/integration/test_alter.py b/tests/integration/test_alter.py index 45448f966..fbf074332 100644 --- a/tests/integration/test_alter.py +++ b/tests/integration/test_alter.py @@ -45,9 +45,10 @@ def test_alter_part(self, schema_alter): """ https://github.com/datajoint/datajoint-python/issues/936 """ - self.verify_alter(schema_alter, table=Parent.Child, attribute_sql="`child_id` .* DEFAULT NULL") + # Regex includes optional COMMENT for type annotations + self.verify_alter(schema_alter, table=Parent.Child, attribute_sql=r"`child_id` .* DEFAULT NULL[^,]*") self.verify_alter( schema_alter, table=Parent.Grandchild, - attribute_sql="`grandchild_id` .* DEFAULT NULL", + attribute_sql=r"`grandchild_id` .* DEFAULT NULL[^,]*", ) diff --git a/tests/integration/test_autopopulate.py b/tests/integration/test_autopopulate.py index de9dc95a3..07f4333c5 100644 --- a/tests/integration/test_autopopulate.py +++ b/tests/integration/test_autopopulate.py @@ -49,31 +49,36 @@ def test_populate_with_success_count(clean_autopopulate, subject, experiment, tr assert len(trial.key_source & trial) == success_count -def test_populate_key_list(clean_autopopulate, subject, experiment, trial): - # test simple populate +def test_populate_max_calls(clean_autopopulate, subject, experiment, trial): + # test populate with max_calls limit assert subject, "root tables are empty" assert not experiment, "table already filled?" - keys = experiment.key_source.fetch("KEY", order_by="KEY") n = 3 - assert len(keys) > n - keys = keys[:n] - ret = experiment.populate(keys=keys) + total_keys = len(experiment.key_source) + assert total_keys > n + ret = experiment.populate(max_calls=n) assert n == ret["success_count"] -def test_populate_exclude_error_and_ignore_jobs(clean_autopopulate, schema_any, subject, experiment): - # test simple populate +def test_populate_exclude_error_and_ignore_jobs(clean_autopopulate, subject, experiment): + # test that error and ignore jobs are excluded from populate assert subject, "root tables are empty" assert not experiment, "table already filled?" 
- keys = experiment.key_source.fetch("KEY", limit=2) + # Refresh jobs to create pending entries + experiment.jobs.refresh() + + keys = experiment.jobs.pending.fetch("KEY", limit=2) for idx, key in enumerate(keys): if idx == 0: - schema_any.jobs.ignore(experiment.table_name, key) + experiment.jobs.ignore(key) else: - schema_any.jobs.error(experiment.table_name, key, "") + # Create an error job by first reserving then setting error + experiment.jobs.reserve(key) + experiment.jobs.error(key, "test error") - experiment.populate(reserve_jobs=True) + # Populate should skip error and ignore jobs + experiment.populate(reserve_jobs=True, refresh=False) assert len(experiment.key_source & experiment) == len(experiment.key_source) - 2 diff --git a/tests/integration/test_declare.py b/tests/integration/test_declare.py index e6e72a716..3097a9457 100644 --- a/tests/integration/test_declare.py +++ b/tests/integration/test_declare.py @@ -4,7 +4,6 @@ import datajoint as dj from datajoint.declare import declare -from datajoint.settings import config from tests.schema import ( Auto, @@ -22,24 +21,6 @@ ) -@pytest.fixture(scope="function") -def enable_add_hidden_timestamp(): - orig_config_val = config.get("add_hidden_timestamp") - config["add_hidden_timestamp"] = True - yield - if orig_config_val is not None: - config["add_hidden_timestamp"] = orig_config_val - - -@pytest.fixture(scope="function") -def disable_add_hidden_timestamp(): - orig_config_val = config.get("add_hidden_timestamp") - config["add_hidden_timestamp"] = False - yield - if orig_config_val is not None: - config["add_hidden_timestamp"] = orig_config_val - - def test_schema_decorator(schema_any): assert issubclass(Subject, dj.Lookup) assert not issubclass(Subject, dj.Part) @@ -114,10 +95,9 @@ class Type(dj.Part): def test_attributes(schema_any): """ - Test autoincrement declaration + Test attribute declarations """ auto = Auto() - auto.fill() subject = Subject() experiment = Experiment() trial = Trial() @@ -125,7 +105,7 @@ def test_attributes(schema_any): channel = Ephys.Channel() assert auto.heading.names == ["id", "name"] - assert auto.heading.attributes["id"].autoincrement + assert auto.heading.attributes["id"].numeric # test attribute declarations assert subject.heading.names == [ @@ -385,26 +365,3 @@ class Table_With_Underscores(dj.Manual): schema_any(TableNoUnderscores) with pytest.raises(dj.DataJointError, match="must be alphanumeric in CamelCase"): schema_any(Table_With_Underscores) - - -def test_add_hidden_timestamp_default_value(): - config_val = config.get("add_hidden_timestamp") - assert config_val is not None and not config_val, "Default value for add_hidden_timestamp is not False" - - -def test_add_hidden_timestamp_enabled(enable_add_hidden_timestamp, schema_any_fresh): - assert config["add_hidden_timestamp"], "add_hidden_timestamp is not enabled" - msg = f"{Experiment().heading._attributes=}" - assert any(a.name.endswith("_timestamp") for a in Experiment().heading._attributes.values()), msg - assert any(a.name.startswith("_") for a in Experiment().heading._attributes.values()), msg - assert any(a.is_hidden for a in Experiment().heading._attributes.values()), msg - assert not any(a.is_hidden for a in Experiment().heading.attributes.values()), msg - - -def test_add_hidden_timestamp_disabled(disable_add_hidden_timestamp, schema_any_fresh): - assert not config["add_hidden_timestamp"], "expected add_hidden_timestamp to be False" - msg = f"{Experiment().heading._attributes=}" - assert not any(a.name.endswith("_timestamp") for a in 
Experiment().heading._attributes.values()), msg - assert not any(a.name.startswith("_") for a in Experiment().heading._attributes.values()), msg - assert not any(a.is_hidden for a in Experiment().heading._attributes.values()), msg - assert not any(a.is_hidden for a in Experiment().heading.attributes.values()), msg diff --git a/tests/integration/test_hidden_job_metadata.py b/tests/integration/test_hidden_job_metadata.py new file mode 100644 index 000000000..fb4ba387b --- /dev/null +++ b/tests/integration/test_hidden_job_metadata.py @@ -0,0 +1,273 @@ +"""Tests for hidden job metadata in computed tables.""" + +import time + +import pytest + +import datajoint as dj + + +@pytest.fixture +def schema_job_metadata(connection_test, prefix): + """Create a schema with job metadata enabled.""" + # Enable job metadata for this test + original_setting = dj.config.jobs.add_job_metadata + dj.config.jobs.add_job_metadata = True + + schema = dj.Schema(prefix + "_job_metadata", connection=connection_test) + + class Source(dj.Lookup): + definition = """ + source_id : uint8 + --- + value : float32 + """ + contents = [(1, 1.0), (2, 2.0), (3, 3.0)] + + class ComputedWithMetadata(dj.Computed): + definition = """ + -> Source + --- + result : float32 + """ + + def make(self, key): + time.sleep(0.01) # Small delay to ensure non-zero duration + source = (Source & key).fetch1() + self.insert1({**key, "result": source["value"] * 2}) + + class ImportedWithMetadata(dj.Imported): + definition = """ + -> Source + --- + imported_value : float32 + """ + + def make(self, key): + source = (Source & key).fetch1() + self.insert1({**key, "imported_value": source["value"] + 10}) + + class ManualTable(dj.Manual): + definition = """ + manual_id : uint8 + --- + data : float32 + """ + + class ComputedWithPart(dj.Computed): + definition = """ + -> Source + --- + total : float32 + """ + + class Detail(dj.Part): + definition = """ + -> master + detail_idx : uint8 + --- + detail_value : float32 + """ + + def make(self, key): + source = (Source & key).fetch1() + self.insert1({**key, "total": source["value"] * 3}) + self.Detail.insert1({**key, "detail_idx": 0, "detail_value": source["value"]}) + + context = { + "Source": Source, + "ComputedWithMetadata": ComputedWithMetadata, + "ImportedWithMetadata": ImportedWithMetadata, + "ManualTable": ManualTable, + "ComputedWithPart": ComputedWithPart, + } + + schema(Source, context=context) + schema(ComputedWithMetadata, context=context) + schema(ImportedWithMetadata, context=context) + schema(ManualTable, context=context) + schema(ComputedWithPart, context=context) + + yield { + "schema": schema, + "Source": Source, + "ComputedWithMetadata": ComputedWithMetadata, + "ImportedWithMetadata": ImportedWithMetadata, + "ManualTable": ManualTable, + "ComputedWithPart": ComputedWithPart, + } + + # Cleanup + schema.drop() + dj.config.jobs.add_job_metadata = original_setting + + +class TestHiddenJobMetadataDeclaration: + """Test that hidden job metadata columns are added during declaration.""" + + def test_computed_table_has_hidden_metadata(self, schema_job_metadata): + """Computed tables should have hidden job metadata columns.""" + table = schema_job_metadata["ComputedWithMetadata"] + # Force heading to load from database + _ = table.heading.attributes + # Check _attributes (includes hidden) + all_attrs = table.heading._attributes + assert all_attrs is not None, "heading._attributes should not be None after loading" + assert "_job_start_time" in all_attrs + assert "_job_duration" in all_attrs + assert 
"_job_version" in all_attrs + # Check that they're hidden + assert all_attrs["_job_start_time"].is_hidden + assert all_attrs["_job_duration"].is_hidden + assert all_attrs["_job_version"].is_hidden + + def test_imported_table_has_hidden_metadata(self, schema_job_metadata): + """Imported tables should have hidden job metadata columns.""" + table = schema_job_metadata["ImportedWithMetadata"] + _ = table.heading.attributes # Force load + all_attrs = table.heading._attributes + assert "_job_start_time" in all_attrs + assert "_job_duration" in all_attrs + assert "_job_version" in all_attrs + + def test_manual_table_no_hidden_metadata(self, schema_job_metadata): + """Manual tables should NOT have hidden job metadata columns.""" + table = schema_job_metadata["ManualTable"] + _ = table.heading.attributes # Force load + all_attrs = table.heading._attributes + assert "_job_start_time" not in all_attrs + assert "_job_duration" not in all_attrs + assert "_job_version" not in all_attrs + + def test_lookup_table_no_hidden_metadata(self, schema_job_metadata): + """Lookup tables should NOT have hidden job metadata columns.""" + table = schema_job_metadata["Source"] + _ = table.heading.attributes # Force load + all_attrs = table.heading._attributes + assert "_job_start_time" not in all_attrs + assert "_job_duration" not in all_attrs + assert "_job_version" not in all_attrs + + def test_part_table_no_hidden_metadata(self, schema_job_metadata): + """Part tables should NOT have hidden job metadata columns.""" + master = schema_job_metadata["ComputedWithPart"] + part = master.Detail + _ = part.heading.attributes # Force load + all_attrs = part.heading._attributes + assert "_job_start_time" not in all_attrs + assert "_job_duration" not in all_attrs + assert "_job_version" not in all_attrs + + +class TestHiddenJobMetadataPopulation: + """Test that job metadata is populated during make().""" + + def test_metadata_populated_after_make(self, schema_job_metadata): + """Job metadata should be populated after make() completes.""" + table = schema_job_metadata["ComputedWithMetadata"] + table.populate() + + # Fetch hidden attributes using raw SQL since fetch() filters them + conn = table.connection + result = conn.query(f"SELECT _job_start_time, _job_duration, _job_version FROM {table.full_table_name}").fetchall() + assert len(result) == 3 + + for row in result: + start_time, duration, version = row + assert start_time is not None + assert duration is not None + assert duration >= 0 + # Version may be empty string if git not available + assert version is not None + + def test_metadata_not_in_default_fetch(self, schema_job_metadata): + """Hidden metadata should not appear in default fetch().""" + table = schema_job_metadata["ComputedWithMetadata"] + table.populate() + + result = table.fetch(as_dict=True) + for row in result: + assert "_job_start_time" not in row + assert "_job_duration" not in row + assert "_job_version" not in row + + def test_hidden_attrs_not_in_heading_names(self, schema_job_metadata): + """Hidden attributes should not appear in heading.names.""" + table = schema_job_metadata["ComputedWithMetadata"] + _ = table.heading.attributes # Force load + names = table.heading.names + assert "_job_start_time" not in names + assert "_job_duration" not in names + assert "_job_version" not in names + + +class TestHiddenAttributesExcludedFromJoins: + """Test that hidden attributes are excluded from join operations.""" + + def test_hidden_attrs_excluded_from_join(self, schema_job_metadata): + """Hidden attributes 
should not participate in join matching.""" + computed = schema_job_metadata["ComputedWithMetadata"] + imported = schema_job_metadata["ImportedWithMetadata"] + + # Populate both tables + computed.populate() + imported.populate() + + # Both have _job_start_time, _job_duration, _job_version + # But these should NOT be used for joining + joined = computed * imported + # Should join on source_id only + assert len(joined) == 3 + + # The result heading should not have hidden attributes + assert "_job_start_time" not in joined.heading.names + assert "_job_duration" not in joined.heading.names + + +class TestConfigDisabled: + """Test behavior when add_job_metadata is disabled.""" + + def test_no_metadata_when_disabled(self, connection_test, prefix): + """Tables should not have metadata columns when config is disabled.""" + # Ensure disabled + original_setting = dj.config.jobs.add_job_metadata + dj.config.jobs.add_job_metadata = False + + schema = dj.Schema(prefix + "_no_metadata", connection=connection_test) + + class Source(dj.Lookup): + definition = """ + source_id : uint8 + """ + contents = [(1,), (2,)] + + class ComputedNoMetadata(dj.Computed): + definition = """ + -> Source + --- + result : float32 + """ + + def make(self, key): + self.insert1({**key, "result": 1.0}) + + context = {"Source": Source, "ComputedNoMetadata": ComputedNoMetadata} + schema(Source, context=context) + schema(ComputedNoMetadata, context=context) + + try: + # Force heading to load from database + _ = ComputedNoMetadata.heading.attributes + # Check no hidden metadata columns + all_attrs = ComputedNoMetadata.heading._attributes + assert all_attrs is not None + assert "_job_start_time" not in all_attrs + assert "_job_duration" not in all_attrs + assert "_job_version" not in all_attrs + + # Populate should still work + ComputedNoMetadata.populate() + assert len(ComputedNoMetadata()) == 2 + finally: + schema.drop() + dj.config.jobs.add_job_metadata = original_setting diff --git a/tests/integration/test_jobs.py b/tests/integration/test_jobs.py index 6a44b2689..f68508071 100644 --- a/tests/integration/test_jobs.py +++ b/tests/integration/test_jobs.py @@ -1,130 +1,160 @@ +"""Tests for per-table Job management (AutoPopulate 2.0).""" + import random import string - import datajoint as dj from datajoint.jobs import ERROR_MESSAGE_LENGTH, TRUNCATION_APPENDIX from tests import schema -def test_reserve_job(clean_jobs, subject, schema_any): +def test_reserve_job(clean_jobs, subject, experiment): + """Test job reservation, completion, and error workflows.""" assert subject - table_name = "fake_table" - # reserve jobs - for key in subject.fetch("KEY"): - assert schema_any.jobs.reserve(table_name, key), "failed to reserve a job" + # Refresh jobs to create pending entries + experiment.jobs.refresh() + pending_count = len(experiment.jobs.pending) + assert pending_count > 0, "no pending jobs created" + + # Reserve all pending jobs + keys = experiment.jobs.pending.fetch("KEY") + for key in keys: + assert experiment.jobs.reserve(key), "failed to reserve a job" + + # Try to reserve already-reserved jobs - should fail + for key in keys: + assert not experiment.jobs.reserve(key), "failed to respect reservation" + + # Complete jobs + for key in keys: + experiment.jobs.complete(key) - # refuse jobs - for key in subject.fetch("KEY"): - assert not schema_any.jobs.reserve(table_name, key), "failed to respect reservation" + # Check jobs are completed (or deleted if keep_completed=False) + if dj.config.jobs.keep_completed: + assert 
len(experiment.jobs.completed) == len(keys) + else: + assert len(experiment.jobs) == 0, "failed to free jobs" - # complete jobs - for key in subject.fetch("KEY"): - schema_any.jobs.complete(table_name, key) - assert not schema_any.jobs, "failed to free jobs" + # Refresh again to create new pending jobs + experiment.jobs.refresh() + keys = experiment.jobs.pending.fetch("KEY") - # reserve jobs again - for key in subject.fetch("KEY"): - assert schema_any.jobs.reserve(table_name, key), "failed to reserve new jobs" + # Reserve and mark as error + for key in keys: + experiment.jobs.reserve(key) + experiment.jobs.error(key, "error message") - # finish with error - for key in subject.fetch("KEY"): - schema_any.jobs.error(table_name, key, "error message") + # Try to reserve error jobs - should fail + for key in keys: + assert not experiment.jobs.reserve(key), "failed to ignore error jobs" - # refuse jobs with errors - for key in subject.fetch("KEY"): - assert not schema_any.jobs.reserve(table_name, key), "failed to ignore error jobs" + # Clear error jobs + experiment.jobs.errors.delete() + assert len(experiment.jobs) == 0, "failed to clear error jobs" - # clear error jobs - (schema_any.jobs & dict(status="error")).delete() - assert not schema_any.jobs, "failed to clear error jobs" +def test_job_status_filters(clean_jobs, subject, experiment): + """Test job status filter properties.""" + # Refresh to create pending jobs + experiment.jobs.refresh() -def test_restrictions(clean_jobs, schema_any): - jobs = schema_any.jobs - jobs.delete() - jobs.reserve("a", {"key": "a1"}) - jobs.reserve("a", {"key": "a2"}) - jobs.reserve("b", {"key": "b1"}) - jobs.error("a", {"key": "a2"}, "error") - jobs.error("b", {"key": "b1"}, "error") + # All should be pending + total = len(experiment.jobs) + assert total > 0 + assert len(experiment.jobs.pending) == total + assert len(experiment.jobs.reserved) == 0 + assert len(experiment.jobs.errors) == 0 - assert len(jobs & {"table_name": "a"}) == 2 - assert len(jobs & {"status": "error"}) == 2 - assert len(jobs & {"table_name": "a", "status": "error"}) == 1 - jobs.delete() + # Reserve some jobs + keys = experiment.jobs.pending.fetch("KEY", limit=2) + for key in keys: + experiment.jobs.reserve(key) + + assert len(experiment.jobs.reserved) == 2 + + # Mark one as error + experiment.jobs.error(keys[0], "test error") + assert len(experiment.jobs.errors) == 1 def test_sigint(clean_jobs, schema_any): + """Test that KeyboardInterrupt is recorded as error.""" + sig_int_table = schema.SigIntTable() try: - schema.SigIntTable().populate(reserve_jobs=True) + sig_int_table.populate(reserve_jobs=True) except KeyboardInterrupt: pass - assert len(schema_any.jobs.fetch()), "SigInt jobs table is empty" - status, error_message = schema_any.jobs.fetch1("status", "error_message") + assert len(sig_int_table.jobs.errors) > 0, "SigInt job error not recorded" + status, error_message = sig_int_table.jobs.errors.fetch1("status", "error_message") assert status == "error" - assert error_message == "KeyboardInterrupt" + assert "KeyboardInterrupt" in error_message def test_sigterm(clean_jobs, schema_any): + """Test that SystemExit is recorded as error.""" + sig_term_table = schema.SigTermTable() try: - schema.SigTermTable().populate(reserve_jobs=True) + sig_term_table.populate(reserve_jobs=True) except SystemExit: pass - assert len(schema_any.jobs.fetch()), "SigTerm jobs table is empty" - status, error_message = schema_any.jobs.fetch1("status", "error_message") + assert len(sig_term_table.jobs.errors) > 0, 
"SigTerm job error not recorded" + status, error_message = sig_term_table.jobs.errors.fetch1("status", "error_message") assert status == "error" - assert error_message == "SystemExit: SIGTERM received" + assert "SIGTERM" in error_message or "SystemExit" in error_message def test_suppress_dj_errors(clean_jobs, schema_any): - """test_suppress_dj_errors: dj errors suppressible w/o native py blobs""" + """Test that DataJoint errors are suppressible without native py blobs.""" + error_class = schema.ErrorClass() with dj.config.override(enable_python_native_blobs=False): - schema.ErrorClass.populate(reserve_jobs=True, suppress_errors=True) - assert len(schema.DjExceptionName()) == len(schema_any.jobs) > 0 + error_class.populate(reserve_jobs=True, suppress_errors=True) + assert len(schema.DjExceptionName()) == len(error_class.jobs.errors) > 0 -def test_long_error_message(clean_jobs, subject, schema_any): - # create long error message +def test_long_error_message(clean_jobs, subject, experiment): + """Test that long error messages are truncated.""" + # Create long and short error messages long_error_message = "".join(random.choice(string.ascii_letters) for _ in range(ERROR_MESSAGE_LENGTH + 100)) short_error_message = "".join(random.choice(string.ascii_letters) for _ in range(ERROR_MESSAGE_LENGTH // 2)) - assert subject - table_name = "fake_table" - key = subject.fetch("KEY", limit=1)[0] + # Refresh to create pending jobs + experiment.jobs.refresh() + key = experiment.jobs.pending.fetch("KEY", limit=1)[0] - # test long error message - schema_any.jobs.reserve(table_name, key) - schema_any.jobs.error(table_name, key, long_error_message) - error_message = schema_any.jobs.fetch1("error_message") + # Test long error message truncation + experiment.jobs.reserve(key) + experiment.jobs.error(key, long_error_message) + error_message = experiment.jobs.errors.fetch1("error_message") assert len(error_message) == ERROR_MESSAGE_LENGTH, "error message is longer than max allowed" assert error_message.endswith(TRUNCATION_APPENDIX), "appropriate ending missing for truncated error message" - schema_any.jobs.delete() - - # test long error message - schema_any.jobs.reserve(table_name, key) - schema_any.jobs.error(table_name, key, short_error_message) - error_message = schema_any.jobs.fetch1("error_message") + experiment.jobs.delete() + + # Refresh and test short error message (not truncated) + experiment.jobs.refresh() + key = experiment.jobs.pending.fetch("KEY", limit=1)[0] + experiment.jobs.reserve(key) + experiment.jobs.error(key, short_error_message) + error_message = experiment.jobs.errors.fetch1("error_message") assert error_message == short_error_message, "error messages do not agree" assert not error_message.endswith(TRUNCATION_APPENDIX), "error message should not be truncated" - schema_any.jobs.delete() -def test_long_error_stack(clean_jobs, subject, schema_any): - # create long error stack +def test_long_error_stack(clean_jobs, subject, experiment): + """Test that long error stacks are stored correctly.""" + # Create long error stack STACK_SIZE = 89942 # Does not fit into small blob (should be 64k, but found to be higher) long_error_stack = "".join(random.choice(string.ascii_letters) for _ in range(STACK_SIZE)) - assert subject - table_name = "fake_table" - key = subject.fetch("KEY", limit=1)[0] + # Refresh to create pending jobs + experiment.jobs.refresh() + key = experiment.jobs.pending.fetch("KEY", limit=1)[0] - # test long error stack - schema_any.jobs.reserve(table_name, key) - 
schema_any.jobs.error(table_name, key, "error message", long_error_stack) - error_stack = schema_any.jobs.fetch1("error_stack") + # Test long error stack + experiment.jobs.reserve(key) + experiment.jobs.error(key, "error message", long_error_stack) + error_stack = experiment.jobs.errors.fetch1("error_stack") assert error_stack == long_error_stack, "error stacks do not agree" diff --git a/tests/schema.py b/tests/schema.py index 99a7c457d..d196063d0 100644 --- a/tests/schema.py +++ b/tests/schema.py @@ -16,24 +16,24 @@ class TTest(dj.Lookup): """ definition = """ - key : int # key + key : int32 # key --- - value : int # value + value : int32 # value """ contents = [(k, 2 * k) for k in range(10)] class TTest2(dj.Manual): definition = """ - key : int # key + key : int32 # key --- - value : int # value + value : int32 # value """ class TTest3(dj.Manual): definition = """ - key : int + key : int32 --- value : varchar(300) """ @@ -41,11 +41,11 @@ class TTest3(dj.Manual): class NullableNumbers(dj.Manual): definition = """ - key : int + key : int32 --- - fvalue = null : float - dvalue = null : double - ivalue = null : int + fvalue = null : float32 + dvalue = null : float64 + ivalue = null : int32 """ @@ -54,7 +54,7 @@ class TTestExtra(dj.Manual): clone of Test but with an extra field """ - definition = TTest.definition + "\nextra : int # extra int\n" + definition = TTest.definition + "\nextra : int32 # extra int\n" class TTestNoExtra(dj.Manual): @@ -67,14 +67,11 @@ class TTestNoExtra(dj.Manual): class Auto(dj.Lookup): definition = """ - id :int auto_increment + id : uint8 --- name :varchar(12) """ - - def fill(self): - if not self: - self.insert([dict(name="Godel"), dict(name="Escher"), dict(name="Bach")]) + contents = [(1, "Godel"), (2, "Escher"), (3, "Bach")] class User(dj.Lookup): @@ -94,7 +91,7 @@ class User(dj.Lookup): class Subject(dj.Lookup): definition = """ # Basic information about animal subjects used in experiments - subject_id :int # unique subject id + subject_id :int32 # unique subject id --- real_id :varchar(40) # real-world name. Omit if the same as subject_id species = "mouse" :enum('mouse', 'monkey', 'human') @@ -131,13 +128,13 @@ class Language(dj.Lookup): class Experiment(dj.Imported): definition = """ # information about experiments -> Subject - experiment_id :smallint # experiment number for this subject + experiment_id :int16 # experiment number for this subject --- experiment_date :date # date when experiment was started -> [nullable] User data_path="" :varchar(255) # file path to recorded data notes="" :varchar(2048) # e.g. 
purpose of experiment - entry_time=CURRENT_TIMESTAMP :timestamp # automatic timestamp + entry_time=CURRENT_TIMESTAMP :datetime # automatic timestamp """ fake_experiments_per_subject = 5 @@ -164,17 +161,17 @@ def make(self, key): class Trial(dj.Imported): definition = """ # a trial within an experiment -> Experiment.proj(animal='subject_id') - trial_id :smallint # trial number + trial_id :int16 # trial number --- - start_time :double # (s) + start_time :float64 # (s) """ class Condition(dj.Part): definition = """ # trial conditions -> Trial - cond_idx : smallint # condition number + cond_idx : int16 # condition number ---- - orientation : float # degrees + orientation : float32 # degrees """ def make(self, key): @@ -191,14 +188,14 @@ class Ephys(dj.Imported): definition = """ # some kind of electrophysiological recording -> Trial ---- - sampling_frequency :double # (Hz) + sampling_frequency :float64 # (Hz) duration :decimal(7,3) # (s) """ class Channel(dj.Part): definition = """ # subtable containing individual channels -> master - channel :tinyint unsigned # channel number within Ephys + channel :uint8 # channel number within Ephys ---- voltage : current = null : # optional current to test null handling @@ -226,7 +223,7 @@ def _make_tuples(self, key): class Image(dj.Manual): definition = """ # table for testing blob inserts - id : int # image identifier + id : int32 # image identifier --- img : # image """ @@ -234,7 +231,7 @@ class Image(dj.Manual): class UberTrash(dj.Lookup): definition = """ - id : int + id : int32 --- """ contents = [(1,)] @@ -243,7 +240,7 @@ class UberTrash(dj.Lookup): class UnterTrash(dj.Lookup): definition = """ -> UberTrash - my_id : int + my_id : int32 --- """ contents = [(1, 1), (1, 2)] @@ -251,7 +248,7 @@ class UnterTrash(dj.Lookup): class SimpleSource(dj.Lookup): definition = """ - id : int # id + id : int32 # id """ contents = [(x,) for x in range(10)] @@ -311,7 +308,7 @@ class IndexRich(dj.Manual): --- -> [unique, nullable] User.proj(first="username") first_date : date - value : int + value : int32 index (first_date, value) """ @@ -319,16 +316,16 @@ class IndexRich(dj.Manual): # Schema for issue 656 class ThingA(dj.Manual): definition = """ - a: int + a: int32 """ class ThingB(dj.Manual): definition = """ - b1: int - b2: int + b1: int32 + b2: int32 --- - b3: int + b3: int32 """ @@ -343,7 +340,7 @@ class ThingC(dj.Manual): # Additional tables for #1159 class ThingD(dj.Manual): definition = """ - d: int + d: int32 --- -> ThingC """ @@ -357,7 +354,7 @@ class ThingE(dj.Manual): class Parent(dj.Lookup): definition = """ - parent_id: int + parent_id: int32 --- name: varchar(30) """ @@ -367,7 +364,7 @@ class Parent(dj.Lookup): class Child(dj.Lookup): definition = """ -> Parent - child_id: int + child_id: int32 --- name: varchar(30) """ @@ -376,12 +373,12 @@ class Child(dj.Lookup): # Related to issue #886 (8), #883 (5) class ComplexParent(dj.Lookup): - definition = "\n".join(["parent_id_{}: int".format(i + 1) for i in range(8)]) + definition = "\n".join(["parent_id_{}: int32".format(i + 1) for i in range(8)]) contents = [tuple(i for i in range(8))] class ComplexChild(dj.Lookup): - definition = "\n".join(["-> ComplexParent"] + ["child_id_{}: int".format(i + 1) for i in range(1)]) + definition = "\n".join(["-> ComplexParent"] + ["child_id_{}: int32".format(i + 1) for i in range(1)]) contents = [tuple(i for i in range(9))] @@ -443,16 +440,16 @@ class SessionDateA(dj.Lookup): class Stimulus(dj.Lookup): definition = """ - id: int + id: int32 --- - contrast: int - 
brightness: int + contrast: int32 + brightness: int32 """ class Longblob(dj.Manual): definition = """ - id: int + id: int32 --- data: """ diff --git a/tests/schema_alter.py b/tests/schema_alter.py index ef8b35f0c..936d9cc12 100644 --- a/tests/schema_alter.py +++ b/tests/schema_alter.py @@ -6,30 +6,30 @@ class Experiment(dj.Imported): original_definition = """ # information about experiments -> Subject - experiment_id :smallint # experiment number for this subject + experiment_id :int16 # experiment number for this subject --- experiment_date :date # date when experiment was started -> [nullable] User data_path="" :varchar(255) # file path to recorded data notes="" :varchar(2048) # e.g. purpose of experiment - entry_time=CURRENT_TIMESTAMP :timestamp # automatic timestamp + entry_time=CURRENT_TIMESTAMP :datetime # automatic timestamp """ definition1 = """ # Experiment -> Subject - experiment_id :smallint # experiment number for this subject + experiment_id :int16 # experiment number for this subject --- - data_path : int # some number + data_path : int32 # some number extra=null : # just testing -> [nullable] User subject_notes=null :varchar(2048) # {notes} e.g. purpose of experiment - entry_time=CURRENT_TIMESTAMP :timestamp # automatic timestamp + entry_time=CURRENT_TIMESTAMP :datetime # automatic timestamp """ class Parent(dj.Manual): definition = """ - parent_id: int + parent_id: int32 """ class Child(dj.Part): @@ -39,7 +39,7 @@ class Child(dj.Part): definition_new = """ -> master --- - child_id=null: int + child_id=null: int32 """ class Grandchild(dj.Part): @@ -49,7 +49,7 @@ class Grandchild(dj.Part): definition_new = """ -> master.Child --- - grandchild_id=null: int + grandchild_id=null: int32 """
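
For orientation, the per-table `Job` methods added above (`refresh`, `reserve`, `complete`, `error`, and the `pending` filter) compose into the reserve/compute/record loop that `populate(reserve_jobs=True)` drives internally. The sketch below is illustrative only and is not part of this patch; it assumes the `Experiment` imported table from `tests/schema.py` and calls `make()` directly, which `populate()` would normally do on the worker's behalf.

```python
# Illustrative worker loop using the per-table Job API from src/datajoint/jobs.py above.
# Not part of this diff; Experiment is the test table from tests/schema.py and is used
# here only as a stand-in for any dj.Imported/dj.Computed table.
import time
import traceback

from tests.schema import Experiment

experiment = Experiment()
jobs = experiment.jobs

# Queue pending jobs for keys in key_source that are not yet in the target table.
jobs.refresh()

for key in jobs.pending.fetch("KEY"):
    if not jobs.reserve(key):
        continue  # taken by another worker, or scheduled_time is still in the future
    start = time.time()
    try:
        experiment.make(key)  # populate(reserve_jobs=True) performs this step internally
    except Exception as exc:
        jobs.error(key, repr(exc), error_stack=traceback.format_exc())
    else:
        jobs.complete(key, duration=time.time() - start)
```

With `config.jobs.keep_completed = False` (the default in the new `JobsSettings`), `complete()` deletes each finished entry, so a subsequent `jobs.refresh()` only re-adds keys whose results are missing from the target table.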