From 1084d427731b81e6ca207727e402b636b4a544d9 Mon Sep 17 00:00:00 2001 From: Ftps Date: Sun, 19 Nov 2023 04:29:35 +0900 Subject: [PATCH] Add infer_pack --- rvc/lib/infer_pack/attentions.py | 459 ++++++ rvc/lib/infer_pack/commons.py | 172 ++ rvc/lib/infer_pack/models.py | 1420 +++++++++++++++++ rvc/lib/infer_pack/models_onnx.py | 821 ++++++++++ rvc/lib/infer_pack/modules.py | 615 +++++++ .../modules/F0Predictor/DioF0Predictor.py | 91 ++ .../modules/F0Predictor/F0Predictor.py | 16 + .../modules/F0Predictor/HarvestF0Predictor.py | 87 + .../modules/F0Predictor/PMF0Predictor.py | 98 ++ .../modules/F0Predictor/__init__.py | 0 rvc/lib/infer_pack/onnx_inference.py | 149 ++ rvc/lib/infer_pack/transforms.py | 207 +++ 12 files changed, 4135 insertions(+) create mode 100644 rvc/lib/infer_pack/attentions.py create mode 100644 rvc/lib/infer_pack/commons.py create mode 100644 rvc/lib/infer_pack/models.py create mode 100644 rvc/lib/infer_pack/models_onnx.py create mode 100644 rvc/lib/infer_pack/modules.py create mode 100644 rvc/lib/infer_pack/modules/F0Predictor/DioF0Predictor.py create mode 100644 rvc/lib/infer_pack/modules/F0Predictor/F0Predictor.py create mode 100644 rvc/lib/infer_pack/modules/F0Predictor/HarvestF0Predictor.py create mode 100644 rvc/lib/infer_pack/modules/F0Predictor/PMF0Predictor.py create mode 100644 rvc/lib/infer_pack/modules/F0Predictor/__init__.py create mode 100644 rvc/lib/infer_pack/onnx_inference.py create mode 100644 rvc/lib/infer_pack/transforms.py diff --git a/rvc/lib/infer_pack/attentions.py b/rvc/lib/infer_pack/attentions.py new file mode 100644 index 0000000..0778c40 --- /dev/null +++ b/rvc/lib/infer_pack/attentions.py @@ -0,0 +1,459 @@ +import copy +import math +from typing import Optional + +import numpy as np +import torch +from torch import nn +from torch.nn import functional as F + +from rvc.lib.infer_pack import commons, modules +from rvc.lib.infer_pack.modules import LayerNorm + + +class Encoder(nn.Module): + def __init__( + self, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size=1, + p_dropout=0.0, + window_size=10, + **kwargs + ): + super(Encoder, self).__init__() + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = int(n_layers) + self.kernel_size = kernel_size + self.p_dropout = p_dropout + self.window_size = window_size + + self.drop = nn.Dropout(p_dropout) + self.attn_layers = nn.ModuleList() + self.norm_layers_1 = nn.ModuleList() + self.ffn_layers = nn.ModuleList() + self.norm_layers_2 = nn.ModuleList() + for i in range(self.n_layers): + self.attn_layers.append( + MultiHeadAttention( + hidden_channels, + hidden_channels, + n_heads, + p_dropout=p_dropout, + window_size=window_size, + ) + ) + self.norm_layers_1.append(LayerNorm(hidden_channels)) + self.ffn_layers.append( + FFN( + hidden_channels, + hidden_channels, + filter_channels, + kernel_size, + p_dropout=p_dropout, + ) + ) + self.norm_layers_2.append(LayerNorm(hidden_channels)) + + def forward(self, x, x_mask): + attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1) + x = x * x_mask + zippep = zip( + self.attn_layers, self.norm_layers_1, self.ffn_layers, self.norm_layers_2 + ) + for attn_layers, norm_layers_1, ffn_layers, norm_layers_2 in zippep: + y = attn_layers(x, x, attn_mask) + y = self.drop(y) + x = norm_layers_1(x + y) + + y = ffn_layers(x, x_mask) + y = self.drop(y) + x = norm_layers_2(x + y) + x = x * x_mask + return x + + +class Decoder(nn.Module): + def __init__( + self, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size=1, + p_dropout=0.0, + proximal_bias=False, + proximal_init=True, + **kwargs + ): + super(Decoder, self).__init__() + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = p_dropout + self.proximal_bias = proximal_bias + self.proximal_init = proximal_init + + self.drop = nn.Dropout(p_dropout) + self.self_attn_layers = nn.ModuleList() + self.norm_layers_0 = nn.ModuleList() + self.encdec_attn_layers = nn.ModuleList() + self.norm_layers_1 = nn.ModuleList() + self.ffn_layers = nn.ModuleList() + self.norm_layers_2 = nn.ModuleList() + for i in range(self.n_layers): + self.self_attn_layers.append( + MultiHeadAttention( + hidden_channels, + hidden_channels, + n_heads, + p_dropout=p_dropout, + proximal_bias=proximal_bias, + proximal_init=proximal_init, + ) + ) + self.norm_layers_0.append(LayerNorm(hidden_channels)) + self.encdec_attn_layers.append( + MultiHeadAttention( + hidden_channels, hidden_channels, n_heads, p_dropout=p_dropout + ) + ) + self.norm_layers_1.append(LayerNorm(hidden_channels)) + self.ffn_layers.append( + FFN( + hidden_channels, + hidden_channels, + filter_channels, + kernel_size, + p_dropout=p_dropout, + causal=True, + ) + ) + self.norm_layers_2.append(LayerNorm(hidden_channels)) + + def forward(self, x, x_mask, h, h_mask): + """ + x: decoder input + h: encoder output + """ + self_attn_mask = commons.subsequent_mask(x_mask.size(2)).to( + device=x.device, dtype=x.dtype + ) + encdec_attn_mask = h_mask.unsqueeze(2) * x_mask.unsqueeze(-1) + x = x * x_mask + for i in range(self.n_layers): + y = self.self_attn_layers[i](x, x, self_attn_mask) + y = self.drop(y) + x = self.norm_layers_0[i](x + y) + + y = self.encdec_attn_layers[i](x, h, encdec_attn_mask) + y = self.drop(y) + x = self.norm_layers_1[i](x + y) + + y = self.ffn_layers[i](x, x_mask) + y = self.drop(y) + x = self.norm_layers_2[i](x + y) + x = x * x_mask + return x + + +class MultiHeadAttention(nn.Module): + def __init__( + self, + channels, + out_channels, + n_heads, + p_dropout=0.0, + window_size=None, + heads_share=True, + block_length=None, + proximal_bias=False, + proximal_init=False, + ): + super(MultiHeadAttention, self).__init__() + assert channels % n_heads == 0 + + self.channels = channels + self.out_channels = out_channels + self.n_heads = n_heads + self.p_dropout = p_dropout + self.window_size = window_size + self.heads_share = heads_share + self.block_length = block_length + self.proximal_bias = proximal_bias + self.proximal_init = proximal_init + self.attn = None + + self.k_channels = channels // n_heads + self.conv_q = nn.Conv1d(channels, channels, 1) + self.conv_k = nn.Conv1d(channels, channels, 1) + self.conv_v = nn.Conv1d(channels, channels, 1) + self.conv_o = nn.Conv1d(channels, out_channels, 1) + self.drop = nn.Dropout(p_dropout) + + if window_size is not None: + n_heads_rel = 1 if heads_share else n_heads + rel_stddev = self.k_channels**-0.5 + self.emb_rel_k = nn.Parameter( + torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels) + * rel_stddev + ) + self.emb_rel_v = nn.Parameter( + torch.randn(n_heads_rel, window_size * 2 + 1, self.k_channels) + * rel_stddev + ) + + nn.init.xavier_uniform_(self.conv_q.weight) + nn.init.xavier_uniform_(self.conv_k.weight) + nn.init.xavier_uniform_(self.conv_v.weight) + if proximal_init: + with torch.no_grad(): + self.conv_k.weight.copy_(self.conv_q.weight) + self.conv_k.bias.copy_(self.conv_q.bias) + + def forward( + self, x: torch.Tensor, c: torch.Tensor, attn_mask: Optional[torch.Tensor] = None + ): + q = self.conv_q(x) + k = self.conv_k(c) + v = self.conv_v(c) + + x, _ = self.attention(q, k, v, mask=attn_mask) + + x = self.conv_o(x) + return x + + def attention( + self, + query: torch.Tensor, + key: torch.Tensor, + value: torch.Tensor, + mask: Optional[torch.Tensor] = None, + ): + # reshape [b, d, t] -> [b, n_h, t, d_k] + b, d, t_s = key.size() + t_t = query.size(2) + query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3) + key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3) + value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3) + + scores = torch.matmul(query / math.sqrt(self.k_channels), key.transpose(-2, -1)) + if self.window_size is not None: + assert ( + t_s == t_t + ), "Relative attention is only available for self-attention." + key_relative_embeddings = self._get_relative_embeddings(self.emb_rel_k, t_s) + rel_logits = self._matmul_with_relative_keys( + query / math.sqrt(self.k_channels), key_relative_embeddings + ) + scores_local = self._relative_position_to_absolute_position(rel_logits) + scores = scores + scores_local + if self.proximal_bias: + assert t_s == t_t, "Proximal bias is only available for self-attention." + scores = scores + self._attention_bias_proximal(t_s).to( + device=scores.device, dtype=scores.dtype + ) + if mask is not None: + scores = scores.masked_fill(mask == 0, -1e4) + if self.block_length is not None: + assert ( + t_s == t_t + ), "Local attention is only available for self-attention." + block_mask = ( + torch.ones_like(scores) + .triu(-self.block_length) + .tril(self.block_length) + ) + scores = scores.masked_fill(block_mask == 0, -1e4) + p_attn = F.softmax(scores, dim=-1) # [b, n_h, t_t, t_s] + p_attn = self.drop(p_attn) + output = torch.matmul(p_attn, value) + if self.window_size is not None: + relative_weights = self._absolute_position_to_relative_position(p_attn) + value_relative_embeddings = self._get_relative_embeddings( + self.emb_rel_v, t_s + ) + output = output + self._matmul_with_relative_values( + relative_weights, value_relative_embeddings + ) + output = ( + output.transpose(2, 3).contiguous().view(b, d, t_t) + ) # [b, n_h, t_t, d_k] -> [b, d, t_t] + return output, p_attn + + def _matmul_with_relative_values(self, x, y): + """ + x: [b, h, l, m] + y: [h or 1, m, d] + ret: [b, h, l, d] + """ + ret = torch.matmul(x, y.unsqueeze(0)) + return ret + + def _matmul_with_relative_keys(self, x, y): + """ + x: [b, h, l, d] + y: [h or 1, m, d] + ret: [b, h, l, m] + """ + ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1)) + return ret + + def _get_relative_embeddings(self, relative_embeddings, length: int): + max_relative_position = 2 * self.window_size + 1 + # Pad first before slice to avoid using cond ops. + pad_length: int = max(length - (self.window_size + 1), 0) + slice_start_position = max((self.window_size + 1) - length, 0) + slice_end_position = slice_start_position + 2 * length - 1 + if pad_length > 0: + padded_relative_embeddings = F.pad( + relative_embeddings, + # commons.convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]), + [0, 0, pad_length, pad_length, 0, 0], + ) + else: + padded_relative_embeddings = relative_embeddings + used_relative_embeddings = padded_relative_embeddings[ + :, slice_start_position:slice_end_position + ] + return used_relative_embeddings + + def _relative_position_to_absolute_position(self, x): + """ + x: [b, h, l, 2*l-1] + ret: [b, h, l, l] + """ + batch, heads, length, _ = x.size() + # Concat columns of pad to shift from relative to absolute indexing. + x = F.pad( + x, + # commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]) + [0, 1, 0, 0, 0, 0, 0, 0], + ) + + # Concat extra elements so to add up to shape (len+1, 2*len-1). + x_flat = x.view([batch, heads, length * 2 * length]) + x_flat = F.pad( + x_flat, + # commons.convert_pad_shape([[0, 0], [0, 0], [0, int(length) - 1]]) + [0, int(length) - 1, 0, 0, 0, 0], + ) + + # Reshape and slice out the padded elements. + x_final = x_flat.view([batch, heads, length + 1, 2 * length - 1])[ + :, :, :length, length - 1 : + ] + return x_final + + def _absolute_position_to_relative_position(self, x): + """ + x: [b, h, l, l] + ret: [b, h, l, 2*l-1] + """ + batch, heads, length, _ = x.size() + # padd along column + x = F.pad( + x, + # commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, int(length) - 1]]) + [0, int(length) - 1, 0, 0, 0, 0, 0, 0], + ) + x_flat = x.view([batch, heads, int(length**2) + int(length * (length - 1))]) + # add 0's in the beginning that will skew the elements after reshape + x_flat = F.pad( + x_flat, + # commons.convert_pad_shape([[0, 0], [0, 0], [int(length), 0]]) + [length, 0, 0, 0, 0, 0], + ) + x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:] + return x_final + + def _attention_bias_proximal(self, length: int): + """Bias for self-attention to encourage attention to close positions. + Args: + length: an integer scalar. + Returns: + a Tensor with shape [1, 1, length, length] + """ + r = torch.arange(length, dtype=torch.float32) + diff = torch.unsqueeze(r, 0) - torch.unsqueeze(r, 1) + return torch.unsqueeze(torch.unsqueeze(-torch.log1p(torch.abs(diff)), 0), 0) + + +class FFN(nn.Module): + def __init__( + self, + in_channels, + out_channels, + filter_channels, + kernel_size, + p_dropout=0.0, + activation: str = None, + causal=False, + ): + super(FFN, self).__init__() + self.in_channels = in_channels + self.out_channels = out_channels + self.filter_channels = filter_channels + self.kernel_size = kernel_size + self.p_dropout = p_dropout + self.activation = activation + self.causal = causal + self.is_activation = True if activation == "gelu" else False + # if causal: + # self.padding = self._causal_padding + # else: + # self.padding = self._same_padding + + self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size) + self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size) + self.drop = nn.Dropout(p_dropout) + + def padding(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor: + if self.causal: + padding = self._causal_padding(x * x_mask) + else: + padding = self._same_padding(x * x_mask) + return padding + + def forward(self, x: torch.Tensor, x_mask: torch.Tensor): + x = self.conv_1(self.padding(x, x_mask)) + if self.is_activation: + x = x * torch.sigmoid(1.702 * x) + else: + x = torch.relu(x) + x = self.drop(x) + + x = self.conv_2(self.padding(x, x_mask)) + return x * x_mask + + def _causal_padding(self, x): + if self.kernel_size == 1: + return x + pad_l: int = self.kernel_size - 1 + pad_r: int = 0 + # padding = [[0, 0], [0, 0], [pad_l, pad_r]] + x = F.pad( + x, + # commons.convert_pad_shape(padding) + [pad_l, pad_r, 0, 0, 0, 0], + ) + return x + + def _same_padding(self, x): + if self.kernel_size == 1: + return x + pad_l: int = (self.kernel_size - 1) // 2 + pad_r: int = self.kernel_size // 2 + # padding = [[0, 0], [0, 0], [pad_l, pad_r]] + x = F.pad( + x, + # commons.convert_pad_shape(padding) + [pad_l, pad_r, 0, 0, 0, 0], + ) + return x diff --git a/rvc/lib/infer_pack/commons.py b/rvc/lib/infer_pack/commons.py new file mode 100644 index 0000000..85594be --- /dev/null +++ b/rvc/lib/infer_pack/commons.py @@ -0,0 +1,172 @@ +import math +from typing import List, Optional + +import numpy as np +import torch +from torch import nn +from torch.nn import functional as F + + +def init_weights(m, mean=0.0, std=0.01): + classname = m.__class__.__name__ + if classname.find("Conv") != -1: + m.weight.data.normal_(mean, std) + + +def get_padding(kernel_size, dilation=1): + return int((kernel_size * dilation - dilation) / 2) + + +# def convert_pad_shape(pad_shape): +# l = pad_shape[::-1] +# pad_shape = [item for sublist in l for item in sublist] +# return pad_shape + + +def kl_divergence(m_p, logs_p, m_q, logs_q): + """KL(P||Q)""" + kl = (logs_q - logs_p) - 0.5 + kl += ( + 0.5 * (torch.exp(2.0 * logs_p) + ((m_p - m_q) ** 2)) * torch.exp(-2.0 * logs_q) + ) + return kl + + +def rand_gumbel(shape): + """Sample from the Gumbel distribution, protect from overflows.""" + uniform_samples = torch.rand(shape) * 0.99998 + 0.00001 + return -torch.log(-torch.log(uniform_samples)) + + +def rand_gumbel_like(x): + g = rand_gumbel(x.size()).to(dtype=x.dtype, device=x.device) + return g + + +def slice_segments(x, ids_str, segment_size=4): + ret = torch.zeros_like(x[:, :, :segment_size]) + for i in range(x.size(0)): + idx_str = ids_str[i] + idx_end = idx_str + segment_size + ret[i] = x[i, :, idx_str:idx_end] + return ret + + +def slice_segments2(x, ids_str, segment_size=4): + ret = torch.zeros_like(x[:, :segment_size]) + for i in range(x.size(0)): + idx_str = ids_str[i] + idx_end = idx_str + segment_size + ret[i] = x[i, idx_str:idx_end] + return ret + + +def rand_slice_segments(x, x_lengths=None, segment_size=4): + b, d, t = x.size() + if x_lengths is None: + x_lengths = t + ids_str_max = x_lengths - segment_size + 1 + ids_str = (torch.rand([b]).to(device=x.device) * ids_str_max).to(dtype=torch.long) + ret = slice_segments(x, ids_str, segment_size) + return ret, ids_str + + +def get_timing_signal_1d(length, channels, min_timescale=1.0, max_timescale=1.0e4): + position = torch.arange(length, dtype=torch.float) + num_timescales = channels // 2 + log_timescale_increment = math.log(float(max_timescale) / float(min_timescale)) / ( + num_timescales - 1 + ) + inv_timescales = min_timescale * torch.exp( + torch.arange(num_timescales, dtype=torch.float) * -log_timescale_increment + ) + scaled_time = position.unsqueeze(0) * inv_timescales.unsqueeze(1) + signal = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], 0) + signal = F.pad(signal, [0, 0, 0, channels % 2]) + signal = signal.view(1, channels, length) + return signal + + +def add_timing_signal_1d(x, min_timescale=1.0, max_timescale=1.0e4): + b, channels, length = x.size() + signal = get_timing_signal_1d(length, channels, min_timescale, max_timescale) + return x + signal.to(dtype=x.dtype, device=x.device) + + +def cat_timing_signal_1d(x, min_timescale=1.0, max_timescale=1.0e4, axis=1): + b, channels, length = x.size() + signal = get_timing_signal_1d(length, channels, min_timescale, max_timescale) + return torch.cat([x, signal.to(dtype=x.dtype, device=x.device)], axis) + + +def subsequent_mask(length): + mask = torch.tril(torch.ones(length, length)).unsqueeze(0).unsqueeze(0) + return mask + + +@torch.jit.script +def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels): + n_channels_int = n_channels[0] + in_act = input_a + input_b + t_act = torch.tanh(in_act[:, :n_channels_int, :]) + s_act = torch.sigmoid(in_act[:, n_channels_int:, :]) + acts = t_act * s_act + return acts + + +# def convert_pad_shape(pad_shape): +# l = pad_shape[::-1] +# pad_shape = [item for sublist in l for item in sublist] +# return pad_shape + + +def convert_pad_shape(pad_shape: List[List[int]]) -> List[int]: + return torch.tensor(pad_shape).flip(0).reshape(-1).int().tolist() + + +def shift_1d(x): + x = F.pad(x, convert_pad_shape([[0, 0], [0, 0], [1, 0]]))[:, :, :-1] + return x + + +def sequence_mask(length: torch.Tensor, max_length: Optional[int] = None): + if max_length is None: + max_length = length.max() + x = torch.arange(max_length, dtype=length.dtype, device=length.device) + return x.unsqueeze(0) < length.unsqueeze(1) + + +def generate_path(duration, mask): + """ + duration: [b, 1, t_x] + mask: [b, 1, t_y, t_x] + """ + device = duration.device + + b, _, t_y, t_x = mask.shape + cum_duration = torch.cumsum(duration, -1) + + cum_duration_flat = cum_duration.view(b * t_x) + path = sequence_mask(cum_duration_flat, t_y).to(mask.dtype) + path = path.view(b, t_x, t_y) + path = path - F.pad(path, convert_pad_shape([[0, 0], [1, 0], [0, 0]]))[:, :-1] + path = path.unsqueeze(1).transpose(2, 3) * mask + return path + + +def clip_grad_value_(parameters, clip_value, norm_type=2): + if isinstance(parameters, torch.Tensor): + parameters = [parameters] + parameters = list(filter(lambda p: p.grad is not None, parameters)) + norm_type = float(norm_type) + if clip_value is not None: + clip_value = float(clip_value) + + total_norm = 0 + for p in parameters: + param_norm = p.grad.data.norm(norm_type) + total_norm += param_norm.item() ** norm_type + if clip_value is not None: + p.grad.data.clamp_(min=-clip_value, max=clip_value) + total_norm = total_norm ** (1.0 / norm_type) + return total_norm diff --git a/rvc/lib/infer_pack/models.py b/rvc/lib/infer_pack/models.py new file mode 100644 index 0000000..2699bd8 --- /dev/null +++ b/rvc/lib/infer_pack/models.py @@ -0,0 +1,1420 @@ +import logging +import math +from typing import Optional + +logger = logging.getLogger(__name__) + +import numpy as np +import torch +from torch import nn +from torch.nn import AvgPool1d, Conv1d, Conv2d, ConvTranspose1d +from torch.nn import functional as F +from torch.nn.utils import remove_weight_norm, spectral_norm, weight_norm + +from rvc.lib.infer_pack import attentions, commons, modules +from rvc.lib.infer_pack.commons import get_padding, init_weights + +has_xpu = bool(hasattr(torch, "xpu") and torch.xpu.is_available()) + + +class TextEncoder256(nn.Module): + def __init__( + self, + out_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + f0=True, + ): + super(TextEncoder256, self).__init__() + self.out_channels = out_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = float(p_dropout) + self.emb_phone = nn.Linear(256, hidden_channels) + self.lrelu = nn.LeakyReLU(0.1, inplace=True) + if f0 == True: + self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256 + self.encoder = attentions.Encoder( + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + float(p_dropout), + ) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + + def forward( + self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor + ): + if pitch is None: + x = self.emb_phone(phone) + else: + x = self.emb_phone(phone) + self.emb_pitch(pitch) + x = x * math.sqrt(self.hidden_channels) # [b, t, h] + x = self.lrelu(x) + x = torch.transpose(x, 1, -1) # [b, h, t] + x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to( + x.dtype + ) + x = self.encoder(x * x_mask, x_mask) + stats = self.proj(x) * x_mask + + m, logs = torch.split(stats, self.out_channels, dim=1) + return m, logs, x_mask + + +class TextEncoder768(nn.Module): + def __init__( + self, + out_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + f0=True, + ): + super(TextEncoder768, self).__init__() + self.out_channels = out_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = float(p_dropout) + self.emb_phone = nn.Linear(768, hidden_channels) + self.lrelu = nn.LeakyReLU(0.1, inplace=True) + if f0 == True: + self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256 + self.encoder = attentions.Encoder( + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + float(p_dropout), + ) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + + def forward(self, phone: torch.Tensor, pitch: torch.Tensor, lengths: torch.Tensor): + if pitch is None: + x = self.emb_phone(phone) + else: + x = self.emb_phone(phone) + self.emb_pitch(pitch) + x = x * math.sqrt(self.hidden_channels) # [b, t, h] + x = self.lrelu(x) + x = torch.transpose(x, 1, -1) # [b, h, t] + x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to( + x.dtype + ) + x = self.encoder(x * x_mask, x_mask) + stats = self.proj(x) * x_mask + + m, logs = torch.split(stats, self.out_channels, dim=1) + return m, logs, x_mask + + +class ResidualCouplingBlock(nn.Module): + def __init__( + self, + channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + n_flows=4, + gin_channels=0, + ): + super(ResidualCouplingBlock, self).__init__() + self.channels = channels + self.hidden_channels = hidden_channels + self.kernel_size = kernel_size + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.n_flows = n_flows + self.gin_channels = gin_channels + + self.flows = nn.ModuleList() + for i in range(n_flows): + self.flows.append( + modules.ResidualCouplingLayer( + channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=gin_channels, + mean_only=True, + ) + ) + self.flows.append(modules.Flip()) + + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ): + if not reverse: + for flow in self.flows: + x, _ = flow(x, x_mask, g=g, reverse=reverse) + else: + for flow in self.flows[::-1]: + x, _ = flow.forward(x, x_mask, g=g, reverse=reverse) + return x + + def remove_weight_norm(self): + for i in range(self.n_flows): + self.flows[i * 2].remove_weight_norm() + + def __prepare_scriptable__(self): + for i in range(self.n_flows): + for hook in self.flows[i * 2]._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.flows[i * 2]) + + return self + + +class PosteriorEncoder(nn.Module): + def __init__( + self, + in_channels, + out_channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=0, + ): + super(PosteriorEncoder, self).__init__() + self.in_channels = in_channels + self.out_channels = out_channels + self.hidden_channels = hidden_channels + self.kernel_size = kernel_size + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.gin_channels = gin_channels + + self.pre = nn.Conv1d(in_channels, hidden_channels, 1) + self.enc = modules.WN( + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=gin_channels, + ) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + + def forward( + self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None + ): + x_mask = torch.unsqueeze(commons.sequence_mask(x_lengths, x.size(2)), 1).to( + x.dtype + ) + x = self.pre(x) * x_mask + x = self.enc(x, x_mask, g=g) + stats = self.proj(x) * x_mask + m, logs = torch.split(stats, self.out_channels, dim=1) + z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask + return z, m, logs, x_mask + + def remove_weight_norm(self): + self.enc.remove_weight_norm() + + def __prepare_scriptable__(self): + for hook in self.enc._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.enc) + return self + + +class Generator(torch.nn.Module): + def __init__( + self, + initial_channel, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=0, + ): + super(Generator, self).__init__() + self.num_kernels = len(resblock_kernel_sizes) + self.num_upsamples = len(upsample_rates) + self.conv_pre = Conv1d( + initial_channel, upsample_initial_channel, 7, 1, padding=3 + ) + resblock = modules.ResBlock1 if resblock == "1" else modules.ResBlock2 + + self.ups = nn.ModuleList() + for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): + self.ups.append( + weight_norm( + ConvTranspose1d( + upsample_initial_channel // (2**i), + upsample_initial_channel // (2 ** (i + 1)), + k, + u, + padding=(k - u) // 2, + ) + ) + ) + + self.resblocks = nn.ModuleList() + for i in range(len(self.ups)): + ch = upsample_initial_channel // (2 ** (i + 1)) + for j, (k, d) in enumerate( + zip(resblock_kernel_sizes, resblock_dilation_sizes) + ): + self.resblocks.append(resblock(ch, k, d)) + + self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=False) + self.ups.apply(init_weights) + + if gin_channels != 0: + self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) + + def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None): + x = self.conv_pre(x) + if g is not None: + x = x + self.cond(g) + + for i in range(self.num_upsamples): + x = F.leaky_relu(x, modules.LRELU_SLOPE) + x = self.ups[i](x) + xs = None + for j in range(self.num_kernels): + if xs is None: + xs = self.resblocks[i * self.num_kernels + j](x) + else: + xs += self.resblocks[i * self.num_kernels + j](x) + x = xs / self.num_kernels + x = F.leaky_relu(x) + x = self.conv_post(x) + x = torch.tanh(x) + + return x + + def __prepare_scriptable__(self): + for l in self.ups: + for hook in l._forward_pre_hooks.values(): + # The hook we want to remove is an instance of WeightNorm class, so + # normally we would do `if isinstance(...)` but this class is not accessible + # because of shadowing, so we check the module name directly. + # https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3 + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + + for l in self.resblocks: + for hook in l._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + return self + + def remove_weight_norm(self): + for l in self.ups: + remove_weight_norm(l) + for l in self.resblocks: + l.remove_weight_norm() + + +class SineGen(torch.nn.Module): + """Definition of sine generator + SineGen(samp_rate, harmonic_num = 0, + sine_amp = 0.1, noise_std = 0.003, + voiced_threshold = 0, + flag_for_pulse=False) + samp_rate: sampling rate in Hz + harmonic_num: number of harmonic overtones (default 0) + sine_amp: amplitude of sine-wavefrom (default 0.1) + noise_std: std of Gaussian noise (default 0.003) + voiced_thoreshold: F0 threshold for U/V classification (default 0) + flag_for_pulse: this SinGen is used inside PulseGen (default False) + Note: when flag_for_pulse is True, the first time step of a voiced + segment is always sin(torch.pi) or cos(0) + """ + + def __init__( + self, + samp_rate, + harmonic_num=0, + sine_amp=0.1, + noise_std=0.003, + voiced_threshold=0, + flag_for_pulse=False, + ): + super(SineGen, self).__init__() + self.sine_amp = sine_amp + self.noise_std = noise_std + self.harmonic_num = harmonic_num + self.dim = self.harmonic_num + 1 + self.sampling_rate = samp_rate + self.voiced_threshold = voiced_threshold + + def _f02uv(self, f0): + # generate uv signal + uv = torch.ones_like(f0) + uv = uv * (f0 > self.voiced_threshold) + if uv.device.type == "privateuseone": # for DirectML + uv = uv.float() + return uv + + def forward(self, f0: torch.Tensor, upp: int): + """sine_tensor, uv = forward(f0) + input F0: tensor(batchsize=1, length, dim=1) + f0 for unvoiced steps should be 0 + output sine_tensor: tensor(batchsize=1, length, dim) + output uv: tensor(batchsize=1, length, 1) + """ + with torch.no_grad(): + f0 = f0[:, None].transpose(1, 2) + f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device) + # fundamental component + f0_buf[:, :, 0] = f0[:, :, 0] + for idx in range(self.harmonic_num): + f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * ( + idx + 2 + ) # idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic + rad_values = (f0_buf / self.sampling_rate) % 1 ###%1意味着n_har的乘积无法后处理优化 + rand_ini = torch.rand( + f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device + ) + rand_ini[:, 0] = 0 + rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini + tmp_over_one = torch.cumsum(rad_values, 1) # % 1 #####%1意味着后面的cumsum无法再优化 + tmp_over_one *= upp + tmp_over_one = F.interpolate( + tmp_over_one.transpose(2, 1), + scale_factor=float(upp), + mode="linear", + align_corners=True, + ).transpose(2, 1) + rad_values = F.interpolate( + rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest" + ).transpose( + 2, 1 + ) ####### + tmp_over_one %= 1 + tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0 + cumsum_shift = torch.zeros_like(rad_values) + cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0 + sine_waves = torch.sin( + torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi + ) + sine_waves = sine_waves * self.sine_amp + uv = self._f02uv(f0) + uv = F.interpolate( + uv.transpose(2, 1), scale_factor=float(upp), mode="nearest" + ).transpose(2, 1) + noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3 + noise = noise_amp * torch.randn_like(sine_waves) + sine_waves = sine_waves * uv + noise + return sine_waves, uv, noise + + +class SourceModuleHnNSF(torch.nn.Module): + """SourceModule for hn-nsf + SourceModule(sampling_rate, harmonic_num=0, sine_amp=0.1, + add_noise_std=0.003, voiced_threshod=0) + sampling_rate: sampling_rate in Hz + harmonic_num: number of harmonic above F0 (default: 0) + sine_amp: amplitude of sine source signal (default: 0.1) + add_noise_std: std of additive Gaussian noise (default: 0.003) + note that amplitude of noise in unvoiced is decided + by sine_amp + voiced_threshold: threhold to set U/V given F0 (default: 0) + Sine_source, noise_source = SourceModuleHnNSF(F0_sampled) + F0_sampled (batchsize, length, 1) + Sine_source (batchsize, length, 1) + noise_source (batchsize, length 1) + uv (batchsize, length, 1) + """ + + def __init__( + self, + sampling_rate, + harmonic_num=0, + sine_amp=0.1, + add_noise_std=0.003, + voiced_threshod=0, + is_half=True, + ): + super(SourceModuleHnNSF, self).__init__() + + self.sine_amp = sine_amp + self.noise_std = add_noise_std + self.is_half = is_half + # to produce sine waveforms + self.l_sin_gen = SineGen( + sampling_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod + ) + + # to merge source harmonics into a single excitation + self.l_linear = torch.nn.Linear(harmonic_num + 1, 1) + self.l_tanh = torch.nn.Tanh() + # self.ddtype:int = -1 + + def forward(self, x: torch.Tensor, upp: int = 1): + # if self.ddtype ==-1: + # self.ddtype = self.l_linear.weight.dtype + sine_wavs, uv, _ = self.l_sin_gen(x, upp) + # print(x.dtype,sine_wavs.dtype,self.l_linear.weight.dtype) + # if self.is_half: + # sine_wavs = sine_wavs.half() + # sine_merge = self.l_tanh(self.l_linear(sine_wavs.to(x))) + # print(sine_wavs.dtype,self.ddtype) + # if sine_wavs.dtype != self.l_linear.weight.dtype: + sine_wavs = sine_wavs.to(dtype=self.l_linear.weight.dtype) + sine_merge = self.l_tanh(self.l_linear(sine_wavs)) + return sine_merge, None, None # noise, uv + + +class GeneratorNSF(torch.nn.Module): + def __init__( + self, + initial_channel, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels, + sr, + is_half=False, + ): + super(GeneratorNSF, self).__init__() + self.num_kernels = len(resblock_kernel_sizes) + self.num_upsamples = len(upsample_rates) + + self.f0_upsamp = torch.nn.Upsample(scale_factor=math.prod(upsample_rates)) + self.m_source = SourceModuleHnNSF( + sampling_rate=sr, harmonic_num=0, is_half=is_half + ) + self.noise_convs = nn.ModuleList() + self.conv_pre = Conv1d( + initial_channel, upsample_initial_channel, 7, 1, padding=3 + ) + resblock = modules.ResBlock1 if resblock == "1" else modules.ResBlock2 + + self.ups = nn.ModuleList() + for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): + c_cur = upsample_initial_channel // (2 ** (i + 1)) + self.ups.append( + weight_norm( + ConvTranspose1d( + upsample_initial_channel // (2**i), + upsample_initial_channel // (2 ** (i + 1)), + k, + u, + padding=(k - u) // 2, + ) + ) + ) + if i + 1 < len(upsample_rates): + stride_f0 = math.prod(upsample_rates[i + 1 :]) + self.noise_convs.append( + Conv1d( + 1, + c_cur, + kernel_size=stride_f0 * 2, + stride=stride_f0, + padding=stride_f0 // 2, + ) + ) + else: + self.noise_convs.append(Conv1d(1, c_cur, kernel_size=1)) + + self.resblocks = nn.ModuleList() + for i in range(len(self.ups)): + ch = upsample_initial_channel // (2 ** (i + 1)) + for j, (k, d) in enumerate( + zip(resblock_kernel_sizes, resblock_dilation_sizes) + ): + self.resblocks.append(resblock(ch, k, d)) + + self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=False) + self.ups.apply(init_weights) + + if gin_channels != 0: + self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) + + self.upp = math.prod(upsample_rates) + + self.lrelu_slope = modules.LRELU_SLOPE + + def forward(self, x, f0, g: Optional[torch.Tensor] = None): + har_source, noi_source, uv = self.m_source(f0, self.upp) + har_source = har_source.transpose(1, 2) + x = self.conv_pre(x) + if g is not None: + x = x + self.cond(g) + # torch.jit.script() does not support direct indexing of torch modules + # That's why I wrote this + for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)): + if i < self.num_upsamples: + x = F.leaky_relu(x, self.lrelu_slope) + x = ups(x) + x_source = noise_convs(har_source) + x = x + x_source + xs: Optional[torch.Tensor] = None + l = [i * self.num_kernels + j for j in range(self.num_kernels)] + for j, resblock in enumerate(self.resblocks): + if j in l: + if xs is None: + xs = resblock(x) + else: + xs += resblock(x) + # This assertion cannot be ignored! \ + # If ignored, it will cause torch.jit.script() compilation errors + assert isinstance(xs, torch.Tensor) + x = xs / self.num_kernels + x = F.leaky_relu(x) + x = self.conv_post(x) + x = torch.tanh(x) + return x + + def remove_weight_norm(self): + for l in self.ups: + remove_weight_norm(l) + for l in self.resblocks: + l.remove_weight_norm() + + def __prepare_scriptable__(self): + for l in self.ups: + for hook in l._forward_pre_hooks.values(): + # The hook we want to remove is an instance of WeightNorm class, so + # normally we would do `if isinstance(...)` but this class is not accessible + # because of shadowing, so we check the module name directly. + # https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3 + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + for l in self.resblocks: + for hook in self.resblocks._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + return self + + +sr2sr = { + "32k": 32000, + "40k": 40000, + "48k": 48000, +} + + +class SynthesizerTrnMs256NSFsid(nn.Module): + def __init__( + self, + spec_channels, + segment_size, + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + spk_embed_dim, + gin_channels, + sr, + **kwargs + ): + super(SynthesizerTrnMs256NSFsid, self).__init__() + if isinstance(sr, str): + sr = sr2sr[sr] + self.spec_channels = spec_channels + self.inter_channels = inter_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = float(p_dropout) + self.resblock = resblock + self.resblock_kernel_sizes = resblock_kernel_sizes + self.resblock_dilation_sizes = resblock_dilation_sizes + self.upsample_rates = upsample_rates + self.upsample_initial_channel = upsample_initial_channel + self.upsample_kernel_sizes = upsample_kernel_sizes + self.segment_size = segment_size + self.gin_channels = gin_channels + # self.hop_length = hop_length# + self.spk_embed_dim = spk_embed_dim + self.enc_p = TextEncoder256( + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + float(p_dropout), + ) + self.dec = GeneratorNSF( + inter_channels, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=gin_channels, + sr=sr, + is_half=kwargs["is_half"], + ) + self.enc_q = PosteriorEncoder( + spec_channels, + inter_channels, + hidden_channels, + 5, + 1, + 16, + gin_channels=gin_channels, + ) + self.flow = ResidualCouplingBlock( + inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels + ) + self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels) + logger.debug( + "gin_channels: " + + str(gin_channels) + + ", self.spk_embed_dim: " + + str(self.spk_embed_dim) + ) + + def remove_weight_norm(self): + self.dec.remove_weight_norm() + self.flow.remove_weight_norm() + self.enc_q.remove_weight_norm() + + def __prepare_scriptable__(self): + for hook in self.dec._forward_pre_hooks.values(): + # The hook we want to remove is an instance of WeightNorm class, so + # normally we would do `if isinstance(...)` but this class is not accessible + # because of shadowing, so we check the module name directly. + # https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3 + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.dec) + for hook in self.flow._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.flow) + if hasattr(self, "enc_q"): + for hook in self.enc_q._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.enc_q) + return self + + @torch.jit.ignore + def forward( + self, + phone: torch.Tensor, + phone_lengths: torch.Tensor, + pitch: torch.Tensor, + pitchf: torch.Tensor, + y: torch.Tensor, + y_lengths: torch.Tensor, + ds: Optional[torch.Tensor] = None, + ): # 这里ds是id,[bs,1] + # print(1,pitch.shape)#[bs,t] + g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t,广播的 + m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) + z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g) + z_p = self.flow(z, y_mask, g=g) + z_slice, ids_slice = commons.rand_slice_segments( + z, y_lengths, self.segment_size + ) + # print(-1,pitchf.shape,ids_slice,self.segment_size,self.hop_length,self.segment_size//self.hop_length) + pitchf = commons.slice_segments2(pitchf, ids_slice, self.segment_size) + # print(-2,pitchf.shape,z_slice.shape) + o = self.dec(z_slice, pitchf, g=g) + return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q) + + @torch.jit.export + def infer( + self, + phone: torch.Tensor, + phone_lengths: torch.Tensor, + pitch: torch.Tensor, + nsff0: torch.Tensor, + sid: torch.Tensor, + rate: Optional[torch.Tensor] = None, + ): + g = self.emb_g(sid).unsqueeze(-1) + m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) + z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask + if rate is not None: + assert isinstance(rate, torch.Tensor) + head = int(z_p.shape[2] * (1 - rate.item())) + z_p = z_p[:, :, head:] + x_mask = x_mask[:, :, head:] + nsff0 = nsff0[:, head:] + z = self.flow(z_p, x_mask, g=g, reverse=True) + o = self.dec(z * x_mask, nsff0, g=g) + return o, x_mask, (z, z_p, m_p, logs_p) + + +class SynthesizerTrnMs768NSFsid(nn.Module): + def __init__( + self, + spec_channels, + segment_size, + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + spk_embed_dim, + gin_channels, + sr, + **kwargs + ): + super(SynthesizerTrnMs768NSFsid, self).__init__() + if isinstance(sr, str): + sr = sr2sr[sr] + self.spec_channels = spec_channels + self.inter_channels = inter_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = float(p_dropout) + self.resblock = resblock + self.resblock_kernel_sizes = resblock_kernel_sizes + self.resblock_dilation_sizes = resblock_dilation_sizes + self.upsample_rates = upsample_rates + self.upsample_initial_channel = upsample_initial_channel + self.upsample_kernel_sizes = upsample_kernel_sizes + self.segment_size = segment_size + self.gin_channels = gin_channels + # self.hop_length = hop_length# + self.spk_embed_dim = spk_embed_dim + self.enc_p = TextEncoder768( + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + float(p_dropout), + ) + self.dec = GeneratorNSF( + inter_channels, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=gin_channels, + sr=sr, + is_half=kwargs["is_half"], + ) + self.enc_q = PosteriorEncoder( + spec_channels, + inter_channels, + hidden_channels, + 5, + 1, + 16, + gin_channels=gin_channels, + ) + self.flow = ResidualCouplingBlock( + inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels + ) + self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels) + logger.debug( + "gin_channels: " + + str(gin_channels) + + ", self.spk_embed_dim: " + + str(self.spk_embed_dim) + ) + + def remove_weight_norm(self): + self.dec.remove_weight_norm() + self.flow.remove_weight_norm() + self.enc_q.remove_weight_norm() + + def __prepare_scriptable__(self): + for hook in self.dec._forward_pre_hooks.values(): + # The hook we want to remove is an instance of WeightNorm class, so + # normally we would do `if isinstance(...)` but this class is not accessible + # because of shadowing, so we check the module name directly. + # https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3 + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.dec) + for hook in self.flow._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.flow) + if hasattr(self, "enc_q"): + for hook in self.enc_q._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.enc_q) + return self + + @torch.jit.ignore + def forward( + self, phone, phone_lengths, pitch, pitchf, y, y_lengths, ds + ): # 这里ds是id,[bs,1] + # print(1,pitch.shape)#[bs,t] + g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t,广播的 + m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) + z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g) + z_p = self.flow(z, y_mask, g=g) + z_slice, ids_slice = commons.rand_slice_segments( + z, y_lengths, self.segment_size + ) + # print(-1,pitchf.shape,ids_slice,self.segment_size,self.hop_length,self.segment_size//self.hop_length) + pitchf = commons.slice_segments2(pitchf, ids_slice, self.segment_size) + # print(-2,pitchf.shape,z_slice.shape) + o = self.dec(z_slice, pitchf, g=g) + return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q) + + @torch.jit.export + def infer( + self, + phone: torch.Tensor, + phone_lengths: torch.Tensor, + pitch: torch.Tensor, + nsff0: torch.Tensor, + sid: torch.Tensor, + rate: Optional[torch.Tensor] = None, + ): + g = self.emb_g(sid).unsqueeze(-1) + m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) + z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask + if rate is not None: + head = int(z_p.shape[2] * (1.0 - rate.item())) + z_p = z_p[:, :, head:] + x_mask = x_mask[:, :, head:] + nsff0 = nsff0[:, head:] + z = self.flow(z_p, x_mask, g=g, reverse=True) + o = self.dec(z * x_mask, nsff0, g=g) + return o, x_mask, (z, z_p, m_p, logs_p) + + +class SynthesizerTrnMs256NSFsid_nono(nn.Module): + def __init__( + self, + spec_channels, + segment_size, + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + spk_embed_dim, + gin_channels, + sr=None, + **kwargs + ): + super(SynthesizerTrnMs256NSFsid_nono, self).__init__() + self.spec_channels = spec_channels + self.inter_channels = inter_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = float(p_dropout) + self.resblock = resblock + self.resblock_kernel_sizes = resblock_kernel_sizes + self.resblock_dilation_sizes = resblock_dilation_sizes + self.upsample_rates = upsample_rates + self.upsample_initial_channel = upsample_initial_channel + self.upsample_kernel_sizes = upsample_kernel_sizes + self.segment_size = segment_size + self.gin_channels = gin_channels + # self.hop_length = hop_length# + self.spk_embed_dim = spk_embed_dim + self.enc_p = TextEncoder256( + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + float(p_dropout), + f0=False, + ) + self.dec = Generator( + inter_channels, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=gin_channels, + ) + self.enc_q = PosteriorEncoder( + spec_channels, + inter_channels, + hidden_channels, + 5, + 1, + 16, + gin_channels=gin_channels, + ) + self.flow = ResidualCouplingBlock( + inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels + ) + self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels) + logger.debug( + "gin_channels: " + + str(gin_channels) + + ", self.spk_embed_dim: " + + str(self.spk_embed_dim) + ) + + def remove_weight_norm(self): + self.dec.remove_weight_norm() + self.flow.remove_weight_norm() + self.enc_q.remove_weight_norm() + + def __prepare_scriptable__(self): + for hook in self.dec._forward_pre_hooks.values(): + # The hook we want to remove is an instance of WeightNorm class, so + # normally we would do `if isinstance(...)` but this class is not accessible + # because of shadowing, so we check the module name directly. + # https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3 + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.dec) + for hook in self.flow._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.flow) + if hasattr(self, "enc_q"): + for hook in self.enc_q._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.enc_q) + return self + + @torch.jit.ignore + def forward(self, phone, phone_lengths, y, y_lengths, ds): # 这里ds是id,[bs,1] + g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t,广播的 + m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths) + z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g) + z_p = self.flow(z, y_mask, g=g) + z_slice, ids_slice = commons.rand_slice_segments( + z, y_lengths, self.segment_size + ) + o = self.dec(z_slice, g=g) + return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q) + + @torch.jit.export + def infer( + self, + phone: torch.Tensor, + phone_lengths: torch.Tensor, + sid: torch.Tensor, + rate: Optional[torch.Tensor] = None, + ): + g = self.emb_g(sid).unsqueeze(-1) + m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths) + z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask + if rate is not None: + head = int(z_p.shape[2] * (1.0 - rate.item())) + z_p = z_p[:, :, head:] + x_mask = x_mask[:, :, head:] + z = self.flow(z_p, x_mask, g=g, reverse=True) + o = self.dec(z * x_mask, g=g) + return o, x_mask, (z, z_p, m_p, logs_p) + + +class SynthesizerTrnMs768NSFsid_nono(nn.Module): + def __init__( + self, + spec_channels, + segment_size, + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + spk_embed_dim, + gin_channels, + sr=None, + **kwargs + ): + super(SynthesizerTrnMs768NSFsid_nono, self).__init__() + self.spec_channels = spec_channels + self.inter_channels = inter_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = float(p_dropout) + self.resblock = resblock + self.resblock_kernel_sizes = resblock_kernel_sizes + self.resblock_dilation_sizes = resblock_dilation_sizes + self.upsample_rates = upsample_rates + self.upsample_initial_channel = upsample_initial_channel + self.upsample_kernel_sizes = upsample_kernel_sizes + self.segment_size = segment_size + self.gin_channels = gin_channels + # self.hop_length = hop_length# + self.spk_embed_dim = spk_embed_dim + self.enc_p = TextEncoder768( + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + float(p_dropout), + f0=False, + ) + self.dec = Generator( + inter_channels, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=gin_channels, + ) + self.enc_q = PosteriorEncoder( + spec_channels, + inter_channels, + hidden_channels, + 5, + 1, + 16, + gin_channels=gin_channels, + ) + self.flow = ResidualCouplingBlock( + inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels + ) + self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels) + logger.debug( + "gin_channels: " + + str(gin_channels) + + ", self.spk_embed_dim: " + + str(self.spk_embed_dim) + ) + + def remove_weight_norm(self): + self.dec.remove_weight_norm() + self.flow.remove_weight_norm() + self.enc_q.remove_weight_norm() + + def __prepare_scriptable__(self): + for hook in self.dec._forward_pre_hooks.values(): + # The hook we want to remove is an instance of WeightNorm class, so + # normally we would do `if isinstance(...)` but this class is not accessible + # because of shadowing, so we check the module name directly. + # https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3 + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.dec) + for hook in self.flow._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.flow) + if hasattr(self, "enc_q"): + for hook in self.enc_q._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.enc_q) + return self + + @torch.jit.ignore + def forward(self, phone, phone_lengths, y, y_lengths, ds): # 这里ds是id,[bs,1] + g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t,广播的 + m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths) + z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g) + z_p = self.flow(z, y_mask, g=g) + z_slice, ids_slice = commons.rand_slice_segments( + z, y_lengths, self.segment_size + ) + o = self.dec(z_slice, g=g) + return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q) + + @torch.jit.export + def infer( + self, + phone: torch.Tensor, + phone_lengths: torch.Tensor, + sid: torch.Tensor, + rate: Optional[torch.Tensor] = None, + ): + g = self.emb_g(sid).unsqueeze(-1) + m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths) + z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask + if rate is not None: + head = int(z_p.shape[2] * (1.0 - rate.item())) + z_p = z_p[:, :, head:] + x_mask = x_mask[:, :, head:] + z = self.flow(z_p, x_mask, g=g, reverse=True) + o = self.dec(z * x_mask, g=g) + return o, x_mask, (z, z_p, m_p, logs_p) + + +class MultiPeriodDiscriminator(torch.nn.Module): + def __init__(self, use_spectral_norm=False): + super(MultiPeriodDiscriminator, self).__init__() + periods = [2, 3, 5, 7, 11, 17] + # periods = [3, 5, 7, 11, 17, 23, 37] + + discs = [DiscriminatorS(use_spectral_norm=use_spectral_norm)] + discs = discs + [ + DiscriminatorP(i, use_spectral_norm=use_spectral_norm) for i in periods + ] + self.discriminators = nn.ModuleList(discs) + + def forward(self, y, y_hat): + y_d_rs = [] # + y_d_gs = [] + fmap_rs = [] + fmap_gs = [] + for i, d in enumerate(self.discriminators): + y_d_r, fmap_r = d(y) + y_d_g, fmap_g = d(y_hat) + # for j in range(len(fmap_r)): + # print(i,j,y.shape,y_hat.shape,fmap_r[j].shape,fmap_g[j].shape) + y_d_rs.append(y_d_r) + y_d_gs.append(y_d_g) + fmap_rs.append(fmap_r) + fmap_gs.append(fmap_g) + + return y_d_rs, y_d_gs, fmap_rs, fmap_gs + + +class MultiPeriodDiscriminatorV2(torch.nn.Module): + def __init__(self, use_spectral_norm=False): + super(MultiPeriodDiscriminatorV2, self).__init__() + # periods = [2, 3, 5, 7, 11, 17] + periods = [2, 3, 5, 7, 11, 17, 23, 37] + + discs = [DiscriminatorS(use_spectral_norm=use_spectral_norm)] + discs = discs + [ + DiscriminatorP(i, use_spectral_norm=use_spectral_norm) for i in periods + ] + self.discriminators = nn.ModuleList(discs) + + def forward(self, y, y_hat): + y_d_rs = [] # + y_d_gs = [] + fmap_rs = [] + fmap_gs = [] + for i, d in enumerate(self.discriminators): + y_d_r, fmap_r = d(y) + y_d_g, fmap_g = d(y_hat) + # for j in range(len(fmap_r)): + # print(i,j,y.shape,y_hat.shape,fmap_r[j].shape,fmap_g[j].shape) + y_d_rs.append(y_d_r) + y_d_gs.append(y_d_g) + fmap_rs.append(fmap_r) + fmap_gs.append(fmap_g) + + return y_d_rs, y_d_gs, fmap_rs, fmap_gs + + +class DiscriminatorS(torch.nn.Module): + def __init__(self, use_spectral_norm=False): + super(DiscriminatorS, self).__init__() + norm_f = weight_norm if use_spectral_norm == False else spectral_norm + self.convs = nn.ModuleList( + [ + norm_f(Conv1d(1, 16, 15, 1, padding=7)), + norm_f(Conv1d(16, 64, 41, 4, groups=4, padding=20)), + norm_f(Conv1d(64, 256, 41, 4, groups=16, padding=20)), + norm_f(Conv1d(256, 1024, 41, 4, groups=64, padding=20)), + norm_f(Conv1d(1024, 1024, 41, 4, groups=256, padding=20)), + norm_f(Conv1d(1024, 1024, 5, 1, padding=2)), + ] + ) + self.conv_post = norm_f(Conv1d(1024, 1, 3, 1, padding=1)) + + def forward(self, x): + fmap = [] + + for l in self.convs: + x = l(x) + x = F.leaky_relu(x, modules.LRELU_SLOPE) + fmap.append(x) + x = self.conv_post(x) + fmap.append(x) + x = torch.flatten(x, 1, -1) + + return x, fmap + + +class DiscriminatorP(torch.nn.Module): + def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False): + super(DiscriminatorP, self).__init__() + self.period = period + self.use_spectral_norm = use_spectral_norm + norm_f = weight_norm if use_spectral_norm == False else spectral_norm + self.convs = nn.ModuleList( + [ + norm_f( + Conv2d( + 1, + 32, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 32, + 128, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 128, + 512, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 512, + 1024, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 1024, + 1024, + (kernel_size, 1), + 1, + padding=(get_padding(kernel_size, 1), 0), + ) + ), + ] + ) + self.conv_post = norm_f(Conv2d(1024, 1, (3, 1), 1, padding=(1, 0))) + + def forward(self, x): + fmap = [] + + # 1d to 2d + b, c, t = x.shape + if t % self.period != 0: # pad first + n_pad = self.period - (t % self.period) + if has_xpu and x.dtype == torch.bfloat16: + x = F.pad(x.to(dtype=torch.float16), (0, n_pad), "reflect").to( + dtype=torch.bfloat16 + ) + else: + x = F.pad(x, (0, n_pad), "reflect") + t = t + n_pad + x = x.view(b, c, t // self.period, self.period) + + for l in self.convs: + x = l(x) + x = F.leaky_relu(x, modules.LRELU_SLOPE) + fmap.append(x) + x = self.conv_post(x) + fmap.append(x) + x = torch.flatten(x, 1, -1) + + return x, fmap diff --git a/rvc/lib/infer_pack/models_onnx.py b/rvc/lib/infer_pack/models_onnx.py new file mode 100644 index 0000000..094bf1a --- /dev/null +++ b/rvc/lib/infer_pack/models_onnx.py @@ -0,0 +1,821 @@ +import logging +import math + +logger = logging.getLogger(__name__) + +import numpy as np +import torch +from torch import nn +from torch.nn import AvgPool1d, Conv1d, Conv2d, ConvTranspose1d +from torch.nn import functional as F +from torch.nn.utils import remove_weight_norm, spectral_norm, weight_norm + +from rvc.lib.infer_pack import attentions, commons, modules +from rvc.lib.infer_pack.commons import get_padding, init_weights + + +class TextEncoder256(nn.Module): + def __init__( + self, + out_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + f0=True, + ): + super().__init__() + self.out_channels = out_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = p_dropout + self.emb_phone = nn.Linear(256, hidden_channels) + self.lrelu = nn.LeakyReLU(0.1, inplace=True) + if f0 == True: + self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256 + self.encoder = attentions.Encoder( + hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout + ) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + + def forward(self, phone, pitch, lengths): + if pitch == None: + x = self.emb_phone(phone) + else: + x = self.emb_phone(phone) + self.emb_pitch(pitch) + x = x * math.sqrt(self.hidden_channels) # [b, t, h] + x = self.lrelu(x) + x = torch.transpose(x, 1, -1) # [b, h, t] + x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to( + x.dtype + ) + x = self.encoder(x * x_mask, x_mask) + stats = self.proj(x) * x_mask + + m, logs = torch.split(stats, self.out_channels, dim=1) + return m, logs, x_mask + + +class TextEncoder768(nn.Module): + def __init__( + self, + out_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + f0=True, + ): + super().__init__() + self.out_channels = out_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = p_dropout + self.emb_phone = nn.Linear(768, hidden_channels) + self.lrelu = nn.LeakyReLU(0.1, inplace=True) + if f0 == True: + self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256 + self.encoder = attentions.Encoder( + hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout + ) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + + def forward(self, phone, pitch, lengths): + if pitch == None: + x = self.emb_phone(phone) + else: + x = self.emb_phone(phone) + self.emb_pitch(pitch) + x = x * math.sqrt(self.hidden_channels) # [b, t, h] + x = self.lrelu(x) + x = torch.transpose(x, 1, -1) # [b, h, t] + x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to( + x.dtype + ) + x = self.encoder(x * x_mask, x_mask) + stats = self.proj(x) * x_mask + + m, logs = torch.split(stats, self.out_channels, dim=1) + return m, logs, x_mask + + +class ResidualCouplingBlock(nn.Module): + def __init__( + self, + channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + n_flows=4, + gin_channels=0, + ): + super().__init__() + self.channels = channels + self.hidden_channels = hidden_channels + self.kernel_size = kernel_size + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.n_flows = n_flows + self.gin_channels = gin_channels + + self.flows = nn.ModuleList() + for i in range(n_flows): + self.flows.append( + modules.ResidualCouplingLayer( + channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=gin_channels, + mean_only=True, + ) + ) + self.flows.append(modules.Flip()) + + def forward(self, x, x_mask, g=None, reverse=False): + if not reverse: + for flow in self.flows: + x, _ = flow(x, x_mask, g=g, reverse=reverse) + else: + for flow in reversed(self.flows): + x = flow(x, x_mask, g=g, reverse=reverse) + return x + + def remove_weight_norm(self): + for i in range(self.n_flows): + self.flows[i * 2].remove_weight_norm() + + +class PosteriorEncoder(nn.Module): + def __init__( + self, + in_channels, + out_channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=0, + ): + super().__init__() + self.in_channels = in_channels + self.out_channels = out_channels + self.hidden_channels = hidden_channels + self.kernel_size = kernel_size + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.gin_channels = gin_channels + + self.pre = nn.Conv1d(in_channels, hidden_channels, 1) + self.enc = modules.WN( + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=gin_channels, + ) + self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1) + + def forward(self, x, x_lengths, g=None): + x_mask = torch.unsqueeze(commons.sequence_mask(x_lengths, x.size(2)), 1).to( + x.dtype + ) + x = self.pre(x) * x_mask + x = self.enc(x, x_mask, g=g) + stats = self.proj(x) * x_mask + m, logs = torch.split(stats, self.out_channels, dim=1) + z = (m + torch.randn_like(m) * torch.exp(logs)) * x_mask + return z, m, logs, x_mask + + def remove_weight_norm(self): + self.enc.remove_weight_norm() + + +class Generator(torch.nn.Module): + def __init__( + self, + initial_channel, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=0, + ): + super(Generator, self).__init__() + self.num_kernels = len(resblock_kernel_sizes) + self.num_upsamples = len(upsample_rates) + self.conv_pre = Conv1d( + initial_channel, upsample_initial_channel, 7, 1, padding=3 + ) + resblock = modules.ResBlock1 if resblock == "1" else modules.ResBlock2 + + self.ups = nn.ModuleList() + for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): + self.ups.append( + weight_norm( + ConvTranspose1d( + upsample_initial_channel // (2**i), + upsample_initial_channel // (2 ** (i + 1)), + k, + u, + padding=(k - u) // 2, + ) + ) + ) + + self.resblocks = nn.ModuleList() + for i in range(len(self.ups)): + ch = upsample_initial_channel // (2 ** (i + 1)) + for j, (k, d) in enumerate( + zip(resblock_kernel_sizes, resblock_dilation_sizes) + ): + self.resblocks.append(resblock(ch, k, d)) + + self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=False) + self.ups.apply(init_weights) + + if gin_channels != 0: + self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) + + def forward(self, x, g=None): + x = self.conv_pre(x) + if g is not None: + x = x + self.cond(g) + + for i in range(self.num_upsamples): + x = F.leaky_relu(x, modules.LRELU_SLOPE) + x = self.ups[i](x) + xs = None + for j in range(self.num_kernels): + if xs is None: + xs = self.resblocks[i * self.num_kernels + j](x) + else: + xs += self.resblocks[i * self.num_kernels + j](x) + x = xs / self.num_kernels + x = F.leaky_relu(x) + x = self.conv_post(x) + x = torch.tanh(x) + + return x + + def remove_weight_norm(self): + for l in self.ups: + remove_weight_norm(l) + for l in self.resblocks: + l.remove_weight_norm() + + +class SineGen(torch.nn.Module): + """Definition of sine generator + SineGen(samp_rate, harmonic_num = 0, + sine_amp = 0.1, noise_std = 0.003, + voiced_threshold = 0, + flag_for_pulse=False) + samp_rate: sampling rate in Hz + harmonic_num: number of harmonic overtones (default 0) + sine_amp: amplitude of sine-wavefrom (default 0.1) + noise_std: std of Gaussian noise (default 0.003) + voiced_thoreshold: F0 threshold for U/V classification (default 0) + flag_for_pulse: this SinGen is used inside PulseGen (default False) + Note: when flag_for_pulse is True, the first time step of a voiced + segment is always sin(np.pi) or cos(0) + """ + + def __init__( + self, + samp_rate, + harmonic_num=0, + sine_amp=0.1, + noise_std=0.003, + voiced_threshold=0, + flag_for_pulse=False, + ): + super(SineGen, self).__init__() + self.sine_amp = sine_amp + self.noise_std = noise_std + self.harmonic_num = harmonic_num + self.dim = self.harmonic_num + 1 + self.sampling_rate = samp_rate + self.voiced_threshold = voiced_threshold + + def _f02uv(self, f0): + # generate uv signal + uv = torch.ones_like(f0) + uv = uv * (f0 > self.voiced_threshold) + return uv + + def forward(self, f0, upp): + """sine_tensor, uv = forward(f0) + input F0: tensor(batchsize=1, length, dim=1) + f0 for unvoiced steps should be 0 + output sine_tensor: tensor(batchsize=1, length, dim) + output uv: tensor(batchsize=1, length, 1) + """ + with torch.no_grad(): + f0 = f0[:, None].transpose(1, 2) + f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device) + # fundamental component + f0_buf[:, :, 0] = f0[:, :, 0] + for idx in np.arange(self.harmonic_num): + f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * ( + idx + 2 + ) # idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic + rad_values = (f0_buf / self.sampling_rate) % 1 ###%1意味着n_har的乘积无法后处理优化 + rand_ini = torch.rand( + f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device + ) + rand_ini[:, 0] = 0 + rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini + tmp_over_one = torch.cumsum(rad_values, 1) # % 1 #####%1意味着后面的cumsum无法再优化 + tmp_over_one *= upp + tmp_over_one = F.interpolate( + tmp_over_one.transpose(2, 1), + scale_factor=upp, + mode="linear", + align_corners=True, + ).transpose(2, 1) + rad_values = F.interpolate( + rad_values.transpose(2, 1), scale_factor=upp, mode="nearest" + ).transpose( + 2, 1 + ) ####### + tmp_over_one %= 1 + tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0 + cumsum_shift = torch.zeros_like(rad_values) + cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0 + sine_waves = torch.sin( + torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * np.pi + ) + sine_waves = sine_waves * self.sine_amp + uv = self._f02uv(f0) + uv = F.interpolate( + uv.transpose(2, 1), scale_factor=upp, mode="nearest" + ).transpose(2, 1) + noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3 + noise = noise_amp * torch.randn_like(sine_waves) + sine_waves = sine_waves * uv + noise + return sine_waves, uv, noise + + +class SourceModuleHnNSF(torch.nn.Module): + """SourceModule for hn-nsf + SourceModule(sampling_rate, harmonic_num=0, sine_amp=0.1, + add_noise_std=0.003, voiced_threshod=0) + sampling_rate: sampling_rate in Hz + harmonic_num: number of harmonic above F0 (default: 0) + sine_amp: amplitude of sine source signal (default: 0.1) + add_noise_std: std of additive Gaussian noise (default: 0.003) + note that amplitude of noise in unvoiced is decided + by sine_amp + voiced_threshold: threhold to set U/V given F0 (default: 0) + Sine_source, noise_source = SourceModuleHnNSF(F0_sampled) + F0_sampled (batchsize, length, 1) + Sine_source (batchsize, length, 1) + noise_source (batchsize, length 1) + uv (batchsize, length, 1) + """ + + def __init__( + self, + sampling_rate, + harmonic_num=0, + sine_amp=0.1, + add_noise_std=0.003, + voiced_threshod=0, + is_half=True, + ): + super(SourceModuleHnNSF, self).__init__() + + self.sine_amp = sine_amp + self.noise_std = add_noise_std + self.is_half = is_half + # to produce sine waveforms + self.l_sin_gen = SineGen( + sampling_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod + ) + + # to merge source harmonics into a single excitation + self.l_linear = torch.nn.Linear(harmonic_num + 1, 1) + self.l_tanh = torch.nn.Tanh() + + def forward(self, x, upp=None): + sine_wavs, uv, _ = self.l_sin_gen(x, upp) + if self.is_half: + sine_wavs = sine_wavs.half() + sine_merge = self.l_tanh(self.l_linear(sine_wavs)) + return sine_merge, None, None # noise, uv + + +class GeneratorNSF(torch.nn.Module): + def __init__( + self, + initial_channel, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels, + sr, + is_half=False, + ): + super(GeneratorNSF, self).__init__() + self.num_kernels = len(resblock_kernel_sizes) + self.num_upsamples = len(upsample_rates) + + self.f0_upsamp = torch.nn.Upsample(scale_factor=np.prod(upsample_rates)) + self.m_source = SourceModuleHnNSF( + sampling_rate=sr, harmonic_num=0, is_half=is_half + ) + self.noise_convs = nn.ModuleList() + self.conv_pre = Conv1d( + initial_channel, upsample_initial_channel, 7, 1, padding=3 + ) + resblock = modules.ResBlock1 if resblock == "1" else modules.ResBlock2 + + self.ups = nn.ModuleList() + for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): + c_cur = upsample_initial_channel // (2 ** (i + 1)) + self.ups.append( + weight_norm( + ConvTranspose1d( + upsample_initial_channel // (2**i), + upsample_initial_channel // (2 ** (i + 1)), + k, + u, + padding=(k - u) // 2, + ) + ) + ) + if i + 1 < len(upsample_rates): + stride_f0 = np.prod(upsample_rates[i + 1 :]) + self.noise_convs.append( + Conv1d( + 1, + c_cur, + kernel_size=stride_f0 * 2, + stride=stride_f0, + padding=stride_f0 // 2, + ) + ) + else: + self.noise_convs.append(Conv1d(1, c_cur, kernel_size=1)) + + self.resblocks = nn.ModuleList() + for i in range(len(self.ups)): + ch = upsample_initial_channel // (2 ** (i + 1)) + for j, (k, d) in enumerate( + zip(resblock_kernel_sizes, resblock_dilation_sizes) + ): + self.resblocks.append(resblock(ch, k, d)) + + self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=False) + self.ups.apply(init_weights) + + if gin_channels != 0: + self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1) + + self.upp = np.prod(upsample_rates) + + def forward(self, x, f0, g=None): + har_source, noi_source, uv = self.m_source(f0, self.upp) + har_source = har_source.transpose(1, 2) + x = self.conv_pre(x) + if g is not None: + x = x + self.cond(g) + + for i in range(self.num_upsamples): + x = F.leaky_relu(x, modules.LRELU_SLOPE) + x = self.ups[i](x) + x_source = self.noise_convs[i](har_source) + x = x + x_source + xs = None + for j in range(self.num_kernels): + if xs is None: + xs = self.resblocks[i * self.num_kernels + j](x) + else: + xs += self.resblocks[i * self.num_kernels + j](x) + x = xs / self.num_kernels + x = F.leaky_relu(x) + x = self.conv_post(x) + x = torch.tanh(x) + return x + + def remove_weight_norm(self): + for l in self.ups: + remove_weight_norm(l) + for l in self.resblocks: + l.remove_weight_norm() + + +sr2sr = { + "32k": 32000, + "40k": 40000, + "48k": 48000, +} + + +class SynthesizerTrnMsNSFsidM(nn.Module): + def __init__( + self, + spec_channels, + segment_size, + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + spk_embed_dim, + gin_channels, + sr, + version, + **kwargs, + ): + super().__init__() + if type(sr) == type("strr"): + sr = sr2sr[sr] + self.spec_channels = spec_channels + self.inter_channels = inter_channels + self.hidden_channels = hidden_channels + self.filter_channels = filter_channels + self.n_heads = n_heads + self.n_layers = n_layers + self.kernel_size = kernel_size + self.p_dropout = p_dropout + self.resblock = resblock + self.resblock_kernel_sizes = resblock_kernel_sizes + self.resblock_dilation_sizes = resblock_dilation_sizes + self.upsample_rates = upsample_rates + self.upsample_initial_channel = upsample_initial_channel + self.upsample_kernel_sizes = upsample_kernel_sizes + self.segment_size = segment_size + self.gin_channels = gin_channels + # self.hop_length = hop_length# + self.spk_embed_dim = spk_embed_dim + if version == "v1": + self.enc_p = TextEncoder256( + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + ) + else: + self.enc_p = TextEncoder768( + inter_channels, + hidden_channels, + filter_channels, + n_heads, + n_layers, + kernel_size, + p_dropout, + ) + self.dec = GeneratorNSF( + inter_channels, + resblock, + resblock_kernel_sizes, + resblock_dilation_sizes, + upsample_rates, + upsample_initial_channel, + upsample_kernel_sizes, + gin_channels=gin_channels, + sr=sr, + is_half=kwargs["is_half"], + ) + self.enc_q = PosteriorEncoder( + spec_channels, + inter_channels, + hidden_channels, + 5, + 1, + 16, + gin_channels=gin_channels, + ) + self.flow = ResidualCouplingBlock( + inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels + ) + self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels) + self.speaker_map = None + logger.debug( + f"gin_channels: {gin_channels}, self.spk_embed_dim: {self.spk_embed_dim}" + ) + + def remove_weight_norm(self): + self.dec.remove_weight_norm() + self.flow.remove_weight_norm() + self.enc_q.remove_weight_norm() + + def construct_spkmixmap(self, n_speaker): + self.speaker_map = torch.zeros((n_speaker, 1, 1, self.gin_channels)) + for i in range(n_speaker): + self.speaker_map[i] = self.emb_g(torch.LongTensor([[i]])) + self.speaker_map = self.speaker_map.unsqueeze(0) + + def forward(self, phone, phone_lengths, pitch, nsff0, g, rnd, max_len=None): + if self.speaker_map is not None: # [N, S] * [S, B, 1, H] + g = g.reshape((g.shape[0], g.shape[1], 1, 1, 1)) # [N, S, B, 1, 1] + g = g * self.speaker_map # [N, S, B, 1, H] + g = torch.sum(g, dim=1) # [N, 1, B, 1, H] + g = g.transpose(0, -1).transpose(0, -2).squeeze(0) # [B, H, N] + else: + g = g.unsqueeze(0) + g = self.emb_g(g).transpose(1, 2) + + m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) + z_p = (m_p + torch.exp(logs_p) * rnd) * x_mask + z = self.flow(z_p, x_mask, g=g, reverse=True) + o = self.dec((z * x_mask)[:, :, :max_len], nsff0, g=g) + return o + + +class MultiPeriodDiscriminator(torch.nn.Module): + def __init__(self, use_spectral_norm=False): + super(MultiPeriodDiscriminator, self).__init__() + periods = [2, 3, 5, 7, 11, 17] + # periods = [3, 5, 7, 11, 17, 23, 37] + + discs = [DiscriminatorS(use_spectral_norm=use_spectral_norm)] + discs = discs + [ + DiscriminatorP(i, use_spectral_norm=use_spectral_norm) for i in periods + ] + self.discriminators = nn.ModuleList(discs) + + def forward(self, y, y_hat): + y_d_rs = [] # + y_d_gs = [] + fmap_rs = [] + fmap_gs = [] + for i, d in enumerate(self.discriminators): + y_d_r, fmap_r = d(y) + y_d_g, fmap_g = d(y_hat) + # for j in range(len(fmap_r)): + # print(i,j,y.shape,y_hat.shape,fmap_r[j].shape,fmap_g[j].shape) + y_d_rs.append(y_d_r) + y_d_gs.append(y_d_g) + fmap_rs.append(fmap_r) + fmap_gs.append(fmap_g) + + return y_d_rs, y_d_gs, fmap_rs, fmap_gs + + +class MultiPeriodDiscriminatorV2(torch.nn.Module): + def __init__(self, use_spectral_norm=False): + super(MultiPeriodDiscriminatorV2, self).__init__() + # periods = [2, 3, 5, 7, 11, 17] + periods = [2, 3, 5, 7, 11, 17, 23, 37] + + discs = [DiscriminatorS(use_spectral_norm=use_spectral_norm)] + discs = discs + [ + DiscriminatorP(i, use_spectral_norm=use_spectral_norm) for i in periods + ] + self.discriminators = nn.ModuleList(discs) + + def forward(self, y, y_hat): + y_d_rs = [] # + y_d_gs = [] + fmap_rs = [] + fmap_gs = [] + for i, d in enumerate(self.discriminators): + y_d_r, fmap_r = d(y) + y_d_g, fmap_g = d(y_hat) + # for j in range(len(fmap_r)): + # print(i,j,y.shape,y_hat.shape,fmap_r[j].shape,fmap_g[j].shape) + y_d_rs.append(y_d_r) + y_d_gs.append(y_d_g) + fmap_rs.append(fmap_r) + fmap_gs.append(fmap_g) + + return y_d_rs, y_d_gs, fmap_rs, fmap_gs + + +class DiscriminatorS(torch.nn.Module): + def __init__(self, use_spectral_norm=False): + super(DiscriminatorS, self).__init__() + norm_f = weight_norm if use_spectral_norm == False else spectral_norm + self.convs = nn.ModuleList( + [ + norm_f(Conv1d(1, 16, 15, 1, padding=7)), + norm_f(Conv1d(16, 64, 41, 4, groups=4, padding=20)), + norm_f(Conv1d(64, 256, 41, 4, groups=16, padding=20)), + norm_f(Conv1d(256, 1024, 41, 4, groups=64, padding=20)), + norm_f(Conv1d(1024, 1024, 41, 4, groups=256, padding=20)), + norm_f(Conv1d(1024, 1024, 5, 1, padding=2)), + ] + ) + self.conv_post = norm_f(Conv1d(1024, 1, 3, 1, padding=1)) + + def forward(self, x): + fmap = [] + + for l in self.convs: + x = l(x) + x = F.leaky_relu(x, modules.LRELU_SLOPE) + fmap.append(x) + x = self.conv_post(x) + fmap.append(x) + x = torch.flatten(x, 1, -1) + + return x, fmap + + +class DiscriminatorP(torch.nn.Module): + def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False): + super(DiscriminatorP, self).__init__() + self.period = period + self.use_spectral_norm = use_spectral_norm + norm_f = weight_norm if use_spectral_norm == False else spectral_norm + self.convs = nn.ModuleList( + [ + norm_f( + Conv2d( + 1, + 32, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 32, + 128, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 128, + 512, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 512, + 1024, + (kernel_size, 1), + (stride, 1), + padding=(get_padding(kernel_size, 1), 0), + ) + ), + norm_f( + Conv2d( + 1024, + 1024, + (kernel_size, 1), + 1, + padding=(get_padding(kernel_size, 1), 0), + ) + ), + ] + ) + self.conv_post = norm_f(Conv2d(1024, 1, (3, 1), 1, padding=(1, 0))) + + def forward(self, x): + fmap = [] + + # 1d to 2d + b, c, t = x.shape + if t % self.period != 0: # pad first + n_pad = self.period - (t % self.period) + x = F.pad(x, (0, n_pad), "reflect") + t = t + n_pad + x = x.view(b, c, t // self.period, self.period) + + for l in self.convs: + x = l(x) + x = F.leaky_relu(x, modules.LRELU_SLOPE) + fmap.append(x) + x = self.conv_post(x) + fmap.append(x) + x = torch.flatten(x, 1, -1) + + return x, fmap diff --git a/rvc/lib/infer_pack/modules.py b/rvc/lib/infer_pack/modules.py new file mode 100644 index 0000000..aa0a120 --- /dev/null +++ b/rvc/lib/infer_pack/modules.py @@ -0,0 +1,615 @@ +import copy +import math +from typing import Optional, Tuple + +import numpy as np +import scipy +import torch +from torch import nn +from torch.nn import AvgPool1d, Conv1d, Conv2d, ConvTranspose1d +from torch.nn import functional as F +from torch.nn.utils import remove_weight_norm, weight_norm + +from rvc.lib.infer_pack import commons +from rvc.lib.infer_pack.commons import get_padding, init_weights +from rvc.lib.infer_pack.transforms import piecewise_rational_quadratic_transform + +LRELU_SLOPE = 0.1 + + +class LayerNorm(nn.Module): + def __init__(self, channels, eps=1e-5): + super(LayerNorm, self).__init__() + self.channels = channels + self.eps = eps + + self.gamma = nn.Parameter(torch.ones(channels)) + self.beta = nn.Parameter(torch.zeros(channels)) + + def forward(self, x): + x = x.transpose(1, -1) + x = F.layer_norm(x, (self.channels,), self.gamma, self.beta, self.eps) + return x.transpose(1, -1) + + +class ConvReluNorm(nn.Module): + def __init__( + self, + in_channels, + hidden_channels, + out_channels, + kernel_size, + n_layers, + p_dropout, + ): + super(ConvReluNorm, self).__init__() + self.in_channels = in_channels + self.hidden_channels = hidden_channels + self.out_channels = out_channels + self.kernel_size = kernel_size + self.n_layers = n_layers + self.p_dropout = float(p_dropout) + assert n_layers > 1, "Number of layers should be larger than 0." + + self.conv_layers = nn.ModuleList() + self.norm_layers = nn.ModuleList() + self.conv_layers.append( + nn.Conv1d( + in_channels, hidden_channels, kernel_size, padding=kernel_size // 2 + ) + ) + self.norm_layers.append(LayerNorm(hidden_channels)) + self.relu_drop = nn.Sequential(nn.ReLU(), nn.Dropout(float(p_dropout))) + for _ in range(n_layers - 1): + self.conv_layers.append( + nn.Conv1d( + hidden_channels, + hidden_channels, + kernel_size, + padding=kernel_size // 2, + ) + ) + self.norm_layers.append(LayerNorm(hidden_channels)) + self.proj = nn.Conv1d(hidden_channels, out_channels, 1) + self.proj.weight.data.zero_() + self.proj.bias.data.zero_() + + def forward(self, x, x_mask): + x_org = x + for i in range(self.n_layers): + x = self.conv_layers[i](x * x_mask) + x = self.norm_layers[i](x) + x = self.relu_drop(x) + x = x_org + self.proj(x) + return x * x_mask + + +class DDSConv(nn.Module): + """ + Dialted and Depth-Separable Convolution + """ + + def __init__(self, channels, kernel_size, n_layers, p_dropout=0.0): + super(DDSConv, self).__init__() + self.channels = channels + self.kernel_size = kernel_size + self.n_layers = n_layers + self.p_dropout = float(p_dropout) + + self.drop = nn.Dropout(float(p_dropout)) + self.convs_sep = nn.ModuleList() + self.convs_1x1 = nn.ModuleList() + self.norms_1 = nn.ModuleList() + self.norms_2 = nn.ModuleList() + for i in range(n_layers): + dilation = kernel_size**i + padding = (kernel_size * dilation - dilation) // 2 + self.convs_sep.append( + nn.Conv1d( + channels, + channels, + kernel_size, + groups=channels, + dilation=dilation, + padding=padding, + ) + ) + self.convs_1x1.append(nn.Conv1d(channels, channels, 1)) + self.norms_1.append(LayerNorm(channels)) + self.norms_2.append(LayerNorm(channels)) + + def forward(self, x, x_mask, g: Optional[torch.Tensor] = None): + if g is not None: + x = x + g + for i in range(self.n_layers): + y = self.convs_sep[i](x * x_mask) + y = self.norms_1[i](y) + y = F.gelu(y) + y = self.convs_1x1[i](y) + y = self.norms_2[i](y) + y = F.gelu(y) + y = self.drop(y) + x = x + y + return x * x_mask + + +class WN(torch.nn.Module): + def __init__( + self, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=0, + p_dropout=0, + ): + super(WN, self).__init__() + assert kernel_size % 2 == 1 + self.hidden_channels = hidden_channels + self.kernel_size = (kernel_size,) + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.gin_channels = gin_channels + self.p_dropout = float(p_dropout) + + self.in_layers = torch.nn.ModuleList() + self.res_skip_layers = torch.nn.ModuleList() + self.drop = nn.Dropout(float(p_dropout)) + + if gin_channels != 0: + cond_layer = torch.nn.Conv1d( + gin_channels, 2 * hidden_channels * n_layers, 1 + ) + self.cond_layer = torch.nn.utils.weight_norm(cond_layer, name="weight") + + for i in range(n_layers): + dilation = dilation_rate**i + padding = int((kernel_size * dilation - dilation) / 2) + in_layer = torch.nn.Conv1d( + hidden_channels, + 2 * hidden_channels, + kernel_size, + dilation=dilation, + padding=padding, + ) + in_layer = torch.nn.utils.weight_norm(in_layer, name="weight") + self.in_layers.append(in_layer) + + # last one is not necessary + if i < n_layers - 1: + res_skip_channels = 2 * hidden_channels + else: + res_skip_channels = hidden_channels + + res_skip_layer = torch.nn.Conv1d(hidden_channels, res_skip_channels, 1) + res_skip_layer = torch.nn.utils.weight_norm(res_skip_layer, name="weight") + self.res_skip_layers.append(res_skip_layer) + + def forward( + self, x: torch.Tensor, x_mask: torch.Tensor, g: Optional[torch.Tensor] = None + ): + output = torch.zeros_like(x) + n_channels_tensor = torch.IntTensor([self.hidden_channels]) + + if g is not None: + g = self.cond_layer(g) + + for i, (in_layer, res_skip_layer) in enumerate( + zip(self.in_layers, self.res_skip_layers) + ): + x_in = in_layer(x) + if g is not None: + cond_offset = i * 2 * self.hidden_channels + g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :] + else: + g_l = torch.zeros_like(x_in) + + acts = commons.fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor) + acts = self.drop(acts) + + res_skip_acts = res_skip_layer(acts) + if i < self.n_layers - 1: + res_acts = res_skip_acts[:, : self.hidden_channels, :] + x = (x + res_acts) * x_mask + output = output + res_skip_acts[:, self.hidden_channels :, :] + else: + output = output + res_skip_acts + return output * x_mask + + def remove_weight_norm(self): + if self.gin_channels != 0: + torch.nn.utils.remove_weight_norm(self.cond_layer) + for l in self.in_layers: + torch.nn.utils.remove_weight_norm(l) + for l in self.res_skip_layers: + torch.nn.utils.remove_weight_norm(l) + + def __prepare_scriptable__(self): + if self.gin_channels != 0: + for hook in self.cond_layer._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.cond_layer) + for l in self.in_layers: + for hook in l._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + for l in self.res_skip_layers: + for hook in l._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + return self + + +class ResBlock1(torch.nn.Module): + def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)): + super(ResBlock1, self).__init__() + self.convs1 = nn.ModuleList( + [ + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=dilation[0], + padding=get_padding(kernel_size, dilation[0]), + ) + ), + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=dilation[1], + padding=get_padding(kernel_size, dilation[1]), + ) + ), + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=dilation[2], + padding=get_padding(kernel_size, dilation[2]), + ) + ), + ] + ) + self.convs1.apply(init_weights) + + self.convs2 = nn.ModuleList( + [ + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=1, + padding=get_padding(kernel_size, 1), + ) + ), + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=1, + padding=get_padding(kernel_size, 1), + ) + ), + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=1, + padding=get_padding(kernel_size, 1), + ) + ), + ] + ) + self.convs2.apply(init_weights) + self.lrelu_slope = LRELU_SLOPE + + def forward(self, x: torch.Tensor, x_mask: Optional[torch.Tensor] = None): + for c1, c2 in zip(self.convs1, self.convs2): + xt = F.leaky_relu(x, self.lrelu_slope) + if x_mask is not None: + xt = xt * x_mask + xt = c1(xt) + xt = F.leaky_relu(xt, self.lrelu_slope) + if x_mask is not None: + xt = xt * x_mask + xt = c2(xt) + x = xt + x + if x_mask is not None: + x = x * x_mask + return x + + def remove_weight_norm(self): + for l in self.convs1: + remove_weight_norm(l) + for l in self.convs2: + remove_weight_norm(l) + + def __prepare_scriptable__(self): + for l in self.convs1: + for hook in l._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + for l in self.convs2: + for hook in l._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + return self + + +class ResBlock2(torch.nn.Module): + def __init__(self, channels, kernel_size=3, dilation=(1, 3)): + super(ResBlock2, self).__init__() + self.convs = nn.ModuleList( + [ + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=dilation[0], + padding=get_padding(kernel_size, dilation[0]), + ) + ), + weight_norm( + Conv1d( + channels, + channels, + kernel_size, + 1, + dilation=dilation[1], + padding=get_padding(kernel_size, dilation[1]), + ) + ), + ] + ) + self.convs.apply(init_weights) + self.lrelu_slope = LRELU_SLOPE + + def forward(self, x, x_mask: Optional[torch.Tensor] = None): + for c in self.convs: + xt = F.leaky_relu(x, self.lrelu_slope) + if x_mask is not None: + xt = xt * x_mask + xt = c(xt) + x = xt + x + if x_mask is not None: + x = x * x_mask + return x + + def remove_weight_norm(self): + for l in self.convs: + remove_weight_norm(l) + + def __prepare_scriptable__(self): + for l in self.convs: + for hook in l._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(l) + return self + + +class Log(nn.Module): + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: + if not reverse: + y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask + logdet = torch.sum(-y, [1, 2]) + return y, logdet + else: + x = torch.exp(x) * x_mask + return x + + +class Flip(nn.Module): + # torch.jit.script() Compiled functions \ + # can't take variable number of arguments or \ + # use keyword-only arguments with defaults + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: + x = torch.flip(x, [1]) + if not reverse: + logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device) + return x, logdet + else: + return x, torch.zeros([1], device=x.device) + + +class ElementwiseAffine(nn.Module): + def __init__(self, channels): + super(ElementwiseAffine, self).__init__() + self.channels = channels + self.m = nn.Parameter(torch.zeros(channels, 1)) + self.logs = nn.Parameter(torch.zeros(channels, 1)) + + def forward(self, x, x_mask, reverse=False, **kwargs): + if not reverse: + y = self.m + torch.exp(self.logs) * x + y = y * x_mask + logdet = torch.sum(self.logs * x_mask, [1, 2]) + return y, logdet + else: + x = (x - self.m) * torch.exp(-self.logs) * x_mask + return x + + +class ResidualCouplingLayer(nn.Module): + def __init__( + self, + channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + p_dropout=0, + gin_channels=0, + mean_only=False, + ): + assert channels % 2 == 0, "channels should be divisible by 2" + super(ResidualCouplingLayer, self).__init__() + self.channels = channels + self.hidden_channels = hidden_channels + self.kernel_size = kernel_size + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.half_channels = channels // 2 + self.mean_only = mean_only + + self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1) + self.enc = WN( + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + p_dropout=float(p_dropout), + gin_channels=gin_channels, + ) + self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1) + self.post.weight.data.zero_() + self.post.bias.data.zero_() + + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ): + x0, x1 = torch.split(x, [self.half_channels] * 2, 1) + h = self.pre(x0) * x_mask + h = self.enc(h, x_mask, g=g) + stats = self.post(h) * x_mask + if not self.mean_only: + m, logs = torch.split(stats, [self.half_channels] * 2, 1) + else: + m = stats + logs = torch.zeros_like(m) + + if not reverse: + x1 = m + x1 * torch.exp(logs) * x_mask + x = torch.cat([x0, x1], 1) + logdet = torch.sum(logs, [1, 2]) + return x, logdet + else: + x1 = (x1 - m) * torch.exp(-logs) * x_mask + x = torch.cat([x0, x1], 1) + return x, torch.zeros([1]) + + def remove_weight_norm(self): + self.enc.remove_weight_norm() + + def __prepare_scriptable__(self): + for hook in self.enc._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.enc) + return self + + +class ConvFlow(nn.Module): + def __init__( + self, + in_channels, + filter_channels, + kernel_size, + n_layers, + num_bins=10, + tail_bound=5.0, + ): + super(ConvFlow, self).__init__() + self.in_channels = in_channels + self.filter_channels = filter_channels + self.kernel_size = kernel_size + self.n_layers = n_layers + self.num_bins = num_bins + self.tail_bound = tail_bound + self.half_channels = in_channels // 2 + + self.pre = nn.Conv1d(self.half_channels, filter_channels, 1) + self.convs = DDSConv(filter_channels, kernel_size, n_layers, p_dropout=0.0) + self.proj = nn.Conv1d( + filter_channels, self.half_channels * (num_bins * 3 - 1), 1 + ) + self.proj.weight.data.zero_() + self.proj.bias.data.zero_() + + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse=False, + ): + x0, x1 = torch.split(x, [self.half_channels] * 2, 1) + h = self.pre(x0) + h = self.convs(h, x_mask, g=g) + h = self.proj(h) * x_mask + + b, c, t = x0.shape + h = h.reshape(b, c, -1, t).permute(0, 1, 3, 2) # [b, cx?, t] -> [b, c, t, ?] + + unnormalized_widths = h[..., : self.num_bins] / math.sqrt(self.filter_channels) + unnormalized_heights = h[..., self.num_bins : 2 * self.num_bins] / math.sqrt( + self.filter_channels + ) + unnormalized_derivatives = h[..., 2 * self.num_bins :] + + x1, logabsdet = piecewise_rational_quadratic_transform( + x1, + unnormalized_widths, + unnormalized_heights, + unnormalized_derivatives, + inverse=reverse, + tails="linear", + tail_bound=self.tail_bound, + ) + + x = torch.cat([x0, x1], 1) * x_mask + logdet = torch.sum(logabsdet * x_mask, [1, 2]) + if not reverse: + return x, logdet + else: + return x diff --git a/rvc/lib/infer_pack/modules/F0Predictor/DioF0Predictor.py b/rvc/lib/infer_pack/modules/F0Predictor/DioF0Predictor.py new file mode 100644 index 0000000..f99f108 --- /dev/null +++ b/rvc/lib/infer_pack/modules/F0Predictor/DioF0Predictor.py @@ -0,0 +1,91 @@ +import numpy as np +import pyworld + +from rvc.lib.infer_pack.modules.F0Predictor.F0Predictor import F0Predictor + + +class DioF0Predictor(F0Predictor): + def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100): + self.hop_length = hop_length + self.f0_min = f0_min + self.f0_max = f0_max + self.sampling_rate = sampling_rate + + def interpolate_f0(self, f0): + """ + 对F0进行插值处理 + """ + + data = np.reshape(f0, (f0.size, 1)) + + vuv_vector = np.zeros((data.size, 1), dtype=np.float32) + vuv_vector[data > 0.0] = 1.0 + vuv_vector[data <= 0.0] = 0.0 + + ip_data = data + + frame_number = data.size + last_value = 0.0 + for i in range(frame_number): + if data[i] <= 0.0: + j = i + 1 + for j in range(i + 1, frame_number): + if data[j] > 0.0: + break + if j < frame_number - 1: + if last_value > 0.0: + step = (data[j] - data[i - 1]) / float(j - i) + for k in range(i, j): + ip_data[k] = data[i - 1] + step * (k - i + 1) + else: + for k in range(i, j): + ip_data[k] = data[j] + else: + for k in range(i, frame_number): + ip_data[k] = last_value + else: + ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝 + last_value = data[i] + + return ip_data[:, 0], vuv_vector[:, 0] + + def resize_f0(self, x, target_len): + source = np.array(x) + source[source < 0.001] = np.nan + target = np.interp( + np.arange(0, len(source) * target_len, len(source)) / target_len, + np.arange(0, len(source)), + source, + ) + res = np.nan_to_num(target) + return res + + def compute_f0(self, wav, p_len=None): + if p_len is None: + p_len = wav.shape[0] // self.hop_length + f0, t = pyworld.dio( + wav.astype(np.double), + fs=self.sampling_rate, + f0_floor=self.f0_min, + f0_ceil=self.f0_max, + frame_period=1000 * self.hop_length / self.sampling_rate, + ) + f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate) + for index, pitch in enumerate(f0): + f0[index] = round(pitch, 1) + return self.interpolate_f0(self.resize_f0(f0, p_len))[0] + + def compute_f0_uv(self, wav, p_len=None): + if p_len is None: + p_len = wav.shape[0] // self.hop_length + f0, t = pyworld.dio( + wav.astype(np.double), + fs=self.sampling_rate, + f0_floor=self.f0_min, + f0_ceil=self.f0_max, + frame_period=1000 * self.hop_length / self.sampling_rate, + ) + f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate) + for index, pitch in enumerate(f0): + f0[index] = round(pitch, 1) + return self.interpolate_f0(self.resize_f0(f0, p_len)) diff --git a/rvc/lib/infer_pack/modules/F0Predictor/F0Predictor.py b/rvc/lib/infer_pack/modules/F0Predictor/F0Predictor.py new file mode 100644 index 0000000..0d81b05 --- /dev/null +++ b/rvc/lib/infer_pack/modules/F0Predictor/F0Predictor.py @@ -0,0 +1,16 @@ +class F0Predictor(object): + def compute_f0(self, wav, p_len): + """ + input: wav:[signal_length] + p_len:int + output: f0:[signal_length//hop_length] + """ + pass + + def compute_f0_uv(self, wav, p_len): + """ + input: wav:[signal_length] + p_len:int + output: f0:[signal_length//hop_length],uv:[signal_length//hop_length] + """ + pass diff --git a/rvc/lib/infer_pack/modules/F0Predictor/HarvestF0Predictor.py b/rvc/lib/infer_pack/modules/F0Predictor/HarvestF0Predictor.py new file mode 100644 index 0000000..7daf5a0 --- /dev/null +++ b/rvc/lib/infer_pack/modules/F0Predictor/HarvestF0Predictor.py @@ -0,0 +1,87 @@ +import numpy as np +import pyworld + +from rvc.lib.infer_pack.modules.F0Predictor.F0Predictor import F0Predictor + + +class HarvestF0Predictor(F0Predictor): + def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100): + self.hop_length = hop_length + self.f0_min = f0_min + self.f0_max = f0_max + self.sampling_rate = sampling_rate + + def interpolate_f0(self, f0): + """ + 对F0进行插值处理 + """ + + data = np.reshape(f0, (f0.size, 1)) + + vuv_vector = np.zeros((data.size, 1), dtype=np.float32) + vuv_vector[data > 0.0] = 1.0 + vuv_vector[data <= 0.0] = 0.0 + + ip_data = data + + frame_number = data.size + last_value = 0.0 + for i in range(frame_number): + if data[i] <= 0.0: + j = i + 1 + for j in range(i + 1, frame_number): + if data[j] > 0.0: + break + if j < frame_number - 1: + if last_value > 0.0: + step = (data[j] - data[i - 1]) / float(j - i) + for k in range(i, j): + ip_data[k] = data[i - 1] + step * (k - i + 1) + else: + for k in range(i, j): + ip_data[k] = data[j] + else: + for k in range(i, frame_number): + ip_data[k] = last_value + else: + ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝 + last_value = data[i] + + return ip_data[:, 0], vuv_vector[:, 0] + + def resize_f0(self, x, target_len): + source = np.array(x) + source[source < 0.001] = np.nan + target = np.interp( + np.arange(0, len(source) * target_len, len(source)) / target_len, + np.arange(0, len(source)), + source, + ) + res = np.nan_to_num(target) + return res + + def compute_f0(self, wav, p_len=None): + if p_len is None: + p_len = wav.shape[0] // self.hop_length + f0, t = pyworld.harvest( + wav.astype(np.double), + fs=self.hop_length, + f0_ceil=self.f0_max, + f0_floor=self.f0_min, + frame_period=1000 * self.hop_length / self.sampling_rate, + ) + f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.fs) + return self.interpolate_f0(self.resize_f0(f0, p_len))[0] + + def compute_f0_uv(self, wav, p_len=None): + if p_len is None: + p_len = wav.shape[0] // self.hop_length + f0, t = pyworld.harvest( + wav.astype(np.double), + fs=self.sampling_rate, + f0_floor=self.f0_min, + f0_ceil=self.f0_max, + frame_period=1000 * self.hop_length / self.sampling_rate, + ) + f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate) + return self.interpolate_f0(self.resize_f0(f0, p_len)) diff --git a/rvc/lib/infer_pack/modules/F0Predictor/PMF0Predictor.py b/rvc/lib/infer_pack/modules/F0Predictor/PMF0Predictor.py new file mode 100644 index 0000000..75987ff --- /dev/null +++ b/rvc/lib/infer_pack/modules/F0Predictor/PMF0Predictor.py @@ -0,0 +1,98 @@ +import numpy as np +import parselmouth + +from rvc.lib.infer_pack.modules.F0Predictor.F0Predictor import F0Predictor + + +class PMF0Predictor(F0Predictor): + def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100): + self.hop_length = hop_length + self.f0_min = f0_min + self.f0_max = f0_max + self.sampling_rate = sampling_rate + + def interpolate_f0(self, f0): + """ + 对F0进行插值处理 + """ + + data = np.reshape(f0, (f0.size, 1)) + + vuv_vector = np.zeros((data.size, 1), dtype=np.float32) + vuv_vector[data > 0.0] = 1.0 + vuv_vector[data <= 0.0] = 0.0 + + ip_data = data + + frame_number = data.size + last_value = 0.0 + for i in range(frame_number): + if data[i] <= 0.0: + j = i + 1 + for j in range(i + 1, frame_number): + if data[j] > 0.0: + break + if j < frame_number - 1: + if last_value > 0.0: + step = (data[j] - data[i - 1]) / float(j - i) + for k in range(i, j): + ip_data[k] = data[i - 1] + step * (k - i + 1) + else: + for k in range(i, j): + ip_data[k] = data[j] + else: + for k in range(i, frame_number): + ip_data[k] = last_value + else: + ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝 + last_value = data[i] + + return ip_data[:, 0], vuv_vector[:, 0] + + def compute_f0(self, wav, p_len=None): + x = wav + if p_len is None: + p_len = x.shape[0] // self.hop_length + else: + assert abs(p_len - x.shape[0] // self.hop_length) < 4, "pad length error" + time_step = self.hop_length / self.sampling_rate * 1000 + f0 = ( + parselmouth.Sound(x, self.sampling_rate) + .to_pitch_ac( + time_step=time_step / 1000, + voicing_threshold=0.6, + pitch_floor=self.f0_min, + pitch_ceiling=self.f0_max, + ) + .selected_array["frequency"] + ) + + pad_size = (p_len - len(f0) + 1) // 2 + if pad_size > 0 or p_len - len(f0) - pad_size > 0: + f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant") + f0, uv = self.interpolate_f0(f0) + return f0 + + def compute_f0_uv(self, wav, p_len=None): + x = wav + if p_len is None: + p_len = x.shape[0] // self.hop_length + else: + assert abs(p_len - x.shape[0] // self.hop_length) < 4, "pad length error" + time_step = self.hop_length / self.sampling_rate * 1000 + f0 = ( + parselmouth.Sound(x, self.sampling_rate) + .to_pitch_ac( + time_step=time_step / 1000, + voicing_threshold=0.6, + pitch_floor=self.f0_min, + pitch_ceiling=self.f0_max, + ) + .selected_array["frequency"] + ) + + pad_size = (p_len - len(f0) + 1) // 2 + if pad_size > 0 or p_len - len(f0) - pad_size > 0: + f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant") + f0, uv = self.interpolate_f0(f0) + return f0, uv diff --git a/rvc/lib/infer_pack/modules/F0Predictor/__init__.py b/rvc/lib/infer_pack/modules/F0Predictor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rvc/lib/infer_pack/onnx_inference.py b/rvc/lib/infer_pack/onnx_inference.py new file mode 100644 index 0000000..86e2583 --- /dev/null +++ b/rvc/lib/infer_pack/onnx_inference.py @@ -0,0 +1,149 @@ +import logging + +import librosa +import numpy as np +import onnxruntime +import soundfile + +logger = logging.getLogger(__name__) + + +class ContentVec: + def __init__(self, vec_path="pretrained/vec-768-layer-12.onnx", device=None): + logger.info("Load model(s) from {}".format(vec_path)) + if device == "cpu" or device is None: + providers = ["CPUExecutionProvider"] + elif device == "cuda": + providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] + elif device == "dml": + providers = ["DmlExecutionProvider"] + else: + raise RuntimeError("Unsportted Device") + self.model = onnxruntime.InferenceSession(vec_path, providers=providers) + + def __call__(self, wav): + return self.forward(wav) + + def forward(self, wav): + feats = wav + if feats.ndim == 2: # double channels + feats = feats.mean(-1) + assert feats.ndim == 1, feats.ndim + feats = np.expand_dims(np.expand_dims(feats, 0), 0) + onnx_input = {self.model.get_inputs()[0].name: feats} + logits = self.model.run(None, onnx_input)[0] + return logits.transpose(0, 2, 1) + + +def get_f0_predictor(f0_predictor, hop_length, sampling_rate, **kargs): + if f0_predictor == "pm": + from lib.infer_pack.modules.F0Predictor.PMF0Predictor import PMF0Predictor + + f0_predictor_object = PMF0Predictor( + hop_length=hop_length, sampling_rate=sampling_rate + ) + elif f0_predictor == "harvest": + from lib.infer_pack.modules.F0Predictor.HarvestF0Predictor import ( + HarvestF0Predictor, + ) + + f0_predictor_object = HarvestF0Predictor( + hop_length=hop_length, sampling_rate=sampling_rate + ) + elif f0_predictor == "dio": + from lib.infer_pack.modules.F0Predictor.DioF0Predictor import DioF0Predictor + + f0_predictor_object = DioF0Predictor( + hop_length=hop_length, sampling_rate=sampling_rate + ) + else: + raise Exception("Unknown f0 predictor") + return f0_predictor_object + + +class OnnxRVC: + def __init__( + self, + model_path, + sr=40000, + hop_size=512, + vec_path="vec-768-layer-12", + device="cpu", + ): + vec_path = f"pretrained/{vec_path}.onnx" + self.vec_model = ContentVec(vec_path, device) + if device == "cpu" or device is None: + providers = ["CPUExecutionProvider"] + elif device == "cuda": + providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] + elif device == "dml": + providers = ["DmlExecutionProvider"] + else: + raise RuntimeError("Unsportted Device") + self.model = onnxruntime.InferenceSession(model_path, providers=providers) + self.sampling_rate = sr + self.hop_size = hop_size + + def forward(self, hubert, hubert_length, pitch, pitchf, ds, rnd): + onnx_input = { + self.model.get_inputs()[0].name: hubert, + self.model.get_inputs()[1].name: hubert_length, + self.model.get_inputs()[2].name: pitch, + self.model.get_inputs()[3].name: pitchf, + self.model.get_inputs()[4].name: ds, + self.model.get_inputs()[5].name: rnd, + } + return (self.model.run(None, onnx_input)[0] * 32767).astype(np.int16) + + def inference( + self, + raw_path, + sid, + f0_method="dio", + f0_up_key=0, + pad_time=0.5, + cr_threshold=0.02, + ): + f0_min = 50 + f0_max = 1100 + f0_mel_min = 1127 * np.log(1 + f0_min / 700) + f0_mel_max = 1127 * np.log(1 + f0_max / 700) + f0_predictor = get_f0_predictor( + f0_method, + hop_length=self.hop_size, + sampling_rate=self.sampling_rate, + threshold=cr_threshold, + ) + wav, sr = librosa.load(raw_path, sr=self.sampling_rate) + org_length = len(wav) + if org_length / sr > 50.0: + raise RuntimeError("Reached Max Length") + + wav16k = librosa.resample(wav, orig_sr=self.sampling_rate, target_sr=16000) + wav16k = wav16k + + hubert = self.vec_model(wav16k) + hubert = np.repeat(hubert, 2, axis=2).transpose(0, 2, 1).astype(np.float32) + hubert_length = hubert.shape[1] + + pitchf = f0_predictor.compute_f0(wav, hubert_length) + pitchf = pitchf * 2 ** (f0_up_key / 12) + pitch = pitchf.copy() + f0_mel = 1127 * np.log(1 + pitch / 700) + f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / ( + f0_mel_max - f0_mel_min + ) + 1 + f0_mel[f0_mel <= 1] = 1 + f0_mel[f0_mel > 255] = 255 + pitch = np.rint(f0_mel).astype(np.int64) + + pitchf = pitchf.reshape(1, len(pitchf)).astype(np.float32) + pitch = pitch.reshape(1, len(pitch)) + ds = np.array([sid]).astype(np.int64) + + rnd = np.random.randn(1, 192, hubert_length).astype(np.float32) + hubert_length = np.array([hubert_length]).astype(np.int64) + + out_wav = self.forward(hubert, hubert_length, pitch, pitchf, ds, rnd).squeeze() + out_wav = np.pad(out_wav, (0, 2 * self.hop_size), "constant") + return out_wav[0:org_length] diff --git a/rvc/lib/infer_pack/transforms.py b/rvc/lib/infer_pack/transforms.py new file mode 100644 index 0000000..6d07b3b --- /dev/null +++ b/rvc/lib/infer_pack/transforms.py @@ -0,0 +1,207 @@ +import numpy as np +import torch +from torch.nn import functional as F + +DEFAULT_MIN_BIN_WIDTH = 1e-3 +DEFAULT_MIN_BIN_HEIGHT = 1e-3 +DEFAULT_MIN_DERIVATIVE = 1e-3 + + +def piecewise_rational_quadratic_transform( + inputs, + unnormalized_widths, + unnormalized_heights, + unnormalized_derivatives, + inverse=False, + tails=None, + tail_bound=1.0, + min_bin_width=DEFAULT_MIN_BIN_WIDTH, + min_bin_height=DEFAULT_MIN_BIN_HEIGHT, + min_derivative=DEFAULT_MIN_DERIVATIVE, +): + if tails is None: + spline_fn = rational_quadratic_spline + spline_kwargs = {} + else: + spline_fn = unconstrained_rational_quadratic_spline + spline_kwargs = {"tails": tails, "tail_bound": tail_bound} + + outputs, logabsdet = spline_fn( + inputs=inputs, + unnormalized_widths=unnormalized_widths, + unnormalized_heights=unnormalized_heights, + unnormalized_derivatives=unnormalized_derivatives, + inverse=inverse, + min_bin_width=min_bin_width, + min_bin_height=min_bin_height, + min_derivative=min_derivative, + **spline_kwargs + ) + return outputs, logabsdet + + +def searchsorted(bin_locations, inputs, eps=1e-6): + bin_locations[..., -1] += eps + return torch.sum(inputs[..., None] >= bin_locations, dim=-1) - 1 + + +def unconstrained_rational_quadratic_spline( + inputs, + unnormalized_widths, + unnormalized_heights, + unnormalized_derivatives, + inverse=False, + tails="linear", + tail_bound=1.0, + min_bin_width=DEFAULT_MIN_BIN_WIDTH, + min_bin_height=DEFAULT_MIN_BIN_HEIGHT, + min_derivative=DEFAULT_MIN_DERIVATIVE, +): + inside_interval_mask = (inputs >= -tail_bound) & (inputs <= tail_bound) + outside_interval_mask = ~inside_interval_mask + + outputs = torch.zeros_like(inputs) + logabsdet = torch.zeros_like(inputs) + + if tails == "linear": + unnormalized_derivatives = F.pad(unnormalized_derivatives, pad=(1, 1)) + constant = np.log(np.exp(1 - min_derivative) - 1) + unnormalized_derivatives[..., 0] = constant + unnormalized_derivatives[..., -1] = constant + + outputs[outside_interval_mask] = inputs[outside_interval_mask] + logabsdet[outside_interval_mask] = 0 + else: + raise RuntimeError("{} tails are not implemented.".format(tails)) + + ( + outputs[inside_interval_mask], + logabsdet[inside_interval_mask], + ) = rational_quadratic_spline( + inputs=inputs[inside_interval_mask], + unnormalized_widths=unnormalized_widths[inside_interval_mask, :], + unnormalized_heights=unnormalized_heights[inside_interval_mask, :], + unnormalized_derivatives=unnormalized_derivatives[inside_interval_mask, :], + inverse=inverse, + left=-tail_bound, + right=tail_bound, + bottom=-tail_bound, + top=tail_bound, + min_bin_width=min_bin_width, + min_bin_height=min_bin_height, + min_derivative=min_derivative, + ) + + return outputs, logabsdet + + +def rational_quadratic_spline( + inputs, + unnormalized_widths, + unnormalized_heights, + unnormalized_derivatives, + inverse=False, + left=0.0, + right=1.0, + bottom=0.0, + top=1.0, + min_bin_width=DEFAULT_MIN_BIN_WIDTH, + min_bin_height=DEFAULT_MIN_BIN_HEIGHT, + min_derivative=DEFAULT_MIN_DERIVATIVE, +): + if torch.min(inputs) < left or torch.max(inputs) > right: + raise ValueError("Input to a transform is not within its domain") + + num_bins = unnormalized_widths.shape[-1] + + if min_bin_width * num_bins > 1.0: + raise ValueError("Minimal bin width too large for the number of bins") + if min_bin_height * num_bins > 1.0: + raise ValueError("Minimal bin height too large for the number of bins") + + widths = F.softmax(unnormalized_widths, dim=-1) + widths = min_bin_width + (1 - min_bin_width * num_bins) * widths + cumwidths = torch.cumsum(widths, dim=-1) + cumwidths = F.pad(cumwidths, pad=(1, 0), mode="constant", value=0.0) + cumwidths = (right - left) * cumwidths + left + cumwidths[..., 0] = left + cumwidths[..., -1] = right + widths = cumwidths[..., 1:] - cumwidths[..., :-1] + + derivatives = min_derivative + F.softplus(unnormalized_derivatives) + + heights = F.softmax(unnormalized_heights, dim=-1) + heights = min_bin_height + (1 - min_bin_height * num_bins) * heights + cumheights = torch.cumsum(heights, dim=-1) + cumheights = F.pad(cumheights, pad=(1, 0), mode="constant", value=0.0) + cumheights = (top - bottom) * cumheights + bottom + cumheights[..., 0] = bottom + cumheights[..., -1] = top + heights = cumheights[..., 1:] - cumheights[..., :-1] + + if inverse: + bin_idx = searchsorted(cumheights, inputs)[..., None] + else: + bin_idx = searchsorted(cumwidths, inputs)[..., None] + + input_cumwidths = cumwidths.gather(-1, bin_idx)[..., 0] + input_bin_widths = widths.gather(-1, bin_idx)[..., 0] + + input_cumheights = cumheights.gather(-1, bin_idx)[..., 0] + delta = heights / widths + input_delta = delta.gather(-1, bin_idx)[..., 0] + + input_derivatives = derivatives.gather(-1, bin_idx)[..., 0] + input_derivatives_plus_one = derivatives[..., 1:].gather(-1, bin_idx)[..., 0] + + input_heights = heights.gather(-1, bin_idx)[..., 0] + + if inverse: + a = (inputs - input_cumheights) * ( + input_derivatives + input_derivatives_plus_one - 2 * input_delta + ) + input_heights * (input_delta - input_derivatives) + b = input_heights * input_derivatives - (inputs - input_cumheights) * ( + input_derivatives + input_derivatives_plus_one - 2 * input_delta + ) + c = -input_delta * (inputs - input_cumheights) + + discriminant = b.pow(2) - 4 * a * c + assert (discriminant >= 0).all() + + root = (2 * c) / (-b - torch.sqrt(discriminant)) + outputs = root * input_bin_widths + input_cumwidths + + theta_one_minus_theta = root * (1 - root) + denominator = input_delta + ( + (input_derivatives + input_derivatives_plus_one - 2 * input_delta) + * theta_one_minus_theta + ) + derivative_numerator = input_delta.pow(2) * ( + input_derivatives_plus_one * root.pow(2) + + 2 * input_delta * theta_one_minus_theta + + input_derivatives * (1 - root).pow(2) + ) + logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator) + + return outputs, -logabsdet + else: + theta = (inputs - input_cumwidths) / input_bin_widths + theta_one_minus_theta = theta * (1 - theta) + + numerator = input_heights * ( + input_delta * theta.pow(2) + input_derivatives * theta_one_minus_theta + ) + denominator = input_delta + ( + (input_derivatives + input_derivatives_plus_one - 2 * input_delta) + * theta_one_minus_theta + ) + outputs = input_cumheights + numerator / denominator + + derivative_numerator = input_delta.pow(2) * ( + input_derivatives_plus_one * theta.pow(2) + + 2 * input_delta * theta_one_minus_theta + + input_derivatives * (1 - theta).pow(2) + ) + logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator) + + return outputs, logabsdet