Working with Enriched Events data using Spark SQL

In this Lab we'll learn how to work with Enriched Event data using PySpark and Spark SQL.

This Lab contains a set of simple, useful queries for working with this dataset. These queries can help you answer questions like:

  • How many visitors were tested in my experiment?
  • How many "unique conversions" of an event were attributed to this variation?
  • What is the total revenue attributed to this variation?

This Lab covers some of the basics of working with event-level experiment data. There are many more useful questions you may want to answer with experiment data; future tutorials will go deeper on the topic.

This guide borrows some initialization code from the Spark SQL getting started guide.

Creating a Spark Session

from pyspark.sql import SparkSession

# The eagerEval settings render DataFrames returned at the end of a notebook
# cell as HTML tables (truncating long values to 100 characters), so we don't
# need to call .show() on each query result below.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("spark.sql.repl.eagerEval.truncate", 100) \
    .getOrCreate()

Loading Enriched Event data

The OPTIMIZELY_DATA_DIR environment variable may be used to specify the local directory where Enriched Event data is stored. If, for example, you've downloaded Enriched Event data and saved it in optimizely_data in your home directory, you can load that data in this notebook by executing the following command before launching Jupyter Lab:

$ export OPTIMIZELY_DATA_DIR=~/optimizely_data

If OPTIMIZELY_DATA_DIR is not set, data will be loaded from ./example_data in your working directory.

import os

base_data_dir = os.environ.get("OPTIMIZELY_DATA_DIR", "./example_data")

def read_data(path, view_name):
    """Read parquet data from the supplied path and create a corresponding temporary view with Spark."""
    spark.read.parquet(path).createOrReplaceTempView(view_name)

Enriched Event data is partitioned into two distinct datasets: decisions and conversions.

We'll load decision data from the type=decisions directory in the base data directory.

decisions_dir = os.path.join(base_data_dir, "type=decisions")
read_data(decisions_dir, "decisions")

We'll load conversion data from the type=events directory in the base data directory.

events_dir = os.path.join(base_data_dir, "type=events")
read_data(events_dir, "events")
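
If you'd like to confirm that both views were registered before querying them, here's a quick, optional sanity check:

# List the temporary views registered in this Spark session
for table in spark.catalog.listTables():
    print(table.name, table.isTemporary)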

Querying our data

Now that we've loaded our data, we can query it using the spark.sql() function. Here's an example on our decisions view:

spark.sql("""
    SELECT
        *
    FROM decisions
    LIMIT 1
""")
uuid:              F4F1EF48-6BC2-4153-A1DA-C29E39B772F9
timestamp:         2020-05-25 15:27:33.085
process_timestamp: 2020-05-25 15:29:21.197
visitor_id:        visitor_1590445653085
session_id:        -1235693267
account_id:        596780373
campaign_id:       18149940006
experiment_id:     18156943409
variation_id:      18174970251
attributes:        [[100,, browserId, ie], [300,, device, iphone], [600,, source_type, direct], [200,, campaign, fre...
user_ip:           75.111.77.0
user_agent:        Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81....
referer:           https://app.optimizely.com/
is_holdback:       false
revision:          null
client_engine:     ricky/fakedata.pwned
client_version:    1.0.0

Here's an example on our events view:

spark.sql("""
    SELECT
        *
    FROM events
    LIMIT 1
""")
uuid:              235ABEC8-C9A1-4484-94AF-FB107524BFF8
timestamp:         2020-05-24 17:34:27.448
process_timestamp: 2020-05-24 17:41:59.059
visitor_id:        visitor_1590366867448
session_id:        -1274245065
account_id:        596780373
experiments:       [[18128690585, 18142600572, 18130191769, false]]
entity_id:         15776040040
attributes:        [[100,, browserId, ff], [300,, device, ipad], [600,, source_type, campaign], [200,, campaign, fre...
user_ip:           174.222.139.0
user_agent:        Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81....
referer:           https://app.optimizely.com/
event_type:        null
event_name:        add_to_cart
revenue:           1000
value:             1000.0000
quantity:          1
tags:              null
revision:          null
client_engine:     ricky/fakedata.pwned
client_version:    1.0.0
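
Both views contain nested columns, including the experiments attribution column that the queries below rely on. An optional way to inspect their structure:

# Print the full schema of the events view, including nested columns
spark.table("events").printSchema()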

Useful queries

Next we'll cover some simple, useful queries for working with Optimizely's Enriched Event data.

These queries are parameterized with the following values:

# The analysis window for your queries.  Change these values if you wish to restrict the event data included in your
# queries
start = "2010-01-01 00:00:00"
end = "2050-12-31 23:59:59"

Counting the unique visitors in an Optimizely Web experiment

Optimizely Web and Optimizely Full Stack experiment results pages count unique visitors in slightly different ways.

Given a particular analysis time window (between start and end), Optimizely Web counts all visitors who were exposed to a variation at any time between when the experiment started and end, and who sent at least one event (decision or conversion) to Optimizely between start and end.

The following query captures that attribution logic:

# Count the unique visitors from all events (Optimizely Web)

spark.sql(f"""
    SELECT 
        experiment_id,
        variation_id,
        COUNT (distinct visitor_id) as `Unique visitors (Optimizely Web)`
    FROM (
        SELECT
            exp.experiment_id as experiment_id,
            exp.variation_id as variation_id,
            visitor_id
        FROM events
        LATERAL VIEW explode(experiments) t AS exp
        WHERE timestamp between '{start}' AND '{end}'
        UNION
        SELECT
            experiment_id,
            variation_id,
            visitor_id
        FROM decisions
        WHERE
            timestamp between '{start}' AND '{end}'
            AND is_holdback = false
        )
    GROUP BY
        experiment_id,
        variation_id
    ORDER BY
        experiment_id ASC,
        variation_id ASC
""")
| experiment_id | variation_id | Unique visitors (Optimizely Web) |
|---------------|--------------|----------------------------------|
| 18142600572   | 18130191769  | 972                              |
| 18142600572   | 18159011346  | 963                              |
| 18156943409   | 18112613000  | 4487                             |
| 18156943409   | 18174970251  | 4514                             |

A note on timestamp vs process_timestamp: If you're working on re-computing the numbers you see on your experiment results page, it's important to understand the difference between the timestamp and process_timestamp fields in your Enriched Events data.

  • timestamp contains the time set by the client, e.g. the Optimizely Full Stack SDK
  • process_timestamp contains the approximate time that the event payload was received by Optimizely

The difference is important because Enriched Event data is partitioned by process_timestamp, but Optimizely results are computed using timestamp. This allows clients to send events retroactively, but also means that depending on your implementation you may need to load a wider range of data in order to ensure that you've captured all of the events with a timestamp in your desired analysis range.
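
Here's a minimal sketch of that idea applied to data you've already loaded; the one-day pad is an arbitrary illustration, and the right pad depends on how late your clients may send events:

from datetime import datetime, timedelta

# Scan events received up to one day after the analysis window ends, but
# restrict the analysis itself to the client-side timestamp
padded_end = (datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
              + timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")

spark.sql(f"""
    SELECT COUNT(1) as `Events in analysis window`
    FROM events
    WHERE
        timestamp between '{start}' AND '{end}'
        AND process_timestamp between '{start}' AND '{padded_end}'
""")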

Counting the unique visitors in an Optimizely Full Stack experiment

The Full Stack attribution model is a little simpler:

Given a particular analysis time window (between start and end), Full Stack counts all visitors who were exposed to a variation at any time between start and end. We measure this by counting the unique visitor_ids in the decisions dataset for that experiment:

# Count the unique visitors from decisions (Optimizely Full Stack)

spark.sql(f"""
    SELECT
        experiment_id,
        variation_id,
        COUNT(distinct visitor_id) as `Unique visitors (Full Stack)`
    FROM decisions
    WHERE
        timestamp between '{start}' AND '{end}'
        AND is_holdback = false
    GROUP BY
        experiment_id,
        variation_id
""")
| experiment_id | variation_id | Unique visitors (Full Stack) |
|---------------|--------------|------------------------------|
| 18156943409   | 18174970251  | 4514                         |
| 18156943409   | 18112613000  | 4487                         |

Counting conversions in an Optimizely Web experiment

When it comes to counting conversions, Optimizely Full Stack and Optimizely Web do things a little differently.

Given a particular analysis time window (between start and end), Optimizely Web will attribute an event to a particular variation if the visitor who triggered that event was exposed to the variation at any time prior to that event, even if that exposure happened before the beginning of the analysis time window.

Optimizely event data is enriched with an attribution column, experiments, which lists all of the experiments and variations to which an event has been attributed. Since Optimizely Web does not require that a corresponding decision take place during the analysis window, we can use a simple query to count the number of attributed conversions during our analysis window.

# Count the unique conversions of a particular event attributed to an experiment

spark.sql(f"""
    SELECT 
        exp.experiment_id as experiment_id,
        exp.variation_id as variation_id,
        event_name,
        COUNT(1) as `Conversion count (Optimizely Web)`
    FROM events
    LATERAL VIEW explode(experiments) t AS exp
    WHERE 
        timestamp between '{start}' AND '{end}'
    GROUP BY
        experiment_id, variation_id, event_name
    ORDER BY
        experiment_id DESC,
        variation_id DESC,
        event_name ASC
""")
| experiment_id | variation_id | event_name  | Conversion count (Optimizely Web) |
|---------------|--------------|-------------|-----------------------------------|
| 18156943409   | 18174970251  | add_to_cart | 2655                              |
| 18156943409   | 18112613000  | add_to_cart | 2577                              |
| 18142600572   | 18159011346  | add_to_cart | 2919                              |
| 18142600572   | 18130191769  | add_to_cart | 2958                              |

Counting conversions in an Optimizely Full Stack experiment

Given a particular analysis time window (between start and end), Optimizely Full Stack will attribute an event to a particular variation if the visitor who triggered that event was exposed to the variation prior to that event and during the analysis window.

Since Optimizely Full Stack requires that a corresponding decision take place during the analysis window, the query required to attribute events to experiments and variations is more complex:

spark.sql(f"""
    SELECT 
        experiment_id,
        variation_id,
        event_name,
        COUNT (1) as `Conversion count (Optimizely Full Stack)`
    FROM (
         SELECT 
             d.experiment_id,
             d.variation_id,
             e.event_name,
             e.visitor_id
         FROM events e
         INNER JOIN 
         (
            SELECT 
                experiment_id,
                variation_id,
                visitor_id,
                MIN(timestamp) as decision_timestamp
            FROM decisions
            WHERE 
                timestamp between '{start}' AND '{end}'
                AND is_holdback = false
            GROUP BY
                experiment_id,
                variation_id,
                visitor_id
         ) d 
         ON e.visitor_id = d.visitor_id
         WHERE
             e.timestamp  between '{start}' AND '{end}'
             AND e.timestamp >= d.decision_timestamp
    )
    GROUP BY
         experiment_id,
         variation_id,
         event_name
    ORDER BY
         experiment_id DESC,
         variation_id ASC
""")
| experiment_id | variation_id | event_name  | Conversion count (Optimizely Full Stack) |
|---------------|--------------|-------------|------------------------------------------|
| 18156943409   | 18112613000  | add_to_cart | 2577                                     |
| 18156943409   | 18174970251  | add_to_cart | 2655                                     |
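
Computing the total revenue attributed to a variation

To answer the revenue question posed at the beginning of this Lab, we can sum the revenue column in the events dataset. Here's a minimal sketch using the Optimizely Web attribution pattern shown above; note that Optimizely records revenue in cents:

# Sum the revenue attributed to each variation (Optimizely Web attribution)

spark.sql(f"""
    SELECT
        exp.experiment_id as experiment_id,
        exp.variation_id as variation_id,
        SUM(revenue) as `Total revenue in cents (Optimizely Web)`
    FROM events
    LATERAL VIEW explode(experiments) t AS exp
    WHERE
        timestamp between '{start}' AND '{end}'
    GROUP BY
        experiment_id, variation_id
    ORDER BY
        experiment_id DESC,
        variation_id ASC
""")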

How to run this notebook

This notebook lives in the Optimizely Labs repository. You can download it along with everything you need to run it.

Once you've downloaded this Lab directory (on its own, or as part of the Optimizely Labs repository), follow the instructions in the README.md file for this Lab.
