Glow PySpark Functions
Glow includes a number of functions that operate on PySpark columns. These functions are interoperable with functions provided by PySpark or other libraries.
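For example, here is a minimal sketch (assuming a Spark session named spark and the glow module imported, as in the doctests below; this is not an example from the reference entries themselves) of combining a Glow function with a built-in PySpark function in a single select:

>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import array_max, col
>>> df = spark.createDataFrame([Row(arr=[1.0, 2.0, 3.0])])
>>> df.select(glow.array_to_dense_vector('arr').alias('v'), array_max(col('arr')).alias('m')).collect()
[Row(v=DenseVector([1.0, 2.0, 3.0]), m=3.0)]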
glow.functions.add_struct_fields(struct: Union[pyspark.sql.column.Column, str], *fields) → pyspark.sql.column.Column

Adds fields to a struct.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(struct=Row(a=1))])
>>> df.select(glow.add_struct_fields('struct', lit('b'), lit(2)).alias('struct')).collect()
[Row(struct=Row(a=1, b=2))]

Parameters:
- struct – The struct to which fields will be added
- fields – The new fields to add. The arguments must alternate between string-typed literal field names and field values.

Returns:
A struct consisting of the input struct and the added fields
glow.functions.array_summary_stats(arr: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes the minimum, maximum, mean, and standard deviation of an array of numerics.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(arr=[1, 2, 3])])
>>> df.select(glow.expand_struct(glow.array_summary_stats('arr'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]

Parameters:
- arr – An array of any numeric type

Returns:
A struct containing double-typed mean, stdDev, min, and max fields
glow.functions.array_to_dense_vector(arr: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Converts an array of numerics into a spark.ml DenseVector.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseVector
>>> df = spark.createDataFrame([Row(arr=[1, 2, 3])])
>>> df.select(glow.array_to_dense_vector('arr').alias('v')).collect()
[Row(v=DenseVector([1.0, 2.0, 3.0]))]

Parameters:
- arr – The array of numerics

Returns:
A spark.ml DenseVector
glow.functions.array_to_sparse_vector(arr: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Converts an array of numerics into a spark.ml SparseVector.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import SparseVector
>>> df = spark.createDataFrame([Row(arr=[1, 0, 2, 0, 3, 0])])
>>> df.select(glow.array_to_sparse_vector('arr').alias('v')).collect()
[Row(v=SparseVector(6, {0: 1.0, 2: 2.0, 4: 3.0}))]

Parameters:
- arr – The array of numerics

Returns:
A spark.ml SparseVector
glow.functions.call_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes call summary statistics for an array of genotype structs. See Variant Quality Control for more details.

Added in version 0.3.0.

Examples

>>> schema = 'genotypes: array<struct<calls: array<int>>>'
>>> df = spark.createDataFrame([Row(genotypes=[Row(calls=[0, 0]), Row(calls=[1, 0]), Row(calls=[1, 1])])], schema)
>>> df.select(glow.expand_struct(glow.call_summary_stats('genotypes'))).collect()
[Row(callRate=1.0, nCalled=3, nUncalled=0, nHet=1, nHomozygous=[1, 1], nNonRef=2, nAllelesCalled=6, alleleCounts=[3, 3], alleleFrequencies=[0.5, 0.5])]

Parameters:
- genotypes – The array of genotype structs with the calls field

Returns:
A struct containing callRate, nCalled, nUncalled, nHet, nHomozygous, nNonRef, nAllelesCalled, alleleCounts, and alleleFrequencies fields. See Variant Quality Control.
glow.functions.dp_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes summary statistics for the depth field from an array of genotype structs. See Variant Quality Control.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(genotypes=[Row(depth=1), Row(depth=2), Row(depth=3)])], 'genotypes: array<struct<depth: int>>')
>>> df.select(glow.expand_struct(glow.dp_summary_stats('genotypes'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]

Parameters:
- genotypes – An array of genotype structs with the depth field

Returns:
A struct containing mean, stdDev, min, and max of genotype depths
glow.functions.expand_struct(struct: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Promotes fields of a nested struct to top-level columns, similar to using struct.* from SQL, but can be used in more contexts.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(struct=Row(a=1, b=2))])
>>> df.select(glow.expand_struct(col('struct'))).collect()
[Row(a=1, b=2)]

Parameters:
- struct – The struct to expand

Returns:
Columns corresponding to fields of the input struct
glow.functions.explode_matrix(matrix: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Explodes a spark.ml Matrix (sparse or dense) into multiple arrays, one per row of the matrix.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseMatrix
>>> m = DenseMatrix(numRows=3, numCols=2, values=[1, 2, 3, 4, 5, 6])
>>> df = spark.createDataFrame([Row(matrix=m)])
>>> df.select(glow.explode_matrix('matrix').alias('row')).collect()
[Row(row=[1.0, 4.0]), Row(row=[2.0, 5.0]), Row(row=[3.0, 6.0])]

Parameters:
- matrix – The spark.ml Matrix to explode

Returns:
An array column in which each row is a row of the input matrix
glow.functions.genotype_states(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Gets the number of alternate alleles for an array of genotype structs. Returns -1 if there are any -1s (no-calls) in the calls array.

Added in version 0.3.0.

Examples

>>> genotypes = [
...     Row(calls=[1, 1]),
...     Row(calls=[1, 0]),
...     Row(calls=[0, 0]),
...     Row(calls=[-1, -1])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.genotype_states('genotypes').alias('states')).collect()
[Row(states=[2, 1, 0, -1])]

Parameters:
- genotypes – An array of genotype structs with the calls field

Returns:
An array of integers containing the number of alternate alleles in each call array
glow.functions.gq_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes summary statistics about the genotype quality field for an array of genotype structs. See Variant Quality Control.

Added in version 0.3.0.

Examples

>>> genotypes = [
...     Row(conditionalQuality=1),
...     Row(conditionalQuality=2),
...     Row(conditionalQuality=3)]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<conditionalQuality: int>>')
>>> df.select(glow.expand_struct(glow.gq_summary_stats('genotypes'))).collect()
[Row(mean=2.0, stdDev=1.0, min=1.0, max=3.0)]

Parameters:
- genotypes – The array of genotype structs with the conditionalQuality field

Returns:
A struct containing mean, stdDev, min, and max of genotype qualities
glow.functions.hard_calls(probabilities: Union[pyspark.sql.column.Column, str], numAlts: Union[pyspark.sql.column.Column, str], phased: Union[pyspark.sql.column.Column, str], threshold: float = None) → pyspark.sql.column.Column

Converts an array of probabilities to hard calls. The probabilities are assumed to be diploid. See Variant data transformations for more details.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(probs=[0.95, 0.05, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
[Row(calls=[0, 0])]
>>> df = spark.createDataFrame([Row(probs=[0.05, 0.95, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False)).alias('calls')).collect()
[Row(calls=[1, 0])]
>>> # Use the threshold parameter to change the minimum probability required for a call
>>> df = spark.createDataFrame([Row(probs=[0.05, 0.95, 0.0])])
>>> df.select(glow.hard_calls('probs', numAlts=lit(1), phased=lit(False), threshold=0.99).alias('calls')).collect()
[Row(calls=[-1, -1])]

Parameters:
- probabilities – The array of probabilities to convert
- numAlts – The number of alternate alleles
- phased – Whether the probabilities are phased. If phased, we expect 2 * numAlts values in the probabilities array. If unphased, we expect one probability per possible genotype.
- threshold – The minimum probability to make a call. If no probability falls into the range of [0, 1 - threshold] or [threshold, 1], a no-call (represented by -1s) will be emitted. If not provided, this parameter defaults to 0.9.

Returns:
An array of hard calls
glow.functions.hardy_weinberg(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes statistics relating to the Hardy-Weinberg equilibrium. See Variant Quality Control for more details.

Added in version 0.3.0.

Examples

>>> genotypes = [
...     Row(calls=[1, 1]),
...     Row(calls=[1, 0]),
...     Row(calls=[0, 0])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.expand_struct(glow.hardy_weinberg('genotypes'))).collect()
[Row(hetFreqHwe=0.6, pValueHwe=0.7)]

Parameters:
- genotypes – The array of genotype structs with the calls field

Returns:
A struct containing two fields, hetFreqHwe (the expected heterozygous frequency according to Hardy-Weinberg equilibrium) and pValueHwe (the associated p-value)
glow.functions.lift_over_coordinates(contigName: Union[pyspark.sql.column.Column, str], start: Union[pyspark.sql.column.Column, str], end: Union[pyspark.sql.column.Column, str], chainFile: str, minMatchRatio: float = None) → pyspark.sql.column.Column

Performs liftover for the coordinates of a variant. To perform liftover of alleles and add additional metadata, see Liftover.

Added in version 0.3.0.

Examples

>>> df = spark.read.format('vcf').load('test-data/liftover/unlifted.test.vcf').where('start = 18210071')
>>> chain_file = 'test-data/liftover/hg38ToHg19.over.chain.gz'
>>> reference_file = 'test-data/liftover/hg19.chr20.fa.gz'
>>> df.select('contigName', 'start', 'end').head()
Row(contigName='chr20', start=18210071, end=18210072)
>>> lifted_df = df.select(glow.expand_struct(glow.lift_over_coordinates('contigName', 'start', 'end', chain_file)))
>>> lifted_df.head()
Row(contigName='chr20', start=18190715, end=18190716)

Parameters:
- contigName – The current contig name
- start – The current start
- end – The current end
- chainFile – Location of the chain file on each node in the cluster
- minMatchRatio – Minimum fraction of bases that must remap to do liftover successfully. If not provided, defaults to 0.95.

Returns:
A struct containing contigName, start, and end fields after liftover
glow.functions.linear_regression_gwas(genotypes: Union[pyspark.sql.column.Column, str], phenotypes: Union[pyspark.sql.column.Column, str], covariates: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Performs a linear regression association test optimized for performance in a GWAS setting. See Linear regression for details.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseMatrix
>>> phenotypes = [2, 3, 4]
>>> genotypes = [0, 1, 2]
>>> covariates = DenseMatrix(numRows=3, numCols=1, values=[1, 1, 1])
>>> df = spark.createDataFrame([Row(genotypes=genotypes, phenotypes=phenotypes, covariates=covariates)])
>>> df.select(glow.expand_struct(glow.linear_regression_gwas('genotypes', 'phenotypes', 'covariates'))).collect()
[Row(beta=0.9999999999999998, standardError=1.4901161193847656e-08, pValue=9.486373847239922e-09)]

Parameters:
- genotypes – A numeric array of genotypes
- phenotypes – A numeric array of phenotypes
- covariates – A spark.ml Matrix of covariates

Returns:
A struct containing beta, standardError, and pValue fields. See Linear regression.
glow.functions.logistic_regression_gwas(genotypes: Union[pyspark.sql.column.Column, str], phenotypes: Union[pyspark.sql.column.Column, str], covariates: Union[pyspark.sql.column.Column, str], test: str, offset: Union[pyspark.sql.column.Column, str] = None) → pyspark.sql.column.Column

Performs a logistic regression association test optimized for performance in a GWAS setting. See Logistic regression for more details.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseMatrix
>>> phenotypes = [1, 0, 0, 1, 1]
>>> genotypes = [0, 0, 1, 2, 2]
>>> covariates = DenseMatrix(numRows=5, numCols=1, values=[1, 1, 1, 1, 1])
>>> offset = [1, 0, 1, 0, 1]
>>> df = spark.createDataFrame([Row(genotypes=genotypes, phenotypes=phenotypes, covariates=covariates, offset=offset)])
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'Firth'))).collect()
[Row(beta=0.7418937644793101, oddsRatio=2.09990848346903, waldConfidenceInterval=[0.2509874689201784, 17.569066925598555], pValue=0.3952193664793294)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'LRT'))).collect()
[Row(beta=1.1658962684583645, oddsRatio=3.208797538802116, waldConfidenceInterval=[0.29709600522888285, 34.65674887513274], pValue=0.2943946848756769)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'Firth', 'offset'))).collect()
[Row(beta=0.8024832156793392, oddsRatio=2.231074294047771, waldConfidenceInterval=[0.2540891981649045, 19.590334974925725], pValue=0.3754070658316332)]
>>> df.select(glow.expand_struct(glow.logistic_regression_gwas('genotypes', 'phenotypes', 'covariates', 'LRT', 'offset'))).collect()
[Row(beta=1.1996041727573317, oddsRatio=3.3188029900720117, waldConfidenceInterval=[0.3071189078535928, 35.863807161497334], pValue=0.2857137988674153)]

Parameters:
- genotypes – A numeric array of genotypes
- phenotypes – A double array of phenotype values
- covariates – A spark.ml Matrix of covariates
- test – Which logistic regression test to use. Can be LRT or Firth.
- offset – An optional double array of offset values. The offset vector is added with coefficient 1 to the linear predictor term X*b.

Returns:
A struct containing beta, oddsRatio, waldConfidenceInterval, and pValue fields. See Logistic regression.
glow.functions.mean_substitute(array: Union[pyspark.sql.column.Column, str], missingValue: Union[pyspark.sql.column.Column, str] = None) → pyspark.sql.column.Column

Substitutes the missing values of a numeric array using the mean of the non-missing values. Any values that are NaN, null, or equal to the missing value parameter are considered missing. See Variant data transformations for more details.

Added in version 0.4.0.

Examples

>>> df = spark.createDataFrame([Row(unsubstituted_values=[float('nan'), None, 0.0, 1.0, 2.0, 3.0, 4.0])])
>>> df.select(glow.mean_substitute('unsubstituted_values', lit(0.0)).alias('substituted_values')).collect()
[Row(substituted_values=[2.5, 2.5, 2.5, 1.0, 2.0, 3.0, 4.0])]
>>> df = spark.createDataFrame([Row(unsubstituted_values=[0, 1, 2, 3, -1, None])])
>>> df.select(glow.mean_substitute('unsubstituted_values').alias('substituted_values')).collect()
[Row(substituted_values=[0.0, 1.0, 2.0, 3.0, 1.5, 1.5])]

Parameters:
- array – A numeric array that may contain missing values
- missingValue – A value that should be considered missing. If not provided, this parameter defaults to -1.

Returns:
A numeric array with substituted missing values
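Because genotype_states (documented above) encodes no-calls as -1, the default missingValue makes it natural to chain the two functions. The following is a minimal sketch, not an example from this page; the shown output is what the documented semantics of both functions imply (states [2, 0, -1], then the no-call replaced by the mean of the non-missing values):

>>> genotypes = [Row(calls=[1, 1]), Row(calls=[0, 0]), Row(calls=[-1, -1])]
>>> df = spark.createDataFrame([Row(genotypes=genotypes)], 'genotypes: array<struct<calls: array<int>>>')
>>> df.select(glow.mean_substitute(glow.genotype_states('genotypes')).alias('imputed')).collect()
[Row(imputed=[2.0, 0.0, 1.0])]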
glow.functions.normalize_variant(contigName: Union[pyspark.sql.column.Column, str], start: Union[pyspark.sql.column.Column, str], end: Union[pyspark.sql.column.Column, str], refAllele: Union[pyspark.sql.column.Column, str], altAlleles: Union[pyspark.sql.column.Column, str], refGenomePathString: str) → pyspark.sql.column.Column

Normalizes the variant with a behavior similar to vt normalize or bcftools norm. Creates a StructType column including the normalized start, end, referenceAllele, and alternateAlleles fields (whether they are changed or unchanged as the result of normalization), as well as a StructType field called normalizationStatus that contains the following fields:

- changed: A boolean field indicating whether the variant data was changed as a result of normalization
- errorMessage: An error message in case the attempt at normalizing the row hit an error. In this case, the changed field will be set to false. If no errors occur, this field will be null.

In case of an error, the start, end, referenceAllele, and alternateAlleles fields in the generated struct will be null.

Added in version 0.3.0.

Examples

>>> df = spark.read.format('vcf').load('test-data/variantsplitternormalizer-test/test_left_align_hg38_altered.vcf')
>>> ref_genome = 'test-data/variantsplitternormalizer-test/Homo_sapiens_assembly38.20.21_altered.fasta'
>>> df.select('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles').head()
Row(contigName='chr20', start=400, end=401, referenceAllele='G', alternateAlleles=['GATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGGTTTGAG'])
>>> normalized_df = df.select('contigName', glow.expand_struct(glow.normalize_variant('contigName', 'start', 'end', 'referenceAllele', 'alternateAlleles', ref_genome)))
>>> normalized_df.head()
Row(contigName='chr20', start=268, end=269, referenceAllele='A', alternateAlleles=['ATTTGAGATCTTCCCTCTTTTCTAATATAAACACATAAAGCTCTGTTTCCTTCTAGGTAACTGG'], normalizationStatus=Row(changed=True, errorMessage=None))

Parameters:
- contigName – The current contig name
- start – The current start
- end – The current end
- refAllele – The current reference allele
- altAlleles – The current array of alternate alleles
- refGenomePathString – A path to the reference genome .fasta file. The .fasta file must be accompanied with a .fai index file in the same folder.

Returns:
A struct as explained above
glow.functions.sample_call_summary_stats(genotypes: Union[pyspark.sql.column.Column, str], refAllele: Union[pyspark.sql.column.Column, str], alternateAlleles: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes per-sample call summary statistics. See Sample Quality Control for more details.

Added in version 0.3.0.

Examples

>>> sites = [
...     Row(refAllele='C', alternateAlleles=['G'], genotypes=[Row(sampleId='NA12878', calls=[0, 0])]),
...     Row(refAllele='A', alternateAlleles=['G'], genotypes=[Row(sampleId='NA12878', calls=[1, 1])]),
...     Row(refAllele='AG', alternateAlleles=['A'], genotypes=[Row(sampleId='NA12878', calls=[1, 0])])]
>>> df = spark.createDataFrame(sites, 'alternateAlleles: array<string>, genotypes: array<struct<sampleId: string, calls: array<int>>>, refAllele: string')
>>> df.select(glow.sample_call_summary_stats('genotypes', 'refAllele', 'alternateAlleles').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', callRate=1.0, nCalled=3, nUncalled=0, nHomRef=1, nHet=1, nHomVar=1, nSnp=2, nInsertion=0, nDeletion=1, nTransition=2, nTransversion=0, nSpanningDeletion=0, rTiTv=inf, rInsertionDeletion=0.0, rHetHomVar=1.0)])]

Parameters:
- genotypes – An array of genotype structs with the calls field
- refAllele – The reference allele
- alternateAlleles – An array of alternate alleles

Returns:
A struct containing sampleId, callRate, nCalled, nUncalled, nHomRef, nHet, nHomVar, nSnp, nInsertion, nDeletion, nTransition, nTransversion, nSpanningDeletion, rTiTv, rInsertionDeletion, and rHetHomVar fields. See Sample Quality Control.
glow.functions.sample_dp_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes per-sample summary statistics about the depth field in an array of genotype structs.

Added in version 0.3.0.

Examples

>>> sites = [
...     Row(genotypes=[Row(sampleId='NA12878', depth=1)]),
...     Row(genotypes=[Row(sampleId='NA12878', depth=2)]),
...     Row(genotypes=[Row(sampleId='NA12878', depth=3)])]
>>> df = spark.createDataFrame(sites, 'genotypes: array<struct<depth: int, sampleId: string>>')
>>> df.select(glow.sample_dp_summary_stats('genotypes').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', mean=2.0, stdDev=1.0, min=1.0, max=3.0)])]

Parameters:
- genotypes – An array of genotype structs with the depth field

Returns:
An array of structs where each struct contains mean, stdDev, min, and max of the genotype depths for a sample. If sampleId is present in a genotype, it will be propagated to the resulting struct as an extra field.
glow.functions.sample_gq_summary_stats(genotypes: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Computes per-sample summary statistics about the genotype quality field in an array of genotype structs.

Added in version 0.3.0.

Examples

>>> sites = [
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=1)]),
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=2)]),
...     Row(genotypes=[Row(sampleId='NA12878', conditionalQuality=3)])]
>>> df = spark.createDataFrame(sites, 'genotypes: array<struct<conditionalQuality: int, sampleId: string>>')
>>> df.select(glow.sample_gq_summary_stats('genotypes').alias('stats')).collect()
[Row(stats=[Row(sampleId='NA12878', mean=2.0, stdDev=1.0, min=1.0, max=3.0)])]

Parameters:
- genotypes – An array of genotype structs with the conditionalQuality field

Returns:
An array of structs where each struct contains mean, stdDev, min, and max of the genotype qualities for a sample. If sampleId is present in a genotype, it will be propagated to the resulting struct as an extra field.
glow.functions.subset_struct(struct: Union[pyspark.sql.column.Column, str], *fields) → pyspark.sql.column.Column

Selects fields from a struct.

Added in version 0.3.0.

Examples

>>> df = spark.createDataFrame([Row(struct=Row(a=1, b=2, c=3))])
>>> df.select(glow.subset_struct('struct', 'a', 'c').alias('struct')).collect()
[Row(struct=Row(a=1, c=3))]

Parameters:
- struct – Struct from which to select fields
- fields – Fields to select

Returns:
A struct containing only the indicated fields
glow.functions.vector_to_array(vector: Union[pyspark.sql.column.Column, str]) → pyspark.sql.column.Column

Converts a spark.ml Vector (sparse or dense) to an array of doubles.

Added in version 0.3.0.

Examples

>>> from pyspark.ml.linalg import DenseVector, SparseVector
>>> df = spark.createDataFrame([Row(v=SparseVector(3, {0: 1.0, 2: 2.0})), Row(v=DenseVector([3.0, 4.0]))])
>>> df.select(glow.vector_to_array('v').alias('arr')).collect()
[Row(arr=[1.0, 0.0, 2.0]), Row(arr=[3.0, 4.0])]

Parameters:
- vector – Vector to convert

Returns:
An array of doubles