Incremental horizontal (pVCF-like) genotype generation¶
%%capture
# load the magic extension and imports
%reload_ext nextcode
import pandas as pd
import GOR_query_helper as GQH
%env LOG_QUERY=1
project = "clin-hg19"
%env GOR_API_PROJECT={project}
Incremental freezes¶
In this notebook we will explain how we can create genotype freezes and update them efficiently and incrementally. The fact that we will update them shows that the name "freeze" is a misnomer for our usage. The name derives from the fact that pVCF-like horizontal genotype structures were used for storing versioned genotypes for a population; a more appropriate name would be horizontal genotype structure. In GOR, these structures have the following characteristics:
- Genotypes are stored with the values 0, 1, 2, or 3, representing homozygous reference, heterozygous, homozygous alternative, or unknown.
- The genotypes are stored horizontally, in a values column in which each character position represents the genotype of one sample.
- For large numbers of samples, the genotypes for each variant can be split up into multiple buckets.
- A bucket info file stores a mapping between samples (PN) and buckets. The buckets don't have to be of the same size.
- The genotype variant files that represent each bucket can be one or more GOR files.
- The GOR files representing the buckets can be "sparse" at the variant level but of fixed size with respect to the length of the values column. Typically, though, the genotype files for each bucket are set to have the same number of rows.
- For large buckets (e.g. 10k to 100k samples) it can be beneficial to "deflate" (compress) their content. Commands used to deal with these data structures, such as CSVSEL and CSVCC, automatically "inflate" the content. Using this type of row-level compression, on top of the regular block compression in GORZ, allows for faster join-filtering and better overall compression rates. A minimal sketch of these structures follows below.
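To make this concrete, here is a minimal sketch of the two structures, using hypothetical sample IDs and tiny buckets of two samples each. First the bucket info file, mapping each sample (PN) to a bucket:

PN	bucket
SAMPLE_A	b_1
SAMPLE_B	b_1
SAMPLE_C	b_2
SAMPLE_D	b_2

and then the horizontal genotype rows, where each character in the values column corresponds to one sample of the bucket, in bucket-file order:

CHROM	POS	Ref	Alt	Bucket	Values
chr1	10057	A	AC	b_1	01
chr1	10057	A	AC	b_2	30
chr1	10119	CT	C	b_1	20
chr1	10119	CT	C	b_2	00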
As shown in "Ultra-fast joint-genotyping with SparkGOR" (https://doi.org/10.1101/2022.10.25.513331), the number of observed variants in a population grows similarly as the square-root of its size. While the variants for a single sample are typically 75% with AF > 1%, the majority of the variants observed in a population are rare. For instance, in the XA database, with 630k samples, 83% of the variants have AF less than 0.001% and this ratio will grow as the number of samples increases.
A consequence of the above is that most of the characters used to represent the genotypes in the non-sparse values column of each variant row will be zero. Ultimately, for a VERY large population, it may be more efficient to store the genotypes in a row-based representation. As an example, a use-case to find the carriers of a particular rare variant may be served more efficiently by accessing the VCF-like variant rows from a GORdb "bucketized" table. However, use-cases that require not only the information about the presence of variants but also the reason for absence, i.e. lack of sequence read data vs. homozygous reference, benefit from the non-sparse horizontal representation. Examples of such use-cases are variant AF calculation and case-control analysis. In particular, case-control analyses for multiple phenotypes can be processed together in a single sweep through the data, making the "freeze" structures very efficient.
The fact that GOR allows freezes and their samples to be split into multiple horizontal buckets also makes them easy to update incrementally. Without further ado, let's dive into the steps for generating a freeze!
Defining the input data¶
We will work with data that has been onboarded into the clin-hg19 project. As the variant source we pick the WGS table with all the variants, and as the cut-off for good sequence read coverage we will use 8 reads.
qh = GQH.GOR_Query_Helper()
mydefs = ""
mydefs = qh.add_many("""
def #variants# = source/var/wgs_varcalls.gord -s PN;
def #goodcov# = source/cov/goodcov_8.wgs.gord -s PN;
""")
%%gor
$mydefs
gor #variants# | top 5
Query ran in 1.15 sec Query fetched 5 rows in 0.01 sec (total time 1.16 sec)
CHROM | POS | Reference | Call | CallCopies | CallRatio | Depth | GL_Call | FILTER | FS | formatZip | PN | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | chr1 | 10048 | CT | C | 1 | 0.743 | 35 | 3 | PASS | 0.0 | NaN | GDX_2875539 |
1 | chr1 | 10057 | A | AC | 1 | 0.286 | 49 | 17 | PASS | 0.0 | NaN | GDX_2860014 |
2 | chr1 | 10108 | C | T | 1 | 0.235 | 34 | 37 | DRAGENSnpHardQUAL | 0.0 | NaN | GDX_2885755 |
3 | chr1 | 10119 | CT | C | 1 | 0.226 | 31 | 18 | PASS | 0.0 | NaN | GDX_2866201 |
4 | chr1 | 10119 | CT | C | 1 | 0.242 | 33 | 17 | PASS | 0.0 | NaN | GDX_2866252 |
%%gor
$mydefs
gor #goodcov# | top 5 | calc size #3-#2
Query ran in 0.80 sec Query fetched 5 rows in 0.01 sec (total time 0.82 sec)
Chrom | bpStart | bpStop | PN | size | |
---|---|---|---|---|---|
0 | chr1 | 9996 | 10786 | GDX_2850046 | 790 |
1 | chr1 | 9996 | 10800 | GDX_2865775 | 804 |
2 | chr1 | 9996 | 10797 | GDX_2853415 | 801 |
3 | chr1 | 9996 | 10796 | GDX_2869363 | 800 |
4 | chr1 | 9996 | 10768 | GDX_2877734 | 772 |
We see that the good coverage is stored as segments. For a given PN, if a segment spans a region, it means the sequence read coverage is 8 reads or better. Absence of a segment means poorer coverage. The maximum size of the segments is set to 10kb.
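As a side note, a quick way to measure how many chr1 bases have good coverage for a single sample is sketched below (not run here; GDX_2850046 is taken from the rows above, and we assume the -f option can be used to tag-filter the dictionary on PN):

gor -p chr1 #goodcov# -f GDX_2850046 | calc size #3-#2 | group chrom -sum -ic size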
Determining the number of samples¶
We inspect the metadata of these GORdb dictionary tables to see the number of samples in the project and to understand how they are bucketized, i.e. organized into bucket partitions.
%%gor
$mydefs
nor -asdict #goodcov# | colsplit col1 2 x -s '\|' | group -gc x_2 -count | granno -sum -ic allcount
Query ran in 0.07 sec Query fetched 11 rows in 0.01 sec (total time 0.08 sec)
x_2 | allCount | sum_allCount | |
---|---|---|---|
0 | NaN | 19 | 787 |
1 | .goodcov_8.wgs.gord.buckets/b_1023_gK3g_1.gorz | 100 | 787 |
2 | .goodcov_8.wgs.gord.buckets/b_1237_Q7YQ_1.gorz | 100 | 787 |
3 | .goodcov_8.wgs.gord.buckets/b_1437_DZEM_1.gorz | 100 | 787 |
4 | .goodcov_8.wgs.gord.buckets/b_1476_YjWV_1.gorz | 20 | 787 |
5 | .goodcov_8.wgs.gord.buckets/b_1529_egom_1.gorz | 26 | 787 |
6 | .goodcov_8.wgs.gord.buckets/b_1574_4kcg_1.gorz | 22 | 787 |
7 | .goodcov_8.wgs.gord.buckets/b_213_LKUE_1.gorz | 100 | 787 |
8 | .goodcov_8.wgs.gord.buckets/b_417_oGNx_1.gorz | 100 | 787 |
9 | .goodcov_8.wgs.gord.buckets/b_620_4Hxq_1.gorz | 100 | 787 |
10 | .goodcov_8.wgs.gord.buckets/b_817_OlGG_1.gorz | 100 | 787 |
%%gor
$mydefs
nor -asdict #variants# | colsplit col1 2 x -s '\|' | group -gc x_2 -count | granno -sum -ic allcount
Query ran in 0.07 sec Query fetched 11 rows in 0.01 sec (total time 0.08 sec)
x_2 | allCount | sum_allCount | |
---|---|---|---|
0 | NaN | 8 | 787 |
1 | .wgs_varcalls.gord.buckets/b_1028_960w_1.gorz | 100 | 787 |
2 | .wgs_varcalls.gord.buckets/b_1253_N5bG_1.gorz | 100 | 787 |
3 | .wgs_varcalls.gord.buckets/b_1438_HPRP_1.gorz | 100 | 787 |
4 | .wgs_varcalls.gord.buckets/b_1490_l7E0_1.gorz | 27 | 787 |
5 | .wgs_varcalls.gord.buckets/b_1551_HzlH_1.gorz | 30 | 787 |
6 | .wgs_varcalls.gord.buckets/b_1596_Po5a_1.gorz | 22 | 787 |
7 | .wgs_varcalls.gord.buckets/b_213_l2AS_1.gorz | 100 | 787 |
8 | .wgs_varcalls.gord.buckets/b_416_8W3I_1.gorz | 100 | 787 |
9 | .wgs_varcalls.gord.buckets/b_627_Dfjz_1.gorz | 100 | 787 |
10 | .wgs_varcalls.gord.buckets/b_823_ue3t_1.gorz | 100 | 787 |
For some reason, the bucket structure is slightly different in the #variants# table than in #goodcov#, i.e. it seems that the last bucketization has for some reason failed in #goodcov#. This will, however, not disturb us for now. For the initial freeze setup, we will pick samples from 5 buckets of size 100 and for the incremental step, we will pick the rest.
mydefs = qh.add_many("""
create #pns_1# = nor -asdict #goodcov# | colsplit col1 2 x -s '\|'
| where x_2 in ('.goodcov_8.wgs.gord.buckets/b_213_LKUE_1.gorz','.goodcov_8.wgs.gord.buckets/b_417_oGNx_1.gorz','.goodcov_8.wgs.gord.buckets/b_620_4Hxq_1.gorz','.goodcov_8.wgs.gord.buckets/b_817_OlGG_1.gorz','.goodcov_8.wgs.gord.buckets/b_1437_DZEM_1.gorz')
| select col2 | rename col2 PN;
create #pns_2# = nor -asdict #goodcov# | colsplit col1 2 x -s '\|'
| where not( x_2 in ('.goodcov_8.wgs.gord.buckets/b_213_LKUE_1.gorz','.goodcov_8.wgs.gord.buckets/b_417_oGNx_1.gorz','.goodcov_8.wgs.gord.buckets/b_620_4Hxq_1.gorz','.goodcov_8.wgs.gord.buckets/b_817_OlGG_1.gorz','.goodcov_8.wgs.gord.buckets/b_1437_DZEM_1.gorz') )
| select col2 | rename col2 PN;
""")
%%gor
$mydefs
nor [#pns_1#] | merge -s [#pns_2#] | group -gc source -count
Query ran in 0.22 sec Query fetched 2 rows in 0.00 sec (total time 0.23 sec)
Source | allCount | |
---|---|---|
0 | L | 500 |
1 | R | 287 |
Initial freeze generation¶
For the initial freeze, we use the samples in #pns_1# and split them into two buckets. For the horizontal genotype structures, we define a different bucket size than for the row-level genotypes. For demonstration, we set it to 250, splitting the 500 samples into two buckets, while in practice we would use much larger sizes, e.g. 50k.
mydefs = qh.add_many("""
def #gt_buck_size# = 250;
def #buck_start_offset_1# = 0;
create #PNbuckets_1# = nor [#pns_1#]
| rownum | calc bucket 'b_'+str(div((rownum-1), #gt_buck_size# ) + 1 + #buck_start_offset_1# )
| select PN,bucket;
""")
To inspect the nature of the bucket file we run the following:
%%gor
$mydefs
nor [#PNbuckets_1#] | granno -dis -sc bucket | top 5
Query ran in 0.21 sec Query fetched 5 rows in 0.00 sec (total time 0.21 sec)
PN | bucket | dis_bucket | |
---|---|---|---|
0 | GDX_2760240 | b_1 | 2 |
1 | GDX_2830067 | b_1 | 2 |
2 | GDX_2833994 | b_1 | 2 |
3 | GDX_2833984 | b_1 | 2 |
4 | GDX_2817183 | b_1 | 2 |
We see that in our bucket file, there are now two buckets. The row position within each bucket determines the position of each sample's genotype within the values column of the GOR variant rows.
Distributive genotype processing¶
While we have very few samples to process here, we will set up the queries such that they scale to VERY large datasets. We rely on the fact that the processing can be done in a distributive (parallel) manner for each bucket. However, because we want to store rows that distinguish between the values 0 and 3 (unknown), we must generate a complete list of all the variant sites where we want to call genotypes.
- In #allvars_temp_1#, we use the PARALLEL macro-command to do this in a distributive manner over the buckets, and PGOR within each bucket.
- In #allvars_1#, we then collapse the distinct list of variants from each bucket into a globally distinct list.
- #parts_1# simply defines all of our parallel partitions.
- #pn_buck_filter_1# is a definition that filters our bucket relation for each partition. This binary relation can also be used as an input to the file-filter (-ff), because -ff uses the left-most column, which in this case is PN, as shown above.
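To see what #pn_buck_filter_1# does, consider the partition row of #parts_1# where bucket = 'b_1'. The PARALLEL macro replaces #{col:bucket} with b_1, so the definition expands to:

<(nor [#PNbuckets_1#] | where bucket = 'b_1')

i.e. a two-column (PN,bucket) relation that holds only the samples of that partition.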
mydefs = qh.add_many("""
create #parts_1# = nor [#PNbuckets_1#] | group -gc bucket;
def #pn_buck_filter_1# = <(nor [#PNbuckets_1#] | where bucket = '#{col:bucket}');
create #allvars_temp_1# = parallel -parts [#parts_1#]
<(pgor #variants# -ff #pn_buck_filter_1# | where filter = 'PASS' | select 1-4 | distinct);
create #allvars_1# = pgor [#allvars_temp_1#] | distinct;
""")
%%gor
$mydefs
create #temp# = pgor [#allvars_1#] | group chrom -count;
nor [#temp#] | group -sum -ic allcount
Query ran in 2.06 sec Query fetched 1 rows in 125.64 sec (total time 127.70 sec)
sum_allCount | |
---|---|
0 | 38502441 |
We see that for the first 500 samples, there are close to 39M variants. Now we are ready to set up the query to call the genotypes. Again, we use the PARALLEL command, now to call both of our buckets of size 250. We start with only chr1; it is trivial to change this to a genome-wide query with PGOR and thus get two-dimensional parallel execution.
mydefs = qh.add_many("""
create #hgt_1# = parallel -parts [#parts_1#] <(gor -p chr1 <(gor #variants# -ff #pn_buck_filter_1#
| calc gt if(not( filter = 'PASS' ),'3',callcopies)
| where gt = '1' or gt = '2'
| select 1-4,gt,PN
| merge [#allvars_1#])
| varjoin -i [#allvars_1#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1# <(gor #goodcov# -s PN -ff #pn_buck_filter_1# )
);
""")
The above query uses the #pn_buck_filter_1# definition to filter the #variants# table based on the PNs in the partition ingested by the PARALLEL command, i.e. the bucket column in the #parts_1# relation, as denoted by #{col:bucket}. This same relation, #pn_buck_filter_1#, is provided as bucket-description metadata to GTGEN and is also used to filter the #goodcov# table in the same manner as #variants#.
The GTGEN command can be thought of as a specialized join command that takes as a left-source the variant input stream and joins it with the coverage stream as a right-source. The variant rows in the left-source define which horizontal genotype rows are called. Since the variant source is sparse, the GTGEN command needs to know all possible samples and their bucket structure. This is provided as a parameter, here #pn_buck_filter_1#, as is the right-source coverage stream. Note that if we did NOT filter the #PNbuckets_1# relation, each query/partition would output variant rows for all buckets, since the queries in the PARALLEL macro are unaware of each other. The filtering of #variants# and #goodcov# is simply to avoid reading all the data for each partition.
The minimal input relation to GTGEN is (chrom,pos,ref,alt,gt,pn). If a value of GT is provided in the left-source, it will be placed in the proper location of the values column of the variant row representing its bucket. If the GT value of a PN is missing, it is either because the sample is homozygous reference, given the sparse nature of VCF/GOR, or because it has been filtered away. Then the coverage kicks in, i.e. to decide if the read depth is sufficient to label the sample homozygous reference (0) or unknown (3). As shown in the CALC gt step, even if the variant is present in the left-source, we can also fail it explicitly by setting it to unknown (3). However, for now, we will filter such variants away and let the read coverage decide on the unknown status - something we will revisit later.
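Conceptually, the per-sample decision that GTGEN makes at each variant site can be sketched with a CALC expression; here has_gt and covered are hypothetical flags indicating whether the left-source provided a GT for the sample and whether a good-coverage segment spans the site:

calc value if(has_gt = 1, gt, if(covered = 1, '0', '3'))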
Finally, to ensure that each partition and all the buckets representing the horizontal genotypes have the same number of variants (rows), we must merge in the complete list of variants from all the samples, #allvars_1#, which most likely is larger than the set of variants within a single partition. Similarly, because we don't want variant rows that only have failed variants (3), we apply an intersection join. Together, these two steps ensure that the variant list is identical across all partitions and buckets!
We can quickly inspect a few of the rows in our new freeze:
%%gor
$mydefs
gor [#hgt_1#]
| calc values2 'x_'+replace(replace(values,'3',''),'0','')
| calc len len(values)
| replace values 'x_'+left(values,30)
| top 6
Query ran in 2.18 sec Query fetched 6 rows in 103.82 sec (total time 106.00 sec)
CHROM | POS | Ref | Alt | Bucket | Values | values2 | len | |
---|---|---|---|---|---|---|---|---|
0 | chr1 | 10057 | A | AC | b_2 | x_333330033333333333333333333333 | x_ | 250 |
1 | chr1 | 10057 | A | AC | b_1 | x_333330330333330333330300333303 | x_1 | 250 |
2 | chr1 | 10119 | CT | C | b_2 | x_333333033333333330333333333333 | x_ | 250 |
3 | chr1 | 10119 | CT | C | b_1 | x_033330330033330333330330333030 | x_11 | 250 |
4 | chr1 | 10143 | CTAACCCCT | C | b_2 | x_333333033333333333333333333333 | x_ | 250 |
5 | chr1 | 10143 | CTAACCCCT | C | b_1 | x_333330330333330333330333333030 | x_1 | 250 |
As expected, each bucket has a values column of length 250. We also see that the values2 column, which shows only the non-ref genotypes, has very few elements. Indeed, some of the buckets only store 0 or 3 for these very rare variants!
Incremental freeze¶
Now we want to update our freeze (how crazy is that?) with the remaining samples that are stored in #pns_2#. We add code that is similar to before, however, now we set the bucket offset to 2 because we have already created two buckets. Like before, we find all the variants in the new partitions in #parts_2#; however, when we generate #allvars_2# we add all previously found variants from #allvars_1#. Finally, in #newvars_1# we define the list of new variants by subtracting the variants found in step one from our complete list of variants, using a negative join. We use the -norm option because we are not assuming that the variants have been normalized.
mydefs = qh.add_many("""
def #buck_start_offset_2# = 2;
create #PNbuckets_2# = nor [#pns_2#] | rownum | calc bucket 'b_'+str(div((rownum-1), #gt_buck_size# ) + 1 + #buck_start_offset_2# ) | select PN,bucket;
create #parts_2# = nor [#PNbuckets_2#] | group -gc bucket;
def #pn_buck_filter_2# = <(nor [#PNbuckets_2#] | where bucket = '#{col:bucket}');
create #allvars_temp_2# = parallel -parts [#parts_2#] <(pgor #variants# -ff #pn_buck_filter_2# | where filter = 'PASS' | select 1-4 | distinct);
create #allvars_2# = pgor [#allvars_temp_2#] [#allvars_1#] | distinct;
create #newvars_1# = pgor [#allvars_2#] | varjoin -norm -n [#allvars_1#];
""")
%%gor
$mydefs
gor [#newvars_1#] | group genome -count
Query ran in 0.68 sec Query fetched 1 rows in 51.23 sec (total time 51.92 sec)
CHROM | bpStart | bpStop | allCount | |
---|---|---|---|---|
0 | chrA | 0 | 1000000000 | 8036969 |
We see that there are only 8M new variants, compared with the 39M in the first step.
Calling the next two partitions¶
The query below is equivalent to #hgt_1#, however, now #allvars_2# also includes all variants from step 1.
mydefs = qh.add_many("""
create #hgt_2# = parallel -parts [#parts_2#] <(gor -p chr1 <(gor #variants# -ff #pn_buck_filter_2#
| calc gt if(not( filter = 'PASS'),'3',callcopies)
| where gt = '1' or gt = '2'
| select 1-4,gt,PN
| merge [#allvars_2#])
| varjoin -i [#allvars_2#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_2# <(gor #goodcov# -ff #pn_buck_filter_2# )
);
""")
Calling the delta in the first two partitions¶
We must now update the old partitions because new variants are now present, i.e. #newvars_1#. We know that none of the samples in #pns_1# are carriers of these variants (otherwise they would have been included in #allvars_1#). Thus, our task is only to record whether they should be labeled with 0 or 3, which we decide using the sequence read coverage cut-off. In the query below, we therefore only have to read all the variants in #newvars_1#, add empty GT and PN columns to be compatible with GTGEN, and force it to look at the coverage at each new variant site. Because the data footprint of #goodcov# is 10x smaller than that of #variants#, this is much faster than a calling step that has to read all the variants!
mydefs = qh.add_many("""
create #hgt_delta_1# = parallel -parts [#parts_1#] <(gor -p chr1 [#newvars_1#] | calc GT '' | calc PN ''
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1# <(gor #goodcov# -ff #pn_buck_filter_1# )
);
""")
%%gor
$mydefs
gor [#hgt_delta_1#]
| calc values2 'x_'+replace(replace(values,'3',''),'0','')
| calc len len(values)
| replace values 'x_'+left(values,30)
| throwif len(values2) > 2
| skip 10000
| top 6
Query ran in 1.56 sec Query fetched 6 rows in 103.86 sec (total time 105.42 sec)
CHROM | POS | Ref | Alt | Bucket | Values | values2 | len | |
---|---|---|---|---|---|---|---|---|
0 | chr1 | 1413295 | G | C | b_2 | x_333333333333333333333333333333 | x_ | 250 |
1 | chr1 | 1413295 | G | C | b_1 | x_333333333333333333333333333333 | x_ | 250 |
2 | chr1 | 1413508 | C | T | b_2 | x_333333333333333333333333333333 | x_ | 250 |
3 | chr1 | 1413508 | C | T | b_1 | x_333333333333333333333333333333 | x_ | 250 |
4 | chr1 | 1413644 | T | C | b_2 | x_333333330303333333333330333333 | x_ | 250 |
5 | chr1 | 1413644 | T | C | b_1 | x_330003303030030003000330333333 | x_ | 250 |
As expected, all the values that we check are without non-ref genotypes.
Comparison with single step calling¶
As a test, we will now make a comparison with genotype calling where we call all the variants in one go. Given the above, the code is straightforward. We set #pns_a# as the combination of #pns_1# and #pns_2# and similarly we merge the bucket metadata from the two steps. Again, we only look at chr1.
mydefs = qh.add_many("""
create #pns_a# = nor [#pns_1#] | merge [#pns_2#];
create #PNbuckets_a# = nor [#PNbuckets_1#] | merge [#PNbuckets_2#];
create #parts_a# = nor [#PNbuckets_a#] | group -gc bucket;
create #allvars_temp_a# = parallel -parts [#parts_a#] <(gor -p chr1 #variants# -ff #pn_buck_filter_a# | where filter = 'PASS' | select 1-4 | distinct);
create #allvars_a# = pgor [#allvars_temp_a#] | distinct;
def #pn_buck_filter_a# = <(nor [#PNbuckets_a#] | where bucket = '#{col:bucket}');
create #hgt_a# = parallel -parts [#parts_a#] <(gor -p chr1 <(gor #variants# -ff #pn_buck_filter_a#
| calc gt if(not( filter = 'PASS'),'3',callcopies)
| where gt = '1' or gt = '2'
| select 1-4,gt,PN
| merge [#allvars_a#])
| varjoin -i [#allvars_a#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_a# <(gor #goodcov# -ff #pn_buck_filter_a# )
);
""")
%%gor
$mydefs
gor -p chr1 <(gor [#allvars_a#] | merge -s [#allvars_2#])
| group 1 -gc #3,#4 -count
| throwif allcount != 2
| group chrom -count
Query ran in 0.80 sec Query fetched 1 rows in 6.97 sec (total time 7.76 sec)
CHROM | bpStart | bpStop | allCount | |
---|---|---|---|---|
0 | chr1 | 0 | 249250621 | 3633503 |
We see that the variants from the two methods agree. Next we check the data itself. Since we have not created a GORD file that points to all the partitions, we compare by merging the GOR relations that form the freeze.
%%gor
$mydefs
create #temp# = gor -p chr1 <(gor [#hgt_a#] | calc r '#hgt_a#'
| merge <(gor [#hgt_1#] | calc r '#hgt_1#')
| merge <(gor [#hgt_delta_1#] | calc r '#hgt_delta_1#' )
| merge <(gor [#hgt_2#] | calc r '#hgt_2#' ))
| group 1 -gc ref,alt,bucket -count -dis -sc values
| where allcount != 2 or dis_values > 1;
nor [#temp#]
Query ran in 0.67 sec Query fetched 0 rows in 0.00 sec (total time 0.67 sec)
CHROM | POS | Ref | Alt | Bucket | allCount | dis_Values |
---|
Explicitly failing variants based on variant QC parameters¶
Here we will show how the queries can be changed so that the incremental freeze and the single-step freeze remain identical while we allow variant genotypes to be set to 3 based on QC metrics, instead of being filtered away as we did above using WHERE gt = '1' or gt = '2'. This means that we will have to read #variants# in the incremental delta step:
mydefs = qh.add_many("""
create #hgt_delta_1_v2# = parallel -parts [#parts_1#] <(gor -p chr1 [#newvars_1#]
| varjoin -ir <(gor #variants# -ff #pn_buck_filter_1#)
| calc gt if(not( filter = 'PASS'),'3',callcopies)
| merge [#newvars_1#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1# <(gor #goodcov# -ff #pn_buck_filter_1# )
);
""")
%%gor
$mydefs
gor [#hgt_delta_1_v2#] | top 1 | replace values len(values)
Query ran in 0.24 sec Query fetched 1 rows in 0.00 sec (total time 0.24 sec)
CHROM | POS | Ref | Alt | Bucket | Values | |
---|---|---|---|---|---|---|
0 | chr1 | 10273 | T | C | b_1 | 250 |
Notice how we use #newvars_1# to join into the #variants# table to extract the new variants that previously must have failed QC for the samples in #pns_1# (otherwise they would not be in #newvars_1#, while samples in #pns_2# have them). We also merge #newvars_1# into the stream, in case no sample in the partition has a given variant.
As the freeze grows, a smaller and smaller fraction of variants should go into #newvars_1#, and ultimately the join logic in GOR should move from streaming all the relevant data in #variants# to performing skip-scan seeks. In any case, the CPU cost of GTGEN should only be proportional to the size of #newvars_1# and the total population size.
Round 2 - materializing the freeze output¶
Here we will repeat some of the steps from above, however, now we write out the necessary files to represent the freeze, more as if we were simulating an incremental production workflow. We will set up a new query helper.
qh2 = GQH.GOR_Query_Helper()
mydefs2 = ""
mydefs2 = qh2.add_many("""
def #variants# = source/var/wgs_varcalls.gord -s PN;
def #goodcov# = source/cov/goodcov_8.wgs.gord -s PN;
create #pns_1# = nor -asdict #goodcov# | colsplit col1 2 x -s '\|' | where x_2 in ('.goodcov_8.wgs.gord.buckets/b_213_LKUE_1.gorz','.goodcov_8.wgs.gord.buckets/b_417_oGNx_1.gorz','.goodcov_8.wgs.gord.buckets/b_620_4Hxq_1.gorz','.goodcov_8.wgs.gord.buckets/b_817_OlGG_1.gorz','.goodcov_8.wgs.gord.buckets/b_1437_DZEM_1.gorz')
| select col2 | rename col2 PN;
create #pns_2# = nor -asdict #goodcov# | colsplit col1 2 x -s '\|' | where not( x_2 in ('.goodcov_8.wgs.gord.buckets/b_213_LKUE_1.gorz','.goodcov_8.wgs.gord.buckets/b_417_oGNx_1.gorz','.goodcov_8.wgs.gord.buckets/b_620_4Hxq_1.gorz','.goodcov_8.wgs.gord.buckets/b_817_OlGG_1.gorz','.goodcov_8.wgs.gord.buckets/b_1437_DZEM_1.gorz') )
| select col2 | rename col2 PN;
def #gt_buck_size# = 250;
def #buck_start_offset_1# = 0;
create #PNbuckets_1# = nor [#pns_1#]
| rownum | calc bucket 'b_'+str(div((rownum-1), #gt_buck_size# ) + 1 + #buck_start_offset_1# )
| select PN,bucket;
create #parts_1# = nor [#PNbuckets_1#] | group -gc bucket;
def #pn_buck_filter_1# = <(nor [#PNbuckets_1#] | where bucket = '#{col:bucket}');
create #allvars_temp_1# = parallel -parts [#parts_1#]
<(pgor #variants# -ff #pn_buck_filter_1# | where filter = 'PASS' | select 1-4 | distinct);
create #allvars_1# = pgor [#allvars_temp_1#] | distinct;
""")
Now we write out the files using dummy create statements:
%%gor
$mydefs2
create w1 = gor [#allvars_1#] | write user_data/IncrFreeze/allvars_1.gorz;
create w2 = nor [#PNbuckets_1#] | write user_data/IncrFreeze/PNbuckets_1.tsv;
create w3 = nor [#PNbuckets_1#] | write user_data/IncrFreeze/PNbuckets_v1.tsv;
norrows 1
Query ran in 0.18 sec Query fetched 1 rows in 0.00 sec (total time 0.18 sec)
RowNum | |
---|---|
0 | 0 |
Next we call the freeze and write it out. We do that with create statements for each horizontal bucket. To simplify this, we use a parameterized definition, #pn_buck_filter_1($1), where e.g. #pn_buck_filter_1('b_1') expands to <(nor [#PNbuckets_1#] | where bucket = 'b_1').
mydefs2 = qh2.add_many("""
def #pn_buck_filter_1($1) = <(nor [#PNbuckets_1#] | where bucket = $1 );
create #hgt_1_1# = gor -p chr1 <(gor #variants# -ff #pn_buck_filter_1('b_1')
| calc gt if(not( filter = 'PASS' ),'3',callcopies)
| select 1-4,gt,PN
| merge [#allvars_1#])
| varjoin -i -norm [#allvars_1#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1('b_1') <(gor #goodcov# -s PN -ff #pn_buck_filter_1('b_1') );
create #hgt_1_2# = gor -p chr1 <(gor #variants# -ff #pn_buck_filter_1('b_2')
| calc gt if(not( filter = 'PASS' ),'3',callcopies)
| select 1-4,gt,PN
| merge [#allvars_1#])
| varjoin -i -norm [#allvars_1#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1('b_2') <(gor #goodcov# -s PN -ff #pn_buck_filter_1('b_2') );
""")
We write out the two bucket partitions in the freeze and use a simple NOR expression to set up a dictionary. Notice that we use the bucket name in the tags column instead of the list of PNs that belong to it. This is different from the usual convention and slightly changes the -ff usage of the freeze GORD file, as shown below.
%%gor
$mydefs2
create w1 = gor [#hgt_1_1#] | write user_data/IncrFreeze/hgt_1_1.gorz;
create w2 = gor [#hgt_1_2#] | write user_data/IncrFreeze/hgt_1_2.gorz;
norrows 1
| calc file 'hgt_1_1.gorz,hgt_1_2.gorz'
| calc part 'b_1,b_2'
| calc chrstart 'chr1'
| calc posstart 0
| calc chrstop 'chrZ'
| calc posstop 1000000000
| calc tags 'b_1,b_2'
| split file,part,tags
| hide #1
| write user_data/IncrFreeze/freeze_v1.tsv
Query ran in 2.01 sec Query fetched 0 rows in 81.86 sec (total time 83.87 sec)
ChromNOR | PosNOR |
---|
We rename the TSV file into a dictionary file.
import os
proj_path = '/home/jovyan/projects/clin-hg19/'
os.rename(proj_path+'user_data/IncrFreeze/freeze_v1.tsv',proj_path+'user_data/IncrFreeze/freeze_v1.gord')
Now we can query the freeze and fetch genotypes for a few samples:
%%gor
$mydefs2
create #mypns# = nor user_data/IncrFreeze/PNbuckets_v1.tsv
| where PN in ('GDX_2827446','GDX_2823062','GDX_2835098','GDX_2835129','GDX_2835431','GDX_2834933')
| select PN;
create #used_buckets# = nor user_data/IncrFreeze/PNbuckets_v1.tsv | inset -c PN [#mypns#] | select bucket;
gor -p chr1:7,723,384-7,725,276 user_data/IncrFreeze/freeze_v1.gord -ff [#used_buckets#]
| csvsel -gc ref,alt -vs 1 user_data/IncrFreeze/PNbuckets_v1.tsv [#mypns#] -tag PN -hide '0,3'
Query ran in 0.26 sec Query fetched 8 rows in 0.00 sec (total time 0.26 sec)
CHROM | POS | Ref | Alt | PN | value | |
---|---|---|---|---|---|---|
0 | chr1 | 7723534 | G | A | GDX_2835098 | 1 |
1 | chr1 | 7723588 | C | T | GDX_2835129 | 1 |
2 | chr1 | 7723588 | C | T | GDX_2834933 | 1 |
3 | chr1 | 7723957 | G | A | GDX_2823062 | 2 |
4 | chr1 | 7723957 | G | A | GDX_2835098 | 1 |
5 | chr1 | 7723957 | G | A | GDX_2835129 | 2 |
6 | chr1 | 7723957 | G | A | GDX_2835431 | 1 |
7 | chr1 | 7723957 | G | A | GDX_2834933 | 1 |
Incremental partitions¶
mydefs2 = qh2.add_many("""
def #buck_start_offset_2# = 2;
create #PNbuckets_2# = nor [#pns_2#] | rownum | calc bucket 'b_'+str(div((rownum-1), #gt_buck_size# ) + 1 + #buck_start_offset_2# ) | select PN,bucket;
create #parts_2# = nor [#PNbuckets_2#] | group -gc bucket;
def #pn_buck_filter_2# = <(nor [#PNbuckets_2#] | where bucket = '#{col:bucket}');
create #allvars_temp_2# = parallel -parts [#parts_2#] <(pgor #variants# -ff #pn_buck_filter_2# | where filter = 'PASS' | select 1-4 | distinct);
create #allvars_2# = pgor [#allvars_temp_2#] [#allvars_1#] | distinct;
create #newvars_1# = pgor [#allvars_2#] | varjoin -norm -n [#allvars_1#];
""")
%%gor
$mydefs2
create w1 = gor [#allvars_2#] | write user_data/IncrFreeze/allvars_2.gorz;
create w2 = nor [#PNbuckets_2#] | write user_data/IncrFreeze/PNbuckets_2.tsv;
create w3 = gor [#newvars_1#] | write user_data/IncrFreeze/newvars_1.gorz;
norrows 1
Query ran in 0.34 sec Query fetched 1 rows in 0.00 sec (total time 0.34 sec)
RowNum | |
---|---|
0 | 0 |
mydefs2 = qh2.add_many("""
def #pn_buck_filter_2($1) = <(nor [#PNbuckets_2#] | where bucket = $1 );
create #hgt_2_3# = gor -p chr1 <(gor #variants# -ff #pn_buck_filter_2('b_3')
| calc gt if(not( filter = 'PASS' ),'3',callcopies)
| select 1-4,gt,PN
| merge [#allvars_2#])
| varjoin -i -norm [#allvars_2#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_2('b_3') <(gor #goodcov# -s PN -ff #pn_buck_filter_2('b_3') );
create #hgt_2_4# = gor -p chr1 <(gor #variants# -ff #pn_buck_filter_2('b_4')
| calc gt if(not( filter = 'PASS' ),'3',callcopies)
| select 1-4,gt,PN
| merge [#allvars_2#])
| varjoin -i -norm [#allvars_2#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_2('b_4') <(gor #goodcov# -s PN -ff #pn_buck_filter_2('b_4') );
create #hgt_delta_1_1# = gor -p chr1 <(gor [#newvars_1#]
| varjoin -ir -norm <(gor #variants# -ff #pn_buck_filter_1('b_1') )
| calc gt if(not( filter = 'PASS'),'3',callcopies)
| merge [#newvars_1#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1('b_1') <(gor #goodcov# -ff #pn_buck_filter_1('b_1') ));
create #hgt_delta_1_2# = gor -p chr1 <(gor [#newvars_1#]
| varjoin -ir -norm <(gor #variants# -ff #pn_buck_filter_1('b_2') )
| calc gt if(not( filter = 'PASS'),'3',callcopies)
| merge [#newvars_1#]
| rename #3 Ref | rename #4 Alt
| gtgen -gc Ref,Alt #pn_buck_filter_1('b_2') <(gor #goodcov# -ff #pn_buck_filter_1('b_2') ));
""")
%%gor
$mydefs2
create w1 = gor [#hgt_2_3#] | write user_data/IncrFreeze/hgt_2_3.gorz;
create w2 = gor [#hgt_2_4#] | write user_data/IncrFreeze/hgt_2_4.gorz;
create w3 = gor [#hgt_delta_1_1#] | write user_data/IncrFreeze/hgt_delta_1_1.gorz;
create w4 = gor [#hgt_delta_1_2#] | write user_data/IncrFreeze/hgt_delta_1_2.gorz;
norrows 1
| calc file 'hgt_2_3.gorz,hgt_2_4.gorz,hgt_delta_1_1.gorz,hgt_delta_1_2.gorz'
| calc part 'b_3,b_4,b_1,b_2'
| calc chrstart 'chr1'
| calc posstart 0
| calc chrstop 'chrZ'
| calc posstop 1000000000
| calc tags 'b_3,b_4,b_1,b_2'
| split file,part,tags
| hide #1
| merge <(nor -asdict user_data/IncrFreeze/freeze_v1.gord)
| write user_data/IncrFreeze/freeze_v2.tsv
Query ran in 1.15 sec Query fetched 0 rows in 172.46 sec (total time 173.61 sec)
ChromNOR | PosNOR |
---|
As a sanity check, the horizontal genotype relations contain no data outside chr1, since the calling above was restricted to chr1:
%%gor
$mydefs2
gor [#hgt_1_2#] -p chr7 | top 1
Query ran in 0.24 sec Query fetched 0 rows in 0.00 sec (total time 0.25 sec)
CHROM | POS | Ref | Alt | Bucket | Values |
---|
We also write a combined bucket file. To keep the relative order of PNs within the buckets consistent, we use rownum and SORT, because NOR merge does not behave like concatenation of tables. This will only take a few seconds, even for freezes with a few million samples, but it can also be replaced with a "cat with a tail"!
%%gor
nor user_data/IncrFreeze/PNbuckets_1.tsv | rownum
| merge -s <(nor user_data/IncrFreeze/PNbuckets_2.tsv | rownum)
| sort -c source,rownum:n | hide rownum,source
| write user_data/IncrFreeze/PNbuckets_v2.tsv
Query ran in 0.61 sec Query fetched 0 rows in 0.01 sec (total time 0.62 sec)
ChromNOR | PosNOR |
---|
and again we rename the TSV to a GORD file:
os.rename(proj_path+'user_data/IncrFreeze/freeze_v2.tsv',proj_path+'user_data/IncrFreeze/freeze_v2.gord')
The complete list of our output files is here:
%%gor
nor user_data/IncrFreeze | select filename
| where right(#1,4) in ('gorz','gord','.tsv') | sort -c #1
Query ran in 0.07 sec Query fetched 15 rows in 0.01 sec (total time 0.08 sec)
Filename | |
---|---|
0 | PNbuckets_1.tsv |
1 | PNbuckets_2.tsv |
2 | PNbuckets_v1.tsv |
3 | PNbuckets_v2.tsv |
4 | allvars_1.gorz |
5 | allvars_2.gorz |
6 | freeze_v1.gord |
7 | freeze_v2.gord |
8 | hgt_1_1.gorz |
9 | hgt_1_2.gorz |
10 | hgt_2_3.gorz |
11 | hgt_2_4.gorz |
12 | hgt_delta_1_1.gorz |
13 | hgt_delta_1_2.gorz |
14 | newvars_1.gorz |
We can now look at the GORD file after incremental update, i.e. version 2 of the freeze:
%%gor
nor -asdict user_data/IncrFreeze/freeze_v2.gord
Query ran in 0.07 sec Query fetched 6 rows in 0.00 sec (total time 0.08 sec)
file | part | chrstart | posstart | chrstop | posstop | tags | |
---|---|---|---|---|---|---|---|
0 | hgt_1_1.gorz | b_1 | chr1 | 0 | chrZ | 1000000000 | b_1 |
1 | hgt_1_2.gorz | b_2 | chr1 | 0 | chrZ | 1000000000 | b_2 |
2 | hgt_2_3.gorz | b_3 | chr1 | 0 | chrZ | 1000000000 | b_3 |
3 | hgt_2_4.gorz | b_4 | chr1 | 0 | chrZ | 1000000000 | b_4 |
4 | hgt_delta_1_1.gorz | b_1 | chr1 | 0 | chrZ | 1000000000 | b_1 |
5 | hgt_delta_1_2.gorz | b_2 | chr1 | 0 | chrZ | 1000000000 | b_2 |
In other words, buckets b_1 and b_2 are now stored in two GORZ files each, and these are transparently merged by the GOR-driver when we access them. In the next incremental step, we will have to create delta files for all the existing partitions. What is now allvars_2.gorz then plays the role of allvars_1.gorz, and we calculate the new variants from the next partitions based on it. Importantly, when we then write out the deltas, such as #hgt_delta_1_1#, we have to GOR-merge in the existing deltas that belong to partitions that have already been delta-called. This is cheap since the delta files have few variants compared to the other files. Eventually, the delta files can be GOR-merged with the main partition files, collapsing them like in the VEP insert process. A sketch of such a collapse follows below.
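For example, collapsing the b_1 delta into its main partition file could look like the sketch below (hypothetical output name; a plain GOR merge works because the main and delta files hold disjoint variant sites):

gor user_data/IncrFreeze/hgt_1_1.gorz user_data/IncrFreeze/hgt_delta_1_1.gorz | write user_data/IncrFreeze/hgt_1_1_v2.gorz

after which the two b_1 lines in the dictionary would be replaced with a single line pointing at hgt_1_1_v2.gorz.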
Finally, the buckets can easily be restructured using the CSVSEL command, e.g. by doubling their size if needed. As a very last remark, the write steps that we show here, e.g. w1, w2 etc., can be turned into parallel writes via PGOR; however, the output file/folder then needs to have a gord ending, and the writing can then even be combined with the calling step. This may save some time but calls for nested dictionaries and more files. Another option to reduce the overhead of writing the buckets into a single file per genome is to use "DEFLATECOLUMN values" following the GTGEN. It compresses the data at row level, on top of the block compression, and reduces both the overall data footprint and the GOR parsing cost in the write step.
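A sketch of the latter, reusing #hgt_1_1# from above (hypothetical output name, and assuming the command is spelled DEFLATECOLUMN):

gor [#hgt_1_1#] | deflatecolumn values | write user_data/IncrFreeze/hgt_1_1_deflated.gorz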
In another notebook we will cover how we can update allele frequencies from a freeze in incremental fashion.