@@ -309,7 +309,7 @@ def _sync_insert_candles_history(self, rows: typing.Iterable[list], column_names
309309 self ._register_updated_min_max (table , pa_table )
310310 self ._get_logger ().info (
311311 f"Successfully inserted { len (rows )} rows into "
312- f"{ TableNames .OHLCV_HISTORY .value } for { pa_table [ 'exchange_internal_name' ][ 0 ] } : { pa_table [ 'symbol' ][ 0 ] } : { pa_table [ 'time_frame' ][ 0 ] } "
312+ f"{ TableNames .OHLCV_HISTORY .value } : { self . _get_candles_summary ( pa_table ) } "
313313 )
314314
315315 async def _async_insert (
@@ -630,6 +630,26 @@ def _register_updated_min_max(self, table: pyiceberg.table.Table, update_table:
630630 update_table , self ._updated_min_max_per_symbol_per_time_frame_per_exchange
631631 )
632632
def _get_candles_summary(
    self, table: pyarrow.Table
) -> dict[str, dict[str, dict[str, int]]]:
    """Summarize how many candle rows ``table`` holds per series.

    Rows are grouped by ``exchange_internal_name``, ``symbol`` and
    ``time_frame`` and the ``timestamp`` column is counted within each
    group (using pyarrow's ``count`` aggregation).

    :param table: candles table containing at least the
        ``exchange_internal_name``, ``symbol``, ``time_frame`` and
        ``timestamp`` columns.
    :return: nested mapping shaped as
        ``{exchange: {symbol: {time_frame: count}}}``. Empty when the
        grouped result has no rows.
    """
    grouped_result = table.group_by(
        ["exchange_internal_name", "symbol", "time_frame"]
    ).aggregate([
        # produces a "timestamp_count" column in the grouped result
        ("timestamp", "count"),
    ])
    summary: dict[str, dict[str, dict[str, int]]] = {}
    # Convert each column to plain Python values once instead of calling
    # as_py() on every pyarrow scalar inside the loop.
    for exchange, symbol, time_frame, count in zip(
        grouped_result["exchange_internal_name"].to_pylist(),
        grouped_result["symbol"].to_pylist(),
        grouped_result["time_frame"].to_pylist(),
        grouped_result["timestamp_count"].to_pylist(),
    ):
        # setdefault creates the intermediate exchange / symbol levels
        # the first time each key is seen.
        summary.setdefault(exchange, {}).setdefault(symbol, {})[time_frame] = count
    return summary
652+
633653 @staticmethod
634654 def _update_min_max_per_symbol_per_time_frame_per_exchange_for_table (
635655 update_table : pyarrow .Table ,
0 commit comments