import os
import json
import shutil
import pandas
import numpy as np
import tifffile
import importlib
import time
import logging
import pickle
import datetime
from matplotlib import pyplot as plt
from typing import List, Tuple, Union, Dict, Optional
import h5py
import tables
import xmltodict
from merlin.util import imagereader
import merlin
from merlin.core import analysistask
from merlin.data import dataorganization
from merlin.data import codebook
from merlin.util import dataportal
TaskOrName = Union[analysistask.AnalysisTask, str]
class DataSet(object):
def __init__(self, dataDirectoryName: str,
dataHome: str = None, analysisHome: str = None):
"""Create a dataset for the specified raw data.
Args:
dataDirectoryName: the relative directory to the raw data
dataHome: the base path to the data. The data is expected
to be in dataHome/dataDirectoryName. If dataHome
is not specified, DATA_HOME is read from the
.env file.
analysisHome: the base path for storing analysis results. Analysis
results for this DataSet will be stored in
analysisHome/dataDirectoryName. If analysisHome is not
specified, ANALYSIS_HOME is read from the .env file.
"""
if dataHome is None:
dataHome = merlin.DATA_HOME
if analysisHome is None:
analysisHome = merlin.ANALYSIS_HOME
self.dataSetName = dataDirectoryName
self.dataHome = dataHome
self.analysisHome = analysisHome
self.rawDataPath = os.sep.join([dataHome, dataDirectoryName])
self.rawDataPortal = dataportal.DataPortal.create_portal(
self.rawDataPath)
if not self.rawDataPortal.is_available():
print('The raw data is not available at %s' % self.rawDataPath)
self.analysisPath = os.sep.join([analysisHome, dataDirectoryName])
os.makedirs(self.analysisPath, exist_ok=True)
self.logPath = os.sep.join([self.analysisPath, 'logs'])
os.makedirs(self.logPath, exist_ok=True)
self._store_dataset_metadata()
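# Example sketch of constructing a dataset (the directory name is
# hypothetical; assumes DATA_HOME and ANALYSIS_HOME are defined in the .env
# file and that this module is importable as merlin.core.dataset):
#
#   from merlin.core import dataset
#   dataSet = dataset.DataSet('201901_test_experiment')
#   # raw data:        DATA_HOME/201901_test_experiment
#   # analysis output: ANALYSIS_HOME/201901_test_experiment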
def _store_dataset_metadata(self) -> None:
try:
oldMetadata = self.load_json_analysis_result('dataset', None)
if not merlin.is_compatible(oldMetadata['merlin_version']):
raise merlin.IncompatibleVersionException(
('Analysis was performed on dataset %s with MERlin '
+ 'version %s, which is not compatible with the current '
+ 'MERlin version %s')
% (self.dataSetName, oldMetadata['merlin_version'],
merlin.version()))
except FileNotFoundError:
newMetadata = {
'merlin_version': merlin.version(),
'module': type(self).__module__,
'class': type(self).__name__,
'dataset_name': self.dataSetName,
'creation_date': str(datetime.datetime.now())
}
self.save_json_analysis_result(newMetadata, 'dataset', None)
def save_workflow(self, workflowString: str) -> str:
""" Save a snakemake workflow for analysis of this dataset.
Args:
workflowString: a string containing the snakemake workflow
to save
Returns: the path to the saved workflow
"""
snakemakePath = self.get_snakemake_path()
os.makedirs(snakemakePath, exist_ok=True)
workflowPath = os.sep.join(
[snakemakePath, datetime.datetime.now().strftime('%y%m%d_%H%M%S')])\
+ '.Snakefile'
with open(workflowPath, 'w') as outFile:
outFile.write(workflowString)
return workflowPath
def get_snakemake_path(self) -> str:
"""Get the directory for storing files related to snakemake.
Returns: the snakemake path as a string
"""
return os.sep.join([self.analysisPath, 'snakemake'])
def get_analysis_image_set(
self, analysisTask: TaskOrName, imageBaseName: str,
imageIndex: int = None) -> np.ndarray:
"""Get an analysis image set saved in the analysis for this data set.
Args:
analysisTask: the analysis task that generated and stored the
image set.
imageBaseName: the base name of the image
imageIndex: index of the image set to retrieve
"""
return tifffile.imread(self._analysis_image_name(
analysisTask, imageBaseName, imageIndex))
def get_analysis_image(
self, analysisTask: TaskOrName, imageBaseName: str, imageIndex: int,
imagesPerSlice: int, sliceIndex: int,
frameIndex: int) -> np.ndarray:
"""Get an image from an image set save in the analysis for this
data set.
Args:
analysisTask: the analysis task that generated and stored the
image set.
imageBaseName: the base name of the image
imageIndex: index of the image set to retrieve
imagesPerSlice: the number of images in each slice of the image
file
sliceIndex: the index of the slice to get the image from
frameIndex: the index of the frame in the specified slice
"""
# TODO - It may be useful to add a function that gets all
# frames in a slice
imageFile = tifffile.TiffFile(self._analysis_image_name(
analysisTask, imageBaseName, imageIndex))
indexInFile = sliceIndex*imagesPerSlice + frameIndex
return imageFile.asarray(key=int(indexInFile))
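# Example sketch of the slice/frame arithmetic used above (values are
# hypothetical): with imagesPerSlice=5, sliceIndex=2 and frameIndex=3, the
# frame read from the tiff stack is 2*5 + 3 = 13:
#
#   frame = dataSet.get_analysis_image(task, 'stack', 0,
#                                      imagesPerSlice=5, sliceIndex=2,
#                                      frameIndex=3)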
def writer_for_analysis_images(
self, analysisTask: TaskOrName, imageBaseName: str,
imageIndex: int = None, imagej: bool = True) -> tifffile.TiffWriter:
"""Get a writer for writing tiff files from an analysis task.
Args:
analysisTask: the analysis task that is saving the images
imageBaseName: the base name of the image file to write
imageIndex: index of the image file or None if no index should be
specified
imagej: flag indicating whether ImageJ-compatible metadata should be
written
Returns:
a TiffWriter for the specified analysis image file
"""
return tifffile.TiffWriter(self._analysis_image_name(
analysisTask, imageBaseName, imageIndex), imagej=imagej)
@staticmethod
def analysis_tiff_description(sliceCount: int, frameCount: int) -> Dict:
imageDescription = {'ImageJ': '1.47a\n',
'images': sliceCount*frameCount,
'channels': 1,
'slices': sliceCount,
'frames': frameCount,
'hyperstack': True,
'loop': False}
return imageDescription
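# Example sketch pairing the ImageJ description with a tiff writer (task and
# image names are hypothetical; in recent tifffile versions TiffWriter.save
# is named write):
#
#   desc = DataSet.analysis_tiff_description(sliceCount=3, frameCount=4)
#   with dataSet.writer_for_analysis_images(task, 'stack') as writer:
#       for image in images:  # 3*4 = 12 two-dimensional numpy arrays
#           writer.save(image, metadata=desc)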
def _analysis_image_name(self, analysisTask: TaskOrName,
imageBaseName: str, imageIndex: int) -> str:
destPath = self.get_analysis_subdirectory(
analysisTask, subdirectory='images')
if imageIndex is None:
return os.sep.join([destPath, imageBaseName+'.tif'])
else:
return os.sep.join([destPath, imageBaseName+str(imageIndex)+'.tif'])
def _analysis_result_save_path(
self, resultName: str, analysisTask: TaskOrName,
resultIndex: int = None, subdirectory: str = None,
fileExtension: str = None) -> str:
saveName = resultName
if resultIndex is not None:
saveName += '_' + str(resultIndex)
if fileExtension is not None:
saveName += fileExtension
if analysisTask is None:
return os.sep.join([self.analysisPath, saveName])
else:
return os.sep.join([self.get_analysis_subdirectory(
analysisTask, subdirectory), saveName])
def list_analysis_files(self, analysisTask: TaskOrName = None,
subdirectory: str = None, extension: str = None,
fullPath: bool = True) -> List[str]:
basePath = self._analysis_result_save_path(
'', analysisTask, subdirectory=subdirectory)
fileList = os.listdir(basePath)
if extension:
fileList = [x for x in fileList if x.endswith(extension)]
if fullPath:
fileList = [os.path.join(basePath, x) for x in fileList]
return fileList
def save_dataframe_to_csv(
self, dataframe: pandas.DataFrame, resultName: str,
analysisTask: TaskOrName = None, resultIndex: int = None,
subdirectory: str = None, **kwargs) -> None:
"""Save a pandas data frame to a csv file stored in this dataset.
If a previous pandas data frame has been saved with the same resultName,
it will be overwritten.
Args:
dataframe: the data frame to save
resultName: the name of the output file
analysisTask: the analysis task that the dataframe should be
saved under. If None, the dataframe is saved to the
data set root.
resultIndex: index of the dataframe to save or None if no index
should be specified
subdirectory: subdirectory of the analysis task that the dataframe
should be saved to or None if the dataframe should be
saved to the root directory for the analysis task.
**kwargs: arguments to pass on to pandas.to_csv
"""
savePath = self._analysis_result_save_path(
resultName, analysisTask, resultIndex, subdirectory, '.csv')
with open(savePath, 'w') as f:
dataframe.to_csv(f, **kwargs)
def load_dataframe_from_csv(
self, resultName: str, analysisTask: TaskOrName = None,
resultIndex: int = None, subdirectory: str = None,
**kwargs) -> Union[pandas.DataFrame, None]:
"""Load a pandas data frame from a csv file stored in this data set.
Args:
resultName: the name of the file to load
analysisTask: the analysis task that the dataframe was saved under.
If None, the dataframe is loaded from the data set root.
resultIndex: index of the dataframe to load or None if no index
was specified
subdirectory: subdirectory of the analysis task that the dataframe
was saved to or None if it was saved to the root directory
for the analysis task.
**kwargs: arguments to pass on to pandas.read_csv
Returns:
the pandas data frame
Raises:
FileNotFoundError: if the file does not exist
"""
savePath = self._analysis_result_save_path(
resultName, analysisTask, resultIndex, subdirectory, '.csv')
with open(savePath, 'r') as f:
return pandas.read_csv(f, **kwargs)
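# Example sketch of a csv round trip (result and task names are hypothetical):
#
#   dataSet.save_dataframe_to_csv(barcodes, 'barcodes', filterTask,
#                                 index=False)
#   barcodes = dataSet.load_dataframe_from_csv('barcodes', filterTask)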
def open_pandas_hdfstore(self, mode: str, resultName: str,
analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> pandas.HDFStore:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.h5')
return pandas.HDFStore(savePath, mode=mode)
def delete_pandas_hdfstore(
self, resultName: str, analysisTask: TaskOrName = None,
resultIndex: int = None, subdirectory: str = None) -> None:
hPath = self._analysis_result_save_path(
resultName, analysisTask, resultIndex, subdirectory, '.h5')
if os.path.exists(hPath):
os.remove(hPath)
def open_table(self, mode: str, resultName: str, analysisName: str,
resultIndex: int = None, subdirectory: str = None
) -> tables.File:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.h5')
return tables.open_file(savePath, mode=mode)
def delete_table(self, resultName: str, analysisTask: TaskOrName = None,
resultIndex: int = None, subdirectory: str = None) -> None:
"""Delete an hdf5 file stored in this data set if it exists.
Args:
resultName: the name of the output file
analysisTask: the analysis task that should be associated with this
hdf5 file. If None, the file is assumed to be in the
data set root.
resultIndex: index of the file to delete or None if no index was
specified
subdirectory: subdirectory of the analysis task that the file was
saved to or None if it was saved to the root directory for
the analysis task.
"""
hPath = self._analysis_result_save_path(
resultName, analysisTask, resultIndex, subdirectory, '.h5')
if os.path.exists(hPath):
os.remove(hPath)
def open_hdf5_file(self, mode: str, resultName: str,
analysisTask: TaskOrName = None, resultIndex: int = None,
subdirectory: str = None) -> h5py.File:
"""Open an hdf5 file stored in this data set.
Args:
mode: the mode for opening the file, either 'r', 'r+', 'w', 'w-',
or 'a'.
resultName: the name of the output file
analysisTask: the analysis task that should be associated with this
hdf5 file. If None, the file is assumed to be in the
data set root.
resultIndex: index of the file to open or None if no index should
be specified
subdirectory: subdirectory of the analysis task that the file
should be stored in or None if it should be stored in the
root directory for the analysis task.
Returns:
a h5py file object connected to the hdf5 file
Raises:
FileNotFoundError: if the mode is 'r' and the specified hdf5 file
does not exist
"""
hPath = self._analysis_result_save_path(
resultName, analysisTask, resultIndex, subdirectory, '.hdf5')
if mode == 'r' and not os.path.exists(hPath):
raise FileNotFoundError(('Unable to open %s for reading since ' +
'it does not exist.') % hPath)
return h5py.File(hPath, mode)
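# Example sketch of reading a stored hdf5 result (result, task and dataset
# names are hypothetical):
#
#   with dataSet.open_hdf5_file('r', 'features', segmentTask,
#                               resultIndex=fov) as f:
#       data = f['some_dataset'][()]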
def delete_hdf5_file(self, resultName: str, analysisTask: TaskOrName = None,
resultIndex: int = None, subdirectory: str = None
) -> None:
"""Delete an hdf5 file stored in this data set if it exists.
Args:
resultName: the name of the output file
analysisTask: the analysis task that should be associated with this
hdf5 file. If None, the file is assumed to be in the
data set root.
resultIndex: index of the file to delete or None if no index was
specified
subdirectory: subdirectory of the analysis task that the file was
saved to or None if it was saved to the root directory for
the analysis task.
"""
hPath = self._analysis_result_save_path(
resultName, analysisTask, resultIndex, subdirectory, '.hdf5')
if os.path.exists(hPath):
os.remove(hPath)
def save_json_analysis_result(
self, analysisResult: Dict, resultName: str,
analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> None:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.json')
with open(savePath, 'w') as f:
json.dump(analysisResult, f)
def load_json_analysis_result(
self, resultName: str, analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> Dict:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.json')
with open(savePath, 'r') as f:
return json.load(f)
def load_pickle_analysis_result(
self, resultName: str, analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> Dict:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.pkl')
with open(savePath, 'rb') as f:
return pickle.load(f)
def save_pickle_analysis_result(
self, analysisResult, resultName: str, analysisName: str,
resultIndex: int = None, subdirectory: str = None):
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.pkl')
with open(savePath, 'wb') as f:
pickle.dump(analysisResult, f)
def save_numpy_analysis_result(
self, analysisResult: np.ndarray, resultName: str,
analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> None:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory)
np.save(savePath, analysisResult)
def save_numpy_txt_analysis_result(
self, analysisResult: np.ndarray, resultName: str,
analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> None:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory)
np.savetxt(savePath + '.csv', analysisResult)
def load_numpy_analysis_result(
self, resultName: str, analysisName: str, resultIndex: int = None,
subdirectory: str = None) -> np.ndarray:
savePath = self._analysis_result_save_path(
resultName, analysisName, resultIndex, subdirectory, '.npy')
return np.load(savePath, allow_pickle=True)
def load_numpy_analysis_result_if_available(
self, resultName: str, analysisName: str, defaultValue,
resultIndex: int = None, subdirectory: str = None) -> np.ndarray:
""" Load the specified analysis result or return the specified default
value if the analysis result does not exist.
Args:
resultName: The name of the analysis result
analysisName: The name of the analysis task the result is saved in
defaultValue: The value to return if the specified analysis result
does not exist
resultIndex: The index of the analysis result
subdirectory: The subdirectory within the analysis task that the
result is saved in
Returns: The analysis result or defaultValue if the analysis result
doesn't exist.
"""
try:
return self.load_numpy_analysis_result(
resultName, analysisName, resultIndex, subdirectory)
except IOError:
return defaultValue
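# Example sketch (result and task names and the fallback value are
# hypothetical): fall back to a zero offset when no result exists yet:
#
#   offsets = dataSet.load_numpy_analysis_result_if_available(
#       'offsets', 'FiducialCorrelationWarp', np.zeros((1, 2)),
#       resultIndex=fov)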
def get_analysis_subdirectory(
self, analysisTask: TaskOrName, subdirectory: str = None,
create: bool = True) -> str:
"""
analysisTask can either be the class or a string containing the
class name.
create - Flag indicating if the analysis subdirectory should be
created if it does not already exist.
"""
if isinstance(analysisTask, analysistask.AnalysisTask):
analysisName = analysisTask.get_analysis_name()
else:
analysisName = analysisTask
if subdirectory is None:
subdirectoryPath = os.sep.join(
[self.analysisPath, analysisName])
else:
subdirectoryPath = os.sep.join(
[self.analysisPath, analysisName, subdirectory])
if create:
os.makedirs(subdirectoryPath, exist_ok=True)
return subdirectoryPath
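# Illustrative layout of the resulting directory tree (names hypothetical;
# the subdirectories correspond to the helpers below and to L58/L99 above):
#
#   ANALYSIS_HOME/<dataset name>/
#       dataset.json
#       logs/
#       snakemake/
#       <analysis task name>/
#           tasks/      # task.json and status event files
#           log/        # per-task and per-fragment log files
#           images/     # tiff output written by the task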
def get_task_subdirectory(self, analysisTask: TaskOrName):
return self.get_analysis_subdirectory(
analysisTask, subdirectory='tasks')
def get_log_subdirectory(self, analysisTask: TaskOrName):
return self.get_analysis_subdirectory(
analysisTask, subdirectory='log')
def save_analysis_task(self, analysisTask: analysistask.AnalysisTask,
overwrite: bool = False):
saveName = os.sep.join([self.get_task_subdirectory(
analysisTask), 'task.json'])
try:
existingTask = self.load_analysis_task(
analysisTask.get_analysis_name())
existingParameters = existingTask.get_parameters().copy()
existingVersion = existingParameters['merlin_version']
newParameters = analysisTask.get_parameters().copy()
newVersion = newParameters['merlin_version']
if not merlin.is_compatible(existingVersion, newVersion):
raise merlin.IncompatibleVersionException(
('Analysis task with name %s has been previously created '
+ 'with MERlin version %s, which is incompatible with '
+ 'the current MERlin version, %s. Please remove the '
+ 'old analysis folder to continue.')
% (analysisTask.analysisName, existingVersion, newVersion))
existingParameters.pop('merlin_version')
newParameters.pop('merlin_version')
if not overwrite and not existingParameters == newParameters:
raise analysistask.AnalysisAlreadyExistsException(
('Analysis task with name %s already exists in this ' +
'data set with different parameters.')
% analysisTask.get_analysis_name())
except FileNotFoundError:
pass
with open(saveName, 'w') as outFile:
json.dump(analysisTask.get_parameters(), outFile, indent=4)
def load_analysis_task(self, analysisTaskName: str) \
-> analysistask.AnalysisTask:
loadName = os.sep.join([self.get_task_subdirectory(
analysisTaskName), 'task.json'])
with open(loadName, 'r') as inFile:
parameters = json.load(inFile)
analysisModule = importlib.import_module(parameters['module'])
analysisTask = getattr(analysisModule, parameters['class'])
return analysisTask(self, parameters, analysisTaskName)
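# Example sketch of the task round trip (the task type and parameters are
# hypothetical; any analysistask.AnalysisTask subclass works the same way,
# provided its parameters include the 'module' and 'class' keys read above):
#
#   task = MyTask(dataSet, parameters={'some_parameter': 1})
#   dataSet.save_analysis_task(task)    # writes <task name>/tasks/task.json
#   sameTask = dataSet.load_analysis_task(task.get_analysis_name())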
def delete_analysis(self, analysisTask: TaskOrName) -> None:
"""
Remove all files associated with the provided analysis
from this data set.
Before deleting an analysis task, it must be verified that the
analysis task is not running.
"""
analysisDirectory = self.get_analysis_subdirectory(analysisTask)
shutil.rmtree(analysisDirectory)
def get_analysis_tasks(self) -> List[str]:
"""
Get a list of the analysis tasks within this dataset.
Returns: A list of the analysis task names.
"""
analysisList = []
for a in os.listdir(self.analysisPath):
if os.path.isdir(os.path.join(self.analysisPath, a)):
if os.path.exists(
os.path.join(self.analysisPath, a, 'tasks')):
analysisList.append(a)
analysisList.sort()
return analysisList
def analysis_exists(self, analysisTaskName: str) -> bool:
"""
Determine if an analysis task with the specified name exists in this
dataset.
"""
analysisPath = self.get_analysis_subdirectory(
analysisTaskName, create=False)
return os.path.exists(analysisPath)
def get_logger(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> logging.Logger:
loggerName = analysisTask.get_analysis_name()
if fragmentIndex is not None:
loggerName += '.' + str(fragmentIndex)
logger = logging.getLogger(loggerName)
logger.setLevel(logging.DEBUG)
fileHandler = logging.FileHandler(
self._log_path(analysisTask, fragmentIndex))
fileHandler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
return logger
def close_logger(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> None:
loggerName = analysisTask.get_analysis_name()
if fragmentIndex is not None:
loggerName += '.' + str(fragmentIndex)
logger = logging.getLogger(loggerName)
handlerList = list(logger.handlers)
for handler in handlerList:
logger.removeHandler(handler)
handler.flush()
handler.close()
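# Example sketch of the logging lifecycle (task and fov are hypothetical):
#
#   log = dataSet.get_logger(task, fragmentIndex=fov)
#   log.info('processing fov %i' % fov)
#   dataSet.close_logger(task, fragmentIndex=fov)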
def _log_path(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> str:
logName = analysisTask.get_analysis_name()
if fragmentIndex is not None:
logName += '_' + str(fragmentIndex)
logName += '.log'
return os.sep.join([self.get_log_subdirectory(analysisTask), logName])
def _analysis_status_file(self, analysisTask: analysistask.AnalysisTask,
eventName: str, fragmentIndex: int = None) -> str:
if isinstance(analysisTask, str):
analysisTask = self.load_analysis_task(analysisTask)
if fragmentIndex is None:
fileName = analysisTask.get_analysis_name() + '.' + eventName
else:
fileName = analysisTask.get_analysis_name() + \
'_' + str(fragmentIndex) + '.' + eventName
return os.sep.join([self.get_task_subdirectory(analysisTask),
fileName])
def get_analysis_environment(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> Optional[Dict]:
"""Get the environment variables for the system used to run the
specified analysis task.
Args:
analysisTask: The completed analysis task to get the environment
variables for.
fragmentIndex: The fragment index of the analysis task to
get the environment variables for.
Returns: A dictionary of the environment variables. If the job has not
yet run, then None is returned.
"""
if not self.check_analysis_done(analysisTask, fragmentIndex):
return None
fileName = self._analysis_status_file(
analysisTask, 'environment', fragmentIndex)
with open(fileName, 'r') as inFile:
envDict = json.load(inFile)
return envDict
def _record_analysis_environment(
self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> None:
fileName = self._analysis_status_file(
analysisTask, 'environment', fragmentIndex)
with open(fileName, 'w') as outFile:
json.dump(dict(os.environ), outFile, indent=4)
def record_analysis_started(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> None:
self._record_analysis_event(analysisTask, 'start', fragmentIndex)
self._record_analysis_environment(analysisTask, fragmentIndex)
def record_analysis_running(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> None:
self._record_analysis_event(analysisTask, 'run', fragmentIndex)
def record_analysis_complete(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> None:
self._record_analysis_event(analysisTask, 'done', fragmentIndex)
def record_analysis_error(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> None:
self._record_analysis_event(analysisTask, 'error', fragmentIndex)
def get_analysis_start_time(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> float:
"""Get the time that this analysis task started
Returns:
The start time for the analysis task execution in seconds since
the epoch in UTC.
"""
with open(self._analysis_status_file(analysisTask, 'start',
fragmentIndex), 'r') as f:
return float(f.read())
def get_analysis_complete_time(self,
analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> float:
"""Get the time that this analysis task completed.
Returns:
The completion time for the analysis task execution in seconds since
the epoch in UTC.
"""
with open(self._analysis_status_file(analysisTask, 'done',
fragmentIndex), 'r') as f:
return float(f.read())
def get_analysis_elapsed_time(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> float:
"""Get the time that this analysis took to complete.
Returns:
The elapsed time for the analysis task execution in seconds.
Raises a FileNotFoundError if the analysis task has not yet completed.
"""
return self.get_analysis_complete_time(analysisTask, fragmentIndex) -\
self.get_analysis_start_time(analysisTask, fragmentIndex)
def _record_analysis_event(
self, analysisTask: analysistask.AnalysisTask, eventName: str,
fragmentIndex: int = None) -> None:
fileName = self._analysis_status_file(
analysisTask, eventName, fragmentIndex)
with open(fileName, 'w') as f:
f.write('%s' % time.time())
def _check_analysis_event(
self, analysisTask: analysistask.AnalysisTask, eventName: str,
fragmentIndex: int = None) -> bool:
fileName = self._analysis_status_file(
analysisTask, eventName, fragmentIndex)
return os.path.exists(fileName)
def _reset_analysis_event(
self, analysisTask: analysistask.AnalysisTask, eventName: str,
fragmentIndex: int = None):
fileName = self._analysis_status_file(
analysisTask, eventName, fragmentIndex)
try:
os.remove(fileName)
except FileNotFoundError:
pass
def is_analysis_idle(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> bool:
fileName = self._analysis_status_file(
analysisTask, 'run', fragmentIndex)
try:
return time.time() - os.path.getmtime(fileName) > 120
except FileNotFoundError:
return True
def check_analysis_started(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> bool:
return self._check_analysis_event(analysisTask, 'start', fragmentIndex)
def check_analysis_done(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> bool:
return self._check_analysis_event(analysisTask, 'done', fragmentIndex)
def analysis_done_filename(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> str:
return self._analysis_status_file(analysisTask, 'done', fragmentIndex)
def check_analysis_error(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None) -> bool:
return self._check_analysis_event(analysisTask, 'error', fragmentIndex)
def reset_analysis_status(self, analysisTask: analysistask.AnalysisTask,
fragmentIndex: int = None):
if analysisTask.is_running():
raise analysistask.AnalysisAlreadyStartedException()
self._reset_analysis_event(analysisTask, 'start', fragmentIndex)
self._reset_analysis_event(analysisTask, 'run', fragmentIndex)
self._reset_analysis_event(analysisTask, 'done', fragmentIndex)
self._reset_analysis_event(analysisTask, 'error', fragmentIndex)
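# Example sketch of the status-file lifecycle for one fragment (task and fov
# are hypothetical; file names follow _analysis_status_file above):
#
#   dataSet.record_analysis_started(task, fov)    # writes <task>_<fov>.start
#   dataSet.record_analysis_running(task, fov)    # touches <task>_<fov>.run
#   dataSet.record_analysis_complete(task, fov)   # writes <task>_<fov>.done
#   dataSet.check_analysis_done(task, fov)        # -> True
#   elapsed = dataSet.get_analysis_elapsed_time(task, fov)  # seconds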
class ImageDataSet(DataSet):
def __init__(self, dataDirectoryName: str, dataHome: str = None,
analysisHome: str = None,
microscopeParametersName: str = None):
"""Create a dataset for the specified raw data.
Args:
dataDirectoryName: the relative directory to the raw data
dataHome: the base path to the data. The data is expected
to be in dataHome/dataDirectoryName. If dataHome
is not specified, DATA_HOME is read from the
.env file.
analysisHome: the base path for storing analysis results. Analysis
results for this DataSet will be stored in
analysisHome/dataDirectoryName. If analysisHome is not
specified, ANALYSIS_HOME is read from the .env file.
microscopeParametersName: the name of the microscope parameters
file that specifies properties of the microscope used
to acquire the images represented by this ImageDataSet
"""
super().__init__(dataDirectoryName, dataHome, analysisHome)
if microscopeParametersName is not None:
self._import_microscope_parameters(microscopeParametersName)
self._load_microscope_parameters()
def get_image_file_names(self):
return sorted(self.rawDataPortal.list_files(
extensionList=['.dax', '.tif', '.tiff']))
def load_image(self, imagePath, frameIndex):
with imagereader.infer_reader(
self.rawDataPortal.open_file(imagePath)) as reader:
imageIn = reader.load_frame(int(frameIndex))
if self.transpose:
imageIn = np.transpose(imageIn)
if self.flipHorizontal:
imageIn = np.flip(imageIn, axis=1)
if self.flipVertical:
imageIn = np.flip(imageIn, axis=0)
return imageIn
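# Example sketch of loading a single frame (the file name is hypothetical;
# the flip/transpose applied above come from the microscope parameters):
#
#   frame = imageDataSet.load_image('data_000.dax', frameIndex=0)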
def image_stack_size(self, imagePath):
"""
Get the size of the image stack stored in the specified image path.
Returns:
a three element list with [width, height, frameCount] or None
if the file does not exist
"""
with imagereader.infer_reader(self.rawDataPortal.open_file(imagePath)
) as reader:
return reader.film_size()
def _import_microscope_parameters(self, microscopeParametersName):
sourcePath = os.sep.join([merlin.MICROSCOPE_PARAMETERS_HOME,
microscopeParametersName])
destPath = os.sep.join(
[self.analysisPath, 'microscope_parameters.json'])
shutil.copyfile(sourcePath, destPath)
def _load_microscope_parameters(self):
path = os.sep.join(
[self.analysisPath, 'microscope_parameters.json'])
if os.path.exists(path):
with open(path) as inputFile:
self.microscopeParameters = json.load(inputFile)
else:
self.microscopeParameters = {}
self.flipHorizontal = self.microscopeParameters.get(
'flip_horizontal', True)
self.flipVertical = self.microscopeParameters.get(
'flip_vertical', False)
self.transpose = self.microscopeParameters.get('transpose', True)
self.micronsPerPixel = self.microscopeParameters.get(
'microns_per_pixel', 0.108)
self.imageDimensions = self.microscopeParameters.get(
'image_dimensions', [2048, 2048])
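# Example sketch of a microscope_parameters.json consistent with the defaults
# read above (all keys are optional; the values shown are illustrative):
#
#   {
#       "flip_horizontal": true,
#       "flip_vertical": false,
#       "transpose": true,
#       "microns_per_pixel": 0.108,
#       "image_dimensions": [2048, 2048]
#   }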
def get_microns_per_pixel(self):
"""Get the conversion factor to convert pixels to microns."""
return self.micronsPerPixel
def get_image_dimensions(self):
"""Get the dimensions of the images in this data set.
Returns:
A tuple containing the width and height of each image in pixels.
"""
return self.imageDimensions
class MERFISHDataSet(ImageDataSet):
def __init__(self, dataDirectoryName: str, codebookNames: List[str] = None,
dataOrganizationName: str = None, positionFileName: str = None,
dataHome: str = None, analysisHome: str = None,
microscopeParametersName: str = None):
"""Create a MERFISH dataset for the specified raw data.
Args:
dataDirectoryName: the relative directory to the raw data
codebookNames: A list of the names of codebooks to use. The codebook
should be present in the analysis parameters
directory. Full paths can be provided for codebooks
present in other directories.
dataOrganizationName: the name of the data organization to use.
The data organization should be present in the analysis
parameters directory. A full path can be provided for
a data organization file present in another directory.
positionFileName: the name of the position file to use.
dataHome: the base path to the data. The data is expected
to be in dataHome/dataDirectoryName. If dataHome
is not specified, DATA_HOME is read from the
.env file.
analysisHome: the base path for storing analysis results. Analysis
results for this DataSet will be stored in
analysisHome/dataDirectoryName. If analysisHome is not
specified, ANALYSIS_HOME is read from the .env file.
microscopeParametersName: the name of the microscope parameters
file that specifies properties of the microscope used
to acquire the images represented by this ImageDataSet
"""
super().__init__(dataDirectoryName, dataHome, analysisHome,
microscopeParametersName)
self.dataOrganization = dataorganization.DataOrganization(
self, dataOrganizationName)
if codebookNames:
self.codebooks = [codebook.Codebook(self, name, i)
for i, name in enumerate(codebookNames)]
else:
self.codebooks = self.load_codebooks()
if positionFileName is not None:
self._import_positions(positionFileName)
self._load_positions()
def save_codebook(self, codebook: codebook.Codebook) -> None:
""" Store the specified codebook in this dataset.
If a codebook with the same codebook index and codebook name as the
specified codebook already exists in this dataset, it is not
overwritten.
Args:
codebook: the codebook to store
Raises:
FileExistsError: If a codebook with the same codebook index but
a different codebook name is already saved within this dataset.
"""
existingCodebookName = self.get_stored_codebook_name(
codebook.get_codebook_index())
if existingCodebookName and existingCodebookName \
!= codebook.get_codebook_name():
raise FileExistsError(('Unable to save codebook %s with index %i '
+ 'since codebook %s already exists with '
+ 'the same index')
% (codebook.get_codebook_name(),
codebook.get_codebook_index(),
existingCodebookName))
if not existingCodebookName:
self.save_dataframe_to_csv(
codebook.get_data(),
'_'.join(['codebook', str(codebook.get_codebook_index()),
codebook.get_codebook_name()]), index=False)
def load_codebooks(self) -> List[codebook.Codebook]:
""" Get all the codebooks stored within this dataset.
Returns:
A list of all the stored codebooks.
"""
codebookList = []
currentIndex = 0
currentCodebook = self.load_codebook(currentIndex)
while currentCodebook is not None:
codebookList.append(currentCodebook)
currentIndex += 1
currentCodebook = self.load_codebook(currentIndex)
return codebookList
def load_codebook(self, codebookIndex: int = 0
) -> Optional[codebook.Codebook]:
""" Load the codebook stored within this dataset with the specified
index.
Args:
codebookIndex: the index of the codebook to load.
Returns:
The codebook stored with the specified codebook index. If no
codebook exists with the specified index then None is returned.
"""
codebookFile = [x for x in self.list_analysis_files(extension='.csv')
if ('codebook_%i_' % codebookIndex) in x]
if len(codebookFile) < 1:
return None
codebookName = '_'.join(os.path.splitext(os.path.basename(
codebookFile[0]))[0].split('_')[2:])
return codebook.Codebook(
self, codebookFile[0], codebookIndex, codebookName)
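# Sketch of the codebook naming convention used above (the codebook name is
# hypothetical): save_codebook stores codebook index 0 with name 'my_codebook'
# as codebook_0_my_codebook.csv in the dataset root, and
#
#   cb = merfishDataSet.load_codebook(0)
#   cb.get_codebook_name()   # -> 'my_codebook', parsed from the file name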
def get_stored_codebook_name(self, codebookIndex: int = 0) -> Optional[str]:
""" Get the name of the codebook stored within this dataset with the
specified index.
Args:
codebookIndex: the index of the codebook to load to find the name
of.
Returns:
The name of the codebook stored with the specified codebook index.
If no codebook exists with the specified index then None is
returned.
"""
codebookFile = [x for x in self.list_analysis_files(extension='.csv')
if ('codebook_%i_' % codebookIndex) in x]
if len(codebookFile) < 1:
return None
return '_'.join(os.path.splitext(
os.path.basename(codebookFile[0]))[0].split('_')[2:])
def get_codebooks(self) -> List[codebook.Codebook]:
""" Get the codebooks associated with this dataset.
Returns:
A list containing the codebooks for this dataset.
"""
return self.codebooks
def get_codebook(self, codebookIndex: int = 0) -> codebook.Codebook:
return self.codebooks[codebookIndex]
def get_data_organization(self) -> dataorganization.DataOrganization:
return self.dataOrganization
def get_stage_positions(self) -> List[List[float]]:
return self.positions
def get_fov_offset(self, fov: int) -> Tuple[float, float]:
"""Get the offset of the specified fov in the global coordinate system.
This offset is based on the anticipated stage position.
Args:
fov: index of the field of view
Returns:
A tuple specifying the x and y offset of the top right corner
of the specified fov in pixels.
"""
# TODO - this should be implemented using the position of the fov.
return self.positions.loc[fov]['X'], self.positions.loc[fov]['Y']
def z_index_to_position(self, zIndex: int) -> float:
"""Get the z position associated with the provided z index."""
return self.get_z_positions()[zIndex]
def position_to_z_index(self, zPosition: float) -> int:
"""Get the z index associated with the specified z position
Raises:
Exception: If the provided z position is not specified in this
dataset
"""
zIndex = np.where(self.get_z_positions() == zPosition)[0]
if len(zIndex) == 0:
raise Exception('Requested z=%0.2f position not found.' % zPosition)
return zIndex[0]
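# Example sketch of the z mapping (positions are hypothetical): with
# get_z_positions() returning [0.0, 1.5, 3.0],
#
#   merfishDataSet.z_index_to_position(1)     # -> 1.5
#   merfishDataSet.position_to_z_index(3.0)   # -> 2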
def get_z_positions(self) -> List[float]:
"""Get the z positions present in this dataset.
Returns:
A sorted list of all unique z positions
"""
return self.dataOrganization.get_z_positions()
def get_fovs(self) -> List[int]:
return self.dataOrganization.get_fovs()
def get_imaging_rounds(self) -> List[int]:
# TODO - check this function
return np.unique(self.dataOrganization.fileMap['imagingRound'])
def get_raw_image(self, dataChannel, fov, zPosition):
return self.load_image(
self.dataOrganization.get_image_filename(dataChannel, fov),
self.dataOrganization.get_image_frame_index(
dataChannel, zPosition))
def get_fiducial_image(self, dataChannel, fov):
return self.load_image(
self.dataOrganization.get_fiducial_filename(dataChannel, fov),
self.dataOrganization.get_fiducial_frame_index(dataChannel))
def _import_positions_from_metadata(self):
positionData = []
for f in self.get_fovs():
metadata = self.get_image_xml_metadata(
self.dataOrganization.get_image_filename(0, f))
currentPositions = \
metadata['settings']['acquisition']['stage_position']['#text'] \
.split(',')
positionData.append([float(x) for x in currentPositions])
positionPath = os.sep.join([self.analysisPath, 'positions.csv'])
np.savetxt(positionPath, np.array(positionData), delimiter=',')
def _load_positions(self):
positionPath = os.sep.join([self.analysisPath, 'positions.csv'])
if not os.path.exists(positionPath):
self._import_positions_from_metadata()
self.positions = pandas.read_csv(
positionPath, header=None, names=['X', 'Y'])
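# Sketch of the positions.csv format read above: one row per field of view,
# comma-separated x and y stage coordinates, no header (values hypothetical):
#
#   -512.3,1024.7
#   -512.3,1236.2
#   -298.1,1024.7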
def _import_positions(self, positionFileName):
sourcePath = os.sep.join([merlin.POSITION_HOME, positionFileName])
destPath = os.sep.join([self.analysisPath, 'positions.csv'])
shutil.copyfile(sourcePath, destPath)
def _convert_parameter_list(self, listIn, castFunction, delimiter=';'):
return [castFunction(x) for x in listIn.split(delimiter) if len(x) > 0]