import math
from typing import Optional, Tuple
from tinygrad import Tensor, dtypes
import librosa
import soundfile
import numpy as np
import parselmouth

class PMF0Predictor:  # from https://github.com/svc-develop-team/so-vits-svc/
  def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
    self.hop_length, self.f0_min, self.f0_max, self.sampling_rate, self.name = hop_length, f0_min, f0_max, sampling_rate, "pm"

  def interpolate_f0(self, f0):
    vuv_vector = np.zeros_like(f0, dtype=np.float32)
    vuv_vector[f0 > 0.0] = 1.0
    vuv_vector[f0 <= 0.0] = 0.0
    nzindex = np.nonzero(f0)[0]
    data = f0[nzindex]
    nzindex = nzindex.astype(np.float32)
    time_org = self.hop_length / self.sampling_rate * nzindex
    time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
    if data.shape[0] <= 0: return np.zeros(f0.shape[0], dtype=np.float32), vuv_vector
    if data.shape[0] == 1: return np.ones(f0.shape[0], dtype=np.float32) * f0[0], vuv_vector
    f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
    return f0, vuv_vector

  def compute_f0_uv(self, wav, p_len=None):
    x = wav
    if p_len is None: p_len = x.shape[0] // self.hop_length
    else: assert abs(p_len - x.shape[0] // self.hop_length) < 4, "pad length error"
    time_step = self.hop_length / self.sampling_rate * 1000
    f0 = parselmouth.Sound(x, self.sampling_rate).to_pitch_ac(
      time_step=time_step / 1000, voicing_threshold=0.6,
      pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array['frequency']
    pad_size = (p_len - len(f0) + 1) // 2
    if pad_size > 0 or p_len - len(f0) - pad_size > 0:
      f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode='constant')
    f0, uv = self.interpolate_f0(f0)
    return f0, uv

  def compute_f0(self, wav, p_len=None): return self.compute_f0_uv(wav, p_len)[0]
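# A minimal usage sketch for PMF0Predictor, kept commented out so importing this
# module stays side-effect free ("vocal.wav" is a hypothetical input file):
#   audio, sr = librosa.load("vocal.wav", sr=44100)
#   predictor = PMF0Predictor(hop_length=512, sampling_rate=sr)
#   f0, uv = predictor.compute_f0_uv(audio)  # per-frame pitch in Hz and a voiced/unvoiced mask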
class Slicer:  # from https://github.com/svc-develop-team/so-vits-svc/
  def __init__(self, sr: int, threshold: float = -40., min_length: int = 5000, min_interval: int = 300, hop_size: int = 20, max_sil_kept: int = 5000):
    if not min_length >= min_interval >= hop_size:
      raise ValueError('The following condition must be satisfied: min_length >= min_interval >= hop_size')
    if not max_sil_kept >= hop_size:
      raise ValueError('The following condition must be satisfied: max_sil_kept >= hop_size')
    min_interval = sr * min_interval / 1000
    self.threshold = 10 ** (threshold / 20.)
    self.hop_size = round(sr * hop_size / 1000)
    self.win_size = min(round(min_interval), 4 * self.hop_size)
    self.min_length = round(sr * min_length / 1000 / self.hop_size)
    self.min_interval = round(min_interval / self.hop_size)
    self.max_sil_kept = round(sr * max_sil_kept / 1000 / self.hop_size)

  def _apply_slice(self, waveform, begin, end):
    if len(waveform.shape) > 1: return waveform[:, begin * self.hop_size: min(waveform.shape[1], end * self.hop_size)]
    return waveform[begin * self.hop_size: min(waveform.shape[0], end * self.hop_size)]

  def slice(self, waveform):
    samples = librosa.to_mono(waveform) if len(waveform.shape) > 1 else waveform
    if samples.shape[0] <= self.min_length:
      return {"0": {"slice": False, "split_time": f"0,{len(waveform)}"}}
    rms_list = librosa.feature.rms(y=samples, frame_length=self.win_size, hop_length=self.hop_size).squeeze(0)
    sil_tags, silence_start, clip_start = [], None, 0
    for i, rms in enumerate(rms_list):
      if rms < self.threshold:
        # Keep looping while frame is silent; record start of silent frames.
        if silence_start is None: silence_start = i
        continue
      # Keep looping while frame is not silent and silence start has not been recorded.
      if silence_start is None: continue
      # Clear recorded silence start if interval is not enough or clip is too short.
      is_leading_silence = silence_start == 0 and i > self.max_sil_kept
      need_slice_middle = i - silence_start >= self.min_interval and i - clip_start >= self.min_length
      if not is_leading_silence and not need_slice_middle:
        silence_start = None
        continue
      if i - silence_start <= self.max_sil_kept:
        # Need slicing. Record the range of silent frames to be removed.
        pos = rms_list[silence_start: i + 1].argmin() + silence_start
        sil_tags.append((0, pos) if silence_start == 0 else (pos, pos))
        clip_start = pos
      elif i - silence_start <= self.max_sil_kept * 2:
        pos = rms_list[i - self.max_sil_kept: silence_start + self.max_sil_kept + 1].argmin()
        pos += i - self.max_sil_kept
        pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
        pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
        if silence_start == 0:
          sil_tags.append((0, pos_r))
          clip_start = pos_r
        else:
          sil_tags.append((min(pos_l, pos), max(pos_r, pos)))
          clip_start = max(pos_r, pos)
      else:
        pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
        pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
        sil_tags.append((0, pos_r) if silence_start == 0 else (pos_l, pos_r))
        clip_start = pos_r
      silence_start = None
    total_frames = rms_list.shape[0]
    if silence_start is not None and total_frames - silence_start >= self.min_interval:
      # Deal with trailing silence.
      silence_end = min(total_frames, silence_start + self.max_sil_kept)
      pos = rms_list[silence_start: silence_end + 1].argmin() + silence_start
      sil_tags.append((pos, total_frames + 1))
    if len(sil_tags) == 0:
      return {"0": {"slice": False, "split_time": f"0,{len(waveform)}"}}
    # Apply and return slices.
    chunks = []
    if sil_tags[0][0]:
      chunks.append({"slice": False, "split_time": f"0,{min(waveform.shape[0], sil_tags[0][0] * self.hop_size)}"})
    for i in range(len(sil_tags)):
      if i: chunks.append({"slice": False, "split_time": f"{sil_tags[i - 1][1] * self.hop_size},{min(waveform.shape[0], sil_tags[i][0] * self.hop_size)}"})
      chunks.append({"slice": True, "split_time": f"{sil_tags[i][0] * self.hop_size},{min(waveform.shape[0], sil_tags[i][1] * self.hop_size)}"})
    if sil_tags[-1][1] * self.hop_size < len(waveform):
      chunks.append({"slice": False, "split_time": f"{sil_tags[-1][1] * self.hop_size},{len(waveform)}"})
    return {str(i): chunk for i, chunk in enumerate(chunks)}
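# A minimal usage sketch for Slicer (commented out; "vocal.wav" is hypothetical).
# Each chunk maps an index to sample offsets "start,end"; "slice" is True for the
# detected silent spans and False for the audio to keep:
#   audio, sr = librosa.load("vocal.wav", sr=None)
#   for idx, chunk in Slicer(sr=sr, threshold=-40.).slice(audio).items():
#     start, end = map(int, chunk["split_time"].split(","))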
# sinc_interp_hann audio resampling
class Resample:
  def __init__(self, orig_freq:int=16000, new_freq:int=16000, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None, dtype:Optional[dtypes]=None):
    self.orig_freq, self.new_freq, self.lowpass_filter_width, self.rolloff, self.beta = orig_freq, new_freq, lowpass_filter_width, rolloff, beta
    self.gcd = math.gcd(int(self.orig_freq), int(self.new_freq))
    self.kernel, self.width = self._get_sinc_resample_kernel(dtype) if self.orig_freq != self.new_freq else (None, None)

  def __call__(self, waveform:Tensor) -> Tensor:
    if self.orig_freq == self.new_freq: return waveform
    return self._apply_sinc_resample_kernel(waveform)

  def _apply_sinc_resample_kernel(self, waveform:Tensor):
    if not waveform.is_floating_point():
      raise TypeError(f"Waveform tensor expected to be of type float, but received {waveform.dtype}.")
    orig_freq, new_freq = (int(self.orig_freq) // self.gcd), (int(self.new_freq) // self.gcd)
    shape = waveform.shape
    waveform = waveform.reshape(-1, shape[-1])  # pack batch
    num_wavs, length = waveform.shape
    target_length = int(math.ceil(new_freq * length / orig_freq))
    waveform = waveform.pad((self.width, self.width + orig_freq))
    resampled = waveform[:, None].conv2d(self.kernel, stride=orig_freq)
    resampled = resampled.transpose(1, 2).reshape(num_wavs, -1)
    resampled = resampled[..., :target_length]
    return resampled.reshape(shape[:-1] + resampled.shape[-1:])  # unpack batch

  def _get_sinc_resample_kernel(self, dtype=None):
    orig_freq, new_freq = (int(self.orig_freq) // self.gcd), (int(self.new_freq) // self.gcd)
    if self.lowpass_filter_width <= 0: raise ValueError("Low pass filter width should be positive.")
    base_freq = min(orig_freq, new_freq) * self.rolloff
    width = math.ceil(self.lowpass_filter_width * orig_freq / base_freq)
    idx = Tensor.arange(-width, width + orig_freq, dtype=(dtype if dtype is not None else dtypes.float32))[None, None] / orig_freq
    t = Tensor.arange(0, -new_freq, -1, dtype=dtype)[:, None, None] / new_freq + idx
    t *= base_freq
    t = t.clip(-self.lowpass_filter_width, self.lowpass_filter_width)
    window = (t * math.pi / self.lowpass_filter_width / 2).cos() ** 2  # Hann window
    t *= math.pi
    scale = base_freq / orig_freq
    kernels = Tensor.where(t == 0, Tensor(1.0, dtype=t.dtype).to(t.device), t.sin() / t)
    kernels *= window * scale
    if dtype is None: kernels = kernels.cast(dtype=dtypes.float32)
    return kernels, width

def sinc_interp_resample(x:Tensor, orig_freq:int=16000, new_freq:int=16000, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None):
  resamp = Resample(orig_freq, new_freq, lowpass_filter_width, rolloff, beta, x.dtype)
  return resamp(x)

def cut(audio_path, db_thresh=-30, min_len=5000):
  audio, sr = librosa.load(audio_path, sr=None)
  slicer = Slicer(sr=sr, threshold=db_thresh, min_length=min_len)
  chunks = slicer.slice(audio)
  return chunks
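# A minimal usage sketch for the resampler (commented out). Assumes `wav` is a
# float Tensor whose last dimension is time, e.g. one from load_audiofile below:
#   wav16k = sinc_interp_resample(wav, orig_freq=44100, new_freq=16000)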
def chunks2audio(audio_path, chunks):
  chunks = dict(chunks)
  audio, sr = load_audiofile(audio_path)
  if len(audio.shape) == 2 and audio.shape[1] >= 2:
    audio = audio.mean(0).unsqueeze(0)  # downmix to mono
  audio = audio.numpy()[0]
  result = []
  for k, v in chunks.items():
    tag = v["split_time"].split(",")
    if tag[0] != tag[1]: result.append((v["slice"], audio[int(tag[0]):int(tag[1])]))
  return result, sr

def load_audiofile(filepath:str, frame_offset:int=0, num_frames:int=-1, channels_first:bool=True):
  with soundfile.SoundFile(filepath, "r") as file_:
    frames = file_._prepare_read(frame_offset, None, num_frames)
    waveform = file_.read(frames, "float32", always_2d=True)
    sample_rate = file_.samplerate
  waveform = Tensor(waveform)
  if channels_first: waveform = waveform.transpose(0, 1)
  return waveform, sample_rate

def get_unit_f0(wav:Tensor, tran, hop_length, target_sample, f0_filter=False) -> Tuple[Tensor, Tensor, Tensor]:
  f0_predictor = PMF0Predictor(hop_length, sampling_rate=target_sample)
  f0, uv = f0_predictor.compute_f0_uv(wav.numpy())
  if f0_filter and sum(f0) == 0: raise RuntimeError("No voice detected")
  f0 = Tensor(f0.astype(np.float32)).float()
  f0 = (f0 * 2 ** (tran / 12)).unsqueeze(0)  # shift pitch by `tran` semitones
  uv = Tensor(uv.astype(np.float32)).float().unsqueeze(0)
  wav16k = sinc_interp_resample(wav[None, :], target_sample, 16000)[0]
  return wav16k.realize(), f0.realize(), uv.realize()
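# An end-to-end sketch tying the helpers together (commented out; "vocal.wav" is
# a hypothetical input file): slice on silence, then per non-silent chunk compute
# the 16kHz waveform, transposed f0, and voiced/unvoiced mask:
#   chunks = cut("vocal.wav", db_thresh=-30)
#   data, sr = chunks2audio("vocal.wav", chunks)
#   for is_silent, chunk in data:
#     if not is_silent:
#       wav16k, f0, uv = get_unit_f0(Tensor(chunk), tran=0, hop_length=512, target_sample=sr)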