Source code for merlin.analysis.preprocess

import os
import cv2
import numpy as np

from merlin.core import analysistask
from merlin.util import deconvolve
from merlin.util import aberration
from merlin.data import codebook


[docs]class Preprocess(analysistask.ParallelAnalysisTask): """ An abstract class for preparing data for barcode calling. """ def _image_name(self, fov): destPath = self.dataSet.get_analysis_subdirectory( self.analysisName, subdirectory='preprocessed_images') return os.sep.join([destPath, 'fov_' + str(fov) + '.tif'])
[docs] def get_pixel_histogram(self, fov=None): if fov is not None: return self.dataSet.load_numpy_analysis_result( 'pixel_histogram', self.analysisName, fov, 'histograms') pixelHistogram = np.zeros(self.get_pixel_histogram( self.dataSet.get_fovs()[0]).shape) for f in self.dataSet.get_fovs(): pixelHistogram += self.get_pixel_histogram(f) return pixelHistogram
def _save_pixel_histogram(self, histogram, fov): self.dataSet.save_numpy_analysis_result( histogram, 'pixel_histogram', self.analysisName, fov, 'histograms')
[docs]class DeconvolutionPreprocess(Preprocess): def __init__(self, dataSet, parameters=None, analysisName=None): super().__init__(dataSet, parameters, analysisName) if 'highpass_sigma' not in self.parameters: self.parameters['highpass_sigma'] = 3 if 'decon_sigma' not in self.parameters: self.parameters['decon_sigma'] = 2 if 'decon_filter_size' not in self.parameters: self.parameters['decon_filter_size'] = \ int(2 * np.ceil(2 * self.parameters['decon_sigma']) + 1) if 'decon_iterations' not in self.parameters: self.parameters['decon_iterations'] = 20 if 'codebook_index' not in self.parameters: self.parameters['codebook_index'] = 0 self._highPassSigma = self.parameters['highpass_sigma'] self._deconSigma = self.parameters['decon_sigma'] self._deconIterations = self.parameters['decon_iterations'] self.warpTask = self.dataSet.load_analysis_task( self.parameters['warp_task'])
[docs] def fragment_count(self): return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self): return 2048
[docs] def get_estimated_time(self): return 5
[docs] def get_dependencies(self): return [self.parameters['warp_task']]
[docs] def get_codebook(self) -> codebook.Codebook: return self.dataSet.get_codebook(self.parameters['codebook_index'])
[docs] def get_processed_image_set( self, fov, zIndex: int = None, chromaticCorrector: aberration.ChromaticCorrector = None ) -> np.ndarray: if zIndex is None: return np.array([[self.get_processed_image( fov, self.dataSet.get_data_organization() .get_data_channel_for_bit(b), zIndex, chromaticCorrector) for zIndex in range(len(self.dataSet.get_z_positions()))] for b in self.get_codebook().get_bit_names()]) else: return np.array([self.get_processed_image( fov, self.dataSet.get_data_organization() .get_data_channel_for_bit(b), zIndex, chromaticCorrector) for b in self.get_codebook().get_bit_names()])
[docs] def get_processed_image( self, fov: int, dataChannel: int, zIndex: int, chromaticCorrector: aberration.ChromaticCorrector = None ) -> np.ndarray: inputImage = self.warpTask.get_aligned_image(fov, dataChannel, zIndex, chromaticCorrector) return self._preprocess_image(inputImage)
def _run_analysis(self, fragmentIndex): warpTask = self.dataSet.load_analysis_task( self.parameters['warp_task']) histogramBins = np.arange(0, np.iinfo(np.uint16).max, 1) pixelHistogram = np.zeros( (self.get_codebook().get_bit_count(), len(histogramBins)-1)) # this currently only is to calculate the pixel histograms in order # to estimate the initial scale factors. This is likely unnecessary for bi, b in enumerate(self.get_codebook().get_bit_names()): dataChannel = self.dataSet.get_data_organization()\ .get_data_channel_for_bit(b) for i in range(len(self.dataSet.get_z_positions())): inputImage = warpTask.get_aligned_image( fragmentIndex, dataChannel, i) deconvolvedImage = self._preprocess_image(inputImage) pixelHistogram[bi, :] += np.histogram( deconvolvedImage, bins=histogramBins)[0] self._save_pixel_histogram(pixelHistogram, fragmentIndex) def _preprocess_image(self, inputImage: np.ndarray) -> np.ndarray: highPassFilterSize = int(2 * np.ceil(2 * self._highPassSigma) + 1) deconFilterSize = self.parameters['decon_filter_size'] filteredImage = inputImage.astype(float) - cv2.GaussianBlur( inputImage, (highPassFilterSize, highPassFilterSize), self._highPassSigma, borderType=cv2.BORDER_REPLICATE) filteredImage[filteredImage < 0] = 0 deconvolvedImage = deconvolve.deconvolve_lucyrichardson( filteredImage, deconFilterSize, self._deconSigma, self._deconIterations).astype(np.uint16) return deconvolvedImage