Source code for merlin.analysis.filterbarcodes

import numpy as np
import pandas
from scipy import optimize

from merlin.core import analysistask
from merlin.data.codebook import Codebook
from merlin.analysis import decode


[docs]class FilterBarcodes(decode.BarcodeSavingParallelAnalysisTask): """ An analysis task that filters barcodes based on area and mean intensity. """ def __init__(self, dataSet, parameters=None, analysisName=None): super().__init__(dataSet, parameters, analysisName) if 'area_threshold' not in self.parameters: self.parameters['area_threshold'] = 3 if 'intensity_threshold' not in self.parameters: self.parameters['intensity_threshold'] = 200 if 'distance_threshold' not in self.parameters: self.parameters['distance_threshold'] = 1e6
[docs] def fragment_count(self): return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self): return 1000
[docs] def get_estimated_time(self): return 30
[docs] def get_dependencies(self): return [self.parameters['decode_task']]
[docs] def get_codebook(self): decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) return decodeTask.get_codebook()
def _run_analysis(self, fragmentIndex): decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) areaThreshold = self.parameters['area_threshold'] intensityThreshold = self.parameters['intensity_threshold'] distanceThreshold = self.parameters['distance_threshold'] barcodeDB = self.get_barcode_database() currentBC = decodeTask.get_barcode_database() \ .get_filtered_barcodes(areaThreshold, intensityThreshold, distanceThreshold=distanceThreshold, fov=fragmentIndex) barcodeDB.write_barcodes(currentBC, fov=fragmentIndex)
[docs]class GenerateAdaptiveThreshold(analysistask.AnalysisTask): """ An analysis task that generates a three-dimension mean intenisty, area, minimum distance histogram for barcodes as they are decoded. """ def __init__(self, dataSet, parameters=None, analysisName=None): super().__init__(dataSet, parameters, analysisName) # ensure decode_task is specified decodeTask = self.parameters['decode_task']
[docs] def fragment_count(self): return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self): return 5000
[docs] def get_estimated_time(self): return 1800
[docs] def get_dependencies(self): return [self.parameters['run_after_task']]
[docs] def get_blank_count_histogram(self) -> np.ndarray: return self.dataSet.load_numpy_analysis_result('blank_counts', self)
[docs] def get_coding_count_histogram(self) -> np.ndarray: return self.dataSet.load_numpy_analysis_result('coding_counts', self)
[docs] def get_total_count_histogram(self) -> np.ndarray: return self.get_blank_count_histogram() \ + self.get_coding_count_histogram()
[docs] def get_area_bins(self) -> np.ndarray: return self.dataSet.load_numpy_analysis_result('area_bins', self)
[docs] def get_distance_bins(self) -> np.ndarray: return self.dataSet.load_numpy_analysis_result( 'distance_bins', self)
[docs] def get_intensity_bins(self) -> np.ndarray: return self.dataSet.load_numpy_analysis_result( 'intensity_bins', self, None)
[docs] def get_blank_fraction_histogram(self) -> np.ndarray: """ Get the normalized blank fraction histogram indicating the normalized blank fraction for each intensity, distance, and area bin. Returns: The normalized blank fraction histogram. The histogram has three dimensions: mean intensity, minimum distance, and area. The bins in each dimension are defined by the bins returned by get_area_bins, get_distance_bins, and get_area_bins, respectively. Each entry indicates the number of blank barcodes divided by the number of coding barcodes within the corresponding bin normalized by the fraction of blank barcodes in the codebook. With this normalization, when all (both blank and coding) barcodes are selected with equal probability, the blank fraction is expected to be 1. """ blankHistogram = self.get_blank_count_histogram() totalHistogram = self.get_coding_count_histogram() blankFraction = blankHistogram / totalHistogram blankFraction[totalHistogram == 0] = np.finfo(blankFraction.dtype).max decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) codebook = decodeTask.get_codebook() blankBarcodeCount = len(codebook.get_blank_indexes()) codingBarcodeCount = len(codebook.get_coding_indexes()) blankFraction /= blankBarcodeCount/( blankBarcodeCount + codingBarcodeCount) return blankFraction
[docs] def calculate_misidentification_rate_for_threshold( self, threshold: float) -> float: """ Calculate the misidentification rate for a specified blank fraction threshold. Args: threshold: the normalized blank fraction threshold Returns: The estimated misidentification rate, estimated as the number of blank barcodes per blank barcode divided by the number of coding barcodes per coding barcode. """ decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) codebook = decodeTask.get_codebook() blankBarcodeCount = len(codebook.get_blank_indexes()) codingBarcodeCount = len(codebook.get_coding_indexes()) blankHistogram = self.get_blank_count_histogram() codingHistogram = self.get_coding_count_histogram() blankFraction = self.get_blank_fraction_histogram() selectBins = blankFraction < threshold codingCounts = np.sum(codingHistogram[selectBins]) blankCounts = np.sum(blankHistogram[selectBins]) return ((blankCounts/blankBarcodeCount) / (codingCounts/codingBarcodeCount))
[docs] def calculate_threshold_for_misidentification_rate( self, targetMisidentificationRate: float) -> float: """ Calculate the normalized blank fraction threshold that achieves a specified misidentification rate. Args: targetMisidentificationRate: the target misidentification rate Returns: the normalized blank fraction threshold that achieves targetMisidentificationRate """ def misidentification_rate_error_for_threshold(x, targetError): return self.calculate_misidentification_rate_for_threshold(x) \ - targetError return optimize.newton( misidentification_rate_error_for_threshold, 0.2, args=[targetMisidentificationRate], tol=0.001, x1=0.3, disp=False)
[docs] def calculate_barcode_count_for_threshold(self, threshold: float) -> float: """ Calculate the number of barcodes remaining after applying the specified normalized blank fraction threshold. Args: threshold: the normalized blank fraction threshold Returns: The number of barcodes passing the threshold. """ blankHistogram = self.get_blank_count_histogram() codingHistogram = self.get_coding_count_histogram() blankFraction = self.get_blank_fraction_histogram() return np.sum(blankHistogram[blankFraction < threshold]) \ + np.sum(codingHistogram[blankFraction < threshold])
[docs] def extract_barcodes_with_threshold(self, blankThreshold: float, barcodeSet: pandas.DataFrame ) -> pandas.DataFrame: selectData = barcodeSet[ ['mean_intensity', 'min_distance', 'area']].values selectData[:, 0] = np.log10(selectData[:, 0]) blankFractionHistogram = self.get_blank_fraction_histogram() barcodeBins = np.array( (np.digitize(selectData[:, 0], self.get_intensity_bins(), right=True), np.digitize(selectData[:, 1], self.get_distance_bins(), right=True), np.digitize(selectData[:, 2], self.get_area_bins()))) - 1 barcodeBins[0, :] = np.clip( barcodeBins[0, :], 0, blankFractionHistogram.shape[0]-1) barcodeBins[1, :] = np.clip( barcodeBins[1, :], 0, blankFractionHistogram.shape[1]-1) barcodeBins[2, :] = np.clip( barcodeBins[2, :], 0, blankFractionHistogram.shape[2]-1) raveledIndexes = np.ravel_multi_index( barcodeBins[:, :], blankFractionHistogram.shape) thresholdedBlankFraction = blankFractionHistogram < blankThreshold return barcodeSet[np.take(thresholdedBlankFraction, raveledIndexes)]
@staticmethod def _extract_counts(barcodes, intensityBins, distanceBins, areaBins): barcodeData = barcodes[ ['mean_intensity', 'min_distance', 'area']].values barcodeData[:, 0] = np.log10(barcodeData[:, 0]) return np.histogramdd( barcodeData, bins=(intensityBins, distanceBins, areaBins))[0] def _run_analysis(self): decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) codebook = decodeTask.get_codebook() barcodeDB = decodeTask.get_barcode_database() completeFragments = \ self.dataSet.load_numpy_analysis_result_if_available( 'complete_fragments', self, [False]*self.fragment_count()) pendingFragments = [ decodeTask.is_complete(i) and not completeFragments[i] for i in range(self.fragment_count())] areaBins = self.dataSet.load_numpy_analysis_result_if_available( 'area_bins', self, np.arange(1, 35)) distanceBins = self.dataSet.load_numpy_analysis_result_if_available( 'distance_bins', self, np.arange( 0, decodeTask.parameters['distance_threshold']+0.02, 0.01)) intensityBins = self.dataSet.load_numpy_analysis_result_if_available( 'intensity_bins', self, None) blankCounts = self.dataSet.load_numpy_analysis_result_if_available( 'blank_counts', self, None) codingCounts = self.dataSet.load_numpy_analysis_result_if_available( 'coding_counts', self, None) self.dataSet.save_numpy_analysis_result( areaBins, 'area_bins', self) self.dataSet.save_numpy_analysis_result( distanceBins, 'distance_bins', self) updated = False while not all(completeFragments): if intensityBins is None: for i in range(self.fragment_count()): if not pendingFragments[i] and decodeTask.is_complete(i): pendingFragments[i] = decodeTask.is_complete(i) if np.sum(pendingFragments) >= min(20, self.fragment_count()): def extreme_values(inputData: pandas.Series): return inputData.min(), inputData.max() sampledFragments = np.random.choice( [i for i, p in enumerate(pendingFragments) if p], size=20) intensityExtremes = [ extreme_values(barcodeDB.get_barcodes( i, columnList=['mean_intensity'])['mean_intensity']) for i in sampledFragments] maxIntensity = np.log10( np.max([x[1] for x in intensityExtremes])) intensityBins = np.arange(0, 2 * maxIntensity, maxIntensity / 100) self.dataSet.save_numpy_analysis_result( intensityBins, 'intensity_bins', self) blankCounts = np.zeros((len(intensityBins)-1, len(distanceBins)-1, len(areaBins)-1)) codingCounts = np.zeros((len(intensityBins)-1, len(distanceBins)-1, len(areaBins)-1)) else: for i in range(self.fragment_count()): if not completeFragments[i] and decodeTask.is_complete(i): barcodes = barcodeDB.get_barcodes( i, columnList=['barcode_id', 'mean_intensity', 'min_distance', 'area']) blankCounts += self._extract_counts( barcodes[barcodes['barcode_id'].isin( codebook.get_blank_indexes())], intensityBins, distanceBins, areaBins) codingCounts += self._extract_counts( barcodes[barcodes['barcode_id'].isin( codebook.get_coding_indexes())], intensityBins, distanceBins, areaBins) updated = True completeFragments[i] = True if updated: self.dataSet.save_numpy_analysis_result( completeFragments, 'complete_fragments', self) self.dataSet.save_numpy_analysis_result( blankCounts, 'blank_counts', self) self.dataSet.save_numpy_analysis_result( codingCounts, 'coding_counts', self)
[docs]class AdaptiveFilterBarcodes(decode.BarcodeSavingParallelAnalysisTask): """ An analysis task that filters barcodes based on a mean intensity threshold for each area based on the abundance of blank barcodes. The threshold is selected to achieve a specified misidentification rate. """ def __init__(self, dataSet, parameters=None, analysisName=None): super().__init__(dataSet, parameters, analysisName) if 'misidentification_rate' not in self.parameters: self.parameters['misidentification_rate'] = 0.05
[docs] def fragment_count(self): return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self): return 1000
[docs] def get_estimated_time(self): return 30
[docs] def get_dependencies(self): return [self.parameters['adaptive_task'], self.parameters['decode_task']]
[docs] def get_adaptive_thresholds(self): """ Get the adaptive thresholds used for filtering barcodes. Returns: The GenerateaAdaptiveThershold task using for this adaptive filter. """ return self.dataSet.load_analysis_task( self.parameters['adaptive_task'])
[docs] def get_codebook(self) -> Codebook: decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) return decodeTask.get_codebook()
def _run_analysis(self, fragmentIndex): adaptiveTask = self.dataSet.load_analysis_task( self.parameters['adaptive_task']) decodeTask = self.dataSet.load_analysis_task( self.parameters['decode_task']) threshold = adaptiveTask.calculate_threshold_for_misidentification_rate( self.parameters['misidentification_rate']) bcDatabase = self.get_barcode_database() currentBarcodes = decodeTask.get_barcode_database()\ .get_barcodes(fragmentIndex) bcDatabase.write_barcodes( adaptiveTask.extract_barcodes_with_threshold( threshold, currentBarcodes), fov=fragmentIndex)