import os
from collections import defaultdict
import numpy as np
import pytrec_eval
from capreolus.searcher import Searcher
from capreolus.utils.loginit import get_logger
# Module-level logger used by the evaluation helpers in this file.
logger = get_logger(__name__)
# Metric names that can be passed as ``metrics`` to the evaluation functions in this
# module. Names starting with ``judged_`` are computed by ``judged`` in this module;
# all other names are handed to pytrec_eval.
DEFAULT_METRICS = [
    "P_1",
    "P_5",
    "P_10",
    "P_20",
    "judged_10",
    "judged_20",
    "judged_200",
    "map",
    "ndcg_cut_5",
    "ndcg_cut_10",
    "ndcg_cut_20",
    "recall_100",
    "recall_1000",
    "recip_rank",
]
def judged(qrels, runs, n):
    """
    Compute the mean fraction of each query's top-``n`` ranked documents that are judged.

    A document counts as judged when its id is a key of ``qrels[qid]``, whatever its label.
    Queries present in ``runs`` but missing from ``qrels`` are logged as errors and skipped.

    Args:
        qrels: dict in the format ``{qid: {docid: label}}``
        runs: dict in the format ``{qid: {docid: score}}``
        n (int): rank cutoff; documents are ranked by descending score

    Returns:
        float: the judged fraction averaged over the scored queries. A query with no ranked
        documents (or ``n == 0``) contributes 0, and 0.0 is returned when no query could be scored.
    """
    scores = []
    for qid, rundocs in runs.items():
        if qid not in qrels:
            logger.error(f"{qid} in run files cannot be found in qrels")
            continue

        topn = sorted(rundocs, key=rundocs.get, reverse=True)[:n]
        if not topn:
            # Nothing was retrieved for this query: count it as 0 instead of dividing by zero.
            scores.append(0.0)
            continue

        judged_docs = qrels[qid]
        scores.append(sum(docid in judged_docs for docid in topn) / len(topn))

    if not scores:
        # No query could be scored (empty runs, or none of them appear in qrels).
        return 0.0
    return sum(scores) / len(scores)
def _eval_runs(runs, qrels, metrics, dev_qids, relevance_level):
    """
    Average each requested metric over the queries in ``dev_qids``.

    Metric names starting with ``judged_`` are computed with ``judged``; every other
    name is passed to ``pytrec_eval.RelevanceEvaluator``.

    Args:
        runs: dict in the format ``{qid: {docid: score}}``
        qrels: dict in the format ``{qid: {docid: label}}``
        metrics (list): metric names to calculate; the list is not modified
        dev_qids: collection of query ids the evaluation is restricted to
        relevance_level: converted with ``int()`` and passed to pytrec_eval as ``relevance_level``

    Returns:
        dict: a dict in the format ``{metric: score}``. A pytrec_eval metric missing from a
        query's results counts as -1 for that query. If pytrec_eval returns no per-query
        results at all, every pytrec_eval metric is reported as 0.0 and a warning is logged.
    """
    assert isinstance(metrics, list)

    # Split the request into judged@n cutoffs and pytrec_eval metrics without touching
    # the caller's list.
    judged_cutoffs = []
    trec_metrics = []
    for metric in metrics:
        if metric.startswith("judged_"):
            judged_cutoffs.append(int(metric.split("_")[1]))
        else:
            trec_metrics.append(metric)

    # A set keeps the membership tests O(1) even when the caller passes a list of qids.
    dev_qid_set = set(dev_qids)
    dev_qrels = {qid: labels for qid, labels in qrels.items() if qid in dev_qid_set}

    evaluator = pytrec_eval.RelevanceEvaluator(dev_qrels, trec_metrics, relevance_level=int(relevance_level))
    per_query = [
        [query_scores.get(metric, -1) for metric in trec_metrics]
        for query_scores in evaluator.evaluate(runs).values()
    ]
    if per_query:
        averages = np.array(per_query).mean(axis=0).tolist()
    else:
        # The mean of an empty array is a NaN scalar, which cannot be zipped with the metric names.
        logger.warning("no per-query results were produced for the given runs and qrels; reporting 0.0")
        averages = [0.0] * len(trec_metrics)
    scores = dict(zip(trec_metrics, averages))

    if judged_cutoffs:
        # Restrict judged@n to the same queries as the other metrics. Queries with no
        # judgements at all are kept so that judged() still reports them.
        judged_runs = {qid: docs for qid, docs in runs.items() if qid in dev_qid_set or qid not in qrels}
        for n in judged_cutoffs:
            scores[f"judged_{n}"] = judged(dev_qrels, judged_runs, n)

    return scores
