import numpy as np
import pandas
from scipy import optimize
from merlin.core import analysistask
from merlin.data.codebook import Codebook
from merlin.analysis import decode
[docs]class FilterBarcodes(decode.BarcodeSavingParallelAnalysisTask):
"""
An analysis task that filters barcodes based on area and mean
intensity.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
if 'area_threshold' not in self.parameters:
self.parameters['area_threshold'] = 3
if 'intensity_threshold' not in self.parameters:
self.parameters['intensity_threshold'] = 200
if 'distance_threshold' not in self.parameters:
self.parameters['distance_threshold'] = 1e6
[docs] def fragment_count(self):
return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self):
return 1000
[docs] def get_estimated_time(self):
return 30
[docs] def get_dependencies(self):
return [self.parameters['decode_task']]
[docs] def get_codebook(self):
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
return decodeTask.get_codebook()
def _run_analysis(self, fragmentIndex):
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
areaThreshold = self.parameters['area_threshold']
intensityThreshold = self.parameters['intensity_threshold']
distanceThreshold = self.parameters['distance_threshold']
barcodeDB = self.get_barcode_database()
currentBC = decodeTask.get_barcode_database() \
.get_filtered_barcodes(areaThreshold, intensityThreshold,
distanceThreshold=distanceThreshold,
fov=fragmentIndex)
barcodeDB.write_barcodes(currentBC, fov=fragmentIndex)
[docs]class GenerateAdaptiveThreshold(analysistask.AnalysisTask):
"""
An analysis task that generates a three-dimension mean intenisty,
area, minimum distance histogram for barcodes as they are decoded.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
# ensure decode_task is specified
decodeTask = self.parameters['decode_task']
[docs] def fragment_count(self):
return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self):
return 5000
[docs] def get_estimated_time(self):
return 1800
[docs] def get_dependencies(self):
return [self.parameters['run_after_task']]
[docs] def get_blank_count_histogram(self) -> np.ndarray:
return self.dataSet.load_numpy_analysis_result('blank_counts', self)
[docs] def get_coding_count_histogram(self) -> np.ndarray:
return self.dataSet.load_numpy_analysis_result('coding_counts', self)
[docs] def get_total_count_histogram(self) -> np.ndarray:
return self.get_blank_count_histogram() \
+ self.get_coding_count_histogram()
[docs] def get_area_bins(self) -> np.ndarray:
return self.dataSet.load_numpy_analysis_result('area_bins', self)
[docs] def get_distance_bins(self) -> np.ndarray:
return self.dataSet.load_numpy_analysis_result(
'distance_bins', self)
[docs] def get_intensity_bins(self) -> np.ndarray:
return self.dataSet.load_numpy_analysis_result(
'intensity_bins', self, None)
[docs] def get_blank_fraction_histogram(self) -> np.ndarray:
""" Get the normalized blank fraction histogram indicating the
normalized blank fraction for each intensity, distance, and area
bin.
Returns: The normalized blank fraction histogram. The histogram
has three dimensions: mean intensity, minimum distance, and area.
The bins in each dimension are defined by the bins returned by
get_area_bins, get_distance_bins, and get_area_bins, respectively.
Each entry indicates the number of blank barcodes divided by the
number of coding barcodes within the corresponding bin
normalized by the fraction of blank barcodes in the codebook.
With this normalization, when all (both blank and coding) barcodes
are selected with equal probability, the blank fraction is
expected to be 1.
"""
blankHistogram = self.get_blank_count_histogram()
totalHistogram = self.get_coding_count_histogram()
blankFraction = blankHistogram / totalHistogram
blankFraction[totalHistogram == 0] = np.finfo(blankFraction.dtype).max
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
codebook = decodeTask.get_codebook()
blankBarcodeCount = len(codebook.get_blank_indexes())
codingBarcodeCount = len(codebook.get_coding_indexes())
blankFraction /= blankBarcodeCount/(
blankBarcodeCount + codingBarcodeCount)
return blankFraction
[docs] def calculate_misidentification_rate_for_threshold(
self, threshold: float) -> float:
""" Calculate the misidentification rate for a specified blank
fraction threshold.
Args:
threshold: the normalized blank fraction threshold
Returns: The estimated misidentification rate, estimated as the
number of blank barcodes per blank barcode divided
by the number of coding barcodes per coding barcode.
"""
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
codebook = decodeTask.get_codebook()
blankBarcodeCount = len(codebook.get_blank_indexes())
codingBarcodeCount = len(codebook.get_coding_indexes())
blankHistogram = self.get_blank_count_histogram()
codingHistogram = self.get_coding_count_histogram()
blankFraction = self.get_blank_fraction_histogram()
selectBins = blankFraction < threshold
codingCounts = np.sum(codingHistogram[selectBins])
blankCounts = np.sum(blankHistogram[selectBins])
return ((blankCounts/blankBarcodeCount) /
(codingCounts/codingBarcodeCount))
[docs] def calculate_threshold_for_misidentification_rate(
self, targetMisidentificationRate: float) -> float:
""" Calculate the normalized blank fraction threshold that achieves
a specified misidentification rate.
Args:
targetMisidentificationRate: the target misidentification rate
Returns: the normalized blank fraction threshold that achieves
targetMisidentificationRate
"""
def misidentification_rate_error_for_threshold(x, targetError):
return self.calculate_misidentification_rate_for_threshold(x) \
- targetError
return optimize.newton(
misidentification_rate_error_for_threshold, 0.2,
args=[targetMisidentificationRate], tol=0.001, x1=0.3, disp=False)
[docs] def calculate_barcode_count_for_threshold(self, threshold: float) -> float:
""" Calculate the number of barcodes remaining after applying
the specified normalized blank fraction threshold.
Args:
threshold: the normalized blank fraction threshold
Returns: The number of barcodes passing the threshold.
"""
blankHistogram = self.get_blank_count_histogram()
codingHistogram = self.get_coding_count_histogram()
blankFraction = self.get_blank_fraction_histogram()
return np.sum(blankHistogram[blankFraction < threshold]) \
+ np.sum(codingHistogram[blankFraction < threshold])
@staticmethod
def _extract_counts(barcodes, intensityBins, distanceBins, areaBins):
barcodeData = barcodes[
['mean_intensity', 'min_distance', 'area']].values
barcodeData[:, 0] = np.log10(barcodeData[:, 0])
return np.histogramdd(
barcodeData, bins=(intensityBins, distanceBins, areaBins))[0]
def _run_analysis(self):
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
codebook = decodeTask.get_codebook()
barcodeDB = decodeTask.get_barcode_database()
completeFragments = \
self.dataSet.load_numpy_analysis_result_if_available(
'complete_fragments', self, [False]*self.fragment_count())
pendingFragments = [
decodeTask.is_complete(i) and not completeFragments[i]
for i in range(self.fragment_count())]
areaBins = self.dataSet.load_numpy_analysis_result_if_available(
'area_bins', self, np.arange(1, 35))
distanceBins = self.dataSet.load_numpy_analysis_result_if_available(
'distance_bins', self,
np.arange(
0, decodeTask.parameters['distance_threshold']+0.02, 0.01))
intensityBins = self.dataSet.load_numpy_analysis_result_if_available(
'intensity_bins', self, None)
blankCounts = self.dataSet.load_numpy_analysis_result_if_available(
'blank_counts', self, None)
codingCounts = self.dataSet.load_numpy_analysis_result_if_available(
'coding_counts', self, None)
self.dataSet.save_numpy_analysis_result(
areaBins, 'area_bins', self)
self.dataSet.save_numpy_analysis_result(
distanceBins, 'distance_bins', self)
updated = False
while not all(completeFragments):
if intensityBins is None:
for i in range(self.fragment_count()):
if not pendingFragments[i] and decodeTask.is_complete(i):
pendingFragments[i] = decodeTask.is_complete(i)
if np.sum(pendingFragments) >= min(20, self.fragment_count()):
def extreme_values(inputData: pandas.Series):
return inputData.min(), inputData.max()
sampledFragments = np.random.choice(
[i for i, p in enumerate(pendingFragments) if p],
size=20)
intensityExtremes = [
extreme_values(barcodeDB.get_barcodes(
i, columnList=['mean_intensity'])['mean_intensity'])
for i in sampledFragments]
maxIntensity = np.log10(
np.max([x[1] for x in intensityExtremes]))
intensityBins = np.arange(0, 2 * maxIntensity,
maxIntensity / 100)
self.dataSet.save_numpy_analysis_result(
intensityBins, 'intensity_bins', self)
blankCounts = np.zeros((len(intensityBins)-1,
len(distanceBins)-1,
len(areaBins)-1))
codingCounts = np.zeros((len(intensityBins)-1,
len(distanceBins)-1,
len(areaBins)-1))
else:
for i in range(self.fragment_count()):
if not completeFragments[i] and decodeTask.is_complete(i):
barcodes = barcodeDB.get_barcodes(
i, columnList=['barcode_id', 'mean_intensity',
'min_distance', 'area'])
blankCounts += self._extract_counts(
barcodes[barcodes['barcode_id'].isin(
codebook.get_blank_indexes())],
intensityBins, distanceBins, areaBins)
codingCounts += self._extract_counts(
barcodes[barcodes['barcode_id'].isin(
codebook.get_coding_indexes())],
intensityBins, distanceBins, areaBins)
updated = True
completeFragments[i] = True
if updated:
self.dataSet.save_numpy_analysis_result(
completeFragments, 'complete_fragments', self)
self.dataSet.save_numpy_analysis_result(
blankCounts, 'blank_counts', self)
self.dataSet.save_numpy_analysis_result(
codingCounts, 'coding_counts', self)
[docs]class AdaptiveFilterBarcodes(decode.BarcodeSavingParallelAnalysisTask):
"""
An analysis task that filters barcodes based on a mean intensity threshold
for each area based on the abundance of blank barcodes. The threshold
is selected to achieve a specified misidentification rate.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
if 'misidentification_rate' not in self.parameters:
self.parameters['misidentification_rate'] = 0.05
[docs] def fragment_count(self):
return len(self.dataSet.get_fovs())
[docs] def get_estimated_memory(self):
return 1000
[docs] def get_estimated_time(self):
return 30
[docs] def get_dependencies(self):
return [self.parameters['adaptive_task'],
self.parameters['decode_task']]
[docs] def get_adaptive_thresholds(self):
""" Get the adaptive thresholds used for filtering barcodes.
Returns: The GenerateaAdaptiveThershold task using for this
adaptive filter.
"""
return self.dataSet.load_analysis_task(
self.parameters['adaptive_task'])
[docs] def get_codebook(self) -> Codebook:
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
return decodeTask.get_codebook()
def _run_analysis(self, fragmentIndex):
adaptiveTask = self.dataSet.load_analysis_task(
self.parameters['adaptive_task'])
decodeTask = self.dataSet.load_analysis_task(
self.parameters['decode_task'])
threshold = adaptiveTask.calculate_threshold_for_misidentification_rate(
self.parameters['misidentification_rate'])
bcDatabase = self.get_barcode_database()
currentBarcodes = decodeTask.get_barcode_database()\
.get_barcodes(fragmentIndex)
bcDatabase.write_barcodes(
adaptiveTask.extract_barcodes_with_threshold(
threshold, currentBarcodes), fov=fragmentIndex)