Source code for capreolus.extractor.common

import numpy as np
from pymagnitude import Magnitude, MagnitudeUtils
import tensorflow as tf

from capreolus import constants, get_logger

[docs]logger = get_logger(__name__)
[docs]embedding_paths = { "glove6b": "glove/light/glove.6B.300d", "glove6b.50d": "glove/light/glove.6B.50d", "w2vnews": "word2vec/light/GoogleNews-vectors-negative300", "fasttext": "fasttext/light/wiki-news-300d-1M-subword",
}
[docs]pad_tok = "<pad>"
[docs]def load_pretrained_embeddings(embedding_name): if embedding_name not in embedding_paths: raise ValueError(f"embedding name '{embedding_name}' is not a recognized embedding: {sorted(embedding_paths.keys())}") embedding_cache = constants["CACHE_BASE_PATH"] / "embeddings" numpy_cache = embedding_cache / (embedding_name + ".npy") vocab_cache = embedding_cache / (embedding_name + ".vocab.txt") if numpy_cache.exists() and vocab_cache.exists(): logger.debug("loading embeddings from %s", numpy_cache) stoi, itos = load_vocab_file(vocab_cache) embeddings = np.load(numpy_cache, mmap_mode="r").reshape(len(stoi), -1) return embeddings, itos, stoi logger.debug("preparing embeddings and vocab") magnitude = Magnitude(MagnitudeUtils.download_model(embedding_paths[embedding_name], download_dir=embedding_cache)) terms, vectors = zip(*((term, vector) for term, vector in magnitude)) pad_vector = np.zeros(magnitude.dim, dtype=np.float32) terms = [pad_tok] + list(terms) vectors = np.array([pad_vector] + list(vectors), dtype=np.float32) itos = {idx: term for idx, term in enumerate(terms)} logger.debug("saving embeddings to %s", numpy_cache) np.save(numpy_cache, vectors, allow_pickle=False) save_vocab_file(itos, vocab_cache) stoi = {s: i for i, s in itos.items()} return vectors, itos, stoi
[docs]def load_vocab_file(fn): stoi, itos = {}, {} with open(fn, "rt") as f: for idx, line in enumerate(f): term = line.strip() stoi[term] = idx itos[idx] = term assert itos[0] == pad_tok return stoi, itos
[docs]def save_vocab_file(itos, fn): with open(fn, "wt") as outf: for idx, term in sorted(itos.items()): print(term, file=outf)
[docs]class MultipleTrainingPassagesMixin: """ Prepare and parse TF training feature that contain multiple passage per query. That is, the "pos_bert_input" features prepared by extractor's `id2vec()` function should have 3 dimension """
[docs] def create_tf_train_feature(self, sample): """ Returns a set of features from a doc. Of the num_passages passages that are present in a document, we use only a subset of it. params: sample - A dict where each entry has the shape [batch_size, num_passages, maxseqlen] Returns a list of features. Each feature is a dict, and each value in the dict has the shape [batch_size, maxseqlen]. Yes, the output shape is different to the input shape because we sample from the passages. """ num_passages = self.config["numpassages"] def _bytes_feature(value): """Returns a bytes_list from a string / byte. Our features are multi-dimensional tensors.""" if isinstance(value, type(tf.constant(0))): # if value ist tensor value = value.numpy() # get value of tensor return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) def transpose_neg_input(neg_inp): return tf.cast(tf.transpose(neg_inp, perm=[1, 0, 2]), tf.int64) posdoc, negdoc, negdoc_id = sample["pos_bert_input"], sample["neg_bert_input"], sample["negdocid"] posdoc_mask, posdoc_seg, negdoc_mask, negdoc_seg = ( sample["pos_mask"], sample["pos_seg"], sample["neg_mask"], sample["neg_seg"], ) label = sample["label"] features = [] negdoc = transpose_neg_input(negdoc) negdoc_seg = transpose_neg_input(negdoc_seg) negdoc_mask = transpose_neg_input(negdoc_mask) for i in range(num_passages): if i > 0 and self.rng.random() > self.config["prob"]: continue bert_input_line = posdoc[i] bert_input_line = " ".join(self.tokenizer.bert_tokenizer.convert_ids_to_tokens(list(bert_input_line))) passage = bert_input_line.split(self.sep_tok)[-2] # Ignore empty passages as well if passage.strip() == self.pad_tok: continue feature = { "pos_bert_input": _bytes_feature(tf.io.serialize_tensor(posdoc[i])), "pos_mask": _bytes_feature(tf.io.serialize_tensor(posdoc_mask[i])), "pos_seg": _bytes_feature(tf.io.serialize_tensor(posdoc_seg[i])), "neg_bert_input": _bytes_feature(tf.io.serialize_tensor(negdoc[i])), "neg_mask": _bytes_feature(tf.io.serialize_tensor(negdoc_mask[i])), "neg_seg": _bytes_feature(tf.io.serialize_tensor(negdoc_seg[i])), "label": _bytes_feature(tf.io.serialize_tensor(label[i])), } features.append(feature) return features
[docs] def parse_tf_train_example(self, example_proto): maxseqlen = self.config["maxseqlen"] feature_description = self.get_tf_feature_description() parsed_example = tf.io.parse_example(example_proto, feature_description) def parse_tensor_as_int(x): parsed_tensor = tf.io.parse_tensor(x, tf.int64) parsed_tensor.set_shape([maxseqlen]) return parsed_tensor def parse_neg_tensor_as_int(x): parsed_tensor = tf.io.parse_tensor(x, tf.int64) return parsed_tensor def parse_label_tensor(x): parsed_tensor = tf.io.parse_tensor(x, tf.float32) return parsed_tensor pos_bert_input = tf.map_fn(parse_tensor_as_int, parsed_example["pos_bert_input"], dtype=tf.int64) pos_mask = tf.map_fn(parse_tensor_as_int, parsed_example["pos_mask"], dtype=tf.int64) pos_seg = tf.map_fn(parse_tensor_as_int, parsed_example["pos_seg"], dtype=tf.int64) neg_bert_input = tf.map_fn(parse_neg_tensor_as_int, parsed_example["neg_bert_input"], dtype=tf.int64) neg_mask = tf.map_fn(parse_neg_tensor_as_int, parsed_example["neg_mask"], dtype=tf.int64) neg_seg = tf.map_fn(parse_neg_tensor_as_int, parsed_example["neg_seg"], dtype=tf.int64) label = tf.map_fn(parse_label_tensor, parsed_example["label"], dtype=tf.float32) return (pos_bert_input, pos_mask, pos_seg, neg_bert_input, neg_mask, neg_seg), label
[docs]class SingleTrainingPassagesMixin: """ Prepare and parse TF training feature that contain single passage per query. That is, the "pos_bert_input" features prepared by extractor's `id2vec()` function should have 2 dimension """
[docs] def create_tf_train_feature(self, sample): """ Returns a set of features from a doc. Of the num_passages passages that are present in a document, we use only a subset of it. params: sample - A dict where each entry has the shape [batch_size, num_passages, maxseqlen] Returns a list of features. Each feature is a dict, and each value in the dict has the shape [batch_size, maxseqlen]. Yes, the output shape is different to the input shape because we sample from the passages. """ num_passages = self.config["numpassages"] def _bytes_feature(value): """Returns a bytes_list from a string / byte. Our features are multi-dimensional tensors.""" if isinstance(value, type(tf.constant(0))): # if value ist tensor value = value.numpy() # get value of tensor return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) posdoc, negdoc, negdoc_id = sample["pos_bert_input"], sample["neg_bert_input"], sample["negdocid"] posdoc_mask, posdoc_seg, negdoc_mask, negdoc_seg = ( sample["pos_mask"], sample["pos_seg"], sample["neg_mask"], sample["neg_seg"], ) label = sample["label"] feature = { "pos_bert_input": _bytes_feature(tf.io.serialize_tensor(posdoc)), "pos_mask": _bytes_feature(tf.io.serialize_tensor(posdoc_mask)), "pos_seg": _bytes_feature(tf.io.serialize_tensor(posdoc_seg)), "neg_bert_input": _bytes_feature(tf.io.serialize_tensor(negdoc)), "neg_mask": _bytes_feature(tf.io.serialize_tensor(negdoc_mask)), "neg_seg": _bytes_feature(tf.io.serialize_tensor(negdoc_seg)), "label": _bytes_feature(tf.io.serialize_tensor(label)), } return [feature]
[docs] def parse_tf_train_example(self, example_proto): feature_description = self.get_tf_feature_description() parsed_example = tf.io.parse_example(example_proto, feature_description) def parse_tensor_as_int(x): parsed_tensor = tf.io.parse_tensor(x, tf.int64) parsed_tensor.set_shape([self.config["maxseqlen"]]) return parsed_tensor def parse_label_tensor(x): parsed_tensor = tf.io.parse_tensor(x, tf.float32) parsed_tensor.set_shape([2]) return parsed_tensor pos_bert_input = tf.map_fn(parse_tensor_as_int, parsed_example["pos_bert_input"], dtype=tf.int64) pos_mask = tf.map_fn(parse_tensor_as_int, parsed_example["pos_mask"], dtype=tf.int64) pos_seg = tf.map_fn(parse_tensor_as_int, parsed_example["pos_seg"], dtype=tf.int64) neg_bert_input = tf.map_fn(parse_tensor_as_int, parsed_example["neg_bert_input"], dtype=tf.int64) neg_mask = tf.map_fn(parse_tensor_as_int, parsed_example["neg_mask"], dtype=tf.int64) neg_seg = tf.map_fn(parse_tensor_as_int, parsed_example["neg_seg"], dtype=tf.int64) label = tf.map_fn(parse_label_tensor, parsed_example["label"], dtype=tf.float32) return (pos_bert_input, pos_mask, pos_seg, neg_bert_input, neg_mask, neg_seg), label