def parse_transforms(
    transforms: Optional[Union[List[Callable], Dict[str, List[Callable]]]],
) -> Dict[str, Optional[List[Callable]]]:
    """Expand the ``transforms`` argument into a per-split dictionary.

    Parameters
    ----------
    transforms : Optional[Union[List[Callable], Dict[str, List[Callable]]]]
        This could be:
        - None: no transforms for any split.
        - List[Callable]: the same list object is used for every split.
        - Dict[str, List[Callable]]: per-split transforms. Keys must be among
          "train", "validation", "test" and "predict"; splits that are not
          listed get ``None``.

    Returns
    -------
    Dict[str, Optional[List[Callable]]]
        A dictionary with exactly the keys "train", "validation", "test" and
        "predict" (in that order).

    Raises
    ------
    AssertionError
        If a dictionary contains a key that is not a valid split name.
    TypeError
        If ``transforms`` is neither None, a list nor a dict.
    """
    valid_keys = ["train", "validation", "test", "predict"]

    if transforms is None or isinstance(transforms, list):
        return {key: transforms for key in valid_keys}

    if isinstance(transforms, dict):
        if any(key not in valid_keys for key in transforms):
            # AssertionError is kept so callers see the same exception type as
            # before, but it is raised explicitly so that the validation is
            # not stripped when Python runs with -O.
            raise AssertionError(
                f"Invalid transform key. Must be one of: {valid_keys}"
            )
        parsed: Dict[str, Optional[List[Callable]]] = dict.fromkeys(valid_keys)
        parsed.update(transforms)
        return parsed

    # Previously any other type fell through and returned None, which only
    # failed later when the result was indexed by split name.
    raise TypeError(
        "transforms must be None, a list of callables or a dict keyed by "
        f"split name, got {type(transforms).__name__}"
    )
def parse_num_workers(num_workers: Optional[int] = None) -> int:
    """Resolve the number of data-loading workers.

    Parameters
    ----------
    num_workers : Optional[int]
        Number of workers to load data. If None, all cores reported by
        ``os.cpu_count()`` are used. An explicit value, including ``0``, is
        returned unchanged.

    Returns
    -------
    int
        Number of workers to load data.
    """
    if num_workers is not None:
        # An explicit 0 must survive: the previous ``n or 1`` turned it into 1.
        return num_workers
    # os.cpu_count() may return None when the count cannot be determined.
    return os.cpu_count() or 1
class UserActivityFolderDataModule(L.LightningDataModule):
    def __init__(
        self,
        # Dataset Params
        data_path: PathLike,
        features: List[str] = (
            "accel-x",
            "accel-y",
            "accel-z",
            "gyro-x",
            "gyro-y",
            "gyro-z",
        ),  # type: ignore
        label: str = "standard activity code",
        pad: bool = False,
        transforms: Optional[Union[List[Callable], Dict[str, List[Callable]]]] = None,
        cast_to: str = "float32",
        # Loader params
        batch_size: int = 1,
        num_workers: Optional[int] = None,
    ):
        """Define the dataloaders for train, validation and test splits for
        HAR datasets stored as folders of CSV files.

        It is a wrapper around the ``SeriesFolderCSVDataset`` dataset class.
        One dataset is created per split, reading from the sub-folder of
        ``data_path`` named after the split:

            data_path
            ├── train
            │   ├── user-1.csv
            │   └── user-2.csv
            ├── validation
            │   └── ...
            └── test
                └── ...

        Each CSV file is a single sample that can be composed of multiple
        time steps (rows). Each column is a feature of the sample, e.g.:

            +---------+---------+--------+
            | accel-x | accel-y | class  |
            +---------+---------+--------+
            | 0.502123| 0.02123 | 1      |
            | 0.682012| 0.02123 | 1      |
            | 0.498217| 0.00001 | 1      |
            +---------+---------+--------+

        The ``features`` parameter selects the columns used as features
        (e.g. ``features=["accel-x", "accel-y"]``). The label column is
        specified by the ``label`` parameter. Note that there is one label per
        time step and not a single label per sample.

        The dataset will return a 2-element tuple with the data and the label,
        if the ``label`` parameter is specified, otherwise return only the data.

        Parameters
        ----------
        data_path : PathLike
            Directory that contains the per-split sub-folders with CSV files.
        features : List[str]
            A list with column names that will be used as features. If None,
            all columns except the label will be used as features. A tuple is
            converted to a list.
        label : str, optional
            Specify the name of the column with the label of the data.
        pad : bool, optional
            If True, the data will be padded to the length of the longest
            sample. Note that padding will be applied after the transforms,
            and also to the labels if specified.
        transforms : Union[List[Callable], Dict[str, List[Callable]]], optional
            This could be:
            - None: No transforms will be applied
            - List[Callable]: A list of transforms that will be applied to the
              data. The same transforms will be applied to all splits.
            - Dict[str, List[Callable]]: A dictionary with the split name as
              key and a list of transforms as value. The split name must be
              one of: "train", "validation", "test" or "predict".
        cast_to : str, optional
            Cast the numpy data to the specified type.
        batch_size : int, optional
            The size of the batch.
        num_workers : int, optional
            Number of workers to load data. If None, then use all cores.
        """
        super().__init__()

        # ---- Dataset Parameters ----
        self.data_path = Path(data_path)
        # The default is a tuple (to avoid a mutable default argument) while
        # the annotation promises a list; normalise it the same way
        # MultiModalHARSeriesDataModule does for its feature prefixes.
        self.features = list(features) if isinstance(features, tuple) else features
        self.label = label
        self.pad = pad
        self.transforms = parse_transforms(transforms)

        # ---- Loader Parameters ----
        self.batch_size = batch_size
        self.num_workers = parse_num_workers(num_workers)
        self.cast_to = cast_to

        # ---- Class specific ----
        # Filled by ``setup``: split name -> dataset.
        self.datasets = {}

    def _load_dataset(self, split_name: str) -> SeriesFolderCSVDataset:
        """Create a ``SeriesFolderCSVDataset`` dataset with the given split.

        Parameters
        ----------
        split_name : str
            Name of the split ("train", "validation", "test" or "predict").
            The data is read from the ``data_path / split_name`` folder;
            "predict" is an alias for "test".

        Returns
        -------
        SeriesFolderCSVDataset
            The dataset with the given split.
        """
        assert split_name in [
            "train",
            "validation",
            "test",
            "predict",
        ], f"Invalid split_name: {split_name}"

        # NOTE(review): because the alias is resolved before the transforms
        # lookup, "predict" uses the "test" transforms, not the ones given
        # under the "predict" key. Confirm this is intended.
        if split_name == "predict":
            split_name = "test"

        return SeriesFolderCSVDataset(
            self.data_path / split_name,
            features=self.features,
            label=self.label,
            pad=self.pad,
            transforms=self.transforms[split_name],
            cast_to=self.cast_to,
        )

    def setup(self, stage: str):
        """Assign the datasets to the corresponding split. ``self.datasets``
        will be a dictionary with the split name as key and the dataset as
        value.

        Parameters
        ----------
        stage : str
            The stage of the setup. This could be:
            - "fit": Load the train and validation datasets
            - "test": Load the test dataset
            - "predict": Load the test dataset under the "predict" key

        Raises
        ------
        ValueError
            If the stage is not one of: "fit", "test" or "predict"
        """
        if stage == "fit":
            self.datasets["train"] = self._load_dataset("train")
            self.datasets["validation"] = self._load_dataset("validation")
        elif stage == "test":
            self.datasets["test"] = self._load_dataset("test")
        elif stage == "predict":
            self.datasets["predict"] = self._load_dataset("test")
        else:
            raise ValueError(f"Invalid setup stage: {stage}")

    def _get_loader(self, split_name: str, shuffle: bool) -> DataLoader:
        """Get a dataloader for the given split.

        Parameters
        ----------
        split_name : str
            The name of the split. This must be one of: "train", "validation",
            "test" or "predict", and ``setup`` must already have loaded it.
        shuffle : bool
            Shuffle the data or not.

        Returns
        -------
        DataLoader
            A dataloader for the given split.
        """
        return DataLoader(
            self.datasets[split_name],
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
            pin_memory=True,
        )

    def train_dataloader(self) -> DataLoader:
        return self._get_loader("train", shuffle=True)

    def val_dataloader(self) -> DataLoader:
        return self._get_loader("validation", shuffle=False)

    def test_dataloader(self) -> DataLoader:
        return self._get_loader("test", shuffle=False)

    def predict_dataloader(self) -> DataLoader:
        return self._get_loader("predict", shuffle=False)

    def __str__(self):
        return f"UserActivityFolderDataModule(data_path={self.data_path}, batch_size={self.batch_size})"

    def __repr__(self) -> str:
        return str(self)
