from abc import abstractmethod
import multiprocessing
import threading
from typing import Callable
from merlin.core import analysistask
[docs]class Executor(object):
def __init__(self):
super().__init__()
[docs] @abstractmethod
def run(self, task: analysistask.AnalysisTask, index: int=None,
rerunCompleted: bool=False) -> None:
"""Run an analysis task.
This method will not run analysis tasks that are already currently
running and analysis is terminated early due to error or otherwise
will not be restarted.
Args:
task: the analysis task to run.
index: index of the analysis to run for a parallel analysis task.
rerunCompleted: flag indicating if previous analysis should be
run again even if it has previously completed. If overwrite
is True, analysis will be run on the task regardless of its
status. If overwrite is False, analysis will only be run on
the task or fragments of the task that have either not been
started or have previously completed in error.
"""
pass
[docs]class LocalExecutor(Executor):
def __init__(self, coreCount=None):
super().__init__()
if coreCount is None:
self.coreCount = int(multiprocessing.cpu_count()*0.7)
else:
self.coreCount = coreCount
[docs] def run(self, task: analysistask.AnalysisTask, index: int=None,
rerunCompleted: bool=False) -> None:
if task.is_complete() and not rerunCompleted:
return
if index is not None:
task.run(index)
else:
task.run()