CC Open Source Blog
Date-Partitioned Data Reingestion
CC Catalog is a project that gathers information about images from around the internet, and stores the information so that these images can eventually be indexed in CC Search. A portion of the process is directed by Apache Airflow, which is a tool commonly used to organize workflows and data pipelines.
In this blog post, we will explore the way in which we keep information we gather about images up-to-date, using metadata pulled from the Flickr API as an example case study.
Apache Airflow, and the execution_date concept
Apache Airflow is open source software that loads Directed Acyclic Graphs (DAGs) defined via python files. The DAG is what defines a given workflow. The nodes are pieces of jobs that need to be accomplished, and the directed edges of the graph define dependencies between the various pieces.
A DAG Run is an 'execution' of the overall workflow defined by the DAG, and is associated with an execution_date. Contrary to what one might expect, execution_date does not mean the date when the workflow is executed, but rather the date 'perspective' from which the workflow is executed. This means one can give a command that instructs Airflow to execute the workflow defined by a DAG as if the date were 2019-01-01, regardless of the actual date.
Our Use of execution_date
Much of the data contained in CC Catalog is pulled from various APIs on the internet, and one strategy we use quite regularly is to make a request of the form:
"please give me all the metadata for photos uploaded to Flickr on 2019-01-01".
Since we're often requesting metadata about user-sourced content on 3rd-party sites, some sort of date_uploaded parameter is often available for filtering results in the API provided by the 3rd-party site. This allows us to partition large datasets into more manageable pieces. It also leads naturally to the strategy of requesting metadata for yesterday's photos, each day:
"please give me all the metadata for photos uploaded to Flickr yesterday".
Doing this each day lets us keep the metadata in our catalog synced with the upstream source (i.e., the Flickr API). This is where the execution_date concept comes in. By default, a workflow which is scheduled to run daily uses the previous day's date as its execution_date, and so an execution that happens on the actual date 2020-02-02 will have execution_date 2020-02-01 by default. This matches up naturally with the strategy above, so we have a number of workflows that ingest (meta)data into CC Catalog using this default execution_date on a daily basis.
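As a rough illustration of how the default execution_date lines up with a "photos uploaded yesterday" request, here is a minimal sketch in plain Python. The helper names and the min_upload_date / max_upload_date parameter names are assumptions made for this example (they stand in for whatever date_uploaded-style filter a given API offers), and the sketch does not reproduce the actual CC Catalog workflow code.

```python
from datetime import date, datetime, time, timedelta, timezone


def default_execution_date(actual_run_date: date) -> date:
    # As described above, a daily workflow run on a given day uses the
    # previous day's date as its execution_date.
    return actual_run_date - timedelta(days=1)


def upload_window_params(execution_date: date) -> dict:
    # Hypothetical helper: build query parameters covering exactly the
    # 24 hours (UTC) of execution_date. The parameter names are assumed.
    start = datetime.combine(execution_date, time.min, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    return {
        "min_upload_date": int(start.timestamp()),
        "max_upload_date": int(end.timestamp()),
    }


execution_date = default_execution_date(date(2020, 2, 2))
params = upload_window_params(execution_date)
print(execution_date, params)
```

Because the window is derived only from execution_date, the same function can be reused unchanged when a workflow is run "as if" it were some other date.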
Challenge: Data can go stale over time
There are some problems with the strategy outlined above:
- What if a photo changes upstream?
- What if a photo is deleted upstream?
- What about metadata that changes over time (e.g., 'views')?
Given we're only ingesting metadata about photos the day after they're uploaded, we won't be able to capture the relevant data for any of these situations. So, we need to reingest the metadata for images on some schedule over time.
We would prefer to reingest the metadata for newer images more frequently, and the metadata for older images less frequently. This is because we assume that changes to the metadata at the source are more significant while an image is new. For example, assume a picture is viewed 100 times per month.
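| month | total views | % increase |
|-------|-------------|------------|
| 1     | 100         | n/a        |
| 2     | 200         | 100%       |
| 3     | 300         | 50%        |
| 4     | 400         | 33%        |
| 5     | 500         | 25%        |
| 6     | 600         | 20%        |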
As we see, given consistent monthly views, the 'percent increase' of the total views metric drops off as the picture ages. (In reality, it appears that most pictures are viewed mostly when they are new.)
Thus, it makes sense to focus more on keeping the information up-to-date for the most recently uploaded images.
Real numbers for Flickr
For Flickr, in the worst case, we can ingest about 100 dates' worth of uploaded image metadata per day. This was calculated using the year 2016 as an example. Because 2016 was around the peak for the number of images uploaded to Flickr per day, the actual number of dates' worth of metadata we can ingest per day is quite a bit higher, perhaps 150.
We'll need to choose around 150 dates for each daily run, and reingest the metadata for all images uploaded on each of those dates. We want to choose those dates preferring newer images (for the reasons outlined above), and choose them so that if we follow the same date-choosing algorithm each daily run, we'll eventually reingest the metadata for all images on some predictable schedule.
Strategy to choose which dates to reingest
Assume we'll reingest metadata from some number n of dates on each daily run. We set some maximum number of days D we're willing to wait between reingestion of the data for a given image, subject to the constraint that we need to have n * D > T, where T is the total number of dates for which data exists. For Flickr, there are (at the time of this writing) about 6,000 days for which image metadata was uploaded. If we set n = 150 and D = 180, then we have n * D = 150 * 180 = 27,000 > 6,000, as desired. In fact, there is quite a bit of slack in this calculation. Keep in mind, however, that we add one date's worth of metadata as each day passes in real time. Thus, we want to keep some slack here. One option would be to reingest the metadata for each image every 90 days, rather than every 180. This would still leave some slack, and we'd have generally fresher data. This means that on each day, we'd ingest metadata for photos uploaded on that date, as well as metadata for photos uploaded
- 90, 180, 270, 360, ..., 13320, or 13410 days prior to the current date.
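The uniform 90-day option can be sketched in a few lines of Python. This is only an illustration of the arithmetic: the function name is hypothetical, and the values of n, D, and T are the approximate figures quoted above.

```python
from typing import List


def uniform_reingestion_offsets(interval_days: int, dates_per_run: int) -> List[int]:
    # One of the dates_per_run slots is the current date itself (offset 0),
    # so the remaining slots are spaced interval_days apart going backwards.
    return [interval_days * k for k in range(1, dates_per_run)]


n = 150   # dates we can reingest per daily run (approximate)
D = 90    # days between reingestions of any given date
T = 6000  # dates for which Flickr data exists (approximate)

# The constraint from the text: the schedule must be able to cover every date.
assert n * D > T

offsets = uniform_reingestion_offsets(D, n)
print(offsets[:4], offsets[-2:], len(offsets))
```

With this schedule, a photo uploaded on some date is picked up again on every run whose date is a multiple of 90 days after that upload date.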
This is better, but 90 days is still quite a long time to wait to reingest metadata for a recently-uploaded photo. So, it'd be better to use the slack available to reingest metadata for recently-uploaded photos more often, and back off smoothly to reingest metadata for the oldest photos only once every 180 days. We ended up using a schedule where we ingest metadata for photos uploaded on the current execution_date, as well as metadata for photos uploaded
- 1, 2, ..., 6, or 7 days prior;
- 14, 21, ..., 84, or 91 days prior;
- 106, 121, ..., 376, or 391 days prior;
- 421, 451, ..., 1081, or 1111 days prior;
- 1201, 1291, ..., 3181, or 3271 days prior; and
- 3451, 3631, ..., 10291, or 10471 days prior.
These lists can be generated using the following snippet:
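    def get_reingestion_day_list_list(*args):
        # Each argument is a pair (interval in days, number of dates).
        # Each inner list starts where the previous pair's list ended.
        return [
            [
                args[i][0] * (j + 1) + sum(arg[0] * arg[1] for arg in args[:i])
                for j in range(args[i][1])
            ]
            for i in range(len(args))
        ]

    get_reingestion_day_list_list(
        (1, 7),
        (7, 12),
        (15, 20),
        (30, 24),
        (90, 24),
        (180, 40)
    )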
This function creates a list of lists of integers based on input pairs describing which prior dates to ingest. Each pair gives an interval in days and the number of dates to reingest at that interval. An approximate interpretation of the input pairs in this example would be:
- Ingest data which is at most a week old daily.
- Ingest data which is between a week and three months old weekly.
- Ingest data which is between three months and a year old biweekly.
- Ingest data which is between one and three years old monthly.
- Ingest data which is between three and nine years old every three months.
- Ingest data which is between nine and twenty-eight years old every six months.
The astute reader will notice that these lists only define 128 dates (including the current date) for which metadata should be reingested. We prefer to be a bit conservative on the total amount we plan to ingest per day, since things can happen that put the ingestion workflow DAG out of service for some time on occasion.
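To make the count concrete, the following sketch flattens the schedule into the calendar dates that a single daily run would cover. The reingestion_dates helper and the SCHEDULE constant are names invented for this example; only get_reingestion_day_list_list and the pairs come from the post.

```python
from datetime import date, timedelta


def get_reingestion_day_list_list(*args):
    # Each argument is a pair (interval in days, number of dates).
    return [
        [
            args[i][0] * (j + 1) + sum(arg[0] * arg[1] for arg in args[:i])
            for j in range(args[i][1])
        ]
        for i in range(len(args))
    ]


SCHEDULE = ((1, 7), (7, 12), (15, 20), (30, 24), (90, 24), (180, 40))


def reingestion_dates(execution_date, pairs=SCHEDULE):
    # Hypothetical helper: offset 0 is the execution_date itself,
    # followed by every 'days prior' offset in the schedule.
    offsets = [0] + [
        day for day_list in get_reingestion_day_list_list(*pairs) for day in day_list
    ]
    return [execution_date - timedelta(days=offset) for offset in offsets]


dates = reingestion_dates(date(2020, 2, 1))
print(len(dates))  # 128: the execution_date plus 127 prior dates
```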
So, using this strategy, we ensure that all metadata is updated at least every 6 months, with a preference towards metadata about images uploaded recently. Because this schedule covers about 28.7 years back in time, this strategy should suffice to reingest all relevant Flickr data for the next 12 years or so (the current date is 2020).
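The coverage figures can be checked with a little arithmetic. The sketch below assumes the same (interval, count) pairs as the snippet above, a 365.25-day year, and the approximate figure of 6,000 existing dates quoted earlier; the variable names are illustrative only.

```python
# (interval in days, number of dates) pairs from the schedule above
SCHEDULE = ((1, 7), (7, 12), (15, 20), (30, 24), (90, 24), (180, 40))
DAYS_PER_YEAR = 365.25  # assumption: average year length
EXISTING_DAYS = 6000    # approximate number of dates with Flickr data

# The oldest date reached is the sum of interval * count over all pairs.
max_offset = sum(interval * count for interval, count in SCHEDULE)
coverage_years = max_offset / DAYS_PER_YEAR

# The longest wait between reingestions is the largest interval used.
longest_gap = max(interval for interval, _ in SCHEDULE)

# Days left before the oldest data falls outside the schedule.
remaining_years = (max_offset - EXISTING_DAYS) / DAYS_PER_YEAR

print(max_offset, round(coverage_years, 1), longest_gap, round(remaining_years, 1))
```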
For more context around what we've shown here, please take a look at the CC Catalog repo.