class MultiModalHARSeriesDataModule(L.LightningDataModule):
    def __init__(
        self,
        # Dataset params
        data_path: Union[PathLike, List[PathLike]],
        feature_prefixes: List[str] = (
            "accel-x",
            "accel-y",
            "accel-z",
            "gyro-x",
            "gyro-y",
            "gyro-z",
        ),  # type: ignore
        label: str = "standard activity code",
        features_as_channels: bool = True,
        transforms: Optional[Union[List[Callable], Dict[str, List[Callable]]]] = None,
        cast_to: str = "float32",
        # Loader params
        batch_size: int = 1,
        num_workers: Optional[int] = None,
        data_percentage: float = 1.0,
        use_train_as_validation: bool = False,
        use_val_with_train: bool = False,
        map_labels: Optional[Dict[int, int]] = None,
        drop_last: bool = True,
        n_domains_per_sample: Optional[int] = None,
        samples_per_class: Optional[int] = None,
        seed: Optional[int] = None,
        predict_split: str = "test",
        shuffle_train: bool = True,
    ):
        """Define the dataloaders for train, validation and test splits for
        HAR datasets. This data module assumes that each split is a single CSV
        file with series of data. Each row is a single sample that can be
        composed of multiple modalities (series). Each column is a feature of
        some series with the prefix indicating the series. The suffix may
        indicate the time step. For instance, if we have two series, accel-x
        and accel-y, the data will look something like:

        +-----------+-----------+-----------+-----------+--------+
        | accel-x-0 | accel-x-1 | accel-y-0 | accel-y-1 |  class |
        +-----------+-----------+-----------+-----------+--------+
        | 0.502123  | 0.02123   | 0.502123  | 0.502123  |  0     |
        | 0.6820123 | 0.02123   | 0.502123  | 0.502123  |  1     |
        | 0.498217  | 0.00001   | 1.414141  | 3.141592  |  2     |
        +-----------+-----------+-----------+-----------+--------+

        The ``feature_prefixes`` parameter is used to select the columns that
        will be used as features. For instance, if we want to use only the
        accel-x series, we can set ``feature_prefixes=["accel-x"]``. If None is
        passed, all columns will be used as features, except the label column.
        The label column is specified by the ``label`` parameter.

        The dataset will return a 2-element tuple with the data and the label,
        if the ``label`` parameter is specified, otherwise return only the data.

        If ``features_as_channels`` is ``True``, the data will be returned as a
        vector of shape `(C, T)`, where C is the number of channels (features)
        and `T` is the number of time steps. Else, the data will be returned as
        a vector of shape T*C (a single vector with all the features).

        Parameters
        ----------
        data_path : Union[PathLike, List[PathLike]]
            The path to the folder with "train.csv", "validation.csv" and
            "test.csv" files inside it. A list (or tuple) of such folders is
            treated as multiple domains: the per-folder datasets are
            concatenated and each sample is tagged with the index of its
            folder.
        feature_prefixes : List[str], optional
            The prefix of the column names in the dataframe that will be used
            to become features. If None, all columns except the label will be
            used as features.
        label : str, optional
            The name of the column that will be used as label.
        features_as_channels : bool, optional
            If True, the data will be returned as a vector of shape (C, T),
            else the data will be returned as a vector of shape T*C.
        transforms : Union[List[Callable], Dict[str, List[Callable]]], optional
            This could be:
            - None: No transforms will be applied
            - List[Callable]: A list of transforms that will be applied to the
              data. The same transforms will be applied to all splits.
            - Dict[str, List[Callable]]: A dictionary with the split name as
              key and a list of transforms as value. The split name must be
              one of: "train", "validation", "test" or "predict".
        cast_to : str, optional
            Cast the numpy data to the specified type.
        batch_size : int, optional
            The size of the batch.
        num_workers : int, optional
            Number of workers to load data. If None, then use all cores.
        data_percentage : float, optional
            The fraction (between 0 and 1) of the train data that will be
            used. This is useful to create a small dataset.
        use_train_as_validation : bool, optional
            If True, the train dataset will be used as validation dataset.
        use_val_with_train : bool, optional
            If True, the validation and train sets will be concatenated in
            order to create a large train set. By default, this is False.
        map_labels : Dict[int, int], optional
            A dictionary to map the labels to a new label. The key is the
            original label and the value is the new label.
        drop_last : bool, optional
            Drop the last batch if it is not complete.
        n_domains_per_sample : int, optional
            This is only useful when using multiple domains (`data_path`). It
            will allow creating batches with the same number of samples from
            multiple domains. If None, all datasets are simply concatenated
            and sampled in a non-stratified way. By default, None.
        samples_per_class : int, optional
            If not None, use this number of samples per class for the train
            split. Cannot be combined with ``data_percentage < 1.0``.
        seed : int, optional
            Seed for sampling the dataset. If None, no seed is set.
        predict_split : str
            The name of the split to use for prediction. By default, this is
            "test".
        shuffle_train : bool
            If True, the train dataset will be shuffled.

        Notes
        -----
        - If `data_percentage` is set to a value less than 1.0, a random subset
          of the train split will be used, containing approximately the
          specified percentage of the data. This sampling is not stratified.
        - If `samples_per_class` is specified, the train split will contain an
          equal number of samples for each class. This option is mutually
          exclusive with `data_percentage`.
        - With an integer `seed`, sampling is deterministic. Because the
          shuffled order is fixed, progressively increasing
          `samples_per_class` keeps the previously selected elements. If
          `seed` is `None`, different subsets may be chosen each time.

        Raises
        ------
        ValueError
            If `samples_per_class` and `data_percentage` are both set, or if
            `data_percentage` is outside [0, 1].
        """
        super().__init__()
        # Always work with a list of Paths; a single path is one domain.
        if not isinstance(data_path, (list, tuple)):
            data_path = [data_path]
        self.data_path = [Path(data) for data in data_path]
        self.feature_prefixes = feature_prefixes
        if isinstance(self.feature_prefixes, tuple):
            self.feature_prefixes = list(self.feature_prefixes)
        self.label = label
        self.features_as_channels = features_as_channels
        self.transforms = parse_transforms(transforms)
        self.cast_to = cast_to
        self.batch_size = batch_size
        self.num_workers = parse_num_workers(num_workers)
        self.data_percentage = data_percentage
        # Filled by ``setup``: split name -> (dataset, domain_labels).
        self.datasets = {}
        self.use_train_as_validation = use_train_as_validation
        self.use_val_with_train = use_val_with_train
        self.map_labels = map_labels
        self.drop_last = drop_last
        self.n_domains_per_sample = n_domains_per_sample
        self.samples_per_class = samples_per_class
        if self.data_percentage < 1.0 and self.samples_per_class is not None:
            raise ValueError("Cannot use both data_percentage and samples_per_class")

        if self.data_percentage > 1.0 or self.data_percentage < 0.0:
            raise ValueError("data_percentage must be between 0 and 1.0, inclusive.")
        self.seed = seed
        # One private RNG shared by every sampling call of this module.
        self.rng = random.Random(seed)
        self.predict_split = predict_split
        self.shuffle_train = shuffle_train

    def _sample_dataset(self, dataset):
        """Sample the dataset based on the specified parameters.

        If `samples_per_class` is specified, a subset will be created
        containing the specified number of samples for each class. Otherwise,
        if `data_percentage` is below 1.0, a random subset containing
        approximately that fraction of the data is created. If neither
        applies, the dataset is returned unchanged.

        Parameters
        ----------
        dataset : Dataset
            A map-like dataset to sample from. It must support ``len()`` and
            integer indexing; for per-class sampling, element 1 of each item
            is used as the (hashable) class label.

        Returns
        -------
        Dataset
            The original dataset or a ``Subset`` of it.

        Raises
        ------
        ValueError
            If `samples_per_class` is not positive, or if a class has fewer
            samples than the specified number.
        """
        if self.samples_per_class is not None:
            if self.samples_per_class <= 0:
                raise ValueError("samples_per_class must be a positive integer.")
            class_indices = defaultdict(list)
            dset_indices = list(range(len(dataset)))
            self.rng.shuffle(dset_indices)  # Shuffle indices to ensure randomness

            # Group the (already shuffled) indices by class, so each per-class
            # list is itself in random order.
            for idx in dset_indices:
                label = dataset[idx][1]
                class_indices[label].append(idx)

            sampled_indices = []
            for label, indices in class_indices.items():
                if len(indices) < self.samples_per_class:
                    raise ValueError(
                        f"Class {label} has only {len(indices)} samples, "
                        f"but {self.samples_per_class} were requested."
                    )
                sampled_indices += indices[: self.samples_per_class]
            return Subset(dataset, sampled_indices)

        if self.data_percentage < 1.0:
            # Percentage-based sampling: keep the first k shuffled indices.
            indices = list(range(len(dataset)))
            self.rng.shuffle(indices)
            indices = indices[: int(self.data_percentage * len(dataset))]
            return Subset(dataset, indices)

        return dataset

    def _load_dataset(
        self, split_name: str
    ) -> Tuple[Union[MultiModalSeriesCSVDataset, ConcatDataset], List[int]]:
        """Create the dataset for the given split, across all domains.

        Parameters
        ----------
        split_name : str
            The name of the split. This must be one of: "train", "validation",
            "test" or "predict". "predict" is replaced by ``predict_split``
            before anything is loaded, so both the CSV file and the transforms
            are those of ``predict_split``.

        Returns
        -------
        Tuple[Dataset, List[int]]
            The dataset (a single dataset, or a ``ConcatDataset`` when there
            are several data paths) and, for each sample, the index of the
            data path it came from.
        """
        if split_name == "predict":
            split_name = self.predict_split

        assert split_name in [
            "train",
            "validation",
            "test",
            "predict",
        ], f"Invalid split_name: {split_name}"

        datasets = []
        domain_labels = []
        for i, data in enumerate(self.data_path):
            dataset = MultiModalSeriesCSVDataset(
                data / f"{split_name}.csv",
                feature_prefixes=self.feature_prefixes,
                label=self.label,
                features_as_channels=self.features_as_channels,
                cast_to=self.cast_to,
                transforms=self.transforms[split_name],
                map_labels=self.map_labels,
            )

            # Apply sampling only to the train split
            if split_name == "train":
                dataset = self._sample_dataset(dataset)

            domain_labels += [i] * len(dataset)
            datasets.append(dataset)

        if len(datasets) == 1:
            return datasets[0], domain_labels
        return ConcatDataset(datasets), domain_labels

    def setup(self, stage: str):
        """Assign the datasets to the corresponding split. ``self.datasets``
        will be a dictionary with the split name as key and a
        ``(dataset, domain_labels)`` tuple as value.

        Parameters
        ----------
        stage : str
            The stage of the setup. This could be:
            - "fit": Load the train and validation datasets
            - "test": Load the test dataset
            - "predict": Load the predict dataset

        Raises
        ------
        ValueError
            If the stage is not one of: "fit", "test" or "predict"
        """
        if stage == "fit":
            self.datasets["train"] = self._load_dataset("train")

            if self.use_val_with_train:
                train_dataset, train_domains = self.datasets["train"]
                val_dataset, val_domains = self._load_dataset("validation")
                self.datasets["train"] = (
                    ConcatDataset([train_dataset, val_dataset]),
                    train_domains + val_domains,
                )

            if self.use_train_as_validation:
                self.datasets["validation"] = self.datasets["train"]
            else:
                self.datasets["validation"] = self._load_dataset("validation")

        elif stage == "test":
            self.datasets["test"] = self._load_dataset("test")
        elif stage == "predict":
            self.datasets["predict"] = self._load_dataset("predict")
        else:
            raise ValueError(f"Invalid setup stage: {stage}")

    def _get_loader(self, split_name: str, shuffle: bool) -> DataLoader:
        """Get a dataloader for the given split.

        Parameters
        ----------
        split_name : str
            The name of the split. This must be one of: "train", "validation",
            "test" or "predict", and ``setup`` must already have loaded it.
        shuffle : bool
            Shuffle the data or not. Ignored when ``n_domains_per_sample`` is
            set, because batches then come from ``RandomDomainSampler``.

        Returns
        -------
        DataLoader
            A dataloader for the given split.
        """
        dataset, domain_labels = self.datasets[split_name]
        if self.n_domains_per_sample is not None:
            print(
                f"Using DataLoader with RandomDomainSampler with n_domains_per_sample={self.n_domains_per_sample}"
            )
            sampler = RandomDomainSampler(
                dataset,
                domain_labels,
                batch_size=self.batch_size,
                consistent_iterating=False,
                n_domains_per_sample=self.n_domains_per_sample,
            )
            return DataLoader(
                dataset,
                batch_sampler=sampler,
                num_workers=self.num_workers,
            )

        print(f"Using DataLoader with shuffle={shuffle}")
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
            pin_memory=True,
            drop_last=self.drop_last,
        )

    def train_dataloader(self) -> DataLoader:
        return self._get_loader("train", shuffle=self.shuffle_train)

    def val_dataloader(self) -> DataLoader:
        return self._get_loader("validation", shuffle=False)

    def test_dataloader(self) -> DataLoader:
        return self._get_loader("test", shuffle=False)

    def predict_dataloader(self) -> DataLoader:
        return self._get_loader("predict", shuffle=False)

    def __str__(self):
        return f"MultiModalHARSeriesDataModule(data_path={', '.join([str(d) for d in self.data_path])}, batch_size={self.batch_size})"

    def __repr__(self) -> str:
        return str(self)
