Deploying Snowplow 3 - Kubernetes Cluster

This is part 3 of our post on deploying Snowplow on GCP and GKE. In part 1 we gave a high-level overview of the architecture of our deployment. In part 2 we went into full detail on how to create the infrastructure required for this deployment using Terraform. In this part, we explain in detail how to deploy the services on Kubernetes. Note that the implementation described here does not include horizontal pod autoscaling (HPA), so replica counts must be updated manually to handle increased traffic.

We will go over HPA in a future post. In this post, we go over these resources:

  1. Namespace
  2. Service Account
  3. Iglu Service
  4. Collector
  5. Enrich
  6. Data Warehouse Loaders
  7. Streamloader
  8. Repeater
  9. Mutator
  10. Mutator Init
  11. Endpoint
  12. Wrap Up

Namespace

In our implementation, we deploy everything in the snowplow namespace, so we need to create the namespace first:

# snowplow/namespace.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: snowplow

Service Account

As we mentioned in part 2, we use GKE’s Workload Identity Federation to authenticate our GKE deployments. The way this works is that we create a Kubernetes service account named snowplow-service-account in the snowplow namespace and let our deployments use it by setting:

serviceAccountName: snowplow-service-account

in the deployment’s template spec. In part 2, we allowed this service account to bind to the GCP service account snowplow, which we created specifically for this purpose and granted all the required permissions. So let’s create the GKE service account:

# snowplow/service-account.yaml

apiVersion: v1
kind: ServiceAccount

metadata:
  annotations:
    iam.gke.io/gcp-service-account: snowplow@<GCP-PROJECT>.iam.gserviceaccount.com
  name: snowplow-service-account
  namespace: snowplow

Note that snowplow@<GCP-PROJECT>.iam.gserviceaccount.com is the GCP service account we created in part 2. You should change <GCP-PROJECT> to the ID of your GCP project.

Iglu Service

Next, let’s deploy the Iglu service. In the words of Snowplow’s documentation:

The Iglu Server is an Iglu schema registry which allows you to publish, test and serve schemas via an easy-to-use RESTful interface.

We divide the Iglu service declaration (and those of all the services that follow) into a config file and a deployment file. Here is the config file for the Iglu service. The full set of options is described in Snowplow’s Iglu Server configuration documentation. Note that it covers the latest version, but it also gives a very good overview of the options for older versions.

Also note that we make sure we use an older image that does not require us to accept the new Snowplow Limited Use License. Therefore if you don’t want to be limited by the new license, make sure you use an older version and make sure you don’t have:

license {
  accept = true
}

in your config. This applies to all the configurations we show next as well.

# snowplow/iglu-config.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: iglu-configmap
  namespace: snowplow

data:
  iglu-server.hocon: |
    {
      "repoServer": {
        "interface": "0.0.0.0"
        "port": 8080
        "threadPool": "cached"
        "maxConnections": 2048
      }
      "database": {
        "type": "postgres"
        "host": "<igludb_ip_address>"
        "port": 5432
        "dbname": "<igludb_db_name>"
        "username": "<igludb_username>"
        "password": "<igludb_password>"
        "driver": "org.postgresql.Driver"
        pool: {
          "type": "hikari"
          "maximumPoolSize": 5
          connectionPool: {
            "type": "fixed"
            "size": 4
          }
          "transactionPool": "cached"
        }
      }
      "debug": false
      "patchesAllowed": true
      "superApiKey": "<UUID>"
    }

Note that you need to fill in the following values so that they match the ones we used in part 2 to create the resources:

  • igludb_ip_address
  • igludb_db_name
  • igludb_username
  • igludb_password

You also need to generate a UUID for superApiKey. Make sure to keep it private. After you have created and applied the config file, let’s create and apply the deployment:

# snowplow/iglu.yaml

apiVersion: apps/v1
kind: Deployment

metadata:
  name: iglu-server
  namespace: snowplow

spec:
  selector:
    matchLabels:
      app: iglu

  replicas: 2
  revisionHistoryLimit: 1

  template:
    metadata:
      labels:
        app: iglu

    spec:
      securityContext:
        runAsUser: 1000
        runAsGroup: 3000
        fsGroup: 2000

      containers:
        - name: iglu-server
          image: snowplow/iglu-server:0.11.0
          command:
            [
              "/home/snowplow/bin/iglu-server",
              "--config",
              "/snowplow/config/iglu-server.hocon",
            ]
          imagePullPolicy: "IfNotPresent"

          env:
            - name: JAVA_OPTS
              value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info

          volumeMounts:
            - name: iglu-config-volume
              mountPath: /snowplow/config

          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "1.5Gi"

      volumes:
        - name: iglu-config-volume
          configMap:
            name: iglu-configmap
            items:
              - key: iglu-server.hocon
                path: iglu-server.hocon

---
apiVersion: v1
kind: Service

metadata:
  name: iglu-server-service
  namespace: snowplow

spec:
  selector:
    app: iglu

  ports:
    - name: http
      protocol: TCP
      port: 80
      targetPort: 8080
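
The Deployment above does not define any probes, so Kubernetes routes traffic to an Iglu pod as soon as its container starts. A minimal sketch of a readiness probe follows. It assumes the Iglu Server image exposes a health endpoint at /api/meta/health on port 8080; the probe timings are illustrative values, not taken from the original setup, so verify both against the version you deploy.

```yaml
# Hypothetical addition under the iglu-server container in snowplow/iglu.yaml.
# Assumption: the server answers health checks at /api/meta/health.
readinessProbe:
  httpGet:
    path: /api/meta/health
    port: 8080
  initialDelaySeconds: 10 # assumed JVM start-up allowance
  periodSeconds: 10
  failureThreshold: 3
```

With a probe like this, the iglu-server-service only sends requests to pods that have passed the check.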

Collector

The collector is the first service that event traffic reaches, so let’s implement it next. Again, we divide the collector declaration into two parts: config and deployment. Let’s start with the configuration. In the configuration file below, make sure you replace <GCP-PROJECT> with the ID of your project. The collector service also has three endpoints that can be customized:

  1. tracker endpoint: /com.snowplowanalytics.snowplow/tp2
  2. redirect endpoint: /r/tp2
  3. iglu endpoint: /com.snowplowanalytics.iglu/v1

In the config file below we have customized these endpoints to /v1/track, /v1/redirect, and /v1/iglu just to show how it’s done. The full configuration reference for the latest version is in Snowplow’s collector documentation. Again, make sure you use an older version if you don’t want to be limited by the new license.

# snowplow/collector-config.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: collector-configmap
  namespace: snowplow
data:
  config.hocon: |
    collector {
      interface = "0.0.0.0"
      port = 8080
      ssl {
        enable = false
        redirect = false
        port = 8443
      }
      paths {
        "/v1/track" = "/com.snowplowanalytics.snowplow/tp2"
        "/v1/redirect" = "/r/tp2"
        "/v1/iglu" = "/com.snowplowanalytics.iglu/v1"
      }
      p3p {
        policyRef = "/w3c/p3p.xml"
        CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
      }
      crossDomain {
        enabled = false
        domains = [ "*" ]
        secure = true
      }
      cookie {
        enabled = true
        expiration = "365 days"
        name = sp
        domains = []
        fallbackDomain = ""
        secure = true
        httpOnly = false
        sameSite = "None"
      }
      doNotTrackCookie {
        enabled = false
        name = ""
        value = ""
      }
      cookieBounce {
        enabled = false
        name = "n3pc"
        fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
        forwardedProtocolHeader = "X-Forwarded-Proto"
      }
      enableDefaultRedirect = false
      redirectMacro {
        enabled = false
        placeholder = "[TOKEN]"
      }
      rootResponse {
        enabled = false
        statusCode = 302
        headers = {}
        body = "302, redirecting"
      }
      cors {
        accessControlMaxAge = "5 seconds"
      }
      prometheusMetrics {
        enabled = false
      }
      streams {
        good = snowplow-raw
        bad = snowplow-bad-1
        useIpAddressAsPartitionKey = false
        sink {
          enabled = google-pub-sub
          googleProjectId = "<GCP-PROJECT>"
          backoffPolicy {
            minBackoff = 1000
            maxBackoff = 1000
            totalBackoff = 10000
            multiplier = 1
          }
        }
        buffer {
          byteLimit = 1000000
          recordLimit = 500
          timeLimit = 500
        }
      }
      telemetry {
        disable = true
        url = "telemetry-g.snowplowanalytics.com"
        userProvidedId = ""
        moduleName = "collector-pubsub-ce"
        moduleVersion = "0.2.2"
        autoGeneratedId = ""
      }
    }
    akka {
      loglevel = WARNING
      loggers = ["akka.event.slf4j.Slf4jLogger"]
      http.server {
        remote-address-header = on
        raw-request-uri-header = on
        parsing {
          max-uri-length = 32768
          uri-parsing-mode = relaxed
        }
        max-connections = 2048
      }
    }

And now the collector implementation itself:

# snowplow/collector.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: collector-server
  namespace: snowplow
spec:
  selector:
    matchLabels:
      app: collector
  replicas: 2
  revisionHistoryLimit: 1

  template:
    metadata:
      labels:
        app: collector
    spec:
      # Prefer spreading collector pods across nodes (soft anti-affinity, not a hard guarantee)
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 10
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - collector
                topologyKey: "kubernetes.io/hostname"

      serviceAccountName: snowplow-service-account
      containers:
        - name: collector-server
          image: snowplow/scala-stream-collector-pubsub:2.10.0
          command:
            - "/home/snowplow/bin/snowplow-stream-collector"
            - "--config"
            - "/snowplow/config/config.hocon"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: JAVA_OPTS
              value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info
          volumeMounts:
            - name: collector-config-volume
              mountPath: /snowplow/config
          resources:
            requests:
              memory: "128Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
      volumes:
        - name: collector-config-volume
          configMap:
            name: collector-configmap
            items:
              - key: config.hocon
                path: config.hocon

---
apiVersion: cloud.google.com/v1
kind: BackendConfig
metadata:
  name: collector-backendconfig
  namespace: snowplow
spec:
  timeoutSec: 60
  healthCheck:
    checkIntervalSec: 10
    timeoutSec: 10
    healthyThreshold: 3
    unhealthyThreshold: 5
    type: HTTP
    requestPath: /health
    port: 8080
  logging:
    enable: false

---
apiVersion: v1
kind: Service
metadata:
  name: collector-server-service
  namespace: snowplow
  annotations:
    cloud.google.com/backend-config: '{"default": "collector-backendconfig"}' # this is only required if you run on GKE
spec:
  selector:
    app: collector
  type: ClusterIP
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
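
The BackendConfig above tells the GKE load balancer to check /health on port 8080, but the Deployment itself has no probe, so Kubernetes has no signal of its own for when a collector pod is ready. A minimal sketch of a matching readiness probe follows; the timing values are assumptions rather than settings from the original deployment.

```yaml
# Hypothetical addition under the collector-server container in snowplow/collector.yaml.
readinessProbe:
  httpGet:
    path: /health # same path the BackendConfig health check uses
    port: 8080
  initialDelaySeconds: 5 # assumed start-up allowance
  periodSeconds: 10
  failureThreshold: 3
```

Reusing the same path keeps the load balancer's view and the cluster's view of pod health consistent.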

Enrich

Going from right to left in the architecture diagram in part 1, next is the Enrich server. We do basically the same thing as we did for the collector and divide it into a configuration file and a deployment file. Snowplow’s documentation covers the configuration of the latest version of the Enrich server.

Note that in the configuration below you need to replace:

  1. <SALT> with some arbitrary random string
  2. <Iglu superApiKey> with the same superApiKey UUID you used in the Iglu server configuration above
  3. <GCP-PROJECT> with the ID of your GCP project

# snowplow/enrich-config.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: enrich-configmap
  namespace: snowplow
data:
  enrichment_campaigns.json: |
    {
      "schema": "iglu:com.snowplowanalytics.snowplow/campaign_attribution/jsonschema/1-0-1",
      "data": {
        "name": "campaign_attribution",
        "vendor": "com.snowplowanalytics.snowplow",
        "enabled": true,
        "parameters": {
          "mapping": "static",
          "fields": {
            "mktMedium": ["utm_medium", "medium"],
            "mktSource": ["utm_source", "source"],
            "mktTerm": ["utm_term", "legacy_term"],
            "mktContent": ["utm_content"],
            "mktCampaign": ["utm_campaign", "cid", "legacy_campaign"]
          }
        }
      }
    }
  enrichment_pii.json: |
    {
      "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/pii_enrichment_config/jsonschema/2-0-0",
      "data": {
        "vendor": "com.snowplowanalytics.snowplow.enrichments",
        "name": "pii_enrichment_config",
        "emitEvent": true,
        "enabled": true,
        "parameters": {
          "pii": [
            {
              "pojo": {
                "field": "user_ipaddress"
              }
            }
          ],
          "strategy": {
            "pseudonymize": {
              "hashFunction": "MD5",
              "salt": "<SALT>"
            }
          }
        }
      }
    }
  enrichment_event_fingerprint.json: |
    {
      "schema": "iglu:com.snowplowanalytics.snowplow/event_fingerprint_config/jsonschema/1-0-1",
      "data": {
        "name": "event_fingerprint_config",
        "vendor": "com.snowplowanalytics.snowplow",
        "enabled": true,
        "parameters": {
          "excludeParameters": ["cv", "eid", "nuid", "stm"],
          "hashAlgorithm": "MD5"
        }
      }
    }
  enrichment_referrer_parser.json: |
    {
      "schema": "iglu:com.snowplowanalytics.snowplow/referer_parser/jsonschema/2-0-0",
      "data": {
        "name": "referer_parser",
        "vendor": "com.snowplowanalytics.snowplow",
        "enabled": true,
        "parameters": {
          "database": "referers-latest.json",
          "uri": "https://snowplow-hosted-assets.s3.eu-west-1.amazonaws.com/third-party/referer-parser/",
          "internalDomains": []
        }
      }
    }
  enrichment_ua_parser.json: |
    {
      "schema": "iglu:com.snowplowanalytics.snowplow/ua_parser_config/jsonschema/1-0-1",
      "data": {
        "name": "ua_parser_config",
        "vendor": "com.snowplowanalytics.snowplow",
        "enabled": true,
        "parameters": {
          "uri": "https://snowplow-hosted-assets.s3.eu-west-1.amazonaws.com/third-party/ua-parser",
          "database": "regexes-latest.yaml"
        }
      }
    }
  enrichment_yauaa.json: |
    {
      "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/yauaa_enrichment_config/jsonschema/1-0-0",
      "data": {
        "enabled": true,
        "vendor": "com.snowplowanalytics.snowplow.enrichments",
        "name": "yauaa_enrichment_config"
      }
    }
  config.hocon: |
    {
      "auth": {
        "type": "Gcp"
      },
      "input": {
        "type": "PubSub",
        "subscription": "projects/<GCP-PROJECT>/subscriptions/snowplow-raw"
      },
      "output": {
        "good": {
          "type": "PubSub",
          "topic": "projects/<GCP-PROJECT>/topics/snowplow-enriched",
          "attributes": [ "app_id", "event_name" ]
        },
        "bad": {
          "type": "PubSub",
          "topic": "projects/<GCP-PROJECT>/topics/snowplow-bad-1"
        }
      },
      "assetsUpdatePeriod": "10080 minutes"
    }
  iglu-config.json: |
    {
      "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3",
      "data": {
        "cacheSize": 500,
        "cacheTtl": 600,
        "repositories": [
          {
            "connection": {
              "http": {
                "uri": "http://iglucentral.com"
              }
            },
            "name": "Iglu Central",
            "priority": 10,
            "vendorPrefixes": []
          },
          {
            "connection": {
              "http": {
                "uri": "http://mirror01.iglucentral.com"
              }
            },
            "name": "Iglu Central - Mirror 01",
            "priority": 20,
            "vendorPrefixes": []
          },
          {
            "connection": {
              "http": {
                "apikey": "<Iglu superApiKey>",
                "uri": "http://iglu-server-service/api"
              }
            },
            "name": "Iglu Server",
            "priority": 0,
            "vendorPrefixes": []
          }
        ]
      }
    }

And here is the Enrich deployment file:

# snowplow/enrich.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: enrich-server
  namespace: snowplow
spec:
  selector:
    matchLabels:
      app: enrich
  replicas: 1
  revisionHistoryLimit: 1

  template:
    metadata:
      labels:
        app: enrich
    spec:
      serviceAccountName: snowplow-service-account
      containers:
        - name: enrich-server
          image: snowplow/snowplow-enrich-pubsub:3.9.0
          command:
            - "/home/snowplow/bin/snowplow-enrich-pubsub"
            - "--config"
            - "/snowplow/config/config.hocon"
            - "--iglu-config"
            - "/snowplow/config/iglu-config.json"
            - "--enrichments"
            - "/snowplow/config/enrichments"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: JAVA_OPTS
              value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info -Dorg.slf4j.simpleLogger.log.InvalidEnriched=debug
          volumeMounts:
            - name: enrich-config-volume
              mountPath: /snowplow/config
          resources:
            requests:
              memory: "256Mi"
              cpu: "350m"
            limits:
              memory: "1.5Gi"
              cpu: 2
      volumes:
        - name: enrich-config-volume
          configMap:
            name: enrich-configmap
            items:
              - key: iglu-config.json
                path: iglu-config.json
              - key: config.hocon
                path: config.hocon
              - key: enrichment_campaigns.json
                path: enrichments/enrichment_campaigns.json
              - key: enrichment_pii.json
                path: enrichments/enrichment_pii.json
              - key: enrichment_event_fingerprint.json
                path: enrichments/enrichment_event_fingerprint.json
              - key: enrichment_referrer_parser.json
                path: enrichments/enrichment_referrer_parser.json
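              - key: enrichment_ua_parser.json
                path: enrichments/enrichment_ua_parser.json
              # The ConfigMap also defines enrichment_yauaa.json; mount it so the enrichment is loaded
              - key: enrichment_yauaa.json
                path: enrichments/enrichment_yauaa.json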

Data Warehouse Loaders

We have three more services to deploy (or four, if you count mutator-init):

  1. streamloader
  2. repeater
  3. mutator
  4. mutator-init (a one-time table initialization service)

These services are extremely similar and share the same configuration file. For the most up-to-date configuration documentation, check out the BigQuery Loader page.

Let us start with the configuration file. In the configuration file below, you just need to replace:

  1. <GCP-PROJECT>
  2. <Iglu superApiKey>

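with their appropriate values. The dead-letter bucket name (spangle-snowplow-bq-loader-dead-letter here) must also match the bucket you created in part 2.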

# snowplow/streamloader-config.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: streamloader-configmap
  namespace: snowplow
data:
  config.hocon: |
    {
      "projectId": "<GCP-PROJECT>",
      "loader": {
        "input": {
          "subscription": "snowplow-enriched"
        },
        "output": {
          "good": {
            "datasetId": "snowplow",
            "tableId": "events"
          },
          "bad": {
            "topic": "snowplow-bq-bad-rows"
          },
          "types": {
            "topic": "snowplow-bq-loader-server-types"
          },
          "failedInserts": {
            "topic": "snowplow-bq-loader-server-failed-inserts"
          }
        }
      },
      "mutator": {
        "input": {
          "subscription": "snowplow-bq-loader-server-types"
        },
        "output": {
          "good": ${loader.output.good}
        }
      },
      "repeater": {
        "input": {
          "subscription": "snowplow-bq-loader-server-failed-inserts"
        },
        "output": {
          "good": ${loader.output.good}
          "deadLetters": {
            "bucket": "gs://spangle-snowplow-bq-loader-dead-letter"
          }
        }
      }
    }
  iglu-config.json: |
    {
      "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3",
      "data": {
        "cacheSize": 500,
        "cacheTtl": 600,
        "repositories": [
          {
            "connection": {
              "http": {
                "uri": "http://iglucentral.com"
              }
            },
            "name": "Iglu Central",
            "priority": 10,
            "vendorPrefixes": []
          },
          {
            "connection": {
              "http": {
                "uri": "http://mirror01.iglucentral.com"
              }
            },
            "name": "Iglu Central - Mirror 01",
            "priority": 20,
            "vendorPrefixes": []
          },
          {
            "connection": {
              "http": {
                "apikey": "<Iglu superApiKey>",
                "uri": "http://iglu-server-service/api"
              }
            },
            "name": "Iglu Server",
            "priority": 0,
            "vendorPrefixes": []
          }
        ]
      }
    }

Streamloader

Now we can easily deploy the streamloader:

# snowplow/streamloader.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: streamloader-server
  namespace: snowplow
spec:
  selector:
    matchLabels:
      app: streamloader
  replicas: 1
  revisionHistoryLimit: 1

  template:
    metadata:
      labels:
        app: streamloader
    spec:
      serviceAccountName: snowplow-service-account
      containers:
        - name: streamloader-server
          image: snowplow/snowplow-bigquery-streamloader:1.7.1
          command:
            - "/home/snowplow/bin/snowplow-bigquery-streamloader"
            - "--config"
            - "/snowplow/config/config.hocon"
            - "--resolver"
            - "/snowplow/config/iglu-config.json"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: JAVA_OPTS
              value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info
          volumeMounts:
            - name: streamloader-config-volume
              mountPath: /snowplow/config
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
      volumes:
        - name: streamloader-config-volume
          configMap:
            name: streamloader-configmap
            items:
              - key: iglu-config.json
                path: iglu-config.json
              - key: config.hocon
                path: config.hocon

Repeater

Deploying repeater is done quite easily as well:

# snowplow/repeater.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: repeater-server
  namespace: snowplow
spec:
  selector:
    matchLabels:
      app: repeater
  replicas: 1
  revisionHistoryLimit: 1

  template:
    metadata:
      labels:
        app: repeater
    spec:
      serviceAccountName: snowplow-service-account
      containers:
        - name: repeater-server
          image: snowplow/snowplow-bigquery-repeater:1.7.1
          command:
            - "/home/snowplow/bin/snowplow-bigquery-repeater"
            - "--config"
            - "/snowplow/config/config.hocon"
            - "--resolver"
            - "/snowplow/config/iglu-config.json"
            - "--bufferSize=20"
            - "--timeout=20"
            - "--backoffPeriod=900"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: JAVA_OPTS
              value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info
          volumeMounts:
            - name: repeater-config-volume
              mountPath: /snowplow/config
          resources:
            requests:
              memory: "128Mi"
              cpu: "150m"
            limits:
              memory: "512Mi"
      volumes:
        - name: repeater-config-volume
          configMap:
            name: streamloader-configmap
            items:
              - key: iglu-config.json
                path: iglu-config.json
              - key: config.hocon
                path: config.hocon

Mutator

Deploying mutator is also easy:

# snowplow/mutator.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mutator-server
  namespace: snowplow
spec:
  selector:
    matchLabels:
      app: mutator
  replicas: 1
  revisionHistoryLimit: 1

  template:
    metadata:
      labels:
        app: mutator
    spec:
      serviceAccountName: snowplow-service-account
      containers:
        - name: mutator-server
          image: snowplow/snowplow-bigquery-mutator:1.7.1
          command:
            - "/home/snowplow/bin/snowplow-bigquery-mutator"
            - "listen"
            - "--config"
            - "/snowplow/config/config.hocon"
            - "--resolver"
            - "/snowplow/config/iglu-config.json"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: JAVA_OPTS
              value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info
          volumeMounts:
            - name: mutator-config-volume
              mountPath: /snowplow/config
          resources:
            requests:
              memory: "128Mi"
              cpu: "150m"
            limits:
              memory: "512Mi"
      volumes:
        - name: mutator-config-volume
          configMap:
            name: streamloader-configmap
            items:
              - key: iglu-config.json
                path: iglu-config.json
              - key: config.hocon
                path: config.hocon

Mutator Init

Mutator comes with another service, mutator-init, which is a one-time utility that creates the BigQuery table. A similar manifest deploys the mutator-init service:

# snowplow/mutator-init.yaml

apiVersion: v1
kind: Pod
metadata:
  name: mutator-init-server
  namespace: snowplow
spec:
  serviceAccountName: snowplow-service-account
  containers:
    - name: mutator-init-server
      image: snowplow/snowplow-bigquery-mutator:1.7.1
      command:
        - "/home/snowplow/bin/snowplow-bigquery-mutator"
        - "create"
        - "--config"
        - "/snowplow/config/config.hocon"
        - "--resolver"
        - "/snowplow/config/iglu-config.json"
        - "--partitionColumn=collector_tstamp"
        - "--requirePartitionFilter"
      imagePullPolicy: "IfNotPresent"
      env:
        - name: JAVA_OPTS
          value: -Dorg.slf4j.simpleLogger.defaultLogLevel=info
      volumeMounts:
        - name: mutator-config-volume
          mountPath: /snowplow/config
      resources:
        requests:
          memory: "128Mi"
          cpu: "150m"
        limits:
          memory: "512Mi"
  volumes:
    - name: mutator-config-volume
      configMap:
        name: streamloader-configmap
        items:
          - key: iglu-config.json
            path: iglu-config.json
          - key: config.hocon
            path: config.hocon
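
One thing to be aware of: a bare Pod defaults to restartPolicy: Always, so Kubernetes restarts the container after the create command exits. If you would rather run the initialization exactly once, a Job is a common alternative. The following is a minimal sketch under that assumption; the file name, Job name, backoffLimit, and ttlSecondsAfterFinished values are illustrative and not part of the original setup.

```yaml
# snowplow/mutator-init-job.yaml (hypothetical alternative to the Pod above)
apiVersion: batch/v1
kind: Job
metadata:
  name: mutator-init
  namespace: snowplow
spec:
  backoffLimit: 2 # assumed retry budget
  ttlSecondsAfterFinished: 3600 # assumed clean-up delay for the finished Job
  template:
    spec:
      restartPolicy: Never # run to completion instead of restarting
      serviceAccountName: snowplow-service-account
      containers:
        - name: mutator-init
          image: snowplow/snowplow-bigquery-mutator:1.7.1
          command:
            - "/home/snowplow/bin/snowplow-bigquery-mutator"
            - "create"
            - "--config"
            - "/snowplow/config/config.hocon"
            - "--resolver"
            - "/snowplow/config/iglu-config.json"
            - "--partitionColumn=collector_tstamp"
            - "--requirePartitionFilter"
          volumeMounts:
            - name: mutator-config-volume
              mountPath: /snowplow/config
      volumes:
        - name: mutator-config-volume
          configMap:
            # Mounting the whole ConfigMap exposes config.hocon and iglu-config.json
            name: streamloader-configmap
```

The command and configuration are the same as in the Pod; only the controller around them changes.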

Endpoint

Finally, we need to create the endpoint that receives incoming tracking calls and passes them to the collector service. In the following, you need to:

  1. Replace <TRACKER.DOMAIN> with the tracker’s domain name, for example tracker.example.com.
  2. Rename the managed certificate from tracker-domain-certificate to something more descriptive, such as tracker-example-com, in both the Ingress annotation and the ManagedCertificate resource.

# snowplow/endpoint.yaml

apiVersion: networking.k8s.io/v1
kind: Ingress

metadata:
  name: snowplow-ingress
  namespace: snowplow

  annotations:
    kubernetes.io/ingress.global-static-ip-name: snowplow-ingress-ip
    networking.gke.io/managed-certificates: tracker-domain-certificate
    networking.gke.io/v1beta1.FrontendConfig: http-to-https
    # kubernetes.io/ingress.allow-http: "true"

spec:
  rules:
    - host: <TRACKER.DOMAIN>
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: collector-server-service
                port:
                  number: 8080

---
apiVersion: networking.gke.io/v1beta1
kind: FrontendConfig
metadata:
  name: http-to-https
  namespace: snowplow

spec:
  redirectToHttps:
    enabled: true
    responseCodeName: MOVED_PERMANENTLY_DEFAULT

---
apiVersion: networking.gke.io/v1
kind: ManagedCertificate

metadata:
  name: tracker-domain-certificate
  namespace: snowplow

spec:
  domains:
    - <TRACKER.DOMAIN>

Wrap Up

You should now have 14 YAML files, plus a README, in the snowplow folder for Kubernetes:

    snowplow
    ├── README.md
    ├── collector-config.yaml
    ├── collector.yaml
    ├── endpoint.yaml
    ├── enrich-config.yaml
    ├── enrich.yaml
    ├── iglu-config.yaml
    ├── iglu.yaml
    ├── mutator-init.yaml
    ├── mutator.yaml
    ├── namespace.yaml
    ├── repeater.yaml
    ├── service-account.yaml
    ├── streamloader-config.yaml
    └── streamloader.yaml
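
If you prefer to apply these manifests as a unit rather than one file at a time, one option is a Kustomize file next to them. The following is a minimal sketch; kustomization.yaml is a hypothetical addition that is not part of the file set above, and mutator-init.yaml is deliberately left out because it is a one-time step.

```yaml
# snowplow/kustomization.yaml (hypothetical; not one of the files listed above)
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - namespace.yaml
  - service-account.yaml
  - iglu-config.yaml
  - iglu.yaml
  - collector-config.yaml
  - collector.yaml
  - enrich-config.yaml
  - enrich.yaml
  - streamloader-config.yaml
  - streamloader.yaml
  - repeater.yaml
  - mutator.yaml
  - endpoint.yaml
```

With this in place, kubectl apply -k snowplow/ applies everything in the list, and mutator-init.yaml can be applied separately once the BigQuery dataset exists.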

Deploying Snowplow 2 - Infrastructure

This is part 2 of our post on deploying Snowplow on GCP and GKE. In part 1 we gave a high-level overview of the architecture. In this part we explain how to use Terraform to create the required infrastructure in GCP. You probably want to put the following definitions in a folder called snowplow inside your modules folder and then import it from your main module.

We go over these resources:

  1. Variables
  2. Google Cloud Storage
  3. PostgreSQL
  4. BigQuery Dataset
  5. Networking and IP Address
  6. PubSub topics and subscriptions
  7. PubSub to GCS Permissions
  8. Service Account and Permissions
  9. Wrap Up

Variables

Let’s start with the set of variables that our infrastructure is going to need:

# modules/snowplow/variables.tf

variable "project_id" {
  description = "The ID of the GCP project"
  type        = string
}

variable "region" {
  description = "The region for GCP resources"
  type        = string
}

variable "igludb_instance_id" {
  description = "The ID of the Cloud SQL PostgreSQL instance for Iglu"
  type        = string
}

variable "igludb_db_name" {
  description = "Database name for Iglu"
  type        = string
}

variable "igludb_username" {
  description = "The username for the Iglu database"
  type        = string
}

variable "igludb_password" {
  description = "The password for the Iglu database user"
  type        = string
  sensitive   = true # Mark as sensitive to protect the password
}

Google Cloud Storage

Next we need three storage buckets for bad rows, failed inserts, etc:

# modules/snowplow/storage.tf

resource "google_storage_bucket" "bq_loader_dead_letter_bucket" {
  name                        = "spangle-snowplow-bq-loader-dead-letter"
  location                    = var.region
  storage_class               = "STANDARD"
  force_destroy               = true
  uniform_bucket_level_access = true

  versioning {
    enabled = false
  }
}

resource "google_storage_bucket" "bad_1_bucket" {
  name                        = "spangle-snowplow-bad-1"
  location                    = var.region
  storage_class               = "STANDARD"
  force_destroy               = true
  uniform_bucket_level_access = true

  versioning {
    enabled = false
  }
}

resource "google_storage_bucket" "bq_bad_rows_bucket" {
  name                        = "spangle-snowplow-bq-bad-rows"
  location                    = var.region
  storage_class               = "STANDARD"
  force_destroy               = true
  uniform_bucket_level_access = true

  versioning {
    enabled = false
  }
}

PostgreSQL

If you already have a Cloud SQL PostgreSQL instance, you can skip the next step; otherwise we need to create one. Here is an example setup:

# modules/snowplow/sql_database.tf

resource "google_sql_database_instance" "dev_postgres_instance" {
  name             = "dev-postgres"
  database_version = "POSTGRES_16"
  region           = var.region


  settings {
    tier                        = "db-custom-2-8192"
    deletion_protection_enabled = true

    backup_configuration {
      enabled                        = true
      location                       = "us"
      point_in_time_recovery_enabled = true
    }

    maintenance_window {
      update_track = "canary"
    }
  }
}


Then we need to create a database and a user for the Iglu server on the PostgreSQL instance created above:

# modules/snowplow/sql_database.tf

# Create a database in the existing Cloud SQL PostgreSQL instance
resource "google_sql_database" "iglu_db" {
  name     = var.igludb_db_name     # Replace with your desired database name
  instance = var.igludb_instance_id # Reference the existing Cloud SQL instance
  project  = var.project_id         # Reference the project ID
}

# Create a user for the database with a password
resource "google_sql_user" "iglu_postgres_user" {
  name     = var.igludb_username    # Use the username variable
  password = var.igludb_password    # Use the password variable
  instance = var.igludb_instance_id # Reference the existing Cloud SQL instance
  project  = var.project_id         # Reference the project ID
}

If you are creating the PostgreSQL instance with the definition above, make sure you add a dependency declaration to the database and user definitions so that Terraform creates the instance first. Something like:

  depends_on = [
    google_sql_database_instance.dev_postgres_instance
  ]

to both google_sql_database.iglu_db and google_sql_user.iglu_postgres_user above.
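
An alternative to an explicit depends_on, when the instance is defined in the same module, is to reference the instance resource directly so that Terraform infers the ordering on its own. The sketch below assumes you are willing to drop var.igludb_instance_id in favor of that reference. It is a variation on the definitions above, not an addition to them.

```hcl
# modules/snowplow/sql_database.tf (variation, assuming the instance is managed in this module)

resource "google_sql_database" "iglu_db" {
  name     = var.igludb_db_name
  instance = google_sql_database_instance.dev_postgres_instance.name # implicit dependency
  project  = var.project_id
}

resource "google_sql_user" "iglu_postgres_user" {
  name     = var.igludb_username
  password = var.igludb_password
  instance = google_sql_database_instance.dev_postgres_instance.name # implicit dependency
  project  = var.project_id
}
```

With the reference in place, Terraform orders the instance before the database and the user without any depends_on block.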

BigQuery Dataset

Next, let’s create the BigQuery dataset. We call the dataset snowplow. You can of course change that to something else, but make sure you update the service configuration files, which we will go over in Part 3, accordingly.

# modules/snowplow/bigquery.tf

# Create a BigQuery dataset with the specified location and description
resource "google_bigquery_dataset" "snowplow_event_dataset" {
  dataset_id  = "snowplow"
  description = "Snowplow event dataset"
  location    = "US"

  project = var.project_id
}

Networking and IP Address

Next, let’s create the static IP address that is used as the tracker endpoint:

# modules/snowplow/network.tf

# Create a global static IP address
resource "google_compute_global_address" "snowplow_ingress_ip" {
  name       = "snowplow-ingress-ip"
  ip_version = "IPV4"
}
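
If you want to read the reserved address back after applying (for example, to point a DNS record at it), an output along these lines may help. The file name and the output name are assumptions made for this sketch and are not part of the module as described in this post.

```hcl
# modules/snowplow/outputs.tf (hypothetical, optional file)

output "snowplow_ingress_ip_address" {
  description = "The reserved global IPv4 address used as the tracker endpoint"
  value       = google_compute_global_address.snowplow_ingress_ip.address
}
```

Because the resource lives inside a module, the root configuration would need to re-export this value (for instance as module.snowplow.snowplow_ingress_ip_address) before terraform output can show it.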

PubSub topics and subscriptions

We also need to create the six PubSub topics and their related subscriptions that we mentioned in Part 1.

# modules/snowplow/pubsub.tf

locals {
  pubsub_topics = [
    # Tuple format: ("topic_name", subscription_needed)
    ["snowplow-bad-1", false],
    ["snowplow-bq-bad-rows", false],
    ["snowplow-bq-loader-server-failed-inserts", true],
    ["snowplow-bq-loader-server-types", true],
    ["snowplow-enriched", true],
    ["snowplow-raw", true]
  ]
}

resource "google_pubsub_topic" "pubsub_topics" {
  for_each = { for topic in local.pubsub_topics : topic[0] => topic }
  name     = each.value[0]
}

resource "google_pubsub_subscription" "pubsub_subscriptions" {
  for_each = { for topic in local.pubsub_topics : topic[0] => topic if topic[1] }
  name     = each.value[0]
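  # Reference the topic resource so Terraform creates the topic before the subscription
  topic    = google_pubsub_topic.pubsub_topics[each.key].id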

  expiration_policy {
    ttl = "604800s" # Subscription expires after 7 days (604800 seconds) of inactivity
  }
}

resource "google_pubsub_subscription" "bad_1_subscription" {
  name  = "snowplow-bad-1-gcs"
  topic = google_pubsub_topic.pubsub_topics["snowplow-bad-1"].id

  cloud_storage_config {
    bucket = google_storage_bucket.bad_1_bucket.name

    filename_prefix          = ""
    filename_suffix          = "-${var.region}"
    filename_datetime_format = "YYYY-MM-DD/hh_mm_ssZ"

    # max_bytes    = 1000000
    # max_duration = "300s"
    # max_messages = 1000
  }

  depends_on = [
    google_storage_bucket.bad_1_bucket,
    google_storage_bucket_iam_member.admin,
  ]
}

resource "google_pubsub_subscription" "bq_bad_rows_subscription" {
  name  = "snowplow-bq-bad-rows-gcs"
  topic = google_pubsub_topic.pubsub_topics["snowplow-bq-bad-rows"].id

  cloud_storage_config {
    bucket = google_storage_bucket.bq_bad_rows_bucket.name

    filename_prefix          = ""
    filename_suffix          = "-${var.region}"
    filename_datetime_format = "YYYY-MM-DD/hh_mm_ssZ"

    # max_bytes    = 1000000
    # max_duration = "300s"
    # max_messages = 1000
  }

  depends_on = [
    google_storage_bucket.bq_bad_rows_bucket,
    google_storage_bucket_iam_member.admin,
  ]
}

PubSub to GCS Permissions

Somewhat confusingly, we must explicitly allow the PubSub service account to write to the GCS buckets that receive messages from the two GCS subscriptions above. In other words:

# modules/snowplow/iam.tf

data "google_project" "project" {
}

locals {
  bucket_names = [
    google_storage_bucket.bad_1_bucket.name,
    google_storage_bucket.bq_bad_rows_bucket.name,
  ]
}

resource "google_storage_bucket_iam_member" "admin" {
  for_each = toset(local.bucket_names)

  bucket = each.value
  role   = "roles/storage.admin"
  member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
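
The roles/storage.admin role above is broader than a write-only subscription needs. As a hedged sketch of a tighter variant: Google's Pub/Sub documentation for Cloud Storage subscriptions lists roles/storage.objectCreator and roles/storage.legacyBucketReader as the required bucket roles, so a binding along the following lines could replace the admin grant. The local and resource names here are invented for illustration, and you should verify the role list against the current documentation before relying on it.

```hcl
# modules/snowplow/iam.tf (alternative to the "admin" binding; names are hypothetical)

locals {
  # Buckets keyed by a static label so for_each keys never depend on resource attributes
  pubsub_gcs_buckets = {
    bad_1       = google_storage_bucket.bad_1_bucket.name
    bq_bad_rows = google_storage_bucket.bq_bad_rows_bucket.name
  }

  # Assumption: roles listed in the Pub/Sub docs for Cloud Storage subscriptions
  pubsub_gcs_roles = [
    "roles/storage.objectCreator",
    "roles/storage.legacyBucketReader",
  ]

  # One entry per (bucket, role) pair
  pubsub_gcs_bindings = {
    for pair in setproduct(keys(local.pubsub_gcs_buckets), local.pubsub_gcs_roles) :
    "${pair[0]}/${pair[1]}" => { bucket_key = pair[0], role = pair[1] }
  }
}

resource "google_storage_bucket_iam_member" "pubsub_gcs_writer" {
  for_each = local.pubsub_gcs_bindings

  bucket = local.pubsub_gcs_buckets[each.value.bucket_key]
  role   = each.value.role
  member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}
```

If you adopt this variant, the depends_on lists in the two GCS subscriptions would need to point at google_storage_bucket_iam_member.pubsub_gcs_writer instead of google_storage_bucket_iam_member.admin.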

Service Account and Permissions

Next, we create the service account that is going to be used by all the Snowplow services and give it all the permissions these services need. Note that we are using one shared service account here just for simplicity. Ideally, you would create a separate service account for each service and give each one only the minimum permissions it needs.

Probably the least trivial part of this declaration is the last part, workload_identity_user_binding, where we allow the GKE service account snowplow/snowplow-service-account (i.e. the service account snowplow-service-account in the Kubernetes namespace snowplow) to bind to the snowplow service account in GCP. In other words, we give our Kubernetes services the ability to use the GKE service account snowplow/snowplow-service-account, which can then act as the snowplow service account we create below on GCP. More on that in Part 3.

# modules/snowplow/service_accounts.tf

# Combined Service Account: snowplow
resource "google_service_account" "snowplow" {
  account_id   = "snowplow"
  display_name = "snowplow"
  description  = "Combined service account with all permissions for Snowplow"
}

# List of roles to be assigned to the service account
locals {
  roles = [
    "roles/bigquery.dataEditor",
    "roles/logging.logWriter",
    "roles/pubsub.publisher",
    "roles/pubsub.subscriber",
    "roles/pubsub.viewer",
    "roles/storage.objectViewer"
  ]
}

# Assign all roles to the service account in a loop
resource "google_project_iam_member" "combined_iam_roles" {
  for_each = toset(local.roles)
  project  = var.project_id
  role     = each.value
  member   = "serviceAccount:${google_service_account.snowplow.email}"
}

# Add IAM policy binding to the Google Cloud Storage bucket
resource "google_storage_bucket_iam_member" "bq_loader_dead_letter_bucket_binding" {
  bucket = google_storage_bucket.bq_loader_dead_letter_bucket.name
  role   = "roles/storage.objectAdmin"
  member = "serviceAccount:${google_service_account.snowplow.email}"
}

# Add Workload Identity User binding
resource "google_service_account_iam_binding" "workload_identity_user_binding" {
  service_account_id = google_service_account.snowplow.name

  role = "roles/iam.workloadIdentityUser"

  members = [
    "serviceAccount:${var.project_id}.svc.id.goog[snowplow/snowplow-service-account]"
  ]
}
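
Note that google_service_account_iam_binding is authoritative for its role: it sets the complete member list for roles/iam.workloadIdentityUser on this service account. If other configurations also grant that role on the same account, the additive google_service_account_iam_member resource may be a safer fit. A minimal sketch, with a resource name chosen only for illustration, intended as a replacement for the binding above and not something to use alongside it:

```hcl
# modules/snowplow/service_accounts.tf (alternative to workload_identity_user_binding)

resource "google_service_account_iam_member" "workload_identity_user_member" {
  service_account_id = google_service_account.snowplow.name
  role               = "roles/iam.workloadIdentityUser"

  # Adds this one member without touching other members of the same role
  member = "serviceAccount:${var.project_id}.svc.id.goog[snowplow/snowplow-service-account]"
}
```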

Wrap Up

Nice! By now we should have eight Terraform files (plus a README) in the modules/snowplow folder. Feel free to run terraform plan and terraform apply.

    modules/snowplow
    ├── README.md
    ├── bigquery.tf
    ├── iam.tf
    ├── network.tf
    ├── pubsub.tf
    ├── service_accounts.tf
    ├── sql_database.tf
    ├── storage.tf
    └── variables.tf

In Part 3 we will go over the Kubernetes cluster deployment.

Deploying Snowplow 1 - Architecture

In this article, we explain our deployment of Snowplow Open Source on the GCP platform. Our deployment collects events, passes them to PubSub on GCP, and then stores them in BigQuery. This deployment is made possible primarily due to a great (though slightly outdated) document, Highly Available Snowplow Pipeline on Kubernetes and GCP. However, there are a few key differences in our deployment compared to that document:

  1. We use Terraform to provision our cloud infrastructure, which makes the deployment cleaner, more manageable, and easier to inspect and replicate.

  2. We use Google Cloud’s Workload Identity Federation to authenticate workloads, greatly simplifying the configuration files, as we will demonstrate below.

  3. We also simplify our deployment by using only one service account for different service types. We have consolidated all the required permissions and assigned them to one service account for simplicity. Of course, you can define multiple service accounts. For example, our reference document uses three separate service accounts for better security.

Before we continue, we should also mention that, as of January 8, 2024, Snowplow has changed its license from Apache 2 to its own agreement, the Snowplow Limited Use License Agreement (SLULA), which limits how you can use Snowplow’s open source software. In particular, it states:

You cannot run this software in production in a highly available manner. For current users of Snowplow Open Source Software who want high availability capabilities in production, you must contact Snowplow for a commercial license. However, SLULA does allow you to run highly available development and QA pipelines that are not for production use

Therefore, in order to use Snowplow in production, we need to use images that were released before January 8, 2024. That is why we are not using the latest versions of the Docker images, as you will see later in this post.

We divide this document into three parts:

  1. Architecture Overview
  2. Infrastructure Deployment
  3. Service Deployment

In the first part, we will briefly go over the architecture and explain the types of services needed for Snowplow to function properly. In the second part, we will explain how to deploy the required infrastructure, such as PubSub topics, subscriptions, and the PostgreSQL databases used by the Iglu server. Finally, in the last part, we will explain how to deploy the services in Kubernetes.

Architecture Overview

Perhaps it’s easiest to start with the reference architecture for Snowplow deployment on GCP. Below is the original diagram from Snowplow:

Figure: Snowplow Pipeline Architecture

As can be seen in the diagram, we need to deploy six types of services on our Kubernetes cluster:

  1. Iglu
  2. Collector
  3. Enrich
  4. Streamloader
  5. Repeater
  6. Mutator

We also need to create six PubSub topics:

  1. raw-topic — called snowplow-raw in our implementation
    The Enrich service subscribes to this topic through a subscription with the same name, snowplow-raw. It reads the events, enriches them, and adds them to the enrich-topic.

  2. bad-events-topic — called snowplow-bad-1
    We connect this topic directly to GCS via a subscription called snowplow-bad-1-gcs.

  3. enrich-topic — called snowplow-enriched
    The Streamloader service subscribes to this topic through a subscription with the same name, snowplow-enriched, and writes the events to BigQuery.

  4. new-types-topic — called snowplow-bq-loader-server-types
    In case new columns need to be created in the BigQuery events table, Streamloader adds a message to this topic. The Mutator service subscribes to this topic through a subscription with the same name, snowplow-bq-loader-server-types, and creates the required columns.

  5. failed-inserts-topic — called snowplow-bq-loader-server-failed-inserts
    If Streamloader cannot write to BigQuery, it adds events to this topic. The Repeater service subscribes to this topic through a subscription with the same name, snowplow-bq-loader-server-failed-inserts, to retry inserting them into BigQuery.

  6. bad-rows-topic — called snowplow-bq-bad-rows
    After multiple unsuccessful retries by the Repeater service to write failed events to BigQuery, it writes them to this topic. We connect this topic directly to GCS via a subscription called snowplow-bq-bad-rows-gcs.

In addition, we need:

  1. Kubernetes ingress (Load Balancer)
  2. PostgreSQL for Iglu Server
  3. Three GCS buckets to hold bad rows, failed inserts, etc., as mentioned above. We have prefixed them with spangle to make the names globally unique in GCS. Feel free to use your preferred names, of course:

    • spangle-snowplow-bad-1
    • spangle-snowplow-bq-bad-rows
    • spangle-snowplow-bq-loader-dead-letter
  4. Service account to allow Snowplow services to interact with GCP resources
  5. Static IP address for the tracker endpoint
  6. BigQuery dataset to hold the events table

In Part 2 and Part 3, we will go over how to create each of the resources mentioned above.
