DRAFT failed inserts recovery #139
base: master
Changes from 2 commits
@@ -0,0 +1,62 @@
package com.snowplowanalytics.snowplow.storage.bigquery.repeater

import fs2.Stream
import cats.effect.Concurrent
import cats.effect.Timer
import blobstore.Path
import blobstore.implicits.GetOps
import io.circe.JsonObject
import java.time.Instant
import java.util.UUID
import io.circe.generic.auto._, io.circe.parser._
import cats.effect.Sync
import cats.syntax.all._

object Recover {

  // Read failed-insert bad rows from the GCS bucket, fix them and re-publish them to Pub/Sub
  def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
    resources
      .store
      .list(resources.bucket)
      .evalMap(resources.store.getContents)
      .map(recover)
      .evalMap {
        case Left(e)     => Sync[F].pure(println(s"Error: $e")) // TODO: use logger
        case Right(read) => resources.pubSubProducer.produce(read) *> Sync[F].unit
      }

  case class SimpleEventContainer(eventId: UUID, etlTstamp: Instant, payload: String)

  // TODO: filter only time period that is needed (based on names in GCS bucket)
  // TODO: filter only failures that are due to invalid column
  // TODO: count successfully recovered events (?)
  def recover: String => Either[String, EventContainer] =
    b =>
      stringToFailedInsertBadRow(b).map { ev =>
        val fixed = fix(ev.payload)
        EventContainer(ev.eventId, ev.etlTstamp, stringToJson(fixed))
      }

  case class FailedInsert(schema: String, data: Data)
  case class Data(payload: String)

  case class Payload(eventId: UUID, etlTstamp: Instant)

  case class Combined(eventId: UUID, etlTstamp: Instant, payload: String)

  // Decode the outer failed-insert bad row, then pull eventId and etlTstamp out of its embedded payload
  def stringToFailedInsertBadRow: String => Either[String, Combined] =
    in => {
      val parse =
        for {
          raw   <- decode[FailedInsert](in).map(_.data.payload)
          extra <- decode[Payload](raw)
        } yield Combined(extra.eventId, extra.etlTstamp, raw)

      parse.left.map(_.toString)
    }

  // Rename the "_%" column suffix to the "_percentage" form expected by BigQuery
  def fix: String => String = _.replaceAll("_%", "_percentage")

  def stringToJson: String => JsonObject = ???

}
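Not part of the diff above, just for orientation: a minimal sketch of how the ??? placeholder for stringToJson could be filled in with the circe parser that Recover already imports. The empty-object fallback is an assumption made for the sketch; real code would probably surface the parse error instead.

import io.circe.JsonObject
import io.circe.parser.parse

// Sketch only: parse the fixed payload string and keep it if it is a JSON object,
// falling back to an empty JsonObject on malformed input.
def stringToJson: String => JsonObject =
  s => parse(s).toOption.flatMap(_.asObject).getOrElse(JsonObject.empty)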
@@ -0,0 +1,17 @@
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/loader_recovery_error/jsonschema/1-0-0",
  "data": {
    "processor": {
      "artifact": "snowplow-bigquery-repeater",
      "version": "0.6.0-rc7"
    },
    "failure": {
      "error": {
        "message": "no such field.",
        "location": "unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0",
        "reason": "invalid"
      }
    },
    "payload": "{\"eventId\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etlTstamp\":\"2020-11-10T16:56:39.283Z\",\"payload\":{\"geo_country\":\"DE\",\"event\":\"unstruct\",\"user_ipaddress\":\"18.194.133.57\",\"event_version\":\"1-0-0\",\"geo_city\":\"Frankfurt am Main\",\"platform\":\"srv\",\"event_id\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etl_tstamp\":\"2020-11-10T16:56:39.283Z\",\"geo_latitude\":50.1188,\"v_collector\":\"ssc-2.0.0-googlepubsub\",\"collector_tstamp\":\"2020-11-10T16:56:37.401Z\",\"event_vendor\":\"com.snplow.eng.gcp\",\"network_userid\":\"70ec6f9e-cfe9-46c7-8b9c-fab280ea120e\",\"geo_region\":\"HE\",\"geo_timezone\":\"Europe/Berlin\",\"event_format\":\"jsonschema\",\"geo_zipcode\":\"60313\",\"useragent\":\"Go-http-client/2.0\",\"event_name\":\"luke_test_percentage\",\"dvce_created_tstamp\":\"2020-11-10T16:56:37.115Z\",\"dvce_sent_tstamp\":\"2020-11-10T16:56:37.117Z\",\"geo_longitude\":8.6843,\"v_tracker\":\"golang-2.3.0\",\"derived_tstamp\":\"2020-11-10T16:56:37.399Z\",\"app_id\":\"test_percentage_luke\",\"geo_region_name\":\"Hesse\",\"event_fingerprint\":\"b9f667befa875fcae1bedd55e463330e\",\"v_etl\":\"beam-enrich-1.4.1-common-1.4.1\",\"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0\":[{\"device_family\":\"Other\",\"os_family\":\"Other\",\"useragent_family\":\"Go-http-client\",\"os_major\":null,\"os_minor\":null,\"os_patch\":null,\"os_patch_minor\":null,\"os_version\":\"Other\",\"useragent_major\":\"2\",\"useragent_minor\":\"0\",\"useragent_patch\":null,\"useragent_version\":\"Go-http-client 2.0\"}],\"contexts_com_iab_snowplow_spiders_and_robots_1_0_0\":[{\"category\":\"SPIDER_OR_ROBOT\",\"primary_impact\":\"UNKNOWN\",\"reason\":\"FAILED_UA_INCLUDE\",\"spider_or_robot\":true}],\"contexts_nl_basjes_yauaa_context_1_0_1\":[{\"device_class\":\"Unknown\",\"agent_build\":null,\"agent_class\":\"Special\",\"agent_information_email\":null,\"agent_information_url\":null,\"agent_language\":null,\"agent_language_code\":null,\"agent_name\":\"Go-http-client\",\"agent_name_version\":\"Go-http-client 2.0\",\"agent_name_version_major\":\"Go-http-client 2\",\"agent_security\":null,\"agent_uuid\":null,\"agent_version\":\"2.0\",\"agent_version_major\":\"2\",\"anonymized\":null,\"carrier\":null,\"device_brand\":\"Unknown\",\"device_cpu\":null,\"device_cpu_bits\":null,\"device_firmware_version\":null,\"device_name\":\"Unknown\",\"device_version\":null,\"facebook_carrier\":null,\"facebook_device_class\":null,\"facebook_device_name\":null,\"facebook_device_version\":null,\"facebook_fbop\":null,\"facebook_fbss\":null,\"facebook_operating_system_name\":null,\"facebook_operating_system_version\":null,\"g_sa_installation_id\":null,\"hacker_attack_vector\":null,\"hacker_toolkit\":null,\"i_e_compatibility_name_version\":null,\"i_e_compatibility_name_version_major\":null,\"i_e_compatibility_version\":null,\"i_e_compatibility_version_major\":null,\"kobo_affiliate\":null,\"kobo_platform_id\":null,\"layout_engine_build\":null,\"layout_engine_class\":\"Unknown\",\"layout_engine_name\":\"Unknown\",\"layout_engine_name_version\":\"Unknown ??\",\"layout_engine_name_version_major\":\"Unknown ??\",\"layout_engine_version\":\"??\",\"layout_engine_version_major\":\"??\",\"network_type\":null,\"operating_system_class\":\"Unknown\",\"operating_system_name\":\"Unknown\",\"operating_system_name_version\":\"Unknown ??\",\"operating_system_name_version_major\":\"Unknown ??\",\"operating_system_version\":\"??\",\"operating_system_version_build\":null,\"operating_system_version_major\":\"??\",\"webview_app_name\":null,\"webview_app_name_version_major\":null,\"webview_app_version\":null,\"webview_app_version_major\":null}],\"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0\":{\"availability_%\":\"10\",\"name\":\"test01\"}}}"
  }
}
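Note how the "location" in the error above ends in _percentage_1_0_0 while the payload still carries the "availability_%" key; that is exactly the rename that fix in Recover performs. A small illustration using the key from the example payload (the commented result line is just for reading, not output from the PR):

// Applying Recover.fix from the diff above to the offending key rewrites it
// into the form that matches the BigQuery column name.
Recover.fix("""{"availability_%":"10","name":"test01"}""")
// == """{"availability_percentage":"10","name":"test01"}"""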
Don't we want a main() to be able to recover already existing bad rows with the percentage issue?
What do you mean? It's added to main in Repeater.scala for the repeater:
https://github.com/snowplow-incubator/snowplow-bigquery-loader/pull/139/files#diff-d0e5513b7035a18d3239b7e5b227c63eb1db4386df406d89c23b3b6e7e331cb5
Do you have something else in mind?
I thought that there were 2 use cases: 1) recovering failed inserts as they come in, and 2) recovering the already existing bad rows with the percentage issue. The main() in Repeater.scala does the first one, but I struggle to see where 2) happens.
This job is only for point 2). The main starts the Recover stream, which does the job. Am I missing something?
My understanding is that Repeater is a long-running Scala app that reads from Pub/Sub and writes to BQ. How will you make it run as a "batch" job that just reads GCS?
As I see it, recover in Repeater should only be called for current failed inserts, not for the whole history.
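One way to express that constraint, sketched on top of the listing call that is already in the diff (the predicate parameter is an assumption for illustration, not code from this PR):

import blobstore.Path
import fs2.Stream

// Sketch only: narrow the GCS listing before anything is downloaded, e.g. with a
// predicate that matches the date encoded in the object names, so recovery only
// touches the wanted window instead of the whole history.
def listForRecovery[F[_]](resources: Resources[F])(keep: Path => Boolean): Stream[F, Path] =
  resources.store
    .list(resources.bucket)
    .filter(keep)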
I added a stream to Repeater that works next to the main flow. It reads the data from GCS and writes it to Pub/Sub, kind of similar to what Repeater does, but the other way round. It reads all the data in the filtered buckets, recovers it, writes it to Pub/Sub, and that's it: the stream stops. All the other streams in the main flow continue to work normally. I'm only adding functionality, not changing the existing one.
It's here: https://github.com/snowplow-incubator/snowplow-bigquery-loader/pull/139/files#diff-d0e5513b7035a18d3239b7e5b227c63eb1db4386df406d89c23b3b6e7e331cb5R55
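As a rough illustration of that wiring (hypothetical names, not the actual Repeater.scala change from the linked diff): a finite recovery stream can simply be merged with the long-running flow, so it terminates on its own once the GCS listing is exhausted while the main flow keeps consuming Pub/Sub.

import fs2.Stream
import cats.effect.{Concurrent, Timer}

// Hypothetical sketch: run the one-off recovery stream alongside the infinite main
// Repeater flow; `merge` lets the recovery side finish independently.
def withRecovery[F[_]: Concurrent: Timer](
  resources: Resources[F],
  mainFlow: Stream[F, Unit]
): Stream[F, Unit] =
  mainFlow.merge(Recover.recoverFailedInserts(resources))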
It means that we will start the repeater and will expect it to go and try to recover failed inserts from weeks ago. I guess what is troubling me is that we want to use a long-running streaming app to perform a batch job (for this very use case).
That's why it's a one-off job. There is nothing wrong with this solution, imho. We just add the recovery feature to the existing app so we don't have to spin up a new app and deal with all the infrastructure code and the trouble of running something new. The long-term goal would be to use the recovery project for this purpose, but currently it cannot recover events in this part of the pipeline.