Source code for merlin.core.analysistask

import copy
from abc import ABC, abstractmethod
import threading
import multiprocessing
from typing import List

import merlin


class AnalysisAlreadyStartedException(Exception):
    pass


class AnalysisAlreadyExistsException(Exception):
    pass


class InvalidParameterException(Exception):
    pass


class AnalysisTask(ABC):

    """
    An abstract class for performing analysis on a DataSet. Subclasses
    should implement the analysis to perform in the _run_analysis()
    function.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        """Creates an AnalysisTask object that performs analysis on the
        specified DataSet.

        Args:
            dataSet: the DataSet to run analysis on.
            parameters: a dictionary containing parameters used to run the
                analysis.
            analysisName: specifies a unique identifier for this
                AnalysisTask. If analysisName is not set, the analysis name
                will default to the name of the class.
        """
        self.dataSet = dataSet
        if parameters is None:
            self.parameters = {}
        else:
            self.parameters = copy.deepcopy(parameters)

        if analysisName is None:
            self.analysisName = type(self).__name__
        else:
            self.analysisName = analysisName

        if 'merlin_version' not in self.parameters:
            self.parameters['merlin_version'] = merlin.version()
        else:
            if not merlin.is_compatible(self.parameters['merlin_version']):
                raise merlin.IncompatibleVersionException(
                    ('Analysis task %s has already been created by MERlin '
                     'version %s, which is incompatible with the current '
                     'MERlin version, %s')
                    % (self.analysisName, self.parameters['merlin_version'],
                       merlin.version()))

        self.parameters['module'] = type(self).__module__
        self.parameters['class'] = type(self).__name__

        if 'codebookNum' in self.parameters:
            self.codebookNum = self.parameters['codebookNum']

    def save(self, overwrite=False) -> None:
        """Save a copy of this AnalysisTask into the data set.

        Args:
            overwrite: flag indicating if an existing analysis task with
                the same name as this analysis task should be overwritten
                even if the specified parameters are different.

        Raises:
            AnalysisAlreadyExistsException: if an analysis task with the
                same name as this analysis task already exists in the data
                set with different parameters.
        """
        self.dataSet.save_analysis_task(self, overwrite)

    def run(self, overwrite=True) -> None:
        """Run this AnalysisTask.

        Upon completion of the analysis, this function informs the DataSet
        that the analysis is complete.

        Args:
            overwrite: flag indicating if previous analysis from this
                analysis task should be overwritten.

        Raises:
            AnalysisAlreadyStartedException: if this analysis task is
                currently already running, or if overwrite is not True and
                this analysis task has already completed or exited with an
                error.
        """
        logger = self.dataSet.get_logger(self)
        logger.info('Beginning ' + self.get_analysis_name())
        try:
            if self.is_running():
                raise AnalysisAlreadyStartedException(
                    'Unable to run %s since it is already running'
                    % self.analysisName)

            if overwrite:
                self._reset_analysis()

            if self.is_complete() or self.is_error():
                raise AnalysisAlreadyStartedException(
                    'Unable to run %s since it has already run'
                    % self.analysisName)

            self.dataSet.record_analysis_started(self)
            self._indicate_running()
            self._run_analysis()
            self.dataSet.record_analysis_complete(self)
            logger.info('Completed ' + self.get_analysis_name())
            self.dataSet.close_logger(self)
        except Exception as e:
            logger.exception(e)
            self.dataSet.record_analysis_error(self)
            self.dataSet.close_logger(self)
            raise e
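
    # Typical driver usage, as a sketch. MyTask and dataSet below are
    # hypothetical stand-ins (a concrete AnalysisTask subclass and a
    # DataSet instance); neither is defined in this module:
    #
    #     task = MyTask(dataSet, parameters={'some_parameter': 1})
    #     task.save()
    #     try:
    #         task.run()
    #     except AnalysisAlreadyStartedException:
    #         pass  # already running, or finished and overwrite was False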

    def _reset_analysis(self) -> None:
        """Remove files created by this analysis task and remove markers
        indicating that this analysis has been started, or has completed.

        This function should be overridden by subclasses so that they
        can delete the analysis files.
        """
        self.dataSet.reset_analysis_status(self)

    def _indicate_running(self) -> None:
        """A loop that regularly signals to the dataset that this analysis
        task is still running successfully.

        Once this function is called, the dataset will be notified every
        30 seconds that this analysis is still running until the analysis
        completes.
        """
        if self.is_complete() or self.is_error():
            return

        self.dataSet.record_analysis_running(self)
        self.runTimer = threading.Timer(30, self._indicate_running)
        self.runTimer.daemon = True
        self.runTimer.start()

    @abstractmethod
    def _run_analysis(self) -> None:
        """Perform the analysis for this AnalysisTask.

        This function should be implemented in all subclasses with the
        logic to complete the analysis.
        """
        pass
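
    # Note on the heartbeat in _indicate_running above: it is a re-arming
    # threading.Timer. Each invocation schedules the next one 30 seconds
    # out, and the daemon flag keeps the timer thread from holding the
    # interpreter open after the main thread exits. A standalone sketch of
    # the same pattern (illustrative only, not part of this module):
    #
    #     import threading
    #
    #     def heartbeat() -> None:
    #         print('still running')
    #         timer = threading.Timer(30, heartbeat)
    #         timer.daemon = True
    #         timer.start()
    #
    #     heartbeat()  # fires every ~30 seconds until the process exits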

    @abstractmethod
    def get_estimated_memory(self) -> float:
        """Get an estimate of how much memory is required for this
        AnalysisTask.

        Returns:
            a memory estimate in megabytes.
        """
        pass

    @abstractmethod
    def get_estimated_time(self) -> float:
        """Get an estimate for the amount of time required to complete this
        AnalysisTask.

        Returns:
            a time estimate in minutes.
        """
        pass

    @abstractmethod
    def get_dependencies(self) -> List[str]:
        """Get the analysis tasks that must be completed before this
        analysis task can proceed.

        Returns:
            a list containing the names of the analysis tasks that this
            analysis task depends on. If there are no dependencies, an
            empty list is returned.
        """
        pass

    def get_parameters(self):
        """Get the parameters for this analysis task.

        Returns:
            the parameter dictionary
        """
        return self.parameters

    def is_error(self):
        """Determines if an error has occurred while running this analysis.

        Returns:
            True if an error has occurred during the analysis and
            otherwise False.
        """
        return self.dataSet.check_analysis_error(self)

    def is_complete(self):
        """Determines if this analysis has completed successfully.

        Returns:
            True if the analysis is complete and otherwise False.
        """
        return self.dataSet.check_analysis_done(self)

    def is_started(self):
        """Determines if this analysis has started.

        Returns:
            True if the analysis has begun and otherwise False.
        """
        return self.dataSet.check_analysis_started(self)

    def is_running(self):
        """Determines if this analysis is currently running.

        An analysis is considered running when it has started, has not yet
        completed, and the dataset does not consider it idle (that is, its
        heartbeat has not been silent for more than about two minutes).

        Returns:
            True if the analysis is running and otherwise False.
        """
        if not self.is_started():
            return False
        if self.is_complete():
            return False

        return not self.dataSet.is_analysis_idle(self)

    def get_analysis_name(self):
        """Get the name for this AnalysisTask.

        Returns:
            the name of this AnalysisTask
        """
        return self.analysisName

    def is_parallel(self):
        """Determine if this analysis task uses multiple cores."""
        return False
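

# A minimal sketch of a concrete AnalysisTask subclass, to show the methods
# a subclass must provide. ExampleTask and its estimates are illustrative
# assumptions, not part of MERlin; a real task would read inputs from and
# save results to self.dataSet inside _run_analysis.
class ExampleTask(AnalysisTask):

    def get_estimated_memory(self) -> float:
        # Rough upper bound on memory use, in megabytes.
        return 512

    def get_estimated_time(self) -> float:
        # Rough wall-clock estimate, in minutes.
        return 5

    def get_dependencies(self) -> List[str]:
        # Names of the analysis tasks that must finish first; none here.
        return []

    def _run_analysis(self) -> None:
        # The actual analysis logic would go here.
        pass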


class InternallyParallelAnalysisTask(AnalysisTask):

    """
    An abstract class for analysis that can only be run in one part, but
    can internally be sped up using multiple processes. Subclasses should
    implement the analysis to perform in the _run_analysis() function.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        super().__init__(dataSet, parameters, analysisName)
        self.coreCount = multiprocessing.cpu_count()

    def set_core_count(self, coreCount):
        """Set the number of parallel processes this analysis task is
        allowed to use.
        """
        self.coreCount = coreCount

    def is_parallel(self):
        return True
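

def _example_square(x: int) -> int:
    # Stand-in for a real per-chunk computation (illustrative only).
    return x * x


# A sketch of how a subclass might honor the configured core count by
# fanning work out over a multiprocessing.Pool. The task name, the chunk
# range, and _example_square are assumptions for demonstration, not
# MERlin API.
class ExampleInternallyParallelTask(InternallyParallelAnalysisTask):

    def get_estimated_memory(self) -> float:
        return 2048

    def get_estimated_time(self) -> float:
        return 30

    def get_dependencies(self) -> List[str]:
        return []

    def _run_analysis(self) -> None:
        # Use at most self.coreCount worker processes, as configured via
        # set_core_count (defaults to the machine's CPU count).
        with multiprocessing.Pool(self.coreCount) as pool:
            pool.map(_example_square, range(100))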


class ParallelAnalysisTask(AnalysisTask):

    # TODO - this can be restructured so that AnalysisTask is instead a
    # subclass of ParallelAnalysisTask where fragment count is set to 1.
    # This could help remove some of the redundant code.

    """
    An abstract class for analysis that can be run in multiple parts
    independently. Subclasses should implement the analysis to perform in
    the _run_analysis() function.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        super().__init__(dataSet, parameters, analysisName)

    @abstractmethod
    def fragment_count(self):
        pass

    def run(self, fragmentIndex: int = None, overwrite=True) -> None:
        """Run the specified fragment of this analysis task.

        If fragmentIndex is not provided, all fragments for this analysis
        task are run in series.

        Args:
            fragmentIndex: the index of the analysis fragment to run or
                None if all fragments should be run.
            overwrite: flag indicating if previous analysis from this
                analysis task should be overwritten.
        """
        if fragmentIndex is None:
            for i in range(self.fragment_count()):
                self.run(i, overwrite)
        else:
            logger = self.dataSet.get_logger(self, fragmentIndex)
            logger.info('Beginning %s %i'
                        % (self.get_analysis_name(), fragmentIndex))
            try:
                if self.is_running(fragmentIndex):
                    raise AnalysisAlreadyStartedException(
                        ('Unable to run %s fragment %i since it is already '
                         'running') % (self.analysisName, fragmentIndex))

                if overwrite:
                    self._reset_analysis(fragmentIndex)

                if self.is_complete(fragmentIndex) \
                        or self.is_error(fragmentIndex):
                    raise AnalysisAlreadyStartedException(
                        'Unable to run %s fragment %i since it has already '
                        'run' % (self.analysisName, fragmentIndex))

                self.dataSet.record_analysis_started(self, fragmentIndex)
                self._indicate_running(fragmentIndex)
                self._run_analysis(fragmentIndex)
                self.dataSet.record_analysis_complete(self, fragmentIndex)
                logger.info('Completed %s %i'
                            % (self.get_analysis_name(), fragmentIndex))
                self.dataSet.close_logger(self, fragmentIndex)
            except Exception as e:
                logger.exception(e)
                self.dataSet.record_analysis_error(self, fragmentIndex)
                self.dataSet.close_logger(self, fragmentIndex)
                raise e

    def _reset_analysis(self, fragmentIndex: int = None) -> None:
        """Remove files created by this analysis task and remove markers
        indicating that this analysis has been started, or has completed.
        """
        if fragmentIndex is None:
            for i in range(self.fragment_count()):
                self._reset_analysis(i)
        else:
            self.dataSet.reset_analysis_status(self, fragmentIndex)

    def _indicate_running(self, fragmentIndex: int) -> None:
        """A loop that regularly signals to the dataset that this analysis
        task is still running successfully.

        Once this function is called, the dataset will be notified every
        30 seconds that this analysis is still running until the analysis
        completes.
        """
        if self.is_complete(fragmentIndex) or self.is_error(fragmentIndex):
            return

        self.dataSet.record_analysis_running(self, fragmentIndex)
        self.runTimer = threading.Timer(
            30, self._indicate_running, [fragmentIndex])
        self.runTimer.daemon = True
        self.runTimer.start()

    @abstractmethod
    def _run_analysis(self, fragmentIndex):
        pass

    def is_error(self, fragmentIndex=None):
        if fragmentIndex is None:
            for i in range(self.fragment_count()):
                if self.is_error(i):
                    return True
            return False
        else:
            return self.dataSet.check_analysis_error(self, fragmentIndex)

    def is_complete(self, fragmentIndex=None):
        if fragmentIndex is None:
            for i in range(self.fragment_count()):
                if not self.is_complete(i):
                    return False
            return True
        else:
            return self.dataSet.check_analysis_done(self, fragmentIndex)

    def is_started(self, fragmentIndex=None):
        if fragmentIndex is None:
            for i in range(self.fragment_count()):
                if self.is_started(i):
                    return True
            return False
        else:
            return self.dataSet.check_analysis_started(self, fragmentIndex)

    def is_running(self, fragmentIndex=None):
        if not self.is_started(fragmentIndex):
            return False
        if self.is_complete(fragmentIndex):
            return False

        return not self.dataSet.is_analysis_idle(self, fragmentIndex)

    def is_parallel(self):
        return True
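

# A minimal sketch of a ParallelAnalysisTask subclass. ExampleFragmentTask
# and its fragment count are illustrative assumptions, not MERlin API; in
# MERlin, fragments typically correspond to independently processable
# units such as fields of view.
class ExampleFragmentTask(ParallelAnalysisTask):

    def fragment_count(self):
        # Number of independent pieces this task splits into.
        return 10

    def get_estimated_memory(self) -> float:
        return 1024

    def get_estimated_time(self) -> float:
        return 15

    def get_dependencies(self) -> List[str]:
        return []

    def _run_analysis(self, fragmentIndex) -> None:
        # Process only the data for this fragment; fragments can run in
        # separate processes or on separate cluster nodes.
        pass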