Intro
In Part II we parsed the JSON data found in the DbSNP JSON download. In Part III, we will bulk-insert this data into our PostgreSQL database.
Choosing a PostgreSQL Python Driver
The list of options here is long, but I ended up choosing asyncpg for a number of reasons:
- Great support for PostgreSQL's COPY FROM ... STDIN command (see the short sketch after this list).
  - This is supported in other drivers, but they lack the Python-object level of abstraction that I'd like.
  - I tried adding a higher level of abstraction to psycopg2's copy_from() method, but got nowhere fast with this Pull Request.
- Great support for PostgreSQL's PREPARE statement.
- The entire library is asynchronous, and writing lots of data to a remote datastore is a good use case for async Python.
  - ...though we are writing to localhost in this walkthrough, so we don't actually leverage async concurrency.
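For context, here is a minimal, self-contained sketch of what those two asyncpg features look like. The connection parameters, table name, and columns below are placeholders for illustration, not part of this project's schema:
import asyncio
import asyncpg

async def demo():
    # NOTE: database/user are placeholders -- point these at your own instance
    conn = await asyncpg.connect(database="snip_warehouse", user="SeanH")
    # PREPARE once on the server, then execute repeatedly with new parameters
    stmt = await conn.prepare("SELECT $1::int + 1")
    print(await stmt.fetchval(41))  # -> 42
    # COPY ... FROM STDIN, driven directly from Python records
    await conn.copy_records_to_table(
        "demo_table",                      # hypothetical table
        records=[(1, "foo"), (2, "bar")],
        columns=["id", "name"])
    await conn.close()

asyncio.get_event_loop().run_until_complete(demo())
We will put copy_records_to_table() to real use further down.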
Writing to Database
In Part II we introduced the _load(self, parsed_data_iter) method, agnostic of any implementation details. Let's implement the method!
Naively, we could insert one row at a time, as each is parsed, as follows:
async def _load(self,
                parsed_data_iter: Iterator[RefSnpCopyFromData]):
    conn = await asyncpg.connect(database=self.database_name,
                                 user="SeanH")
    for copy_from_data in parsed_data_iter:
        for table_name in copy_from_data._fields:
            rows = getattr(copy_from_data, table_name)
            for row in rows:
                # Build the column list and one $N placeholder per column
                placeholders = ", ".join(
                    f"${i}" for i in range(1, len(row) + 1))
                await conn.execute(
                    f"INSERT INTO {table_name} "
                    f"({', '.join(row._fields)}) "
                    f"VALUES ({placeholders})", *row)
But we can do better than this!
PostgreSQL has the best (strong opinion loosely held) BULK INSERT method available. I make this claim having written ETLAlchemy: a library facilitating full-database (schema and data) migrations between any database supported by SQLAlchemy Core.
To properly bulk-insert, we need a buffer that continuously aggregates our data, and we intermittently dump that buffer to the database whenever we hit our threshold.
Let’s try this:
async def _load(self, parsed_data_iter):
    conn = await asyncpg.connect(
        database=self.database_name, user="SeanH")
    await conn.execute(
        "SET session_replication_role TO 'replica'")
    # ^ disables FK (constraint trigger) checks for this session
    table_names = list(RefSnpCopyFromData._fields)
    row_buff_dict = {table_name: []
                     for table_name in table_names}
    buff_size = 0
    for copy_from_data in parsed_data_iter:
        for table_name in table_names:
            for copy_from_row in getattr(copy_from_data,
                                         table_name):
                row_buff_dict[table_name].append(copy_from_row)
        buff_size += 1  # one RefSNP processed
        if buff_size % 5000 == 0:  # Dump
            print(f"Dumping (SNPs Processed: {buff_size})")
            await self._dump_buffer(row_buff_dict, conn)
            row_buff_dict = {table_name: []
                             for table_name in table_names}
    # Flush whatever is left over in the buffer
    await self._dump_buffer(row_buff_dict, conn)
    print("Done.")
    await conn.close()
async def _dump_buffer(self, row_buff_dict, conn):
for table_name in row_buff_dict.keys():
records = row_buff_dict[table_name]
if not records:
continue
await conn.copy_records_to_table(
table_name,
records=records,
columns=records[0]._fields)
As each RefSnpCopyFromData object becomes available, we add its contents to our buffer. We apply the KISS principle, implementing our buffer as a dict mapping table_name -> List[namedtuple].
Note: For simplicity's sake, rather than tracking the memory footprint of our buffer, we are just tracking the # of RefSNPs processed.
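If you did want to cap the buffer by its (approximate) memory footprint instead, a rough sketch might look like the following; approx_row_size is a hypothetical helper, and its estimates are shallow only:
import sys

def approx_row_size(row):
    # Shallow estimate: the namedtuple container plus each field's own size.
    # This undercounts nested objects, but is good enough for a dump threshold.
    return sys.getsizeof(row) + sum(sys.getsizeof(value) for value in row)

# Inside _load(), instead of counting RefSNPs:
#     buff_bytes += approx_row_size(copy_from_row)
#     if buff_bytes >= 64 * 1024 * 1024:  # e.g. dump every ~64 MB
#         await self._dump_buffer(row_buff_dict, conn)
#         buff_bytes = 0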
When we hit our buff_size, in this case 5000, we leverage asyncpg's copy_records_to_table() method to write to the database, and free up memory by re-initializing our buffer.
The Top-level Method: Revisited
Remember our load_ref_snps() method? Recall that our _load() method is asynchronous; this means we will have to run it in a Python asyncio event loop. We update our method as follows:
def load_ref_snps(self, dbsnp_filename, chromosome):
self.chromosome = chromosome
loop = asyncio.get_event_loop()
with gzip.open(dbsnp_filename) as gzip_fp:
loop.run_until_complete(
self._load((self._generate_parsed_data(line)
for line in gzip_fp)))
Note: We get NOTHING from running the _load() method in an asyncio event loop here. We are only using asyncio because the asyncpg library requires us to, and that's the only reason!
In the future, this also opens the door to asynchronously downloading/streaming each refsnp-chr*.json.gz file while we concurrently parse and insert the data into the database. This would be a great optimization, but it is outside our scope! (A tiny sketch of the idea follows.)
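Purely as an illustration of that idea (not part of the project), here is a minimal producer/consumer sketch using an asyncio.Queue, with the download and parse/insert steps stubbed out:
import asyncio

async def download_chunks(queue):
    # Stand-in for streaming refsnp-chr*.json.gz from the network
    for chunk in ["line-1", "line-2", "line-3"]:
        await queue.put(chunk)
        await asyncio.sleep(0)  # yield control, as a real download would
    await queue.put(None)       # sentinel: no more data

async def parse_and_insert(queue):
    # Stand-in for parsing each line and bulk-inserting it
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        print(f"processed {chunk}")

async def main():
    queue = asyncio.Queue(maxsize=100)
    await asyncio.gather(download_chunks(queue), parse_and_insert(queue))

asyncio.get_event_loop().run_until_complete(main())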
The Top-level Method: Multiprocessed
Our current implementation presents a great opportunity to leverage Python's multiprocessing library without any design changes. We simply hand off the streaming and parsing of our JSON to a pool of child processes, while our parent process accumulates our data buffer and eventually dumps it to our database.
On my 8-core 2016 MacBook Pro, making the change from non-parallelized code to parallelized code reduces the run time from 1 hour and 3 minutes down to 20 minutes (63 / 20 ≈ 3.15, roughly a 3x speedup). Not too shabby…
def load_ref_snps(self, dbsnp_filename, chromosome):
self.chromosome = chromosome
num_processes = cpu_count()
print(f"Found '{num_processes}' CPUs")
loop = asyncio.get_event_loop()
with Pool(num_processes) as pool:
with gzip.open(dbsnp_filename) as gzip_fp:
copy_from_data_iter = pool.imap_unordered(
self._generate_parsed_data,
gzip_fp,
1024) # chunksize
loop.run_until_complete(
self._load(copy_from_data_iter))
That's it! Our Pool object manages each child process, handing off lines from gzip_fp to each process and returning an iterator of RefSnpCopyFromData objects, which is handed off to _load(), satisfying the method's contract!
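As a quick aside, here is a tiny, self-contained illustration (toy function, nothing to do with DbSNP) of how Pool.imap_unordered streams results back lazily, in whatever order the workers finish, with an optional chunksize:
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as pool:
        # chunksize=2: each worker grabs 2 inputs at a time
        for result in pool.imap_unordered(square, range(10), 2):
            print(result)  # results may arrive out of input order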
Note: It is not by coincidence that we were able to easily parallelize our application, nor was it by some perfect design. In my experience, it is a combination of:
- A desire to design and produce small, concise pieces of code, encapsulated in classes and methods with well-defined interfaces.
- Many, many iterations. (I went through 3 full refactors while putting this together)
Knowing that you want to parallelize something can obviously be factored into the design, but it is generally easier to do when you satisfy the first point above.
Putting it All Together
We can now easily write a script to download, parse, and bulk-insert all 23 chromosomes' DbSNP data into our local database:
import os
from snip_warehouse import SnipLoader
from snip_warehouse.schema import init_db
input("Are you sure you want to re-init DB?"
"This takes 2-3 hrs per chromosome")
init_db(DB_NAME)
snip_loader = SnipLoader(DB_NAME)
chr_suffixes = [str(i) for i in range(1, 23)]
chr_suffixes += ["X", "Y", "MT"]
for chr_suffix in chr_suffixes:
snip_loader.download_dbsnp_file(
f"refsnp-chr{chr_suffix}.json.gz",
chr_suffix)
snip_loader.load_ref_snps(
f"refsnp-chr{chr_suffix}.json.gz",
chr_suffix)
os.system(f"rm refsnp-chr{chr_suffix}.json.gz")
More good news: on my 2016 8-core MacBook Pro (w/ SSD), the wall time of warehousing the data for all 23 chromosomes is 9 hours and 21 minutes. This satisfies the “overnight goal” - you can kick the above script off before bed, wake up, eat breakfast, and it should be done!
What’s Next?
You can find the full source code repo on GitHub here.
By open-sourcing this project, it is my hope that folks can take snippets from this walkthrough to build their own hand-curated, DbSNP-related database. Whether you are a bioinformatician, a hobbyist dev, or a PhD without much time or money, don't reinvent the wheel. Take bits and pieces and build something better.
I've already built one practical application of this database: the Snip API. This API allows a user to turn their 23andMe Raw SNP Export into insights. It helped me discover that the cause of my father's gallstones and gallbladder removal was likely genetic.
You can find the Snip API client on GitHub here.
I encourage everyone to use the API, and explore the JSON output: you will likely find something of interest. Further, I would encourage folks to:
- Build more APIs like it
- Build a GUI for Snip
- Find bugs/issues with Snip
- Integrate this data with other datasets
- Share all of this code/information with as many people as you can!
It’s an exciting time for the intersection of Biology and Software Engineering. My hope is that we can all ride the wave.