|
9 | 9 | import simplejson as json |
10 | 10 | import sqlalchemy |
11 | 11 | from pendulum import now |
| 12 | +from singer_sdk.helpers._compat import ( |
| 13 | + date_fromisoformat, |
| 14 | + datetime_fromisoformat, |
| 15 | + time_fromisoformat, |
| 16 | +) |
12 | 17 | from singer_sdk.helpers._typing import ( |
13 | 18 | DatetimeErrorTreatmentEnum, |
| 19 | + get_datelike_property_type, |
| 20 | + handle_invalid_timestamp_in_record, |
14 | 21 | ) |
15 | 22 | from singer_sdk.sinks import SQLSink |
16 | 23 | from sqlalchemy.sql.expression import bindparam |
@@ -174,10 +181,64 @@ def _validate_and_parse(self, record: dict) -> dict: |
174 | 181 | except jsonschema_exceptions.ValidationError as e: |
175 | 182 | if self.logger: |
176 | 183 | self.logger.exception(f"Record failed validation: {record}") |
177 | | - raise e # noqa: RERAISES |
| 184 | + raise e # : RERAISES |
178 | 185 |
|
179 | 186 | return record |
180 | 187 |
|
| 188 | + def _parse_timestamps_in_record( |
| 189 | + self, |
| 190 | + record: dict, |
| 191 | + schema: dict, |
| 192 | + treatment: DatetimeErrorTreatmentEnum, |
| 193 | + ) -> None: |
| 194 | + """Parse strings to datetime.datetime values, repairing or erroring on failure. |
| 195 | +
|
| 196 | + Attempts to parse every field that is of type date/datetime/time. If its value |
| 197 | + is out of range, repair logic will be driven by the `treatment` input arg: |
| 198 | + MAX, NULL, or ERROR. |
| 199 | +
|
| 200 | + Args: |
| 201 | + record: Individual record in the stream. |
| 202 | + schema: TODO |
| 203 | + treatment: TODO |
| 204 | + """ |
| 205 | + for key, value in record.items(): |
| 206 | + if key not in schema["properties"]: |
| 207 | + self.logger.warning("No schema for record field '%s'", key) |
| 208 | + continue |
| 209 | + datelike_type = get_datelike_property_type(schema["properties"][key]) |
| 210 | + if datelike_type: |
| 211 | + date_val = value |
| 212 | + try: |
| 213 | + if value is not None: |
| 214 | + if datelike_type == "time": |
| 215 | + date_val = time_fromisoformat(date_val) |
| 216 | + elif datelike_type == "date": |
| 217 | + # Trim time value from date fields. |
| 218 | + if "T" in date_val: |
| 219 | + # Split on T and get the first part. |
| 220 | + date_val = date_val.split("T")[0] |
| 221 | + self.logger.warning( |
| 222 | + "Trimmed time value from date field '%s': %s", |
| 223 | + key, |
| 224 | + date_val, |
| 225 | + ) |
| 226 | + date_val = date_fromisoformat(date_val) |
| 227 | + else: |
| 228 | + date_val = datetime_fromisoformat(date_val) |
| 229 | + except ValueError as ex: |
| 230 | + date_val = handle_invalid_timestamp_in_record( |
| 231 | + record, |
| 232 | + [key], |
| 233 | + date_val, |
| 234 | + datelike_type, |
| 235 | + ex, |
| 236 | + treatment, |
| 237 | + self.logger, |
| 238 | + ) |
| 239 | + record[key] = date_val |
| 240 | + |
| 241 | + |
181 | 242 | def pre_validate_for_string_type( |
182 | 243 | record: dict, |
183 | 244 | schema: dict, |
|
0 commit comments