diff options
author | Jordan Gong <jordan.gong@protonmail.com> | 2021-03-01 18:23:33 +0800 |
---|---|---|
committer | Jordan Gong <jordan.gong@protonmail.com> | 2021-03-01 18:26:47 +0800 |
commit | d88e40217f56d96e568335ccee1f14ff3ea5a696 (patch) | |
tree | f24b204794fbb8fc501d2124ae67a73faf82db1f /utils/triplet_loss.py | |
parent | 5f75d7ef65f6dcd0e72df320c58b6bd141937b5f (diff) | |
parent | 6002b2d2017912f90e8917e6e8b71b78ce58e7c2 (diff) |
Merge branch 'master' into data_parallel
# Conflicts:
# models/model.py
Diffstat (limited to 'utils/triplet_loss.py')
-rw-r--r-- | utils/triplet_loss.py | 117 |
1 files changed, 83 insertions, 34 deletions
diff --git a/utils/triplet_loss.py b/utils/triplet_loss.py index 0df2188..e05b69d 100644 --- a/utils/triplet_loss.py +++ b/utils/triplet_loss.py @@ -1,32 +1,48 @@ +from typing import Optional + import torch import torch.nn as nn import torch.nn.functional as F -class BatchAllTripletLoss(nn.Module): - def __init__(self, margin: float = 0.2): +class BatchTripletLoss(nn.Module): + def __init__( + self, + is_hard: bool = True, + is_mean: bool = True, + margin: Optional[float] = 0.2, + ): super().__init__() + self.is_hard = is_hard + self.is_mean = is_mean self.margin = margin def forward(self, x, y): p, n, c = x.size() - dist = self._batch_distance(x) - positive_negative_dist = self._hard_distance(dist, y, p, n) - all_loss = F.relu(self.margin + positive_negative_dist).view(p, -1) - parted_loss_mean = self._none_zero_parted_mean(all_loss) - - return parted_loss_mean - - @staticmethod - def _hard_distance(dist, y, p, n): - hard_positive_mask = y.unsqueeze(1) == y.unsqueeze(2) - hard_negative_mask = y.unsqueeze(1) != y.unsqueeze(2) - all_hard_positive = dist[hard_positive_mask].view(p, n, -1, 1) - all_hard_negative = dist[hard_negative_mask].view(p, n, 1, -1) - positive_negative_dist = all_hard_positive - all_hard_negative - - return positive_negative_dist + flat_dist_mask = torch.tril_indices(n, n, offset=-1, device=dist.device) + flat_dist = dist[:, flat_dist_mask[0], flat_dist_mask[1]] + + if self.is_hard: + positive_negative_dist = self._hard_distance(dist, y, p, n) + else: # is_all + positive_negative_dist = self._all_distance(dist, y, p, n) + + if self.margin: + losses = F.relu(self.margin + positive_negative_dist).view(p, -1) + non_zero_counts = (losses != 0).sum(1).float() + if self.is_mean: + loss_metric = self._none_zero_mean(losses, non_zero_counts) + else: # is_sum + loss_metric = losses.sum(1) + return loss_metric, flat_dist, non_zero_counts + else: # Soft margin + losses = F.softplus(positive_negative_dist).view(p, -1) + if self.is_mean: + loss_metric = losses.mean(1) + else: # is_sum + loss_metric = losses.sum(1) + return loss_metric, flat_dist, None @staticmethod def _batch_distance(x): @@ -38,41 +54,74 @@ class BatchAllTripletLoss(nn.Module): dist = torch.sqrt( F.relu(x1_squared_sum - 2 * x1_times_x2_sum + x2_squared_sum) ) - return dist @staticmethod - def _none_zero_parted_mean(all_loss): - # Non-zero parted mean - non_zero_counts = (all_loss != 0).sum(1) - parted_loss_mean = all_loss.sum(1) / non_zero_counts - parted_loss_mean[non_zero_counts == 0] = 0 + def _hard_distance(dist, y, p, n): + positive_mask = y.unsqueeze(1) == y.unsqueeze(2) + negative_mask = y.unsqueeze(1) != y.unsqueeze(2) + hard_positive = dist[positive_mask].view(p, n, -1).max(-1).values + hard_negative = dist[negative_mask].view(p, n, -1).min(-1).values + positive_negative_dist = hard_positive - hard_negative - return parted_loss_mean + return positive_negative_dist + + @staticmethod + def _all_distance(dist, y, p, n): + # Unmask identical samples + positive_mask = torch.eye( + n, dtype=torch.bool, device=y.device + ) ^ (y.unsqueeze(1) == y.unsqueeze(2)) + negative_mask = y.unsqueeze(1) != y.unsqueeze(2) + all_positive = dist[positive_mask].view(p, n, -1, 1) + all_negative = dist[negative_mask].view(p, n, 1, -1) + positive_negative_dist = all_positive - all_negative + + return positive_negative_dist + + @staticmethod + def _none_zero_mean(losses, non_zero_counts): + # Non-zero parted mean + non_zero_mean = losses.sum(1) / non_zero_counts + non_zero_mean[non_zero_counts == 0] = 0 + return non_zero_mean -class JointBatchAllTripletLoss(BatchAllTripletLoss): +class JointBatchTripletLoss(BatchTripletLoss): def __init__( self, hpm_num_parts: int, + is_hard: bool = True, + is_mean: bool = True, margins: tuple[float, float] = (0.2, 0.2) ): - super().__init__() + super().__init__(is_hard, is_mean) self.hpm_num_parts = hpm_num_parts self.margin_hpm, self.margin_pn = margins def forward(self, x, y): p, n, c = x.size() - dist = self._batch_distance(x) - positive_negative_dist = self._hard_distance(dist, y, p, n) + flat_dist_mask = torch.tril_indices(n, n, offset=-1, device=dist.device) + flat_dist = dist[:, flat_dist_mask[0], flat_dist_mask[1]] + + if self.is_hard: + positive_negative_dist = self._hard_distance(dist, y, p, n) + else: # is_all + positive_negative_dist = self._all_distance(dist, y, p, n) + hpm_part_loss = F.relu( self.margin_hpm + positive_negative_dist[:self.hpm_num_parts] - ).view(self.hpm_num_parts, -1) + ) pn_part_loss = F.relu( self.margin_pn + positive_negative_dist[self.hpm_num_parts:] - ).view(p - self.hpm_num_parts, -1) - all_loss = torch.cat((hpm_part_loss, pn_part_loss)).view(p, -1) - parted_loss_mean = self._none_zero_parted_mean(all_loss) + ) + losses = torch.cat((hpm_part_loss, pn_part_loss)).view(p, -1) + + non_zero_counts = (losses != 0).sum(1).float() + if self.is_mean: + loss_metric = self._none_zero_mean(losses, non_zero_counts) + else: # is_sum + loss_metric = losses.sum(1) - return parted_loss_mean + return loss_metric, flat_dist, non_zero_counts |