import gzip
import os
import xml.etree.ElementTree as ET
from collections import defaultdict
def load_ntcir_topics(fn):
    """Read an NTCIR-style XML topic file.

    Every child element of the document root is treated as one topic and must
    contain a ``qid`` element and a ``content`` element; the text of both is
    whitespace-stripped.

    Args:
        fn: path (or file object) accepted by ``xml.etree.ElementTree.parse``.

    Returns:
        A dict with the single key ``"content"`` mapping to ``{qid: query}``.

    Raises:
        AssertionError: if a qid occurs twice, or a qid/query is empty after
            stripping.
    """
    root = ET.parse(fn).getroot()

    queries = {}
    for topic in root:
        topic_id = topic.find("qid").text.strip()
        text = topic.find("content").text.strip()

        # sanity checks: ids must be unique and neither field may be blank
        assert topic_id not in queries
        assert topic_id and text

        queries[topic_id] = text

    return {"content": queries}
def load_trec_topics(queryfn):
    """Parse a TREC-format topic file into per-field query strings.

    The file is read line by line. A ``<num>`` line sets the current topic id
    (its last whitespace-separated token, kept as a string). ``<title>``,
    ``<desc>`` and ``<narr>`` lines open the corresponding field; the tokens
    following the tag on that line, and all tokens on subsequent untagged
    lines, are appended to that field until another tag, ``<num>``, ``<top>``
    or ``</top>`` is seen.

    Args:
        queryfn: path to the topic file; read through ``gzip`` when the path
            ends in ``.gz``, otherwise opened as plain text.

    Returns:
        A dict containing any of the keys ``"title"``, ``"desc"``, ``"narr"``
        (only those for which at least one topic was seen), each mapping to
        ``{qid: space-joined terms}``.

    Raises:
        AssertionError: if a ``<title>`` line contains ``"Topic:"``.
    """
    fields = {name: defaultdict(list) for name in ("title", "desc", "narr")}
    active = None  # field that untagged continuation lines are appended to

    opener = gzip.open if str(queryfn).endswith(".gz") else open
    with opener(queryfn, "rt") as f:
        for raw in f:
            line = raw.strip()

            if line.startswith("<num>"):
                # e.g. "<num> Number: 700"; the id is no longer cast to int
                qid = line.split()[-1]
                active = None
                continue

            if line.startswith(("</top>", "<top>")):
                active = None
                continue

            opened = next((name for name in fields if line.startswith(f"<{name}>")), None)
            if opened is not None:
                # drop the tag token itself, keep whatever follows it on this line
                fields[opened][qid].extend(line.split()[1:])
                active = opened
                if opened == "title":
                    # TODO does this sometimes start with Topic: ?
                    assert "Topic:" not in line
            elif active is not None:
                # continuation line of the currently open field
                fields[active][qid].extend(line.split())

    return {
        name: {qid: " ".join(terms) for qid, terms in per_topic.items()}
        for name, per_topic in fields.items()
        if len(per_topic) > 0
    }
def load_qrels(qrelfile, qids=None, include_spam=True):
    """Load relevance judgments from a whitespace-separated qrels file.

    Each non-blank line is split on whitespace; column 0 is the query id,
    column 2 the document id and column 3 the integer label (column 1 is
    ignored).

    Args:
        qrelfile: path to the qrels file.
        qids: optional container of query ids; when given, judgments for any
            other query are skipped.
        include_spam: when False, judgments with a negative label are skipped.

    Returns:
        A mapping ``{qid: {docid: label}}``. Queries whose highest label is
        not positive are removed. The object is a ``defaultdict`` whose
        ``default_factory`` has been cleared, so missing keys raise
        ``KeyError`` like a plain dict.
    """
    judgments = defaultdict(dict)

    with open(qrelfile, "rt") as f:
        for raw in f:
            fields = raw.split()
            if not fields:
                # blank or whitespace-only line
                continue

            topic, docid, label = fields[0], fields[2], int(fields[3])

            wanted = qids is None or topic in qids
            if not wanted:
                continue

            keep = include_spam or label >= 0
            if not keep:
                continue

            judgments[topic][docid] = label

    # remove qids with no relevant docs
    without_relevant = [topic for topic, docs in judgments.items() if max(docs.values()) <= 0]
    for topic in without_relevant:
        del judgments[topic]

    judgments.default_factory = None  # behave like normal dict
    return judgments
