Skip to content

Commit

Permalink
Added a test that checks if the forward pass of the CRF module works …
Browse files Browse the repository at this point in the history
…on a GPU
  • Loading branch information
Academich committed Oct 30, 2024
1 parent fed5f80 commit 966af5a
Showing 1 changed file with 319 additions and 4 deletions.
323 changes: 319 additions & 4 deletions tests/test_crf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,32 @@ def compute_score(crf, emission, tag):
return score


def make_crf(num_tags=5, batch_first=False):
return CRF(num_tags, batch_first=batch_first)
def make_crf(num_tags=5, batch_first=False, device=None):
model = CRF(num_tags, batch_first=batch_first)
if device is not None:
model = model.to(device)
return model


def make_emissions(crf, seq_length=3, batch_size=2):
def make_emissions(crf, seq_length=3, batch_size=2, device=None):
em = torch.randn(seq_length, batch_size, crf.num_tags)
if crf.batch_first:
em = em.transpose(0, 1)
if device is not None:
em = em.to(device)
return em


def make_tags(crf, seq_length=3, batch_size=2):
def make_tags(crf, seq_length=3, batch_size=2, device=None):
# shape: (seq_length, batch_size)
ts = torch.tensor([[random.randrange(crf.num_tags)
for b in range(batch_size)]
for _ in range(seq_length)],
dtype=torch.long)
if crf.batch_first:
ts = ts.transpose(0, 1)
if device is not None:
ts = ts.to(device)
return ts


Expand Down Expand Up @@ -340,6 +347,314 @@ def test_invalid_reduction(self):
assert 'invalid reduction: foo' in str(excinfo.value)


class TestForwardOnCuda:

