Glow PySpark Functions
Glow includes a number of functions that operate on PySpark columns. These functions are interoperable with functions provided by PySpark or other libraries.

glow.functions.add_struct_fields(struct: Union[pyspark.sql.column.Column, str], *fields) → pyspark.sql.column.Column
Adds fields to a struct.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(struct=Row(a=1))])
>>> df.select(glow.add_struct_fields('struct', lit('b'), lit(2)).alias('struct')).collect()
[Row(struct=Row(a=1, b=2))]

Parameters
- struct – The struct to which fields will be added
- fields – The new fields to add. The arguments must alternate between string-typed literal field names and field values.

Returns
A struct consisting of the input struct and the added fields

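Because the arguments alternate between literal field names and values, several fields can be added in a single call. A minimal sketch (not part of the original examples; the expected output is shown as a comment rather than verified):

>>> df = spark.createDataFrame([Row(struct=Row(a=1))])
>>> result = df.select(glow.add_struct_fields('struct', lit('b'), lit(2), lit('c'), lit('x')).alias('struct')).collect()
>>> # expected: [Row(struct=Row(a=1, b=2, c='x'))]
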
glow.functions.array_summary_stats(arr: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes the minimum, maximum, mean, and standard deviation for an array of numerics.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(arr=[1, 2, 3])])
>>> df.select(glow.expand_struct(glow.array_summary_stats('arr'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]

Parameters
- arr – An array of any numeric type

Returns
A struct containing double mean, stdDev, min, and max fields

glow.functions.array_to_dense_vector(arr: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Converts an array of numerics into a spark.ml DenseVector.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseVector
>>> df = spark.createDataFrame([Row(arr=[1, 2, 3])])
>>> df.select(glow.array_to_dense_vector('arr').alias('v')).collect()
[Row(v=DenseVector([1.0, 2.0, 3.0]))]

Parameters
- arr – The array of numerics

Returns
A spark.ml DenseVector

glow.functions.array_to_sparse_vector(arr: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Converts an array of numerics into a spark.ml SparseVector.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import SparseVector
>>> df = spark.createDataFrame([Row(arr=[1, 0, 2, 0, 3, 0])])
>>> df.select(glow.array_to_sparse_vector('arr').alias('v')).collect()
[Row(v=SparseVector(6, {0: 1.0, 2: 2.0, 4: 3.0}))]

Parameters
- arr – The array of numerics

Returns
A spark.ml SparseVector

glow.functions.call_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes call summary statistics for an array of genotype structs. See Variant Quality Control for more details.

Added in version 0.3.0.

Examples

>>> schema = 'genotypes: array<struct<calls: array<int>>>'
>>> df = spark.createDataFrame([Row(genotypes=[Row(calls=[0, 0]), Row(calls=[1, 0]), Row(calls=[1, 1])])], schema)
>>> df.select(glow.expand_struct(glow.call_summary_stats('genotypes'))).collect()
[Row(callRate=1.0, nCalled=3, nUncalled=0, nHet=1, nHomozygous=[1, 1], nNonRef=2, nAllelesCalled=6, alleleCounts=[3, 3], alleleFrequencies=[0.5, 0.5])]

Parameters
- genotypes – The array of genotype structs with calls field

Returns
A struct containing callRate, nCalled, nUncalled, nHet, nHomozygous, nNonRef, nAllelesCalled, alleleCounts, and alleleFrequencies fields. See Variant Quality Control.

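A common use is site-level quality control over an entire dataset. A hedged sketch (the file path and call-rate cutoff are hypothetical):

>>> df = spark.read.format('vcf').load('genotypes.vcf')
>>> stats_df = df.select('contigName', 'start', glow.expand_struct(glow.call_summary_stats('genotypes')))
>>> well_called = stats_df.where('callRate >= 0.95')
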
glow.functions.dp_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes summary statistics for the depth field from an array of genotype structs. See Variant Quality Control.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(genotypes=[Row(depth=1), Row(depth=2), Row(depth=3)])], 'genotypes: array<struct<depth: int>>')
>>> df.select(glow.expand_struct(glow.dp_summary_stats('genotypes'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]

Parameters
- genotypes – An array of genotype structs with depth field

Returns
A struct containing mean, stdDev, min, and max of genotype depths

glow.functions.expand_struct(struct: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Promotes fields of a nested struct to top-level columns, similar to using struct.* from SQL, but can be used in more contexts.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(struct=Row(a=1, b=2))])
>>> df.select(glow.expand_struct(col('struct'))).collect()
[Row(a=1, b=2)]

Parameters
- struct – The struct to expand

Returns
Columns corresponding to fields of the input struct

glow.functions.explode_matrix(matrix: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Explodes a spark.ml Matrix (sparse or dense) into multiple arrays, one per row of the matrix.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseMatrix
>>> m = DenseMatrix(numRows=3, numCols=2, values=[1, 2, 3, 4, 5, 6])
>>> df = spark.createDataFrame([Row(matrix=m)])
>>> df.select(glow.explode_matrix('matrix').alias('row')).collect()
[Row(row=[1.0, 4.0]), Row(row=[2.0, 5.0]), Row(row=[3.0, 6.0])]

Parameters
- matrix – The spark.ml Matrix to explode

Returns
An array column in which each row is a row of the input matrix

glow.functions.genotype_states(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Gets the number of alternate alleles for an array of genotype structs. Returns -1 if there are any -1s (no-calls) in the calls array.

Added in version 0.3.0.

Examples

>>> genotypes = [
...     Row(calls=[1, 1]),
...     Row(calls=[1, 0]),
...     Row(calls=[0, 0]),
...     Row(calls=[-1, -1])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.genotype_states('genotypes').alias('states')).collect()
[Row(states=[2, 1, 0, -1])]

Parameters
- genotypes – An array of genotype structs with calls field

Returns
An array of integers containing the number of alternate alleles in each call array

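Since no-calls come back as -1, this function pairs naturally with mean_substitute (below), whose missing value also defaults to -1, when a downstream step such as linear_regression_gwas needs a fully numeric genotype vector. A hedged sketch (the file path is hypothetical):

>>> df = spark.read.format('vcf').load('genotypes.vcf')
>>> states = df.select(glow.mean_substitute(glow.genotype_states('genotypes')).alias('states'))
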
glow.functions.gq_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes summary statistics about the genotype quality field for an array of genotype structs. See Variant Quality Control.

Added in version 0.3.0.

Examples

>>> genotypes = [
...     Row(conditionalQuality=1),
...     Row(conditionalQuality=2),
...     Row(conditionalQuality=3)]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<conditionalQuality: int>>')
>>> df.select(glow.expand_struct(glow.gq_summary_stats('genotypes'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]

Parameters
- genotypes – The array of genotype structs with conditionalQuality field

Returns
A struct containing mean, stdDev, min, and max of genotype qualities

glow.functions.hard_calls(probabilities: Union[pyspark.sql.column.Column, str], numAlts: Union[pyspark.sql.column.Column, str], phased: Union[pyspark.sql.column.Column, str], threshold: float = None) → pyspark.sql.column.Column
Converts an array of probabilities to hard calls. The probabilities are assumed to be diploid. See Variant data transformations for more details.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(probs=[0.95, 0.05, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
[Row(calls=[0, 0])]
>>> df = spark.createDataFrame([Row(probs=[0.05, 0.95, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
[Row(calls=[1, 0])]
>>> # Use the threshold parameter to change the minimum probability required for a call
>>> df = spark.createDataFrame([Row(probs=[0.05, 0.95, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False), threshold=0.99).alias('calls')).collect()
[Row(calls=[-1, -1])]

Parameters
- probabilities – The array of probabilities to convert
- numAlts – The number of alternate alleles
- phased – Whether the probabilities are phased. If phased, we expect 2 * numAlts values in the probabilities array. If unphased, we expect one probability per possible genotype.
- threshold – The minimum probability to make a call. If no probability falls into the range of [0, 1 - threshold] or [threshold, 1], a no-call (represented by -1s) will be emitted. If not provided, this parameter defaults to 0.9.

Returns
An array of hard calls

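In the unphased biallelic case above, the three probabilities correspond to the homozygous-reference, heterozygous, and homozygous-alternate genotypes, in that order, as the examples suggest. A hedged sketch completing the pattern (the expected output is shown as a comment rather than verified):

>>> df = spark.createDataFrame([Row(probs=[0.0, 0.05, 0.95])])
>>> calls = df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
>>> # expected: [Row(calls=[1, 1])], i.e. homozygous alternate
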
glow.functions.hardy_weinberg(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes statistics relating to the Hardy-Weinberg equilibrium. See Variant Quality Control for more details.

Added in version 0.3.0.

Examples

>>> genotypes = [
...     Row(calls=[1, 1]),
...     Row(calls=[1, 0]),
...     Row(calls=[0, 0])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.expand_struct(glow.hardy_weinberg('genotypes'))).collect()
[Row(hetFreqHwe=0.6, pValueHwe=0.7)]

Parameters
- genotypes – The array of genotype structs with calls field

Returns
A struct containing two fields, hetFreqHwe (the expected heterozygous frequency according to Hardy-Weinberg equilibrium) and pValueHwe (the associated p-value)

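A typical use is excluding variants that deviate strongly from Hardy-Weinberg equilibrium during quality control. A hedged sketch (the file path and p-value cutoff are hypothetical):

>>> df = spark.read.format('vcf').load('genotypes.vcf')
>>> with_hwe = df.select('*', glow.expand_struct(glow.hardy_weinberg('genotypes')))
>>> passing = with_hwe.where('pValueHwe >= 1e-6')
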
glow.functions.lift_over_coordinates(contigName: Union[pyspark.sql.column.Column, str], start: Union[pyspark.sql.column.Column, str], end: Union[pyspark.sql.column.Column, str], chainFile: str, minMatchRatio: float = None) → pyspark.sql.column.Column
Performs liftover for the coordinates of a variant. To perform liftover of alleles and add additional metadata, see Liftover.

Added in version 0.3.0.

Examples

>>> df = spark.read.format('vcf').load('test-data/liftover/unlifted.test.vcf').where('start = 18210071')
>>> chain_file = 'test-data/liftover/hg38ToHg19.over.chain.gz'
>>> reference_file = 'test-data/liftover/hg19.chr20.fa.gz'
>>> df.select('contigName', 'start', 'end').head()
Row(contigName='chr20', start=18210071, end=18210072)
>>> lifted_df = df.select(glow.expand_struct(glow.lift_over_coordinates('contigName', 'start', 'end', chain_file)))
>>> lifted_df.head()
Row(contigName='chr20', start=18190715, end=18190716)

Parameters
- contigName – The current contig name
- start – The current start
- end – The current end
- chainFile – Location of the chain file on each node in the cluster
- minMatchRatio – Minimum fraction of bases that must remap to do liftover successfully. If not provided, defaults to 0.95.

Returns
A struct containing contigName, start, and end fields after liftover

glow.functions.linear_regression_gwas(genotypes: Union[pyspark.sql.column.Column, str], phenotypes: Union[pyspark.sql.column.Column, str], covariates: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Performs a linear regression association test optimized for performance in a GWAS setting. See Linear regression for details.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseMatrix
>>> phenotypes = [2, 3, 4]
>>> genotypes = [0, 1, 2]
>>> covariates = DenseMatrix(numRows=3, numCols=1, values=[1, 1, 1])
>>> df = spark.createDataFrame([Row(genotypes=genotypes, phenotypes=phenotypes, covariates=covariates)])
>>> df.select(glow.expand_struct(glow.linear_regression_gwas('genotypes', 'phenotypes', 'covariates'))).collect()
[Row(beta=0.9999999999999998, standardError=1.4901161193847656e-08, pValue=9.486373847239922e-09)]

Parameters
- genotypes – A numeric array of genotypes
- phenotypes – A numeric array of phenotypes
- covariates – A spark.ml Matrix of covariates

Returns
A struct containing beta, standardError, and pValue fields. See Linear regression.

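To run the test once per variant over a whole dataset, the phenotype vector and covariate matrix (which are the same for every variant) can be attached with a cross join. A hedged sketch; the file path, phenotype values, and sample count are hypothetical:

>>> from pyspark.ml.linalg import DenseMatrix
>>> variants = spark.read.format('vcf').load('genotypes.vcf')  # hypothetical 3-sample VCF
>>> pheno_cov = spark.createDataFrame([Row(phenotypes=[2.0, 3.0, 4.0],
...     covariates=DenseMatrix(numRows=3, numCols=1, values=[1, 1, 1]))])
>>> results = (variants.crossJoin(pheno_cov)
...     .select('contigName', 'start',
...             glow.expand_struct(glow.linear_regression_gwas(
...                 glow.genotype_states('genotypes'), 'phenotypes', 'covariates'))))
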
glow.functions.logistic_regression_gwas(genotypes: Union[pyspark.sql.column.Column, str], phenotypes: Union[pyspark.sql.column.Column, str], covariates: Union[pyspark.sql.column.Column, str], test: str, offset: Union[pyspark.sql.column.Column, str] = None) → pyspark.sql.column.Column
Performs a logistic regression association test optimized for performance in a GWAS setting. See Logistic regression for more details.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseMatrix
>>> phenotypes = [1, 0, 0, 1, 1]
>>> genotypes = [0, 0, 1, 2, 2]
>>> covariates = DenseMatrix(numRows=5, numCols=1, values=[1, 1, 1, 1, 1])
>>> offset = [1, 0, 1, 0, 1]
>>> df = spark.createDataFrame([Row(genotypes=genotypes, phenotypes=phenotypes, covariates=covariates, offset=offset)])
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'Firth'))).collect()
[Row(beta=0.7418937644793101, oddsRatio=2.09990848346903, waldConfidenceInterval=[0.2509874689201784, 17.569066925598555], pValue=0.3952193664793294)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'LRT'))).collect()
[Row(beta=1.1658962684583645, oddsRatio=3.208797538802116, waldConfidenceInterval=[0.29709600522888285, 34.65674887513274], pValue=0.2943946848756769)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'Firth', 'offset'))).collect()
[Row(beta=0.8024832156793392, oddsRatio=2.231074294047771, waldConfidenceInterval=[0.2540891981649045, 19.590334974925725], pValue=0.3754070658316332)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'LRT', 'offset'))).collect()
[Row(beta=1.1996041727573317, oddsRatio=3.3188029900720117, waldConfidenceInterval=[0.3071189078535928, 35.863807161497334], pValue=0.2857137988674153)]

Parameters
- genotypes – A numeric array of genotypes
- phenotypes – A double array of phenotype values
- covariates – A spark.ml Matrix of covariates
- test – Which logistic regression test to use. Can be LRT or Firth
- offset – An optional double array of offset values. The offset vector is added with coefficient 1 to the linear predictor term X*b.

Returns
A struct containing beta, oddsRatio, waldConfidenceInterval, and pValue fields. See Logistic regression.

glow.functions.mean_substitute(array: Union[pyspark.sql.column.Column, str], missingValue: Union[pyspark.sql.column.Column, str] = None) → pyspark.sql.column.Column
Substitutes the missing values of a numeric array using the mean of the non-missing values. Any values that are NaN, null, or equal to the missing value parameter are considered missing. See Variant data transformations for more details.

Added in version 0.4.0.

Examples

>>> df = spark.createDataFrame([Row(unsubstituted_values=[float('nan'), None, 0.0, 1.0, 2.0, 3.0, 4.0])])
>>> df.select(glow.mean_substitute('unsubstituted_values', lit(0.0)).alias('substituted_values')).collect()
[Row(substituted_values=[2.5, 2.5, 2.5, 1.0, 2.0, 3.0, 4.0])]
>>> df = spark.createDataFrame([Row(unsubstituted_values=[0, 1, 2, 3, -1, None])])
>>> df.select(glow.mean_substitute('unsubstituted_values').alias('substituted_values')).collect()
[Row(substituted_values=[0.0, 1.0, 2.0, 3.0, 1.5, 1.5])]

Parameters
- array – A numeric array that may contain missing values
- missingValue – A value that should be considered missing. If not provided, this parameter defaults to -1.

Returns
A numeric array with substituted missing values

glow.functions.normalize_variant(contigName: Union[pyspark.sql.column.Column, str], start: Union[pyspark.sql.column.Column, str], end: Union[pyspark.sql.column.Column, str], refAllele: Union[pyspark.sql.column.Column, str], altAlleles: Union[pyspark.sql.column.Column, str], refGenomePathString: str) → pyspark.sql.column.Column
Normalizes the variant with a behavior similar to vt normalize or bcftools norm. Creates a StructType column including the normalized start, end, referenceAllele, and alternateAlleles fields (whether they are changed or unchanged as the result of normalization) as well as a StructType field called normalizationStatus that contains the following fields:

- changed: A boolean field indicating whether the variant data was changed as a result of normalization
- errorMessage: An error message in case the attempt at normalizing the row hit an error. In this case, the changed field will be set to false. If no errors occur, this field will be null.

In case of an error, the start, end, referenceAllele, and alternateAlleles fields in the generated struct will be null.

Added in version 0.3.0.

Examples

>>> df = spark.read.format('vcf').load('test-data/variantsplitternormalizer-test/test_left_align_hg38_altered.vcf')
>>> ref_genome = 'test-data/variantsplitternormalizer-test/Homo_sapiens_assembly38.20.21_altered.fasta'
>>> df.select('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles').head()
Row(contigName='chr20', start=400, end=401, referenceAllele='G', alternateAlleles=['GATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGGTTTGAG'])
>>> normalized_df = df.select('contigName', glow.expand_struct(glow.normalize_variant('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles', ref_genome)))
>>> normalized_df.head()
Row(contigName='chr20', start=268, end=269, referenceAllele='A', alternateAlleles=['ATTTGAGATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGG'], normalizationStatus=Row(changed=True, errorMessage=None))

Parameters
- contigName – The current contig name
- start – The current start
- end – The current end
- refAllele – The current reference allele
- altAlleles – The current array of alternate alleles
- refGenomePathString – A path to the reference genome .fasta file. The .fasta file must be accompanied with a .fai index file in the same folder.

Returns
A struct as explained above

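The normalizationStatus field makes it straightforward to inspect which rows were actually modified or hit an error. A hedged sketch continuing the example above:

>>> changed_only = normalized_df.where('normalizationStatus.changed')
>>> errored = normalized_df.where('normalizationStatus.errorMessage is not null')
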
glow.functions.sample_call_summary_stats(genotypes: Union[pyspark.sql.column.Column, str], refAllele: Union[pyspark.sql.column.Column, str], alternateAlleles: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes per-sample call summary statistics. See Sample Quality Control for more details.

Added in version 0.3.0.

Examples

>>> sites = [
...     Row(refAllele='C', alternateAlleles=['G'], genotypes=[Row(sampleId='NA12878', calls=[0, 0])]),
...     Row(refAllele='A', alternateAlleles=['G'], genotypes=[Row(sampleId='NA12878', calls=[1, 1])]),
...     Row(refAllele='AG', alternateAlleles=['A'], genotypes=[Row(sampleId='NA12878', calls=[1, 0])])]
>>> df = spark.createDataFrame(sites, 'alternateAlleles: array<string>, genotypes: array<struct<sampleId: string, calls: array<int>>>, refAllele: string')
>>> df.select(glow.sample_call_summary_stats('genotypes', 'refAllele', 'alternateAlleles').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', callRate=1.0, nCalled=3, nUncalled=0, nHomRef=1, nHet=1, nHomVar=1, nSnp=2, nInsertion=0, nDeletion=1, nTransition=2, nTransversion=0, nSpanningDeletion=0, rTiTv=inf, rInsertionDeletion=0.0, rHetHomVar=1.0)])]

Parameters
- genotypes – An array of genotype structs with calls field
- refAllele – The reference allele
- alternateAlleles – An array of alternate alleles

Returns
A struct containing sampleId, callRate, nCalled, nUncalled, nHomRef, nHet, nHomVar, nSnp, nInsertion, nDeletion, nTransition, nTransversion, nSpanningDeletion, rTiTv, rInsertionDeletion, and rHetHomVar fields. See Sample Quality Control.

glow.functions.sample_dp_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes per-sample summary statistics about the depth field in an array of genotype structs.

Added in version 0.3.0.

Examples

>>> sites = [
...     Row(genotypes=[Row(sampleId='NA12878', depth=1)]),
...     Row(genotypes=[Row(sampleId='NA12878', depth=2)]),
...     Row(genotypes=[Row(sampleId='NA12878', depth=3)])]
>>> df = spark.createDataFrame(sites, 'genotypes: array<struct<depth: int, sampleId: string>>')
>>> df.select(glow.sample_dp_summary_stats('genotypes').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', mean=2.0, stdDev=1.0, min=1.0, max=3.0)])]

Parameters
- genotypes – An array of genotype structs with depth field

Returns
An array of structs where each struct contains mean, stdDev, min, and max of the genotype depths for a sample. If sampleId is present in a genotype, it will be propagated to the resulting struct as an extra field.

glow.functions.sample_gq_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Computes per-sample summary statistics about the genotype quality field in an array of genotype structs.

Added in version 0.3.0.

Examples

>>> sites = [
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=1)]),
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=2)]),
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=3)])]
>>> df = spark.createDataFrame(sites, 'genotypes: array<struct<conditionalQuality: int, sampleId: string>>')
>>> df.select(glow.sample_gq_summary_stats('genotypes').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', mean=2.0, stdDev=1.0, min=1.0, max=3.0)])]

Parameters
- genotypes – An array of genotype structs with conditionalQuality field

Returns
An array of structs where each struct contains mean, stdDev, min, and max of the genotype qualities for a sample. If sampleId is present in a genotype, it will be propagated to the resulting struct as an extra field.

glow.functions.subset_struct(struct: Union[pyspark.sql.column.Column, str], *fields) → pyspark.sql.column.Column
Selects fields from a struct.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(struct=Row(a=1, b=2, c=3))])
>>> df.select(glow.subset_struct('struct', 'a', 'c').alias('struct')).collect()
[Row(struct=Row(a=1, c=3))]

Parameters
- struct – Struct from which to select fields
- fields – Fields to select

Returns
A struct containing only the indicated fields

glow.functions.vector_to_array(vector: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column
Converts a spark.ml Vector (sparse or dense) to an array of doubles.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseVector, SparseVector
>>> df = spark.createDataFrame([Row(v=SparseVector(3, {0: 1.0, 2: 2.0})), Row(v=DenseVector([3.0, 4.0]))])
>>> df.select(glow.vector_to_array('v').alias('arr')).collect()
[Row(arr=[1.0, 0.0, 2.0]), Row(arr=[3.0, 4.0])]

Parameters
- vector – Vector to convert

Returns
An array of doubles

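This function is the inverse of array_to_dense_vector and array_to_sparse_vector above, which makes it easy to move between Glow's array-based functions and spark.ml. A hedged round-trip sketch (the expected output is shown as a comment rather than verified):

>>> df = spark.createDataFrame([Row(arr=[1.0, 0.0, 2.0])])
>>> roundtrip = df.select(glow.vector_to_array(glow.array_to_sparse_vector('arr')).alias('arr')).collect()
>>> # expected: [Row(arr=[1.0, 0.0, 2.0])], the original array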