class HARDataModuleCPC(LightningDataModule):
    def __init__(
        self,
        data_path: Union[PathLike, List[PathLike]],
        input_size: int = 6,
        window: int = 60,
        overlap: int = 30,
        batch_size: int = 64,
        use_train_as_val: bool = False,
        use_val_with_train: bool = True,
        columns: Optional[List[str]] = None,
        num_workers: int = 8,
        drop_last: bool = True,
        label: Optional[str] = "standard activity code",
        transpose_data: bool = True,
    ):
        """Data module for Human Activity Recognition (HAR) using CPC.

        Builds one ``HARDatasetCPC`` per phase ("train", "val" and "test") at
        construction time and exposes a dataloader for each of them.

        Parameters
        ----------
        data_path : Union[PathLike, List[PathLike]]
            The root directory where the dataset is stored. If a list is
            provided, the datasets will be concatenated, in their respective
            order, for each partition (train, val, test).
        input_size : int, optional
            The number of input features (default is 6).
        window : int, optional
            The size of the sliding window (default is 60).
        overlap : int, optional
            The overlap size for the sliding window (default is 30).
        batch_size : int, optional
            The batch size for the dataloaders (default is 64).
        use_train_as_val : bool
            Forwarded unchanged to every ``HARDatasetCPC``.
        use_val_with_train : bool
            Whether to use the training set and validation set together.
        columns : Optional[List[str]]
            Forwarded unchanged to every ``HARDatasetCPC``.
        num_workers : int
            Number of worker processes used by each dataloader.
        drop_last : bool
            Whether each dataloader drops its last incomplete batch.
        label : Optional[str]
            The column to be used as the label. If None, no labels will be
            used. The previous documentation also lists the special value
            'return_index_as_label' (use the data index as label); that
            handling lives in ``HARDatasetCPC``.
        transpose_data : bool
            If True, the data will be returned as a vector of shape (C, T),
            else the data will be returned as a vector of shape (T, C).
        """
        super().__init__()
        self.data_path = data_path
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.drop_last = drop_last
        self.label = label
        self.transpose_data = transpose_data

        def build_split(phase: str) -> HARDatasetCPC:
            # The three datasets differ only in their phase.
            return HARDatasetCPC(
                data_path,
                input_size,
                window,
                overlap,
                phase=phase,
                use_train_as_val=use_train_as_val,
                use_val_with_train=use_val_with_train,
                columns=columns,
                label=label,
                transpose_data=transpose_data,
            )

        self.train_dataset = build_split("train")
        self.val_dataset = build_split("val")
        self.test_dataset = build_split("test")

    def _make_loader(self, dataset, shuffle: bool) -> DataLoader:
        """Wrap ``dataset`` in a DataLoader using this module's settings."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            drop_last=self.drop_last,
            num_workers=self.num_workers,
        )

    def train_dataloader(self):
        return self._make_loader(self.train_dataset, True)

    def val_dataloader(self):
        return self._make_loader(self.val_dataset, False)

    def test_dataloader(self):
        return self._make_loader(self.test_dataset, False)

    def __repr__(self):
        return (
            f"HARDataModuleCPC(batch_size={self.batch_size}, datasets={self.data_path})"
        )
class HarDataModule(L.LightningDataModule):
    def __init__(
        self,
        processed_data_dir: PathLike,
        batch_size: int = 16,
        mc_sample_size: int = 5,
        epsilon: int = 3,
        adf: bool = True,
        window_size: int = 128,
        use_train_as_val: bool = False,
        num_workers: int = 8,
        use_val_with_train: bool = False,
    ):
        """Data module that serves pre-processed HAR arrays through
        ``TNCDataset``.

        Three NumPy files are loaded eagerly from ``processed_data_dir``:
        `train_data.npy`, `val_data.npy` and `test_data.npy`. They are
        expected to hold the concatenated accelerometer and gyroscope data
        with shape (n_samples, n_timesteps, n_channels), as produced by
        https://github.com/maxxu05/rebar/blob/main/data/process/har_processdata.py
        (the previous documentation reports shapes `(41, 15038, 6)`,
        `(9, 15038, 6)` and `(9, 15038, 6)` for the original files).

        For the dataloaders, each array is transposed to
        (n_samples, n_channels, n_timesteps) and passed to ``TNCDataset``.

        Parameters
        ----------
        processed_data_dir : PathLike
            Path to the directory where the processed .npy files are stored.
        batch_size : int, optional
            The batch size to use for the DataLoader. Defaults to 16.
        mc_sample_size : int, optional
            This value determines how many neighboring and non-neighboring
            windows are used per data sample. Defaults to 5.
        epsilon : int, optional
            This parameter controls the "spread" of neighboring windows.
        adf : bool, optional
            Flag indicating whether to use ADF (Augmented Dickey-Fuller)
            testing for finding neighbors. Defaults to True.
        window_size : int, optional
            The size of the windows to be used for each sample in the TNC
            dataset. Defaults to 128.
        use_train_as_val : bool, optional
            If True, the train array is also used as the validation array.
            Takes precedence over ``use_val_with_train``. Defaults to False.
        num_workers : int, optional
            Number of worker processes used by each dataloader.
        use_val_with_train : bool, optional
            If True (and ``use_train_as_val`` is False), the validation and
            train sets will be concatenated in order to create a large train
            set. Defaults to False.
        """
        super().__init__()
        self.processed_data_dir = Path(processed_data_dir)
        self.batch_size = batch_size
        self.mc_sample_size = mc_sample_size
        self.epsilon = epsilon
        self.adf = adf
        self.window_size = window_size
        self.num_workers = num_workers
        self.use_val_with_train = use_val_with_train

        train = np.load(self.processed_data_dir / "train_data.npy")
        val = np.load(self.processed_data_dir / "val_data.npy")
        test = np.load(self.processed_data_dir / "test_data.npy")

        # The two options are mutually exclusive; use_train_as_val wins.
        if use_train_as_val:
            val = train
        elif use_val_with_train:
            train = np.concatenate([train, val], axis=0)

        self.har_train = train
        self.har_val = val
        self.har_test = test

    def _tnc_loader(self, data, shuffle: bool) -> DataLoader:
        """Wrap ``data`` in a ``TNCDataset`` and return its DataLoader.

        ``data`` is transposed from (samples, timesteps, channels) to
        (samples, channels, timesteps) before being given to the dataset.
        """
        dataset = TNCDataset(
            np.transpose(data, (0, 2, 1)),
            self.mc_sample_size,
            self.window_size,
            self.epsilon,
            self.adf,
        )
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
        )

    def train_dataloader(self):
        """
        Returns the DataLoader for the training dataset.

        Returns
        -------
        DataLoader
            DataLoader for the training dataset (shuffled).
        """
        return self._tnc_loader(self.har_train, True)

    def val_dataloader(self):
        """
        Returns the DataLoader for the validation dataset.

        Returns
        -------
        DataLoader
            DataLoader for the validation dataset (not shuffled).
        """
        return self._tnc_loader(self.har_val, False)

    def test_dataloader(self):
        """
        Returns the DataLoader for the test dataset.

        Returns
        -------
        DataLoader
            DataLoader for the test dataset (not shuffled).
        """
        return self._tnc_loader(self.har_test, False)
+ """ + if isinstance(shape, int): + return (shape,) + elif isinstance(shape, tuple): + return shape + elif isinstance(shape, np.ndarray): + return tuple(shape.tolist()) + else: + raise TypeError("shape must be an int, a tuple of ints, or a numpy array") + + +def sliding_window(a, ws, ss=None, flatten=True): + """ + Return a sliding window over a in any number of dimensions + + Parameters: + a - an n-dimensional numpy array + ws - an int (a is 1D) or tuple (a is 2D or greater) representing the size + of each dimension of the window + ss - an int (a is 1D) or tuple (a is 2D or greater) representing the + amount to slide the window in each dimension. If not specified, it + defaults to ws. + flatten - if True, all slices are flattened, otherwise, there is an + extra dimension for each dimension of the input. + + Returns + an array containing each n-dimensional window from a + """ + + if None is ss: + # ss was not provided. the windows will not overlap in any direction. + ss = ws + if isinstance(ws, int) and ws < 1: + raise ValueError("ws must be at least 1") + if isinstance(ss, int) and ss < 1: + raise ValueError("ss must be at least 1") + + # Will transform the ws and ss into a tuple if they are integers + ws = norm_shape(ws) + ss = norm_shape(ss) + + # convert ws, ss, and a.shape to numpy arrays so that we can do math in + # every dimension at once. + ws = np.array(ws) + ss = np.array(ss) + shape = np.array(a.shape) + + # ensure that ws, ss, and a.shape all have the same number of dimensions + ls = [len(shape), len(ws), len(ss)] + if 1 != len(set(ls)): + raise ValueError( + "a.shape, ws and ss must all have the same length. They were %s" % str(ls) + ) + + # ensure that ws is smaller than a in every dimension + if np.any(ws > shape): + raise ValueError("ws cannot be larger than a in any dimension.\ + a.shape was %s and ws was %s" % (str(a.shape), str(ws))) + + # how many slices will there be in each dimension? 
+ newshape = norm_shape(((shape - ws) // ss) + 1) + # the shape of the strided array will be the number of slices in each dimension + # plus the shape of the window (tuple addition) + newshape += norm_shape(ws) + # the strides tuple will be the array's strides multiplied by step size, plus + # the array's strides (tuple addition) + newstrides = norm_shape(np.array(a.strides) * ss) + a.strides + strided = ast(a, shape=newshape, strides=newstrides) + if not flatten: + return strided + + # Collapse strided so that it has one more dimension than the window. I.e., + # the new array is a flat list of slices. + meat = len(ws) if ws.shape else 0 + firstdim = (np.prod(newshape[:-meat]),) if ws.shape else () + dim = firstdim + (newshape[-meat:]) + # remove any dimensions with size 1 + # dim = filter(lambda i : i != 1,dim) + return strided.reshape(dim) + + +def opp_sliding_window(data_x, data_y, ws, ss): + + data_x = sliding_window(data_x, (ws, data_x.shape[1]), (ss, 1)) + + data_y = np.reshape(data_y, (len(data_y),)) + data_y = np.asarray([[i[-1]] for i in sliding_window(data_y, ws, ss)]) + return data_x.astype(np.float32), data_y.reshape(len(data_y)).astype(np.uint8) + + +class HARDatasetCPC(Dataset): + def __init__( + self, + data_path: Union[PathLike, List[PathLike]], + input_size: int, + window: int, + overlap: int, + phase: str = "train", + use_train_as_val: bool = False, + use_val_with_train: bool = True, + columns: Optional[List[str]] = None, + label: Optional[str] = "standard activity code", + transpose_data: bool = True, + ): + """ + Initializes the dataset by loading the dataset from CSV files, + segmenting the data into windows, and preparing it for training + or evaluation. + + Parameters + ---------- + data_path : Union[PathLike, List[PathLike]] + The path to the directory containing the dataset files. If a list of + paths is provided, the datasets will be concatenated, in the order + provided, into a single dataset. 
+ input_size : int + The expected size of input features. + window : int + The size of the sliding window used to segment the data. + overlap : int + The overlap between consecutive windows. + phase : str + The phase of the dataset ('train', 'val', or 'test'). + use_train_as_val : bool + Whether to use the training set as the validation set. + use_val_with_train : bool + Whether to use the validation set as the training set. + columns : Optional[List[str]] + The columns to be used as input features. If None, the default + columns ['accel-x', 'accel-y', 'accel-z', 'gyro-x', 'gyro-y', + 'gyro-z'] will be used. + label : Optional[str] + The column to be used as the label. If None, no labels will be + used. If 'return_index_as_label', the index of the data will be + used as the label. + transpose_data : bool + If True, the data will be returned as a vector of shape (C, T), + else the data will be returned as a vector of shape (T, C). + """ + # Create a list of paths if only one path is provided + self.paths = data_path if isinstance(data_path, list) else [data_path] + self.use_train_as_val = use_train_as_val + self.use_val_with_train = use_val_with_train + self.label = label + self.transpose_data = transpose_data + self.input_size = input_size + self.columns = ( + columns + if columns is not None + else [ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ] + ) + + self.data_raw = self.load_dataset() + assert input_size == self.data_raw[phase]["data"].shape[1] + + # Obtaining the segmented data + self.data, self.labels = opp_sliding_window( + self.data_raw[phase]["data"], + self.data_raw[phase]["labels"], + window, + overlap, + ) + + if self.label and self.label == "return_index_as_label": + datum_index = np.arange(len(self.data)) + np.random.shuffle(datum_index) + self.labels = datum_index + + # Transpose the data if required + if self.transpose_data: + if self.data.ndim == 2: + self.data = self.data.T + elif self.data.ndim == 3: + self.data 
= self.data.transpose(0, 2, 1) + + # Load .csv file + + def load_dataset(self): + """ + Loads the dataset from CSV files, concatenates them into numpy arrays, + and converts them to the appropriate data types. + + Returns + ------- + dict + A dictionary containing 'data' and 'labels' for 'train', 'val', and 'test' + phases, where 'data' is a numpy array of concatenated data and 'labels' + is a numpy array of concatenated labels. + """ + datasets = {} + + for phase in ["train", "val", "test"]: + if phase == "val": + if self.use_train_as_val: + datasets[phase] = datasets["train"] + continue + + data_x = [] + data_y = [] + + for path in self.paths: + path = Path(path) + phase_path = path / phase + for f in phase_path.glob("*.csv"): + data = pd.read_csv(f) + x = data[self.columns].values + if self.label and self.label != "return_index_as_label": + y = data[self.label].values + else: + y = np.arange(len(x)) + data_x.append(x) + data_y.append(y) + + datasets[phase] = { + "data": np.concatenate(data_x), + "labels": np.concatenate(data_y), + } + datasets[phase]["data"] = datasets[phase]["data"].astype(np.float32) + datasets[phase]["labels"] = datasets[phase]["labels"].astype(np.uint8) + + # If use_val_with_train is True, concatene the training and validation datasets. 
class TNCDataset(Dataset):
    def __init__(
        self,
        x: np.array,
        mc_sample_size: int = 5,
        window_size: int = 128,
        epsilon=3,
        adf: bool = True,
    ):
        """
        Dataset for the TNC (Temporal Neighborhood Coding) task.

        For every sample a random time step ``t`` is drawn. The dataset
        returns the window centred on ``t`` together with windows close to
        ``t`` (neighbors) and windows far from ``t`` (non-neighbors).

        ``x`` must have the shape (n_samples, n_channels, n_timesteps), and
        n_timesteps must be larger than ``4 * window_size`` because ``t`` is
        drawn from ``[2 * window_size, n_timesteps - 2 * window_size)``.

        ``__getitem__`` swaps the last two axes of every window before
        returning it, so it yields:
        - ``central_window``: (window_size, n_channels)
        - ``close_neighbors``: (mc_sample_size, window_size, n_channels)
        - ``non_neighbors``: (mc_sample_size, window_size, n_channels)

        Windows are cut as ``[c - window_size // 2, c + window_size // 2)``,
        so they have exactly ``window_size`` steps when it is even.

        Parameters
        ----------
        x : np.ndarray
            The time series data of shape (n_samples, n_channels, n_timesteps).
        mc_sample_size : int
            How many neighboring and non-neighboring windows are used per
            data sample.
        window_size : int
            The size of the window to be used for each sample.
        epsilon : int, optional
            Controls the "spread" of neighboring windows. When ``adf`` is
            True this is only the initial value: it is re-estimated for every
            sample in ``_find_neighours``.
        adf : bool, optional
            Whether to use ADF (Augmented Dickey-Fuller) testing for finding
            neighbors. Defaults to True.

        Neighbor Selection
        ------------------
        1. **Close neighbors, ADF**: for the half-widths ``window_size``,
           ``2 * window_size`` and ``3 * window_size`` the ADF p-value of the
           segment around ``t`` is averaged over the channels (a NaN p-value
           counts as 0.01, a failed test as 0.6). ``epsilon`` is the
           1-based position of the first average that is >= 0.01, or the
           number of half-widths if none is. ``delta`` is then
           ``5 * epsilon * window_size``, and neighbor centres are drawn as
           ``t`` plus normal noise scaled by ``epsilon * window_size``.
        2. **Close neighbors, cosine similarity**: candidate windows are
           taken every ``window_size`` steps and the ``mc_sample_size``
           candidates most similar to the flattened central window are kept.
        3. In both cases the centres are clamped so the windows stay inside
           the series.
        4. **Distant non-neighbors**: centres are drawn uniformly from the
           region at least ``delta`` before ``t`` when ``t`` lies in the
           second half of the series, otherwise from the region at least
           ``delta`` after ``t`` (bounded by the ends of the series).

        Example Usage
        -------------
            from minerva.data.datasets.har_xu_23 import TNCDataset
            import numpy as np

            data = np.random.randn(100, 6, 1000)  # (samples, channels, timesteps)
            tnc_dataset = TNCDataset(
                x=data, mc_sample_size=5, window_size=128, epsilon=3, adf=True
            )
            central_window, close_neighbors, non_neighbors = tnc_dataset[0]
            central_window.shape   # (window_size, n_channels)
            close_neighbors.shape  # (mc_sample_size, window_size, n_channels)
            non_neighbors.shape    # (mc_sample_size, window_size, n_channels)
        """
        super(TNCDataset, self).__init__()
        self.time_series = x
        self.T = x.shape[-1]
        self.window_size = window_size
        self.mc_sample_size = mc_sample_size
        self.adf = adf
        # Always initialised so ``_find_non_neighours`` can be used on its
        # own; with ``adf=True`` both values are overwritten per sample.
        self.epsilon = epsilon
        self.delta = 5 * window_size * epsilon

    def __len__(self):
        """
        Returns the number of samples in the dataset.

        Returns
        -------
        int
            The number of samples in the dataset.
        """
        return self.time_series.shape[0]

    def __getitem__(self, ind):
        """
        Returns a sample from the dataset.

        Parameters
        ----------
        ind : int
            The index of the sample to retrieve. It is wrapped around the
            number of samples.

        Returns
        -------
        tuple
            The central window, the close neighbors and the distant
            non-neighbors as float tensors, each with its last two axes
            swapped (time before channels).
        """
        ind = ind % len(self.time_series)
        series = self.time_series[ind]
        half = self.window_size // 2
        t = np.random.randint(2 * self.window_size, self.T - 2 * self.window_size)

        # Neighbors must be searched first: with ``adf=True`` that search
        # updates ``self.delta``, which the non-neighbor search reads.
        x_t = self._to_time_major(series[:, t - half : t + half])
        X_close = self._to_time_major(self._find_neighours(series, t))
        X_distant = self._to_time_major(self._find_non_neighours(series, t))

        return x_t, X_close, X_distant

    @staticmethod
    def _to_time_major(windows):
        """Convert windows to a float tensor with the last two axes swapped."""
        return torch.from_numpy(windows).to(torch.float).transpose(-1, -2)

    def _estimate_epsilon(self, x, t):
        """Estimate the neighborhood size around ``t`` from ADF p-values."""
        n_channels = x.shape[-2]
        corr = []
        for w_t in range(self.window_size, 4 * self.window_size, self.window_size):
            try:
                p_val = 0
                for f in range(n_channels):
                    segment = x[f, max(0, t - w_t) : min(x.shape[-1], t + w_t)]
                    p = adfuller(np.array(segment.reshape(-1)))[1]
                    p_val += 0.01 if np.isnan(p) else p
                corr.append(p_val / n_channels)
            except Exception:
                # Best effort: a failed test counts as a large p-value.
                corr.append(0.6)
        above = np.where(np.array(corr) >= 0.01)[0]
        return len(corr) if len(above) == 0 else above[0] + 1

    def _most_similar_centers(self, x, t, T):
        """Return the centres of the windows most similar to the one at ``t``."""
        half = self.window_size // 2
        target_window = x[:, t - half : t + half].flatten()
        similarities = []
        for w_t in range(self.window_size, T - self.window_size, self.window_size):
            window = x[:, w_t - half : w_t + half].flatten()
            cos_sim = cosine_similarity([target_window], [window])[0][0]
            similarities.append((w_t, cos_sim))
        similarities = sorted(similarities, key=lambda pair: pair[1], reverse=True)
        return [w_t for w_t, _ in similarities[: self.mc_sample_size]]

    def _stack_windows(self, x, centers, T):
        """Clamp the centres to the valid range and stack their windows."""
        half = self.window_size // 2
        clamped = [max(half + 1, min(c, T - half)) for c in centers]
        return np.stack([x[:, c - half : c + half] for c in clamped])

    def _find_neighours(self, x, t):
        """
        Finds close neighbors for a given time step.

        With ``adf=True`` this also updates ``self.epsilon`` and
        ``self.delta`` for the current sample.

        Parameters
        ----------
        x : np.ndarray
            The time series data for a single sample.
        t : int
            The current time step.

        Returns
        -------
        np.ndarray
            An array of close neighbors.
        """
        T = self.time_series.shape[-1]
        if self.adf:
            self.epsilon = self._estimate_epsilon(x, t)
            self.delta = 5 * self.epsilon * self.window_size
            centers = [
                int(t + np.random.randn() * self.epsilon * self.window_size)
                for _ in range(self.mc_sample_size)
            ]
        else:
            centers = self._most_similar_centers(x, t, T)
        return self._stack_windows(x, centers, T)

    def _find_non_neighours(self, x, t):
        """
        Finds distant non-neighbors for a given time step.

        Parameters
        ----------
        x : np.ndarray
            The time series data for a single sample.
        t : int
            The current time step.

        Returns
        -------
        np.ndarray
            An array of distant non-neighbors. If no centre is drawn
            (``mc_sample_size`` of 0), a single window taken from the far end
            of the series is returned instead.
        """
        T = self.time_series.shape[-1]
        half = self.window_size // 2
        if t > T / 2:
            t_n = np.random.randint(
                half,
                max((t - self.delta + 1), half + 1),
                self.mc_sample_size,
            )
        else:
            t_n = np.random.randint(
                min((t + self.delta), (T - self.window_size - 1)),
                (T - half),
                self.mc_sample_size,
            )

        if len(t_n) > 0:
            return np.stack([x[:, c - half : c + half] for c in t_n])

        # Fallback: one window near the end of the series opposite to ``t``.
        # ``x`` is a NumPy array, so the leading axis is added with newaxis.
        rand_t = np.random.randint(0, max(1, self.window_size // 5))
        if t > T / 2:
            return x[np.newaxis, :, rand_t : rand_t + self.window_size]
        return x[np.newaxis, :, T - rand_t - self.window_size : T - rand_t]
Defaults to False. + + Attributes + ---------- + data : numpy.ndarray + Array of features with shape (num_samples, num_timesteps, num_features). + - num_samples: Total number of samples in the dataset. + - num_timesteps: Length of each subsequence (e.g., 128). + - num_features: Number of features per timestep (e.g., 6 for accelerometer and gyroscope data). + labels : numpy.ndarray + Array of labels with shape (num_samples,). + - num_samples: Total number of samples in the dataset. + + Methods + ------- + __len__() -> int + Returns the number of samples in the dataset. + __getitem__(idx: int) -> Tuple[torch.Tensor, int] + Retrieves a sample from the dataset. + - Features shape: [num_timesteps, num_features] if `flatten` is False, otherwise [num_timesteps * num_features]. + - Label shape: Scalar. + + Examples + -------- + from minerva.data.datasets.har_xu_23 import HarDataset + >>> dataset = HarDataset(data_path="/path/to/data", annotate="train") + >>> len(dataset) + 3178 + >>> sample = dataset[0] + >>> features, label = sample + >>> features.shape + torch.Size([128, 6]) + >>> label + tensor(4) + """ + super().__init__() + self.data_path = data_path + self.annotate = annotate + self.feature_column_prefixes = feature_column_prefixes + self.target_column = target_column + self.flatten = flatten + + self.data = np.load( + os.path.join(self.data_path, f"{self.annotate}_data_subseq.npy") + ) + self.labels = np.load( + os.path.join(self.data_path, f"{self.annotate}_labels_subseq.npy") + ) + + # self.labels = np.load(self.data_path / f"{self.annotate}_labels_subseq.npy") + assert len(self.data) == len( + self.labels + ), "Data and labels must have the same length" + + def __len__(self): + return len(self.data) + + def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]: + """ + Get a sample from the dataset. + + Parameters + ---------- + idx : int + Index of the sample to retrieve. 
+ + Returns + ------- + Tuple[torch.Tensor, int] + Tuple containing the features and the target label. + """ + data = self.data[idx] + if self.flatten: + data = data.flatten() + + features = data + target = self.labels[idx] + + # Convert to torch.FloatTensor and torch.LongTensor + features = torch.FloatTensor(features) + target = torch.tensor(target, dtype=torch.long) + + return features, target diff --git a/minerva/data/datasets/series_dataset.py b/minerva/data/datasets/series_dataset.py new file mode 100644 index 0000000..e466b7f --- /dev/null +++ b/minerva/data/datasets/series_dataset.py @@ -0,0 +1,504 @@ +import contextlib +from pathlib import Path +from typing import Dict, List, Optional, Tuple, Union + +import numpy as np +import pandas as pd +from torch.utils.data import Dataset + +from minerva.transforms.transform import _Transform + + +class MultiModalSeriesCSVDataset(Dataset): + def __init__( + self, + data_path: Union[Path, str], + feature_prefixes: Optional[Union[str, List[str]]] = None, + label: Optional[str] = None, + features_as_channels: bool = True, + cast_to: str = "float32", + transforms: Optional[Union[_Transform, List[_Transform]]] = None, + map_labels: Optional[Dict[int, int]] = None, + ): + """This datasets assumes that the data is in a single CSV file with + series of data. Each row is a single sample that can be composed of + multiple modalities (series). Each column is a feature of some series + with the prefix indicating the series. The suffix may indicates the + time step. 
For instance, if we have two series, accel-x and accel-y, + the data will look something like: + + +-----------+-----------+-----------+-----------+--------+ + | accel-x-0 | accel-x-1 | accel-y-0 | accel-y-1 | class | + +-----------+-----------+-----------+-----------+--------+ + | 0.502123 | 0.02123 | 0.502123 | 0.502123 | 0 | + | 0.6820123 | 0.02123 | 0.502123 | 0.502123 | 1 | + | 0.498217 | 0.00001 | 1.414141 | 3.141592 | 2 | + +-----------+-----------+-----------+-----------+--------+ + + The ``feature_prefixes`` parameter is used to select the columns that + will be used as features. For instance, if we want to use only the + accel-x series, we can set ``feature_prefixes=["accel-x"]``. If we want + to use both accel-x and accel-y, we can set + ``feature_prefixes=["accel-x", "accel-y"]``. If None is passed, all + columns will be used as features, except the label column. + The label column is specified by the ``label`` parameter. + + The dataset will return a 2-element tuple with the data and the label, + if the ``label`` parameter is specified, otherwise return only the data. + + If ``features_as_channels`` is ``True``, the data will be returned as a + vector of shape `(C, T)`, where C is the number of channels (features) + and `T` is the number of time steps. Else, the data will be returned as + a vector of shape T*C (a single vector with all the features). + + Parameters + ---------- + data_path : Union[Path, str] + The location of the CSV file + feature_prefixes : Union[str, List[str]], optional + The prefix of the column names in the dataframe that will be used + to become features. If None, all columns except the label will be + used as features. + label : str, optional + The name of the column that will be used as label + features_as_channels : bool, optional + If True, the data will be returned as a vector of shape (C, T), + else the data will be returned as a vector of shape T*C. 
+ cast_to: str, optional + Cast the numpy data to the specified type + transforms: Optional[List[Callable]], optional + A list of transforms that will be applied to each sample + individually. Each transform must be a callable that receives a + numpy array and returns a numpy array. The transforms will be + applied in the order they are specified. + map_labels: Optional[Dict[int, int]], optional + A dictionary to map the labels to a different set of labels. The + keys are the original labels and the values are the new labels. + + Examples + -------- + # Using the data from the example above, and features_as_channels=False + >>> data_path = "data.csv" + >>> dataset = MultiModalSeriesCSVDataset( + data_path, + feature_prefixes=["accel-x", "accel-y"], + label="class" + ) + >>> data, label = dataset[0] + >>> data.shape + (4, ) + + # Using the data from the example above, and features_as_channels=True + >>> dataset = MultiModalSeriesCSVDataset( + data_path, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True + ) + >>> data, label = dataset[0] + >>> data.shape + (2, 2) + + # And the dataset length + >>> len(dataset) + 3 + + """ + self.data_path = Path(data_path) + + if feature_prefixes is not None: + if not isinstance(feature_prefixes, list): + feature_prefixes = [feature_prefixes] + if len(feature_prefixes) == 0: + raise ValueError( + "feature_prefixes must have at least one element or be None" + ) + self.feature_prefixes = feature_prefixes + self.label = label + self.cast_to = cast_to + self.features_as_channels = features_as_channels + if transforms is not None: + if not isinstance(transforms, list): + transforms = [transforms] + else: + transforms = [] + self.transforms = transforms + self.map_labels = map_labels + self.data, self.labels = self._load_data() + + def _load_data(self) -> Tuple[np.ndarray, Optional[np.ndarray]]: + """Load data from the CSV file + + Returns + ------- + Tuple[np.ndarray, Optional[np.ndarray]] + A 
2-element tuple with the data and the labels. The second element + is None if the label is not specified. + """ + df = pd.read_csv(self.data_path) + + # Select columns with the given prefixes: + # If None, select all columns except the label (if specified) and + # update the feature_prefixes attribute with the selected columns + if self.feature_prefixes is None: + selected_columns = [col for col in df.columns if col != self.label] + self.feature_prefixes = selected_columns + # Else select only the columns with the given prefixes + else: + selected_columns = [ + col + for col in df.columns + if any(prefix in col for prefix in self.feature_prefixes) + ] + # Select the columns + selected_columns = list(selected_columns) + data = df[selected_columns].to_numpy() + + # If features_as_channels is True, reshape the data to (N, C, T) where + # N=number of samples, C=number of channels, T=time steps + if self.features_as_channels: + data = data.reshape( + -1, + len(self.feature_prefixes), + data.shape[1] // len(self.feature_prefixes), + ) + + # Cast the data to the specified type + if self.cast_to: + data = data.astype(self.cast_to) + + # If label is specified, return the data and the labels + if self.label: + if self.label == "return_index_as_label": + return data, np.arange(len(data)) + if self.label not in df.columns: + raise ValueError( + f"Label column '{self.label}' not found in the CSV file" + ) + + if self.map_labels: + labels = df[self.label].map(self.map_labels).to_numpy() + else: + labels = df[self.label].to_numpy() + + if any(np.isnan(labels)): + raise ValueError("There are NaN values in the labels") + + return data, labels + # If label is not specified, return only the data + else: + return data, None + + def __len__(self) -> int: + return len(self.data) + + def __getitem__( + self, index: int + ) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]: + # Get data and apply transforms + data = self.data[index] + for transform in self.transforms: + data = 
class SeriesFolderCSVDataset(Dataset):
    def __init__(
        self,
        data_path: Union[Path, str],
        features: Optional[Union[str, List[str]]] = None,
        label: Optional[str] = None,
        pad: bool = False,
        cast_to: str = "float32",
        transforms: Optional[Union[_Transform, List[_Transform]]] = None,
        lazy: bool = False,
    ):
        """This dataset assumes that the data is in a folder with multiple CSV
        files. Each CSV file is a single sample that can be composed of
        multiple time steps (rows). Each column is a feature of the sample.

        For instance, if we have two samples, sample-1.csv and sample-2.csv,
        the directory structure will look something like:

        data_path
        ├── sample-1.csv
        └── sample-2.csv

        And the data will look something like:
        - sample-1.csv:
            +---------+---------+--------+
            | accel-x | accel-y | class  |
            +---------+---------+--------+
            | 0.502123| 0.02123 | 1      |
            | 0.682012| 0.02123 | 1      |
            | 0.498217| 0.00001 | 1      |
            +---------+---------+--------+
        - sample-2.csv:
            +---------+---------+--------+
            | accel-x | accel-y | class  |
            +---------+---------+--------+
            | 0.502123| 0.02123 | 0      |
            | 0.682012| 0.02123 | 0      |
            | 0.498217| 0.00001 | 0      |
            | 3.141592| 1.414141| 0      |
            +---------+---------+--------+

        The ``features`` parameter selects the columns used as features, e.g.
        ``features=["accel-x"]`` or ``features=["accel-x", "accel-y"]``.

        The label column is specified by the ``label`` parameter. There is
        one label per time step, not a single label per sample.

        The dataset returns a 2-element tuple with the data and the label if
        the ``label`` parameter is specified, otherwise only the data. The
        data has shape (features, time steps) and the label has shape
        (time steps, 1).

        Notes
        -----
        - Samples may have different number of time steps. Use ``pad`` to pad
            the data to the length of the longest sample.
        - Files are read in sorted file-name order.

        Examples
        --------
        # Using the data from the example above
        >>> data_dir = "train_folder"
        >>> dataset = SeriesFolderCSVDataset(
                data_dir,
                features=["accel-x", "accel-y"],
                label="class"
            )
        >>> data, label = dataset[0]
        >>> data.shape
        (2, 3)
        >>> label.shape
        (3, 1)
        >>> data, label = dataset[1]
        >>> data.shape
        (2, 4)
        >>> label.shape
        (4, 1)

        Parameters
        ----------
        data_path : str
            The location of the directory with CSV files
        features: List[str]
            A list (or tuple) with column names that will be used as
            features, or a single column name. If None, all columns except
            the label will be used as features.
        label: str, optional
            Specify the name of the column with the label of the data
        pad: bool, optional
            If True, the data (and the labels, if specified) will be padded
            to the length of the longest sample by repeating the sample
            cyclically. The padding is applied before the transforms; the
            length of the longest sample is measured on the transformed,
            unpadded samples.
        cast_to: str, optional
            Cast the numpy data to the specified type
        transforms: Optional[List[Callable]], optional
            A list of transforms that will be applied to each sample
            individually. Each transform must be a callable that receives a
            numpy array and returns a numpy array. The transforms will be
            applied in the order they are specified.
        lazy: bool, optional
            If True, the data will be loaded lazily (i.e. the CSV files will be
            read only when needed)

        Raises
        ------
        ValueError
            If ``data_path`` does not exist or is not a directory, if
            ``features`` is empty, or if the directory has no CSV files.
        """
        self.data_path = Path(data_path)
        if not self.data_path.exists():
            raise ValueError(f"Data path {self.data_path} does not exist")
        if not self.data_path.is_dir():
            raise ValueError(f"Data path {self.data_path} is not a directory")
        if features is not None:
            # A tuple of names is a collection of features, not one feature.
            if isinstance(features, tuple):
                features = list(features)
            elif not isinstance(features, list):
                features = [features]
            if len(features) == 0:
                raise ValueError("features must have at least one element or be None")

        self.features = features
        self.label = label
        self.pad = pad
        self.cast_to = cast_to
        if transforms is None:
            transforms = []
        elif not isinstance(transforms, list):
            transforms = [transforms]
        self.transforms = transforms

        self._files = self._scan_data()
        if len(self._files) == 0:
            raise ValueError(f"No CSV files found in {self.data_path}")
        # Data contains all the data if lazy is False else None
        self._cache = self._read_all_csv() if not lazy else None
        self._longest_sample_size = self._get_longest_sample_size()

    @contextlib.contextmanager
    def _disable_fix_length(self):
        """Context manager that temporarily disables padding."""
        old_fix_length = self.pad
        self.pad = False
        try:
            yield
        finally:
            # Restore the flag even if the managed block raises.
            self.pad = old_fix_length

    def _scan_data(self) -> List[Path]:
        """List the CSV files in the data directory

        Returns
        -------
        List[Path]
            List of CSV files, sorted
        """
        return list(sorted(self.data_path.glob("*.csv")))

    def _get_longest_sample_size(self) -> int:
        """Return the size of the longest sample in the dataset

        Returns
        -------
        int
            The size of the longest sample in the dataset, or 0 when padding
            is disabled.
        """
        if not self.pad:
            return 0

        # Samples are fetched through __getitem__ with padding switched off,
        # so the transforms are applied before the length is measured.
        lengths = []
        with self._disable_fix_length():
            for i in range(len(self)):
                sample = self[i]
                # __getitem__ returns (data, label) only when a label is set.
                data = sample[0] if isinstance(sample, tuple) else sample
                lengths.append(data.shape[-1])
        return max(lengths)

    def _read_csv(self, path: Path) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """Read a single CSV file (a single sample)

        Parameters
        ----------
        path : Path
            The path to the CSV file

        Returns
        -------
        Tuple[np.ndarray, Optional[np.ndarray]]
            A 2-element tuple with the data, shaped (features, time steps),
            and the label, shaped (time steps, 1). If the label is not
            specified, the second element is None.
        """
        original_data = pd.read_csv(path)

        # Collect the features
        if self.features is None:
            selected_columns = [
                col for col in original_data.columns if col != self.label
            ]
        else:
            selected_columns = list(self.features)

        # Rows are time steps in the file; put the features first.
        data = original_data[selected_columns].values.swapaxes(0, 1)

        # Cast the data to the specified type
        if self.cast_to:
            data = data.astype(self.cast_to)

        if self.label is None:
            return data, None
        return data, original_data[[self.label]].values

    def _read_all_csv(
        self,
    ) -> List[Tuple[np.ndarray, Optional[np.ndarray]]]:
        """Read all the CSV files in the data directory

        Returns
        -------
        List[Tuple[np.ndarray, Optional[np.ndarray]]]
            One (data, label) tuple per file. If the label is not specified,
            the second element of every tuple is None.
        """
        return [self._read_csv(f) for f in self._files]

    def __len__(self) -> int:
        return len(self._files)

    def _pad_data(self, data: np.ndarray) -> np.ndarray:
        """Pad the data to the length of the longest sample. In summary, this
        function makes the data cyclic.

        Parameters
        ----------
        data : np.ndarray
            The data to pad, with the time steps on the last axis

        Returns
        -------
        np.ndarray
            The padded data
        """
        time_len = data.shape[-1]

        if time_len == self._longest_sample_size:
            return data

        # Repeat the data along the time axis to match the longest sample size
        repetitions = self._longest_sample_size // time_len + 1
        data = np.tile(data, (1, repetitions))[:, : self._longest_sample_size]
        return data

    def __getitem__(self, idx: int) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
        """Get a single sample from the dataset

        Parameters
        ----------
        idx : int
            The index of the sample

        Returns
        -------
        Union[Tuple[np.ndarray, np.ndarray], np.ndarray]
            A 2-element tuple with the data and the label if the label is
            specified, otherwise only the data.
        """
        # If the data is not loaded, load it lazily (read the CSV file)
        if self._cache is None:
            data, label = self._read_csv(self._files[idx])
        # Else, read from the loaded data
        else:
            data, label = self._cache[idx]

        # Padding happens before the transforms
        if self.pad:
            data = self._pad_data(data)
            if label is not None:
                # Labels are stored as (time steps, 1) while _pad_data pads
                # the last axis, hence the transposes.
                label = self._pad_data(label.T).T

        # Apply transforms
        for transform in self.transforms:
            data = transform(data)

        if label is None:
            return data
        return data, label

    def __str__(self) -> str:
        return f"SeriesFolderCSVDataset at {self.data_path} ({len(self)} samples)"

    def __repr__(self) -> str:
        return str(self)
adapter: Callable): + super().__init__() + self.model = model + self.adapter = adapter + + def forward(self, x): + x = self.adapter(x) + return self.model.forward(x) + + +class DIETLinear(torch.nn.Module): + def __init__(self, in_features: int, out_features: int): + super().__init__() + self.in_features = in_features + self.out_features = out_features + self.fc = torch.nn.Linear(in_features, out_features) + + def forward(self, x): + x = self.fc(x) + return x diff --git a/minerva/models/nets/lfr_har_architectures.py b/minerva/models/nets/lfr_har_architectures.py index 537a0da..220cbe3 100644 --- a/minerva/models/nets/lfr_har_architectures.py +++ b/minerva/models/nets/lfr_har_architectures.py @@ -1,4 +1,5 @@ from torch import nn + from minerva.models.ssl.lfr import RepeatedModuleList @@ -14,6 +15,7 @@ def __init__( dim: int = 128, input_channel: int = 9, inner_conv_output_dim: int = 128 * 18, + permute: bool = False, ): """ Parameters @@ -54,9 +56,12 @@ def __init__( else: # use a linear layer to reach the latent shape self.mlp = nn.Linear(inner_conv_output_dim, dim) + self.permute = permute def forward(self, xb): # Flatten images into vectors + if self.permute: + xb = xb.permute(0, 2, 1) out = self.conv(xb) out = out.view(out.size(0), -1) out = self.mlp(out) diff --git a/minerva/models/ssl/diet.py b/minerva/models/ssl/diet.py new file mode 100644 index 0000000..add3fc3 --- /dev/null +++ b/minerva/models/ssl/diet.py @@ -0,0 +1,158 @@ +from typing import Callable, Optional + +import lightning as L +import torch +from torch import nn +from torch.nn import CrossEntropyLoss +from torch.optim import Adam + +from minerva.schedulers.warmup_cosine_annealing import WarmupCosineAnnealingLR + + +class DIET(L.LightningModule): + def __init__( + self, + backbone: nn.Module, + linear_head: Optional[torch.nn.Module] = None, + num_data: Optional[int] = None, + flatten: bool = True, + adapter: Optional[Callable[[torch.Tensor], torch.Tensor]] = None, + loss: Callable = None, + 
learning_rate: float = 3e-4, + weight_decay: float = 3e-4, + wca_scheduler_total_epochs: Optional[int] = None, + ): + """ + DIET model. + + Parameters + ---------- + backbone : torch.nn.Module + Backbone model. + linear_head: torch.nn.Module, optional + Linear head that computes logits from embeddings of the data input, by default None. + If None, the linear head is automatically defined before training. The lengths of + both training dataset and linear head output must match. + num_data : int, optional + Total number of samples in the training dataset, by default None. If None, the length + of the training dataset is computed before the training in the setup() function. + flatten : bool + If True, the output of the backbone is flattened before the linear layer, + by default True. + adapter : Optional[Callable[[torch.Tensor], torch.Tensor]], optional + If not None, an adapter is added after the backbone and before the flatten process, + by default None. + loss : Callable + Loss function, by default CrossEntropyLoss with label smoothing 0.8. + learning_rate : float, optional + Learning rate used in the optimizer, by default 3e-4. + weight_decay : float, optional + Weight decay used in the optimizer, by default 3e-4. + wca_scheduler_total_epochs : int, optional + Total number of epochs for the WarmupCosineAnnealing scheduler, by default None. + Must be None or an integer greater than 10. If None, no scheduler is used. 
+ """ + super(DIET, self).__init__() + # Defining layers + self.backbone = backbone + self.linear_head = linear_head + self.num_data = num_data + # Defining adapter + self.adapter = adapter + self.flatten = flatten + # Defining loss + self.loss = loss or CrossEntropyLoss(label_smoothing=0.8) + # Defining other hyperparameters + self.learning_rate = learning_rate + self.weight_decay = weight_decay + self.wca_scheduler_total_epochs = wca_scheduler_total_epochs + + if ( + self.wca_scheduler_total_epochs is not None + and self.wca_scheduler_total_epochs <= 10 + ): + raise ValueError( + "Total number of epochs for the WarmupCosineAnnealing scheduler must be greater than 10." + ) + + def setup(self, stage): + """ + Setup function. If the model lacks a linear head, this function computes the length + of the training dataset, the encoding size, and creates a linear head accordingly. Also + checks whether the linear head output matches the length of the training dataset, + raising an error in case of mismatch. 
+ """ + if stage != "fit": + return + # Get the training dataset + training_dataset = self.trainer.datamodule.train_dataloader().dataset + # Update num_data if None + if self.num_data is None: + self.num_data = len(training_dataset) + # Define a linear head if None + if self.linear_head is None: + # Simulated input for encoding_size calculation + random_input = torch.rand(training_dataset[:5][0].shape) + # Compute the encoding size + with torch.no_grad(): + # Obtain the embeddings from the random data + out = self.backbone(random_input) + if self.adapter: + out = self.adapter(out) + if self.flatten: + out = out.flatten(start_dim=1) + # Computes the encoding size + encoding_size = out.size(1) + # Defines the linear head + self.linear_head = nn.Linear(encoding_size, self.num_data) + else: + # Check if the linear head provided matches the length of the training dataset + assert ( + self.num_data == self.linear_head.out_features + ), f"Number of samples({self.num_data}) and output of linear head({self.linear_head.out_features}) do not match." + + def forward(self, x): + x = self.backbone(x) + if self.adapter: + x = self.adapter(x) + if self.flatten: + x = x.flatten(start_dim=1) + x = self.linear_head(x) + return x + + def training_step(self, batch, batch_idx): + """ + A simple training step. 
+ """ + x, y = batch + y_hat = self(x) + loss = self.loss(y_hat, y) + self.log("train_loss", loss, on_epoch=True, on_step=False) + return loss + + def configure_optimizers(self): + optimizer = Adam( + self.parameters(), + lr=self.learning_rate, + weight_decay=self.weight_decay, + betas=(0.9, 0.99), + ) + # If self.wca_scheduler_total_epochs is not None, we return the optimizer and the scheduler + if self.wca_scheduler_total_epochs: + scheduler = WarmupCosineAnnealingLR( + optimizer, + warmup_epochs=10, + total_epochs=self.wca_scheduler_total_epochs, + ) + return { + "optimizer": optimizer, + "lr_scheduler": { + "scheduler": scheduler, + "interval": "epoch", + "frequency": 1, + "reduce_on_plateau": False, + "monitor": "train_loss", + }, + } + # If self.wca_scheduler_total_epochs is None, we return only the optimizer + return optimizer diff --git a/tests/data/data_modules/test_data_module_har.py b/tests/data/data_modules/test_data_module_har.py new file mode 100644 index 0000000..4caacff --- /dev/null +++ b/tests/data/data_modules/test_data_module_har.py @@ -0,0 +1,440 @@ +import numpy as np +import pandas as pd +import pytest + +from minerva.data.data_modules.har import MultiModalHARSeriesDataModule + + +@pytest.fixture +def sample_csv_dir(tmp_path): + # Create a DataFrame with 10 samples and 6 features (4 time steps per feature) + df = pd.DataFrame( + { + "accel-x-0": np.arange(100), + "accel-x-1": np.arange(100) + 10, + "accel-x-2": np.arange(100) + 100, + "accel-x-3": np.arange(100) + 1000, + "accel-y-0": np.arange(100), + "accel-y-1": np.arange(100) * 2, + "accel-y-2": np.arange(100) * 3, + "accel-y-3": np.arange(100) * 4, + "accel-z-0": np.arange(100) + 5, + "accel-z-1": np.arange(100) + 15, + "accel-z-2": np.arange(100) + 25, + "accel-z-3": np.arange(100) + 35, + "gyro-x-0": np.arange(100) - 10, + "gyro-x-1": np.arange(100) - 20, + "gyro-x-2": np.arange(100) - 30, + "gyro-x-3": np.arange(100) - 40, + "gyro-y-0": np.arange(100) + 1, + "gyro-y-1": np.arange(100) 
+ 2, + "gyro-y-2": np.arange(100) + 3, + "gyro-y-3": np.arange(100) + 4, + "gyro-z-0": np.arange(100) + 6, + "gyro-z-1": np.arange(100) + 7, + "gyro-z-2": np.arange(100) + 8, + "gyro-z-3": np.arange(100) + 9, + "standard activity code": [i % 4 for i in range(100)], + } + ) + # Save as train.csv, validation.csv, and test.csv + for split in ["train", "validation", "test"]: + csv_path = tmp_path / f"{split}.csv" + df.to_csv(csv_path, index=False) + return tmp_path + + +def test_multimdodal_defaults(sample_csv_dir): + """Test that MultiModalHARSeriesDataModule initializes with default parameters.""" + data_module = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + ) + + data_module.setup("fit") + data_module.setup("test") + + assert len(data_module.datasets["train"][0]) == 100 + assert len(data_module.datasets["validation"][0]) == 100 + assert len(data_module.datasets["test"][0]) == 100 + + assert len(data_module.datasets["train"][1]) == 100 + assert len(data_module.datasets["validation"][1]) == 100 + assert len(data_module.datasets["test"][1]) == 100 + + # Single-domain + assert all(i == 0 for i in data_module.datasets["train"][1]) + assert all(i == 0 for i in data_module.datasets["validation"][1]) + assert all(i == 0 for i in data_module.datasets["test"][1]) + + train_dataset = data_module.datasets["train"][0] + val_dataset = data_module.datasets["validation"][0] + test_dataset = data_module.datasets["test"][0] + + train_x, train_y = train_dataset[0] + val_x, val_y = val_dataset[0] + test_x, test_y = test_dataset[0] + + assert train_x.shape == (6, 4) + assert val_x.shape == (6, 4) + assert test_x.shape == (6, 4) + + assert val_y == 0 + assert test_y == 0 + expected_val_y = np.array( + [ + [0, 10, 100, 1000], # accel-x + [0, 0, 0, 0], # 
accel-y + [5, 15, 25, 35], # accel-z + [-10, -20, -30, -40], # gyro-x + [1, 2, 3, 4], # gyro-y + [6, 7, 8, 9], # gyro-z + ], + dtype=np.float32, + ) + + np.testing.assert_equal(val_x, expected_val_y) + + with open(sample_csv_dir / "validation.csv", "r") as f: + val_df = pd.read_csv(f) + for r in range(10): + sample = val_df.iloc[r] + sample_values = ( + [sample[f"accel-x-{i}"] for i in range(4)] + + [sample[f"accel-y-{i}"] for i in range(4)] + + [sample[f"accel-z-{i}"] for i in range(4)] + + [sample[f"gyro-x-{i}"] for i in range(4)] + + [sample[f"gyro-y-{i}"] for i in range(4)] + + [sample[f"gyro-z-{i}"] for i in range(4)] + ) + + val_x = val_dataset[r][0] + assert np.array_equal(val_x.flatten(), np.array(sample_values)) + + +@pytest.mark.parametrize("data_percentage", [0.1, 0.5, 0.75, 1.0]) +def test_data_percentage(sample_csv_dir, data_percentage): + data_module = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + data_percentage=data_percentage, + ) + + data_module.setup("fit") + data_module.setup("test") + + assert len(data_module.datasets["train"][0]) == int(100 * data_percentage) + assert len(data_module.datasets["validation"][0]) == 100 + assert len(data_module.datasets["test"][0]) == 100 + + +@pytest.mark.parametrize("samples_per_class", [1, 2, 7, 10, 25]) +def test_samples_per_class(sample_csv_dir, samples_per_class): + """Test that samples_per_class gives exactly the requested number per class.""" + data_module = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + 
samples_per_class=samples_per_class, + seed=42, + ) + + data_module.setup("fit") + data_module.setup("test") + + train_dataset = data_module.datasets["train"][0] + + class_counts = {} + for i in range(len(train_dataset)): + _, y = train_dataset[i] + class_counts[y] = class_counts.get(y, 0) + 1 + + for count in class_counts.values(): + assert ( + count == samples_per_class + ), f"Should have exactly {samples_per_class} samples per class" + + +def test_samples_per_class_cumulative(sample_csv_dir): + data_module_1 = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=10, + seed=42, + ) + + data_module_2 = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=20, + seed=42, + ) + + data_module_1.setup("fit") + data_module_2.setup("fit") + + train_dataset_1 = data_module_1.datasets["train"][0] + train_dataset_2 = data_module_2.datasets["train"][0] + + xs_1 = [train_dataset_1[i][0] for i in range(len(train_dataset_1))] + xs_2 = [train_dataset_2[i][0] for i in range(len(train_dataset_2))] + + # Convert each sample to a hashable tuple for easier comparison + xs_1_set = set([tuple(sample.flatten()) for sample in xs_1]) + xs_2_set = set([tuple(sample.flatten()) for sample in xs_2]) + + # Assert that all samples in xs_1 are contained in xs_2 + assert xs_1_set.issubset( + xs_2_set + ), "Not all samples from the smaller subset are present in the larger subset" + + +# Should not be subsets if seeds are different +def test_samples_per_class_cumulative_different_seeds(sample_csv_dir): + 
data_module_1 = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=10, + seed=42, + ) + + data_module_2 = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=10, + seed=43, + ) + + data_module_1.setup("fit") + data_module_2.setup("fit") + + train_dataset_1 = data_module_1.datasets["train"][0] + train_dataset_2 = data_module_2.datasets["train"][0] + + xs_1 = [train_dataset_1[i][0] for i in range(len(train_dataset_1))] + xs_2 = [train_dataset_2[i][0] for i in range(len(train_dataset_2))] + + # Convert each sample to a hashable tuple for easier comparison + xs_1_set = set([tuple(sample.flatten()) for sample in xs_1]) + xs_2_set = set([tuple(sample.flatten()) for sample in xs_2]) + + # Assert that all samples in xs_1 are contained in xs_2 + assert not xs_1_set.issubset( + xs_2_set + ), "Samples from different seeds should not be subsets of each other" + + +def test_error_data_percentage_and_samples_per_class(sample_csv_dir): + """Test that an error is raised if both data_percentage and samples_per_class are set.""" + with pytest.raises( + ValueError, match="Cannot use both data_percentage and samples_per_class" + ): + MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + data_percentage=0.5, + samples_per_class=10, + ) + + +def 
test_error_data_percentage(sample_csv_dir): + """Test that an error is raised if data_percentage is not between 0 and 1.""" + with pytest.raises(ValueError, match="data_percentage must be between 0 and 1"): + MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + data_percentage=-0.1, + ) + + with pytest.raises(ValueError, match="data_percentage must be between 0 and 1"): + MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + data_percentage=1.1, + ) + + +def test_error_samples_per_class(sample_csv_dir): + """Test that an error is raised if samples_per_class is not a positive integer.""" + with pytest.raises( + ValueError, match="samples_per_class must be a positive integer" + ): + data_module = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=-1, + ) + data_module.setup("fit") + + with pytest.raises( + ValueError, match="samples_per_class must be a positive integer" + ): + data_module = MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=0, + ) + data_module.setup("fit") + + with pytest.raises(ValueError): + data_module 
= MultiModalHARSeriesDataModule( + data_path=sample_csv_dir, + feature_prefixes=[ + "accel-x", + "accel-y", + "accel-z", + "gyro-x", + "gyro-y", + "gyro-z", + ], + features_as_channels=True, + label="standard activity code", + batch_size=4, + cast_to="float32", + shuffle_train=True, + samples_per_class=100000, + ) + data_module.setup("fit") diff --git a/tests/data/data_modules/test_data_module_tnc.py b/tests/data/data_modules/test_data_module_tnc.py new file mode 100644 index 0000000..7266175 --- /dev/null +++ b/tests/data/data_modules/test_data_module_tnc.py @@ -0,0 +1,61 @@ +from pathlib import Path + +import numpy as np +import pytest + +from minerva.data.data_modules.har_xu_23 import HarDataModule +from minerva.data.datasets.har_xu_23 import HarDataset, TNCDataset + + +@pytest.mark.parametrize("use_val_with_train", [True, False]) +def test_har_data_module(tmp_path, use_val_with_train): + """ + Test function to verify the behavior of HarDataModule when `use_val_with_train` is True or False. + + Parameters + ---------- + tmp_path : Path + Pytest fixture providing a temporary directory unique to the test + use_val_with_train : bool + If True, the validation data will be concatenated with the training data. + If False, the validation data will remain separate. 
+ """ + # Generate dummy data + n_samples_train = 10 + n_samples_val = 5 + n_samples_test = 5 + n_timesteps = 100 + n_channels = 6 + + # Create dummy training, validation, and test data + train_data = np.random.rand(n_samples_train, n_timesteps, n_channels) + val_data = np.random.rand(n_samples_val, n_timesteps, n_channels) + test_data = np.random.rand(n_samples_test, n_timesteps, n_channels) + + # Save dummy data to temporary directory + np.save(tmp_path / "train_data.npy", train_data) + np.save(tmp_path / "val_data.npy", val_data) + np.save(tmp_path / "test_data.npy", test_data) + + # Initialize HarDataModule with dummy data + data_module = HarDataModule( + processed_data_dir=tmp_path, + window_size=60, + batch_size=64, + adf=False, + use_val_with_train=use_val_with_train, + ) + + # Verify concatenation behavior + if use_val_with_train: + expected_train_shape = ( + n_samples_train + n_samples_val, + n_timesteps, + n_channels, + ) + assert data_module.har_train.shape == expected_train_shape + else: + expected_train_shape = (n_samples_train, n_timesteps, n_channels) + expected_val_shape = (n_samples_val, n_timesteps, n_channels) + assert data_module.har_train.shape == expected_train_shape + assert data_module.har_val.shape == expected_val_shape diff --git a/tests/data/datasets/test_har_rodrigues_24.py b/tests/data/datasets/test_har_rodrigues_24.py new file mode 100644 index 0000000..478f0cf --- /dev/null +++ b/tests/data/datasets/test_har_rodrigues_24.py @@ -0,0 +1,304 @@ +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest +import torch + +from minerva.data.datasets.har_rodrigues_24 import ( + HARDatasetCPC, + norm_shape, + opp_sliding_window, + sliding_window, +) + + +@pytest.fixture +def sample_data(tmp_path): + # Create sample data + data = { + "accel-x": np.random.rand(100), + "accel-y": np.random.rand(100), + "accel-z": np.random.rand(100), + "gyro-x": np.random.rand(100), + "gyro-y": np.random.rand(100), + "gyro-z": 
np.random.rand(100), + "activity code": np.random.randint(0, 5, 100), + } + df = pd.DataFrame(data) + + # Create train, val, test directories and save sample data + for phase in ["train", "val", "test"]: + phase_path = tmp_path / phase + phase_path.mkdir() + df.to_csv(phase_path / "sample.csv", index=False) + + return tmp_path + + +def test_norm_shape(): + x = norm_shape(shape=1) + assert isinstance(x, tuple) + assert x == (1,) + + x = norm_shape(shape=(1, 2, 3)) + assert isinstance(x, tuple) + assert x == (1, 2, 3) + + x = norm_shape(np.array([1, 2, 3])) + assert isinstance(x, tuple) + assert x == (1, 2, 3) + + with pytest.raises(TypeError): + norm_shape(shape="invalid_type") + + +def test_sliding_window(): + a = np.arange(10) + ws = 3 + result = sliding_window(a, ws, ss=1) + expected = np.array( + [ + [0, 1, 2], + [1, 2, 3], + [2, 3, 4], + [3, 4, 5], + [4, 5, 6], + [5, 6, 7], + [6, 7, 8], + [7, 8, 9], + ] + ) + assert np.array_equal(result, expected) + + result = sliding_window(a, ws, ss=2) + expected = np.array( + [ + [0, 1, 2], + [2, 3, 4], + [4, 5, 6], + [6, 7, 8], + ] + ) + assert np.array_equal(result, expected) + + # ss = ws, in this case (ss=3) + result = sliding_window(a, ws, ss=None) + expected = np.array( + [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8], + ] + ) + assert np.array_equal(result, expected) + + # ss is greater than ws + result = sliding_window(a, ws, ss=5) + expected = np.array( + [ + [0, 1, 2], + [5, 6, 7], + ] + ) + assert np.array_equal(result, expected) + + # ss is greater than array size + # Then, only one window should be returned + result = sliding_window(a, ws, ss=100) + expected = np.array( + [ + [0, 1, 2], + ] + ) + assert np.array_equal(result, expected) + + # A 1D tuple is passed as ws and ss instead of an int + result = sliding_window(a, ws=(ws,), ss=(100,)) + assert np.array_equal(result, expected) + + +def test_sliding_window_error(): + a = np.arange(10) + + # Window size is greater than the array size + with 
pytest.raises(ValueError): + sliding_window(a, ws=11, ss=1) + + # Window size is greater than the array size (and ss=ws) + with pytest.raises(ValueError): + sliding_window(a, ws=11, ss=None) + + # Window size is 0 (invalid) + with pytest.raises(ValueError): + sliding_window(a, ws=0, ss=1) + + # ss is 0 (invalid) + with pytest.raises(ValueError): + sliding_window(a, ws=5, ss=0) + + # ws and ss are int (will be normalized as a 1-element tuple), but a is 2D + a = np.arange(10).reshape(2, 5) + with pytest.raises(ValueError): + sliding_window(a, ws=1, ss=1) + + # ws has 3 elements and array is 2D + with pytest.raises(ValueError): + sliding_window(a, ws=(1, 1, 1), ss=(1,)) + + # ws has 2 elements and array is 1D + a = np.arange(10) + with pytest.raises(ValueError): + sliding_window(a, ws=(1, 1), ss=(1,)) + + # ss has 3 elements and array is 1D + with pytest.raises(ValueError): + sliding_window(a, ws=1, ss=(1, 1, 1)) + + # ws has one dim greater than a + a = np.arange(10) + with pytest.raises(ValueError): + sliding_window(a, ws=(11,), ss=None) + + +def test_opp_sliding_window(): + data_x = np.random.rand(100, 6) + data_y = np.random.randint(0, 5, 100) + ws = 10 + ss = 5 + data_x_windowed, data_y_windowed = opp_sliding_window(data_x, data_y, ws, ss) + assert data_x_windowed.shape == (19, 10, 6) + assert data_y_windowed.shape == (19,) + assert data_x_windowed.dtype == np.float32 + assert data_y_windowed.dtype == np.uint8 + + assert isinstance(data_x_windowed, np.ndarray) + assert isinstance(data_y_windowed, np.ndarray) + + +def test_hardatasetcpc_init(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + label="activity code", + ) + assert len(dataset) > 0 + assert dataset.data.shape[1] == 6 + assert dataset.data.shape[2] == 10 + + +def test_hardatasetcpc_getitem(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + 
label="activity code", + ) + data, label = dataset[0] + assert data.shape == (6, 10) + np.testing.assert_allclose(data, dataset.data[0]) + np.testing.assert_allclose(label, dataset.labels[0]) + + +def test_hardatasetcpc_len(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + label="activity code", + ) + assert len(dataset) == dataset.data.shape[0] + + +def test_hardatasetcpc_return_index_as_label(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + label="return_index_as_label", + ) + data, label = dataset[0] + assert data.shape == (6, 10) + assert label.shape == () + + +def test_hardatasetcpc_transpose_data_true(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + transpose_data=True, + label="activity code", + ) + data, _ = dataset[0] + assert data.shape == (6, 10) + + +def test_hardatasetcpc_transpose_data_false(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + transpose_data=False, + label="activity code", + ) + data, _ = dataset[0] + assert data.shape == (10, 6) + + +def test_hardatasetcpc_use_train_as_val(sample_data): + # use_train_as_val = True + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="val", + use_train_as_val=True, + label="activity code", + ) + + np.testing.assert_allclose( + dataset.data_raw["val"]["data"], dataset.data_raw["train"]["data"] + ) + + +def test_hardatasetcpc_use_val_with_train(sample_data): + dataset = HARDatasetCPC( + data_path=sample_data, + input_size=6, + window=10, + overlap=5, + phase="train", + use_val_with_train=True, + label="activity code", + ) + + # Directly load raw train and val data + df_train = pd.read_csv(sample_data / "train/sample.csv") + df_val = 
pd.read_csv(sample_data / "val/sample.csv") + + # Manually concatenate data as expected + expected_data = np.concatenate( + [df_train.iloc[:, :-1].values, df_val.iloc[:, :-1].values], axis=0 + ) + expected_labels = np.concatenate( + [df_train.iloc[:, -1].values, df_val.iloc[:, -1].values], axis=0 + ) + + # Final comparison + np.testing.assert_allclose(dataset.data_raw["train"]["data"], expected_data) + np.testing.assert_allclose(dataset.data_raw["train"]["labels"], expected_labels) diff --git a/tests/data/datasets/test_har_xu_23.py b/tests/data/datasets/test_har_xu_23.py new file mode 100644 index 0000000..a11d913 --- /dev/null +++ b/tests/data/datasets/test_har_xu_23.py @@ -0,0 +1,202 @@ +import os +from typing import List, Tuple + +import numpy as np +import pytest +import torch +from sklearn.metrics.pairwise import cosine_similarity +from statsmodels.tsa.stattools import adfuller +from torch.utils.data import Dataset + +from minerva.data.datasets.har_xu_23 import HarDataset, TNCDataset +from minerva.utils.typing import PathLike + +#################### TNC DATASET #################### + + +@pytest.fixture +def tnc_dataset_params(): + n_samples = 100 + n_channels = 6 + n_timesteps = 1000 + mc_sample_size = 5 + window_size = 128 + epsilon = 3 + rng = np.random.RandomState(42) + data = rng.randn(n_samples, n_channels, n_timesteps) + + return ( + data, + mc_sample_size, + window_size, + epsilon, + ) + + +@pytest.fixture(params=[True, False]) +def tnc_dataset(request, tnc_dataset_params): + data, mc_sample_size, window_size, epsilon = tnc_dataset_params + adf = ( + request.param + ) # Parametrize ADF test for determining neighbors and non-neighbors + return TNCDataset( + x=data, + mc_sample_size=mc_sample_size, + window_size=window_size, + epsilon=epsilon, + adf=adf, + ) + + +def test_len_tnc_dataset(tnc_dataset): + assert len(tnc_dataset) == 100 + + +def test_getitem_tnc_dataset(tnc_dataset): + central_window, close_neighbors, non_neighbors = tnc_dataset[0] + assert 
central_window.shape == (128, 6) + assert close_neighbors.shape == (5, 128, 6) + assert non_neighbors.shape == (5, 128, 6) + + +def test_getitem_tnc_dataset_error_correlation(tnc_dataset): + # this should trick adf test to return a correlation error + # The data is only zeros, so the correlation is not defined + tnc_dataset.time_series = np.zeros_like(tnc_dataset.time_series) + central_window, close_neighbors, non_neighbors = tnc_dataset[0] + assert central_window.shape == (128, 6) + assert close_neighbors.shape == (5, 128, 6) + assert non_neighbors.shape == (5, 128, 6) + + tnc_dataset.time_series = np.ones_like(tnc_dataset.time_series) + central_window, close_neighbors, non_neighbors = tnc_dataset[0] + assert central_window.shape == (128, 6) + assert close_neighbors.shape == (5, 128, 6) + assert non_neighbors.shape == (5, 128, 6) + + +def test_tnc_dataset_small_time_series(): + """ + Tests whether TNCDataset can handle a very short time series. + Should gracefully handle cases where `n_timesteps < 2 * window_size`. + """ + n_samples = 10 + n_channels = 6 + n_timesteps = 10 # Smaller than 2 * window_size (128 * 2 = 256) + mc_sample_size = 3 + window_size = 128 + epsilon = 2 + data = np.random.randn(n_samples, n_channels, n_timesteps) + + dataset = TNCDataset( + x=data, + mc_sample_size=mc_sample_size, + window_size=window_size, + epsilon=epsilon, + adf=True, + ) + + with pytest.raises(ValueError): + sample = dataset[0] + + +def test_tnc_dataset_identical_samples(tnc_dataset): + """ + Tests if TNCDataset can handle cases where all samples are identical. + The cosine similarity should be perfect, but it should still return + a diverse set of neighbors. + """ + tnc_dataset.time_series = np.ones_like(tnc_dataset.time_series) + _, close_neighbors, _ = tnc_dataset[0] + + assert close_neighbors.shape == (5, 128, 6) + assert not np.allclose( + close_neighbors, np.zeros_like(close_neighbors) + ), "Close neighbors should not be all zeros." 
+ + +#################### HAR DATASET #################### +@pytest.fixture +def har_dataset_params(tmp_path): + n_samples = 100 + n_timesteps = 128 + n_features = 6 + rng = np.random.RandomState(42) + data = rng.randn(n_samples, n_timesteps, n_features) + labels = rng.randint(0, 10, size=(n_samples,)) + + data_path = tmp_path / "data" + data_path.mkdir() + np.save(data_path / "train_data_subseq.npy", data) + np.save(data_path / "train_labels_subseq.npy", labels) + np.save(data_path / "val_data_subseq.npy", data) + np.save(data_path / "val_labels_subseq.npy", labels) + np.save(data_path / "test_data_subseq.npy", data) + np.save(data_path / "test_labels_subseq.npy", labels) + + return data_path, "train" + + +@pytest.fixture +def har_dataset(har_dataset_params): + data_path, annotate = har_dataset_params + return HarDataset(data_path=data_path, annotate=annotate) + + +def test_len_har_dataset(har_dataset): + assert len(har_dataset) == 100 + + +def test_getitem_har_dataset(har_dataset): + features, label = har_dataset[0] + assert features.shape == (128, 6) + assert isinstance(label, torch.Tensor) + assert label.dtype == torch.long + + +def test_getitem_har_dataset_flatten(har_dataset_params): + data_path, annotate = har_dataset_params + dataset = HarDataset(data_path=data_path, annotate=annotate, flatten=True) + features, label = dataset[0] + assert features.shape == (128 * 6,) + assert isinstance(label, torch.Tensor) + assert label.dtype == torch.long + + +def test_har_dataset_invalid_path(): + with pytest.raises(FileNotFoundError): + HarDataset(data_path="/invalid/path", annotate="train") + + +def test_har_dataset_mismatched_data_labels(tmp_path): + n_samples = 100 + n_timesteps = 128 + n_features = 6 + rng = np.random.RandomState(42) + data = rng.randn(n_samples, n_timesteps, n_features) + labels = rng.randint(0, 10, size=(n_samples - 1,)) # Mismatched length + + data_path = tmp_path / "data" + data_path.mkdir() + np.save(data_path / "train_data_subseq.npy", data) 
+ np.save(data_path / "train_labels_subseq.npy", labels) + + with pytest.raises(AssertionError): + HarDataset(data_path=data_path, annotate="train") + + +def test_har_dataset_different_annotate(har_dataset_params): + data_path, _ = har_dataset_params + dataset = HarDataset(data_path=data_path, annotate="val") + assert len(dataset) == 100 + features, label = dataset[0] + assert features.shape == (128, 6) + assert isinstance(label, torch.Tensor) + assert label.dtype == torch.long + + dataset = HarDataset(data_path=data_path, annotate="test") + assert len(dataset) == 100 + features, label = dataset[0] + assert features.shape == (128, 6) + assert isinstance(label, torch.Tensor) + assert label.dtype == torch.long diff --git a/tests/data/datasets/test_series_dataset_folder_csv.py b/tests/data/datasets/test_series_dataset_folder_csv.py new file mode 100644 index 0000000..0009ec7 --- /dev/null +++ b/tests/data/datasets/test_series_dataset_folder_csv.py @@ -0,0 +1,276 @@ +from pathlib import Path +from tempfile import NamedTemporaryFile, TemporaryDirectory +from unittest.mock import MagicMock, patch + +import numpy as np +import pandas as pd +import pytest + +from minerva.data.datasets.series_dataset import ( + SeriesFolderCSVDataset, +) +from minerva.transforms.transform import _Transform + + +@pytest.fixture +def sample_data(tmp_path): + """Fixture to create sample CSV data in a temporary directory.""" + data_path = Path(tmp_path) + + # Sample 1 + df1 = pd.DataFrame( + { + "accel-x": [0.5, 0.68, 0.49], + "accel-y": [0.02, 0.02, 0.00], + "class": [1, 1, 1], + } + ) + df1.to_csv(data_path / "sample-1.csv", index=False) + + # Sample 2 (longer sequence) + df2 = pd.DataFrame( + { + "accel-x": [0.5, 0.68, 0.49, 3.14], + "accel-y": [0.02, 0.02, 0.00, 1.41], + "class": [0, 0, 0, 0], + } + ) + df2.to_csv(data_path / "sample-2.csv", index=False) + return data_path + + +def test_dataset_initialization(sample_data): + """Test dataset initializes correctly with default and custom 
parameters.""" + dataset = SeriesFolderCSVDataset( + sample_data, features=["accel-x", "accel-y"], label="class" + ) + + assert len(dataset) == 2 # Check number of samples + assert isinstance(dataset.features, list) + assert dataset.label == "class" + assert dataset.cast_to == "float32" + assert dataset.transforms == [] + + +def test_data_loading(sample_data): + """Test that data is correctly loaded and structured.""" + dataset = SeriesFolderCSVDataset( + sample_data, features=["accel-x", "accel-y"], label="class" + ) + + data, label = dataset[0] + + assert isinstance(data, np.ndarray) + assert isinstance(label, np.ndarray) + assert data.shape == (2, 3) # (features, time-steps) + assert label.shape == (3, 1) # (time-steps, 1) + + data, label = dataset[1] + assert data.shape == (2, 4) # (features, time-steps) + assert label.shape == (4, 1) # (time-steps, 1) + + +def test_data_loading_single_feature(sample_data): + """Test that data is correctly loaded and structured.""" + dataset = SeriesFolderCSVDataset(sample_data, features="accel-x", label="class") + + data, label = dataset[0] + + assert isinstance(data, np.ndarray) + assert isinstance(label, np.ndarray) + assert data.shape == (1, 3) # (features, time-steps) + assert label.shape == (3, 1) # (time-steps, 1) + np.testing.assert_allclose(data, np.array([[0.5, 0.68, 0.49]])) + + data, label = dataset[1] + assert data.shape == (1, 4) # (features, time-steps) + assert label.shape == (4, 1) # (time-steps, 1) + np.testing.assert_allclose(data, np.array([[0.5, 0.68, 0.49, 3.14]])) + + +def test_data_loading_without_label(sample_data): + """Test that data is correctly loaded and structured when no label is specified.""" + dataset = SeriesFolderCSVDataset( + sample_data, features=["accel-x", "accel-y"], label=None + ) + + data = dataset[0] + + assert isinstance(data, np.ndarray) + assert data.shape == (2, 3) # (features, time-steps) + np.testing.assert_allclose(data, np.array([[0.5, 0.68, 0.49], [0.02, 0.02, 0.00]])) + + 
+def test_data_loading_without_features(sample_data): + """Test that data is correctly loaded and structured when no features are specified.""" + dataset = SeriesFolderCSVDataset(sample_data, features=None, label="class") + + data, label = dataset[0] + + assert isinstance(data, np.ndarray) + assert data.shape == (2, 3) # (features, time-steps) + np.testing.assert_allclose(data, np.array([[0.5, 0.68, 0.49], [0.02, 0.02, 0.00]])) + + +def test_data_loading_without_features_and_label(sample_data): + """Test that data is correctly loaded and structured when no features and label are specified.""" + dataset = SeriesFolderCSVDataset(sample_data, features=None, label=None) + + data = dataset[0] + + assert isinstance(data, np.ndarray) + assert data.shape == (3, 3) # (features, time-steps) + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49], [0.02, 0.02, 0.00], [1, 1, 1]]) + ) + + data = dataset[1] + assert isinstance(data, np.ndarray) + assert data.shape == (3, 4) # (features, time-steps) + np.testing.assert_allclose( + data, + np.array([[0.5, 0.68, 0.49, 3.14], [0.02, 0.02, 0.00, 1.41], [0, 0, 0, 0]]), + ) + + +def test_no_feature(sample_data): + """Test that an error is raised when an invalid feature is specified.""" + with pytest.raises(ValueError): + dataset = SeriesFolderCSVDataset(sample_data, features=[], label="class") + + +def test_lazy_loading(sample_data): + """Test that lazy loading defers file reading until accessed.""" + dataset = SeriesFolderCSVDataset( + sample_data, features=["accel-x", "accel-y"], label="class", lazy=True + ) + + # Before accessing, _cache should be None + assert dataset._cache is None + + # Accessing an item should load data from file + data, label = dataset[0] + assert data.shape == (2, 3) + assert label.shape == (3, 1) + + +def test_padding_functionality(sample_data): + """Test that dataset correctly pads sequences to the longest sample size.""" + dataset = SeriesFolderCSVDataset( + sample_data, features=["accel-x", 
"accel-y"], label="class", pad=True + ) + + # Shorter sequence (padded to 4 time steps) 3->4 time steps (reflect) + data, label = dataset[0] + assert data.shape == (2, 4) # Padded to 4 time steps + assert label.shape == (4, 1) # Labels should also be padded + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49, 0.5], [0.02, 0.02, 0.00, 0.02]]) + ) + np.testing.assert_allclose(label, np.array([[1], [1], [1], [1]])) + + # Longer sequence (no padding is needed) + data, label = dataset[1] + assert data.shape == (2, 4) # Padded to 4 time steps + assert label.shape == (4, 1) # Labels should also be padded + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49, 3.14], [0.02, 0.02, 0.00, 1.41]]) + ) + np.testing.assert_allclose(label, np.array([[0], [0], [0], [0]])) + + +def test_transforms(sample_data): + """Test that transforms are correctly applied to the dataset.""" + + class Transform(_Transform): + def __init__(self, multiplier): + self.multiplier = multiplier + + def __call__(self, x): + return x * self.multiplier + + dataset = SeriesFolderCSVDataset( + sample_data, + features=["accel-x", "accel-y"], + label="class", + transforms=[Transform(2)], + ) + + data, label = dataset[0] + assert data.shape == (2, 3) # (features, time-steps) + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49], [0.02, 0.02, 0.00]]) * 2 + ) + + data, label = dataset[1] + assert data.shape == (2, 4) # (features, time-steps) + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49, 3.14], [0.02, 0.02, 0.00, 1.41]]) * 2 + ) + + # Single transform + dataset = SeriesFolderCSVDataset( + sample_data, + features=["accel-x", "accel-y"], + label="class", + transforms=Transform(2), + ) + + data, label = dataset[0] + assert data.shape == (2, 3) # (features, time-steps) + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49], [0.02, 0.02, 0.00]]) * 2 + ) + + data, label = dataset[1] + assert data.shape == (2, 4) # (features, time-steps) + 
np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49, 3.14], [0.02, 0.02, 0.00, 1.41]]) * 2 + ) + + # Multiple transforms + dataset = SeriesFolderCSVDataset( + sample_data, + features=["accel-x", "accel-y"], + label="class", + transforms=[Transform(2), Transform(3)], + ) + + data, label = dataset[0] + assert data.shape == (2, 3) # (features, time-steps) + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49], [0.02, 0.02, 0.00]]) * 6 + ) + + data, label = dataset[1] + assert data.shape == (2, 4) # (features, time-steps) + np.testing.assert_allclose( + data, np.array([[0.5, 0.68, 0.49, 3.14], [0.02, 0.02, 0.00, 1.41]]) * 6 + ) + + +def test_string_representation(sample_data): + """Test the __str__ and __repr__ methods.""" + dataset = SeriesFolderCSVDataset(sample_data) + assert str(dataset) == repr(dataset) + assert f"SeriesFolderCSVDataset at {sample_data}" in str(dataset) + + +def test_empty_directory(): + """Test dataset behavior with an empty directory.""" + with TemporaryDirectory() as tmpdir: + with pytest.raises(ValueError): + dataset = SeriesFolderCSVDataset(tmpdir) + + +def test_invalid_directory(): + """Test dataset behavior with an invalid directory.""" + with pytest.raises(ValueError): + dataset = SeriesFolderCSVDataset("invalid-directory") + + +def test_not_directory(): + """Test dataset behavior with a non-directory path.""" + with NamedTemporaryFile() as tmpfile: + with pytest.raises(ValueError): + dataset = SeriesFolderCSVDataset(tmpfile.name) diff --git a/tests/data/datasets/test_series_dataset_multimodal.py b/tests/data/datasets/test_series_dataset_multimodal.py new file mode 100644 index 0000000..87bed44 --- /dev/null +++ b/tests/data/datasets/test_series_dataset_multimodal.py @@ -0,0 +1,378 @@ +import contextlib +from collections.abc import Iterable +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple, Union + +import numpy as np +import pandas as pd +import pytest +from torch.utils.data import 
Dataset + +from minerva.data.datasets.series_dataset import MultiModalSeriesCSVDataset +from minerva.transforms.transform import _Transform + + +@pytest.fixture +def sample_csv(tmp_path): + data = { + "accel-x-0": [0.502123, 0.6820123, 0.498217], + "accel-x-1": [0.02123, 0.02123, 0.00001], + "accel-y-0": [0.502123, 0.502123, 1.414141], + "accel-y-1": [0.502123, 0.502123, 3.141592], + "class": [0, 1, 2], + } + df = pd.DataFrame(data) + csv_path = tmp_path / "data.csv" + df.to_csv(csv_path, index=False) + return csv_path + + +def test_dataset_length(sample_csv): + dataset = MultiModalSeriesCSVDataset(data_path=sample_csv, label="class") + assert len(dataset) == 3 + + +def test_dataset_shape_features_as_channels(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + ) + data, label = dataset[0] + assert data.shape == (2, 2) + assert label == 0 + + +def test_dataset_shape_features_as_vector(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=False, + ) + data, label = dataset[0] + assert data.shape == (4,) + assert label == 0 + + +def test_dataset_without_label(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label=None, + features_as_channels=True, + ) + data = dataset[0] + assert data.shape == (2, 2) + + +def test_dataset_without_label_as_vector(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label=None, + features_as_channels=False, + ) + data = dataset[0] + assert data.shape == (4,) + + +def test_dataset_without_any_feature(sample_csv): + with pytest.raises(ValueError): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=[], + label="class", + features_as_channels=True, + ) 
+ + +def test_dataset_with_single_feature(sample_csv): + # ------ Feature as channel = True ----- + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x-0"], + label="class", + features_as_channels=True, + ) + data, label = dataset[0] + assert data.shape == (1, 1) + assert label == 0 + np.testing.assert_allclose(data, np.array([[0.502123]])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel"], + label="class", + features_as_channels=True, + ) + data, label = dataset[0] + assert data.shape == (1, 4) + assert label == 0 + np.testing.assert_allclose( + data, np.array([[0.502123, 0.02123, 0.502123, 0.502123]]) + ) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes="accel", + label="class", + features_as_channels=True, + ) + data, label = dataset[0] + assert data.shape == (1, 4) + assert label == 0 + np.testing.assert_allclose( + data, np.array([[0.502123, 0.02123, 0.502123, 0.502123]]) + ) + + # Feature as channel = False + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x-0"], + label="class", + features_as_channels=False, + ) + data, label = dataset[0] + assert data.shape == (1,) + assert label == 0 + np.testing.assert_allclose(data, np.array([0.502123])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel"], + label="class", + features_as_channels=False, + ) + data, label = dataset[0] + assert data.shape == (4,) + assert label == 0 + np.testing.assert_allclose(data, np.array([0.502123, 0.02123, 0.502123, 0.502123])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes="accel", + label="class", + features_as_channels=False, + ) + data, label = dataset[0] + assert data.shape == (4,) + assert label == 0 + np.testing.assert_allclose(data, np.array([0.502123, 0.02123, 0.502123, 0.502123])) + + +def 
test_dataset_without_label_and_single_feature(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x-0"], + label=None, + features_as_channels=True, + ) + data = dataset[0] + assert data.shape == (1, 1) + np.testing.assert_allclose(data, np.array([[0.502123]])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel"], + label=None, + features_as_channels=True, + ) + data = dataset[0] + assert data.shape == (1, 4) + np.testing.assert_allclose( + data, np.array([[0.502123, 0.02123, 0.502123, 0.502123]]) + ) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes="accel", + label=None, + features_as_channels=True, + ) + data = dataset[0] + assert data.shape == (1, 4) + np.testing.assert_allclose( + data, np.array([[0.502123, 0.02123, 0.502123, 0.502123]]) + ) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=None, + label=None, + features_as_channels=True, + ) + data = dataset[0] + assert data.shape == (5, 1) + np.testing.assert_allclose( + data, np.array([[0.502123], [0.02123], [0.502123], [0.502123], [0]]) + ) + + # Feature as channel = False + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x-0"], + label=None, + features_as_channels=False, + ) + data = dataset[0] + assert data.shape == (1,) + np.testing.assert_allclose(data, np.array([0.502123])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel"], + label=None, + features_as_channels=False, + ) + data = dataset[0] + assert data.shape == (4,) + np.testing.assert_allclose(data, np.array([0.502123, 0.02123, 0.502123, 0.502123])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes="accel", + label=None, + features_as_channels=False, + ) + data = dataset[0] + assert data.shape == (4,) + np.testing.assert_allclose(data, np.array([0.502123, 0.02123, 
0.502123, 0.502123])) + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=None, + label=None, + features_as_channels=False, + ) + data = dataset[0] + assert data.shape == (5,) + np.testing.assert_allclose( + data, np.array([0.502123, 0.02123, 0.502123, 0.502123, 0]) + ) + + +def test_dataset_with_transform(sample_csv): + class Transform(_Transform): + def __init__(self, multiplier: int): + self.multiplier = multiplier + + def __call__(self, data): + return data * self.multiplier + + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + transforms=[Transform(multiplier=2)], + ) + data, label = dataset[0] + np.testing.assert_allclose( + data, + np.array([[0.502123 * 2, 0.02123 * 2], [0.502123 * 2, 0.502123 * 2]]), + ) + assert label == 0 + + # Without a list of transforms + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + transforms=Transform(multiplier=2), + ) + data, label = dataset[0] + np.testing.assert_allclose( + data, + np.array([[0.502123 * 2, 0.02123 * 2], [0.502123 * 2, 0.502123 * 2]]), + ) + assert label == 0 + + # With multiple transforms + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + transforms=[Transform(multiplier=2), Transform(multiplier=3)], + ) + data, label = dataset[0] + np.testing.assert_allclose( + data, + np.array([[0.502123 * 6, 0.02123 * 6], [0.502123 * 6, 0.502123 * 6]]), + ) + assert label == 0 + + +def test_dataset_with_map_labels(sample_csv): + map_labels = {0: 10, 1: 20, 2: 30} + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + map_labels=map_labels, + ) + data, label = dataset[0] + assert label == 
10 + + data, label = dataset[1] + assert label == 20 + + data, label = dataset[2] + assert label == 30 + + +def test_dataset_with_invalid_map_labels(sample_csv): + with pytest.raises(ValueError): + map_labels = {0: 10, 1: 20} + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + map_labels=map_labels, + ) + + +def test_dataset_str(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="class", + features_as_channels=True, + ) + assert str(dataset) == f"MultiModalSeriesCSVDataset at {sample_csv} (3 samples)" + assert repr(dataset) == f"MultiModalSeriesCSVDataset at {sample_csv} (3 samples)" + + +def test_dataset_with_invalid_label(sample_csv): + with pytest.raises(ValueError): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="invalid", + features_as_channels=True, + ) + + +def test_dataset_with_return_index_as_label(sample_csv): + dataset = MultiModalSeriesCSVDataset( + data_path=sample_csv, + feature_prefixes=["accel-x", "accel-y"], + label="return_index_as_label", + features_as_channels=True, + map_labels={0: 10, 1: 20, 2: 30}, + ) + labels_from_dataset = dataset[:][1] + ground_truth = np.arange(len(dataset)) + + while sum(abs(labels_from_dataset - ground_truth)) == 0: + np.random.shuffle(labels_from_dataset) + + assert len(labels_from_dataset) == len(ground_truth) + assert sum(abs(labels_from_dataset - ground_truth)) > 0 + sorted_labels_from_dataset = np.sort(labels_from_dataset) + assert sum(abs(sorted_labels_from_dataset - ground_truth)) == 0 diff --git a/tests/models/nets/test_diet_linear.py b/tests/models/nets/test_diet_linear.py new file mode 100644 index 0000000..c0fc896 --- /dev/null +++ b/tests/models/nets/test_diet_linear.py @@ -0,0 +1,23 @@ +import torch + +from minerva.models.nets.diet_linear import AdaptedHead, 
DIETLinear + + +def test_diet_linear(): + model = DIETLinear(in_features=256, out_features=100) + assert model is not None + + x = torch.rand(32, 256) + y = model(x) + assert y is not None + + +def test_adapted_diet_linear(): + model = DIETLinear(in_features=256, out_features=100) + assert model is not None + adapted_model = AdaptedHead(model=model, adapter=lambda x: x.reshape(32, -1)) + assert adapted_model is not None + + x = torch.rand(32, 128, 2) + y = adapted_model(x) + assert y is not None diff --git a/tests/models/nets/test_lfr_har_architectures.py b/tests/models/nets/test_lfr_har_architectures.py index f070982..8530706 100644 --- a/tests/models/nets/test_lfr_har_architectures.py +++ b/tests/models/nets/test_lfr_har_architectures.py @@ -1,11 +1,12 @@ +import torch + from minerva.models.nets.lfr_har_architectures import ( HARSCnnEncoder, - LFR_HAR_Projector, LFR_HAR_Predictor, - LFR_HAR_Projector_List, LFR_HAR_Predictor_List, + LFR_HAR_Projector, + LFR_HAR_Projector_List, ) -import torch def test_lfr_har_backbone(): diff --git a/tests/models/ssl/test_diet.py b/tests/models/ssl/test_diet.py new file mode 100644 index 0000000..0cbdad1 --- /dev/null +++ b/tests/models/ssl/test_diet.py @@ -0,0 +1,221 @@ +import lightning as L +import pytest +import torch + +from minerva.models.ssl.diet import DIET +from tests.pipelines.test_lightning_pipeline import MyDataModule + + +@pytest.mark.parametrize("num_samples", [100, 200]) +@pytest.mark.parametrize("batch_size", [1, 64]) +def test_diet_basic(num_samples, batch_size): + # Variables + features, model_encoding_size = 50, 10 + # Simple dataset + random_x = torch.rand((num_samples, features)) + data_index = torch.arange(0, len(random_x)) + dataset = torch.utils.data.TensorDataset(random_x, data_index) + datamodule = MyDataModule(dataset=dataset, batch_size=batch_size) + # Simple DIET model + simple_backbone = torch.nn.Linear(features, model_encoding_size) + linear_head = torch.nn.Linear(model_encoding_size, len(random_x)) 
+ model = DIET( + backbone=simple_backbone, + linear_head=linear_head, + num_data=None, + flatten=True, + adapter=None, + loss=None, + learning_rate=3e-4, + weight_decay=3e-4, + wca_scheduler_total_epochs=None, + ) + # Simple trainer + trainer = L.Trainer( + max_epochs=1, + enable_progress_bar=False, + enable_model_summary=False, + logger=False, + accelerator="cpu", + devices=1, + enable_checkpointing=False, + ) + # Simple training + trainer.fit(model=model, datamodule=datamodule) + + +@pytest.mark.parametrize("num_samples", [50, 100]) +@pytest.mark.parametrize("batch_size", [1, 64]) +def test_diet_without_linear_head(num_samples, batch_size): + # Variables + features, model_encoding_size = 50, 10 + # Simple dataset + random_x = torch.rand((num_samples, features)) + data_index = torch.arange(0, len(random_x)) + dataset = torch.utils.data.TensorDataset(random_x, data_index) + datamodule = MyDataModule(dataset=dataset, batch_size=batch_size) + # Simple DIET model + simple_backbone = torch.nn.Linear(features, model_encoding_size) + model = DIET( + backbone=simple_backbone, + linear_head=None, + num_data=None, + flatten=True, + adapter=None, + loss=None, + learning_rate=3e-4, + weight_decay=3e-4, + wca_scheduler_total_epochs=None, + ) + # Simple trainer + trainer = L.Trainer( + max_epochs=1, + enable_progress_bar=False, + enable_model_summary=False, + logger=False, + accelerator="cpu", + devices=1, + enable_checkpointing=False, + ) + # Simple training + trainer.fit(model=model, datamodule=datamodule) + + assert model.linear_head is not None + assert model.linear_head.in_features == model_encoding_size + assert model.linear_head.out_features == len(dataset) + + +@pytest.mark.parametrize("num_samples", [50, 100]) +@pytest.mark.parametrize("batch_size", [1, 64]) +def test_diet_with_wrong_linear(num_samples, batch_size): + # Variables + features, model_encoding_size = 50, 10 + # Simple dataset + random_x = torch.rand((num_samples, features)) + data_index = torch.arange(0, 
len(random_x)) + dataset = torch.utils.data.TensorDataset(random_x, data_index) + datamodule = MyDataModule(dataset=dataset, batch_size=batch_size) + # Simple DIET model + simple_backbone = torch.nn.Linear(features, model_encoding_size) + + # WARNING CASE + # The linear head output exceeds dataset length + linear_head = torch.nn.Linear(model_encoding_size, len(random_x) + 1) + model = DIET( + backbone=simple_backbone, + linear_head=linear_head, + num_data=None, + flatten=True, + adapter=None, + loss=None, + learning_rate=3e-4, + weight_decay=3e-4, + wca_scheduler_total_epochs=None, + ) + # Simple trainer + trainer = L.Trainer( + max_epochs=1, + enable_progress_bar=False, + enable_model_summary=False, + logger=False, + accelerator="cpu", + devices=1, + enable_checkpointing=False, + ) + # Simple training + with pytest.raises( + AssertionError, + match=f"Number of samples\\({num_samples}\\) and output of linear head\\({linear_head.out_features}\\) do not match.", + ): + trainer.fit(model=model, datamodule=datamodule) + + # The linear head output exceeds dataset length and num_data is provided + linear_head = torch.nn.Linear(model_encoding_size, len(random_x) + 1) + model = DIET( + backbone=simple_backbone, + linear_head=linear_head, + num_data=len(random_x), + flatten=True, + adapter=None, + loss=None, + learning_rate=3e-4, + weight_decay=3e-4, + wca_scheduler_total_epochs=None, + ) + # Simple trainer + trainer = L.Trainer( + max_epochs=1, + enable_progress_bar=False, + enable_model_summary=False, + logger=False, + accelerator="cpu", + devices=1, + enable_checkpointing=False, + ) + # Simple training + with pytest.raises( + AssertionError, + match=f"Number of samples\\({num_samples}\\) and output of linear head\\({linear_head.out_features}\\) do not match.", + ): + trainer.fit(model=model, datamodule=datamodule) + + # ERROR CASE + # The linear head output is less than dataset length + linear_head = torch.nn.Linear(model_encoding_size, len(random_x) - 1) + model = DIET( 
+ backbone=simple_backbone, + linear_head=linear_head, + num_data=None, + flatten=True, + adapter=None, + loss=None, + learning_rate=3e-4, + weight_decay=3e-4, + wca_scheduler_total_epochs=None, + ) + # Simple trainer + trainer = L.Trainer( + max_epochs=1, + enable_progress_bar=False, + enable_model_summary=False, + logger=False, + accelerator="cpu", + devices=1, + enable_checkpointing=False, + ) + # Simple training + with pytest.raises( + AssertionError, + match=f"Number of samples\\({num_samples}\\) and output of linear head\\({linear_head.out_features}\\) do not match.", + ): + trainer.fit(model=model, datamodule=datamodule) + + # The linear head output is less than dataset length and num_data is provided + linear_head = torch.nn.Linear(model_encoding_size, len(random_x) - 1) + model = DIET( + backbone=simple_backbone, + linear_head=linear_head, + num_data=len(random_x), + flatten=True, + adapter=None, + loss=None, + learning_rate=3e-4, + weight_decay=3e-4, + wca_scheduler_total_epochs=None, + ) + # Simple trainer + trainer = L.Trainer( + max_epochs=1, + enable_progress_bar=False, + enable_model_summary=False, + logger=False, + accelerator="cpu", + devices=1, + enable_checkpointing=False, + ) + # Simple training + with pytest.raises( + AssertionError, + match=f"Number of samples\\({num_samples}\\) and output of linear head\\({linear_head.out_features}\\) do not match.", + ): + trainer.fit(model=model, datamodule=datamodule) diff --git a/tests/models/ssl/test_lfr_implementation_har.py b/tests/models/ssl/test_lfr_implementation_har.py index 17de8d9..778ef10 100644 --- a/tests/models/ssl/test_lfr_implementation_har.py +++ b/tests/models/ssl/test_lfr_implementation_har.py @@ -1,12 +1,12 @@ -from minerva.models.ssl.lfr import LearnFromRandomnessModel -from minerva.models.nets.lfr_har_architectures import HARSCnnEncoder import torch +import torch.nn.functional as F + from minerva.models.nets.lfr_har_architectures import ( HARSCnnEncoder, - LFR_HAR_Projector_List, 
LFR_HAR_Predictor_List, + LFR_HAR_Projector_List, ) -import torch.nn.functional as F +from minerva.models.ssl.lfr import LearnFromRandomnessModel # These tests should ensure that the LFR implementation matches the code in https://github.com/layer6ai-labs/lfr