Warehousing DbSNP, Part II: Streaming and Parsing SNP Data

Intro

Part II of our adventure to warehouse a subset of DbSNP’s JSON data picks up where Part I left off.

Now that we have our database and chromosome 1 file readily available, let’s work to stream and parse each line of refsnp-chr1.json.gz, in preparation for database insertion.

Obtaining a File Handle

Typically in Python we would open our file as follows:

with open(dbsnp_filename) as fp:
    # Do stuff with fp here

However, we have a slight wrinkle here: we need to decompress our gzipped data. Conveniently, we can leverage the gzip module as follows:

with gzip.open(dbsnp_filename) as gzip_fp:
    # Do stuff here
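To see the streaming behavior end to end, here is a minimal, self-contained sketch; the filename and JSON lines are stand-ins, not real dbSNP data. Note that gzip.open defaults to binary mode, so we pass "rt" to iterate over decoded text lines:

```python
import gzip

# Build a tiny gzipped stand-in for refsnp-chr1.json.gz (hypothetical content).
with gzip.open("demo_refsnp.json.gz", "wt", encoding="utf-8") as fp:
    fp.write('{"refsnp_id": "242"}\n{"refsnp_id": "243"}\n')

# "rt" yields decoded text lines, decompressing incrementally as we
# iterate rather than inflating the whole file up front.
with gzip.open("demo_refsnp.json.gz", "rt", encoding="utf-8") as fp:
    lines = [line.rstrip("\n") for line in fp]
```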

Credit to Joe Jevnik for pointing out the (now obvious) approach of decompressing as we stream from disk, rather than decompressing the entire file before iteration.

SnipLoader Class

Let’s start by creating a simple class called SnipLoader to place all of our methods and attributes under one roof:

class SnipLoader:
    def __init__(self, database_name):
        self.database_name = database_name
        self.file_blocksize = 1024 ** 2

Top-level File Iteration

Let’s introduce 2 methods:

  1. _generate_parsed_data(self, raw_line: str) -> RefSnpCopyFromData:
    • args: Takes raw_line, the raw JSON text of one line from our file.
    • returns: An object of type RefSnpCopyFromData, holding the parsed data we care about (defined in Part I).
  2. _load(self, parsed_data_iter: Iterator[RefSnpCopyFromData]) -> None
    • args: Takes an Iterator that yields a RefSnpCopyFromData object at each iteration.
    • side effects: Accumulates the data from each iteration and efficiently inserts it into our database.

We tie these two methods together to get our top-level, public API method load_ref_snps():

def load_ref_snps(self, dbsnp_filename, chromosome):
    self.chromosome = chromosome
    with gzip.open(dbsnp_filename) as gzip_fp:
        self._load((self._generate_parsed_data(line)
                    for line in gzip_fp))

Note: We combine our _generate_parsed_data(raw_line) method with our gzip_fp file handle using the (func(item) for item in iterable) generator-expression syntax above, which lazily produces one parsed object at a time.
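Since laziness is the whole point of this pipeline, here is a tiny standalone sketch (with a hypothetical parse_ref_snp_id standing in for _generate_parsed_data) showing that the generator expression defers all parsing until it is consumed:

```python
import json

def parse_ref_snp_id(raw_line):
    # Stand-in for _generate_parsed_data: pull one field out of a JSON line.
    return json.loads(raw_line)["refsnp_id"]

raw_lines = ['{"refsnp_id": "242"}', '{"refsnp_id": "243"}']

# Nothing is parsed yet; the generator expression is lazy.
parsed_iter = (parse_ref_snp_id(line) for line in raw_lines)

# Parsing happens here, one line at a time, as _load would consume it.
ref_snp_ids = list(parsed_iter)
```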

DbSNP’s JSON Schema

When implementing _generate_parsed_data(raw_line), we need to know the JSON schema of each object. There are some schema details buried here in the Variation Services API, but with JSON, I always find it easier to view the data directly.

For brevity’s sake, here is the top-level of our JSON object (that we care about):

Github Link to Sample RefSNP JSON

{
    "refsnp_id": "242",
    ...,
    "primary_snapshot_data": {
        "placements_with_allele": [
            {
                "placement_annot": {
                    "seq_id_traits_by_assembly": [
                        {
                            "assembly_name": "GRCh38.p7",
                            ...
                        }
                    ],
                    ...
                },
                "alleles": [...],
                ...
            },
            { /* More placements here*/ }
        ],
        "allele_annotations": [
            /* One allele_annotation per allele above */
        ]
    }
}

In _generate_parsed_data(raw_line), our first line of code should parse the JSON string into a dict:

rsnp_json = json.loads(raw_line)

We use this object moving forward to parse out data.

Finding Alleles from a Specific Assembly Genome

We want to first find the placement that corresponds to our target assembly_name, which in my specific case is GRCh38 (this is what 23andMe uses in their current pipeline). The most recent build is GRCh38.p12.

An assembly is short for a genome assembly: a reference representation of the typical human genome, which we use as the point of comparison for detecting variations.

Github Link

@staticmethod
def _find_alleles_from_assembly(rsnp_placements,
                                assembly_name="GRCh38"):
    for rsnp_placement in rsnp_placements:
        annot = rsnp_placement.get('placement_annot')
        if not annot or not annot.get('seq_id_traits_by_assembly'):
            continue  # Skip placements without assembly info
        assembly_info_ls = annot['seq_id_traits_by_assembly']
        assembly_info = assembly_info_ls[0]
        this_assembly_name = assembly_info.get("assembly_name")
        if assembly_name in this_assembly_name:
            return rsnp_placement['alleles']

rsnp_placements = rsnp_json['primary_snapshot_data'][
    'placements_with_allele']
alleles = self._find_alleles_from_assembly(rsnp_placements)

This gives us all alleles, or “gene variants”, at that specific RefSNP. Our goal is to find alleles which differ from our assembly genome.
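To make the matching concrete, here is a standalone sketch of the same lookup logic run against a hypothetical pair of placements (the data is fabricated for illustration, not real dbSNP output):

```python
def find_alleles_from_assembly(rsnp_placements, assembly_name="GRCh38"):
    # Same matching logic as SnipLoader._find_alleles_from_assembly.
    for placement in rsnp_placements:
        annot = placement.get("placement_annot")
        if not annot or not annot.get("seq_id_traits_by_assembly"):
            continue
        assembly_info = annot["seq_id_traits_by_assembly"][0]
        if assembly_name in assembly_info.get("assembly_name", ""):
            return placement["alleles"]

placements = [
    {"placement_annot": {"seq_id_traits_by_assembly": [
        {"assembly_name": "GRCh37.p13"}]},
     "alleles": ["grch37-alleles"]},
    {"placement_annot": {"seq_id_traits_by_assembly": [
        {"assembly_name": "GRCh38.p7"}]},
     "alleles": ["grch38-alleles"]},
]

# The GRCh37 placement is skipped; the GRCh38 placement matches.
alleles = find_alleles_from_assembly(placements)
```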

Find Variant Alleles for RefSNP

To find our variant alleles, we need to look one level deeper in our JSON schema to see how they are represented:

"alleles": [
    {
        "allele": {
            "spdi": {
                "seq_id": "NC_000001.11",
                "position": 20542967,
                "deleted_sequence": "T",
                "inserted_sequence": "T"
            }
        },
        "hgvs": "NC_000001.11:g.20542968T="
    },
    {
        "allele": {
            "spdi": {
                "seq_id": "NC_000001.11",
                "position": 20542967,
                "deleted_sequence": "T",
                "inserted_sequence": ""
            }
        },
        "hgvs": "NC_000001.11:g.20542968delT"
    }
]

Above, we see two alleles at a single position where a SNP occurs (by definition, in more than 1% of the population). The first allele is the non-variant; we know this because deleted_sequence == inserted_sequence.

The second allele is a variant. Specifically, it is a deletion SNP, where a base pair is deleted from a gene.

We only want to grab variants, like the second allele, and we do so with the following simple logic:

Github Link

@staticmethod
def _get_variant_alleles(chromosome, alleles,
                         ref_snp_id) -> List[RefSnpAllele]:
    variant_alleles = []
    for idx, allele in enumerate(alleles):
        spdi = allele['allele']['spdi']
        ins, delete = spdi['inserted_sequence'], spdi['deleted_sequence']
        if ins != delete:
            variant_alleles.append(RefSnpAllele(
                ins_seq=ins,
                del_seq=delete,
                position=spdi['position'],
                ref_snp_allele_idx=idx,
                chromosome=chromosome,
                ref_snp_id=ref_snp_id,
                gene_locii=None))
    return variant_alleles

Note: We return data from the method above using our RefSnpAllele Data Transfer Object, defined in Part I, for readability and error prevention. We continue to shuttle data in the same pattern in the methods below, leveraging our other DTOs.
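Here is a standalone sketch of that filtering logic, using a namedtuple as a hypothetical stand-in for the RefSnpAllele DTO and the two sample alleles from the JSON above:

```python
from collections import namedtuple

# Hypothetical stand-in for the RefSnpAllele DTO from Part I.
RefSnpAllele = namedtuple(
    "RefSnpAllele",
    ["ins_seq", "del_seq", "position", "ref_snp_allele_idx",
     "chromosome", "ref_snp_id", "gene_locii"])

alleles = [
    {"allele": {"spdi": {"position": 20542967,
                         "deleted_sequence": "T",
                         "inserted_sequence": "T"}}},   # non-variant
    {"allele": {"spdi": {"position": 20542967,
                         "deleted_sequence": "T",
                         "inserted_sequence": ""}}},    # deletion variant
]

variant_alleles = []
for idx, allele in enumerate(alleles):
    spdi = allele["allele"]["spdi"]
    # Keep only alleles whose inserted and deleted sequences differ.
    if spdi["inserted_sequence"] != spdi["deleted_sequence"]:
        variant_alleles.append(RefSnpAllele(
            ins_seq=spdi["inserted_sequence"],
            del_seq=spdi["deleted_sequence"],
            position=spdi["position"],
            ref_snp_allele_idx=idx,
            chromosome="1",
            ref_snp_id=242,
            gene_locii=None))
```

Only the deletion variant at index 1 survives the filter; the non-variant at index 0 is dropped.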

Parse Frequency Studies, Clinical Diseases, and Gene Locii

In our top-level JSON object, we had a key named allele_annotations. This key maps to a list, with one element per allele found in the previous section.

While not made obvious by the documentation, allele_annotations and alleles share a common index. Since we had two alleles above (one variant, one non-variant), we’d expect two annotations in our JSON.
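A minimal sketch of that index correspondence, with fabricated data: the annotation for the variant allele lives at the same index as the allele itself:

```python
# alleles and allele_annotations share an index:
# annotation i describes allele i.
alleles = [
    {"hgvs": "NC_000001.11:g.20542968T="},    # index 0: non-variant
    {"hgvs": "NC_000001.11:g.20542968delT"},  # index 1: deletion variant
]
allele_annotations = [
    {"frequency": []},                               # annotates allele 0
    {"frequency": [{"study_name": "1000Genomes"}]},  # annotates allele 1
]

variant_idx = 1  # the ref_snp_allele_idx we stored while filtering alleles
annotation_for_variant = allele_annotations[variant_idx]
```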

Let’s take a look at the JSON fields (that we care about), considering only the variant allele, again for brevity’s sake.

{
    "frequency": [
        {
            "study_name": "1000Genomes",
            ...
            "allele_count": 257,
            "total_count": 5008,
        }
    ],
    "clinical": [
        {
            "disease_names": [
                "Alzheimer's"
            ],
            ...,
            "clinical_significances": [
            /* These are ClinVar Enumerated types */
                "likely-benign"
            ],
            "citations": [12345], /* These are Pubmed IDs */
        }

    ],
    "assembly_annotation": [
        {
            ...,
            "genes": [
                {
                    "locus": "ASH1L",
                    "orientation": "minus"
                }
            ]
        }
    ]
}

We can first parse out Frequency Study information as follows:

Github Link

@staticmethod
def _parse_freq_studies(allele_annotation,
                        ref_snp_id,
                        allele_idx) -> List[RefSnpAlleleFreqStudy]:
    return [RefSnpAlleleFreqStudy(
        name=freq['study_name'],
        allele_count=freq['allele_count'],
        total_count=freq['total_count'],
        ref_snp_allele_idx=allele_idx,
        ref_snp_id=ref_snp_id)
        for freq in allele_annotation['frequency'] or []]
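For intuition, the two counts in a frequency entry translate directly into an observed allele frequency. Using the 1000Genomes numbers from the sample JSON above:

```python
# From the 1000Genomes entry: 257 of 5008 sampled alleles carried the variant.
allele_count, total_count = 257, 5008
frequency = allele_count / total_count  # roughly 5% of sampled alleles
```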

After these, we can grab the ClinVar (clinical disease) data. ClinVar data includes PubMed citation IDs, disease names, and a clinical significance, which takes one of the following values:

  • not_provided
  • pathogenic
  • likely_pathogenic
  • benign
  • likely_benign
  • drug_response
  • confers_sensitivity
  • risk_factor
  • association
  • protective
  • conflicting_interpretations_of_pathogenicity
  • uncertain_significance
  • affects
  • association_not_found
  • other

Github Link

@staticmethod
def _parse_clin_diseases(allele_annotation,
                         ref_snp_id,
                         allele_idx) -> List[
                             RefSnpAlleleClinDisease]:
    return [RefSnpAlleleClinDisease(
        citation_list=clin['citations'],
        disease_name_csv=",".join(clin['disease_names']),
        clinical_significance_csv=",".join(
            clin['clinical_significances']),
        ref_snp_allele_idx=allele_idx,
        ref_snp_id=ref_snp_id)
            for clin in allele_annotation['clinical'] or []]

Finally, let’s grab the locii of all genes that this SNP has an effect on.

Note: I am still unsure why there are “multiple genes” per Allele Annotation. My current theory is that NCBI is including effects of this gene being altered on a gene downstream in some network/pathway. If anyone has any insight, please send me an email.

Github Link

@staticmethod
def _parse_gene_locii(allele_annotation) -> Set[str]:
    assembly_annotation = allele_annotation[
        'assembly_annotation']
    return {gene['locus'] for gene in
            (assembly_annotation[0]['genes']
             if assembly_annotation else [])}
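A quick sketch of why a set is the right container here: if the same locus appears more than once (hypothetical data below), the set collapses the duplicates:

```python
# Hypothetical allele_annotation fragment with a duplicated locus.
allele_annotation = {
    "assembly_annotation": [
        {"genes": [{"locus": "ASH1L", "orientation": "minus"},
                   {"locus": "ASH1L", "orientation": "minus"}]}
    ]
}

assembly_annotation = allele_annotation["assembly_annotation"]
# The set comprehension deduplicates repeated loci automatically.
gene_locii = {gene["locus"] for gene in
              (assembly_annotation[0]["genes"]
               if assembly_annotation else [])}
```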

Gathering Data for Insert

We want to put this all together into a function that returns a RefSnpCopyFromData object, which houses all data pertaining to a single RefSNP to be inserted into our database. The last two functions look like this:

Github Link

def _generate_parsed_data(self, raw_line) -> RefSnpCopyFromData:
    rsnp_json = json.loads(raw_line)
    ref_snp_id = int(rsnp_json['refsnp_id'])
    rsnp_placements = rsnp_json['primary_snapshot_data'][
                            'placements_with_allele']
    copy_from_data = RefSnpCopyFromData(
        ref_snp_alleles=[],
        ref_snp_allele_freq_studies=[],
        ref_snp_allele_clin_diseases=[])
    if not rsnp_placements:
        return copy_from_data
    allele_data = self._find_alleles_from_assembly(rsnp_placements)
    if not allele_data:
        return copy_from_data
    variant_ref_snp_alleles = self._get_variant_alleles(
        self.chromosome,
        allele_data,
        ref_snp_id)
    if not variant_ref_snp_alleles:
        return copy_from_data
    for allele in variant_ref_snp_alleles:
        allele_idx = allele.ref_snp_allele_idx
        allele_annotation = rsnp_json['primary_snapshot_data'][
            'allele_annotations'][allele_idx]
        gene_locii = self._parse_gene_locii(allele_annotation)
        freq_studies = self._parse_freq_studies(allele_annotation,
                                                ref_snp_id,
                                                allele_idx)
        clin_diseases = self._parse_clin_diseases(allele_annotation,
                                                  ref_snp_id,
                                                  allele_idx)
        copy_from_data = self._update_copy_from_data(
           copy_from_data, allele, freq_studies,
           clin_diseases, gene_locii)
    return copy_from_data

And…

Github Link

@staticmethod
def _update_copy_from_data(copy_from_data: RefSnpCopyFromData,
                           allele: RefSnpAllele,
                           freq_studies: List[
                               RefSnpAlleleFreqStudy],
                           clin_diseases: List[
                               RefSnpAlleleClinDisease],
                           gene_locii: Set[str]):
    copy_from_data.ref_snp_alleles.append(
        RefSnpAllele(
            del_seq=allele.del_seq,
            ins_seq=allele.ins_seq,
            position=allele.position,
            ref_snp_allele_idx=allele.ref_snp_allele_idx,
            chromosome=allele.chromosome,
            ref_snp_id=allele.ref_snp_id,
            gene_locii=gene_locii))
    copy_from_data.ref_snp_allele_freq_studies.extend(
        freq_studies)
    copy_from_data.ref_snp_allele_clin_diseases.extend(
        clin_diseases)
    return copy_from_data

Note: We leverage the @staticmethod decorator in almost all of the methods above. Static methods do NOT have access to the object’s internal state (hence why the self parameter is gone), so each one depends only on its inputs. Because they are effectively pure functions, they are extremely easy to test and read.
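A minimal illustration of the point: a static method can be called without an instance, and its result comes from its arguments alone:

```python
class Example:
    scale = 10  # class state a static method never touches implicitly

    @staticmethod
    def double(x):
        # No `self` or `cls`: the result depends only on the argument,
        # which makes the method trivial to test in isolation.
        return 2 * x

# Callable on the class or an instance; no object state is involved.
result = Example.double(21)
```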

Woah. That’s a good chunk of code for one post. Next time, we reach our final destination: inserting our DbSNP data efficiently into PostgreSQL.