diff --git a/spatialscaper/core.py b/spatialscaper/core.py index 8775a94..2c0816d 100644 --- a/spatialscaper/core.py +++ b/spatialscaper/core.py @@ -26,7 +26,7 @@ save_output, sort_matrix_by_columns, ) -from .sofa_utils import load_rir_pos, load_pos +from .room import get_room # Sound event classes for DCASE Challenge @@ -63,23 +63,6 @@ ], ) -# Paths for room SOFA files -__SPATIAL_SCAPER_RIRS_DIR__ = "spatialscaper_RIRs" -__PATH_TO_AMBIENT_NOISE_FILES__ = os.path.join("source_data", "TAU-SNoise_DB") -__ROOM_RIR_FILE__ = { - "metu": "metu_sparg_em32.sofa", - "arni": "arni_{fmt}.sofa", - "bomb_shelter": "bomb_shelter_{fmt}.sofa", - "gym": "gym_{fmt}.sofa", - "pb132": "pb132_{fmt}.sofa", - "pc226": "pc226_{fmt}.sofa", - "sa203": "sa203_{fmt}.sofa", - "sc203": "sc203_{fmt}.sofa", - "se203": "se203_{fmt}.sofa", - "tb103": "tb103_{fmt}.sofa", - "tc352": "tc352_{fmt}.sofa", -} - class Scaper: def __init__( @@ -89,7 +72,6 @@ def __init__( rir_dir="", fmt="mic", room="metu", - use_room_ambient_noise=True, background_dir=None, sr=24000, DCASE_format=True, @@ -137,18 +119,20 @@ def __init__( """ self.duration = duration + self.sr = sr + + self.room = get_room(rir_dir, room, fmt=fmt) self.foreground_dir = foreground_dir self.background_dir = background_dir - self.rir_dir = rir_dir - self.room = room - self.use_room_ambient_noise = use_room_ambient_noise - self.format = fmt - self.sr = sr + self.DCASE_format = DCASE_format if self.DCASE_format: self.label_rate = __DCASE_LABEL_RATE__ + + self.max_sample_attempts = max_sample_attempts self.max_event_overlap = max_event_overlap self.max_event_dur = max_event_dur + self.speed_limit = speed_limit self.ref_db = ref_db self.fg_events = [] @@ -162,46 +146,15 @@ def __init__( else: self.fg_labels = {l: i for i, l in enumerate(fg_label_list)} - self.speed_limit = speed_limit - - self.max_sample_attempts = max_sample_attempts - - def get_path_to_room_ambient_noise(self): - path_to_ambient_noise_files = os.path.join( - self.rir_dir, 
__PATH_TO_AMBIENT_NOISE_FILES__ - ) - all_ambient_noise_files = glob.glob( - os.path.join(path_to_ambient_noise_files, "*", "*") - ) - if self.format == "mic": - ambient_noise_format_files = [ - f for f in all_ambient_noise_files if "tetra" in f - ] - elif self.format == "foa": - ambient_noise_format_files = [ - f for f in all_ambient_noise_files if "foa" in f - ] - if self.room == "bomb_shelter": - room_ambient_noise_file = [ - f for f in ambient_noise_format_files if "bomb_center" in f - ] - else: - room_ambient_noise_file = [ - f for f in ambient_noise_format_files if self.room in f - ] - assert len(room_ambient_noise_file) < 2 - if room_ambient_noise_file: - return room_ambient_noise_file[0] - else: - return random.choice(ambient_noise_format_files) - - def add_background(self): + def add_background(self, use_room_ambient_noise=True): """ Adds a background event to the soundscape. This method sets fixed values for event time, duration, and SNR, and adds the event to the background events list. 
""" label = None + source_file = None + source_time = None snr = ("const", 0) role = "background" pitch_shift = None @@ -210,18 +163,14 @@ def add_background(self): event_duration = ("const", self.duration) event_position = None - if self.use_room_ambient_noise: - source_file = self.get_path_to_room_ambient_noise() + if use_room_ambient_noise: + source_files = self.room.get_ambient_noise_paths() + source_file = random.choice(source_files) ambient_noise_duration = librosa.get_duration(path=source_file) if ambient_noise_duration > self.duration: source_time = round( random.uniform(0, ambient_noise_duration - self.duration) ) - else: - source_time = None - else: - source_file = None - source_time = None self.bg_events.append( Event( @@ -335,7 +284,7 @@ def add_event( self.speed_limit, ) else: - xyz_min, xyz_max = self._get_room_min_max() + xyz_min, xyz_max = self.room.get_boundaries() event_position_ = [self._gen_xyz(xyz_min, xyz_max)] if snr[0] == "uniform" and len(snr) == 3: @@ -425,18 +374,6 @@ def _gen_xyz(self, xyz_min, xyz_max): xyz.append(random.uniform(xyz_min[i], xyz_max[i])) return xyz - def _get_room_min_max(self): - """ - Determines the minimum and maximum XYZ coordinates for the current room setup. - - Returns: - tuple: A tuple containing the minimum and maximum XYZ coordinates for the room. 
- """ - all_xyz = self.get_room_irs_xyz() - xyz_min = all_xyz.min(axis=0) - xyz_max = all_xyz.max(axis=0) - return xyz_min, xyz_max - def generate_end_point( self, xyz_start, xyz_min, xyz_max, speed_limit, event_duration ): @@ -516,7 +453,7 @@ def define_trajectory( if all(trajectory_params[1:]): xyz_min, xyz_max = trajectory_params[1:] else: - xyz_min, xyz_max = self._get_room_min_max() + xyz_min, xyz_max = self.room.get_boundaries() xyz_start = self._gen_xyz(xyz_min, xyz_max) xyz_end = self.generate_end_point( xyz_start, xyz_min, xyz_max, speed_limit, event_duration @@ -537,72 +474,9 @@ def define_position(self, position_params): if position_params: xyz_min, xyz_max = position_params else: - xyz_min, xyz_max = self._get_room_min_max() + xyz_min, xyz_max = self.room.get_boundaries() return [self._gen_xyz(xyz_min, xyz_max)] - def get_room_irs_xyz(self): - """ - Retrieves the XYZ coordinates of impulse response positions in the room. - - Returns: - numpy.ndarray: An array of XYZ coordinates for the impulse response positions. - """ - if self.format == "foa" and self.room == "metu": - raise ValueError( - '"metu" room is currently only supported in mic (tetrahedral) format. please check again soon.' - ) - room_sofa_path = os.path.join( - self.rir_dir, - __SPATIAL_SCAPER_RIRS_DIR__, - __ROOM_RIR_FILE__[self.room].format(fmt=self.format), - ) - return load_pos(room_sofa_path, doas=False) - - def get_room_irs_wav_xyz(self, wav=True, pos=True): - """ - Retrieves impulse responses and their positions for the room. - - Args: - wav (bool): Whether to include the waveforms of the impulse responses. - pos (bool): Whether to include the positions of the impulse responses. - - Returns: - tuple: A tuple containing the impulse responses, their sampling rate, and their XYZ positions. - """ - if self.format == "foa" and self.room == "metu": - raise ValueError( - '"metu" room is currently only supported in mic (tetrahedral) format. please check again soon.' 
- ) - room_sofa_path = os.path.join( - self.rir_dir, - __SPATIAL_SCAPER_RIRS_DIR__, - __ROOM_RIR_FILE__[self.room].format(fmt=self.format), - ) - all_irs, ir_sr, all_ir_xyzs = load_rir_pos(room_sofa_path, doas=False) - ir_sr = ir_sr.data[0] - all_irs = all_irs.data - all_ir_xyzs = all_ir_xyzs.data - if ir_sr != self.sr: - all_irs = librosa.resample(all_irs, orig_sr=ir_sr, target_sr=self.sr) - ir_sr = self.sr - return all_irs, ir_sr, all_ir_xyzs - - def get_format_irs(self, all_irs, fmt="mic"): - """ - Retrieves impulse responses according to the specified format. - - Args: - all_irs (numpy.ndarray): Array of all impulse responses. - fmt (str): The format for retrieving impulse responses (e.g., 'mic'). - - Returns: - numpy.ndarray: An array of impulse responses formatted according to the specified format. - """ - if fmt == "mic" and self.room == "metu": - return all_irs[:, [5, 9, 25, 21], :] - else: - return all_irs - def generate_noise(self, event): """ Generates noise to be used as background ambient. @@ -768,8 +642,7 @@ def generate(self, audiopath, labelpath): and that the output audio and labels are accurately saved for further use or analysis. 
""" - all_irs, ir_sr, all_ir_xyzs = self.get_room_irs_wav_xyz() - all_irs = self.get_format_irs(all_irs) + all_irs, ir_sr, all_ir_xyzs = self.room.get_irs(self.sr) self.nchans = all_irs.shape[1] # a bit ugly but works for now # initialize output audio array diff --git a/spatialscaper/room.py b/spatialscaper/room.py new file mode 100644 index 0000000..857893e --- /dev/null +++ b/spatialscaper/room.py @@ -0,0 +1,144 @@ +import os +import glob + +import librosa + +# Local application/library specific imports +from .sofa_utils import load_rir_pos, load_pos + + +# Paths for room SOFA files +__SPATIAL_SCAPER_RIRS_DIR__ = "spatialscaper_RIRs" +__PATH_TO_AMBIENT_NOISE_FILES__ = os.path.join("source_data", "TAU-SNoise_DB") +__ROOM_RIR_FILE__ = { + "metu": "metu_sparg_{fmt}.sofa", + "arni": "arni_{fmt}.sofa", + "bomb_shelter": "bomb_shelter_{fmt}.sofa", + "gym": "gym_{fmt}.sofa", + "pb132": "pb132_{fmt}.sofa", + "pc226": "pc226_{fmt}.sofa", + "sa203": "sa203_{fmt}.sofa", + "sc203": "sc203_{fmt}.sofa", + "se203": "se203_{fmt}.sofa", + "tb103": "tb103_{fmt}.sofa", + "tc352": "tc352_{fmt}.sofa", +} + + +class BaseRoom: + """ + Initialize a Room object. + + A Room encapsulates the spatial and acoustic characteristics of a physical room. + This includes a collection of impulse response measurements taken at different positions in + the room. + """ + + def __init__(self) -> None: + pass + + def get_ambient_noise_paths(self): + """ + Retrieves paths to ambient noise audio files specific to this room. + + Returns: + list[str]: A list of audio paths. + """ + raise NotImplementedError + + def get_positions(self): + """ + Retrieves the XYZ coordinates of impulse response positions in the room. + + Returns: + numpy.ndarray: An array of XYZ coordinates for the impulse response positions. + """ + raise NotImplementedError + + def get_irs(self, format=True): + """ + Retrieves impulse responses and their positions for the room. 
+ + Args: + format (bool): Accepted but not read by the SOFARoom implementation in this file; + its intended meaning is not documented here (TODO confirm). + + Returns: + tuple: A tuple containing the impulse responses, their sampling rate, and their XYZ positions. + """ + raise NotImplementedError + + def get_boundaries(self): + """ + Determines the minimum and maximum XYZ coordinates for the current room setup. + + Returns: + tuple: A tuple containing the minimum and maximum XYZ coordinates for the room. + """ + all_xyz = self.get_positions() + xyz_min = all_xyz.min(axis=0) + xyz_max = all_xyz.max(axis=0) + return xyz_min, xyz_max + + +class SOFARoom(BaseRoom): + def __init__(self, rir_dir, room, fmt): + self.rir_dir = rir_dir + self.room = room + self.format = fmt + + @property + def sofa_path(self): + """Path to the SOFA file for this room.""" + return os.path.join( + self.rir_dir, + __SPATIAL_SCAPER_RIRS_DIR__, + __ROOM_RIR_FILE__[self.room].format(fmt=self.format), + ) + + def get_ambient_noise_paths(self): + # list files from disk + path_to_ambient_noise_files = os.path.join( + self.rir_dir, __PATH_TO_AMBIENT_NOISE_FILES__ + ) + ambient_noise_format_files = glob.glob( + os.path.join(path_to_ambient_noise_files, "*", "*") + ) + + # translate format + fmt = self.format + if self.format == "mic": + fmt = "tetra" + room = self.room + if self.room == "bomb_shelter": + room = "bomb_center" + + # filter files + ambient_noise_format_files = [ + f for f in ambient_noise_format_files if fmt in os.path.basename(f) + ] + room_ambient_noise_files = [ + f for f in ambient_noise_format_files if room in os.path.basename(f) + ] + + # assert len(room_ambient_noise_files) < 2 + return room_ambient_noise_files or ambient_noise_format_files + + def get_positions(self): + return load_pos(self.sofa_path, doas=False) + + def get_irs(self, sr=None, format=True): + all_irs, ir_sr, all_ir_xyzs = load_rir_pos(self.sofa_path, doas=False) + ir_sr = ir_sr.data[0] + all_irs = all_irs.data 
+ all_ir_xyzs = all_ir_xyzs.data + if sr is not None and ir_sr != sr: + all_irs = librosa.resample(all_irs, orig_sr=ir_sr, target_sr=sr) + ir_sr = sr + return all_irs, ir_sr, all_ir_xyzs + + +def get_room(rir_dir, *a, **kw): + if isinstance(rir_dir, BaseRoom): + return rir_dir + return SOFARoom(rir_dir, *a, **kw)