From 46391257ff50848efa1aa251ab3f15dc8b7a2d2c Mon Sep 17 00:00:00 2001
From: Jordan Gong
Date: Sat, 27 Feb 2021 22:14:21 +0800
Subject: Implement Batch Hard triplet loss and soft margin

---
 utils/triplet_loss.py | 84 ++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 56 insertions(+), 28 deletions(-)

(limited to 'utils/triplet_loss.py')

diff --git a/utils/triplet_loss.py b/utils/triplet_loss.py
index 0df2188..c3e5802 100644
--- a/utils/triplet_loss.py
+++ b/utils/triplet_loss.py
@@ -1,32 +1,36 @@
+from typing import Optional
+
 import torch
 import torch.nn as nn
 import torch.nn.functional as F
 
 
-class BatchAllTripletLoss(nn.Module):
-    def __init__(self, margin: float = 0.2):
+class BatchTripletLoss(nn.Module):
+    def __init__(
+            self,
+            is_hard: bool = True,
+            margin: Optional[float] = 0.2,
+    ):
         super().__init__()
+        self.is_hard = is_hard
         self.margin = margin
 
     def forward(self, x, y):
         p, n, c = x.size()
-
         dist = self._batch_distance(x)
-        positive_negative_dist = self._hard_distance(dist, y, p, n)
-        all_loss = F.relu(self.margin + positive_negative_dist).view(p, -1)
-        parted_loss_mean = self._none_zero_parted_mean(all_loss)
 
-        return parted_loss_mean
+        if self.is_hard:
+            positive_negative_dist = self._hard_distance(dist, y, p, n)
+        else:  # is_all
+            positive_negative_dist = self._all_distance(dist, y, p, n)
 
-    @staticmethod
-    def _hard_distance(dist, y, p, n):
-        hard_positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
-        hard_negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
-        all_hard_positive = dist[hard_positive_mask].view(p, n, -1, 1)
-        all_hard_negative = dist[hard_negative_mask].view(p, n, 1, -1)
-        positive_negative_dist = all_hard_positive - all_hard_negative
+        if self.margin:
+            all_loss = F.relu(self.margin + positive_negative_dist).view(p, -1)
+        else:
+            all_loss = F.softplus(positive_negative_dist).view(p, -1)
+        non_zero_mean, non_zero_counts = self._none_zero_parted_mean(all_loss)
 
-        return positive_negative_dist
+        return non_zero_mean, dist.mean((1, 2)), non_zero_counts
 
     @staticmethod
     def _batch_distance(x):
@@ -38,41 +42,65 @@ class BatchAllTripletLoss(nn.Module):
         dist = torch.sqrt(
             F.relu(x1_squared_sum - 2 * x1_times_x2_sum + x2_squared_sum)
         )
-
         return dist
 
+    @staticmethod
+    def _hard_distance(dist, y, p, n):
+        positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
+        negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
+        hard_positive = dist[positive_mask].view(p, n, -1).max(-1).values
+        hard_negative = dist[negative_mask].view(p, n, -1).min(-1).values
+        positive_negative_dist = hard_positive - hard_negative
+
+        return positive_negative_dist
+
+    @staticmethod
+    def _all_distance(dist, y, p, n):
+        positive_mask = y.unsqueeze(1) == y.unsqueeze(2)
+        negative_mask = y.unsqueeze(1) != y.unsqueeze(2)
+        all_positive = dist[positive_mask].view(p, n, -1, 1)
+        all_negative = dist[negative_mask].view(p, n, 1, -1)
+        positive_negative_dist = all_positive - all_negative
+
+        return positive_negative_dist
+
     @staticmethod
     def _none_zero_parted_mean(all_loss):
         # Non-zero parted mean
-        non_zero_counts = (all_loss != 0).sum(1)
-        parted_loss_mean = all_loss.sum(1) / non_zero_counts
-        parted_loss_mean[non_zero_counts == 0] = 0
+        non_zero_counts = (all_loss != 0).sum(1).float()
+        non_zero_mean = all_loss.sum(1) / non_zero_counts
+        non_zero_mean[non_zero_counts == 0] = 0
 
-        return parted_loss_mean
+        return non_zero_mean, non_zero_counts
 
 
-class JointBatchAllTripletLoss(BatchAllTripletLoss):
+class JointBatchTripletLoss(BatchTripletLoss):
     def __init__(
             self,
             hpm_num_parts: int,
+            is_hard: bool = True,
             margins: tuple[float, float] = (0.2, 0.2)
     ):
-        super().__init__()
+        super().__init__(is_hard)
         self.hpm_num_parts = hpm_num_parts
         self.margin_hpm, self.margin_pn = margins
 
     def forward(self, x, y):
         p, n, c = x.size()
-
         dist = self._batch_distance(x)
-        positive_negative_dist = self._hard_distance(dist, y, p, n)
+
+        if self.is_hard:
+            positive_negative_dist = self._hard_distance(dist, y, p, n)
+        else:  # is_all
+            positive_negative_dist = self._all_distance(dist, y, p, n)
+
         hpm_part_loss = F.relu(
             self.margin_hpm + positive_negative_dist[:self.hpm_num_parts]
-        ).view(self.hpm_num_parts, -1)
+        )
         pn_part_loss = F.relu(
             self.margin_pn + positive_negative_dist[self.hpm_num_parts:]
-        ).view(p - self.hpm_num_parts, -1)
+        )
         all_loss = torch.cat((hpm_part_loss, pn_part_loss)).view(p, -1)
-        parted_loss_mean = self._none_zero_parted_mean(all_loss)
+        non_zero_mean, non_zero_counts = self._none_zero_parted_mean(all_loss)
 
-        return parted_loss_mean
+        return non_zero_mean, dist.mean((1, 2)), non_zero_counts
-- 
cgit v1.2.3
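
Below is a minimal usage sketch (not part of the patch) of the reworked BatchTripletLoss. It assumes part-wise embeddings x of shape (p, n, c) and per-part labels y of shape (p, n) with a balanced number of samples per class, since the .view(p, n, -1) reshapes in _hard_distance/_all_distance require every anchor to have the same number of positives and negatives; the concrete sizes are illustrative.

# Usage sketch -- illustrative only, not part of the patch. Shapes follow
# forward(): x is (p, n, c) part-wise embeddings, y is (p, n) labels with a
# balanced number of samples per class.
import torch

from utils.triplet_loss import BatchTripletLoss

p, n, c = 16, 8, 256                          # parts, samples, channels
x = torch.randn(p, n, c)
y = torch.arange(4).repeat_interleave(2)      # 4 identities x 2 samples = n
y = y.unsqueeze(0).repeat(p, 1)               # same labels for every part

# Batch Hard mining with a hinge (ReLU) margin of 0.2
hard_loss = BatchTripletLoss(is_hard=True, margin=0.2)
loss_mean, dist_mean, non_zero_counts = hard_loss(x, y)
print(loss_mean.shape, dist_mean.shape, non_zero_counts.shape)  # each is (p,)

# Soft margin: margin=None replaces the hinge with softplus
soft_loss = BatchTripletLoss(is_hard=True, margin=None)

# Batch All mining remains available via is_hard=False
all_loss = BatchTripletLoss(is_hard=False, margin=0.2)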