diff --git a/.github/check_tag.sh b/.github/check_tag.sh
new file mode 100755
index 00000000..838f8abd
--- /dev/null
+++ b/.github/check_tag.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+set -e
+
+tag=$1
+
+project_version=$(sbt version -Dsbt.log.noformat=true | perl -ne 'print "$1\n" if /info.*(\d+\.\d+\.\d+[^\r\n]*)/' | tail -n 1 | tr -d '\n')
+
+if [[ "${tag}" = "${project_version}" ]]; then
+  echo "Tag version (${tag}) matches project version (${project_version}). Deploying!"
+else
+  echo "Tag version (${tag}) doesn't match version in Scala project (${project_version}). Aborting!"
+  exit 1
+fi
diff --git a/.github/deploy_docker.sh b/.github/deploy_docker.sh
new file mode 100755
index 00000000..4afc4ba1
--- /dev/null
+++ b/.github/deploy_docker.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+set -e
+
+tag=$1
+
+mkdir -p "$HOME/.docker"
+file="$HOME/.docker/config.json"
+docker_repo="snowplow-docker-registry.bintray.io"
+cfg=$(curl -X GET -u${BINTRAY_SNOWPLOW_DOCKER_USER}:${BINTRAY_SNOWPLOW_DOCKER_API_KEY} "https://${docker_repo}/v2/auth")
+echo '{"auths": '"$cfg"'}' > "$file"
+
+
+project_version=$(sbt "project common" version -Dsbt.log.noformat=true | perl -ne 'print "$1\n" if /info.*(\d+\.\d+\.\d+[^\r\n]*)/' | tail -n 1 | tr -d '\n')
+if [[ "${tag}" = *"${project_version}" ]]; then
+  echo "Building and publishing ${docker_repo}/snowplow/snowplow-bigquery-repeater:${tag} (project version ${project_version})"
+  sbt "project repeater" docker:publishLocal
+  docker push "${docker_repo}/snowplow/snowplow-bigquery-repeater:${tag}"
+else
+  echo "Tag version '${tag}' doesn't match version in Scala project ('${project_version}'). Aborting!"
+  exit 1
+fi
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 00000000..bc0eb05a
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,22 @@
+name: Test
+
+on: [push, pull_request]
+
+jobs:
+  deploy:
+    if: startsWith(github.ref, 'refs/tags/')
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 8
+        uses: actions/setup-java@v1
+        with:
+          java-version: 8
+      - name: Run tests
+        run: sbt "project repeater" test
+      - name: Build and publish repeater
+        run: .github/deploy_docker.sh ${GITHUB_REF##*/}
+        env:
+          BINTRAY_SNOWPLOW_DOCKER_USER: ${{ secrets.BINTRAY_SNOWPLOW_DOCKER_USER }}
+          BINTRAY_SNOWPLOW_DOCKER_API_KEY: ${{ secrets.BINTRAY_SNOWPLOW_DOCKER_API_KEY }}
+
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 62596e52..00000000
--- a/.travis.yml
+++ /dev/null
@@ -1,29 +0,0 @@
-language: scala
-dist: trusty
-services:
-- docker
-scala:
-- 2.12.10
-jdk:
-- oraclejdk8
-script:
-- sbt test
-before_deploy:
-- pip install --user release-manager==0.3.0
-deploy:
-  - provider: script
-    skip_cleanup: true
-    script: release-manager --config ./.travis/release.yml --check-version --make-version --make-artifact --upload-artifact
-    on:
-      tags: true
-  - provider: script
-    skip_cleanup: true
-    script: ./.travis/deploy_docker.sh $TRAVIS_TAG
-    on:
-      tags: true
-env:
-  global:
-  - secure: 
FpFfqcuDuxasw+P+fj62m/NRFWj9Dov74R/6KjdVVn4YdL1PTsqLc6sqttbcJsbobWX9B70LwKKhlYxyL1fWTx6zUe2owXAxtR0Bw2y0WNT6D4XKtfmJYWcVOSGjKJciURaTS9Hi4vypMp06buul+BguuerxW430I6Oo3xMNZF6MDgEJoS6NPL+rzg4l8aqgq+v9PF56ABcyvpHvLwKURz2FvyVE0rj3IJdetZsme/zRkqy+TXA8kGgX2ABgQxohI3kMgJ0sRxhDPN49Ziqjovwmxdg/DkXQJRRzbY5auI/sL0eOhU0ECEDzpfrQEoDBvfuapPEfd3CGfxgH/puXIs8cOQ4gH4NovUQVwdPkpSq/wCEM1j+w4ueSY4+XHRNTKwMYC9oe+YFFCqmAkF2gysYliMV/75ZXa4MMTiCWmKQZZHuSvFRz+tmIv9v1ey2EFc89LeihlxyaUVPsMRhNCg9NOlQ1VcXjaDFqW7Q0udPCIh/32tsoXtkMd7ZriYvG+KIJDZeRf3coYQ9n6SEhfpkizzT/rHcCCnV3iCuA4h2kUYmQkvztRzYGGdttpcOU4Y36teyXnwDv/cZ9FapPSiroydgj/08jcrgkvonzm2me4ddxR/38LmlrbcG2Q/7AhUj2cgICGLjys1OcjvrvZVsINF2u1kJ3qfH5ng6f2X8= - - secure: iaLlq1SC9zkAr5GJXCFi7bwhGs0PwkMv4ntehYZOM4HS+jA/uAfTZcVHAEHoCvkKSUpM0o4jZA5hJZM8xeYKl7aheePmFscDgJUSqcrgXlAzzBgrxn54CDy/ezmZfV2BM86e8kiJaWLSu+Znqr0gOjNtbUl5IUPImhGhhDMwyQYI/lXu3RI4kquAtbbwDPpNUR+1HQq1TLTBrq2meQifLah2nK6kAigu+c4P1ESDnciz8h/xTG8tN+z3pX5T2i+VZ5Qb0BChTxC2NPDGXg6ia5rzEmPO963xiIQdvDrXgBBy2uEmXQzS7AgERiGPtsiE/tlh9G2uJedGEdI2b6jJ0L7UFFj39F4vch28j6HUBfm/CVTzCiRWJq2kSp8exboDntP7kqi6pjClahIbvyUsLelkpmVWmj8E50qWK2e+5HZG5Saosuau3r1giRyJN8zkv/0taliTfZWZWKKLdWzb3EdP3sDrCE7ZIYMOCPqgzdviJ/U5AMrcN7QKmbD5C/wpL3ixrwBr16bYglgzrVrIAX973mYvKaIjlgYtNKGckGFyekRgfR0aOnKzVn3nBCg4ssLeMFJW1COHu8EdwGgiIJ7T62oVehqSA6QYeDFNi0UDtegSUpngLLQRhAJ8tgS2BGJNdZgC5A286GY/59vb9NjoTIvsYJ8FBRC+G3qc180= - - secure: chVAt7rsRVeHqUh09PaLUk3yDqyqPIhyi+349gnz0IBeJPT+5Zp6wncRq8uLDijwyk0Vi+metAxq6U0QOEe+ubuOFKiDnZtykcHCul5Bkka6SFHGbz3DBHmnqivOD4JbhuDkCv5iJbp64BlmWa0OlDrrZ1+7/RchVYvW8FcpYTmmQ4IRL4tC5LbaNGiqnCFaUMXVe/wE2c7PiPZnioZfAU1QcZ7UjFfn2R3FCDk9D+hAxM8YcOZ8CTNdosNkjygCWIOM5IU5FvCO4MkbxqX4yLeldaUZH4XMA7mZoNPGmbkZZHudjEdYedBGWQOv26nEOewfNnjb+3HPsLlgBNrodq/hFjCQpAcy3fLoI8bKt6Qns3YZXYF2zkzDgKcq2Y7TsW/pZN7kzqPwNK4oagMAVAzdeyW63w6uplhhD6LWOF2OMJS/QAabHtPx1D9/cbgyscUK75pCsKGUT8Pq6McHkL4WT2qe1ed2hkHspDQFpRleKr3ySeZz30vMBTtyZCM77IAntmAzqrVpG5aAQzxhgxzLBToezTPqlvz7xO9fopKYgKMAK4seUv0IkXraljjwYuznwkV6oR1gOK2K+86cX7yKxN29cECVzM+CrXtjNv8mEVG3N2j85z+a8/+MTIADE4vA3tF2rfBwOKaxPTAYXhFaVtqIjeOlKRjUvWtHgw8= - - secure: N0meVJgZVNyqUEVIZP2nN5s4rsyqcrRo+c8ddnEkZEkMd59wKx27zWyGcn+53GWA/yax/Z7SF+NNo/+sD2sZuRIBOpMTSa0Bhr07AEQ1oA4jHjoE9mWp3Y2pdj7McW7eBhJUVqlbl3SFn44qiKk7TL0DXCCUhBhVEdKZG3IA34jJdxPzrk0z/yT/EUZqzHqCHe6g3ugxTqH9nuDbHZnwPGrBBjhgdQnQMKNPnhflXyMZY24zIIeds2f6ppPjRM5idGBMK8vlqd88Vk5QXlguHlsi1P7P/vCP8Z35JWAbArqe11YKUxn5b0p2plaPFcEl/qvQ/rukwMX038o4+X1Az6dtSB7sFXjgtOV2WlVRlsgD9mqH4D58jD1LQLOTHYtISz18pL/oxZh3ELK+bkNHutVcVmwVOb2C5c2chMCWm0ra7Dvr0SJQgvQmZVkefmM+Veqy4dIfgsEb7wGshhT5T4DtHObI6xDXNRf06P9rOlr7yu5x2GXAqiIpO9FwWTE+35lgxp6Wqb5p9Oq7VL4hTjtfNsE7SBgZxuQjcMvmv6pc1zmrGA6BOQmPVZVtgIacLXUVtFkMZRPwdiei5ZB/+twxZpniSFqR/SGUEbBAtUk2MUasVjBwhVXhvDHGmo930bDMnk0WmIdvguxyAhwrHx86/7smKj+r0+invOPrNIQ= diff --git a/.travis/deploy_docker.sh b/.travis/deploy_docker.sh deleted file mode 100755 index 32567f45..00000000 --- a/.travis/deploy_docker.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash - -tag=$1 - -file="${HOME}/.dockercfg" -docker_repo="snowplow-docker-registry.bintray.io" -curl -X GET \ - -u${BINTRAY_SNOWPLOW_DOCKER_USER}:${BINTRAY_SNOWPLOW_DOCKER_API_KEY} \ - https://${docker_repo}/v2/auth > $file - -cd $TRAVIS_BUILD_DIR - -project_version=$(sbt "project common" version -Dsbt.log.noformat=true | perl -ne 'print "$1\n" if /info.*(\d+\.\d+\.\d+[^\r\n]*)/' | tail -n 1 | tr -d '\n') -if [[ "${tag}" = *"${project_version}" ]]; then - sbt "project loader" docker:publishLocal - sbt "project streamloader" docker:publishLocal - sbt "project mutator" docker:publishLocal 
- sbt "project repeater" docker:publishLocal - sbt "project forwarder" docker:publishLocal - docker push "${docker_repo}/snowplow/snowplow-bigquery-mutator:${tag}" - docker push "${docker_repo}/snowplow/snowplow-bigquery-loader:${tag}" - docker push "${docker_repo}/snowplow/snowplow-bigquery-streamloader:${tag}" - docker push "${docker_repo}/snowplow/snowplow-bigquery-repeater:${tag}" - docker push "${docker_repo}/snowplow/snowplow-bigquery-forwarder:${tag}" -else - echo "Tag version '${tag}' doesn't match version in scala project ('${project_version}'). Aborting!" - exit 1 -fi diff --git a/.travis/release.yml b/.travis/release.yml deleted file mode 100644 index 2b6594c3..00000000 --- a/.travis/release.yml +++ /dev/null @@ -1,57 +0,0 @@ -# Required: local settings -local: - root_dir : <%= ENV['TRAVIS_BUILD_DIR'] %> - -# Required: deployment targets -targets: - - type : "bintray" - user : <%= ENV['BINTRAY_SNOWPLOW_GENERIC_USER'] %> - password : <%= ENV['BINTRAY_SNOWPLOW_GENERIC_API_KEY'] %> - -# Required: packages to be deployed -packages: - - repo : "snowplow-generic" - name : "snowplow-bigquery-loader" - user_org : "snowplow" - publish : true - - # Will attempt to overwrite a published entity if one is found - override : false - - # If the artifact already exists will determine whether or not - # to fail the release - continue_on_conflict : false - - # The version of this package - version : <%= ENV['TRAVIS_TAG'] %> - - # Required IF '--check-version' is passed: will assert that - # both versions are the same - build_version : <%= FUNC['sbt_version(.)'] %> - - build_commands: - - 'sbt "project loader" universal:packageBin' - - 'sbt "project mutator" universal:packageBin' - - 'sbt "project repeater" universal:packageBin' - - # Required: Artifact - artifacts: - # The artifact name is composed like so: - # {{prefix}}{{version}}{{suffix}}.zip - - prefix : "snowplow_bigquery_loader_" - suffix : ".zip" - type : "asis" - - # The binaries to put in the zip - binary_paths: - - loader/target/universal/snowplow-bigquery-loader-<%= ENV['TRAVIS_TAG'] %>.zip - - # The artifact name is composed like so: - # {{prefix}}{{version}}{{suffix}}.zip - - prefix : "snowplow_bigquery_mutator_" - suffix : ".zip" - type : "asis" - - # The binaries to put in the zip - binary_paths: - - mutator/target/universal/snowplow-bigquery-mutator-<%= ENV['TRAVIS_TAG'] %>.zip diff --git a/build.sbt b/build.sbt index 6bd54f77..b157ea77 100644 --- a/build.sbt +++ b/build.sbt @@ -100,6 +100,8 @@ lazy val repeater = project Dependencies.gcs, Dependencies.pubsub, Dependencies.pubsubFs2Grpc, + Dependencies.blobstoreCore, + Dependencies.blobstoreGcs, Dependencies.slf4j, Dependencies.catsEffect, Dependencies.circeLiteral, diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 1527fa26..732afacf 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -22,7 +22,7 @@ import sbtbuildinfo.BuildInfoKeys._ object BuildSettings { lazy val projectSettings = Seq( organization := "com.snowplowanalytics", - version := "0.6.1", + version := "0.6.2-rc11", scalaVersion := "2.13.2", buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description, BuildInfoKey.action("userAgent") { s"${name.value}/${version.value}" diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 152a249a..24b56a83 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -43,6 +43,7 @@ object Dependencies { val fs2 = "2.4.2" val httpClient = "0.21.6" val logging = "1.1.1" + val 
blobstore = "0.7.3" /** * After 1.102.0 the Google Cloud Java SDK versions diverge. @@ -88,21 +89,23 @@ object Dependencies { val googleOauth = "com.google.oauth-client" % "google-oauth-client" % V.googleOauth // Scala third-party - val cats = "org.typelevel" %% "cats-core" % V.cats - val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect - val circe = "io.circe" %% "circe-core" % V.circe - val circeJawn = "io.circe" %% "circe-jawn" % V.circe - val circeLiteral = "io.circe" %% "circe-literal" % V.circe - val circeParser = "io.circe" %% "circe-parser" % V.circe - val decline = "com.monovore" %% "decline" % V.decline - val fs2 = "co.fs2" %% "fs2-core" % V.fs2 - val httpClient = "org.http4s" %% "http4s-async-http-client" % V.httpClient - val logging = "io.chrisdavenport" %% "log4cats-slf4j" % V.logging - val pubsubFs2 = "com.permutive" %% "fs2-google-pubsub-http" % V.pubsubFs2 - val pubsubFs2Grpc = "com.permutive" %% "fs2-google-pubsub-grpc" % V.pubsubFs2 - val scioBigQuery = "com.spotify" %% "scio-bigquery" % V.scio - val scioCore = "com.spotify" %% "scio-core" % V.scio - val scioRepl = "com.spotify" %% "scio-repl" % V.scio + val cats = "org.typelevel" %% "cats-core" % V.cats + val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect + val circe = "io.circe" %% "circe-core" % V.circe + val circeJawn = "io.circe" %% "circe-jawn" % V.circe + val circeLiteral = "io.circe" %% "circe-literal" % V.circe + val circeParser = "io.circe" %% "circe-parser" % V.circe + val decline = "com.monovore" %% "decline" % V.decline + val fs2 = "co.fs2" %% "fs2-core" % V.fs2 + val httpClient = "org.http4s" %% "http4s-async-http-client" % V.httpClient + val logging = "io.chrisdavenport" %% "log4cats-slf4j" % V.logging + val pubsubFs2 = "com.permutive" %% "fs2-google-pubsub-http" % V.pubsubFs2 + val pubsubFs2Grpc = "com.permutive" %% "fs2-google-pubsub-grpc" % V.pubsubFs2 + val scioBigQuery = "com.spotify" %% "scio-bigquery" % V.scio + val scioCore = "com.spotify" %% "scio-core" % V.scio + val scioRepl = "com.spotify" %% "scio-repl" % V.scio + val blobstoreCore = "com.github.fs2-blobstore" %% "core" % V.blobstore + val blobstoreGcs = "com.github.fs2-blobstore" %% "gcs" % V.blobstore // Scala Snowplow val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/EventContainer.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/EventContainer.scala index 4eb77b5c..2a26afd0 100644 --- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/EventContainer.scala +++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/EventContainer.scala @@ -26,6 +26,7 @@ import io.circe.parser.parse import com.snowplowanalytics.snowplow.badrows.BadRow import com.snowplowanalytics.snowplow.storage.bigquery.repeater.PayloadParser.ReconstructedEvent +import com.permutive.pubsub.producer.encoder.MessageEncoder /** * Primary data type for events parsed from `failedInserts` PubSub subscription @@ -64,6 +65,8 @@ object EventContainer { } yield payload } + implicit val pubsubEventEncoder: MessageEncoder[String] = s => Right(s.getBytes) + private def decomposeArray(arr: Vector[Json]): JList[Any] = arr.map(decomposeJson).asJava diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala 
b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala
new file mode 100644
index 00000000..f428379a
--- /dev/null
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala
@@ -0,0 +1,143 @@
+package com.snowplowanalytics.snowplow.storage.bigquery.repeater
+
+import java.util.UUID
+
+import scala.concurrent.duration._
+
+import fs2.{Pipe, Stream}
+
+import blobstore.Path
+import blobstore.implicits.GetOps
+
+import cats.Monad
+import cats.implicits._
+import cats.effect.{Concurrent, Timer}
+
+import io.chrisdavenport.log4cats.Logger
+
+import io.circe._
+import io.circe.parser._
+import io.circe.syntax._
+
+import com.snowplowanalytics.snowplow.storage.bigquery.repeater.RepeaterCli.GcsPath
+import com.snowplowanalytics.snowplow.storage.bigquery.repeater.services.Storage
+
+object Recover {
+
+  val FileConcurrencyLevel = 12 // We don't want to have a big gap between first and last files
+  val ConcurrencyLevel = 32 // We have 30 lines in a file at most
+
+  val GcsPrefix = "gs://"
+
+  def preparePath: GcsPath => Path =
+    gcsPath => Path(GcsPrefix + gcsPath.bucket + "/" + gcsPath.path)
+
+  def recoverFailedInserts[F[_]: Concurrent: Logger: Timer](resources: Resources[F]): Stream[F, Unit] =
+    for {
+      _ <- Stream.eval(Logger[F].info("Starting recovery stream."))
+      _ <- Stream.eval(Logger[F].info(s"Resources for recovery: $resources"))
+      path = preparePath(resources.bucket)
+      _ <- Stream.eval(Logger[F].info(s"Path for recovery: $path"))
+      _ <- recoverStream(resources, path)
+    } yield ()
+
+  def recoverStream[F[_]: Concurrent: Timer: Logger](resources: Resources[F], path: Path): Stream[F, Unit] =
+    resources
+      .store
+      .list(path)
+      .filter(keepTimePeriod)
+      .map { path =>
+        Stream.eval_[F, Unit](writeListingToLogs(path)) ++
+          resources.store.get(path)
+            .through(fs2.text.utf8Decode)
+            .through(fs2.text.lines)
+            .evalTap(_ => resources.logTotal)
+      }
+      .parJoin(FileConcurrencyLevel)
+      .filter(keepInvalidColumnErrors)
+      .map(recover(resources.problematicContext, resources.fixedContext))
+      .observeEither(badPipe(resources), goodPipe(resources))
+      .void
+
+  def goodPipe[F[_]: Logger: Concurrent](resources: Resources[F]): Pipe[F, IdAndEvent, Unit] =
+    _.parEvalMapUnordered(ConcurrencyLevel) { event =>
+      resources.pubSubProducer.produce(event.event.noSpaces).flatMap { msgId =>
+        Logger[F].info(s"Written ${event.id}: $msgId")
+      } *> resources.logRecovered
+    }
+
+  def badPipe[F[_]: Logger: Concurrent: Timer](resources: Resources[F]): Pipe[F, FailedRecovery, Unit] =
+    in => in
+      .evalTap(_ => resources.logFailed)
+      .groupWithin(resources.bufferSize, resources.windowTime.seconds)
+      .parEvalMapUnordered(ConcurrencyLevel) { chunk =>
+        for {
+          // Atomically bump and read the counter, so concurrently flushed chunks never share a file index
+          i <- resources.counter.updateAndGet(_ + 1)
+          file = Storage.getFileName(resources.bucket.path + s"recovery-${generated.BuildInfo.version}/", i)
+          _ <- services.Storage.uploadJson(resources.bucket.bucket, file, chunk.map(_.asJson))
+        } yield ()
+      }
+
+  case class IdAndEvent(id: UUID, event: Json)
+
+  case class FailedRecovery(original: String, error: Error)
+
+  implicit val failedRecoveryEncoder: Encoder[FailedRecovery] =
+    Encoder.instance { recovery =>
+      Json.fromFields(List(
+        "original" -> recovery.original.asJson,
+        "error" -> recovery.error.show.asJson
+      ))
+    }
+
+  /** Try to parse loader_recovery_error bad row and fix it, attaching event id */
+  def recover(columnToFix: String, fixedColumn: String)(failed: String): Either[FailedRecovery, IdAndEvent] = {
+    val result = for {
+      doc <- parse(failed)
+      payload = doc.hcursor.downField("data").downField("payload")
+      quoted <- payload.as[String]
+      quotedParsed <- parse(quoted)
+      innerPayload <- quotedParsed.hcursor.downField("payload").as[Json]
+      eventId <- quotedParsed.hcursor.downField("eventId").as[UUID]
+
+      fixedPayload <- fix(innerPayload, columnToFix, fixedColumn)
+
+    } yield IdAndEvent(eventId, fixedPayload)
+
+    result.leftMap { error => FailedRecovery(failed, error) }
+  }
+
+  /** Fix *inner* `payload` property from loader_recovery_error bad row
+    * by renaming "availability_%" keys to "availability_percentage"
+    * in the `columnToFix` column
+    */
+  def fix(payload: Json, columnToFix: String, fixedColumn: String): Either[DecodingFailure, Json] =
+    for {
+      jsonObject <- payload.as[JsonObject].map(_.toMap)
+      fixed = jsonObject.map {
+        case (key, value) if key == columnToFix && value.isArray =>
+          val fixedContexts = value.asArray.getOrElse(Vector.empty).map { context =>
+            val fixedContext = context.asObject.map { hash =>
+              val fixedHash = hash.toMap.map {
+                case ("availability_%", value) => ("availability_percentage", value)
+                case (key, value) => (key, value)
+              }
+              Json.fromFields(fixedHash)
+            }
+            fixedContext.getOrElse(context)
+          }
+          (fixedColumn, Json.fromValues(fixedContexts))
+        case (key, value) => (key, value)
+      }
+    } yield Json.fromFields(fixed)
+
+  def writeListingToLogs[F[_]: Monad: Logger](p: Path): F[Unit] =
+    Logger[F].info(s"File: ${p.fileName}, size: ${p.size.getOrElse(0L)}")
+
+  def keepTimePeriod(p: Path): Boolean =
+    p.fileName.exists(_.startsWith("2020-11"))
+
+  def keepInvalidColumnErrors(f: String): Boolean =
+    f.contains("no such field.")
+}
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala
index 9924b7cb..70445eee 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala
@@ -51,9 +51,8 @@ object Repeater extends SafeIOApp {
       .through[IO, Unit](Flow.sink(resources))
     desparatesSink = Flow.dequeueDesperates(resources)
     logging = Stream.awakeEvery[IO](5.minute).evalMap(_ => resources.updateLifetime *> resources.showStats)
-    _ <- Stream(bqSink, desparatesSink, logging).parJoin(
-      StreamConcurrency
-    )
+    recover = Recover.recoverFailedInserts(resources)
+    _ <- Stream(bqSink, desparatesSink, logging, recover).parJoin(StreamConcurrency)
   } yield ()
 
   process.compile.drain.attempt.flatMap {
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala
index a85e38a3..b2128bc6 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/RepeaterCli.scala
@@ -68,6 +68,9 @@
 
   // TODO: factor out into a common module
   val verbose = Opts.flag("verbose", "Provide debug output").orFalse
+  val problematicContext = Opts.option[String]("problematicContext", "The context column name (containing '%') to recover")
+  val fixedContext = Opts.option[String]("fixedContext", "The context column name that the problematic column should be renamed to")
+
   case class ListenCommand(
     config: EnvironmentConfig,
    failedInsertsSub: 
String, @@ -75,11 +78,13 @@ object RepeaterCli { // TODO: factor out into a common module verbose: Boolean, bufferSize: Int, window: Int, - backoff: Int + backoff: Int, + problematicContext: String, + fixedContext: String, ) val command = Command(generated.BuildInfo.name, generated.BuildInfo.description) { - (options, failedInsertsSub, deadEndBucket, verbose, bufferSize, window, backoffPeriod).mapN(ListenCommand.apply) + (options, failedInsertsSub, deadEndBucket, verbose, bufferSize, window, backoffPeriod, problematicContext, fixedContext).mapN(ListenCommand.apply) } def parse(args: Seq[String]) = command.parse(args) diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala index bfda015b..f0b29579 100644 --- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala +++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala @@ -29,6 +29,10 @@ import fs2.concurrent.{Queue, SignallingRef} import com.snowplowanalytics.snowplow.storage.bigquery.common.Config import com.snowplowanalytics.snowplow.badrows.BadRow import com.snowplowanalytics.snowplow.storage.bigquery.repeater.RepeaterCli.GcsPath +import com.permutive.pubsub.producer.PubsubProducer +import blobstore.Store +import blobstore.gcs.GcsStore +import com.google.cloud.storage.StorageOptions /** * Resources container, allowing to manipulate all acquired entities @@ -53,12 +57,24 @@ class Resources[F[_]: Sync]( val backoffTime: Int, val concurrency: Int, val insertBlocker: Blocker, - val jobStartTime: Instant + val jobStartTime: Instant, + val pubSubProducer: PubsubProducer[F, String], + val store: Store[F], + val problematicContext: String, + val fixedContext: String ) { def logInserted: F[Unit] = statistics.update(s => s.copy(inserted = s.inserted + 1)) def logAbandoned: F[Unit] = statistics.update(s => s.copy(desperates = s.desperates + 1)) + + def logRecovered: F[Unit] = + statistics.update(s => s.copy(recovered = s.recovered + 1)) + def logFailed: F[Unit] = + statistics.update(s => s.copy(failed = s.failed + 1)) + def logTotal: F[Unit] = + statistics.update(s => s.copy(total = s.total + 1)) + def updateLifetime: F[Unit] = for { now <- Sync[F].delay(Instant.now()) @@ -70,31 +86,51 @@ class Resources[F[_]: Sync]( statistics.get.flatMap(statistics => L.info(statistics.show)) } +class PartialResources[F[_]: Sync]( + val bigQuery: BigQuery, + val bucket: GcsPath, + val env: Config.Environment, + val desperates: Queue[F, BadRow], + val counter: Ref[F, Int], + val stop: SignallingRef[F, Boolean], + val statistics: Ref[F, Resources.Statistics], + val concurrency: Int, + val insertBlocker: Blocker, + val jobStartTime: Instant, + val store: Store[F] +) + object Resources { val QueueSize = 100 - case class Statistics(inserted: Int, desperates: Int, lifetime: Duration) + case class Statistics(inserted: Int, desperates: Int, lifetime: Duration, recovered: Int, failed: Int, total: Int) object Statistics { - val start = Statistics(0, 0, Duration(0, "millis")) + val start = Statistics(0, 0, Duration(0, "millis"), 0, 0, 0) implicit val showRepeaterStatistics: Show[Statistics] = { case s if (s.lifetime.toHours == 0) => - s"Statistics: ${s.inserted} rows inserted, ${s.desperates} rows rejected in ${s.lifetime.toMinutes} minutes." 
+ s"Statistics: ${s.inserted} rows inserted, ${s.desperates} rows rejected, " + + s"${s.recovered} recovered, ${s.failed} failed to recover, ${s.total} total " + + s"in ${s.lifetime.toMinutes} minutes" case s if (s.lifetime.toHours > 24) => - s"Statistics: ${s.inserted} rows inserted, ${s.desperates} rows rejected in ${s.lifetime.toDays} days and ${s.lifetime.toHours - s.lifetime.toDays * 24} hours." + s"Statistics: ${s.inserted} rows inserted, ${s.desperates} rows rejected, " + + s"${s.recovered} recovered, ${s.failed} failed to recover, ${s.total} total " + + s"in ${s.lifetime.toDays} days and ${s.lifetime.toHours - s.lifetime.toDays * 24} hours" case s => - s"Statistics: ${s.inserted} rows inserted, ${s.desperates} rows rejected in ${s.lifetime.toHours} hours." + s"Statistics: ${s.inserted} rows inserted, ${s.desperates} rows rejected, " + + s"${s.recovered} recovered, ${s.failed} failed to recover, ${s.total} total " + + s"in ${s.lifetime.toHours} hours" } } /** Allocate all resources for an application */ - def acquire[F[_]: ConcurrentEffect: Timer: Logger]( + def acquire[F[_]: ContextShift: ConcurrentEffect: Timer: Logger]( cmd: RepeaterCli.ListenCommand ): Resource[F, Resources[F]] = { // It's a function because blocker needs to be created as Resource - val initResources: F[Blocker => Resources[F]] = for { + val initResources: F[Blocker => PartialResources[F]] = for { transformed <- Config.transform[F](cmd.config).value env <- Sync[F].fromEither(transformed) bigQuery <- services.Database.getClient[F] @@ -103,12 +139,13 @@ object Resources { stop <- SignallingRef[F, Boolean](false) statistics <- Ref[F].of[Statistics](Statistics.start) concurrency <- Sync[F].delay(Runtime.getRuntime.availableProcessors * 16) + storage <- Sync[F].delay(StorageOptions.getDefaultInstance.getService) _ <- Logger[F].info( s"Initializing Repeater from ${env.config.failedInserts} to ${env.config.tableId} with $concurrency streams" ) jobStartTime <- Sync[F].delay(Instant.now()) } yield (b: Blocker) => - new Resources( + new PartialResources( bigQuery, cmd.deadEndBucket, env, @@ -116,18 +153,36 @@ object Resources { counter, stop, statistics, - cmd.bufferSize, - cmd.window, - cmd.backoff, concurrency, b, - jobStartTime + jobStartTime, + GcsStore(storage, b, List.empty) ) val createBlocker = Sync[F].delay(Executors.newCachedThreadPool()).map(ExecutionContext.fromExecutorService) for { - blocker <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext) - resources <- Resource.make(initResources.map(init => init.apply(blocker)))(release) + blocker <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext) + pr <- Resource.make(initResources.map(init => init(blocker)))(release) + pubsubProducer <- services.PubSub.getProducer[F](pr.env.config.projectId, pr.env.config.failedInserts) + resources = new Resources( + pr.bigQuery, + cmd.deadEndBucket, + pr.env, + pr.desperates, + pr.counter, + pr.stop, + pr.statistics, + cmd.bufferSize, + cmd.window, + cmd.backoff, + pr.concurrency, + pr.insertBlocker, + pr.jobStartTime, + pubsubProducer, + pr.store, + cmd.problematicContext, + cmd.fixedContext + ) } yield resources } @@ -147,7 +202,7 @@ object Resources { /** Stop pulling data from Pubsub, flush despeates queue into GCS bucket */ def release[F[_]: ConcurrentEffect: Timer: Logger]( - env: Resources[F] + env: PartialResources[F] ): F[Unit] = { val flushDesperate = for { desperates <- pullRemaining(env.desperates) diff --git 
a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala
index 9ea00ba5..5d1b736f 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala
@@ -17,15 +17,21 @@ import com.google.pubsub.v1.PubsubMessage
 import cats.effect._
 import cats.syntax.all._
 import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
-import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
+import com.permutive.pubsub.consumer.ConsumerRecord
 import io.chrisdavenport.log4cats.Logger
 import fs2.concurrent.Queue
 import fs2.Stream
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
 import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
+import com.permutive.pubsub.producer.grpc.GooglePubsubProducer
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.permutive.pubsub.consumer
+import com.permutive.pubsub.producer
+import com.permutive.pubsub.producer.grpc.PubsubProducerConfig
+import scala.concurrent.duration._
 
-/** Module responsible for reading PubSub */
+/** Module responsible for reading and writing PubSub */
 object PubSub {
 
   /** Read events from `failedInserts` topic */
@@ -37,8 +43,8 @@
   ): Stream[F, ConsumerRecord[F, EventContainer]] =
     PubsubGoogleConsumer.subscribe[F, EventContainer](
       blocker,
-      Model.ProjectId(projectId),
-      Model.Subscription(subscription),
+      consumer.Model.ProjectId(projectId),
+      consumer.Model.Subscription(subscription),
       (msg, err, ack, _) => callback[F](msg, err, ack, desperates),
       PubsubGoogleConsumerConfig[F](onFailedTerminate = t => Logger[F].error(s"Terminating consumer due $t"))
     )
@@ -49,4 +55,20 @@
     val badRow = BadRow.LoaderRecoveryError(Repeater.processor, failure, Payload.RawPayload(msg.toString))
     desperates.enqueue1(badRow) >> ack
   }
+
+  implicit val messageEncoder: MessageEncoder[String] = { s =>
+    Right(s.getBytes("UTF-8"))
+  }
+
+  def getProducer[F[_]: Concurrent: Timer: Logger](projectId: String, failedInsertsTopic: String) =
+    GooglePubsubProducer.of[F, String](
+      producer.Model.ProjectId(projectId),
+      producer.Model.Topic(failedInsertsTopic),
+      config = PubsubProducerConfig[F](
+        batchSize = 100,
+        delayThreshold = 100.millis,
+        onFailedTerminate = e => Logger[F].error(s"Got error from producer: $e") >> Sync[F].unit
+      )
+    )
+
 }
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Storage.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Storage.scala
index 11da2d98..f5ffb30c 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Storage.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/Storage.scala
@@ -15,13 +15,12 @@
 import org.joda.time.{DateTime, DateTimeZone}
 import org.joda.time.format.DateTimeFormat
 import com.google.cloud.storage.{BlobInfo, StorageOptions}
-
 import cats.effect.Sync
 import cats.implicits._
 import io.chrisdavenport.log4cats.Logger
 import fs2.{Chunk, Stream, text}
-
 import com.snowplowanalytics.snowplow.badrows.BadRow
+import io.circe.Json
 
 object Storage {
   // TODO: this is certainly non-RT
@@ -40,4 +39,13 @@
     Sync[F].delay(storage.create(blobInfo, content)) *>
       Logger[F].info(s"Written ${blobInfo.getName} of ${content.size} bytes")
   }
+
+  def uploadJson[F[_]: Sync: Logger](bucketName: String, fileName: String, rows: Chunk[Json]): F[Unit] = {
+    val blobInfo = BlobInfo.newBuilder(bucketName, fileName).build()
+    val content = Stream.chunk(rows).map(_.noSpaces).intersperse("\n").through(text.utf8Encode).compile.to(Array)
+
+    Logger[F].info(s"Preparing to write failed inserts file $fileName with ${rows.size} items") *>
+      Sync[F].delay(storage.create(blobInfo, content)) *>
+      Logger[F].info(s"Written ${blobInfo.getName} of ${content.size} bytes")
+  }
 }
