Parallel Queries: PARTGOR¶
%%capture
# load the magic extension and imports
%reload_ext nextcode
import pandas as pd
import GOR_query_helper as GQH
%env LOG_QUERY=1
project = "bch_connect_hg38"
%env GOR_API_PROJECT={project}
Parallel GORpipe queries with PARTGOR¶
This notebook explains advanced parallel concepts in GOR, beyond the standard PGOR macro command used to split data processing across the genome. GOR files/tables are always consistently ordered to enable rapid data access based on genomic position. Additionally, GOR allows data tables to be partitioned, to provide fast access to subsets of the data based on tags (keys), using so-called GOR dictionaries. Such partitions exist on two levels: individual tags and buckets that store data for multiple tags. This data architecture allows for very large tables that can be updated efficiently and queried efficiently with parallel processing. Examples of such tables are variants or sequence read coverage at the patient-sample level, or GWAS results partitioned by phenotype.
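To make this concrete, here is a hypothetical sketch of what rows in such a GOR dictionary (a .gord file) may look like; the file names are made up, and real dictionaries typically carry additional columns such as the genomic range covered and bucket tag lists. Each tab-separated row pairs a single-sample file with the bucket file that also stores its data, followed by the partition tag, here a PN:
source/var/pn_000001_wgs.gorz|.wgs_varcalls/buckets/wgs_varcalls_bucket_1.gorz	PN_000001
source/var/pn_000002_wgs.gorz|.wgs_varcalls/buckets/wgs_varcalls_bucket_1.gorz	PN_000002
The -s PN option used in the definitions below tells GOR to add the tag of each row as a column named PN when the dictionary is read.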
Similar to PGOR, PARTGOR is a macro command that makes it easy to set up parallel queries that recognize the partition setup of dictionaries. The following examples will explore this.
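Conceptually, a PARTGOR create statement of the following form (an illustrative sketch of the mechanism, not a query from this project)
create #result# = partgor -dict ##wgsvars##
  <(gor ##wgsvars## -f #{tags} | group chrom -count);
is executed as one nested query per partition of the dictionary, with #{tags} replaced by the comma-separated list of tags (here PNs) belonging to that partition. The virtual relation [#result#] then behaves like a dictionary over the per-partition outputs, which is why per-partition aggregates usually have to be combined in a follow-up query, as the examples below demonstrate. Options such as -partsize and -partscale, used later in this notebook, control how many tags are grouped into each partition.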
Variant count¶
First, we use it to get information about the variants from all the samples in a single gene.
qh = GQH.GOR_Query_Helper()
mydefs = qh.add_many("""
def ##wgsvars## = source/var/wgs_varcalls.gord -s PN;
def ##segcov## = source/cov/segment_cov.gord -s PN;
""")
We can explore the metadata in ##wgsvars## by using the -asdict option with the NOR command:
%%gor
$mydefs
nor -asdict ##wgsvars## | colsplit #1 2 x -s '\|' | group -gc x_2 -count | granno -sum -ic allcount -count
Query ran in 1.16 sec Query fetched 152 rows in 0.58 sec (total time 1.73 sec)
 | x_2 | allCount | allCountx | sum_allCount |
---|---|---|---|---|
0 | NaN | 5 | 152 | 24099 |
1 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 200 | 152 | 24099 |
2 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 200 | 152 | 24099 |
3 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 200 | 152 | 24099 |
4 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 200 | 152 | 24099 |
... | ... | ... | ... | ... |
147 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 100 | 152 | 24099 |
148 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 100 | 152 | 24099 |
149 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 20 | 152 | 24099 |
150 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 54 | 152 | 24099 |
151 | .wgs_varcalls/buckets/wgs_varcalls_bucket_2023... | 20 | 152 | 24099 |
152 rows × 4 columns
We see that there are 24,099 samples imported into just over 150 buckets, most of which hold 200 samples each (a handful of entries are not yet assigned to a bucket).
Now we create a temp table with all the genes of interest and join them with the variants:
mydefs = qh.add("""create #mygenes# = gor #genes# | where gene_symbol in ('BRCA2')""")
%%gor
$mydefs
gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars##)
| group 1 -gc reference,call,callcopies -count
Query ran in 6.17 sec. Query fetched 5,052 rows in 36.94 sec (total time 43.11 sec)
 | CHROM | POS | Reference | Call | CallCopies | allCount |
---|---|---|---|---|---|---|
0 | chr13 | 32315226 | G | A | 1 | 461 |
1 | chr13 | 32315226 | G | A | 2 | 54 |
2 | chr13 | 32315300 | G | A | 1 | 9 |
3 | chr13 | 32315336 | T | C | 1 | 1 |
4 | chr13 | 32315382 | A | T | 1 | 1 |
... | ... | ... | ... | ... | ... | ... |
5047 | chr13 | 32400114 | G | A | 1 | 3 |
5048 | chr13 | 32400151 | T | A | 1 | 618 |
5049 | chr13 | 32400151 | T | A | 2 | 826 |
5050 | chr13 | 32400200 | A | G | 1 | 1 |
5051 | chr13 | 32400233 | C | T | 1 | 1 |
5052 rows × 6 columns
While the above query, which processed all the variants in BRCA2 from roughly 24 thousand samples, ran fairly quickly, we can execute it faster with parallel processing. Splitting the work into many small genomic regions over BRCA2 would actually create significant overhead for each worker, because of the fixed seek time into every bucket file (here all the buckets, since we are not filtering on samples). Instead, we will parallelize over the sample partitions using PARTGOR.
mydefs = qh.add("""
create #partcount# = partgor -dict ##wgsvars##
<(gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars## -f #{tags})
| group 1 -gc reference,call,callcopies -count);
""")
%%gor
$mydefs
gor [#partcount#] | granno 1 -gc #3,#4 -count | top 200
Query ran in 4.56 sec. Query fetched 200 rows in 4.06 sec (total time 8.62 sec)
 | CHROM | POS | Reference | Call | CallCopies | allCount | allCountx |
---|---|---|---|---|---|---|---|
0 | chr13 | 32315226 | G | A | 1 | 4 | 121 |
1 | chr13 | 32315226 | G | A | 1 | 5 | 121 |
2 | chr13 | 32315226 | G | A | 1 | 2 | 121 |
3 | chr13 | 32315226 | G | A | 1 | 4 | 121 |
4 | chr13 | 32315226 | G | A | 2 | 2 | 121 |
... | ... | ... | ... | ... | ... | ... | ... |
195 | chr13 | 32315655 | A | G | 1 | 1 | 117 |
196 | chr13 | 32315655 | A | G | 1 | 6 | 117 |
197 | chr13 | 32315655 | A | G | 2 | 2 | 117 |
198 | chr13 | 32315655 | A | G | 1 | 4 | 117 |
199 | chr13 | 32315655 | A | G | 2 | 1 | 117 |
200 rows × 7 columns
We see that we get multiple rows for each variant, with sample counts from each partition for the common variants. Therefore, we need to sum the per-partition sums, because GOR does not automatically combine distributive aggregates across partitions. We will now change the partition size and sum up the counts:
mydefs = qh.add("""
create #partcount# = partgor -dict ##wgsvars## -partsize 1000
<(gor [#mygenes#] | join -segsnp -ir <(gor ##wgsvars## -f #{tags})
| group 1 -gc reference,call,callcopies -count);
""")
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
Query ran in 1.56 sec Query fetched 5,052 rows in 1.18 sec (total time 2.73 sec)
 | CHROM | POS | Reference | Call | CallCopies | gtCount |
---|---|---|---|---|---|---|
0 | chr13 | 32315226 | G | A | 1 | 461 |
1 | chr13 | 32315226 | G | A | 2 | 54 |
2 | chr13 | 32315300 | G | A | 1 | 9 |
3 | chr13 | 32315336 | T | C | 1 | 1 |
4 | chr13 | 32315382 | A | T | 1 | 1 |
... | ... | ... | ... | ... | ... | ... |
5047 | chr13 | 32400114 | G | A | 1 | 3 |
5048 | chr13 | 32400151 | T | A | 1 | 618 |
5049 | chr13 | 32400151 | T | A | 2 | 826 |
5050 | chr13 | 32400200 | A | G | 1 | 1 |
5051 | chr13 | 32400233 | C | T | 1 | 1 |
5052 rows × 6 columns
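To see how many partitions PARTGOR actually created with a given -partsize, the temporary dictionary behind [#partcount#] can be inspected just like any other dictionary. A small illustrative query (the exact count depends on the project and the chosen partition size):
%%gor
$mydefs
nor -asdict [#partcount#] | group -count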
Accessing more than one table¶
Now we will extend the query above and also use the sequence read coverage from each sample, to understand the data quality where a variant may be absent in a sample. We will use the allvars.gord table in the VEP folder to list all possible variants within the gene. Then we will left-join the per-sample variant data, and subsequently the coverage data, onto this list.
mydefs = qh.add_many("""
def ##allvars## = source/anno/vep_v107/allvars.gord;
create #partcount# = partgor -dict ##wgsvars## -partsize 1000
<(gor [#mygenes#] | join -segsnp -ir <(gor ##allvars##)
| calc pn '#{tags}' | split pn
| varjoin -r -l -xl PN -xr PN -e 'miss' <(gor ##wgsvars## -f #{tags})
| group 1 -gc reference,call,callcopies -count
);
""")
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
Query ran in 2.55 sec Query fetched 22,744 rows in 40.65 sec (total time 43.20 sec)
 | CHROM | POS | Reference | Call | CallCopies | gtCount |
---|---|---|---|---|---|---|
0 | chr13 | 32315226 | G | A | 1 | 461 |
1 | chr13 | 32315226 | G | A | 2 | 54 |
2 | chr13 | 32315226 | G | A | miss | 23584 |
3 | chr13 | 32315300 | G | A | 1 | 9 |
4 | chr13 | 32315300 | G | A | miss | 24090 |
... | ... | ... | ... | ... | ... | ... |
22739 | chr13 | 32400151 | T | A | miss | 22655 |
22740 | chr13 | 32400200 | A | G | 1 | 1 |
22741 | chr13 | 32400200 | A | G | miss | 24098 |
22742 | chr13 | 32400233 | C | T | 1 | 1 |
22743 | chr13 | 32400233 | C | T | miss | 24098 |
22744 rows × 6 columns
We see that for most of the variants, the callcopies count indicates that they are missing. Because the variants in ##wgsvars## are stored sparsely, we have to use the sequence read coverage to separate the homozygous reference state from absence of a variant call due to lack of data. Therefore, we join the coverage information for each sample:
mydefs = qh.add("""
def ##allvars## = source/anno/vep_v107/allvars.gord;
create #partcount# = partgor -dict ##wgsvars## -partsize 1000
<(gor [#mygenes#] | join -segsnp -ir <(gor ##allvars##)
| calc pn '#{tags}' | split pn
| varjoin -r -l -xl PN -xr PN -e 'miss' <(gor ##wgsvars## -f #{tags})
| join -snpseg -maxseg 10000 -l -e 0 -xl PN -xr PN -rprefix Cov <(gor ##segcov## -f #{tags})
| replace callcopies if(callcopies!='miss',callcopies,if(Cov_depth > 5,'0','3'))
| group 1 -gc reference,call,callcopies -count
);
""")
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
Query ran in 5.70 sec. Query fetched 40,428 rows in 97.91 sec (total time 103.60 sec)
 | CHROM | POS | Reference | Call | CallCopies | gtCount |
---|---|---|---|---|---|---|
0 | chr13 | 32315226 | G | A | 0 | 1245 |
1 | chr13 | 32315226 | G | A | 1 | 461 |
2 | chr13 | 32315226 | G | A | 2 | 54 |
3 | chr13 | 32315226 | G | A | 3 | 22339 |
4 | chr13 | 32315300 | G | A | 0 | 1790 |
... | ... | ... | ... | ... | ... | ... |
40423 | chr13 | 32400200 | A | G | 1 | 1 |
40424 | chr13 | 32400200 | A | G | 3 | 22429 |
40425 | chr13 | 32400233 | C | T | 0 | 1657 |
40426 | chr13 | 32400233 | C | T | 1 | 1 |
40427 | chr13 | 32400233 | C | T | 3 | 22441 |
40428 rows × 6 columns
The query above uses a first-principles method to perform a simplistic "joint-genotyping". GOR does have much more efficient commands for this, namely GTGEN and PRGTGEN. However, this approach is still reasonably fast, as shown here for roughly 24 thousand samples and close to 18 thousand variants. We can for instance calculate the allele frequency like this (a small arithmetic check of the AN/AC/AF expressions follows after the results):
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
| pivot callcopies -v '0','1','2','3' -gc #3,#4 -e 0
| prefix call[+1]- x
| calc AN round(2.0*(float(x_0_gtcount)+float(x_1_gtcount)+float(x_2_gtcount)))
| calc AC x_1_gtcount+2*x_2_gtcount
| calc AF form(AC/float(AN),6,5)
| hide x_*
| granno chrom -count
| where AN > 40000
Query ran in 0.78 sec Query fetched 14,684 rows in 1.40 sec (total time 2.18 sec)
 | CHROM | POS | Reference | Call | AN | AC | AF | allCount |
---|---|---|---|---|---|---|---|---|
0 | chr13 | 32316333 | CAA | C | 43358 | 1 | 0.00002 | 17684 |
1 | chr13 | 32316345 | C | A | 44856 | 2 | 0.00004 | 17684 |
2 | chr13 | 32316359 | A | G | 46436 | 1 | 0.00002 | 17684 |
3 | chr13 | 32316360 | C | A | 46560 | 1 | 0.00002 | 17684 |
4 | chr13 | 32316361 | C | A | 46624 | 1 | 0.00002 | 17684 |
... | ... | ... | ... | ... | ... | ... | ... | ... |
14679 | chr13 | 32398918 | G | A | 42656 | 2 | 0.00005 | 17684 |
14680 | chr13 | 32398926 | C | A | 41886 | 1 | 0.00002 | 17684 |
14681 | chr13 | 32398926 | C | T | 41886 | 4 | 0.00010 | 17684 |
14682 | chr13 | 32398927 | G | A | 41670 | 2 | 0.00005 | 17684 |
14683 | chr13 | 32398933 | C | T | 40768 | 1 | 0.00002 | 17684 |
14684 rows × 8 columns
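As a quick sanity check of the AN/AC/AF expressions used above, the same calculations can be run on a single constructed row. This is an illustrative sketch with made-up genotype counts, assuming the GORROW command for generating a single test row:
%%gor
gorrow chr1,1,1
| calc x_0_gtcount 20000 | calc x_1_gtcount 461 | calc x_2_gtcount 54
| calc AN round(2.0*(float(x_0_gtcount)+float(x_1_gtcount)+float(x_2_gtcount)))
| calc AC x_1_gtcount+2*x_2_gtcount
| calc AF form(AC/float(AN),6,5)
With 20,000 homozygous reference, 461 heterozygous and 54 homozygous alternative genotypes, this should give AN = 41030, AC = 569 and AF close to 0.01387.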
Working with a subset of samples (tags)¶
Before we end this demonstration, we will show how we can calculate the AF for a subset of the samples, e.g. based on HPO phenotypes.
%%gor dfSamples <<
nor SubjectReports/Phenotypes.rep
| where description in ('Abnormality of head or neck',
'Abnormality of the face',
'Abnormality of the head') and present='Y'
| group -gc pn -set -sc code
| where listsize(set_code)=3
| select pn
Query ran in 9.16 sec. Query fetched 2,042 rows in 2.69 sec (total time 11.85 sec)
dfSamples.shape
(2042, 1)
We see that we have just over 2000 samples with all three HPO phenotypes present. Below, we define ##myPNs## using a federated approach, i.e. directly from the Pandas dataframe dfSamples via the [var:dfSamples] virtual relation.
mydefs = qh.add_many("""
create ##myPNs## = nor [var:dfSamples];
def ##allvars## = source/anno/vep_v107/allvars.gord;
create #partcount# = partgor -dict ##wgsvars## -partscale 2 -ff [##myPNs##]
<(gor [#mygenes#] | join -segsnp -ir <(gor ##allvars##)
| calc pn '#{tags}' | split pn
| varjoin -r -l -xl PN -xr PN -e 'miss' <(gor ##wgsvars## -f #{tags})
| join -snpseg -maxseg 10000 -l -e 0 -xl PN -xr PN -rprefix Cov <(gor ##segcov## -f #{tags})
| replace callcopies if(callcopies!='miss',callcopies,if(Cov_depth > 5,'0','3'))
| group 1 -gc reference,call,callcopies -count
);
""")
%%gor
$mydefs
gor [#partcount#] | group 1 -gc #3,#4,callcopies -sum -ic allcount | rename sum_allcount gtCount
| pivot callcopies -v '0','1','2','3' -gc #3,#4 -e 0
| prefix call[+1]- x
| calc AN round(2.0*(float(x_0_gtcount)+float(x_1_gtcount)+float(x_2_gtcount)))
| calc AC x_1_gtcount+2*x_2_gtcount
| calc AF form(AC/float(AN),6,5)
| hide x_*
| granno chrom -count
| where AN > 4000
Query ran in 0.07 sec Loading relations [var:dfSamples] from local state Query ran in 0.64 sec Query fetched 14,386 rows in 22.01 sec (total time 22.65 sec)
 | CHROM | POS | Reference | Call | AN | AC | AF | allCount |
---|---|---|---|---|---|---|---|---|
0 | chr13 | 32316367 | C | T | 4008 | 0 | 0.00000 | 17684 |
1 | chr13 | 32316370 | A | G | 4026 | 1 | 0.00025 | 17684 |
2 | chr13 | 32316386 | C | G | 4070 | 0 | 0.00000 | 17684 |
3 | chr13 | 32316388 | C | T | 4070 | 0 | 0.00000 | 17684 |
4 | chr13 | 32316393 | T | G | 4074 | 0 | 0.00000 | 17684 |
... | ... | ... | ... | ... | ... | ... | ... | ... |
14381 | chr13 | 32398878 | A | G | 4078 | 0 | 0.00000 | 17684 |
14382 | chr13 | 32398900 | T | A | 4038 | 0 | 0.00000 | 17684 |
14383 | chr13 | 32398901 | C | T | 4036 | 0 | 0.00000 | 17684 |
14384 | chr13 | 32398904 | C | T | 4018 | 0 | 0.00000 | 17684 |
14385 | chr13 | 32398905 | G | A | 4016 | 1 | 0.00025 | 17684 |
14386 rows × 8 columns
Mixing together PARTGOR and PGOR¶
We can apply parallelism in two dimensions simultaneously, i.e. along the partition axis with PARTGOR and along the genomic axis with PGOR. To do this, we apply PGOR within the PARTGOR macro command. A practical case might be if we want to calculate the number of variants in a project:
qh2 = GQH.GOR_Query_Helper()
mydefs2 = qh2.add_many("""
def #wgsvars# = source/var/wgs_varcalls.gord -s PN;
create #temp# = partgor -dict #wgsvars# -partscale 10 <(pgor #wgsvars# -f #{tags} | group chrom -gc pn -count);
""")
%%gor
$mydefs2
nor -asdict [#temp#] | calc file_ending right(#1,40)
Query ran in 5.11 sec. Query fetched 13 rows in 575.11 sec (total time 580.22 sec)
 | col1 | col2 | file_ending |
---|---|---|---|
0 | ../../../../cache/result_cache/cache01/eak/eak... | 1 | e01/eak/eak5njnebc4d9f0ggefgoc1kbgk.gord |
1 | ../../../../cache/result_cache/cache01/138/138... | 6 | e01/138/1389320co9icjdh6od3domcd37c.gord |
2 | ../../../../cache/result_cache/cache01/d3i/d3i... | 9 | e01/d3i/d3ihdl3e80akj3gj5b5cl2n3b5e.gord |
3 | ../../../../cache/result_cache/cache01/1p0/1p0... | 11 | e01/1p0/1p01a31i559nn23a4cm0d05e62a.gord |
4 | ../../../../cache/result_cache/cache01/d38/d38... | 3 | e01/d38/d38fi2m4n3737ol9j61hjib2ol9.gord |
5 | ../../../../cache/result_cache/cache01/jkb/jkb... | 7 | e01/jkb/jkba4ebgabfmnhmghi1fp70fido.gord |
6 | ../../../../cache/result_cache/cache01/78f/78f... | 2 | e01/78f/78f0np9ed5o8hf16jdm56o2enao.gord |
7 | ../../../../cache/result_cache/cache01/104/104... | 10 | 01/104/104aclcl3oom29o58mhl9ka76hl2.gord |
8 | ../../../../cache/result_cache/cache01/kgc/kgc... | 5 | e01/kgc/kgcdmdhnke4ailg3fa2eo66c8ic.gord |
9 | ../../../../cache/result_cache/cache01/9lf/9lf... | 13 | e01/9lf/9lf3dk95j8nfal8pha7fn4mb53e.gord |
10 | ../../../../cache/result_cache/cache01/6ho/6ho... | 4 | e01/6ho/6ho41hdk6f6hd8a6dl1i3ncdomd.gord |
11 | ../../../../cache/result_cache/cache01/10h/10h... | 8 | 01/10h/10hn6nakh9anm4ohp1el52ga9ko8.gord |
12 | ../../../../cache/result_cache/cache01/g71/g71... | 12 | e01/g71/g71k5e7la2mgflg550a7b634j8e.gord |
We see that the temp dictionary #temp# indeed contains multiple rows, each of which is itself a dictionary holding the output of one of the nested PGOR executions. Using a -partscale of 10, the above query generated over 300 parallel queries to count the variants per PN across the genome, i.e. roughly the number of sample partitions times the number of genomic splits from PGOR. A -partscale of 1 would have resulted in about 3000 queries and the same number of temporary files. With the longest partitions taking about 5 minutes to run, the overall query time was about 10 minutes with 100 workers. We can now sum up the total counts:
%%gor
$mydefs2
nor [#temp#] | group -gc pn -sum -ic allcount | granno -sum -avg -min -max -ic sum_allcount
| replace avg_sum_allCount int(float(#rc))
Query ran in 1.02 sec Query fetched 24,099 rows in 1.27 sec (total time 2.29 sec)
 | PN | sum_allCount | min_sum_allCount | max_sum_allCount | avg_sum_allCount | sum_sum_allCount |
---|---|---|---|---|---|---|
0 | BCH_00HI1377A_1_38E_BC38 | 93044 | 233 | 6339576 | 455953 | 10988032042 |
1 | BCH_019-III-01-P2_1_38E_BC38 | 132944 | 233 | 6339576 | 455953 | 10988032042 |
2 | BCH_01HI1915A_1_38E_BC38 | 97958 | 233 | 6339576 | 455953 | 10988032042 |
3 | BCH_01HI2033A_1_38E_BC38 | 97677 | 233 | 6339576 | 455953 | 10988032042 |
4 | BCH_01HI2317A_1_38E_BC38 | 99034 | 233 | 6339576 | 455953 | 10988032042 |
... | ... | ... | ... | ... | ... | ... |
24094 | BCH_WWS902A_1_38E_BC38 | 91661 | 233 | 6339576 | 455953 | 10988032042 |
24095 | BCH_WWS904A_1_38E_BC38 | 93378 | 233 | 6339576 | 455953 | 10988032042 |
24096 | BCH_WWS9101X_1_38E_BC38 | 106516 | 233 | 6339576 | 455953 | 10988032042 |
24097 | BCH_WWS9104X_1_38E_BC38 | 105853 | 233 | 6339576 | 455953 | 10988032042 |
24098 | BCH_WWS9902X_1_38E_BC38 | 108602 | 233 | 6339576 | 455953 | 10988032042 |
24099 rows × 6 columns
We see that there are close to 11 billion variant calls in the project, about 455 thousand on average per sample.