Skip to content

Commit 73ef80b

Browse files
Add recovery steps
1 parent 8c16d84 commit 73ef80b

File tree

5 files changed

+338
-5
lines changed

5 files changed

+338
-5
lines changed

repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,62 @@ import fs2.Stream
44
import cats.effect.Concurrent
55
import cats.effect.Timer
66
import blobstore.Path
7+
import blobstore.implicits.GetOps
8+
import io.circe.JsonObject
9+
import java.time.Instant
10+
import java.util.UUID
11+
import io.circe.generic.auto._, io.circe.parser._
12+
import cats.effect.Sync
13+
import cats.syntax.all._
714

815
object Recover {
916

17+
//TODO: hardcode proper GCS path
18+
val DeadQueuePath = Path("gs://sp-storage-loader-failed-inserts-dev1-com_snplow_eng_gcp/dead_queue")
19+
1020
def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
11-
for {
12-
readFromGcs <- resources.store.get(Path("gs://foo/bar"), 1024)
13-
_ <- Stream.eval(resources.pubSubProducer.produce(recover(readFromGcs)))
14-
} yield ()
21+
resources
22+
.store
23+
.list(DeadQueuePath)
24+
.evalMap(resources.store.getContents)
25+
.map(recover)
26+
.evalMap {
27+
case Left(e) => Sync[F].pure(println(s"Error: $e")) // TODO: use logger
28+
case Right(read) => resources.pubSubProducer.produce(read) *> Sync[F].unit
29+
}
30+
31+
case class SimpleEventContainer(eventId: UUID, etlTstamp: Instant, payload: String)
32+
33+
// TODO: filter only time period that is needed (based on names in GCS bucket)
34+
// TODO: filter only failures that are due to invalid column
35+
// TODO: count successfuly recovered events (?)
36+
def recover: String => Either[String, EventContainer] =
37+
b =>
38+
stringToFailedInsertBadRow(b).map { ev =>
39+
val fixed = fix(ev.payload)
40+
EventContainer(ev.eventId, ev.etlTstamp, stringToJson(fixed))
41+
}
42+
43+
case class FailedInsert(schema: String, data: Data)
44+
case class Data(payload: String)
45+
46+
case class Payload(eventId: UUID, etlTstamp: Instant)
47+
48+
case class Combined(eventId: UUID, etlTstamp: Instant, payload: String)
49+
50+
def stringToFailedInsertBadRow: String => Either[String, Combined] =
51+
in => {
52+
val parse =
53+
for {
54+
raw <- decode[FailedInsert](in).map(_.data.payload)
55+
extra <- decode[Payload](raw)
56+
} yield Combined(extra.eventId, extra.etlTstamp, raw)
57+
58+
parse.left.map(_.toString)
59+
}
60+
61+
def fix: String => String = _.replaceAll("_%", "_percentage")
1562

16-
private def recover: Byte => EventContainer = ???
63+
def stringToJson: String => JsonObject = ???
1764

1865
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"schema": "iglu:com.snowplowanalytics.snowplow.badrows/loader_recovery_error/jsonschema/1-0-0",
3+
"data": {
4+
"processor": {
5+
"artifact": "snowplow-bigquery-repeater",
6+
"version": "0.6.0-rc7"
7+
},
8+
"failure": {
9+
"error": {
10+
"message": "no such field.",
11+
"location": "unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0",
12+
"reason": "invalid"
13+
}
14+
},
15+
"payload": "{\"eventId\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etlTstamp\":\"2020-11-10T16:56:39.283Z\",\"payload\":{\"geo_country\":\"DE\",\"event\":\"unstruct\",\"user_ipaddress\":\"18.194.133.57\",\"event_version\":\"1-0-0\",\"geo_city\":\"Frankfurt am Main\",\"platform\":\"srv\",\"event_id\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etl_tstamp\":\"2020-11-10T16:56:39.283Z\",\"geo_latitude\":50.1188,\"v_collector\":\"ssc-2.0.0-googlepubsub\",\"collector_tstamp\":\"2020-11-10T16:56:37.401Z\",\"event_vendor\":\"com.snplow.eng.gcp\",\"network_userid\":\"70ec6f9e-cfe9-46c7-8b9c-fab280ea120e\",\"geo_region\":\"HE\",\"geo_timezone\":\"Europe/Berlin\",\"event_format\":\"jsonschema\",\"geo_zipcode\":\"60313\",\"useragent\":\"Go-http-client/2.0\",\"event_name\":\"luke_test_percentage\",\"dvce_created_tstamp\":\"2020-11-10T16:56:37.115Z\",\"dvce_sent_tstamp\":\"2020-11-10T16:56:37.117Z\",\"geo_longitude\":8.6843,\"v_tracker\":\"golang-2.3.0\",\"derived_tstamp\":\"2020-11-10T16:56:37.399Z\",\"app_id\":\"test_percentage_luke\",\"geo_region_name\":\"Hesse\",\"event_fingerprint\":\"b9f667befa875fcae1bedd55e463330e\",\"v_etl\":\"beam-enrich-1.4.1-common-1.4.1\",\"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0\":[{\"device_family\":\"Other\",\"os_family\":\"Other\",\"useragent_family\":\"Go-http-client\",\"os_major\":null,\"os_minor\":null,\"os_patch\":null,\"os_patch_minor\":null,\"os_version\":\"Other\",\"useragent_major\":\"2\",\"useragent_minor\":\"0\",\"useragent_patch\":null,\"useragent_version\":\"Go-http-client 2.0\"}],\"contexts_com_iab_snowplow_spiders_and_robots_1_0_0\":[{\"category\":\"SPIDER_OR_ROBOT\",\"primary_impact\":\"UNKNOWN\",\"reason\":\"FAILED_UA_INCLUDE\",\"spider_or_robot\":true}],\"contexts_nl_basjes_yauaa_context_1_0_1\":[{\"device_class\":\"Unknown\",\"agent_build\":null,\"agent_class\":\"Special\",\"agent_information_email\":null,\"agent_information_url\":null,\"agent_language\":null,\"agent_language_code\":null,\"agent_name\":\"Go-http-client\",\"agent_name_version\":\"Go-http-client 2.0\",\"agent_name_version_major\":\"Go-http-client 2\",\"agent_security\":null,\"agent_uuid\":null,\"agent_version\":\"2.0\",\"agent_version_major\":\"2\",\"anonymized\":null,\"carrier\":null,\"device_brand\":\"Unknown\",\"device_cpu\":null,\"device_cpu_bits\":null,\"device_firmware_version\":null,\"device_name\":\"Unknown\",\"device_version\":null,\"facebook_carrier\":null,\"facebook_device_class\":null,\"facebook_device_name\":null,\"facebook_device_version\":null,\"facebook_fbop\":null,\"facebook_fbss\":null,\"facebook_operating_system_name\":null,\"facebook_operating_system_version\":null,\"g_sa_installation_id\":null,\"hacker_attack_vector\":null,\"hacker_toolkit\":null,\"i_e_compatibility_name_version\":null,\"i_e_compatibility_name_version_major\":null,\"i_e_compatibility_version\":null,\"i_e_compatibility_version_major\":null,\"kobo_affiliate\":null,\"kobo_platform_id\":null,\"layout_engine_build\":null,\"layout_engine_class\":\"Unknown\",\"layout_engine_name\":\"Unknown\",\"layout_engine_name_version\":\"Unknown ??\",\"layout_engine_name_version_major\":\"Unknown ??\",\"layout_engine_version\":\"??\",\"layout_engine_version_major\":\"??\",\"network_type\":null,\"operating_system_class\":\"Unknown\",\"operating_system_name\":\"Unknown\",\"operating_system_name_version\":\"Unknown ??\",\"operating_system_name_version_major\":\"Unknown ??\",\"operating_system_version\":\"??\",\"operating_system_version_build\":null,\"operating_system_version_major\":\"??\",\"webview_app_name\":null,\"webview_app_name_version_major\":null,\"webview_app_version\":null,\"webview_app_version_major\":null}],\"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0\":{\"availability_%\":\"10\",\"name\":\"test01\"}}}"
16+
}
17+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
{
2+
"eventId": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
3+
"etlTstamp": "2020-11-10T16:56:39.283Z",
4+
"payload": {
5+
"geo_country": "DE",
6+
"event": "unstruct",
7+
"user_ipaddress": "18.194.133.57",
8+
"event_version": "1-0-0",
9+
"geo_city": "Frankfurt am Main",
10+
"platform": "srv",
11+
"event_id": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
12+
"etl_tstamp": "2020-11-10T16:56:39.283Z",
13+
"geo_latitude": 50.1188,
14+
"v_collector": "ssc-2.0.0-googlepubsub",
15+
"collector_tstamp": "2020-11-10T16:56:37.401Z",
16+
"event_vendor": "com.snplow.eng.gcp",
17+
"network_userid": "70ec6f9e-cfe9-46c7-8b9c-fab280ea120e",
18+
"geo_region": "HE",
19+
"geo_timezone": "Europe/Berlin",
20+
"event_format": "jsonschema",
21+
"geo_zipcode": "60313",
22+
"useragent": "Go-http-client/2.0",
23+
"event_name": "luke_test_percentage",
24+
"dvce_created_tstamp": "2020-11-10T16:56:37.115Z",
25+
"dvce_sent_tstamp": "2020-11-10T16:56:37.117Z",
26+
"geo_longitude": 8.6843,
27+
"v_tracker": "golang-2.3.0",
28+
"derived_tstamp": "2020-11-10T16:56:37.399Z",
29+
"app_id": "test_percentage_luke",
30+
"geo_region_name": "Hesse",
31+
"event_fingerprint": "b9f667befa875fcae1bedd55e463330e",
32+
"v_etl": "beam-enrich-1.4.1-common-1.4.1",
33+
"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0": [
34+
{
35+
"device_family": "Other",
36+
"os_family": "Other",
37+
"useragent_family": "Go-http-client",
38+
"os_major": null,
39+
"os_minor": null,
40+
"os_patch": null,
41+
"os_patch_minor": null,
42+
"os_version": "Other",
43+
"useragent_major": "2",
44+
"useragent_minor": "0",
45+
"useragent_patch": null,
46+
"useragent_version": "Go-http-client 2.0"
47+
}
48+
],
49+
"contexts_com_iab_snowplow_spiders_and_robots_1_0_0": [
50+
{
51+
"category": "SPIDER_OR_ROBOT",
52+
"primary_impact": "UNKNOWN",
53+
"reason": "FAILED_UA_INCLUDE",
54+
"spider_or_robot": true
55+
}
56+
],
57+
"contexts_nl_basjes_yauaa_context_1_0_1": [
58+
{
59+
"device_class": "Unknown",
60+
"agent_build": null,
61+
"agent_class": "Special",
62+
"agent_information_email": null,
63+
"agent_information_url": null,
64+
"agent_language": null,
65+
"agent_language_code": null,
66+
"agent_name": "Go-http-client",
67+
"agent_name_version": "Go-http-client 2.0",
68+
"agent_name_version_major": "Go-http-client 2",
69+
"agent_security": null,
70+
"agent_uuid": null,
71+
"agent_version": "2.0",
72+
"agent_version_major": "2",
73+
"anonymized": null,
74+
"carrier": null,
75+
"device_brand": "Unknown",
76+
"device_cpu": null,
77+
"device_cpu_bits": null,
78+
"device_firmware_version": null,
79+
"device_name": "Unknown",
80+
"device_version": null,
81+
"facebook_carrier": null,
82+
"facebook_device_class": null,
83+
"facebook_device_name": null,
84+
"facebook_device_version": null,
85+
"facebook_fbop": null,
86+
"facebook_fbss": null,
87+
"facebook_operating_system_name": null,
88+
"facebook_operating_system_version": null,
89+
"g_sa_installation_id": null,
90+
"hacker_attack_vector": null,
91+
"hacker_toolkit": null,
92+
"i_e_compatibility_name_version": null,
93+
"i_e_compatibility_name_version_major": null,
94+
"i_e_compatibility_version": null,
95+
"i_e_compatibility_version_major": null,
96+
"kobo_affiliate": null,
97+
"kobo_platform_id": null,
98+
"layout_engine_build": null,
99+
"layout_engine_class": "Unknown",
100+
"layout_engine_name": "Unknown",
101+
"layout_engine_name_version": "Unknown ??",
102+
"layout_engine_name_version_major": "Unknown ??",
103+
"layout_engine_version": "??",
104+
"layout_engine_version_major": "??",
105+
"network_type": null,
106+
"operating_system_class": "Unknown",
107+
"operating_system_name": "Unknown",
108+
"operating_system_name_version": "Unknown ??",
109+
"operating_system_name_version_major": "Unknown ??",
110+
"operating_system_version": "??",
111+
"operating_system_version_build": null,
112+
"operating_system_version_major": "??",
113+
"webview_app_name": null,
114+
"webview_app_name_version_major": null,
115+
"webview_app_version": null,
116+
"webview_app_version_major": null
117+
}
118+
],
119+
"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0": {
120+
"availability_%": "10",
121+
"name": "test01"
122+
}
123+
}
124+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
{
2+
"eventId": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
3+
"etlTstamp": "2020-11-10T16:56:39.283Z",
4+
"payload": {
5+
"geo_country": "DE",
6+
"event": "unstruct",
7+
"user_ipaddress": "18.194.133.57",
8+
"event_version": "1-0-0",
9+
"geo_city": "Frankfurt am Main",
10+
"platform": "srv",
11+
"event_id": "90aebb1b-1d49-455b-9049-15ec72dfe5a9",
12+
"etl_tstamp": "2020-11-10T16:56:39.283Z",
13+
"geo_latitude": 50.1188,
14+
"v_collector": "ssc-2.0.0-googlepubsub",
15+
"collector_tstamp": "2020-11-10T16:56:37.401Z",
16+
"event_vendor": "com.snplow.eng.gcp",
17+
"network_userid": "70ec6f9e-cfe9-46c7-8b9c-fab280ea120e",
18+
"geo_region": "HE",
19+
"geo_timezone": "Europe/Berlin",
20+
"event_format": "jsonschema",
21+
"geo_zipcode": "60313",
22+
"useragent": "Go-http-client/2.0",
23+
"event_name": "luke_test_percentage",
24+
"dvce_created_tstamp": "2020-11-10T16:56:37.115Z",
25+
"dvce_sent_tstamp": "2020-11-10T16:56:37.117Z",
26+
"geo_longitude": 8.6843,
27+
"v_tracker": "golang-2.3.0",
28+
"derived_tstamp": "2020-11-10T16:56:37.399Z",
29+
"app_id": "test_percentage_luke",
30+
"geo_region_name": "Hesse",
31+
"event_fingerprint": "b9f667befa875fcae1bedd55e463330e",
32+
"v_etl": "beam-enrich-1.4.1-common-1.4.1",
33+
"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0": [
34+
{
35+
"device_family": "Other",
36+
"os_family": "Other",
37+
"useragent_family": "Go-http-client",
38+
"os_major": null,
39+
"os_minor": null,
40+
"os_patch": null,
41+
"os_patch_minor": null,
42+
"os_version": "Other",
43+
"useragent_major": "2",
44+
"useragent_minor": "0",
45+
"useragent_patch": null,
46+
"useragent_version": "Go-http-client 2.0"
47+
}
48+
],
49+
"contexts_com_iab_snowplow_spiders_and_robots_1_0_0": [
50+
{
51+
"category": "SPIDER_OR_ROBOT",
52+
"primary_impact": "UNKNOWN",
53+
"reason": "FAILED_UA_INCLUDE",
54+
"spider_or_robot": true
55+
}
56+
],
57+
"contexts_nl_basjes_yauaa_context_1_0_1": [
58+
{
59+
"device_class": "Unknown",
60+
"agent_build": null,
61+
"agent_class": "Special",
62+
"agent_information_email": null,
63+
"agent_information_url": null,
64+
"agent_language": null,
65+
"agent_language_code": null,
66+
"agent_name": "Go-http-client",
67+
"agent_name_version": "Go-http-client 2.0",
68+
"agent_name_version_major": "Go-http-client 2",
69+
"agent_security": null,
70+
"agent_uuid": null,
71+
"agent_version": "2.0",
72+
"agent_version_major": "2",
73+
"anonymized": null,
74+
"carrier": null,
75+
"device_brand": "Unknown",
76+
"device_cpu": null,
77+
"device_cpu_bits": null,
78+
"device_firmware_version": null,
79+
"device_name": "Unknown",
80+
"device_version": null,
81+
"facebook_carrier": null,
82+
"facebook_device_class": null,
83+
"facebook_device_name": null,
84+
"facebook_device_version": null,
85+
"facebook_fbop": null,
86+
"facebook_fbss": null,
87+
"facebook_operating_system_name": null,
88+
"facebook_operating_system_version": null,
89+
"g_sa_installation_id": null,
90+
"hacker_attack_vector": null,
91+
"hacker_toolkit": null,
92+
"i_e_compatibility_name_version": null,
93+
"i_e_compatibility_name_version_major": null,
94+
"i_e_compatibility_version": null,
95+
"i_e_compatibility_version_major": null,
96+
"kobo_affiliate": null,
97+
"kobo_platform_id": null,
98+
"layout_engine_build": null,
99+
"layout_engine_class": "Unknown",
100+
"layout_engine_name": "Unknown",
101+
"layout_engine_name_version": "Unknown ??",
102+
"layout_engine_name_version_major": "Unknown ??",
103+
"layout_engine_version": "??",
104+
"layout_engine_version_major": "??",
105+
"network_type": null,
106+
"operating_system_class": "Unknown",
107+
"operating_system_name": "Unknown",
108+
"operating_system_name_version": "Unknown ??",
109+
"operating_system_name_version_major": "Unknown ??",
110+
"operating_system_version": "??",
111+
"operating_system_version_build": null,
112+
"operating_system_version_major": "??",
113+
"webview_app_name": null,
114+
"webview_app_name_version_major": null,
115+
"webview_app_version": null,
116+
"webview_app_version_major": null
117+
}
118+
],
119+
"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0": {
120+
"availability_percentage": "10",
121+
"name": "test01"
122+
}
123+
}
124+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.snowplowanalytics.snowplow.storage.bigquery.repeater
2+
3+
import org.specs2.Specification
4+
import scala.io.Source
5+
6+
class RecoverSpec extends Specification {
7+
8+
def is =
9+
s2"""
10+
fix wrong column name $e1
11+
"""
12+
13+
def e1 = {
14+
val in = Source.fromResource("payload.json").mkString
15+
val recovered = Recover.fix(in)
16+
val out = Source.fromResource("payload_fixed.json").mkString
17+
18+
recovered ==== out
19+
}
20+
21+
}

0 commit comments

Comments
 (0)