def document_to_trectxt(docno, txt):
    """Wrap a document in TREC text markup.

    Args:
        docno: document identifier placed inside ``<DOCNO>``.
        txt: document body placed inside ``<TEXT>``.

    Returns:
        A string holding one ``<DOC>`` record terminated by a newline.
    """
    return (
        "<DOC>\n"
        f"<DOCNO> {docno} </DOCNO>\n"
        "<TEXT>\n"
        f"{txt}\n"
        "</TEXT>\n"
        "</DOC>\n"
    )
def topic_to_trectxt(qno, title, desc=None, narr=None):
    """Render one topic as a TREC ``<top>`` record.

    Args:
        qno: topic number written after ``<num> Number:``.
        title: title query.
        desc: description text; the title is used when this is falsy.
        narr: narrative text; the title is used when this is falsy.

    Returns:
        The formatted topic as a string, ending with three newlines.
    """
    description = desc if desc else title
    narrative = narr if narr else title

    pieces = [
        "<top>\n\n",
        f"<num> Number: {qno}\n",
        f"<title> {title}\n\n",
        f"<desc> Description:\n{description}\n\n",
        f"<narr> Narrative:\n{narrative}\n\n",
        "</top>\n\n\n",
    ]
    return "".join(pieces)
def anserini_index_to_trec_docs(index_dir, output_dir, expected_doc_count):
    """Dump the raw documents of an Anserini index as gzipped TREC text files.

    Lucene-internal ids ``0 .. expected_doc_count - 1`` are converted to
    collection docids; the resulting docids are sorted and their raw documents
    are written round-robin into 100 files named ``100.gz`` .. ``199.gz``
    inside ``output_dir``.

    Args:
        index_dir: path to the Anserini (Lucene) index directory.
        output_dir: existing directory in which the ``.gz`` files are created.
        expected_doc_count: number of documents the index should contain.

    Raises:
        ValueError: if the number of distinct docids recovered from the index
            differs from ``expected_doc_count``. Raised before any output file
            is opened.
    """
    from contextlib import ExitStack

    from jnius import autoclass

    JFile = autoclass("java.io.File")
    JFSDirectory = autoclass("org.apache.lucene.store.FSDirectory")
    JIndexReaderUtils = autoclass("io.anserini.index.IndexReaderUtils")
    JIndexUtils = autoclass("io.anserini.index.IndexUtils")

    index_utils = JIndexUtils(index_dir)
    index_reader_utils = JIndexReaderUtils()

    fsdir = JFSDirectory.open(JFile(index_dir).toPath())
    reader = autoclass("org.apache.lucene.index.DirectoryReader").open(fsdir)

    docids = set()
    for i in range(expected_doc_count):
        try:
            docids.add(index_reader_utils.convertLuceneDocidToDocid(reader, i))
        except Exception:
            # we reached the end? Lookup failures are tolerated here on purpose;
            # the count check below reports any shortfall. Only Exception is
            # caught so that KeyboardInterrupt / SystemExit still propagate.
            pass

    if len(docids) != expected_doc_count:
        raise ValueError(
            f"we expected to retrieve {expected_doc_count} documents from the index, but actually found {len(docids)}"
        )

    # ExitStack guarantees that every handle opened so far is closed, even if
    # opening a later file or writing a document raises.
    with ExitStack() as stack:
        output_handles = [
            stack.enter_context(gzip.open(os.path.join(output_dir, f"{i}.gz"), "wt", encoding="utf-8"))
            for i in range(100, 200)
        ]

        for docidx, docid in enumerate(sorted(docids)):
            txt = document_to_trectxt(docid, index_utils.getRawDocument(docid))
            # spread documents evenly over the output files
            handleidx = docidx % len(output_handles)
            print(txt, file=output_handles[handleidx])