diff --git a/tools/analysis_2d/README.md b/tools/analysis_2d/README.md
new file mode 100644
index 00000000..ff0b54b5
--- /dev/null
+++ b/tools/analysis_2d/README.md
@@ -0,0 +1,63 @@
+# analysis_2d
+
+It provides a framework for developers in `AWML` to easily add analyses for 2D annotations in T4dataset.
+With this framework, developers don't need to generate any `info` files or rewrite their data loading for the dataset.
+They only need to follow `AnalysisCallbackInterface` to add the analyses they are interested in.
+
+## Summary
+
+- [Support priority](https://github.com/tier4/AWML/blob/main/docs/design/autoware_ml_design.md#support-priority): Tier B
+- Supported dataset
+  - [x] T4dataset
+  - [ ] NuScenes
+- Other supported feature
+  - [x] Distribution of categories
+  - [x] Distribution of attributes in each category
+  - [ ] Distribution of sizes
+  - [ ] Add unit tests
+
+## Get started
+### 1. Setup
+
+- Please follow the [installation tutorial](/docs/tutorial/tutorial_detection_3d.md) to set up the environment.
+- Run docker
+
+```sh
+docker run -it --rm --gpus all --shm-size=64g --name awml -p 6006:6006 -v $PWD/:/workspace -v $PWD/data:/workspace/data autoware-ml
+```
+
+### 2. Analysis
+#### 2.1. Dataset analysis
+
+Make sure the dataset follows the [T4dataset format](https://github.com/tier4/tier4_perception_dataset/blob/main/docs/t4_format_3d_detailed.md). Note that it doesn't need any `info` file.
+
+```sh
+# T4dataset (classification for traffic light)
+python tools/analysis_2d/run.py --config_path autoware_ml/configs/classification2d/dataset/t4dataset/tlr_classifier_car.py --data_root_path data/t4dataset/ --out_dir data/t4dataset/analyses/
+```
+
+## For developer
+
+1. Add a new analysis that inherits `AnalysisCallbackInterface` as a callback and implements `run()`, for example, `tools/analysis_3d/callbacks/category_attribute.py`.
+2. Import the new analysis in `AnalysisRunner2D` (`tools/analysis_2d/analysis_runner.py`), and add it to the list of `analysis_callbacks`, for example:
+
+```python
+self.analysis_callbacks: List[AnalysisCallbackInterface] = [
+    ...
+    CategoryAttributeAnalysisCallback(
+        out_path=self.out_path,
+        category_name='green',
+        analysis_dir='green_attributes',
+        remapping_classes=self.remapping_classes,
+    ),
+    # This is the new CategoryAttributeAnalysisCallback
+    CategoryAttributeAnalysisCallback(
+        out_path=self.out_path,
+        category_name='red',
+        analysis_dir='red_attributes',
+        remapping_classes=self.remapping_classes,
+    ),
+]
+```
+
+## References
diff --git a/tools/analysis_2d/__init__.py b/tools/analysis_2d/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tools/analysis_2d/analysis_runner.py b/tools/analysis_2d/analysis_runner.py
new file mode 100644
index 00000000..4f771275
--- /dev/null
+++ b/tools/analysis_2d/analysis_runner.py
@@ -0,0 +1,110 @@
+from typing import Dict, List
+
+from t4_devkit import Tier4
+
+from tools.analysis_2d.data_classes import (
+    SampleData2D,
+)
+from tools.analysis_2d.utils import extract_tier4_sample_data
+from tools.analysis_3d.analysis_runner import AnalysisRunner
+from tools.analysis_3d.callbacks.callback_interface import AnalysisCallbackInterface
+from tools.analysis_3d.callbacks.category import CategoryAnalysisCallback
+from tools.analysis_3d.callbacks.category_attribute import CategoryAttributeAnalysisCallback
+
+
+class AnalysisRunner2D(AnalysisRunner):
+    """Runner to run list of analyses for the selected dataset."""
+
+    def __init__(
+        self,
+        data_root_path: str,
+        config_path: str,
+        out_path: str,
+    ) -> None:
+        """
+        :param data_root_path: Root path of the dataset.
+        :param config_path: Configuration path for a dataset.
+        :param out_path: Path where to save output.
+        """
+        super().__init__(data_root_path, config_path, out_path)
+
+        # Override remapping_classes for 2D analysis and callbacks
+        self.remapping_classes = self.config.class_mappings
+        self.analysis_callbacks: List[AnalysisCallbackInterface] = [
+            CategoryAnalysisCallback(out_path=self.out_path, remapping_classes=self.remapping_classes),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="green",
+                analysis_dir="green_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="red",
+                analysis_dir="red_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="yellow",
+                analysis_dir="yellow_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="left,red",
+                analysis_dir="left_red_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="red,up_left",
+                analysis_dir="red_up_left_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="red,right",
+                analysis_dir="red_right_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="red,straight",
+                analysis_dir="red_straight_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="left,red,straight",
+                analysis_dir="left_red_straight_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+            CategoryAttributeAnalysisCallback(
+                out_path=self.out_path,
+                category_name="unknown",
+                analysis_dir="unknown_attributes",
+                remapping_classes=self.remapping_classes,
+            ),
+        ]
+
+    def _extract_sample_data(self, t4: Tier4) -> Dict[str, SampleData2D]:
+        """
+        Extract data for every sample.
+        :param t4: Tier4 interface.
+        :return: A dict of {sample token: SampleData2D}, samples without any camera are skipped.
+        """
+        sample_data = {}
+        for sample in t4.sample:
+            # Extract sample data
+            tier4_sample_data = extract_tier4_sample_data(sample=sample, t4=t4)
+            if tier4_sample_data is None:
+                # extract_tier4_sample_data returns None when the sample doesn't have any camera
+                continue
+
+            # Convert to SampleData2D
+            sample_data[sample.token] = SampleData2D.create_sample_data(
+                sample_token=sample.token,
+                boxes=tier4_sample_data.boxes,
+            )
+        return sample_data
diff --git a/tools/analysis_2d/data_classes.py b/tools/analysis_2d/data_classes.py
new file mode 100644
index 00000000..24b10cd7
--- /dev/null
+++ b/tools/analysis_2d/data_classes.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import List
+
+from t4_devkit.dataclass import Box2D
+
+from tools.analysis_3d.data_classes import DetectionBox, SampleData
+
+
+@dataclass(frozen=True)
+class Detection2DBox(DetectionBox):
+    """2D boxes from detection."""
+
+    box: Box2D
+
+
+@dataclass(frozen=True)
+class SampleData2D(SampleData):
+    """Dataclass to save data for a sample, for example, 2D bounding boxes."""
+
+    sample_token: str
+    detection_boxes: List[Detection2DBox]
+
+    @classmethod
+    def create_sample_data(
+        cls,
+        sample_token: str,
+        boxes: List[Box2D],
+    ) -> SampleData2D:
+        """
+        Create a SampleData2D given the params.
+        :param sample_token: Sample token to represent a sample (camera frame).
+        :param boxes: List of 2D bounding boxes for the given sample token.
+        """
+        detection_2d_boxes = [Detection2DBox(box=box, attrs=box.semantic_label.attributes) for box in boxes]
+
+        return cls(
+            sample_token=sample_token,
+            detection_boxes=detection_2d_boxes,
+        )
diff --git a/tools/analysis_2d/run.py b/tools/analysis_2d/run.py
new file mode 100644
index 00000000..30652876
--- /dev/null
+++ b/tools/analysis_2d/run.py
@@ -0,0 +1,52 @@
+"""Script to compute analysis of T4 datasets."""
+
+import argparse
+
+from mmengine.logging import print_log
+
+from tools.analysis_2d.analysis_runner import AnalysisRunner2D
+
+
+def parse_args() -> argparse.Namespace:
+    """Add args and parse them through CLI."""
+    parser = argparse.ArgumentParser(description="analysis of T4dataset in 2D")
+    parser.add_argument(
+        "--config_path",
+        type=str,
+        required=True,
+        help="config for T4dataset",
+    )
+    parser.add_argument(
+        "--data_root_path",
+        type=str,
+        required=True,
+        help="specify the root path of dataset",
+    )
+    parser.add_argument(
+        "-o",
+        "--out_dir",
+        type=str,
+        required=True,
+        help="output directory of analyses",
+    )
+    return parser.parse_args()
+
+
+def main() -> None:
+    """Main entrypoint to run the Runner."""
+    args = parse_args()
+    # Build AnalysisRunner2D
+    print_log("Building AnalysisRunner2D...", logger="current")
+    analysis_runner = AnalysisRunner2D(
+        data_root_path=args.data_root_path,
+        config_path=args.config_path,
+        out_path=args.out_dir,
+    )
+    print_log("Built AnalysisRunner2D!", logger="current")
+
+    # Run AnalysisRunner2D
+    analysis_runner.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/analysis_2d/utils.py b/tools/analysis_2d/utils.py
new file mode 100644
index 00000000..5aa37683
--- /dev/null
+++ b/tools/analysis_2d/utils.py
@@ -0,0 +1,144 @@
+from dataclasses import dataclass
+from typing import List, Optional
+
+import mmengine
+import numpy as np
+import numpy.typing as npt
+from nptyping import NDArray
+from t4_devkit import Tier4
+from t4_devkit.dataclass import Box2D
+from t4_devkit.schema import CalibratedSensor, EgoPose, Log, Sample, SampleData, Scene
+from t4_devkit.typing import CameraIntrinsicLike
+
+
+@dataclass(frozen=True)
+class Tier4SampleData:
+    """Data class to save the records of a camera sample in the T4dataset format."""
+
+    pose_record: EgoPose
+    cs_record: CalibratedSensor
+    sd_record: SampleData
+    scene_record: Scene
+    log_record: Log
+    boxes: List[Box2D]
+    camera_path: str
+    e2g_r_mat: npt.NDArray[np.float64]
+    l2e_r_mat: npt.NDArray[np.float64]
+    e2g_t: npt.NDArray[np.float64]
+    l2e_t: npt.NDArray[np.float64]
+    camera_intrinsics: CameraIntrinsicLike
+
+
+def get_camera_token(sample_rec: Sample) -> Optional[str]:
+    """
+    Get the sample data token of the first camera found in a sample record.
+    :param sample_rec: A sample record.
+    :return: Token stored under the first key of `sample_rec.data` containing "CAM", or None if there is none.
+    """
+    for key, token in sample_rec.data.items():
+        if "CAM" in key:
+            return token
+    return None
+
+
+def extract_tier4_data(t4: Tier4, sample: Sample) -> Optional[
+    tuple[
+        EgoPose,
+        CalibratedSensor,
+        SampleData,
+        Scene,
+        Log,
+        list[Box2D],
+        str,
+        NDArray,
+        NDArray,
+        NDArray,
+        NDArray,
+        CameraIntrinsicLike,
+    ]
+]:
+    """
+    Extract scenario data based on the Tier4 format given a sample record.
+    :param t4: Tier4 interface.
+    :param sample: A sample record.
+    :return: A tuple of the extracted records, or None if the sample doesn't have any camera.
+    """
+    camera_token = get_camera_token(sample)
+    if camera_token is None:
+        mmengine.print_log(
+            f"sample {sample.token} doesn't have camera",
+        )
+        return None
+
+    sd_record: SampleData = t4.get("sample_data", camera_token)
+    cs_record: CalibratedSensor = t4.get("calibrated_sensor", sd_record.calibrated_sensor_token)
+    pose_record: EgoPose = t4.get("ego_pose", sd_record.ego_pose_token)
+
+    camera_path, boxes, camera_intrinsics = t4.get_sample_data(camera_token, as_3d=False)
+    mmengine.check_file_exist(camera_path)
+
+    scene_record: Scene = t4.get("scene", sample.scene_token)
+    log_record: Log = t4.get("log", scene_record.log_token)
+
+    l2e_t = cs_record.translation
+    e2g_t = pose_record.translation
+    l2e_r = cs_record.rotation
+    e2g_r = pose_record.rotation
+    l2e_r_mat = l2e_r.rotation_matrix
+    e2g_r_mat = e2g_r.rotation_matrix
+    return (
+        pose_record,
+        cs_record,
+        sd_record,
+        scene_record,
+        log_record,
+        boxes,
+        camera_path,
+        e2g_r_mat,
+        l2e_r_mat,
+        e2g_t,
+        l2e_t,
+        camera_intrinsics,
+    )
+
+
+def extract_tier4_sample_data(t4: Tier4, sample: Sample) -> Optional[Tier4SampleData]:
+    """
+    Extract scenario data based on the Tier4 format given a sample record.
+    :param t4: Tier4 interface.
+    :param sample: A sample record.
+    :return: Tier4SampleData, or None if the sample doesn't have any camera.
+    """
+    tier4_data = extract_tier4_data(t4, sample)
+    if tier4_data is None:
+        return None
+
+    (
+        pose_record,
+        cs_record,
+        sd_record,
+        scene_record,
+        log_record,
+        boxes,
+        camera_path,
+        e2g_r_mat,
+        l2e_r_mat,
+        e2g_t,
+        l2e_t,
+        camera_intrinsics,
+    ) = tier4_data
+
+    return Tier4SampleData(
+        pose_record=pose_record,
+        cs_record=cs_record,
+        sd_record=sd_record,
+        scene_record=scene_record,
+        log_record=log_record,
+        boxes=boxes,
+        camera_path=camera_path,
+        e2g_r_mat=e2g_r_mat,
+        l2e_r_mat=l2e_r_mat,
+        e2g_t=e2g_t,
+        l2e_t=l2e_t,
+        camera_intrinsics=camera_intrinsics,
+    )
diff --git a/tools/analysis_3d/analysis_runner.py b/tools/analysis_3d/analysis_runner.py
index 3682084d..15387026 100644
--- a/tools/analysis_3d/analysis_runner.py
+++ b/tools/analysis_3d/analysis_runner.py
@@ -16,7 +16,7 @@
     DatasetSplitName,
     LidarPoint,
     LidarSweep,
-    SampleData,
+    SampleData3D,
     ScenarioData,
 )
 from tools.analysis_3d.split_options import SplitOptions
