Skip to main content

Click Through Rate Analysis using Spark

Introduction

In recent years, programmatic advertising is been taking over the online advertisement industry. To enable automatic selling and purchasing ad impressions between advertisers and publishers through real-time auctions, Real-Time Bidding (RTB) is quickly becoming the leading method.

In contrast to the traditional online ad market, where a certain amount of impressions is sold at a fixed rate, RTB allows advertisers to bid each impression individually in real time at a cost based on impression-level features. Real-time Bidding (RTB) is a way of transacting media that allows an individual ad impression to be put up for bid in real-time. This is done through a programmatic on-the-spot auction, which is similar to how financial markets operate. RTB allows for Addressable Advertising; the ability to serve ads to consumers directly based on their demographic, psychographic, or behavioral attributes.

Many DSPs (Demand Side Platforms) act as agents for the advertisers and take part in the real-time auction on behalf of them. In order to enable real-time bidding and provide the advertisers with the clicks at the lowest price possible, DSPs develop their own machine learning algorithms using techniques such as hashing trick, feature combinations, stochastic gradient descent etc.

Motivation

Like the standard practice in most of the data science use cases, whenever a new algorithm is developed, they are put into A/B test against the already existing algorithm in the production (at least for few days) in order to do determine which algorithm suits the business metrics better.

Due to the huge volume of bid requests (around a million bid requests per second), the amount of data collected during AB is in the order of 100 GBs. Python's Pandas library has basically all the functionality needed to do the offline analysis of the data collected in terms of CPCs, spend, clicks, CTR, AUC etc.

But, Pandas has a huge problem, it has to load all the dataset in memory in order to run some computations on it. From my experience, Pandas needs the RAM size to be 3 times the size of the dataset and it can not be run into a distributed environment as cluster a of machines. This is where Apache Spark is useful as it can process the datasets whose size is more than the size of the RAM. This blog will not cover the internals of Apache Spark and how it works rather I will jump to how the Pandas CTR Analysis code can be easily converted into spark analysis with few syntax changes.

Migrating to Spark from Pandas

In new versions, Spark started to support Dataframes which is conceptually equivalent to a dataframe in R/Python. Dataframe support in Spark has made it comparatively easy for users to switch to Spark from Pandas using a very similar syntax. In this section, I would jump to coding and show how the CTR analysis that is done in Pandas can be migrated to Spark.

Before I jump into the coding, I would like to introduce some of the keywords used in the code:

Effective CPC: Total money spent / Total number of clicks

Label: It is either 0 or 1 (1 signifies that the click happened and 0 is for no click)

Win Price: The price paid to win the on-spot auction

Bid CPC: The price the advertiser is willing to pay for the impression

CTR: Click Through Rate = Total Number of Clicks / Total Number of Impressions

How is Win Price different from Bid Price?

If an exchange is using First Price Auction, the win pice and the bid price is same but if the exchange is using Second Price Auction, the advertizer with the highest bid price wins but it pays the price equivalent to the second highest bid price hence the win price is less than the bid price.

Setting up notebook and importing libraries

Pandas
import pandas as pd
Spark
import os
import sys
os.environ['SPARK_HOME'] = "/home/spark-2.3.2-bin-hadoop2.7"
os.environ['JAVA_HOME'] = "/home/jdk1.8.0_181"
spark_home = os.environ.get("SPARK_HOME")
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip"))

from  pyspark import SparkContext
from pyspark.conf import SparkConf
CLUSTER_URL = "spark://address:7077"
conf = SparkConf()
conf.setMaster(CLUSTER_URL).setAppName("CTR Analysis").set("spark.executor.memory", "120g")
sc = SparkContext(conf=conf)
print(sc.version)

Reading CSV File

Pandas
df = pd.read_csv(data_file_path, names=cols, error_bad_lines=False, warn_bad_lines=True, sep=',')
Spark
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
file_location = "hdfs://address:port/hadoop/dataNode/pyspark/data.csv"
df = spark.read.csv(file_location, header=False, schema=schema, sep=",")
# df = spark.read.csv(file_location, header=False, inferSchema=True, sep=",")
# df.cache()

Cleaning data

Pandas
numeric_cols = ['label','win_price','ctr','bid_cpc']
df[numeric_cols] = df[numeric_cols].convert_objects(convert_numeric=True)
df.dropna(inplace=True, subset=numeric_cols)
Spark
numeric_cols = ['label','win_price','ctr','bid_cpc']
df = df.dropna(subset=numeric_cols)

Calculating Spend, CTR, CPC Per Algo

Pandas
data = df.groupby(['algo']).agg({'win_price': np.sum, c_label:{'clicks':np.sum, 'wins':'count'}}).reset_index()
data[('win_price','sum')] = data[('win_price','sum')] / 1000.
data['ecpc'] = data[('win_price','sum')] / data[('label,'clicks')]
data = pd.DataFrame(data.to_records())
data.columns = ['',algo, 'spend', 'number of impressions', 'number of clicks', 'effective cpc']
Spark
from pyspark.sql.functions import udf
import pyspark.sql.functions as f

def divide_by_1000(x):
    return x/1000.0

udfdivide_by_1000 = udf(divide_by_1000, DoubleType())

data_wins = df.groupby(['algo']).agg( {'win_price': 'sum', 'label: 'count'})
data_clicks = df.groupby(['algo']).agg({'label: 'sum'})
# print data_wins.columns
# print data_clicks.columns
# Rename the columns
data_wins = data_wins.withColumnRenamed("sum(win_price)", "win_price").withColumnRenamed("count(label)", "wins")
#     print data_wins.schema
#     data_wins['win_price'] = data_wins.win_price/1000.
data_wins = (data_wins.withColumn("win_price",udfdivide_by_1000("win_price")))
data_clicks = data_clicks.withColumnRenamed("sum(label)", "clicks")
#     print data_wins.columns
# print data_clicks.columns
data = data_wins.join(data_clicks, on = 'algo', how='inner')
data = (data.withColumn("effective cpc", f.col("win_price")/100))