diff --git a/README.md b/README.md index 3198398..2f0230f 100644 --- a/README.md +++ b/README.md @@ -48,16 +48,6 @@ is provided at the docstring of each class. See also [here](#approximate-duplicate-code-detection) for a command line tool. * [`treesitter.parser_for`](python/dpu_utils/codeutils/treesitter/parser.py) get [Tree-sitter](https://tree-sitter.github.io/tree-sitter/) parser by language name. -##### TensorFlow 1.x Utilities `dpu_utils.tfutils` -* [`get_activation`](python/dpu_utils/tfutils/activation.py) retrieve activations function by name. -* [`GradRatioLoggingOptimizer`](python/dpu_utils/tfutils/gradratiologgingoptimizer.py) a wrapper around optimizers that logs the ratios of grad norms to parameter norms. -* [`TFVariableSaver`](python/dpu_utils/tfutils/tfvariablesaver.py) save TF variables in an object that can be pickled. - -Unsorted segment operations following TensorFlow's [`unsorted_segment_sum`](https://www.tensorflow.org/api_docs/python/tf/math/unsorted_segment_sum) operations: -* [`unsorted_segment_logsumexp`](python/dpu_utils/tfutils/unsortedsegmentops.py) -* [`unsorted_segment_log_softmax`](python/dpu_utils/tfutils/unsortedsegmentops.py) -* [`unsorted_segment_softmax`](python/dpu_utils/tfutils/unsortedsegmentops.py) - ##### TensorFlow 2.x Utilities `dpu_utils.tf2utils` * [`get_activation_function_by_name`](python/dpu_utils/tf2utils/activation.py) retrieve activation functions by name. * [`gelu`](python/dpu_utils/tf2utils/activation.py) The GeLU activation function. @@ -69,18 +59,6 @@ Unsorted segment operations following TensorFlow's [`unsorted_segment_sum`](http * [`unsorted_segment_softmax`](python/dpu_utils/tf2utils/unsorted_segment_ops.py) -##### TensorFlow Models `dpu_utils.tfmodels` -* [`SparseGGNN`](python/dpu_utils/tfmodels/sparsegnn.py) a sparse GGNN implementation. -* [`AsyncGGNN`](python/dpu_utils/tfmodels/asyncgnn.py) an asynchronous GGNN implementation. - -These models have not been tested with TF 2.0. - -##### PyTorch Utilities `dpu_utils.ptutils` -* [`BaseComponent`](python/dpu_utils/ptutils/basecomponent.py) a wrapper abstract class around `nn.Module` that - takes care of essential elements of most neural network components. -* [`ComponentTrainer`](python/dpu_utils/ptutils/basecomponent.py) a training loop for `BaseComponent`s. 
- - ### Command-line tools #### Approximate Duplicate Code Detection diff --git a/python/dpu_utils/mlutils/vocabulary.py b/python/dpu_utils/mlutils/vocabulary.py index 9f58819..1814294 100644 --- a/python/dpu_utils/mlutils/vocabulary.py +++ b/python/dpu_utils/mlutils/vocabulary.py @@ -87,9 +87,9 @@ def __str__(self): def get_all_names(self) -> FrozenSet[str]: return frozenset(self.token_to_id.keys()) - def __batch_add_from_counter(self, token_counter: typing.Counter[str], count_threshold: int, max_size: int) -> None: - """Update dictionary with elements of the token_counter""" - for token, count in token_counter.most_common(max_size): + def __batch_add_from_counter(self, token_counter: typing.Counter[str], count_threshold: int, max_num_elements_to_add: int) -> None: + """Update dictionary 
with at most max_num_elements_to_add elements of the token_counter""" + for token, count in token_counter.most_common(max_num_elements_to_add): if count >= count_threshold: self.add_or_get_id(token) else: @@ -115,9 +115,14 @@ def create_vocabulary(tokens: Union[Iterable[str], typing.Counter[str]], max_siz vocab.__batch_add_from_counter(token_counter, count_threshold, max_size - num_base_tokens) return vocab - def update(self, token_counter: typing.Counter[str], max_size: int, count_threshold: int=5): - assert len(self) < max_size, 'Dictionary is already larger than max_size.' - self.__batch_add_from_counter(token_counter, count_threshold=count_threshold, max_size=max_size) + def update(self, token_counter: typing.Counter[str], max_num_elements_to_add: int, count_threshold: int=5) -> None: + """ + Update this Vocabulary instance with elements from the new token_counter. This will add at most + max_num_elements_to_add elements from token_counter with a count at or above the count_threshold. + Tokens that already existed keep their existing ids; newly added tokens are assigned fresh ids. + """ + self.__batch_add_from_counter(token_counter, count_threshold=count_threshold, + max_num_elements_to_add=max_num_elements_to_add) def get_empirical_distribution(self, elements: Iterable[str], dirichlet_alpha: float = 10.) -> np.ndarray: """Retrieve empirical distribution of elements."""
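Note on the `update` change above: the old `max_size` argument (a cap on the total vocabulary size, enforced by an assertion) becomes `max_num_elements_to_add` (a cap on how many new entries one call may consider). A minimal usage sketch under the new signature — the token counts are illustrative only, and it assumes `Vocabulary` is exported from `dpu_utils.mlutils` with `create_vocabulary` callable as shown in the hunk context:

```python
from collections import Counter

from dpu_utils.mlutils import Vocabulary

# Initial vocabulary: 'rare' is rejected because its count is below
# count_threshold (the code checks `count >= count_threshold`).
vocab = Vocabulary.create_vocabulary(
    Counter({'foo': 10, 'bar': 7, 'rare': 1}),
    max_size=100,
    count_threshold=5,
)

# Grow the same vocabulary from a second corpus: at most
# max_num_elements_to_add entries are considered, and there is no longer
# an assertion on the current size. 'foo' keeps the id it already has.
vocab.update(Counter({'foo': 20, 'new_token': 9}), max_num_elements_to_add=50)

assert 'new_token' in vocab.get_all_names()
```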
diff --git a/python/dpu_utils/ptutils/__init__.py b/python/dpu_utils/ptutils/__init__.py deleted file mode 100644 index d554ed4..0000000 --- a/python/dpu_utils/ptutils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .basecomponent import BaseComponent -from .componenttrainer import ComponentTrainer, AbstractScheduler \ No newline at end of file diff --git a/python/dpu_utils/ptutils/basecomponent.py b/python/dpu_utils/ptutils/basecomponent.py deleted file mode 100644 index 68c8830..0000000 --- a/python/dpu_utils/ptutils/basecomponent.py +++ /dev/null @@ -1,372 +0,0 @@ -import gzip -import os -from abc import abstractmethod, ABC -from itertools import islice -from tempfile import TemporaryDirectory -from typing import Any, Dict, Optional, Tuple, Iterator, TypeVar, Generic - -import torch -from torch import nn -from typing_extensions import final # For backwards compatibility with Py < 3.8 - -from dpu_utils.utils import RichPath - -InputData = TypeVar('InputData') -TensorizedData = TypeVar('TensorizedData') - -__all__ = ['BaseComponent'] - -class BaseComponent(ABC, nn.Module, Generic[InputData, TensorizedData]): - """ - Implements the base class for neural network components in pyTorch. Each component has - a few basic functionalities that all components should implement. All components are also - PyTorch `nn.Module`s and hence maintain all of their methods. - - Specifically, - - * Metadata: All information that are needed to build the component that depend on the data. - For example, a vocabulary of common words, a list of classes, etc. - * Hyperparameters: All parameters that are not learned but reflect design decisions about - the component. - * Tensor Conversion: The way that any input data is converted into tensors that can then be - input into the model. For example, a sentence is converted into a (padded) list of integer - word ids. - * Minibatching: How multiple input data are aggregated into a minibatch. Commonly, this - is implemented as stacking of per-data tensors, but this is not always true. - * Neural model: Only a tiny part of a component usually. 
Accepts a minibatch of - tensor data (if any) along with input data from other components and computes some - output. - * Metrics: Computes any metrics that can be reported during training/testing. - * Device: Handle pyTorch's annoying aspects about device placement and store which device this - component is in. - - To implement your own component override: - * _init_component_metadata (optional) - * _load_metadata_from_sample (optional) - * _finalize_component_metadata_and_model (optional) - * load_data_from_sample (mandatory) - * initialize_minibatch (mandatory) - * extend_minibatch_by_sample (mandatory) - * finalize_minibatch (mandatory) - * _component_metrics (optional) - * _reset_component_metrics (optional; mandatory if overriding `_component_metrics`) - """ - - def __init__(self, name: str, hyperparameters: Optional[Dict[str, Any]] = None): - super(BaseComponent, self).__init__() - self.__name = name - self.__metadata_initialized = False - self.__metadata_finalized = False - self.__set_hyperparameters(hyperparameters) - - @property - @final - def name(self) -> str: - return self.__name - - # region Metadata Loading - @final - def init_metadata(self) -> None: - """ - Initialize metadata recursively for all children components, by invoking - _init_component_metadata() for this component and all its children. - """ - if not self.__metadata_initialized: - # Initialize metadata for all children - for child_module in self.children(): - if isinstance(child_module, BaseComponent): - child_module.init_metadata() - - self._init_component_metadata() - self.__metadata_initialized = True - - def _init_component_metadata(self) -> None: - """ - Initialize the component's metadata. This usually initializes a set of temporary objects that will be - updated by `_load_metadata_from_sample` and converted to the final metadata by `_finalize_component_metadata_and_model`. - - For example, a component might create a token Counter at this stage, then update the counter from - `load_metadata_from_sample` and create the vocabulary at `finalize_metadata`. - """ - pass - - @final - def load_metadata_from_sample(self, data_to_load: InputData) -> None: - """ - Accept a single data point and load any metadata-related information from it. - """ - assert self.__metadata_initialized, 'Metadata is not initialized' - if not self.__metadata_finalized: - self._load_metadata_from_sample(data_to_load) - - @abstractmethod - def _load_metadata_from_sample(self, data_to_load: InputData) -> None: - """ - Accept a single data point and load any metadata-related information from it. - - Implementors of this function should: - * Load any metadata that are required by the component. - * Unpack any parts of the parts of `data_to_load` that are needed for the children components - and invoke their `load_metadata_from_sample` for those components. - - This means that for any child component the load_metadata_from_sample needs to be *explicitly* invoked. - - :param data_to_load: data relevant to this component and its children from a single data point. - """ - pass - - @final - def finalize_metadata_and_model(self) -> None: - """ - Compute the final metadata that this component will be using. - Recursively finalize the metadata for all children too. 
- """ - if not self.__metadata_finalized: - for child_module in self.children(): - if isinstance(child_module, BaseComponent): - child_module.finalize_metadata_and_model() - - self._finalize_component_metadata_and_model() - self.__metadata_finalized = True - - def _finalize_component_metadata_and_model(self) -> None: - """ - Finalize the metadata that this component will contain. - - Note to implementors: children component's metadata will have already been finalized when - this function is called and thus they may be used. - """ - pass - - # endregion - - # region Hyperparameters - @classmethod - @abstractmethod - def default_hyperparameters(cls) -> Dict[str, Any]: - """ - :return: the default hyperparameters of this component. - """ - pass - - @final - def __set_hyperparameters(self, component_hyperparameters: Optional[Dict[str, Any]]) -> None: - """ - Set the component hyperparameters. - """ - self.__hyperparameters_dict = self.default_hyperparameters() - if component_hyperparameters is not None: - self.__hyperparameters_dict.update(component_hyperparameters) - - @final - def get_hyperparameter(self, name: str) -> Any: - if name in self.__hyperparameters_dict: - return self.__hyperparameters_dict[name] - return self.default_hyperparameters()[name] - - @final - @property - def __hyperparameters(self) -> Dict[str, Any]: - if not hasattr(self, '__hyperparameters_dict'): - self.__hyperparameters_dict = self.default_hyperparameters() - return self.__hyperparameters_dict - - @final - @property - def hyperparameters(self) -> Dict[str, Any]: - """ - Get the hyperparameters of the component and its children. - """ - hypers = {self.__name: dict(self.__hyperparameters_dict)} - for child_module in self.children(): - if isinstance(child_module, BaseComponent): - hypers[self.__name][child_module.__name] = child_module.hyperparameters - return hypers - - # endregion - - # region Device Utilities - @property - def device(self): - """Retrieve the device where this component lives.""" - return self.__device - - @final - def to(self, *args, **kwargs): - super(BaseComponent, self).to(*args, **kwargs) - # Ugly but seemingly necessary hack: implicit dependency on non-exposed interface. - device, _, _, _ = torch._C._nn._parse_to(*args, **kwargs) - self.__device = device - for child_module in self.children(): - child_module.to(*args, **kwargs) - - @final - def cuda(self, device=None): - """Move the component to a GPU.""" - super(BaseComponent, self).cuda(device=device) - self.__device = device or 'cuda:0' - for child_module in self.children(): - child_module.cuda(device=device) - - @final - def cpu(self): - """Move the component to the CPU.""" - super(BaseComponent, self).cpu() - self.__device = 'cpu' - for child_module in self.children(): - child_module.cpu() - - # endregion - - # region Tensor Conversion - @abstractmethod - def load_data_from_sample(self, data_to_load: InputData) -> Optional[TensorizedData]: - """ - This is called to load the data (tensorize) from a single example in a form that can be consumed by the - neural network. - - Note to implementors: this usually involves unpacking data_to_load and invoking children component's - load_data_from_sample so that each component loads parts of the data it cares about and then composing - them into a single object along with any extra information. - - :param data_to_load: The data sample to be loaded. - :return: A data structure that contains the tensorized data for the given sample - or None if the datapoint should be rejected. 
- """ - pass - - # endregion - - # region Minibatching Logic - @abstractmethod - def initialize_minibatch(self) -> Dict[str, Any]: - """ - Initialize a dictionary that will be populated by `extend_minibatch_by_sample`. - """ - pass - - @abstractmethod - def extend_minibatch_by_sample(self, datapoint: TensorizedData, accumulated_minibatch_data: Dict[str, Any]) -> bool: - """ - Add a datapoint to the minibatch. If for some component-related reason the minibatch cannot accumulate - additional samples, this function should return False. - - :param datapoint: the datapoint to be added. This is a what `load_data_from_sample` returns. - :param accumulated_minibatch_data: the minibatch data to be populated. - :return true if we can continue extending the minibatch. False if for some reason the minibatch is full. - """ - pass - - @abstractmethod - def finalize_minibatch(self, accumulated_minibatch_data: Dict[str, Any]) -> Dict[str, Any]: - """ - Finalize the minibatch data and make sure that the data is in an appropriate format to be consumed by - the model. Commonly the values of the returned dictionary are `torch.tensor(..., device=device)`. - - :param accumulated_minibatch_data: the data that has been accumulated by `extend_minibatch_by_sample`. - :return: the dictionary that will be passed as `**kwargs` to this component `forward()` - """ - pass - - @final - def create_minibatch(self, data_iterator_to_consume: Iterator[TensorizedData], max_num_items: int) -> \ - Tuple[Dict[str, Any], bool, int]: - """ - Creates a minibatch from a finalized minibatch. - - :return: the data of the minibatch, a bool indicating whether the data iterator was fully consumed and - the number of elements in the minibatch - """ - mb_data = self.initialize_minibatch() - num_elements_added = 0 - for element in islice(data_iterator_to_consume, max_num_items): - continue_extending = self.extend_minibatch_by_sample(element, mb_data) - num_elements_added += 1 - if not continue_extending: - # The implementation of the component asked to stop extending the minibatch. - batch_is_full = True - break - else: - # At this point, the batch is full if we finished iterating through the loop and have max_num_items - batch_is_full = num_elements_added == max_num_items - - if num_elements_added == 0: - assert not batch_is_full, 'The data iterator was not exhausted but zero items were returned.' - return {}, True, 0 - - return self.finalize_minibatch(mb_data), batch_is_full, num_elements_added - - # endregion - - # region Component Loading/Unloading - @final - def save(self, path: RichPath) -> None: - """Save the model at a given location.""" - with TemporaryDirectory() as tmpdir: - target_file = os.path.join(tmpdir, 'model.pkl.gz') - with gzip.open(target_file, 'wb') as f: - torch.save(self, f) - path.copy_from(RichPath.create(target_file)) - - @classmethod - def restore_model(cls, path: RichPath, device=None) -> 'BaseComponent': - """Restore model to a given device.""" - model_path = path.to_local_path().path - with gzip.open(model_path, 'rb') as f: - model = torch.load(f, map_location=device) # type: BaseComponent - if device is not None: - model.to(device) - return model - - # endregion - - # region Model Statistics and Metrics - @final - def report_metrics(self) -> Dict[str, Any]: - """ - Report the collected metrics for this component and its children. - - Each component can internally collect its own metrics as the implementor sees fit. 
For example, - a counter may be incremented when the `forward()` function is invoked or a running average may - by updated when a loss is computed. The metrics counter can be reset outside of the component - when `reset_metrics` is invoked. - - To add metrics to a component, implementors need to: - * Implement `_component_metrics` that computes the reported metrics from any component-internal variables. - * Implement `_reset_component_metrics` which resets any variables that compute metrics. - * Store any metric-related variables as fields in their component. - - """ - metrics = self._component_metrics() - for child_module in self.children(): - if isinstance(child_module, BaseComponent): - child_metrics = child_module._component_metrics() - if len(child_metrics) > 0: - metrics[child_module.__name] = child_metrics - return metrics - - @final - def reset_metrics(self) -> None: - """Reset any reported metrics. Often called after report_metrics() to reset any counters etc.""" - self._reset_component_metrics() - for child_module in self.children(): - if isinstance(child_module, BaseComponent): - child_module._reset_component_metrics() - - def _component_metrics(self) -> Dict[str, Any]: - """ - Return a dictionary of metrics for the current component. - - The key is the name of the metric as it will be appear reported. - The value can be anything, but using a formatted string may often be the preferred choice. - """ - return {} - - def _reset_component_metrics(self) -> None: - """Reset any metrics related to the component, such as any counters, running sums, averages, etc.""" - pass - - def num_parameters(self) -> int: - """Compute the number of trainable parameters in this component and its children.""" - return sum(param.numel() for param in self.parameters(recurse=True) if param.requires_grad) - # endregion diff --git a/python/dpu_utils/ptutils/componenttrainer.py b/python/dpu_utils/ptutils/componenttrainer.py deleted file mode 100644 index 78da31b..0000000 --- a/python/dpu_utils/ptutils/componenttrainer.py +++ /dev/null @@ -1,276 +0,0 @@ -import json -import logging -import time -from abc import ABC, abstractmethod -import math -from typing import Optional, Iterable, Set, TypeVar, Generic, Callable, List, Dict, Iterator, Tuple, Union, Any - -import torch -from tqdm import tqdm - -from .basecomponent import BaseComponent -from dpu_utils.utils import RichPath, ThreadedIterator -from dpu_utils.utils.iterators import shuffled_iterator - -InputData = TypeVar('InputData') -TensorizedData = TypeVar('TensorizedData') -EndOfEpochHook = Callable[[BaseComponent, int, Dict], None] - -__all__ = ['ComponentTrainer', 'AbstractScheduler'] - -class AbstractScheduler(ABC): - @abstractmethod - def step(self, epoch_idx: int, epoch_step: int)-> None: - pass - -class ComponentTrainer(Generic[InputData, TensorizedData]): - """ - A trainer for `BaseComponent`s. Used mainly for supervised learning. - - Create a `ComponentTrainer` by passing a `BaseComponent` in the constructor. - Invoke `train()` to initiate the training loop. - - """ - - LOGGER = logging.getLogger('ComponentTrainer') - - def __init__(self, model: BaseComponent[InputData, TensorizedData], save_location: RichPath, - *, max_num_epochs: int = 200, minibatch_size: int = 200, - optimizer_creator: Optional[Callable[[Iterable[torch.Tensor]], torch.optim.Optimizer]]=None, - scheduler_creator: Optional[Callable[[torch.optim.Optimizer], AbstractScheduler]]=None): - """ - - :param model: The model to be trained. 
- :param save_location: The location where the trained model will be checkpointed and saved. - :param max_num_epochs: The maximum number of epochs to run training for. - :param minibatch_size: The maximum size of the minibatch (`BaseComponent`s can override this - by detecting full minibatches and returning False in `extend_minibatch_by_sample`) - :param optimizer_creator: An optional function that accepts an iterable of the training parameters - (pyTorch tensors) and returns a PyTorch optimizer. - :param scheduler_creator: An optional function that accepts an optimizer and creates a scheduler - implementing `AbstractScheduler`. This could be a wrapper for existing learning schedulers. - The scheduler will be invoked at after each training step. - """ - self.__model = model - self.__save_location = save_location - assert save_location.path.endswith('.pkl.gz'), 'All models are stored as .pkl.gz. Please indicate this in the save_location.' - - self.__max_num_epochs = max_num_epochs - self.__minibatch_size = minibatch_size - if optimizer_creator is None: - self.__create_optimizer = torch.optim.Adam - else: - self.__create_optimizer = optimizer_creator - - self.__create_scheduler = scheduler_creator - - self.__train_epoch_end_hooks = [] # type: List[EndOfEpochHook] - self.__validation_epoch_end_hooks = [] # type: List[EndOfEpochHook] - self.__metadata_finalized_hooks = [] # type: List[Callable[[BaseComponent], None]] - self.__training_start_hooks = [] # type: List[Callable[[BaseComponent, torch.optim.Optimizer], None]] - - @property - def model(self) -> BaseComponent[InputData, TensorizedData]: - return self.__model - - def __load_metadata(self, training_data: Iterable[InputData]) -> None: - """ - Ask all components of the model to compute their metadata by doing a full pass over the training data. - """ - self.__model.init_metadata() - for element in training_data: - self.__model.load_metadata_from_sample(element) - self.__model.finalize_metadata_and_model() - self.LOGGER.info('Model metadata loaded. The following model was created:\n %s', self.__model) - self.LOGGER.info('Hyperparameters:\n %s', json.dumps(self.__model.hyperparameters, indent=2)) - for hook in self.__metadata_finalized_hooks: - hook(self.__model) - - def __save_current_model(self) -> None: - self.__model.save(self.__save_location) - - def restore_model(self, device=None) -> None: - self.__model = BaseComponent.restore_model(self.__save_location, device=device) - - def register_train_epoch_end_hook(self, hook: EndOfEpochHook) -> None: - self.__train_epoch_end_hooks.append(hook) - - def register_validation_epoch_end_hook(self, hook: EndOfEpochHook) -> None: - self.__validation_epoch_end_hooks.append(hook) - - def register_model_metadata_finalized_hook(self, hook: Callable[[BaseComponent], None]) -> None: - self.__metadata_finalized_hooks.append(hook) - - def register_training_start_hook(self, hook: Callable[[BaseComponent, torch.optim.Optimizer], None]) -> None: - self.__training_start_hooks.append(hook) - - def train(self, training_data: Iterable[InputData], validation_data: Iterable[InputData], - show_progress_bar: bool = True, patience: int = 5, initialize_metadata: bool = True, - exponential_running_average_factor: float = 0.97, get_parameters_to_freeze: Optional[Callable[[], Set]] = None, - parallel_minibatch_creation: bool=False, device: Optional[Union[str, torch.device]] = None) -> None: - """ - The training-validation loop for `BaseComponent`s. 
- - :param training_data: An iterable that each iteration yields the full training data. - :param validation_data: An iterable that each iteration yields the full validation data. - :param show_progress_bar: Show a progress bar - :param patience: The number of iterations before early stopping kicks in. - :param initialize_metadata: If true, initialize the metadata from the training_data. Otherwise, - assume that the model that is being trained has its metadata already initialized. - :param exponential_running_average_factor: The factor of the running average of the training loss - displayed in the progress bar. - :param get_parameters_to_freeze: The (optional) callable that returns the set of parameters to freeze during training. - :param parallel_minibatch_creation: If True the minibatches will be created in a separate thread. - """ - if initialize_metadata: - self.__load_metadata(training_data) - - self.LOGGER.info('Model has %s parameters', self.__model.num_parameters()) - self.LOGGER.debug('Data Tensorization Started...') - - def data_to_tensor_iterator(data): - for datapoint in data: - tensorized_datapoint = self.__model.load_data_from_sample(datapoint) - if tensorized_datapoint is not None: - yield tensorized_datapoint - - def training_tensors(): - yield from ThreadedIterator( - original_iterator=data_to_tensor_iterator(training_data), - max_queue_size=10 * self.__minibatch_size - ) - - def validation_tensors(): - yield from ThreadedIterator( - original_iterator=data_to_tensor_iterator(validation_data), - max_queue_size=10 * self.__minibatch_size - ) - - def minibatch_iterator(data_iterator: Iterator[TensorizedData], return_partial_minibatches: bool = False) -> Tuple[Dict, int]: - while True: - mb_data, batch_is_full, num_elements = self.__model.create_minibatch(data_iterator, max_num_items=self.__minibatch_size) - if num_elements == 0: - break - elif not batch_is_full and not return_partial_minibatches: - break # Do not return partial minibatches when the iterator is exhausted. - else: - yield mb_data, num_elements - - if device is None: - device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') - self.__model.to(device) - self.LOGGER.info('Using %s for training.' 
% device) - - - if get_parameters_to_freeze is None: - get_parameters_to_freeze = lambda: set() - trainable_parameters = set(self.__model.parameters()) - get_parameters_to_freeze() - optimizer = self.__create_optimizer(trainable_parameters) - scheduler = None if self.__create_scheduler is None else self.__create_scheduler(optimizer) - - for hook in self.__training_start_hooks: - hook(self.__model, optimizer) - - best_loss = float('inf') # type: float - num_epochs_not_improved = 0 # type: int - for epoch in range(self.__max_num_epochs): - self.__model.train() - - data_iter = shuffled_iterator(training_tensors()) - sum_epoch_loss = 0.0 - running_avg_loss = 0.0 - num_minibatches = 0 - num_samples = 0 - - start_time = time.time() - self.__model.reset_metrics() - with tqdm(desc='Training', disable=not show_progress_bar, leave=False) as progress_bar: - for step_idx, (mb_data, num_elements) in enumerate(ThreadedIterator( - minibatch_iterator(data_iter, return_partial_minibatches=False), - enabled=parallel_minibatch_creation)): - optimizer.zero_grad() - mb_loss = self.__model(**mb_data) - mb_loss.backward() - - optimizer.step() - if scheduler is not None: - scheduler.step(epoch_idx=epoch, epoch_step=step_idx) - - loss = float(mb_loss.cpu()) - if math.isnan(loss): - raise Exception('Training Loss has a NaN value.') - - sum_epoch_loss += loss - num_minibatches += 1 - num_samples += num_elements - - if num_minibatches == 1: # First minibatch - running_avg_loss = loss - else: - running_avg_loss = exponential_running_average_factor * running_avg_loss + ( - 1 - exponential_running_average_factor) * loss - progress_bar.update() - progress_bar.set_postfix(Loss=f'{running_avg_loss:.2f}') - - elapsed_time = time.time() - start_time # type: float - self.LOGGER.info('Training complete in %.1fsec [%.2f samples/sec]', elapsed_time, - (num_samples / elapsed_time)) - assert num_minibatches > 0, 'No training minibatches were created. The minibatch size may be too large or the training dataset size too small.' - self.LOGGER.info('Epoch %i: Avg Train Loss %.2f', epoch + 1, sum_epoch_loss / num_minibatches) - train_metrics = self.__model.report_metrics() - for epoch_hook in self.__train_epoch_end_hooks: - epoch_hook(self.__model, epoch, train_metrics) - if len(train_metrics) > 0: - self.LOGGER.info('Training Metrics: %s', json.dumps(train_metrics, indent=2)) - - # Now do validation! - self.__model.eval() - data_iter = validation_tensors() - sum_epoch_loss = 0 - num_minibatches = 0 - num_samples = 0 - start_time = time.time() - self.__model.reset_metrics() - with tqdm(desc='Validation', disable=not show_progress_bar, leave=False) as progress_bar, torch.no_grad(): - for mb_data, num_elements in ThreadedIterator( - minibatch_iterator(data_iter, return_partial_minibatches=True), - enabled=parallel_minibatch_creation): - mb_loss = self.__model(**mb_data) - - loss = float(mb_loss.cpu()) - if math.isnan(loss): - raise Exception('Validation Loss has a NaN value.') - - sum_epoch_loss += loss - num_minibatches += 1 - num_samples += num_elements - - progress_bar.update() - progress_bar.set_postfix(Loss=f'{sum_epoch_loss / num_minibatches:.2f}') - - elapsed_time = time.time() - start_time - assert num_samples > 0, 'No validation data was found.' 
- validation_loss = sum_epoch_loss / num_minibatches - self.LOGGER.info('Validation complete in %.1fsec [%.2f samples/sec]', elapsed_time, - (num_samples / elapsed_time)) - self.LOGGER.info('Epoch %i: Avg Valid Loss %.2f', epoch + 1, validation_loss) - validation_metrics = self.__model.report_metrics() - for epoch_hook in self.__validation_epoch_end_hooks: - epoch_hook(self.__model, epoch, validation_metrics) - if len(validation_metrics) > 0: - self.LOGGER.info('Validation Metrics: %s', json.dumps(validation_metrics, indent=2)) - - if validation_loss < best_loss: - self.LOGGER.info('Best loss so far --- Saving model.') - num_epochs_not_improved = 0 - self.__save_current_model() - best_loss = validation_loss - else: - num_epochs_not_improved += 1 - if num_epochs_not_improved > patience: - self.LOGGER.warning('After %s epochs loss has not improved. Stopping.', num_epochs_not_improved) - break - - - # Restore the best model that was found. - self.restore_model() diff --git a/python/dpu_utils/tfmodels/__init__.py b/python/dpu_utils/tfmodels/__init__.py deleted file mode 100755 index 5a15345..0000000 --- a/python/dpu_utils/tfmodels/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .sparsegnn import SparseGGNN -from .asyncgnn import AsyncGGNN diff --git a/python/dpu_utils/tfmodels/asyncgnn.py b/python/dpu_utils/tfmodels/asyncgnn.py deleted file mode 100755 index 3b9ac18..0000000 --- a/python/dpu_utils/tfmodels/asyncgnn.py +++ /dev/null @@ -1,258 +0,0 @@ -from typing import List - -import numpy as np -import tensorflow as tf - -from dpu_utils.tfutils import get_activation - - -class AsyncGGNN(object): - @classmethod - def default_params(cls): - return { - 'hidden_size': 128, - 'edge_label_size': 16, - - 'propagation_rounds': 4, # Has to be an even number - 'propagation_substeps': 15, # This is the maximal number of considered substeps - - 'graph_rnn_cell': 'GRU', # GRU or RNN - 'graph_rnn_activation': 'tanh', # tanh, ReLU - - 'use_edge_bias': False, - 'num_labeled_edge_types': 1, - 'num_unlabeled_edge_types': 4, - } - - def __init__(self, hyperparams): - self.hyperparams = hyperparams - self.num_labeled_edge_types = self.hyperparams['num_labeled_edge_types'] - self.num_unlabeled_edge_types = self.hyperparams['num_unlabeled_edge_types'] - self.num_edge_types = self.num_labeled_edge_types + self.num_unlabeled_edge_types - - self.__parameters = {} - self.__make_parameters() - - @property - def parameters(self): - return self.__parameters - - def __make_parameters(self): - activation_name = self.hyperparams['graph_rnn_activation'].lower() - activation_fun = get_activation(activation_name) - - h_dim = self.hyperparams['hidden_size'] - e_dim = self.hyperparams['edge_label_size'] - self.__parameters['labeled_edge_weights'] = [tf.get_variable(name='labeled_edge_weights_typ%i' % e_typ, - shape=[h_dim + e_dim, h_dim], - initializer=tf.glorot_uniform_initializer()) - for e_typ in range(self.num_labeled_edge_types)] - self.__parameters['unlabeled_edge_weights'] = [tf.get_variable(name='unlabeled_edge_weights_typ%i' % e_typ, - shape=[h_dim, h_dim], - initializer=tf.glorot_uniform_initializer()) - for e_typ in range(self.num_unlabeled_edge_types)] - - if self.hyperparams['use_edge_bias']: - self.__parameters['labeled_edge_biases'] = [tf.Variable(np.zeros([h_dim], dtype=np.float32), - name='labeled_edge_biases_typ%i' % e_typ) - for e_typ in range(self.num_labeled_edge_types)] - self.__parameters['unlabeled_edge_biases'] = [tf.Variable(np.zeros([h_dim], dtype=np.float32), - name='edge_biases_typ%i' % e_typ) - for 
e_typ in range(self.num_unlabeled_edge_types)] - - cell_type = self.hyperparams['graph_rnn_cell'].lower() - if cell_type == 'gru': - cell = tf.nn.rnn_cell.GRUCell(h_dim, activation=activation_fun) - elif cell_type == 'rnn': - cell = tf.nn.rnn_cell.BasicRNNCell(h_dim, activation=activation_fun) - else: - raise Exception("Unknown RNN cell type '%s'." % cell_type) - - self.__parameters['rnn_cell'] = cell - - def async_ggnn_layer(self, - initial_node_representation: tf.Tensor, - initial_nodes: List[tf.Tensor], - sending_nodes: List[List[List[tf.Tensor]]], - edge_labels: List[List[List[tf.Tensor]]], - msg_targets: List[List[tf.Tensor]], - receiving_nodes: List[List[tf.Tensor]], - receiving_node_num: List[tf.Tensor]) -> tf.Tensor: - """ - Run through an async GGNN and return the representations of all nodes. - :param initial_node_representation: the initial embeddings of the nodes. - Shape: [-1, h_dim] - :param initial_nodes: List of node id tensors I_{r}: Node IDs that will have no incoming edges in round r. - Inner Tensor Shape: [-1] - :param sending_nodes: List of lists of lists of sending nodes S_{r,s,e}: Source node ids of edges of type e - propagating in step s of round r. By convention, 0..self.num_labeled_edges are labeled - edge types, and self.num_labeled_edges.. are unlabeled edge types. - Restrictions: If v in S_{r,s,e}, then v in R_{r,s'} for s' < s or v in I_{r}. - Inner Tensor Shape: [-1] - :param edge_labels: List of lists of lists of (embeddings of) labels of edges L_{r,s,e}: Labels of edges of type - e propagating in step s of round r. - Restrictions: len(L_{r,s,e}) = len(S_{r,s,e}) - Inner Tensor Shape: [-1, e_dim] - :param msg_targets: List of lists of normalised edge target nodes T_{r,s}: Targets of edges propagating in step - s of round r, normalised to a continuous range starting from 0. - This is used for aggregating messages from the sending nodes. - Inner Tensor Shape: [-1] - :param receiving_nodes: List of lists of receiving nodes R_{r,s}: Target node ids of aggregated messages in - propagation step s of round r. - Restrictions: If v in R_{r,s}, v not in R_{r,s'} for all s' != s and v not in I_{r}. - Inner Tensor Shape: [-1] - :param receiving_node_num: Number of receiving nodes N_{r,s} - Restrictions: N_{r,s} = len(R_{r,s}) - Inner Tensor Shape: [|Substeps|] - :return: representations of all nodes after propagation according to schedule. 
Shape: [-1, h_dim] - """ - with tf.variable_scope('async_ggnn'): - cur_node_states = initial_node_representation - - for prop_round in range(self.hyperparams['propagation_rounds']): - with tf.variable_scope('prop_round%i' % (prop_round,)): - # ---- Declare and fill tensor arrays used in tf.while_loop: - sending_nodes_ta = tf.TensorArray( - tf.int32, - infer_shape=False, - element_shape=[None], - size=self.hyperparams['propagation_substeps'] * self.num_edge_types, - name='sending_nodes' - ) - edge_labels_ta = tf.TensorArray( - tf.float32, - infer_shape=False, - element_shape=[None, self.hyperparams['edge_label_size']], - size=self.hyperparams['propagation_substeps'] * self.num_labeled_edge_types, - name='edge_labels' - ) - msg_targets_ta = tf.TensorArray(tf.int32, - infer_shape=False, - element_shape=[None], - size=self.hyperparams['propagation_substeps'], - name='msg_targets') - receiving_nodes_ta = tf.TensorArray(tf.int32, - infer_shape=False, - element_shape=[None], - size=self.hyperparams['propagation_substeps'], - clear_after_read=False, - name='receiving_nodes') - receiving_node_num_ta = tf.TensorArray(tf.int32, - infer_shape=False, - element_shape=[], - size=self.hyperparams['propagation_substeps'], - name='receiving_nodes_num') - - for step in range(self.hyperparams['propagation_substeps']): - for labeled_edge_typ in range(self.num_labeled_edge_types): - sending_nodes_ta = sending_nodes_ta.write(step * self.num_edge_types + labeled_edge_typ, - sending_nodes[prop_round][step][labeled_edge_typ]) - edge_labels_ta = edge_labels_ta.write(step * self.num_labeled_edge_types + labeled_edge_typ, - edge_labels[prop_round][step][labeled_edge_typ]) - for unlabeled_edge_typ in range(self.num_unlabeled_edge_types): - shifted_edge_typ = self.num_labeled_edge_types + unlabeled_edge_typ - sending_nodes_ta = sending_nodes_ta.write(step * self.num_edge_types + shifted_edge_typ, - sending_nodes[prop_round][step][shifted_edge_typ]) - msg_targets_ta = msg_targets_ta.write(step, msg_targets[prop_round][step]) - receiving_nodes_ta = receiving_nodes_ta.write(step, receiving_nodes[prop_round][step]) - receiving_node_num_ta = receiving_node_num_ta.unstack(receiving_node_num[prop_round]) - - new_node_states_ta = tf.TensorArray(tf.float32, - infer_shape=False, - element_shape=[self.hyperparams['hidden_size']], - size=tf.shape(cur_node_states)[0], - clear_after_read=False, - name='new_node_states') - - # ---- Actual propagation schedule implementation: - # Initialize the initial nodes with their state from last round: - new_node_states_ta = new_node_states_ta.scatter(initial_nodes[prop_round], - tf.gather(cur_node_states, initial_nodes[prop_round])) - - def do_substep(substep_id, new_node_states_ta): - # For each edge active in this substep, pull source state and transform: - sending_states_per_edge_type = [] - edge_labels_per_type = [] - for labeled_edge_typ in range(self.num_labeled_edge_types): - sending_states_per_edge_type.append( - new_node_states_ta.gather(sending_nodes_ta.read( - substep_id * self.num_edge_types + labeled_edge_typ - )) - ) - edge_labels_per_type.append(edge_labels_ta.read( - substep_id * self.num_labeled_edge_types + labeled_edge_typ - )) - for unlabeled_edge_typ in range(self.num_unlabeled_edge_types): - shifted_edge_typ = self.num_labeled_edge_types + unlabeled_edge_typ - sending_states_per_edge_type.append(new_node_states_ta.gather( - sending_nodes_ta.read(substep_id * self.num_edge_types + shifted_edge_typ) - )) - - # Collect old states for receiving nodes - substep_receiving_nodes = 
receiving_nodes_ta.read(substep_id) - old_receiving_node_states = tf.gather(cur_node_states, substep_receiving_nodes) - old_receiving_node_states.set_shape([None, self.hyperparams['hidden_size']]) - - msg_targets_this_step = msg_targets_ta.read(substep_id) - receiving_node_num_this_step = receiving_node_num_ta.read(substep_id) - - substep_new_node_states = self.propagate_one_step( - sending_states_per_edge_type, edge_labels_per_type, - msg_targets_this_step, receiving_node_num_this_step, - old_receiving_node_states - ) - - # Write updated states back: - new_node_states_ta = new_node_states_ta.scatter(indices=substep_receiving_nodes, - value=substep_new_node_states, - name="state_scatter_round%i" % (prop_round,)) - return substep_id + 1, new_node_states_ta - - def is_done(substep_id, new_node_states_ta_unused): - return tf.logical_and(substep_id < self.hyperparams['propagation_substeps'], - tf.greater(tf.shape(receiving_nodes_ta.read(substep_id))[0], 0)) - - _, new_node_states_ta = tf.while_loop(cond=is_done, - body=do_substep, - loop_vars=[tf.constant(0), new_node_states_ta] - ) - - cur_node_states = new_node_states_ta.stack(name="state_stack_round%i" % (prop_round,)) - - return cur_node_states - - def propagate_one_step(self, - sending_states_per_edge_type: List[tf.Tensor], - edge_labels_per_type: List[tf.Tensor], - msg_targets_this_step: tf.Tensor, - receiving_node_num_this_step: tf.Tensor, - old_receiving_node_states: tf.Tensor) -> tf.Tensor: - sent_messages = [] - for (edge_typ, sending_state_representations) in enumerate(sending_states_per_edge_type): - if edge_typ < self.num_labeled_edge_types: - - messages = tf.matmul(tf.concat([sending_state_representations, edge_labels_per_type[edge_typ]], - axis=-1), - self.__parameters['labeled_edge_weights'][edge_typ]) - if self.hyperparams['use_edge_bias']: - messages += self.__parameters['labeled_edge_biases'][edge_typ] - else: - shifted_edge_typ = edge_typ - self.num_labeled_edge_types - messages = tf.matmul( - sending_state_representations, self.__parameters['unlabeled_edge_weights'][shifted_edge_typ] - ) - if self.hyperparams['use_edge_bias']: - messages += self.__parameters['unlabeled_edge_biases'][shifted_edge_typ] - sent_messages.append(messages) - - # Stack all edge messages and aggregate as sum for each receiving node: - sent_messages = tf.concat(sent_messages, axis=0) - aggregated_received_messages = tf.unsorted_segment_sum( - sent_messages, msg_targets_this_step, receiving_node_num_this_step - ) - - # Combine old states in RNN cell with incoming messages - aggregated_received_messages.set_shape([None, self.hyperparams['hidden_size']]) - new_node_states = self.__parameters['rnn_cell'](aggregated_received_messages, - old_receiving_node_states)[1] - return new_node_states diff --git a/python/dpu_utils/tfmodels/sparsegnn.py b/python/dpu_utils/tfmodels/sparsegnn.py deleted file mode 100644 index 4dcafa3..0000000 --- a/python/dpu_utils/tfmodels/sparsegnn.py +++ /dev/null @@ -1,239 +0,0 @@ -from typing import List, Optional, Dict, Any -from collections import namedtuple - -import tensorflow as tf - -from dpu_utils.tfutils import unsorted_segment_log_softmax - -from dpu_utils.tfutils import get_activation - -SMALL_NUMBER = 1e-7 - - -GGNNWeights = namedtuple('GGNNWeights', ['edge_weights', - 'edge_biases', - 'edge_type_attention_weights', - 'rnn_cells', - 'edge_feature_gate_weights', - 'edge_feature_gate_bias']) - - -class SparseGGNN: - def __init__(self, params: Dict[str, Any]): - self.params = params - self.num_edge_types = 
self.params['n_edge_types'] - assert self.num_edge_types > 0, 'GNN should have at least one edge type' - h_dim = self.params['hidden_size'] - - edge_feature_sizes = self.params.get('edge_features_size', {}) # type: Dict[int, int] - - if self.params['add_backwards_edges']: - effective_num_edge_types = self.num_edge_types * 2 - else: - effective_num_edge_types = self.num_edge_types - - message_aggregation_type = self.params.get('message_aggregation', 'sum') - if message_aggregation_type == 'sum': - self.unsorted_segment_aggregation_func = tf.unsorted_segment_sum - elif message_aggregation_type == 'max': - self.unsorted_segment_aggregation_func = tf.unsorted_segment_max - else: - raise Exception('Unrecognized message_aggregation type %s' % message_aggregation_type) - - # Generate per-layer values for edge weights, biases and gated units. If we tie them, they are just copies: - self.__weights = GGNNWeights([], [], [], [], [], []) - for layer_idx in range(len(self.params['layer_timesteps'])): - with tf.variable_scope('gnn_layer_%i' % layer_idx): - edge_weights = tf.get_variable(name='gnn_edge_weights', - shape=[effective_num_edge_types * h_dim, h_dim], - initializer=tf.glorot_normal_initializer()) - edge_weights = tf.reshape(edge_weights, [effective_num_edge_types, h_dim, h_dim]) - self.__weights.edge_weights.append(edge_weights) - - if self.params['use_propagation_attention']: - self.__weights.edge_type_attention_weights.append(tf.get_variable(name='edge_type_attention_weights', - shape=[effective_num_edge_types], - initializer=tf.ones_initializer())) - - self.__weights.edge_feature_gate_weights.append({}) - self.__weights.edge_feature_gate_bias.append({}) - for edge_type, edge_feature_size in edge_feature_sizes.items(): - self.__weights.edge_feature_gate_weights[layer_idx][edge_type] = \ - tf.get_variable(name='gnn_edge_%i_feature_gate_weights' % (edge_type,), - shape=[2 * edge_feature_size, 1], - initializer=tf.ones_initializer()) - self.__weights.edge_feature_gate_bias[layer_idx][edge_type] = \ - tf.get_variable(name='gnn_edge_%i_feature_gate_bias' % (edge_type,), - shape=[1], - initializer=tf.zeros_initializer()) - if self.params['add_backwards_edges']: - self.__weights.edge_feature_gate_weights[layer_idx][self.num_edge_types + edge_type] = \ - tf.get_variable(name='gnn_edge_%i_feature_gate_weights' % (self.num_edge_types + edge_type,), - shape=[2 * edge_feature_size, 1], - initializer=tf.ones_initializer()) - self.__weights.edge_feature_gate_bias[layer_idx][self.num_edge_types + edge_type] = \ - tf.get_variable(name='gnn_edge_%i_feature_gate_bias' % (self.num_edge_types + edge_type,), - shape=[1], - initializer=tf.zeros_initializer()) - - if self.params['use_edge_bias']: - self.__weights.edge_biases.append(tf.get_variable(name='gnn_edge_biases', - shape=[effective_num_edge_types, h_dim], - initializer=tf.zeros_initializer())) - - cell = self.__create_rnn_cell(h_dim) - self.__weights.rnn_cells.append(cell) - - def __create_rnn_cell(self, h_dim: int): - activation_name = self.params['graph_rnn_activation'].lower() - activation_fun = get_activation(activation_name) - - cell_type = self.params['graph_rnn_cell'].lower() - if cell_type == 'gru': - cell = tf.nn.rnn_cell.GRUCell(h_dim, activation=activation_fun) - elif cell_type == 'rnn': - cell = tf.nn.rnn_cell.BasicRNNCell(h_dim, activation=activation_fun) - else: - raise Exception("Unknown RNN cell type '%s'." 
-
-    def sparse_gnn_layer(self,
-                         dropout_keep_rate: tf.Tensor,
-                         node_embeddings: tf.Tensor,
-                         adjacency_lists: List[tf.Tensor],
-                         num_incoming_edges_per_type: Optional[tf.Tensor],
-                         num_outgoing_edges_per_type: Optional[tf.Tensor],
-                         edge_features: Dict[int, tf.Tensor]) -> tf.Tensor:
-        """
-        Run through a GNN and return the representations of the nodes.
-        :param dropout_keep_rate: keep probability used for dropout on the edge weights.
-        :param node_embeddings: the initial embeddings of the nodes.
-        :param adjacency_lists: a list of *sorted* adjacency indexes per edge type.
-        :param num_incoming_edges_per_type: [v, num_edge_types] tensor indicating the number of incoming edges per type.
-            Required if use_edge_bias or use_edge_msg_avg_aggregation is true.
-        :param num_outgoing_edges_per_type: [v, num_edge_types] tensor indicating the number of outgoing edges per type.
-            Required if add_backwards_edges and (use_edge_bias or use_edge_msg_avg_aggregation) is true.
-        :param edge_features: a dictionary of edge_type -> num_edges x feature_length for the edges that have features.
-        :return: the representations of the nodes.
-        """
-        # Used shape abbreviations:
-        #   V ~ number of nodes
-        #   D ~ state dimension
-        #   E ~ number of edges of current type
-        #   M ~ number of messages (sum of all E)
-        message_targets = []  # list of tensors of message targets of shape [E]
-        message_edge_types = []  # list of tensors of edge type of shape [E]
-
-        # Note that we optionally support adding (implicit) backwards edges. If turned on, we introduce additional
-        # edge type indices [self.num_edge_types .. 2*self.num_edge_types - 1], with their own weights.
-
-        for edge_type_idx, adjacency_list_for_edge_type in enumerate(adjacency_lists):
-            edge_targets = adjacency_list_for_edge_type[:, 1]
-            message_targets.append(edge_targets)
-            message_edge_types.append(tf.ones_like(edge_targets, dtype=tf.int32) * edge_type_idx)
-        if self.params['add_backwards_edges']:
-            for edge_type_idx, adjacency_list_for_edge_type in enumerate(adjacency_lists):
-                edge_targets = adjacency_list_for_edge_type[:, 0]
-                message_targets.append(edge_targets)
-                message_edge_types.append(tf.ones_like(edge_targets, dtype=tf.int32) * (self.num_edge_types + edge_type_idx))
-        message_targets = tf.concat(message_targets, axis=0)  # Shape [M]
-        message_edge_types = tf.concat(message_edge_types, axis=0)  # Shape [M]
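For intuition, the same bookkeeping in plain NumPy: each edge type contributes its target column to one flat `message_targets` vector, with a parallel vector recording which edge type produced each message (toy data, not from the deleted tests):

    import numpy as np

    # Two edge types; each adjacency list is [E, 2] with (source, target) pairs.
    adjacency_lists = [np.array([[0, 1], [2, 1]]), np.array([[1, 0]])]

    message_targets = np.concatenate([adj[:, 1] for adj in adjacency_lists])
    message_edge_types = np.concatenate(
        [np.full(len(adj), edge_type) for edge_type, adj in enumerate(adjacency_lists)])

    print(message_targets)     # [1 1 0]
    print(message_edge_types)  # [0 0 1]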
-
-        with tf.variable_scope('gnn_scope'):
-            node_states_per_layer = []  # list of tensors of shape [V, D], one entry per layer (the final state of that layer)
-            node_states_per_layer.append(node_embeddings)
-            num_nodes = tf.shape(node_embeddings, out_type=tf.int32)[0]
-
-            for (layer_idx, num_timesteps) in enumerate(self.params['layer_timesteps']):
-                with tf.variable_scope('gnn_layer_%i' % layer_idx):
-                    # Extract residual messages, if any:
-                    layer_residual_connections = self.params['residual_connections'].get(str(layer_idx))
-                    if layer_residual_connections is None:
-                        layer_residual_states = []
-                    else:
-                        layer_residual_states = [node_states_per_layer[residual_layer_idx]
-                                                 for residual_layer_idx in layer_residual_connections]
-
-                    if self.params['use_propagation_attention']:
-                        message_edge_type_factors = tf.nn.embedding_lookup(params=self.__weights.edge_type_attention_weights[layer_idx],
-                                                                           ids=message_edge_types)  # Shape [M]
-
-                    # Record new states for this layer. Initialised to last state, but will be updated below:
-                    node_states_per_layer.append(node_states_per_layer[-1])
-
-                    for step in range(num_timesteps):
-                        with tf.variable_scope('timestep_%i' % step):
-                            messages = []  # list of tensors of messages of shape [E, D]
-                            message_source_states = []  # list of tensors of edge source states of shape [E, D]
-
-                            # Collect incoming messages per edge type
-                            def compute_messages_for_edge_type(data_edge_type_idx: int, weights_edge_type_idx: int, edge_sources: tf.Tensor) -> None:
-                                edge_source_states = tf.nn.embedding_lookup(params=node_states_per_layer[-1],
-                                                                            ids=edge_sources)  # Shape [E, D]
-                                edge_weights = tf.nn.dropout(self.__weights.edge_weights[layer_idx][weights_edge_type_idx],
-                                                             rate=1 - dropout_keep_rate)
-                                all_messages_for_edge_type = tf.matmul(edge_source_states, edge_weights)  # Shape [E, D]
-
-                                if data_edge_type_idx in edge_features:
-                                    edge_feature_augmented = tf.concat([edge_features[data_edge_type_idx],
-                                                                        1 / (edge_features[data_edge_type_idx] + SMALL_NUMBER)],
-                                                                       axis=-1)  # Shape [E, 2*edge_size]
-                                    all_messages_gate_value = \
-                                        tf.sigmoid(self.__weights.edge_feature_gate_bias[layer_idx][weights_edge_type_idx]
-                                                   + tf.matmul(edge_feature_augmented,
-                                                               self.__weights.edge_feature_gate_weights[layer_idx][weights_edge_type_idx]))  # Shape [E, 1]
-                                    all_messages_for_edge_type = all_messages_gate_value * all_messages_for_edge_type
-
-                                messages.append(all_messages_for_edge_type)
-                                message_source_states.append(edge_source_states)
-
-                            for edge_type_idx, adjacency_list_for_edge_type in enumerate(adjacency_lists):
-                                compute_messages_for_edge_type(edge_type_idx, edge_type_idx, adjacency_list_for_edge_type[:, 0])
-                            if self.params['add_backwards_edges']:
-                                for edge_type_idx, adjacency_list_for_edge_type in enumerate(adjacency_lists):
-                                    compute_messages_for_edge_type(edge_type_idx, self.num_edge_types + edge_type_idx, adjacency_list_for_edge_type[:, 1])
-
-                            messages = tf.concat(messages, axis=0)  # Shape [M, D]
-
-                            if self.params['use_propagation_attention']:
-                                message_source_states = tf.concat(message_source_states, axis=0)  # Shape [M, D]
-                                message_target_states = tf.nn.embedding_lookup(params=node_states_per_layer[-1],
-                                                                               ids=message_targets)  # Shape [M, D]
-                                message_attention_scores = tf.einsum('mi,mi->m', message_source_states, message_target_states)  # Shape [M]
-                                message_attention_scores = message_attention_scores * message_edge_type_factors
-
-                                message_log_attention = unsorted_segment_log_softmax(logits=message_attention_scores,
-                                                                                     segment_ids=message_targets,
-                                                                                     num_segments=num_nodes)
-
-                                message_attention = tf.exp(message_log_attention)  # Shape [M]
-                                # Weight messages by their attention probabilities:
-                                messages = messages * tf.expand_dims(message_attention, -1)
-
-                            incoming_messages = self.unsorted_segment_aggregation_func(data=messages,
                                                                                        segment_ids=message_targets,
-                                                                                       num_segments=num_nodes)  # Shape [V, D]
-
-                            if self.params['use_edge_bias']:
-                                incoming_messages += tf.matmul(num_incoming_edges_per_type,
-                                                               self.__weights.edge_biases[layer_idx][0:self.num_edge_types])  # Shape [V, D]
-                                if self.params['add_backwards_edges']:
-                                    incoming_messages += tf.matmul(num_outgoing_edges_per_type,
-                                                                   self.__weights.edge_biases[layer_idx][self.num_edge_types:])  # Shape [V, D]
-
-                            if self.params['use_edge_msg_avg_aggregation']:
-                                num_incoming_edges = tf.reduce_sum(num_incoming_edges_per_type,
-                                                                   keep_dims=True, axis=-1)  # Shape [V, 1]
-                                if self.params['add_backwards_edges']:
-                                    num_incoming_edges += tf.reduce_sum(num_outgoing_edges_per_type,
-                                                                        keep_dims=True, axis=-1)  # Shape [V, 1]
-                                incoming_messages /= num_incoming_edges + SMALL_NUMBER
-
-                            incoming_information = tf.concat(layer_residual_states + [incoming_messages],
-                                                             axis=-1)  # Shape [V, D*(1 + num of residual connections)]
-
-                            # Pass updated vertex features into RNN cell:
-                            node_states_per_layer[-1] = self.__weights.rnn_cells[layer_idx](incoming_information,
-                                                                                            node_states_per_layer[-1])[1]  # Shape [V, D]
-
-        return node_states_per_layer[-1]
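The propagation-attention branch normalises scores over all messages arriving at the same node; the deleted `unsorted_segment_log_softmax` does this with the usual max-shift for numerical stability. A NumPy sketch of a per-segment softmax (an illustrative helper, not the library function):

    import numpy as np

    def segment_softmax(scores: np.ndarray, segment_ids: np.ndarray, num_segments: int) -> np.ndarray:
        """Softmax over each group of scores that shares a segment id."""
        maxes = np.full(num_segments, -np.inf)
        np.maximum.at(maxes, segment_ids, scores)    # per-segment max for stability
        exped = np.exp(scores - maxes[segment_ids])  # shift before exponentiating
        sums = np.zeros(num_segments)
        np.add.at(sums, segment_ids, exped)
        return exped / sums[segment_ids]

    scores = np.array([1.0, 2.0, 3.0])
    print(segment_softmax(scores, np.array([0, 0, 1]), 2))  # [~0.27, ~0.73, 1.0]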
diff --git a/python/dpu_utils/tfutils/__init__.py b/python/dpu_utils/tfutils/__init__.py
deleted file mode 100755
index 44006c9..0000000
--- a/python/dpu_utils/tfutils/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from .gradratiologgingoptimizer import GradRatioLoggingOptimizer
-from .unsortedsegmentops import unsorted_segment_log_softmax, unsorted_segment_logsumexp, unsorted_segment_softmax
-from .tfvariablesaver import TFVariableSaver
-from .pick_indices import pick_indices_from_probs
-from .activation import get_activation
diff --git a/python/dpu_utils/tfutils/activation.py b/python/dpu_utils/tfutils/activation.py
deleted file mode 100644
index 11326ab..0000000
--- a/python/dpu_utils/tfutils/activation.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from typing import Optional, Callable
-
-import tensorflow as tf
-
-__all__ = [ 'get_activation' ]
-
-def get_activation(activation_fun: Optional[str]) -> Optional[Callable]:
-    if activation_fun is None:
-        return None
-    activation_fun = activation_fun.lower()
-    if activation_fun == 'linear':
-        return None
-    if activation_fun == 'tanh':
-        return tf.tanh
-    if activation_fun == 'relu':
-        return tf.nn.relu
-    if activation_fun == 'leaky_relu':
-        return tf.nn.leaky_relu
-    if activation_fun == 'elu':
-        return tf.nn.elu
-    if activation_fun == 'selu':
-        return tf.nn.selu
-    if activation_fun == 'gelu':
-        def gelu(input_tensor):
-            cdf = 0.5 * (1.0 + tf.erf(input_tensor / tf.sqrt(2.0)))
-            return input_tensor * cdf
-        return gelu
-    else:
-        raise ValueError("Unknown activation function '%s'!" % activation_fun)
\ No newline at end of file
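The deleted `gelu` branch implements the exact erf-based GeLU, gelu(x) = x * Phi(x), where Phi is the standard normal CDF. A quick standard-library check of the same formula:

    import math

    def gelu(x: float) -> float:
        """Exact GeLU: x times the standard normal CDF evaluated at x."""
        return x * 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))

    print(gelu(0.0))   # 0.0
    print(gelu(1.0))   # ~0.8413, i.e. 1.0 * P[N(0,1) <= 1]
    print(gelu(-1.0))  # ~-0.1587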
diff --git a/python/dpu_utils/tfutils/gradratiologgingoptimizer.py b/python/dpu_utils/tfutils/gradratiologgingoptimizer.py
deleted file mode 100644
index a613a8a..0000000
--- a/python/dpu_utils/tfutils/gradratiologgingoptimizer.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from collections import OrderedDict
-
-import tensorflow as tf
-from tensorflow.python.ops import control_flow_ops
-
-
-class GradRatioLoggingOptimizer:
-    """Wraps an optimizer and logs the ratio of each gradient's norm to the norm of its parameter."""
-    def __init__(self, optimizer, name='training-optimizer'):
-        self.__optimizer = optimizer
-        self.__name = name
-        self.__acc_count = tf.Variable(0, dtype=tf.int32, trainable=False)
-        self.__grad_ratio_acc_vars = OrderedDict()  # type: OrderedDict[str, tf.Variable]
-
-    @property
-    def optimizer(self):
-        return self.__optimizer
-
-    def print_ratios(self, session: tf.Session):
-        count = self.__acc_count.eval(session) + 1e-10
-        print('======================')
-        print('Gradient Ratios')
-        print('======================')
-        for name, acc in self.__grad_ratio_acc_vars.items():
-            print('%s: %.2e' % (name, acc.eval(session) / count))
-
-        reset_ops = [tf.assign(self.__acc_count, 0)] + [tf.assign(v, 0) for v in self.__grad_ratio_acc_vars.values()]
-        session.run(reset_ops)
-
-    def minimize(self, loss):
-        update_ops = [tf.assign_add(self.__acc_count, 1)]
-        gradients_and_vars = self.__optimizer.compute_gradients(loss)
-        for grad, var in gradients_and_vars:
-            if grad is None:
-                continue
-            grad_ratio = tf.sqrt(tf.reduce_sum(tf.pow(grad, 2)) / tf.reduce_sum(tf.pow(var, 2)))
-            ratio_acc_var = tf.Variable(0, trainable=False, dtype=tf.float32)
-            self.__grad_ratio_acc_vars[var.name] = ratio_acc_var
-            update_ops.append(tf.assign_add(ratio_acc_var, grad_ratio))
-        grad_apply_op = self.__optimizer.apply_gradients(gradients_and_vars)
-        update_ops.append(grad_apply_op)
-        return control_flow_ops.group(*update_ops)
\ No newline at end of file
diff --git a/python/dpu_utils/tfutils/pick_indices.py b/python/dpu_utils/tfutils/pick_indices.py
deleted file mode 100755
index 4f02c41..0000000
--- a/python/dpu_utils/tfutils/pick_indices.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import random
-from typing import Iterable
-
-import numpy as np
-
-BIG_NUMBER = 1e7
-SMALL_NUMBER = 1e-7
-
-
-def pick_indices_from_probs(probs: np.ndarray, num_picks: int, use_sampling: bool=False,
-                            temperature: float=0.5) -> Iterable[int]:
-    """Given an array of probabilities, pick up to num_picks unique indices from it."""
-    if use_sampling:
-        # First, consider the temperature for sampling:
-        probs = probs ** (1.0 / temperature)
-        normaliser = np.sum(probs)
-        probs = probs / normaliser
-
-        probs_cum = np.cumsum(probs)
-        probs_cum[-1] = 1.0  # To protect against floating point oddness
-        picked_indices = set()
-        remaining_picks = num_picks * 10
-        while len(picked_indices) < num_picks and remaining_picks > 0:
-            remaining_picks -= 1
-            picked_val = random.random()
-            picked_index = np.argmax(probs_cum >= picked_val)  # type: int
-            if picked_index not in picked_indices and probs[picked_index] > SMALL_NUMBER:
-                picked_indices.add(picked_index)
-        return picked_indices
-    else:
-        num_samples = min(num_picks, len(probs))
-        top_k_indices = np.argpartition(probs, -num_samples)[-num_samples:]
-        top_k_indices = [index for index in top_k_indices if probs[index] > SMALL_NUMBER]
-        return top_k_indices
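The sampling branch above first reshapes the distribution with a temperature before drawing from the inverse CDF: raising probabilities to 1/T with T < 1 concentrates mass on the most likely indices, while T > 1 flattens it. A compact sketch of just that rescaling step (illustrative names):

    import numpy as np

    def apply_temperature(probs: np.ndarray, temperature: float) -> np.ndarray:
        """Rescale a probability vector; temperature < 1 sharpens, > 1 flattens."""
        scaled = probs ** (1.0 / temperature)
        return scaled / scaled.sum()

    probs = np.array([0.6, 0.3, 0.1])
    print(apply_temperature(probs, 0.5))  # sharper: ~[0.78, 0.20, 0.02]
    print(apply_temperature(probs, 2.0))  # flatter: ~[0.47, 0.33, 0.19]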
diff --git a/python/dpu_utils/tfutils/tfvariablesaver.py b/python/dpu_utils/tfutils/tfvariablesaver.py
deleted file mode 100644
index 9f77cf3..0000000
--- a/python/dpu_utils/tfutils/tfvariablesaver.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import tensorflow as tf
-from typing import Callable, Dict, Optional
-import numpy as np
-
-
-class TFVariableSaver:
-    """
-    Save all variables in the graph as plain numpy values that can be serialized with pickle,
-    and restore them into a (possibly resized) graph later.
-    """
-    def __init__(self):
-        self.__saved_variables = {}  # type: Dict[str, np.ndarray]
-
-    def save_all(self, session: tf.Session, exclude_variable: Optional[Callable[[str], bool]]=None) -> None:
-        self.__saved_variables = {}
-        for variable in session.graph.get_collection(tf.GraphKeys.GLOBAL_VARIABLES):
-            assert variable.name not in self.__saved_variables
-            if exclude_variable is not None and exclude_variable(variable.name):
-                continue
-            self.__saved_variables[variable.name] = variable.value().eval()
-
-    def has_saved_variables(self) -> bool:
-        return len(self.__saved_variables) > 0
-
-    def restore_saved_values(self, session: tf.Session) -> None:
-        assert len(self.__saved_variables) > 0
-        save_ops = []
-        with tf.name_scope("restore"):
-            for variable in session.graph.get_collection(tf.GraphKeys.GLOBAL_VARIABLES):
-                if variable.name in self.__saved_variables:
-                    saved_value = self.__saved_variables[variable.name]
-                    if len(variable.shape) == 0 or variable.shape[0]._value == saved_value.shape[0]:
-                        # Scalars, or the size hasn't changed:
-                        save_ops.append(variable.assign(saved_value))
-                    else:
-                        # Allow expanding saved variables:
-                        print('Stored value for %s has shape %s but the variable has shape %s. Padding with zeros.'
-                              % (variable.name, saved_value.shape, variable.shape))
-                        initial_value = np.zeros([variable.shape[i]._value for i in range(len(variable.shape))],
-                                                 dtype=variable.dtype.as_numpy_dtype)
-                        initial_value[:saved_value.shape[0]] = saved_value
-                        save_ops.append(variable.assign(initial_value))
-                else:
-                    print('Initializing %s at random since no saved value was found.' % variable.name)
-                    save_ops.append(tf.variables_initializer([variable]))
-        session.run(save_ops)
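The design here is simply "variables as a dict of name to numpy array", which pickles cleanly regardless of the TF graph. A framework-free sketch of that round trip (the `weights` dict is hypothetical):

    import pickle
    import numpy as np

    weights = {'layer0/w:0': np.ones((3, 2)), 'layer0/b:0': np.zeros(2)}

    blob = pickle.dumps(weights)   # serialize the name -> ndarray mapping
    restored = pickle.loads(blob)

    assert set(restored) == set(weights)
    assert np.array_equal(restored['layer0/w:0'], weights['layer0/w:0'])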
diff --git a/python/dpu_utils/tfutils/unsortedsegmentops.py b/python/dpu_utils/tfutils/unsortedsegmentops.py
deleted file mode 100644
index d1d4d84..0000000
--- a/python/dpu_utils/tfutils/unsortedsegmentops.py
+++ /dev/null
@@ -1,54 +0,0 @@
-import tensorflow as tf
-
-SMALL_NUMBER = 1e-7
-
-def unsorted_segment_logsumexp(scores, segment_ids, num_segments):
-    """Perform an unsorted segment safe logsumexp."""
-    # Note: if a segment is empty, the smallest value for the score will be returned,
-    # which yields the correct behavior
-    max_per_segment = tf.unsorted_segment_max(data=scores,
-                                              segment_ids=segment_ids,
-                                              num_segments=num_segments)
-    scattered_log_maxes = tf.gather(params=max_per_segment,
-                                    indices=segment_ids)
-    recentered_scores = scores - scattered_log_maxes
-    exped_recentered_scores = tf.exp(recentered_scores)
-
-    per_segment_sums = tf.unsorted_segment_sum(exped_recentered_scores, segment_ids, num_segments)
-    per_segment_logs = tf.log(per_segment_sums)
-    return per_segment_logs + max_per_segment
-
-
-def unsorted_segment_log_softmax(logits, segment_ids, num_segments):
-    """Perform an unsorted segment safe log_softmax."""
-    # Note: if a segment is empty, the smallest value for the score will be returned,
-    # which yields the correct behavior
-    max_per_segment = tf.unsorted_segment_max(data=logits,
-                                              segment_ids=segment_ids,
-                                              num_segments=num_segments)
-    scattered_maxes = tf.gather(params=max_per_segment,
-                                indices=segment_ids)
-    recentered_scores = logits - scattered_maxes
-    exped_recentered_scores = tf.exp(recentered_scores)
-
-    per_segment_sums = tf.unsorted_segment_sum(exped_recentered_scores, segment_ids, num_segments)
-    per_segment_normalization_consts = tf.log(per_segment_sums)
-
-    log_probs = recentered_scores - tf.gather(params=per_segment_normalization_consts, indices=segment_ids)
-    return log_probs
-
-
-def unsorted_segment_softmax(logits, segment_ids, num_segments):
-    """Perform a safe unsorted segment softmax."""
-    max_per_segment = tf.unsorted_segment_max(data=logits,
-                                              segment_ids=segment_ids,
-                                              num_segments=num_segments)
-    scattered_maxes = tf.gather(params=max_per_segment,
-                                indices=segment_ids)
-    recentered_scores = logits - scattered_maxes
-    exped_recentered_scores = tf.exp(recentered_scores)
-
-    per_segment_sums = tf.unsorted_segment_sum(exped_recentered_scores, segment_ids, num_segments)
-
-    probs = exped_recentered_scores / (tf.gather(params=per_segment_sums, indices=segment_ids) + SMALL_NUMBER)
-    return probs
diff --git a/python/setup.py b/python/setup.py
index 95c8af9..85744dd 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -8,7 +8,7 @@
 setuptools.setup(
     name='dpu_utils',
-    version='0.6.0',
+    version='1.0.0',
     license='MIT',
     description='Python utilities used by Deep Procedural Intelligence',
     long_description=long_description,
diff --git a/python/tests/ptutils/__init__.py b/python/tests/ptutils/__init__.py
deleted file mode 100644
index e69de29..0000000
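All three deleted ops rely on the same max-shift identity, logsumexp(x) = max(x) + log(sum(exp(x - max(x)))), applied independently per segment. A NumPy check of the identity on a single segment with values that would overflow a naive implementation:

    import numpy as np

    scores = np.array([1000.0, 1001.0])  # naive np.exp(scores) would overflow
    m = scores.max()
    stable = m + np.log(np.sum(np.exp(scores - m)))
    print(stable)  # ~1001.3133, computed without overflow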
diff --git a/python/tests/ptutils/test_component.py b/python/tests/ptutils/test_component.py
deleted file mode 100644
index b2ddba7..0000000
--- a/python/tests/ptutils/test_component.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import unittest
-import tempfile
-from typing import Tuple, Iterator, Iterable
-
-import torch
-
-from dpu_utils.utils import RichPath
-from dpu_utils.ptutils import ComponentTrainer
-
-from tests.ptutils.testdata import SyntheticData
-from tests.ptutils.testmodel import SimpleRegression, SampleDatapoint
-
-
-class TestPytorchComponent(unittest.TestCase):
-    def test_train_model(self):
-        num_features = 100
-        training_data, validation_data = self.__get_data(num_features)
-
-        with tempfile.TemporaryDirectory() as dir:
-            model_file = RichPath.create(dir).join('tmp.pkl.gz')
-
-            model = SimpleRegression('SimpleRegressionTest', num_features)
-            trainer = ComponentTrainer(model, model_file, max_num_epochs=50)
-            trainer.train(training_data, validation_data, parallel_minibatch_creation=True)
-            model_acc_1 = self.__compute_accuracy(model, validation_data)
-
-            trained_model = SimpleRegression.restore_model(model_file)  # type: SimpleRegression
-            trained_model_acc = self.__compute_accuracy(trained_model, validation_data)
-            self.assertGreater(trained_model_acc, .95,
-                               f'Model achieves too low accuracy, {trained_model_acc:%}')
-            self.assertAlmostEqual(trained_model_acc, model_acc_1, places=3,
-                                   msg=f'Accuracy before and after loading does not match: {trained_model_acc} vs {model_acc_1}')
-
-    def test_freeze_params(self):
-        num_features = 100
-        training_data, validation_data = self.__get_data(num_features)
-
-        with tempfile.TemporaryDirectory() as dir:
-            model_file = RichPath.create(dir).join('tmp.pkl.gz')
-
-            model = SimpleRegression('SimpleRegressionTest', num_features)
-            trainer = ComponentTrainer(model, model_file, max_num_epochs=50)
-
-            def get_freeze_weights():
-                for p in model.parameters():
-                    if len(p.shape) == 2:  # Just the weights
-                        yield p
-
-            trainer.train(training_data, validation_data, get_parameters_to_freeze=lambda: set(get_freeze_weights()))
-            trained_model_acc = self.__compute_accuracy(model, validation_data)
-
-            self.assertLess(trained_model_acc, .7,
-                            f'Model achieves too high accuracy but the weights were frozen, {trained_model_acc:%}')
-
-    def __get_data(self, num_features):
-        data = SyntheticData(num_features)
-        all_data = list(data.generate(10000))
-        training_data, validation_data = all_data[:9000], all_data[9000:]
-        return training_data, validation_data
-
-    def __compute_accuracy(self, model: SimpleRegression, dataset: Iterable[SampleDatapoint]) -> float:
-        num_samples = 0
-        num_correct = 0
-        for point, prediction in self.__get_model_prediction(model, dataset):
-            num_samples += 1
-            if point.target_class == prediction:
-                num_correct += 1
-        return num_correct / num_samples
-
-    def __get_model_prediction(self, model: SimpleRegression, data: Iterable[SampleDatapoint]) -> Iterator[Tuple[SampleDatapoint, bool]]:
-        for datapoint in data:
-            tensorized = model.load_data_from_sample(datapoint)
-            mb_data = model.initialize_minibatch()
-            model.extend_minibatch_by_sample(tensorized, mb_data)
-            mb_data = model.finalize_minibatch(mb_data)
-
-            with torch.no_grad():
-                predictions = model.predict(mb_data['inputs']).cpu().numpy()
-            yield datapoint, predictions[0]
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/tests/ptutils/testdata.py b/python/tests/ptutils/testdata.py
deleted file mode 100644
index 359c329..0000000
--- a/python/tests/ptutils/testdata.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from typing import Iterator
-
-import numpy as np
-
-from tests.ptutils.testmodel import SampleDatapoint
-
-
-class SyntheticData:
-    def __init__(self, num_features: int):
-        self.__num_features = num_features
-        self.__weights = np.random.randn(num_features) * 10
-
-    def generate(self, num_points: int) -> Iterator[SampleDatapoint]:
-        for _ in range(num_points):
-            # Avoid bias so that there is no obvious class imbalance.
-            input_features = np.random.randn(self.__num_features) * 5
-            yield SampleDatapoint(
-                input_features=list(input_features),
-                target_class=sum(input_features * self.__weights) >= 0
-            )
\ No newline at end of file
diff --git a/python/tests/ptutils/testmodel.py b/python/tests/ptutils/testmodel.py
deleted file mode 100644
index 084c733..0000000
--- a/python/tests/ptutils/testmodel.py
+++ /dev/null
@@ -1,68 +0,0 @@
-from typing import NamedTuple, List, Optional, Dict, Any
-
-import numpy as np
-import torch
-from torch import nn as nn
-
-from dpu_utils.ptutils import BaseComponent
-
-
-class SampleDatapoint(NamedTuple):
-    input_features: List[float]
-    target_class: bool
-
-
-class TensorizedDatapoint(NamedTuple):
-    input_features: np.ndarray
-    target_class: np.ndarray
-
-
-class SimpleRegression(BaseComponent[SampleDatapoint, TensorizedDatapoint]):
-    """A simple linear regression model used for testing."""
-    def __init__(self, name, num_features: int, hyperparameters: Optional[Dict[str, Any]] = None):
-        super(SimpleRegression, self).__init__(name, hyperparameters)
-        self.__num_features = num_features
-
-    @classmethod
-    def default_hyperparameters(cls) -> Dict[str, Any]:
-        return {}
-
-    def _load_metadata_from_sample(self, data_to_load: SampleDatapoint) -> None:
-        pass  # No metadata in this simple model.
-
-    def _finalize_component_metadata_and_model(self) -> None:
-        self.__layer = nn.Linear(self.__num_features, 1, bias=False)
-        self.__bias = nn.Parameter(torch.tensor(0, dtype=torch.float32))  # Use a separate bias to allow freezing the weights.
-        self.__loss = nn.BCEWithLogitsLoss()
-
-    def load_data_from_sample(self, data_to_load: SampleDatapoint) -> Optional[TensorizedDatapoint]:
-        return TensorizedDatapoint(
-            input_features=np.array(data_to_load.input_features, dtype=np.float32),
-            target_class=np.array(1 if data_to_load.target_class else 0, dtype=np.float32)
-        )
-
-    def initialize_minibatch(self) -> Dict[str, Any]:
-        return {
-            'inputs': [],
-            'targets': []
-        }
-
-    def extend_minibatch_by_sample(self, datapoint: TensorizedDatapoint, accumulated_minibatch_data: Dict[str, Any]) -> bool:
-        accumulated_minibatch_data['inputs'].append(datapoint.input_features)
-        accumulated_minibatch_data['targets'].append(datapoint.target_class)
-        return True
-
-    def finalize_minibatch(self, accumulated_minibatch_data: Dict[str, Any]) -> Dict[str, Any]:
-        return {
-            'inputs': torch.tensor(np.stack(accumulated_minibatch_data['inputs'], axis=0), device=self.device),
-            'targets': torch.tensor(np.stack(accumulated_minibatch_data['targets'], axis=0), device=self.device)
-        }
-
-    def predict(self, inputs: torch.Tensor):
-        predicted = self.__layer(inputs)[:, 0] + self.__bias  # B
-        return predicted >= 0
-
-    def forward(self, inputs, targets):
-        predicted = self.__layer(inputs)[:, 0] + self.__bias  # B
-        loss = self.__loss(input=predicted, target=targets)
-        return loss
\ No newline at end of file
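The synthetic task in these deleted tests is linearly separable by construction: the label is the sign of a fixed random projection w.x, so a linear model can reach near-perfect accuracy unless its weights are frozen. A standalone PyTorch sketch of that label rule and the matching logistic loss (a hypothetical reconstruction of the test setup, not the deleted code):

    import numpy as np
    import torch
    from torch import nn

    rng = np.random.default_rng(0)
    w = rng.standard_normal(100) * 10        # hidden ground-truth projection
    x = rng.standard_normal((32, 100)) * 5   # one minibatch of inputs
    y = (x @ w >= 0).astype(np.float32)      # label = sign of w.x

    model = nn.Linear(100, 1, bias=False)
    logits = model(torch.tensor(x, dtype=torch.float32))[:, 0]
    loss = nn.BCEWithLogitsLoss()(logits, torch.tensor(y))
    print(float(loss))  # loss of the untrained model; training drives this toward 0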