@@ -46,7 +46,8 @@ def __init__(
         # Initialization
         self.config = Config.fromfile(self.config_path)
         self.out_path.mkdir(parents=True, exist_ok=True)
-        self.remapping_classes = self.config.name_mapping
+        # TODO (MasatoSaeki): When creating the base AnalysisRunner, remove this temporary fix.
+        self.remapping_classes = getattr(self.config, "name_mapping", None)
         self.max_sweeps = max_sweeps
 
         # Default callbacks to generate analyses
@@ -101,11 +102,11 @@ def _get_dataset_scenario_names(self, dataset_version: str) -> Dict[str, List[st
             dataset_list_dict: Dict[str, List[str]] = yaml.safe_load(f)
         return dataset_list_dict
 
-    def _extract_sample_data(self, t4: Tier4) -> Dict[str, SampleData]:
+    def _extract_sample_data(self, t4: Tier4) -> Dict[str, SampleData3D]:
         """
         Extract data for every sample.
         :param t4: Tier4 interface.
-        :return: A dict of {sample token: SampleData}.
+        :return: A dict of {sample token: SampleData3D}.
         """
         sample_data = {}
         for sample in t4.sample:
@@ -139,8 +140,8 @@ def _extract_sample_data(self, t4: Tier4) -> Dict[str, SampleData]:
                 for lidar_sweep in lidar_sweep_info["lidar_sweeps"]
             ]
 
-            # Convert to SampleData
-            sample_data[sample.token] = SampleData.create_sample_data(
+            # Convert to SampleData3D
+            sample_data[sample.token] = SampleData3D.create_sample_data(
                 sample_token=sample.token,
                 boxes=tier4_sample_data.boxes,
                 lidar_point=lidar_point,
diff --git a/tools/analysis_3d/data_classes.py b/tools/analysis_3d/data_classes.py
index b502b5d3..cb30e02e 100644
--- a/tools/analysis_3d/data_classes.py
+++ b/tools/analysis_3d/data_classes.py
@@ -1,14 +1,16 @@
 from __future__ import annotations
 
+from abc import ABC, abstractmethod
 from collections import defaultdict
 from dataclasses import dataclass, field
-from typing import Any, Dict, List, NamedTuple, Optional
+from typing import Any, Dict, List, NamedTuple, Optional, TypeVar
 
 import numpy as np
 import numpy.typing as npt
 from mmengine.logging import print_log
-from t4_devkit import Tier4 as t4
-from t4_devkit.dataclass import Box3D
+from t4_devkit.dataclass import Box3D, BoxLike
+
+SampleDataT = TypeVar("SampleDataT", bound="SampleData")
 
 
 class DatasetSplitName(NamedTuple):
@@ -19,11 +21,18 @@ class DatasetSplitName(NamedTuple):
 
 
 @dataclass(frozen=True)
-class Detection3DBox:
+class DetectionBox:
+    """Base boxes from detection."""
+
+    box: BoxLike
+    attrs: List[str]
+
+
+@dataclass(frozen=True)
+class Detection3DBox(DetectionBox):
     """3D boxes from detection."""
 
     box: Box3D
-    attrs: List[str]
 
 
 @dataclass(frozen=True)
@@ -40,13 +49,11 @@ class LidarSweep:
 
 
 @dataclass(frozen=True)
-class SampleData:
-    """Dataclass to save data for a sample, for example, 3D bounding boxes."""
+class SampleData(ABC):
+    """Dataclass to save data for a sample, for example, bounding boxes."""
 
     sample_token: str
-    detection_3d_boxes: List[Detection3DBox]
-    lidar_point: Optional[LidarPoint] = None  # Path to the lidar file
-    lidar_sweeps: Optional[List[LidarSweep]] = None  # List of lidar sweeps
+    detection_boxes: List[DetectionBox]
 
     def get_category_attr_counts(
         self,
@@ -61,14 +68,14 @@ def get_category_attr_counts(
         :return: A dict of {attribute name: total counts}.
         """
         category_attr_counts: Dict[str, int] = defaultdict(int)
-        for detection_3d_box in self.detection_3d_boxes:
-            box_category_name = detection_3d_box.box.semantic_label.name
+        for detection_box in self.detection_boxes:
+            box_category_name = detection_box.box.semantic_label.name
             if remapping_classes is not None:
                 # If no category found from the remapping, then it uses the original category name
                 box_category_name = remapping_classes.get(box_category_name, box_category_name)
 
             if box_category_name == category_name:
-                for attr_name in detection_3d_box.attrs:
+                for attr_name in detection_box.attrs:
                     category_attr_counts[attr_name] += 1
 
         return category_attr_counts
@@ -84,14 +91,39 @@ def get_category_counts(
         :return: A dict of {sample token: {category name: total counts}}.
         """
         category_counts: Dict[str, int] = defaultdict(int)
-        for detection_3d_box in self.detection_3d_boxes:
-            box_category_name = detection_3d_box.box.semantic_label.name
+        for detection_box in self.detection_boxes:
+            box_category_name = detection_box.box.semantic_label.name
             if remapping_classes is not None:
                 # If no category found from the remapping, then it uses the original category name
                 box_category_name = remapping_classes.get(box_category_name, box_category_name)
             category_counts[box_category_name] += 1
         return category_counts
 
+    @classmethod
+    @abstractmethod
+    def create_sample_data(
+        cls,
+        sample_token: str,
+        boxes: List[BoxLike],
+        **kwargs: Any,
+    ) -> SampleDataT:
+        """
+        Create a SampleData given the params.
+        :param sample_token: Sample token to represent a sample (sensor frame).
+        :param boxes: List of bounding boxes for the given sample token.
+        """
+        raise NotImplementedError
+
+
+@dataclass(frozen=True)
+class SampleData3D(SampleData):
+    """Dataclass to save data for a sample, for example, 3D bounding boxes."""
+
+    sample_token: str
+    detection_boxes: List[Detection3DBox]
+    lidar_point: Optional[LidarPoint] = None  # Path to the lidar file
+    lidar_sweeps: Optional[List[LidarSweep]] = None  # List of lidar sweeps
+
     @classmethod
     def create_sample_data(
         cls,
@@ -99,17 +131,17 @@ def create_sample_data(
         boxes: List[Box3D],
         lidar_point: Optional[LidarPoint] = None,
         lidar_sweeps: Optional[List[LidarSweep]] = None,
-    ) -> SampleData:
+    ) -> SampleData3D:
         """
         Create a SampleData given the params.
         :param sample_token: Sample token to represent a sample (lidar frame).
-        :param detection_3d_boxes: List of 3D bounding boxes for the given sample token.
+        :param boxes: List of 3D bounding boxes for the given sample token.
         """
         detection_3d_boxes = [Detection3DBox(box=box, attrs=box.semantic_label.attributes) for box in boxes]
 
-        return SampleData(
+        return cls(
             sample_token=sample_token,
-            detection_3d_boxes=detection_3d_boxes,
+            detection_boxes=detection_3d_boxes,
             lidar_sweeps=lidar_sweeps,
             lidar_point=lidar_point,
         )
@@ -125,7 +157,7 @@ class ScenarioData:
     def add_sample_data(self, sample_data: SampleData) -> None:
         """
         Add a SampleData to ScenarioData.
-        :param sample_data: SampleData contains data for descripting a sample/lidar frame.
+        :param sample_data: SampleData contains data for describing a sample/sensor frame.
         """
         if sample_data.sample_token in self.sample_data:
             print_log(f"Found {sample_data.sample_token} in the data, replacing it...")