def test_works_with_mask(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
seq_length, batch_size = 3, 2

# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, seq_length, batch_size, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, seq_length, batch_size, device=device)
# mask should have size of (seq_length, batch_size)
mask = torch.tensor([[1, 1, 1], [1, 1, 0]], dtype=torch.bool, device=device).transpose(0, 1)

# shape: ()
llh = crf(emissions, tags, mask=mask)

# shape: (batch_size, seq_length, num_tags)
emissions = emissions.transpose(0, 1)
# shape: (batch_size, seq_length)
tags = tags.transpose(0, 1)
# shape: (batch_size, seq_length)
mask = mask.transpose(0, 1)

# Compute log likelihood manually
manual_llh = 0.
for emission, tag, mask_ in zip(emissions, tags, mask):
seq_len = mask_.sum()
emission, tag = emission[:seq_len], tag[:seq_len]
numerator = compute_score(crf, emission, tag)
all_scores = [
compute_score(crf, emission, t)
for t in itertools.product(range(crf.num_tags), repeat=seq_len)
]
denominator = math.log(sum(math.exp(s) for s in all_scores))
manual_llh += numerator - denominator

assert_close(llh, manual_llh)
llh.backward() # ensure gradients can be computed

def test_works_without_mask(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, device=device)

llh_no_mask = crf(emissions, tags)
# No mask means the mask is all ones
llh_mask = crf(emissions, tags, mask=torch.ones_like(tags).bool())

assert_close(llh_no_mask, llh_mask)

def test_batched_loss(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
batch_size = 10

# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, batch_size=batch_size, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, batch_size=batch_size, device=device)

llh = crf(emissions, tags)
assert torch.is_tensor(llh)
assert llh.shape == ()

total_llh = 0.
for i in range(batch_size):
# shape: (seq_length, 1, num_tags)
emissions_ = emissions[:, i, :].unsqueeze(1)
# shape: (seq_length, 1)
tags_ = tags[:, i].unsqueeze(1)
# shape: ()
total_llh += crf(emissions_, tags_)

assert_close(llh, total_llh)

def test_reduction_none(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, device=device)

seq_length, batch_size = tags.shape

llh = crf(emissions, tags, reduction='none')

assert torch.is_tensor(llh)
assert llh.shape == (batch_size,)

# shape: (batch_size, seq_length, num_tags)
emissions = emissions.transpose(0, 1)
# shape: (batch_size, seq_length)
tags = tags.transpose(0, 1)

# Compute log likelihood manually
manual_llh = []
for emission, tag in zip(emissions, tags):
numerator = compute_score(crf, emission, tag)
all_scores = [
compute_score(crf, emission, t)
for t in itertools.product(range(crf.num_tags), repeat=seq_length)
]
denominator = math.log(sum(math.exp(s) for s in all_scores))
manual_llh.append(numerator - denominator)

assert_close(llh, torch.tensor(manual_llh, device=device))

def test_reduction_mean(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, device=device)

seq_length, batch_size = tags.shape

llh = crf(emissions, tags, reduction='mean')

assert torch.is_tensor(llh)
assert llh.shape == ()

# shape: (batch_size, seq_length, num_tags)
emissions = emissions.transpose(0, 1)
# shape: (batch_size, seq_length)
tags = tags.transpose(0, 1)

# Compute log likelihood manually
manual_llh = 0
for emission, tag in zip(emissions, tags):
numerator = compute_score(crf, emission, tag)
all_scores = [
compute_score(crf, emission, t)
for t in itertools.product(range(crf.num_tags), repeat=seq_length)
]
denominator = math.log(sum(math.exp(s) for s in all_scores))
manual_llh += numerator - denominator

assert_close(llh, manual_llh / batch_size)

def test_reduction_token_mean(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
seq_length, batch_size = 3, 2

# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, seq_length, batch_size, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, seq_length, batch_size, device=device)
# mask should have size of (seq_length, batch_size)
mask = torch.tensor([[1, 1, 1], [1, 1, 0]], dtype=torch.bool, device=device).transpose(0, 1)

llh = crf(emissions, tags, mask=mask, reduction='token_mean')

assert torch.is_tensor(llh)
assert llh.shape == ()

# shape: (batch_size, seq_length, num_tags)
emissions = emissions.transpose(0, 1)
# shape: (batch_size, seq_length)
tags = tags.transpose(0, 1)
# shape: (batch_size, seq_length)
mask = mask.transpose(0, 1)

# Compute log likelihood manually
manual_llh, n_tokens = 0, 0
for emission, tag, mask_ in zip(emissions, tags, mask):
seq_len = mask_.sum()
emission, tag = emission[:seq_len], tag[:seq_len]
numerator = compute_score(crf, emission, tag)
all_scores = [
compute_score(crf, emission, t)
for t in itertools.product(range(crf.num_tags), repeat=seq_len)
]
denominator = math.log(sum(math.exp(s) for s in all_scores))
manual_llh += numerator - denominator
n_tokens += seq_len

assert_close(llh, manual_llh / n_tokens)

def test_batch_first(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
# shape: (seq_length, batch_size, num_tags)
emissions = make_emissions(crf, device=device)
# shape: (seq_length, batch_size)
tags = make_tags(crf, device=device)
llh = crf(emissions, tags)

crf_bf = make_crf(batch_first=True, device=device)
# Copy parameter values from non-batch-first CRF; requires_grad must be False
# to avoid runtime error of in-place operation on a leaf variable
crf_bf.start_transitions.requires_grad_(False).copy_(crf.start_transitions)
crf_bf.end_transitions.requires_grad_(False).copy_(crf.end_transitions)
crf_bf.transitions.requires_grad_(False).copy_(crf.transitions)

# shape: (batch_size, seq_length, num_tags)
emissions = emissions.transpose(0, 1)
# shape: (batch_size, seq_length)
tags = tags.transpose(0, 1)
llh_bf = crf_bf(emissions, tags)

assert_close(llh, llh_bf)

def test_emissions_has_bad_number_of_dimension(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
emissions = torch.randn(1, 2).to(device)
tags = torch.empty(2, 2, dtype=torch.long, device=device)
crf = make_crf(device=device)

with pytest.raises(ValueError) as excinfo:
crf(emissions, tags)
assert 'emissions must have dimension of 3, got 2' in str(excinfo.value)

def test_emissions_and_tags_size_mismatch(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
emissions = torch.randn(1, 2, 3).to(device)
tags = torch.empty(2, 2, dtype=torch.long, device=device)
crf = make_crf(3, device=device)

with pytest.raises(ValueError) as excinfo:
crf(emissions, tags)
assert (
'the first two dimensions of emissions and tags must match, '
'got (1, 2) and (2, 2)') in str(excinfo.value)

def test_emissions_last_dimension_not_equal_to_number_of_tags(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
emissions = torch.randn(1, 2, 3).to(device)
tags = torch.empty(1, 2, dtype=torch.long, device=device)
crf = make_crf(10, device=device)

with pytest.raises(ValueError) as excinfo:
crf(emissions, tags)
assert 'expected last dimension of emissions is 10, got 3' in str(excinfo.value)

def test_first_timestep_mask_is_not_all_on(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
emissions = torch.randn(3, 2, 4).to(device)
tags = torch.empty(3, 2, dtype=torch.long, device=device)
mask = torch.tensor([[1, 1, 1], [0, 0, 0]], dtype=torch.bool, device=device).transpose(0, 1)
crf = make_crf(4, device=device)

with pytest.raises(ValueError) as excinfo:
crf(emissions, tags, mask=mask)
assert 'mask of the first timestep must all be on' in str(excinfo.value)

emissions = emissions.transpose(0, 1)
tags = tags.transpose(0, 1)
mask = mask.transpose(0, 1)
crf = make_crf(4, batch_first=True)

with pytest.raises(ValueError) as excinfo:
crf(emissions, tags, mask=mask)
assert 'mask of the first timestep must all be on' in str(excinfo.value)

def test_invalid_reduction(self):
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
raise RuntimeError("No GPU with CUDA to test on.")
crf = make_crf(device=device)
emissions = make_emissions(crf, device=device)
tags = make_tags(crf, device=device)

with pytest.raises(ValueError) as excinfo:
crf(emissions, tags, reduction='foo')
assert 'invalid reduction: foo' in str(excinfo.value)


class TestDecode:
def test_works_with_mask(self):
crf = make_crf()
Expand Down

0 comments on commit 966af5a

Please sign in to comment.