CC Open Source Blog

Date-Partitioned Data Reingestion


by Brent Moran on 2020-05-14

CC Catalog is a project that gathers information about images from around the internet, and stores the information so that these images can eventually be indexed in CC Search. A portion of the process is directed by Apache Airflow, which is a tool commonly used to organize workflows and data pipelines.

In this blog post, we will explore the way in which we keep information we gather about images up-to-date, using metadata pulled from the Flickr API as an example case study.

Apache Airflow, and the execution_date concept

Apache Airflow is open source software that loads Directed Acyclic Graphs (DAGs) defined in Python files. The DAG is what defines a given workflow. The nodes are the individual jobs that need to be accomplished, and the directed edges of the graph define dependencies between those jobs.

A DAG Run is an 'execution' of the overall workflow defined by the DAG, and is associated with an execution_date. Contrary to what one might expect, execution_date does not mean the date when the workflow is executed, but rather the date 'perspective' from which the workflow is executed. This means one can give a command that instructs Airflow to execute the workflow defined by a DAG as if the date were 2019-01-01, regardless of the actual date.

Our Use of execution_date

Much of the data contained in CC Catalog is pulled from various APIs on the internet, and one strategy we use quite regularly is to make a request of the form:

"please give me all the metadata for photos uploaded to Flickr on 2019-01-01".

Since we're often requesting metadata about user-sourced content on 3rd-party sites, some sort of date_uploaded parameter is often available for filtering results in the API provided by the 3rd-party site. This allows us to partition large data-sets into more manageable pieces. It also leads naturally to the strategy of requesting metadata for yesterday's photos, each day:

"please give me all the metadata for photos uploaded to Flickr yesterday".

Doing this each day lets us keep the metadata in our catalog synced with the upstream source (i.e., the Flickr API). This is where the execution_date concept comes in. By default, a workflow which is scheduled to run daily uses the previous day's date as its execution_date, and so an execution that happens on the actual date 2020-02-02 will have execution_date 2020-02-01 by default. This matches up naturally with the strategy above, so we have a number of workflows that ingest (meta)data into CC Catalog using this default execution_date on a daily basis.
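To make the date arithmetic concrete, here is a minimal sketch in plain Python (it does not use Airflow's own API) of how a daily run's default execution_date relates to the actual run date, and how that date can be turned into a one-day upload window. The function names are hypothetical, and the parameter names min_upload_date and max_upload_date are an assumption about how such a window would be passed to the upstream API.

```python
from datetime import date, datetime, timedelta, timezone


def default_execution_date(run_date: date) -> date:
    """Return the default execution_date of a daily workflow:
    the day before the actual run date."""
    return run_date - timedelta(days=1)


def upload_window_params(execution_date: date) -> dict:
    """Build hypothetical query parameters covering one full UTC day."""
    start = datetime(
        execution_date.year,
        execution_date.month,
        execution_date.day,
        tzinfo=timezone.utc,
    )
    end = start + timedelta(days=1)
    # Assumed parameter names; check the upstream API documentation.
    return {
        "min_upload_date": int(start.timestamp()),
        "max_upload_date": int(end.timestamp()),
    }


execution_date = default_execution_date(date(2020, 2, 2))
params = upload_window_params(execution_date)
print(execution_date)  # 2020-02-01
```

Keeping the window computation separate from the request itself means the same function can later be called with any past date, not only yesterday's.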

Challenge: Data can go stale over time

There are some problems with the strategy outlined above:

Given we're only ingesting metadata about photos the day after they're uploaded, we won't be able to capture the relevant data for any of these situations. So, we need to reingest the metadata for images on some schedule over time.

Reingestion Schedule

We would prefer to reingest the metadata for newer images more frequently, and the metadata for older images less frequently. This is because we assume that an image's metadata changes at the source in more interesting ways while the image is new. For example, assume a picture is viewed 100 times per month.

month   total views   % increase
1       100           infinite
2       200           100%
3       300           50%
4       400           33%
5       500           25%
6       600           20%
7       700           17%
8       800           14%
9       900           13%
10      1000          11%
11      1100          10%
12      1200          9%

As we see, given consistent monthly views, the 'percent increase' of the total views metric drops off as the picture ages. (In reality, it appears that in most cases, pictures are mostly viewed when they are new.)

Thus, it makes sense to focus more on keeping the information up-to-date for the most recently uploaded images.
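The view-count example can be reproduced with a few lines of Python. This is only an illustrative sketch; the function name is hypothetical, and percentages are reported to one decimal place rather than rounded to whole numbers.

```python
def percent_increase_by_month(views_per_month: int, months: int) -> list:
    """Return (month, total_views, percent_increase) rows.

    percent_increase is None for the first month, because the total
    grows from zero and the relative increase is undefined.
    """
    rows = []
    for month in range(1, months + 1):
        total = views_per_month * month
        previous = views_per_month * (month - 1)
        if previous == 0:
            increase = None
        else:
            increase = round(100 * (total - previous) / previous, 1)
        rows.append((month, total, increase))
    return rows


rows = percent_increase_by_month(100, 12)
for month, total, increase in rows:
    print(month, total, "n/a" if increase is None else f"{increase}%")
```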

Real numbers for Flickr

For Flickr, in the worst case, we can ingest about 100 dates' worth of uploaded image metadata per day. This was calculated using the year 2016 as an example. Because 2016 was around the peak for the number of images uploaded to Flickr per day, the actual number of dates' worth of metadata we can ingest per day is quite a bit higher, perhaps 150.

We'll need to choose around 150 dates for each daily run, and reingest the metadata for all images uploaded on each of those dates. We want to choose those dates preferring newer images (for the reasons outlined above), and choose them so that if we follow the same date-choosing algorithm each daily run, we'll eventually reingest the metadata for all images on some predictable schedule.

Strategy to choose which dates to reingest

Assume we'll reingest metadata from some number n of dates on each daily run. We set some maximum number of days D we're willing to wait between reingestions of the data for a given image, subject to the constraint that n * D > T, where T is the total number of dates for which data exists. For Flickr, there are (at the time of this writing) about 6,000 dates for which image metadata exists. If we set

n = 150 and D = 180,

then we have n * D = 150 * 180 = 27,000 > 6,000, as desired. In fact, there is quite a bit of slack in this calculation. Keep in mind, however, that we add one date's worth of metadata as each day passes in real time, so we want to keep some slack. One option would be to reingest the metadata for each image every 90 days, rather than every 180. This would still leave some slack (150 * 90 = 13,500 > 6,000), and we'd have generally fresher data. This means that on each day, we'd ingest metadata for photos uploaded on that date, as well as metadata for photos uploaded 90, 180, 270, and so on days earlier.
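The constraint and the fixed-interval option can be checked with a short sketch. The helper names are hypothetical; the numbers (150 dates per run, 180 or 90 days, about 6,000 dates of data) come from the discussion above.

```python
def has_slack(n: int, max_wait_days: int, total_dates: int) -> bool:
    """Check the constraint n * D > T."""
    return n * max_wait_days > total_dates


def fixed_interval_offsets(interval_days: int, count: int) -> list:
    """Day offsets (before the current date) for a fixed reingestion interval."""
    return [interval_days * k for k in range(1, count + 1)]


T = 6000  # approximate number of dates with Flickr data, per the text

print(has_slack(150, 180, T))         # True, since 27,000 > 6,000
print(has_slack(150, 90, T))          # True, since 13,500 > 6,000
print(fixed_interval_offsets(90, 4))  # [90, 180, 270, 360]
```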

This is better, but 90 days is still quite a long time to wait to reingest metadata for a recently-uploaded photo. So, it'd be better to use the slack available to reingest metadata for recently-uploaded photos more often, and back off smoothly to reingest metadata for the oldest photos only once every 180 days. We ended up using a schedule where we ingest metadata for photos uploaded on the current execution_date, as well as metadata for photos uploaded 1, 2, ..., 7 days prior; 14, 21, ..., 91 days prior; 106, 121, ..., 391 days prior; 421, 451, ..., 1111 days prior; 1201, 1291, ..., 3271 days prior; and 3451, 3631, ..., 10471 days prior.

These lists can be generated using the following snippet:

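def get_reingestion_day_list_list(*args):
    # Each arg is a pair: (interval in days, number of intervals).
    return [
        [
            # Offset in days: the step within this pair, plus the total
            # span covered by all earlier pairs.
            args[i][0] * (j + 1) + sum(arg[0] * arg[1] for arg in args[:i])
            for j in range(args[i][1])
        ]
        for i in range(len(args))
    ]

get_reingestion_day_list_list(
    (1, 7),
    (7, 12),
    (15, 20),
    (30, 24),
    (90, 24),
    (180, 40)
)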

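This function creates a list of lists of integers, each integer being a number of days before the current execution_date, based on input pairs of the form (interval in days, number of intervals). An approximate interpretation of the input pairs in this example would be: reingest daily for metadata up to a week old; weekly for metadata up to about 3 months old; every 15 days for metadata up to about 13 months old; every 30 days for metadata up to about 3 years old; every 90 days for metadata up to about 9 years old; and every 180 days for metadata up to about 28.7 years old.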

The astute reader will notice that these lists only define 128 dates (including the current date) for which metadata should be reingested. We prefer to be a bit conservative on the total amount we plan to ingest per day, since things can happen that put the ingestion workflow DAG out of service for some time on occasion.
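The count can be confirmed, and the day offsets turned into concrete calendar dates, with a short sketch. The function from the snippet above is repeated so the block runs on its own; the execution_date used here is an arbitrary value chosen only for illustration.

```python
from datetime import date, timedelta


def get_reingestion_day_list_list(*args):
    return [
        [
            args[i][0] * (j + 1) + sum(arg[0] * arg[1] for arg in args[:i])
            for j in range(args[i][1])
        ]
        for i in range(len(args))
    ]


day_lists = get_reingestion_day_list_list(
    (1, 7), (7, 12), (15, 20), (30, 24), (90, 24), (180, 40)
)

# Flatten the list of lists into a single list of day offsets.
offsets = [days for day_list in day_lists for days in day_list]

# 127 prior dates, plus the current execution_date itself.
total_dates = len(offsets) + 1

# Hypothetical execution_date, for illustration only.
execution_date = date(2020, 5, 14)
reingestion_dates = [execution_date - timedelta(days=d) for d in offsets]

print(total_dates)           # 128
print(day_lists[1][:3])      # [14, 21, 28]
print(reingestion_dates[0])  # 2020-05-13
```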

So, using this strategy, we ensure that all metadata is updated at least every 6 months, with a preference towards metadata about images uploaded recently. Because this schedule covers about 28.7 years back in time, this strategy should suffice to reingest all relevant Flickr data for the next 12 years or so (the current date is 2020).
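The coverage figures follow directly from the input pairs. As a rough check (a sketch that assumes 365.25 days per year and the approximate figure of 6,000 dates of existing data mentioned earlier):

```python
# (interval in days, number of intervals), as passed to the snippet above
PAIRS = [(1, 7), (7, 12), (15, 20), (30, 24), (90, 24), (180, 40)]
DAYS_WITH_DATA = 6000  # approximate, per the text (as of 2020)
DAYS_PER_YEAR = 365.25  # assumption used for the conversion

# Each pair covers interval * count days; the schedule covers their sum.
days_covered = sum(interval * count for interval, count in PAIRS)
years_covered = days_covered / DAYS_PER_YEAR
years_of_headroom = (days_covered - DAYS_WITH_DATA) / DAYS_PER_YEAR

print(days_covered)                 # 10471
print(round(years_covered, 1))      # 28.7
print(round(years_of_headroom, 1))  # 12.2
```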

For more context around what we've shown here, please take a look at the CC Catalog repo.