diff --git a/repeater/src/test/resources/failed_inserts.json b/repeater/src/test/resources/failed_inserts.json
new file mode 100644
index 00000000..5aa993d5
--- /dev/null
+++ b/repeater/src/test/resources/failed_inserts.json
@@ -0,0 +1,17 @@
+{
+  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/loader_recovery_error/jsonschema/1-0-0",
+  "data": {
+    "processor": {
+      "artifact": "snowplow-bigquery-repeater",
+      "version": "0.6.0-rc7"
+    },
+    "failure": {
+      "error": {
+        "message": "no such field.",
+        "location": "contexts_com_snplow_eng_gcp_luke_test_percentage_1_0_0",
+        "reason": "invalid"
+      }
+    },
+    "payload": "{\"eventId\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etlTstamp\":\"2020-11-10T16:56:39.283Z\",\"payload\":{\"geo_country\":\"DE\",\"event\":\"unstruct\",\"user_ipaddress\":\"18.194.133.57\",\"event_version\":\"1-0-0\",\"geo_city\":\"Frankfurt am Main\",\"platform\":\"srv\",\"event_id\":\"90aebb1b-1d49-455b-9049-15ec72dfe5a9\",\"etl_tstamp\":\"2020-11-10T16:56:39.283Z\",\"geo_latitude\":50.1188,\"v_collector\":\"ssc-2.0.0-googlepubsub\",\"collector_tstamp\":\"2020-11-10T16:56:37.401Z\",\"event_vendor\":\"com.snplow.eng.gcp\",\"network_userid\":\"70ec6f9e-cfe9-46c7-8b9c-fab280ea120e\",\"geo_region\":\"HE\",\"geo_timezone\":\"Europe/Berlin\",\"event_format\":\"jsonschema\",\"geo_zipcode\":\"60313\",\"useragent\":\"Go-http-client/2.0\",\"event_name\":\"luke_test_percentage\",\"dvce_created_tstamp\":\"2020-11-10T16:56:37.115Z\",\"dvce_sent_tstamp\":\"2020-11-10T16:56:37.117Z\",\"geo_longitude\":8.6843,\"v_tracker\":\"golang-2.3.0\",\"derived_tstamp\":\"2020-11-10T16:56:37.399Z\",\"app_id\":\"test_percentage_luke\",\"geo_region_name\":\"Hesse\",\"event_fingerprint\":\"b9f667befa875fcae1bedd55e463330e\",\"v_etl\":\"beam-enrich-1.4.1-common-1.4.1\",\"contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0\":[{\"device_family\":\"Other\",\"os_family\":\"Other\",\"useragent_family\":\"Go-http-client\",\"os_major\":null,\"os_minor\":null,\"os_patch\":null,\"os_patch_minor\":null,\"os_version\":\"Other\",\"useragent_major\":\"2\",\"useragent_minor\":\"0\",\"useragent_patch\":null,\"useragent_version\":\"Go-http-client 2.0\"}],\"contexts_com_iab_snowplow_spiders_and_robots_1_0_0\":[{\"category\":\"SPIDER_OR_ROBOT\",\"primary_impact\":\"UNKNOWN\",\"reason\":\"FAILED_UA_INCLUDE\",\"spider_or_robot\":true}],\"contexts_nl_basjes_yauaa_context_1_0_1\":[{\"device_class\":\"Unknown\",\"agent_build\":null,\"agent_class\":\"Special\",\"agent_information_email\":null,\"agent_information_url\":null,\"agent_language\":null,\"agent_language_code\":null,\"agent_name\":\"Go-http-client\",\"agent_name_version\":\"Go-http-client 2.0\",\"agent_name_version_major\":\"Go-http-client 
2\",\"agent_security\":null,\"agent_uuid\":null,\"agent_version\":\"2.0\",\"agent_version_major\":\"2\",\"anonymized\":null,\"carrier\":null,\"device_brand\":\"Unknown\",\"device_cpu\":null,\"device_cpu_bits\":null,\"device_firmware_version\":null,\"device_name\":\"Unknown\",\"device_version\":null,\"facebook_carrier\":null,\"facebook_device_class\":null,\"facebook_device_name\":null,\"facebook_device_version\":null,\"facebook_fbop\":null,\"facebook_fbss\":null,\"facebook_operating_system_name\":null,\"facebook_operating_system_version\":null,\"g_sa_installation_id\":null,\"hacker_attack_vector\":null,\"hacker_toolkit\":null,\"i_e_compatibility_name_version\":null,\"i_e_compatibility_name_version_major\":null,\"i_e_compatibility_version\":null,\"i_e_compatibility_version_major\":null,\"kobo_affiliate\":null,\"kobo_platform_id\":null,\"layout_engine_build\":null,\"layout_engine_class\":\"Unknown\",\"layout_engine_name\":\"Unknown\",\"layout_engine_name_version\":\"Unknown ??\",\"layout_engine_name_version_major\":\"Unknown ??\",\"layout_engine_version\":\"??\",\"layout_engine_version_major\":\"??\",\"network_type\":null,\"operating_system_class\":\"Unknown\",\"operating_system_name\":\"Unknown\",\"operating_system_name_version\":\"Unknown ??\",\"operating_system_name_version_major\":\"Unknown ??\",\"operating_system_version\":\"??\",\"operating_system_version_build\":null,\"operating_system_version_major\":\"??\",\"webview_app_name\":null,\"webview_app_name_version_major\":null,\"webview_app_version\":null,\"webview_app_version_major\":null}],\"contexts_com_snplow_eng_gcp_luke_test_percentage_1_0_0\":[{\"availability_%\":\"10\",\"name\":\"test01\"}]}}" + } +} diff --git a/repeater/src/test/resources/payload.json b/repeater/src/test/resources/payload.json new file mode 100644 index 00000000..832cf5ab --- /dev/null +++ b/repeater/src/test/resources/payload.json @@ -0,0 +1,120 @@ +{ + "geo_country": "DE", + "event": "unstruct", + "user_ipaddress": "18.194.133.57", + "event_version": "1-0-0", + "geo_city": "Frankfurt am Main", + "platform": "srv", + "event_id": "90aebb1b-1d49-455b-9049-15ec72dfe5a9", + "etl_tstamp": "2020-11-10T16:56:39.283Z", + "geo_latitude": 50.1188, + "v_collector": "ssc-2.0.0-googlepubsub", + "collector_tstamp": "2020-11-10T16:56:37.401Z", + "event_vendor": "com.snplow.eng.gcp", + "network_userid": "70ec6f9e-cfe9-46c7-8b9c-fab280ea120e", + "geo_region": "HE", + "geo_timezone": "Europe/Berlin", + "event_format": "jsonschema", + "geo_zipcode": "60313", + "useragent": "Go-http-client/2.0", + "event_name": "luke_test_percentage", + "dvce_created_tstamp": "2020-11-10T16:56:37.115Z", + "dvce_sent_tstamp": "2020-11-10T16:56:37.117Z", + "geo_longitude": 8.6843, + "v_tracker": "golang-2.3.0", + "derived_tstamp": "2020-11-10T16:56:37.399Z", + "app_id": "test_percentage_luke", + "geo_region_name": "Hesse", + "event_fingerprint": "b9f667befa875fcae1bedd55e463330e", + "v_etl": "beam-enrich-1.4.1-common-1.4.1", + "contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0": [ + { + "device_family": "Other", + "os_family": "Other", + "useragent_family": "Go-http-client", + "os_major": null, + "os_minor": null, + "os_patch": null, + "os_patch_minor": null, + "os_version": "Other", + "useragent_major": "2", + "useragent_minor": "0", + "useragent_patch": null, + "useragent_version": "Go-http-client 2.0" + } + ], + "contexts_com_iab_snowplow_spiders_and_robots_1_0_0": [ + { + "category": "SPIDER_OR_ROBOT", + "primary_impact": "UNKNOWN", + "reason": "FAILED_UA_INCLUDE", + 
"spider_or_robot": true + } + ], + "contexts_nl_basjes_yauaa_context_1_0_1": [ + { + "device_class": "Unknown", + "agent_build": null, + "agent_class": "Special", + "agent_information_email": null, + "agent_information_url": null, + "agent_language": null, + "agent_language_code": null, + "agent_name": "Go-http-client", + "agent_name_version": "Go-http-client 2.0", + "agent_name_version_major": "Go-http-client 2", + "agent_security": null, + "agent_uuid": null, + "agent_version": "2.0", + "agent_version_major": "2", + "anonymized": null, + "carrier": null, + "device_brand": "Unknown", + "device_cpu": null, + "device_cpu_bits": null, + "device_firmware_version": null, + "device_name": "Unknown", + "device_version": null, + "facebook_carrier": null, + "facebook_device_class": null, + "facebook_device_name": null, + "facebook_device_version": null, + "facebook_fbop": null, + "facebook_fbss": null, + "facebook_operating_system_name": null, + "facebook_operating_system_version": null, + "g_sa_installation_id": null, + "hacker_attack_vector": null, + "hacker_toolkit": null, + "i_e_compatibility_name_version": null, + "i_e_compatibility_name_version_major": null, + "i_e_compatibility_version": null, + "i_e_compatibility_version_major": null, + "kobo_affiliate": null, + "kobo_platform_id": null, + "layout_engine_build": null, + "layout_engine_class": "Unknown", + "layout_engine_name": "Unknown", + "layout_engine_name_version": "Unknown ??", + "layout_engine_name_version_major": "Unknown ??", + "layout_engine_version": "??", + "layout_engine_version_major": "??", + "network_type": null, + "operating_system_class": "Unknown", + "operating_system_name": "Unknown", + "operating_system_name_version": "Unknown ??", + "operating_system_name_version_major": "Unknown ??", + "operating_system_version": "??", + "operating_system_version_build": null, + "operating_system_version_major": "??", + "webview_app_name": null, + "webview_app_name_version_major": null, + "webview_app_version": null, + "webview_app_version_major": null + } + ], + "contexts_com_snplow_eng_gcp_luke_test_percentage_1_0_0": [{ + "availability_%": "10", + "name": "test01" + }] +} diff --git a/repeater/src/test/resources/payload_fixed.json b/repeater/src/test/resources/payload_fixed.json new file mode 100644 index 00000000..dcc0c1ea --- /dev/null +++ b/repeater/src/test/resources/payload_fixed.json @@ -0,0 +1,120 @@ +{ + "geo_country": "DE", + "event": "unstruct", + "user_ipaddress": "18.194.133.57", + "event_version": "1-0-0", + "geo_city": "Frankfurt am Main", + "platform": "srv", + "event_id": "90aebb1b-1d49-455b-9049-15ec72dfe5a9", + "etl_tstamp": "2020-11-10T16:56:39.283Z", + "geo_latitude": 50.1188, + "v_collector": "ssc-2.0.0-googlepubsub", + "collector_tstamp": "2020-11-10T16:56:37.401Z", + "event_vendor": "com.snplow.eng.gcp", + "network_userid": "70ec6f9e-cfe9-46c7-8b9c-fab280ea120e", + "geo_region": "HE", + "geo_timezone": "Europe/Berlin", + "event_format": "jsonschema", + "geo_zipcode": "60313", + "useragent": "Go-http-client/2.0", + "event_name": "luke_test_percentage", + "dvce_created_tstamp": "2020-11-10T16:56:37.115Z", + "dvce_sent_tstamp": "2020-11-10T16:56:37.117Z", + "geo_longitude": 8.6843, + "v_tracker": "golang-2.3.0", + "derived_tstamp": "2020-11-10T16:56:37.399Z", + "app_id": "test_percentage_luke", + "geo_region_name": "Hesse", + "event_fingerprint": "b9f667befa875fcae1bedd55e463330e", + "v_etl": "beam-enrich-1.4.1-common-1.4.1", + "contexts_com_snowplowanalytics_snowplow_ua_parser_context_1_0_0": [ + { + 
"device_family": "Other", + "os_family": "Other", + "useragent_family": "Go-http-client", + "os_major": null, + "os_minor": null, + "os_patch": null, + "os_patch_minor": null, + "os_version": "Other", + "useragent_major": "2", + "useragent_minor": "0", + "useragent_patch": null, + "useragent_version": "Go-http-client 2.0" + } + ], + "contexts_com_iab_snowplow_spiders_and_robots_1_0_0": [ + { + "category": "SPIDER_OR_ROBOT", + "primary_impact": "UNKNOWN", + "reason": "FAILED_UA_INCLUDE", + "spider_or_robot": true + } + ], + "contexts_nl_basjes_yauaa_context_1_0_1": [ + { + "device_class": "Unknown", + "agent_build": null, + "agent_class": "Special", + "agent_information_email": null, + "agent_information_url": null, + "agent_language": null, + "agent_language_code": null, + "agent_name": "Go-http-client", + "agent_name_version": "Go-http-client 2.0", + "agent_name_version_major": "Go-http-client 2", + "agent_security": null, + "agent_uuid": null, + "agent_version": "2.0", + "agent_version_major": "2", + "anonymized": null, + "carrier": null, + "device_brand": "Unknown", + "device_cpu": null, + "device_cpu_bits": null, + "device_firmware_version": null, + "device_name": "Unknown", + "device_version": null, + "facebook_carrier": null, + "facebook_device_class": null, + "facebook_device_name": null, + "facebook_device_version": null, + "facebook_fbop": null, + "facebook_fbss": null, + "facebook_operating_system_name": null, + "facebook_operating_system_version": null, + "g_sa_installation_id": null, + "hacker_attack_vector": null, + "hacker_toolkit": null, + "i_e_compatibility_name_version": null, + "i_e_compatibility_name_version_major": null, + "i_e_compatibility_version": null, + "i_e_compatibility_version_major": null, + "kobo_affiliate": null, + "kobo_platform_id": null, + "layout_engine_build": null, + "layout_engine_class": "Unknown", + "layout_engine_name": "Unknown", + "layout_engine_name_version": "Unknown ??", + "layout_engine_name_version_major": "Unknown ??", + "layout_engine_version": "??", + "layout_engine_version_major": "??", + "network_type": null, + "operating_system_class": "Unknown", + "operating_system_name": "Unknown", + "operating_system_name_version": "Unknown ??", + "operating_system_name_version_major": "Unknown ??", + "operating_system_version": "??", + "operating_system_version_build": null, + "operating_system_version_major": "??", + "webview_app_name": null, + "webview_app_name_version_major": null, + "webview_app_version": null, + "webview_app_version_major": null + } + ], + "contexts_com_snplow_eng_gcp_luke_test_percentage_1_0_3": [{ + "availability_percentage": "10", + "name": "test01" + }] +} diff --git a/repeater/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/RecoverSpec.scala b/repeater/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/RecoverSpec.scala new file mode 100644 index 00000000..49dfebdf --- /dev/null +++ b/repeater/src/test/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/RecoverSpec.scala @@ -0,0 +1,29 @@ +package com.snowplowanalytics.snowplow.storage.bigquery.repeater + +import scala.io.Source + +import io.circe.Json +import io.circe.parser.parse + +import org.specs2.Specification + +class RecoverSpec extends Specification { + + def is = + s2""" + recover parses json $e1 + """ + + def e1 = { + val ColumnToFix = "contexts_com_snplow_eng_gcp_luke_test_percentage_1_0_0" + val FixedColumn = "contexts_com_snplow_eng_gcp_luke_test_percentage_1_0_3" + + val in = 
Source.fromResource("failed_inserts.json").mkString + val recovered = Recover.recover(ColumnToFix, FixedColumn)(in) + val out = parse(Source.fromResource("payload_fixed.json").mkString).getOrElse(Json.Null) + + recovered must beRight.like { + case Recover.IdAndEvent(_, event) => event must beEqualTo(out) + } + } +}