How Tyra helped us train our user models in minutes rather than days
All of TellApart’s products are powered by a machine-learning pipeline that calculates Customer Quality Scores (CQS). This is our secret sauce. Without CQS we don’t know the value of a user and don’t know how much to bid on ads for that user. It’s what makes TellApart’s engine churn. But recently, we noticed that with our growing volume of data we were going to need to add more horsepower…
Modeling pipeline to calculate CQS
CQS are calculated in real-time as users navigate the web and allow our algorithms to make decisions ranging from bidding on display advertisements to selecting what content we ultimately serve. The two main challenges in calculating these scores are:
- Processing huge amounts of offline historical data with which to train statistical models that predict future user behavior; these predictions are what make up the CQS.
- Making predictions fast enough to meet strict latency constraints in the real-time bidding (RTB) ad exchanges in which we participate.
In this post, we discuss recent work to address the first challenge. We’ve had a map-reduce-based modeling pipeline in place since the earliest days of TellApart, but our rapidly growing datasets exposed its scaling limits, and a rework was urgently needed.
The steps in our pipeline are (1) a feature extraction step that produces a consolidated representation of each event, and (2) a training and validation step that produces high-quality statistical models.
Our new system, called Tyra, is a re-design of step (1) above. Traditionally we call our modeling codebase “runway”, and our new system is named Tyra because Tyra trains the runway models! (ba-dum ch)
This blog post aims to share some of the lessons we learned while designing Tyra and to describe the system we ultimately built. Our desiderata were:
- Scalability: we are experiencing hockey-stick growth in data volume and want to continue training models with large feature sets without restricting the historical signals we can include.
- Rapid feature exploration: we experiment with new features, and these evaluations should be fast and require minimal reprocessing of old data.
- Speed: we are nimble and move quickly, so we should be able to re-train models on the latest data weekly or even daily.
Background: Feature Extraction
Each data point used in model training corresponds to a recorded event for one of our users. For example, two types of events are ad events (e.g. the user viewed one of our ads) and product view events (e.g. the user viewed a pair of shoes on one of our clients’ sites).
Our characterization of an event can be broken into two parts: features and a class label. Features are pieces of data about the event itself or about the user’s history, for example, the time of day the event occurred or the number of ads we have shown the user in the past 30 days. Class labels represent facets of customer quality, for example, the propensity to click on ads (e.g. a class label of click/non-click) or to purchase products (e.g. a class label of purchase/non-purchase).
We can picture the pairing of features with a class label as the basic unit flowing through the model-training pipeline.
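As a rough illustration only (the class and field names here are hypothetical, not our production schema), a single training example pairs a bag of features with a class label:

```java
import java.util.Map;

/** Illustrative only: one training example pairing a feature map with a class label. */
public class TrainingExample {
    private final Map<String, Double> features; // e.g. "hour_of_day" -> 14.0, "ads_last_30d" -> 7.0
    private final boolean label;                // e.g. true = click, false = non-click

    public TrainingExample(Map<String, Double> features, boolean label) {
        this.features = features;
        this.label = label;
    }

    public static void main(String[] args) {
        TrainingExample example = new TrainingExample(
                Map.of("hour_of_day", 14.0, "ads_last_30d", 7.0),
                true); // the user clicked on the ad
        System.out.println(example.features + " -> " + example.label);
    }
}
```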
The events themselves are stored as flat files on Amazon S3, and are organized by event type and date. The feature extraction pipeline is a MapReduce job that processes these logs. By the end of the feature extraction step, we have produced a list of features for each event.
Sound simple? To give a sense of the scale we are working at, one model we trained a few days ago used roughly 10 billion events!
Immediate and Historical Features
Many features can simply be extracted from the event record itself, such as the time of day or the website on which we displayed an ad. We call these immediate features; they are straightforward to extract because they depend only on the event itself.
The more interesting features, however, are historical features that depend on user behavior over time. For example, one historical feature is the number of ads shown to this user in the past 30 days. This is much harder to track for millions of users across billions of events. The biggest challenge to scalability in the modeling pipeline is extracting these historical features en masse.
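To make the distinction concrete, here is a toy sketch (the event fields and helper names are invented for illustration): the immediate feature falls straight out of the event record, while the historical feature, computed naively, needs a scan of the user’s past events.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;

/** Toy event record and two toy feature extractors (hypothetical fields, not our schema). */
public class FeatureSketch {
    record AdEvent(String userId, Instant timestamp, String site) {}

    /** Immediate feature: depends only on the event itself. */
    static int hourOfDay(AdEvent event) {
        return event.timestamp().atZone(ZoneOffset.UTC).getHour();
    }

    /** Historical feature, computed naively: requires scanning the user's past events. */
    static long adsShownLast30Days(AdEvent event, List<AdEvent> userHistory) {
        Instant cutoff = event.timestamp().minus(30, ChronoUnit.DAYS);
        return userHistory.stream()
                .filter(e -> e.timestamp().isAfter(cutoff)
                          && e.timestamp().isBefore(event.timestamp()))
                .count();
    }
}
```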
Extracting Historical Features
There are two general approaches to extracting historical features from logs:
- Retain the historical context within each event’s log entry:
We save a snippet of that user’s history in each log entry, so this approach requires no examination of historical data at feature-extraction time. The feature extraction pipeline simply sweeps over log entries and outputs the feature values. The disadvantage, however, is that it is limited to features that can be derived from the historical data we chose to put in the logs at the time the event occurred. Adding a new historical feature requires augmenting the information recorded with each log event and then waiting for enough new training data to accumulate.
- Sweep over past events in a user’s history to extract the necessary historical information:
The advantage of this approach is flexibility: any desired historical feature can be added at any time, without requiring foresight when logging event data. The disadvantage is that a huge amount of historical data must be processed.
Our old feature extraction pipeline used approach (2). It served us well until recently, but it has outlived its practicality when extracting features for two use cases:
- Production models: extraction time scaled with the time window of the features. For example, extracting 30 days of training data with 30-day trailing-window historical features required processing 60 days of log entries.
- Models exploring new features: because historical context was not cached in any way, adding a new feature required re-running the entire flow, which could take more than 12 hours.
Our new system, Tyra, is designed to retain the best parts of these two approaches.
Tyra Feature Extraction Pipeline
The central idea behind Tyra is to get the advantage of approach (1) above, cached historical context, while retaining the flexibility of approach (2), the ability to easily add new features.
We accomplish this by computing and storing historical summaries. For every day, for every user who interacted on that day, and for every historical feature type, we store a summary containing all historical information necessary for extracting this feature on future days. For example, the feature number of pages viewed in the last month has an associated summary that counts total pages viewed by each user on each day.
This system has some nice properties:
- Summaries take the place of re-processing past events and are updated on a daily basis.
- We can retroactively add new historical feature types by adding new summary types to the data store.
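As a minimal sketch of the idea (class and method names are ours-for-illustration, not Tyra’s actual code), a count-style summary for the pages-viewed feature could look like this, with the windowed feature answered by combining daily summaries instead of re-reading raw events:

```java
import java.time.LocalDate;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative daily summaries for a "pages viewed" historical feature. */
public class PageViewSummaries {
    // One summary per day for a single user: the count of pages viewed that day.
    // Keyed by day so a trailing window can be assembled without touching raw event logs.
    private final Map<LocalDate, Long> pagesViewedByDay = new TreeMap<>();

    /** Summarize step: fold one day's events for this user into a single number. */
    public void recordDay(LocalDate day, long pagesViewed) {
        pagesViewedByDay.merge(day, pagesViewed, Long::sum);
    }

    /** Feature extraction: "pages viewed in the last month" = sum over the trailing daily summaries. */
    public long pagesViewedInWindow(LocalDate asOf, int windowDays) {
        LocalDate cutoff = asOf.minusDays(windowDays);
        return pagesViewedByDay.entrySet().stream()
                .filter(e -> e.getKey().isAfter(cutoff) && !e.getKey().isAfter(asOf))
                .mapToLong(Map.Entry::getValue)
                .sum();
    }
}
```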
Tyra consists of three steps (a simplified sketch follows the list):
- Summarize: each day, sweep over all of that day’s events to produce a summary object for each user and each feature type.
- Combine summaries: sweep over summaries from prior days and combine them to produce a user state for each feature type.
- Extract: begin with each user’s state from the previous day and output features for each of that day’s events, updating an intra-day state as we go.
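Here is a highly simplified, single-machine sketch of those three steps for one count-style feature (all class and method names are hypothetical; in reality each step runs as a MapReduce job over all users and feature types):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of Tyra's three steps for one count-style feature
 *  (e.g. "events seen for this user so far"). */
public class TyraSketch {
    record Event(String userId) {} // one of today's events, stripped to the bare minimum

    /** 1. Summarize: sweep over one day's events and produce a per-user summary (here, a count). */
    static Map<String, Long> summarize(List<Event> eventsForDay) {
        Map<String, Long> summary = new HashMap<>();
        for (Event e : eventsForDay) {
            summary.merge(e.userId(), 1L, Long::sum);
        }
        return summary;
    }

    /** 2. Combine summaries: fold summaries from prior days into a user state. */
    static Map<String, Long> combine(List<Map<String, Long>> priorDaySummaries) {
        Map<String, Long> state = new HashMap<>();
        for (Map<String, Long> daySummary : priorDaySummaries) {
            daySummary.forEach((user, count) -> state.merge(user, count, Long::sum));
        }
        return state;
    }

    /** 3. Extract: start from yesterday's state and emit a feature value for each of today's
     *  events, updating an intra-day state as we go. */
    static List<Long> extract(Map<String, Long> stateFromPriorDays, List<Event> eventsForDay) {
        Map<String, Long> intraDayState = new HashMap<>(stateFromPriorDays);
        List<Long> featureValues = new ArrayList<>();
        for (Event e : eventsForDay) {
            long seenSoFar = intraDayState.getOrDefault(e.userId(), 0L);
            featureValues.add(seenSoFar);                   // feature value for this event
            intraDayState.merge(e.userId(), 1L, Long::sum); // update state for later events
        }
        return featureValues;
    }
}
```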
Tyra is implemented in Java, using Cascading as an abstraction layer over Hadoop, and runs in production on our AWS Elastic MapReduce cluster. It currently churns out roughly 100 million summaries and over 50 billion individual feature values to our data store every day. The features are accessible via Hive and via our custom Cascading-based tool, which supports sub-sampling to facilitate data exploration.
As we grow as a company, we have an increasing need to train new models and run experiments with new features. Our old system was holding us back and would soon have become untenable. This work has transitioned us from a monolithic feature extraction pipeline that churned out months of features in one shot to an incremental and persistent data pipeline that avoids repeated computation. With Tyra we can now easily and quickly retrieve subsets of features, train models, and add new historical features. To give a sense of the savings, we can now train models in minutes as opposed to the 12 to 24 hours it previously required.
The only downside is that with our experiments running hundreds of times faster, we have less time to spend waiting around and taking in the finer things in life, like watching re-runs of America’s Next Top Model. On the other hand, with all the saved AWS fees, maybe we can invite Tyra Banks to our next happy hour.
Technical Notes
- State vs. summary: it would be possible to skip the summarization step and instead store and update user states directly, which would avoid the need to combine summaries over multiple days. However, storing summaries has the advantage of keeping days independent of one another, whereas tracking state allows data-integrity issues to cascade forward in time. That said, we may consider a pure user-state-based system in the future if we start incorporating longer-term historical features.
- Right now, our summaries are stored in flat binary files with no indexing. While this is acceptable for model training, we are considering moving to HBase in the near future to make this user-centric data more pliable.
- When implementing Tyra, we found it best to use as few reduce steps as possible. This usually required adding dummy values to reduce keys and sorting so that some rows reach the reducers before others. In Cascading, this comes down to using a GroupBy over dissimilar pipes rather than a CoGroup (see the sketch below).
- We found it extremely helpful to keep the number of files and distinct directories as low as possible, since this decreases HDFS copy time significantly. One simple way to achieve this is to use a smaller number of reducers for jobs that write persistent data.
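For the GroupBy-instead-of-CoGroup trick mentioned above, a rough Cascading-style sketch looks like the following (field names and pipe contents are invented for illustration; it assumes both pipes have already been projected to an identical field layout upstream, which GroupBy’s merge behavior requires):

```java
import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

/** Sketch only: merge a "prior state" pipe and an "events" pipe with a single GroupBy rather
 *  than a CoGroup, using a dummy "ordering" field so state rows are seen first per user. */
public class GroupByInsteadOfCoGroup {
    public static Pipe assemble(Pipe priorState, Pipe todaysEvents) {
        // Tag each side with an ordering value; 0 sorts before 1, so within each user's group
        // the reducer sees the prior-state row before any of that user's event rows.
        Pipe state = new Each(priorState, new Insert(new Fields("ordering"), 0), Fields.ALL);
        Pipe events = new Each(todaysEvents, new Insert(new Fields("ordering"), 1), Fields.ALL);

        // One GroupBy over both pipes: group by user, secondary-sort by the dummy field.
        return new GroupBy(new Pipe[] { state, events },
                new Fields("user_id"), new Fields("ordering"));
    }
}
```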
Tyra was built by TellApart Engineers Jeshua Bratman and Nick Pisarro.