|
4 | 4 |
|
5 | 5 | from typing import Any, Iterable |
6 | 6 |
|
| 7 | +import jsonschema.exceptions as jsonschema_exceptions |
7 | 8 | import simplejson as json |
8 | 9 | import sqlalchemy |
9 | 10 | from pendulum import now |
@@ -133,3 +134,40 @@ def activate_version(self, new_version: int) -> None: |
133 | 134 | ) |
134 | 135 | with self.connector._connect() as conn, conn.begin(): # noqa: SLF001 |
135 | 136 | conn.execute(query) |
| 137 | + |
| 138 | + |
# Override record validation implementation to parse objects that can be stringified
# into string fields, i.e. numeric or JSON.
def _validate_and_parse(self, record: dict) -> dict:
    """Validate or repair the record, parsing to python-native types as needed.

    If validation fails because a value is not of type ``'string'``, the values
    of properties that the stream schema declares as strings are converted to
    strings in place (dicts and lists are JSON-encoded) and the record is
    validated a second time. Properties declared with any other type are left
    untouched so that valid integers, booleans, etc. are not corrupted.

    Args:
        record: Individual record in the stream. It is modified in place.

    Returns:
        Validated record.

    Raises:
        jsonschema.exceptions.ValidationError: If the record is invalid for a
            reason other than a string type mismatch, or if it is still
            invalid after the conversion attempt.
    """
    try:
        self._validator.validate(record)
    except jsonschema_exceptions.ValidationError as e:
        # Only string type mismatches are repairable; anything else propagates.
        if not ("is not of type" in e.message and "'string'" in e.message):
            raise

        self.logger.warning(
            "Received non valid record for string type, "
            "attempting forced conversion to string",
        )

        # NOTE(review): assumes self.schema is a JSON-schema mapping with a
        # top-level "properties" entry — confirm against the base sink class.
        schema = self.schema if isinstance(self.schema, dict) else {}
        properties = schema.get("properties") or {}

        # Re-assigning existing keys does not resize the dict, so mutating
        # while iterating over items() is safe here.
        for key, value in record.items():
            if isinstance(value, str):
                continue

            # Collect every type the property may take, including the
            # alternatives listed under anyOf / oneOf.
            prop = properties.get(key)
            if not isinstance(prop, dict):
                continue
            declared = set()
            alternatives = [
                prop,
                *(prop.get("anyOf") or ()),
                *(prop.get("oneOf") or ()),
            ]
            for sub_schema in alternatives:
                sub_type = (
                    sub_schema.get("type") if isinstance(sub_schema, dict) else None
                )
                if isinstance(sub_type, str):
                    declared.add(sub_type)
                elif isinstance(sub_type, (list, tuple)):
                    declared.update(sub_type)

            # Leave non-string columns alone: stringifying them would trade
            # one validation error for another (or silently corrupt data).
            if "string" not in declared:
                continue
            # A null in a nullable string column is already valid; do not
            # turn it into the literal text "None".
            if value is None and "null" in declared:
                continue

            if isinstance(value, (dict, list)):
                # Containers are stored as JSON text rather than Python repr.
                record[key] = json.dumps(value)
            else:
                record[key] = str(value)

        self.logger.warning("Validating converted record")
        self._validator.validate(record)

    self._parse_timestamps_in_record(
        record=record,
        schema=self.schema,
        treatment=self.datetime_error_treatment,
    )
    return record
0 commit comments