def eval_runs(runs, qrels, metrics, relevance_level=1):
    """
    Evaluate runs produced by a ranker (or loaded with Searcher.load_trec_run)

    Args:
        runs: dict in the format ``{qid: {docid: score}}``
        qrels: dict containing relevance judgements (e.g., ``benchmark.qrels``)
        metrics (str or list): metrics to calculate (e.g., ``evaluator.DEFAULT_METRICS``)
        relevance_level (int): relevance label threshold to use with non-graded metrics (equivalent to trec_eval's --level_for_rel)

    Returns:
        dict: a dict in the format ``{metric: score}`` containing the average score for each metric
    """
    # Copy so the caller's sequence is never modified; a single metric name becomes a one-item list.
    metrics = [metrics] if isinstance(metrics, str) else list(metrics)
    # Every judged query is evaluated. A set (rather than a list) keeps the per-query
    # membership test in _eval_runs constant-time.
    return _eval_runs(runs, qrels, metrics, set(qrels.keys()), relevance_level)
def eval_runfile(runfile, qrels, metrics, relevance_level=1):
    """
    Evaluate a single runfile produced by ranker or reranker

    Args:
        runfile (str): path to the runfile, loaded with ``Searcher.load_trec_run``
        qrels: dict containing the relevance judgements provided by the benchmark
        metrics (str or list): metrics to calculate, e.g. ``ndcg_cut_20``
        relevance_level (int): relevance label threshold passed to pytrec_eval; defaults to 1 as in ``eval_runs``

    Returns:
        dict: a dict in the format ``{metric: score}`` containing the evaluation score of the specified metrics
    """
    # Copy so the caller's sequence is never modified; a single metric name becomes a one-item list.
    metrics = [metrics] if isinstance(metrics, str) else list(metrics)
    runs = Searcher.load_trec_run(runfile)
    # Every judged query is evaluated. A set (rather than a list) keeps the per-query
    # membership test in _eval_runs constant-time.
    return _eval_runs(runs, qrels, metrics, set(qrels.keys()), relevance_level)
def search_best_run(runfile_dirs, benchmark, primary_metric, metrics=None, folds=None):
    """
    Select the best runfile for each fold with respect to the specified metric

    For every fold, each runfile is scored with ``primary_metric`` on the fold's
    ``train_qids`` and ``predict.dev`` queries. The ``predict.test`` queries of each fold's
    best runfile are then merged and evaluated with ``metrics``.

    Args:
        runfile_dirs (str, list or tuple): directory (or directories) containing the runfiles to
            select from; entries named ``done`` and subdirectories are skipped
        benchmark: object providing ``folds``, ``qrels`` and ``relevance_level``
        primary_metric (str): metric used to select the best runfile, e.g. ``ndcg_cut_20``
        metrics (str or list): metrics to calculate on the best runs; ``primary_metric`` is added if absent
        folds (str): name of a single fold to select for; all of ``benchmark.folds`` when omitted

    Returns:
        dict: ``{"score": {metric: score}, "path": {fold: runfile}}``

    Raises:
        ValueError: if no runfile could be selected for a fold (e.g., the directories contain no runfiles)
    """
    if not isinstance(runfile_dirs, (list, tuple)):
        runfile_dirs = [runfile_dirs]

    if not metrics:
        metrics = []
    elif isinstance(metrics, str):
        metrics = [metrics]
    else:
        metrics = list(metrics)
    if primary_metric not in metrics:
        metrics = [primary_metric] + metrics

    folds = {folds: benchmark.folds[folds]} if folds else benchmark.folds

    # os.listdir returns entries in arbitrary order; sorting makes tie-breaking between
    # equally good runfiles reproducible.
    runfiles = [
        os.path.join(runfile_dir, f)
        for runfile_dir in runfile_dirs
        for f in sorted(os.listdir(runfile_dir))
        if (f != "done" and not os.path.isdir(os.path.join(runfile_dir, f)))
    ]

    # The queries used for selection depend only on the fold, so build each set once.
    selection_qids = {s: set(v["train_qids"]) | set(v["predict"]["dev"]) for s, v in folds.items()}

    # Start below any real score so a fold whose runfiles all score 0 still gets a path.
    best_scores = {s: {primary_metric: float("-inf"), "path": None} for s in folds}
    for runfile in runfiles:
        runs = Searcher.load_trec_run(runfile)
        for s in folds:
            score = _eval_runs(
                runs,
                benchmark.qrels,
                [primary_metric],
                selection_qids[s],
                benchmark.relevance_level,
            )[primary_metric]
            # Strict comparison: on ties the earlier runfile is kept.
            if score > best_scores[s][primary_metric]:
                best_scores[s] = {primary_metric: score, "path": runfile}

    test_runs = {}
    for s, score_dict in best_scores.items():
        if score_dict["path"] is None:
            raise ValueError(f"no runfile could be selected for fold {s} from {runfile_dirs}")
        test_qids = set(folds[s]["predict"]["test"])
        best_runs = Searcher.load_trec_run(score_dict["path"])
        test_runs.update({qid: v for qid, v in best_runs.items() if qid in test_qids})

    scores = eval_runs(test_runs, benchmark.qrels, metrics, benchmark.relevance_level)
    return {"score": scores, "path": {s: v["path"] for s, v in best_scores.items()}}