Commits (28)
8c16d84
Create the stub for reading from GCS and writing to pub/sub
lukeindykiewicz Nov 5, 2020
6ddd52e
Add recovery steps
lukeindykiewicz Nov 11, 2020
910fc78
Parse json and filter bucket
lukeindykiewicz Nov 13, 2020
717e053
Use values from config
lukeindykiewicz Nov 13, 2020
8f8f613
Bump to 0.6.2-rc1
lukeindykiewicz Nov 14, 2020
9d29238
Add more logging
lukeindykiewicz Nov 16, 2020
67ad2fb
Bump to 0.6.2-rc2
lukeindykiewicz Nov 16, 2020
d1cc980
Fix gcs path
lukeindykiewicz Nov 16, 2020
02a67af
Bump to 0.6.2-rc3
lukeindykiewicz Nov 16, 2020
d93a54e
Fix pubsub producer config
lukeindykiewicz Nov 16, 2020
6ef950a
Bump to 0.6.2-rc4
lukeindykiewicz Nov 16, 2020
c954749
Add more logging
lukeindykiewicz Nov 16, 2020
a34d24f
Bump to 0.6.2-rc5
lukeindykiewicz Nov 16, 2020
05ef0fb
Modify event version + use full column name
lukeindykiewicz Nov 17, 2020
0eb652a
Bump to 0.6.2-rc6
lukeindykiewicz Nov 17, 2020
7af2dad
Fix column name
chuwy Nov 18, 2020
230655c
Bump to 0.6.2-rc7
chuwy Nov 18, 2020
e63f94b
Bring event_version fix back
chuwy Nov 18, 2020
644b130
[WIP] Common: migrate from Travis to GH actions (close #142)
chuwy Nov 18, 2020
cca8c58
[DELETEME] Keep only repeater publishing
chuwy Nov 18, 2020
ae69e45
Bump to 0.6.2-rc8
chuwy Nov 18, 2020
ba50264
Switch to contexts
chuwy Nov 18, 2020
06d570d
Make column names configurable
chuwy Nov 18, 2020
a064ce1
Bump to 0.6.2-rc9
chuwy Nov 18, 2020
b765359
Implement counters and additional sink, fix concatenated file
chuwy Nov 18, 2020
a98be19
Bump to 0.6.2-rc10
chuwy Nov 18, 2020
25380d8
Fix a file pipe
chuwy Nov 18, 2020
5c3a933
Bump to 0.6.2-rc11
chuwy Nov 18, 2020
File filter

Filter by extension

2 changes: 2 additions & 0 deletions build.sbt
@@ -100,6 +100,8 @@ lazy val repeater = project
Dependencies.gcs,
Dependencies.pubsub,
Dependencies.pubsubFs2Grpc,
Dependencies.blobstoreCore,
Dependencies.blobstoreGcs,
Dependencies.slf4j,
Dependencies.catsEffect,
Dependencies.circeLiteral,
33 changes: 18 additions & 15 deletions project/Dependencies.scala
@@ -43,6 +43,7 @@ object Dependencies {
val fs2 = "2.4.2"
val httpClient = "0.21.6"
val logging = "1.1.1"
val blobstore = "0.7.3"

/**
* After 1.102.0 the Google Cloud Java SDK versions diverge.
@@ -88,21 +89,23 @@
val googleOauth = "com.google.oauth-client" % "google-oauth-client" % V.googleOauth

// Scala third-party
val cats = "org.typelevel" %% "cats-core" % V.cats
val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect
val circe = "io.circe" %% "circe-core" % V.circe
val circeJawn = "io.circe" %% "circe-jawn" % V.circe
val circeLiteral = "io.circe" %% "circe-literal" % V.circe
val circeParser = "io.circe" %% "circe-parser" % V.circe
val decline = "com.monovore" %% "decline" % V.decline
val fs2 = "co.fs2" %% "fs2-core" % V.fs2
val httpClient = "org.http4s" %% "http4s-async-http-client" % V.httpClient
val logging = "io.chrisdavenport" %% "log4cats-slf4j" % V.logging
val pubsubFs2 = "com.permutive" %% "fs2-google-pubsub-http" % V.pubsubFs2
val pubsubFs2Grpc = "com.permutive" %% "fs2-google-pubsub-grpc" % V.pubsubFs2
val scioBigQuery = "com.spotify" %% "scio-bigquery" % V.scio
val scioCore = "com.spotify" %% "scio-core" % V.scio
val scioRepl = "com.spotify" %% "scio-repl" % V.scio
val cats = "org.typelevel" %% "cats-core" % V.cats
val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect
val circe = "io.circe" %% "circe-core" % V.circe
val circeJawn = "io.circe" %% "circe-jawn" % V.circe
val circeLiteral = "io.circe" %% "circe-literal" % V.circe
val circeParser = "io.circe" %% "circe-parser" % V.circe
val decline = "com.monovore" %% "decline" % V.decline
val fs2 = "co.fs2" %% "fs2-core" % V.fs2
val httpClient = "org.http4s" %% "http4s-async-http-client" % V.httpClient
val logging = "io.chrisdavenport" %% "log4cats-slf4j" % V.logging
val pubsubFs2 = "com.permutive" %% "fs2-google-pubsub-http" % V.pubsubFs2
val pubsubFs2Grpc = "com.permutive" %% "fs2-google-pubsub-grpc" % V.pubsubFs2
val scioBigQuery = "com.spotify" %% "scio-bigquery" % V.scio
val scioCore = "com.spotify" %% "scio-core" % V.scio
val scioRepl = "com.spotify" %% "scio-repl" % V.scio
val blobstoreCore = "com.github.fs2-blobstore" %% "core" % V.blobstore
val blobstoreGcs = "com.github.fs2-blobstore" %% "gcs" % V.blobstore

// Scala Snowplow
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk
Expand Down
Recover.scala (new file)
@@ -0,0 +1,62 @@
package com.snowplowanalytics.snowplow.storage.bigquery.repeater

import fs2.Stream
import cats.effect.Concurrent
import cats.effect.Timer
import blobstore.Path
import blobstore.implicits.GetOps
import io.circe.JsonObject
import java.time.Instant
import java.util.UUID
import io.circe.generic.auto._, io.circe.parser._
import cats.effect.Sync
import cats.syntax.all._

object Recover {
Contributor:

Don't we want a main() to be able to recover already-existing bad rows with the percentage issue?

Author:

What do you mean?

It's added to main in Repeater.scala for the repeater:
https://github.com/snowplow-incubator/snowplow-bigquery-loader/pull/139/files#diff-d0e5513b7035a18d3239b7e5b227c63eb1db4386df406d89c23b3b6e7e331cb5

Do you have something else in mind?

Contributor (@benjben, Nov 12, 2020):

I thought that there were 2 use cases:

  1. auto recover failed inserts live
  2. recover already existing bad rows

The main() in Repeater.scala does the first one but I struggle to see where 2) happens.

Author:

This job is only for point 2). The main starts the Recover stream, which does the job. Am I missing something?

Contributor:

My understanding is that Repeater is a long-running Scala app that reads from PubSub and writes to BQ. How will you make it run as a "batch" job that just reads GCS?

Contributor:

As I see it, recover in repeater should only be called for current failed inserts, not for the whole history.

Author:

I added a stream to Repeater that works next to the main flow. It reads the data from GCS and writes to Pub/Sub, kind of similar to what repeater does but the other way round. It reads all the data in the filtered buckets, recovers it, writes to Pub/Sub, and that's it: the stream stops. All other streams in the main flow continue to work normally. I'm only adding functionality, not changing the existing one.

It's here: https://github.com/snowplow-incubator/snowplow-bigquery-loader/pull/139/files#diff-d0e5513b7035a18d3239b7e5b227c63eb1db4386df406d89c23b3b6e7e331cb5R55

Contributor:

It means that we will start the repeater and expect it to go and try to recover failed inserts from weeks ago. I guess what is troubling me is that we want to use a long-running streaming app to perform a batch job (for this very use case).

Author:

That's why it's a one-off job. There is nothing wrong with this solution, imho. We just add a recovery feature to the app so we don't have to spin up a new app and bother with all the infrastructure code and the trouble of deploying something new. The long-term goal would be to use the recovery project for this purpose, but currently it cannot recover events in this part of the pipeline.


  def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
    resources
      .store
      .list(resources.bucket)
      .evalMap(resources.store.getContents)
Contributor:

I'm not familiar with Store.getContents; does all the data need to fit into memory? If yes, are we sure that the data is not too big?

Author:

I assumed that every single file should fit in memory without problems. Is that a wrong assumption?

Contributor:

Maybe you can read the files line by line to be sure?
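For reference, a minimal sketch of the line-by-line variant suggested here, using the get API of the same fs2-blobstore dependency (the 32 KiB chunk size is an arbitrary choice):

  resources.store
    .list(resources.bucket)
    .flatMap { path =>
      resources.store
        .get(path, 32 * 1024)            // stream bytes; the whole file never sits in memory
        .through(fs2.text.utf8Decode)
        .through(fs2.text.lines)         // emit one bad row (one JSON line) at a time
    }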

Author:

Yes, I definitely can. I just thought these files are pretty small. Thank you both!

      .map(recover)
      .evalMap {
        case Left(e)     => Sync[F].delay(println(s"Error: $e")) // TODO: use logger
        case Right(read) => resources.pubSubProducer.produce(read).void
      }
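A hedged sketch of the TODO above, using the log4cats Logger already used elsewhere in the repeater; it assumes an extra Logger[F] constraint (and import io.chrisdavenport.log4cats.Logger) on recoverFailedInserts, which this PR does not add:

      .evalMap {
        case Left(e)     => Logger[F].error(s"Failed to recover: $e")  // suspended effect, unlike bare println
        case Right(read) => resources.pubSubProducer.produce(read).void
      }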

  case class SimpleEventContainer(eventId: UUID, etlTstamp: Instant, payload: String)

  // TODO: filter only the time period that is needed (based on names in the GCS bucket)
  // TODO: filter only failures that are due to an invalid column
  // TODO: count successfully recovered events (?)
  def recover: String => Either[String, EventContainer] =
    b =>
      stringToFailedInsertBadRow(b).map { ev =>
        val fixed = fix(ev.payload)
Contributor:

This can potentially corrupt a lot of good data: if there are any values containing _% it will change them for no reason. It should operate only on keys.

        EventContainer(ev.eventId, ev.etlTstamp, stringToJson(fixed))
      }

  case class FailedInsert(schema: String, data: Data)
  case class Data(payload: String)

  case class Payload(eventId: UUID, etlTstamp: Instant)

  case class Combined(eventId: UUID, etlTstamp: Instant, payload: String)

  def stringToFailedInsertBadRow: String => Either[String, Combined] =
    in => {
      val parse =
        for {
          raw   <- decode[FailedInsert](in).map(_.data.payload)
          extra <- decode[Payload](raw)
        } yield Combined(extra.eventId, extra.etlTstamp, raw)

      parse.left.map(_.toString)
    }

  def fix: String => String = _.replaceAll("_%", "_percentage")
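As the review above points out, replaceAll on the raw payload string also rewrites any values containing _%, not just keys. A key-only rewrite has to walk the JSON tree; a sketch with circe, using a hypothetical renameKeys helper that is not part of this PR:

  def renameKeys(json: io.circe.Json): io.circe.Json =
    json.arrayOrObject(
      json,                                                  // scalars are left untouched
      arr => io.circe.Json.fromValues(arr.map(renameKeys)),  // recurse into arrays
      obj =>
        io.circe.Json.fromFields(obj.toList.map {
          case (k, v) => (k.replaceAll("_%", "_percentage"), renameKeys(v))
        })
    )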

  def stringToJson: String => JsonObject = ???
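The remaining stub could be filled with the circe-parser import already at the top of this file; a sketch that falls back to an empty object on malformed input (that fallback is an assumption, surfacing the error via Either may be preferable):

  def stringToJson: String => JsonObject =
    s => parse(s).toOption.flatMap(_.asObject).getOrElse(JsonObject.empty)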

}
Repeater.scala
@@ -51,9 +51,8 @@ object Repeater extends SafeIOApp {
.through[IO, Unit](Flow.sink(resources))
desparatesSink = Flow.dequeueDesperates(resources)
logging = Stream.awakeEvery[IO](5.minute).evalMap(_ => resources.updateLifetime *> resources.showStats)
_ <- Stream(bqSink, desparatesSink, logging).parJoin(
StreamConcurrency
)
recover = Recover.recoverFailedInserts(resources)
_ <- Stream(bqSink, desparatesSink, logging, recover).parJoin(StreamConcurrency)
} yield ()

process.compile.drain.attempt.flatMap {
Resources.scala
@@ -29,6 +29,10 @@ import fs2.concurrent.{Queue, SignallingRef}
import com.snowplowanalytics.snowplow.storage.bigquery.common.Config
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.RepeaterCli.GcsPath
import com.permutive.pubsub.producer.PubsubProducer
import blobstore.Store
import blobstore.gcs.GcsStore
import com.google.cloud.storage.StorageOptions

/**
* Resources container, allowing to manipulate all acquired entities
@@ -53,7 +57,9 @@ class Resources[F[_]: Sync](
val backoffTime: Int,
val concurrency: Int,
val insertBlocker: Blocker,
val jobStartTime: Instant
val jobStartTime: Instant,
val pubSubProducer: PubsubProducer[F, EventContainer],
val store: Store[F]
) {
def logInserted: F[Unit] =
statistics.update(s => s.copy(inserted = s.inserted + 1))
@@ -90,11 +96,11 @@ object Resources {
}

/** Allocate all resources for an application */
def acquire[F[_]: ConcurrentEffect: Timer: Logger](
def acquire[F[_]: ContextShift: ConcurrentEffect: Timer: Logger](
cmd: RepeaterCli.ListenCommand
): Resource[F, Resources[F]] = {
// It's a function because blocker needs to be created as Resource
val initResources: F[Blocker => Resources[F]] = for {
val initResources: F[Blocker => PubsubProducer[F, EventContainer] => Resources[F]] = for {
transformed <- Config.transform[F](cmd.config).value
env <- Sync[F].fromEither(transformed)
bigQuery <- services.Database.getClient[F]
@@ -103,31 +109,36 @@
stop <- SignallingRef[F, Boolean](false)
statistics <- Ref[F].of[Statistics](Statistics.start)
concurrency <- Sync[F].delay(Runtime.getRuntime.availableProcessors * 16)
storage = StorageOptions.getDefaultInstance.getService
_ <- Logger[F].info(
s"Initializing Repeater from ${env.config.failedInserts} to ${env.config.tableId} with $concurrency streams"
)
jobStartTime <- Sync[F].delay(Instant.now())
} yield (b: Blocker) =>
new Resources(
bigQuery,
cmd.deadEndBucket,
env,
queue,
counter,
stop,
statistics,
cmd.bufferSize,
cmd.window,
cmd.backoff,
concurrency,
b,
jobStartTime
)
(p: PubsubProducer[F, EventContainer]) =>
new Resources(
bigQuery,
cmd.deadEndBucket,
env,
queue,
counter,
stop,
statistics,
cmd.bufferSize,
cmd.window,
cmd.backoff,
concurrency,
b,
jobStartTime,
p,
GcsStore(storage, b, List.empty)
)

val createBlocker = Sync[F].delay(Executors.newCachedThreadPool()).map(ExecutionContext.fromExecutorService)
for {
blocker <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext)
resources <- Resource.make(initResources.map(init => init.apply(blocker)))(release)
blocker <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext)
pubsubProducer <- services.PubSub.getProducer[F]
resources <- Resource.make(initResources.map(init => init(blocker)(pubsubProducer)))(release)
} yield resources
}

services/PubSub.scala
@@ -17,15 +17,21 @@ import com.google.pubsub.v1.PubsubMessage
import cats.effect._
import cats.syntax.all._
import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
import com.permutive.pubsub.consumer.ConsumerRecord
import io.chrisdavenport.log4cats.Logger
import fs2.concurrent.Queue
import fs2.Stream

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
import com.permutive.pubsub.producer.grpc.GooglePubsubProducer
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.consumer
import com.permutive.pubsub.producer
import com.permutive.pubsub.producer.grpc.PubsubProducerConfig
import scala.concurrent.duration._

/** Module responsible for reading PubSub */
/** Module responsible for reading and writing PubSub */
object PubSub {

/** Read events from `failedInserts` topic */
@@ -37,8 +43,8 @@
): Stream[F, ConsumerRecord[F, EventContainer]] =
PubsubGoogleConsumer.subscribe[F, EventContainer](
blocker,
Model.ProjectId(projectId),
Model.Subscription(subscription),
consumer.Model.ProjectId(projectId),
consumer.Model.Subscription(subscription),
(msg, err, ack, _) => callback[F](msg, err, ack, desperates),
PubsubGoogleConsumerConfig[F](onFailedTerminate = t => Logger[F].error(s"Terminating consumer due $t"))
)
@@ -49,4 +55,21 @@
val badRow = BadRow.LoaderRecoveryError(Repeater.processor, failure, Payload.RawPayload(msg.toString))
desperates.enqueue1(badRow) >> ack
}

  implicit val encoder: MessageEncoder[EventContainer] = new MessageEncoder[EventContainer] {
    override def encode(a: EventContainer): Either[Throwable, Array[Byte]] = ???
  }
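A minimal sketch of how this stub could be completed, assuming EventContainer carries (eventId: UUID, etlTstamp: Instant, payload: JsonObject) as constructed in Recover.scala; the field names on the wire are an assumption:

  implicit val encoder: MessageEncoder[EventContainer] = new MessageEncoder[EventContainer] {
    override def encode(a: EventContainer): Either[Throwable, Array[Byte]] =
      Right(
        io.circe.Json.obj(
          "eventId"   -> io.circe.Json.fromString(a.eventId.toString),
          "etlTstamp" -> io.circe.Json.fromString(a.etlTstamp.toString),
          "payload"   -> io.circe.Json.fromJsonObject(a.payload)
        ).noSpaces.getBytes(java.nio.charset.StandardCharsets.UTF_8)  // Pub/Sub message body as UTF-8 JSON
      )
  }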

  // TODO: provide projectId and topic from config instead of hard-coding them
  def getProducer[F[_]: Concurrent: Timer: Logger] =
    GooglePubsubProducer.of[F, EventContainer](
      producer.Model.ProjectId("test-project"),
      producer.Model.Topic("values"),
      config = PubsubProducerConfig[F](
        batchSize         = 100,
        delayThreshold    = 100.millis,
        onFailedTerminate = e => Sync[F].delay(println(s"Got error $e"))
      )
    )

}
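A sketch of how the TODO on getProducer might be resolved, threading the project and topic through as parameters instead of hard-coding them (parameter names are illustrative):

  def getProducer[F[_]: Concurrent: Timer: Logger](projectId: String, topic: String) =
    GooglePubsubProducer.of[F, EventContainer](
      producer.Model.ProjectId(projectId),
      producer.Model.Topic(topic),
      config = PubsubProducerConfig[F](
        batchSize         = 100,
        delayThreshold    = 100.millis,
        onFailedTerminate = e => Logger[F].error(s"Got error $e")  // log instead of println
      )
    )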
17 changes: 17 additions & 0 deletions repeater/src/test/resources/failed_inserts.json
@@ -0,0 +1,17 @@
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/loader_recovery_error/jsonschema/1-0-0",
  "data": {
    "processor": {
      "artifact": "snowplow-bigquery-repeater",
      "version": "0.6.0-rc7"
    },
    "failure": {
      "error": {
        "message": "no such field.",
        "location": "unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0",
        "reason": "invalid"
      }
    },
    "payload": "{\"eventId\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etlTstamp\":\"2020-11-10T16:56:39.283Z\",\"payload\":{\"geo_country\":\"DE\",\"event\":\"unstruct\",\"user_ipaddress\":\"18.194.133.57\",\"event_version\":\"1-0-0\",\"geo_city\":\"Frankfurt am Main\",\"platform\":\"srv\",\"event_id\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etl_tstamp\":\"2020-11-10T16:56:39.283Z\",\"geo_latitude\":50.1188,\"v_collector\":\"ssc-2.0.0-googlepubsub\",\"collector_tstamp\":\"2020-11-10T16:56:37.401Z\",\"event_vendor\":\"com.snplow.eng.gcp\",\"network_userid\":\"70ec6f9e-cfe9-46c7-8b9c-fab280ea120e\",\"geo_region\":\"HE\",\"geo_timezone\":\"Europe/Berlin\",\"event_format\":\"jsonschema\",\"geo_zipcode\":\"60313\",\"useragent\":\"Go-http-client/2.0\",\"event_name\":\"luke_test_percentage\",\"dvce_created_tstamp\":\"2020-11-10T16:56:37.115Z\",\"dvce_sent_tstamp\":\"2020-11-10T16:56:37.117Z\",\"geo_longitude\":8.6843,\"v_tracker\":\"golang-2.3.0\",\"derived_tstamp\":\"2020-11-10T16:56:37.399Z\",\"app_id\":\"test_percentage_luke\",\"geo_region_name\":\"Hesse\",\"event_fingerprint\":\"b9f667befa875fcae1bedd55e463330e\",\"v_etl\":\"beam-enrich-1.4.1-common-1.4.1\",\"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0\":[{\"device_family\":\"Other\",\"os_family\":\"Other\",\"useragent_family\":\"Go-http-client\",\"os_major\":null,\"os_minor\":null,\"os_patch\":null,\"os_patch_minor\":null,\"os_version\":\"Other\",\"useragent_major\":\"2\",\"useragent_minor\":\"0\",\"useragent_patch\":null,\"useragent_version\":\"Go-http-client 2.0\"}],\"contexts_com_iab_snowplow_spiders_and_robots_1_0_0\":[{\"category\":\"SPIDER_OR_ROBOT\",\"primary_impact\":\"UNKNOWN\",\"reason\":\"FAILED_UA_INCLUDE\",\"spider_or_robot\":true}],\"contexts_nl_basjes_yauaa_context_1_0_1\":[{\"device_class\":\"Unknown\",\"agent_build\":null,\"agent_class\":\"Special\",\"agent_information_email\":null,\"agent_information_url\":null,\"agent_language\":null,\"agent_language_code\":null,\"agent_name\":\"Go-http-client\",\"agent_name_version\":\"Go-http-client 2.0\",\"agent_name_version_major\":\"Go-http-client 2\",\"agent_security\":null,\"agent_uuid\":null,\"agent_version\":\"2.0\",\"agent_version_major\":\"2\",\"anonymized\":null,\"carrier\":null,\"device_brand\":\"Unknown\",\"device_cpu\":null,\"device_cpu_bits\":null,\"device_firmware_version\":null,\"device_name\":\"Unknown\",\"device_version\":null,\"facebook_carrier\":null,\"facebook_device_class\":null,\"facebook_device_name\":null,\"facebook_device_version\":null,\"facebook_fbop\":null,\"facebook_fbss\":null,\"facebook_operating_system_name\":null,\"facebook_operating_system_version\":null,\"g_sa_installation_id\":null,\"hacker_attack_vector\":null,\"hacker_toolkit\":null,\"i_e_compatibility_name_version\":null,\"i_e_compatibility_name_version_major\":null,\"i_e_compatibility_version\":null,\"i_e_compatibility_version_major\":null,\"kobo_affiliate\":null,\"kobo_platform_id\":null,\"layout_engine_build\":null,\"layout_engine_class\":\"Unknown\",\"layout_engine_name\":\"Unknown\",\"layout_engine_name_version\":\"Unknown ??\",\"layout_engine_name_version_major\":\"Unknown ??\",\"layout_engine_version\":\"??\",\"layout_engine_version_major\":\"??\",\"network_type\":null,\"operating_system_class\":\"Unknown\",\"operating_system_name\":\"Unknown\",\"operating_system_name_version\":\"Unknown ??\",\"operating_system_name_version_major\":\"Unknown ??\",\"operating_system_version\":\"??\",\"operating_system_version_build\":null,\"operating_system_version_major\":\"??\",\"webview_app_name\":null,\"webview_app_name_version_major\":null,\"webview_app_version\":null,\"webview_app_version_major\":null}],\"unstruct_event_com_snplow_eng_gcp_luke_test_percentage_1_0_0\":{\"availability_%\":\"10\",\"name\":\"test01\"}}}"
  }
}