-
Notifications
You must be signed in to change notification settings - Fork 6
/
InfoNCE.py
85 lines (65 loc) · 3.52 KB
/
InfoNCE.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import torch
import torch.nn.functional as F
from torch import nn
# Public names exported by a star-import of this module: the loss module
# (InfoNCE) and its functional form (info_nce).
__all__ = ['InfoNCE', 'info_nce']
class InfoNCE(nn.Module):
    """Module form of the :func:`info_nce` loss.

    The constructor only records the loss settings; every call to
    :meth:`forward` hands the tensors plus those settings to ``info_nce``.

    Args:
        temperature: Value forwarded as ``temperature`` to ``info_nce``.
        reduction: Value forwarded as ``reduction`` to ``info_nce``.
        negative_mode: Value forwarded as ``negative_mode`` to ``info_nce``.
    """

    def __init__(self, temperature=0.1, reduction='mean', negative_mode='unpaired'):
        super().__init__()
        self.negative_mode = negative_mode
        self.reduction = reduction
        self.temperature = temperature

    def forward(self, query, positive_key, negative_keys=None):
        """Return ``info_nce`` evaluated with the settings stored on this module."""
        settings = {
            'temperature': self.temperature,
            'reduction': self.reduction,
            'negative_mode': self.negative_mode,
        }
        return info_nce(query, positive_key, negative_keys, **settings)
def info_nce(query, positive_key, negative_keys=None, temperature=0.1, reduction='mean', negative_mode='unpaired'):
    """Compute the InfoNCE contrastive loss for a batch of query embeddings.

    All inputs are normalized to unit length along the last dimension, so the
    logits are cosine similarities. They are divided by ``temperature`` and
    fed to a cross-entropy whose target, for each query, is its positive key.

    Args:
        query: Tensor of shape (N, D), one embedding per sample.
        positive_key: Tensor of shape (N, D); row ``i`` is the positive key
            of ``query[i]``.
        negative_keys: Optional tensor of negative keys. With
            ``negative_mode='unpaired'`` it must have shape (M, D) and every
            query is compared against all M negatives. With
            ``negative_mode='paired'`` it must have shape (N, M, D) and query
            ``i`` is compared only against ``negative_keys[i]``. When
            ``None``, the positive keys of the other samples in the batch
            serve as negatives.
        temperature: Divisor applied to the logits before the cross-entropy.
        reduction: Passed through unchanged to ``F.cross_entropy``.
        negative_mode: ``'unpaired'`` or ``'paired'``. Only consulted when
            ``negative_keys`` is given.

    Returns:
        The result of ``F.cross_entropy`` on the scaled logits, reduced
        according to ``reduction``.

    Raises:
        ValueError: If an input has the wrong number of dimensions, the
            sample counts or embedding sizes disagree, or ``negative_keys``
            is given together with an unknown ``negative_mode``.
    """
    # Check input dimensionality.
    if query.dim() != 2:
        raise ValueError('<query> must have 2 dimensions.')
    if positive_key.dim() != 2:
        raise ValueError('<positive_key> must have 2 dimensions.')
    if negative_keys is not None:
        # Reject unknown modes here: otherwise neither branch below would
        # assign the negative logits and the failure would surface as an
        # unrelated UnboundLocalError.
        if negative_mode not in ('unpaired', 'paired'):
            raise ValueError("<negative_mode> must be either 'unpaired' or 'paired'.")
        if negative_mode == 'unpaired' and negative_keys.dim() != 2:
            raise ValueError("<negative_keys> must have 2 dimensions if <negative_mode> == 'unpaired'.")
        if negative_mode == 'paired' and negative_keys.dim() != 3:
            raise ValueError("<negative_keys> must have 3 dimensions if <negative_mode> == 'paired'.")

    # Check matching number of samples.
    if len(query) != len(positive_key):
        raise ValueError('<query> and <positive_key> must have the same number of samples.')
    if negative_keys is not None:
        if negative_mode == 'paired' and len(query) != len(negative_keys):
            raise ValueError("If negative_mode == 'paired', then <negative_keys> must have the same number of samples as <query>.")

    # Embedding vectors should have same number of components.
    if query.shape[-1] != positive_key.shape[-1]:
        raise ValueError('Vectors of <query> and <positive_key> should have the same number of components.')
    if negative_keys is not None:
        if query.shape[-1] != negative_keys.shape[-1]:
            raise ValueError('Vectors of <query> and <negative_keys> should have the same number of components.')

    # Normalize to unit vectors so that dot products are cosine similarities.
    query, positive_key, negative_keys = normalize(query, positive_key, negative_keys)

    if negative_keys is not None:
        # Explicit negative keys.

        # Cosine between each query and its own positive key: (N, 1).
        positive_logit = torch.sum(query * positive_key, dim=1, keepdim=True)

        if negative_mode == 'unpaired':
            # Cosine between all query-negative combinations: (N, M).
            negative_logits = query @ transpose(negative_keys)
        else:
            # 'paired': batched product (N, 1, D) @ (N, D, M) -> (N, 1, M),
            # then drop the singleton dimension to get (N, M).
            negative_logits = (query.unsqueeze(1) @ transpose(negative_keys)).squeeze(1)

        # The positive sample sits at index 0 of the last dimension, so the
        # target class of every row is 0.
        logits = torch.cat([positive_logit, negative_logits], dim=1)
        labels = torch.zeros(len(logits), dtype=torch.long, device=query.device)
    else:
        # Negative keys are implicitly the off-diagonal positive keys.

        # Cosine between all query/positive-key combinations: (N, N).
        logits = query @ transpose(positive_key)

        # Positive keys are the entries on the diagonal.
        labels = torch.arange(len(query), device=query.device)

    return F.cross_entropy(logits / temperature, labels, reduction=reduction)
def transpose(x):
    """Return ``x`` with its last two dimensions swapped."""
    second_to_last, last = -2, -1
    return x.transpose(second_to_last, last)
def normalize(*xs):
    """Normalize each argument along its last dimension with ``F.normalize``.

    ``None`` arguments are passed through as ``None``. The results are
    returned as a list in the same order as the arguments.
    """
    results = []
    for item in xs:
        if item is None:
            results.append(None)
        else:
            results.append(F.normalize(item, dim=-1))
    return results