Source code for capreolus.collection.covid

import filecmp
import math
import os
import tarfile
from pathlib import Path

import pandas as pd
from tqdm import tqdm

from capreolus import ConfigOption, constants
from capreolus.utils.common import download_file
from capreolus.utils.loginit import get_logger

from . import Collection

[docs]logger = get_logger(__name__)
[docs]PACKAGE_PATH = constants["PACKAGE_PATH"]
[docs]@Collection.register class COVID(Collection): """ The COVID-19 Open Research Dataset (https://www.semanticscholar.org/cord19) """
[docs] module_name = "covid"
[docs] url = "https://ai2-semanticscholar-cord-19.s3-us-west-2.amazonaws.com/historical_releases/cord-19_%s.tar.gz"
[docs] generator_type = "Cord19Generator"
[docs] config_spec = [ ConfigOption("round", 3), ConfigOption("coll_type", "abstract", "one of: abstract, fulltext, paragraph"),
]
[docs] def build(self): coll_type, round = self.config["coll_type"], self.config["round"] type2coll = { "abstract": "Cord19AbstractCollection", "fulltext": "Cord19FullTextCollection", "paragraph": "Cord19ParagraphCollection", } dates = ["2020-04-10", "2020-05-01", "2020-05-19", "2020-06-19", "2020-07-16"] if coll_type not in type2coll: raise ValueError(f"Unexpected coll_type: {coll_type}; expeced one of: {' '.join(type2coll.keys())}") if round > len(dates): raise ValueError(f"Unexpected round number: {round}; only {len(dates)} number of rounds are provided") self.collection_type = type2coll[coll_type] self.date = dates[round - 1]
[docs] def download_if_missing(self): cachedir = self.get_cache_path() tmp_dir, document_dir = Path("/tmp"), cachedir / "documents" expected_fns = [document_dir / "metadata.csv", document_dir / "document_parses"] if all([os.path.exists(f) for f in expected_fns]): return document_dir.as_posix() url = self.url % self.date tar_file = tmp_dir / f"covid-19-{self.date}.tar.gz" if not tar_file.exists(): download_file(url, tar_file) with tarfile.open(tar_file) as f: f.extractall(path=cachedir) # emb.tar.gz, metadata.csv, doc.tar.gz, changelog os.rename(cachedir / self.date, document_dir) doc_fn = "document_parses" if f"{doc_fn}.tar.gz" in os.listdir(document_dir): with tarfile.open(document_dir / f"{doc_fn}.tar.gz") as f: f.extractall(path=document_dir) else: self.transform_metadata(document_dir) # only document_parses and metadata.csv are expected for fn in os.listdir(document_dir): if (document_dir / fn) not in expected_fns: os.remove(document_dir / fn) return document_dir.as_posix()
[docs] def transform_metadata(self, root_path): """ the transformation is necessary for dataset round 1 and 2 according to https://discourse.cord-19.semanticscholar.org/t/faqs-about-cord-19-dataset/94 the assumed directory under root_path: ./root_path ./metadata.csv ./comm_use_subset ./noncomm_use_subset ./custom_license ./biorxiv_medrxiv ./archive In a nutshell: 1. renaming: Microsoft Academic Paper ID -> mag_id; WHO #Covidence -> who_covidence_id 2. update: has_pdf_parse -> pdf_json_files # e.g. document_parses/pmc_json/PMC125340.xml.json has_pmc_xml_parse -> pmc_json_files """ metadata_csv = str(root_path / "metadata.csv") orifiles = ["arxiv", "custom_license", "biorxiv_medrxiv", "comm_use_subset", "noncomm_use_subset"] for fn in orifiles: if (root_path / fn).exists(): continue tar_fn = root_path / f"{fn}.tar.gz" if not tar_fn.exists(): continue with tarfile.open(str(tar_fn)) as f: f.extractall(path=root_path) os.remove(tar_fn) metadata = pd.read_csv(metadata_csv, header=0) columns = metadata.columns.values cols_before = [ "cord_uid", "sha", "source_x", "title", "doi", "pmcid", "pubmed_id", "license", "abstract", "publish_time", "authors", "journal", "Microsoft Academic Paper ID", "WHO #Covidence", "arxiv_id", "has_pdf_parse", "has_pmc_xml_parse", "full_text_file", "url", ] assert all(columns == cols_before) # step 1: rename column cols_to_rename = {"Microsoft Academic Paper ID": "mag_id", "WHO #Covidence": "who_covidence_id"} metadata.columns = [cols_to_rename.get(c, c) for c in columns] # step 2: parse path & move json file doc_outp = root_path / "document_parses" pdf_dir, pmc_dir = doc_outp / "pdf_json", doc_outp / "pmc_json" pdf_dir.mkdir(exist_ok=True, parents=True) pmc_dir.mkdir(exist_ok=True, parents=True) new_cols = ["pdf_json_files", "pmc_json_files"] for col in new_cols: metadata[col] = "" metadata["s2_id"] = math.nan # tmp, what's this column?? iterbar = tqdm(desc="transforming data", total=len(metadata)) for i, row in metadata.iterrows(): dir = row["full_text_file"] if row["has_pmc_xml_parse"]: name = row["pmcid"] + ".xml.json" ori_fn = root_path / dir / "pmc_json" / name pmc_fn = f"document_parses/pmc_json/{name}" metadata.at[i, "pmc_json_files"] = pmc_fn pmc_fn = root_path / pmc_fn if not pmc_fn.exists(): os.rename(ori_fn, pmc_fn) # check else: metadata.at[i, "pmc_json_files"] = math.nan if row["has_pdf_parse"]: shas = str(row["sha"]).split(";") pdf_fn_final = "" for sha in shas: name = sha.strip() + ".json" ori_fn = root_path / dir / "pdf_json" / name pdf_fn = f"document_parses/pdf_json/{name}" pdf_fn_final = f"{pdf_fn_final};{pdf_fn}" if pdf_fn_final else pdf_fn pdf_fn = root_path / pdf_fn if not pdf_fn.exists(): os.rename(ori_fn, pdf_fn) # check else: if ori_fn.exists(): assert filecmp.cmp(ori_fn, pdf_fn) os.remove(ori_fn) metadata.at[i, "pdf_json_files"] = pdf_fn_final else: metadata.at[i, "pdf_json_files"] = math.nan iterbar.update() # step 3: remove deprecated columns, remove unwanted directories cols_to_remove = ["has_pdf_parse", "has_pmc_xml_parse", "full_text_file"] metadata.drop(columns=cols_to_remove) dir_to_remove = ["comm_use_subset", "noncomm_use_subset", "custom_license", "biorxiv_medrxiv", "arxiv"] for dir in dir_to_remove: dir = root_path / dir for subdir in os.listdir(dir): os.rmdir(dir / subdir) # since we are supposed to move away all the files os.rmdir(dir) # assert len(metadata.columns) == 19 # step 4: save back metadata.to_csv(metadata_csv, index=False)