Using Higher-Order Containers to Efficiently Process 7,163 (or More) DataFrames

DataFrame processing routines commonly work with collections of tables. Examples of such collections include a multi-year dataset with a single table per year, historical stock data with a table per ticker, or data from multiple sheets in an XLSX file. This article introduces novel “higher-order” containers for working with such collections of DataFrames, implemented in the Python StaticFrame package (a Pandas alternative that offers immutable DataFrames). The three core containers are the Bus, Batch, and Quilt. A fourth container, the Yarn, will be briefly introduced.

An alternative to these higher-order containers is using one large table with a hierarchical index. For example, time-series data for multiple stocks might be encoded in a single table with a two-depth hierarchical index, the outer level being the ticker, the inner level being the date. Such an approach is often inefficient, as the entire table must be loaded into memory even if only a small number of stocks are being processed.

This article introduces containers for working with large collections of tables, from thousands to hundreds of thousands of tables. Efficient memory usage is provided through lazy loading and, optionally, eager unloading. The Bus (named after buses used in circuits) provides a dictionary-like interface for lazily loading collections of tables stored on disk; collections can be stored in SQLite, HDF5, XLSX, or in zipped archives of Parquet, NPZ, pickle, or delimited text files. The Batch (named after batch processing) is a deferred processor of tables, providing a concise interface to lazily define operations to be applied to all tables. The Quilt (named after textiles formed from a patchwork) is a lazy, virtual concatenation of all tables, permitting operations on partitioned data as if it were a single, unified table.

All three containers provide identical interfaces for reading from, and writing to, the multi-table storage formats mentioned above (SQLite, HDF5, XLSX, or zipped archives of Parquet, NPZ, pickle, or delimited text files). This uniformity permits reusing the same data store in different contexts.

These tools evolved from the context of my work: processing financial data and modelling investment systems. There, datasets are naturally partitioned by date or characteristic. For historical simulations, the data needed can be large. The Bus, Batch, Quilt, and Yarn have provided convenient and efficient tools in this domain. Out-of-core solutions like Vaex and Dask offer related approaches to dealing with large collections of data, though with different tradeoffs.

While these containers are implemented in StaticFrame, the abstractions are useful for application in any DataFrame or table-processing library. StaticFrame calls a DataFrame simply a “Frame,” and that convention will be used herein. StaticFrame is imported with the following convention:

>>> import static_frame as sf

Container Overview

Before demonstrating the utility of using these containers to process thousands of DataFrames, we will start with processing just two DataFrames. After creating a Bus with two Frames, we will use that same Bus to initialize a Batch, Quilt, and Yarn. With this introduction, common and distinguishing characteristics can be observed.

Bus

Two simple Frames can be used to demonstrate initializing a Bus. The Bus.from_items() method takes pairs of label and Frame; items can be provided in a tuple (as shown below) or via an items() method on a Python dictionary or related container.

>>> f1 = sf.Frame.from_element(0.5, index=('w', 'x'), columns=('a', 'b'))
>>> f2 = sf.Frame.from_element(2, index=('y', 'z'), columns=('a', 'b'))
>>> bus = sf.Bus.from_items((('f1', f1), ('f2', f2)))
>>> bus
<Bus>
<Index>
f1      Frame
f2      Frame
<<U2>   <object>

The Bus can be thought of as a Series (or an ordered dictionary) of Frames, permitting access to a Frame given its label.

>>> bus.loc['f2']
<Frame: f2>
<Index>     a       b       <<U1>
<Index>
y           2       2
z           2       2
<<U1>       <int64> <int64>

A key feature of the Bus is that, when reading from disk, Frames are loaded lazily: a Frame is loaded into memory only when accessed. Further, with the max_persist argument, the Bus can be configured to hold strong references to only a limited number of Frames, eagerly unloading the least-recently used Frame once that limit is exceeded. This permits examining all Frames while limiting the total memory held by the Bus.
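The lazy loading and least-recently-used unloading described above can be illustrated with a short pure-Python sketch. The LazyStore class here is hypothetical and is not how StaticFrame implements the Bus; it assumes a loader callable that reads one table by label, and it uses an OrderedDict to track which loaded table was used least recently.

```python
from collections import OrderedDict
from typing import Callable, Hashable, Iterable, List


class LazyStore:
    """Hypothetical sketch of Bus-style lazy loading (not StaticFrame code).

    Tables are read by `loader` only when first accessed; if `max_persist`
    is positive, at most that many tables are kept, and the least-recently
    used table is dropped once the limit is exceeded.
    """

    def __init__(self, labels: Iterable[Hashable],
                 loader: Callable[[Hashable], object],
                 max_persist: int = 0) -> None:
        self._labels = list(labels)
        self._label_set = set(self._labels)
        self._loader = loader
        self._max_persist = max_persist  # 0 means "no limit"
        self._loaded = OrderedDict()  # label -> table, least recent first

    def __len__(self) -> int:
        return len(self._labels)

    def __getitem__(self, label: Hashable) -> object:
        if label in self._loaded:
            self._loaded.move_to_end(label)  # now the most recently used
            return self._loaded[label]
        if label not in self._label_set:
            raise KeyError(label)
        table = self._loader(label)  # the only place data is read
        self._loaded[label] = table
        if self._max_persist and len(self._loaded) > self._max_persist:
            self._loaded.popitem(last=False)  # drop the least recently used
        return table

    @property
    def loaded_count(self) -> int:
        return len(self._loaded)


# Usage with a toy loader that records each read.
reads: List[Hashable] = []

def load(label: Hashable) -> dict:
    reads.append(label)
    return {'label': label}

store = LazyStore(['a', 'b', 'c'], load, max_persist=2)
count_before = store.loaded_count  # nothing is loaded at construction
for label in ('a', 'b', 'a', 'c'):
    store[label]
```

Here the second access to 'a' does not trigger a read, and loading 'c' drops 'b', the least recently used table; accessing 'b' again would call the loader a second time. This is the tradeoff an eviction limit makes: bounded memory in exchange for possible repeated reads.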

As the Bus supports reading from, and writing to, XLSX and HDF5 (as well as many other formats), it provides the functionality of the Pandas ExcelWriter and HDFStore interfaces, but with a more general and consistent interface. The same Bus can be used to write an XLSX workbook (where each Frame is a sheet) or an HDF5 datastore, simply by using Bus.to_xlsx() or Bus.to_hdf5(), respectively.

Additionally, a Bus serves as a convenient resource for creating a Batch, Quilt, or Yarn.

Batch

The Batch can be thought of as an iterator of pairs of label and Frame. Beyond just an iterator, the Batch is a tool for composing deferred operations on each contained Frame. The Batch exposes nearly the entire Frame interface; method calls and operator applications are not executed when invoked, but are instead recorded in a newly returned Batch that composes them with the stored iterator. Operations are executed, and pairs are iterated, only when creating a composite Frame with the Batch.to_frame() method, or when using dictionary-like iterators such as Batch.keys(), Batch.items(), or Batch.values.

A Batch can be initialized with items from a Bus, or any iterator of pairs of label, Frame. Methods called from, or operators invoked on, a Batch simply return a new Batch. Calling Batch.to_frame(), as shown below, is necessary to eagerly execute the composed sum() operation.

>>> sf.Batch(bus.items()).sum()
<Batch at 0x7fabd09779a0>
>>> sf.Batch(bus.items()).sum().to_frame()
<Frame>
<Index> a         b         <<U1>
<Index>
f1      1.0       1.0
f2      4.0       4.0
<<U2>   <float64> <float64>
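The deferral can be sketched in plain Python as well. DeferredBatch below is a hypothetical stand-in, not StaticFrame's Batch: apply() only records a function and returns a new object, and nothing runs until items() or to_dict() iterates, which plays the role of Batch.to_frame() here. Tables are modeled as dictionaries of column lists purely for illustration.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Tuple

Pair = Tuple[Hashable, object]
Op = Callable[[object], object]


class DeferredBatch:
    """Hypothetical sketch of Batch-style deferred processing (not StaticFrame code).

    apply() does no work: it returns a new DeferredBatch that records one more
    operation. Operations run only when items() or to_dict() iterates.
    """

    def __init__(self, items: Iterable[Pair], ops: Tuple[Op, ...] = ()) -> None:
        self._items = items
        self._ops = ops

    def apply(self, func: Op) -> 'DeferredBatch':
        return DeferredBatch(self._items, self._ops + (func,))

    def items(self) -> Iterator[Pair]:
        for label, value in self._items:
            for op in self._ops:  # run the composed operations in order
                value = op(value)
            yield label, value

    def to_dict(self) -> Dict[Hashable, object]:
        return dict(self.items())


# Toy tables: column name -> list of values.
tables = {'f1': {'a': [0.5, 0.5], 'b': [0.5, 0.5]},
          'f2': {'a': [2, 2], 'b': [2, 2]}}
calls: List[str] = []

def column_sums(table: dict) -> dict:
    calls.append('column_sums')
    return {col: sum(values) for col, values in table.items()}

batch = DeferredBatch(tables.items()).apply(column_sums)
calls_before = len(calls)  # 0: nothing has executed yet
result = batch.to_dict()
```

As with a generator, if the stored items are a one-shot iterator they can be consumed only once; a re-iterable view such as dict.items() avoids that limitation.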

In addition to Frame methods, the Batch supports usage of Frame selection interfaces and operators. Below, each Frame is taken to the second power, the “b” column is selected, and a new Frame (combining both selections) is returned:

>>> (sf.Batch(bus.items()) ** 2)['b'].to_frame()
<Frame>
<Index> w         x         y         z         <<U1>
<Index>
f1      0.25      0.25      nan       nan
f2      nan       nan       4.0       4.0
<<U2>   <float64> <float64> <float64> <float64>

The Batch is related to the Pandas DataFrameGroupBy and Rolling objects, interfaces that, after configuring a group-by or rolling window iterable, expose function application on those groups or windows. The Batch generalizes this functionality, supporting those contexts as well as offering general-purpose processing of any iterator of labels and Frames.

Quilt

A Quilt is initialized with a Bus (or Yarn), and requires specification of which axis to virtually concatenate on, either vertically (axis 0) or horizontally (axis 1). Additionally, a Quilt must define a Boolean for retain_labels: if True, Frame labels are retained as the outer labels in a hierarchical index along the axis of concatenation. If retain_labels is False, all labels along the axis of concatenation of all contained Frames must be unique. The following examples use the previously created Bus to demonstrate the retain_labels parameter. As a Quilt might be built from thousands of tables, the default representation abbreviates data; Quilt.to_frame() can be used to provide a fully realized representation.

>>> quilt = sf.Quilt(bus, axis=0, retain_labels=False)
>>> quilt
<Quilt>
<Index: Aligned>      a b <<U1>
<Index: Concatenated>
w                     . .
x                     . .
y                     . .
z                     . .
<<U1>
>>> quilt.to_frame()
<Frame>
<Index> a         b         <<U1>
<Index>
w       0.5       0.5
x       0.5       0.5
y       2.0       2.0
z       2.0       2.0
<<U1>   <float64> <float64>
>>> quilt = sf.Quilt(bus, axis=0, retain_labels=True)
>>> quilt.to_frame()
<Frame>
<Index>                a         b         <<U1>
<IndexHierarchy>
f1               w     0.5       0.5
f1               x     0.5       0.5
f2               y     2.0       2.0
f2               z     2.0       2.0
<<U2>            <<U1> <float64> <float64>

The Quilt can be thought of as a Frame built from many smaller Frames, aligned either vertically or horizontally. Importantly, this larger Frame is not eagerly concatenated; rather, Frames are accessed from a contained Bus as needed, providing a lazy concatenation of tables along an axis.
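The core of a lazy vertical concatenation is mapping a position in the virtual Frame to a position in one contained table. The sketch below is hypothetical, not the Quilt's implementation; it assumes each table's row count is known in advance (for example, from stored metadata) and uses bisect over cumulative row counts, so only the table that holds the requested row is loaded. Returned labels pair the table label with the row label, as with retain_labels=True.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import Callable, Hashable, List, Sequence, Tuple

Row = Tuple[Hashable, tuple]  # (row label, row values)


class VirtualConcat:
    """Hypothetical sketch of Quilt-style vertical concatenation (not StaticFrame code).

    Assumes each table's row count is known up front, so that a global row
    position can be mapped to one table without loading the others.
    """

    def __init__(self, labels: Sequence[Hashable], lengths: Sequence[int],
                 load: Callable[[Hashable], List[Row]]) -> None:
        self._labels = list(labels)
        self._load = load
        # ends[i] is one past the last global position of table i.
        self._ends = list(accumulate(lengths))

    def __len__(self) -> int:
        return self._ends[-1] if self._ends else 0

    def locate(self, position: int) -> Tuple[Hashable, int]:
        """Map a global row position to (table label, local position)."""
        if not 0 <= position < len(self):
            raise IndexError(position)
        i = bisect_right(self._ends, position)
        start = self._ends[i - 1] if i else 0
        return self._labels[i], position - start

    def row(self, position: int) -> Tuple[Tuple[Hashable, Hashable], tuple]:
        """Return ((table label, row label), values), loading one table."""
        table_label, local = self.locate(position)
        row_label, values = self._load(table_label)[local]
        return (table_label, row_label), values


tables = {
    'f1': [('w', (0.5, 0.5)), ('x', (0.5, 0.5))],
    'f2': [('y', (2, 2)), ('z', (2, 2))],
}
loads: List[Hashable] = []

def load(label: Hashable) -> List[Row]:
    loads.append(label)
    return tables[label]

virtual = VirtualConcat(list(tables), [len(t) for t in tables.values()], load)
third = virtual.row(2)  # loads only 'f2'
```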

A Bus within a Quilt can be configured with the max_persist argument to limit the total number of Frames held in memory. Such explicit memory management permits doing operations on a virtual Frame that might be too large to load into memory.

The Quilt permits selections, iterations, and operations on this virtually concatenated Frame using a subset of common Frame interfaces. For example, a Quilt can be used for iterating rows and applying functions:

>>> sf.Quilt(bus, axis=0, retain_labels=False).iter_array(axis=1).apply(lambda a: a.sum())
<Series>
<Index>
w        1.0
x        1.0
y        4.0
z        4.0
<<U1>    <float64>

Yarn

The Yarn, only briefly described here, provides a “virtual concatenation” of one or more Bus. As with the Quilt, the larger container is not eagerly concatenated. Unlike the two-dimensional, single-Frame presentation of the Quilt, the Yarn presents a one-dimensional container of many Frames with a Bus-like interface. Unlike a Bus or Quilt, the index of a Yarn can be arbitrarily relabeled. These features permit heterogeneous Bus to be made available in a single container under (if needed) new labels.

The Yarn, as an even higher-order container, can only be initialized with one or more Bus or Yarn. A Yarn can even be created from multiple instances of the same Bus if each is given a unique name:

>>> sf.Yarn.from_buses((bus.rename('a'), bus.rename('b')), retain_labels=True)
<Yarn>
<IndexHierarchy>
a                f1    Frame
a                f2    Frame
b                f1    Frame
b                f2    Frame
<<U1>            <<U2> <object>
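A Yarn-like container can be approximated by a mapping that delegates lookups to several named mappings and presents their keys as (outer, inner) pairs. The VirtualChain class below is a hypothetical sketch built on collections.abc.Mapping, not StaticFrame's Yarn; as in the example above, the same source can appear twice under two names, and the length is the total of the contained lengths.

```python
from collections.abc import Mapping
from typing import Dict, Hashable, Iterator, Tuple


class VirtualChain(Mapping):
    """Hypothetical sketch of a Yarn-like container (not StaticFrame code).

    Presents several named mappings of tables as one mapping keyed by
    (outer name, inner label). Lookups delegate to the contained mapping;
    no table is copied or concatenated.
    """

    def __init__(self, named: Dict[Hashable, Mapping]) -> None:
        self._named = named

    def __getitem__(self, key: Tuple[Hashable, Hashable]) -> object:
        outer, inner = key
        return self._named[outer][inner]

    def __iter__(self) -> Iterator[Tuple[Hashable, Hashable]]:
        for outer, mapping in self._named.items():
            for inner in mapping:
                yield outer, inner

    def __len__(self) -> int:
        # m contained mappings of n tables each gives m * n entries.
        return sum(len(m) for m in self._named.values())


bus_like = {'f1': 'table f1', 'f2': 'table f2'}
yarn_like = VirtualChain({'a': bus_like, 'b': bus_like})  # same source, two names
```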

Common & Distinguishing Characteristics

A common characteristic shared by the Bus, Batch, and Quilt is that they all support instantiation from an iterator of pairs of labels and Frames. When that iterator is from a Bus, the lazy-loading of the Bus can be used to minimize memory overhead.

These containers all share the same file-based constructors, such as from_zip_csv() or from_xlsx(); each constructor has a corresponding exporter, e.g., to_zip_csv() or to_xlsx(), respectively, permitting round-trip reading and writing, or conversion from one format to another. The following table summarizes the file-based constructors and exporters available on all three containers. (The Yarn, as an aggregation of Bus, supports only the exporters.)

Constructor          Exporter
from_hdf5            to_hdf5
from_sqlite          to_sqlite
from_zip_csv         to_zip_csv
from_zip_npz         to_zip_npz
from_zip_pickle      to_zip_pickle
from_zip_parquet     to_zip_parquet
from_zip_tsv         to_zip_tsv
from_xlsx            to_xlsx
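To make the idea of a zipped multi-table store concrete, the sketch below writes each table as a separate CSV member of one zip archive and reads them back, using only the Python standard library. The member naming (label plus “.csv”) and the representation of tables as lists of string rows are assumptions for illustration, not a description of StaticFrame's exact file layout. Because each table is its own member, a reader can open one member without reading the others.

```python
import csv
import io
import zipfile
from typing import Dict, List

Table = List[List[str]]  # header row followed by data rows, all strings


def write_zip_csv(tables: Dict[str, Table]) -> bytes:
    """Write each table to its own '<label>.csv' member of one zip archive."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
        for label, rows in tables.items():
            text = io.StringIO()
            csv.writer(text).writerows(rows)
            archive.writestr(f'{label}.csv', text.getvalue())
    return buffer.getvalue()


def read_zip_csv(data: bytes) -> Dict[str, Table]:
    """Read every member back; a lazy reader could open one member at a time."""
    tables: Dict[str, Table] = {}
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for name in archive.namelist():
            content = archive.read(name).decode('utf-8')
            tables[name[:-len('.csv')]] = list(csv.reader(io.StringIO(content, newline='')))
    return tables


tables = {
    'f1': [['', 'a', 'b'], ['w', '0.5', '0.5'], ['x', '0.5', '0.5']],
    'f2': [['', 'a', 'b'], ['y', '2', '2'], ['z', '2', '2']],
}
restored = read_zip_csv(write_zip_csv(tables))  # round trip
```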

These containers can be distinguished by dimensionality, shape, and interface. The Bus and Yarn are one-dimensional collections of Frames; the Batch and Quilt present two-dimensional, Frame-like interfaces. The shape of the Bus is its number of Frames (and the shape of the Yarn is the number of Frames across all contained Bus), while the shape of the Quilt depends on its contained Frames and its axis of concatenation. As with a generator, the length (or shape) of a Batch is not known until iteration. Finally, while the Bus and Yarn expose a Series-like interface, the Batch and Quilt expose a Frame-like interface, operating on individual Frames or on the virtually concatenated Frame, respectively.

As shown in the following table for m Bus of n Frame of shape (x, y), these containers populate a spectrum of dimensionality and interfaces.

                        Bus        Batch                     Quilt                    Yarn
Presented ndim          1          2                         2                        1
Approximate interface   Series     Frame                     Frame                    Series
Composes                n Frame    n Frame                   1 Bus/Yarn of n Frame    m Bus of n Frame
Presented shape         (n,)       not known until iteration (xn, y) or (x, yn)       (mn,)

Processing 7,163 DataFrames

The “Huge Stock Market Dataset” contains a collection of 7,163 CSV tables, each table a time series of characteristics for a US stock. The file “archive.zip” is available at https://www.kaggle.com/borismarjanovic/price-volume-data-for-all-us-stocks-etfs

After opening the archive, we can read from the contained “Stocks” directory and use a Batch to create a zip pickle of the stock data, labelled by ticker, for fast reading in subsequent examples. As some files are empty, we also skip zero-byte files. Depending on hardware, this initial transformation may take some time.

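>>> import os
>>> d = 'archive/Stocks'
>>> # Pair each file name (e.g., 'aapl.us.txt') with its path.
>>> fps = ((fn, os.path.join(d, fn)) for fn in os.listdir(d))
>>> # Skip zero-byte files; label each Frame by ticker ('aapl.us.txt' -> 'aapl').
>>> items = ((fn.replace('.us.txt', ''), sf.Frame.from_csv(fp, index_depth=1))
...         for fn, fp in fps if os.path.getsize(fp))
>>> sf.Batch(items).to_zip_pickle('stocks.zip')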

As the Bus is lazy, initialization from this new zip archive loads zero Frames into memory; a Frame is read only when explicitly requested. Thus, while the Bus.shape attribute shows 7,163 contained Frames, the status attribute shows zero loaded Frames.

>>> bus = sf.Bus.from_zip_pickle('stocks.zip')
>>> bus.shape
(7163,)
>>> bus.status['loaded'].sum()
0

Accessing a single Frame loads only that one Frame.

>>> bus['ibm'].shape
(14059, 6)
>>> bus['ibm'].columns
<Index>
Open
High
Low
Close
Volume
OpenInt
<<U7>

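Selecting multiple labels produces a new Bus that reads from the same store. After this selection, the original Bus reports four loaded Frames: the three selected here plus the previously accessed “ibm” Frame.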

>>> bus[['aapl', 'msft', 'goog']]
<Bus>
<Index>
aapl    Frame
msft    Frame
goog    Frame
<<U9>   <object>
>>> bus.status['loaded'].sum()
4

With a Batch we can perform operations on the Frames contained in the Bus, returning labeled results. The Batch.apply() method can be used with a lambda to multiply two columns (“Volume” and “Close”) of each Frame; we then extract the most recent two values with iloc and produce a composite Frame, the index derived from the original Bus labels:

>>> sf.Batch(bus[['aapl', 'msft', 'goog']].items()).apply(lambda f: f['Close'] * f['Volume']).iloc[-2:].to_frame()
<Frame>
<Index> 2017-11-09         2017-11-10         <<U10>
<Index>
aapl    5175673321.5       4389543386.98
msft    1780638040.5600002 1626767764.8700001
goog    1283539710.3       740903319.18
<<U4>   <float64>          <float64>

To make observations across the entire dataset, we can pass the Bus to a Quilt. Below, a null slice is used to force loading all Frames at once to optimize Quilt performance. The shape shows a Quilt of almost 15 million rows.

>>> quilt = sf.Quilt(bus[:], retain_labels=True)
>>> quilt.shape
(14887665, 6)

Using the Quilt, we can calculate the total volume traded on a single day across all securities without explicitly concatenating all Frames. The StaticFrame HLoc selector, used below, permits per-depth-level selection within a hierarchical index. Here we select all records for 2017-11-10, across all tickers, and sum the volume.

>>> quilt.loc[sf.HLoc[:, '2017-11-10'], 'Volume'].sum()
5520175355

Similarly, the iloc_max() method can be used to find the ticker and date of the record with the highest volume across all securities. iloc_max() returns the integer position of the maximum; selecting that position with iloc returns a Series whose name attribute is the (ticker, date) label.

>>> quilt.iloc[quilt['Volume'].iloc_max()]
<Series: ('bac', '2012-03-07')>
<Index>
Open                            7.4073
High                            7.6065
Low                             7.3694
Close                           7.6065
Volume                          2423735131.0
OpenInt                         0.0
<<U7>                           <float64>
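Both Quilt reductions above can, in principle, be computed one partition at a time. The following hypothetical sketch models each partition as a dictionary of date to volume, with made-up values, and shows a selection on the inner label followed by a sum, and an argmax that reports the (outer, inner) label of the largest value. It illustrates the idea only; it is not how the Quilt evaluates HLoc or iloc_max().

```python
from typing import Dict, Hashable, Optional, Tuple

Partitions = Dict[Hashable, Dict[str, int]]  # outer label -> {inner label: value}

# Toy partitions (made-up values): ticker -> {date: volume}.
partitions: Partitions = {
    'aaa': {'2017-11-09': 100, '2017-11-10': 150},
    'bbb': {'2017-11-10': 40},
    'ccc': {'2017-11-08': 175},
}


def sum_at_inner(parts: Partitions, inner: str) -> int:
    """Like selecting HLoc[:, inner] and summing: partitions lacking `inner` add 0."""
    return sum(rows[inner] for rows in parts.values() if inner in rows)


def argmax_label(parts: Partitions) -> Optional[Tuple[Tuple[Hashable, str], int]]:
    """Return ((outer, inner), value) of the largest value, visiting one
    partition at a time; ties keep the first occurrence, as argmax does."""
    best: Optional[Tuple[Tuple[Hashable, str], int]] = None
    for outer, rows in parts.items():
        for inner, value in rows.items():
            if best is None or value > best[1]:
                best = ((outer, inner), value)
    return best


day_total = sum_at_inner(partitions, '2017-11-10')
peak = argmax_label(partitions)
```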

Cross-Container Comparisons: Same Method, Different Selections

The previous examples demonstrated loading, processing, and examining the “Huge Stock Market Dataset” with the Bus, Batch, and Quilt. Cross-container comparisons can be used to further illustrate the characteristics of these containers. First, we can observe how three different selections are returned by applying the same method to each container. Second, we can observe how three approaches can be used with each container to return the same selection.

The head(2) method returns the first two rows (or elements) from any container. Understanding how the method’s output differs between the Bus, Batch, and Quilt helps illustrate their nature.

The head(2) method call on the Bus returns a new Bus consisting of the first two elements, i.e., the first two Frames in the “Huge Stock Market Dataset”.

>>> bus.head(2)
<Bus>
<Index>
fljh    Frame
bgt     Frame
<<U9>   <object>

As the Batch operates on each Frame in a Bus, calling head(2) extracts the top two rows from each Frame in the “Huge Stock Market Dataset.” Calling to_frame() concatenates these extractions into a new Frame, from which only two columns are then selected:

>>> sf.Batch(bus.items()).head(2).to_frame().shape
(14316, 6)
>>> sf.Batch(bus.items()).head(2).to_frame()[['Close', 'Volume']]
<Frame>
<Index>                     Close     Volume  <<U7>
<IndexHierarchy>
fljh             2017-11-07 26.189    1300
fljh             2017-11-08 26.3875   3600
bgt              2005-02-25 11.618    97637
bgt              2005-02-28 11.683    90037
angi             2011-11-21 15.4      469578
angi             2011-11-22 16.12     202970
ccj              2005-02-25 20.235    3830399
ccj              2005-02-28 19.501    3911079
uhs              2005-02-25 22.822    4700749
uhs              2005-02-28 23.056    1739084
eqfn             2015-07-09 8.68      489900
eqfn             2015-07-10 8.58      44100
ivfgc            2016-12-02 99.97     5005
ivfgc            2016-12-05 99.97     6002
achn             2006-10-25 11.5      0
achn             2006-10-26 12.39     361420
eurz             2015-08-19 24.75     200
...              ...        ...       ...
cai              2007-05-16 15.0      3960000
desc             2016-07-26 27.062    1015
desc             2016-07-27 27.15     193
swks             2005-02-25 7.0997    1838285
swks             2005-02-28 6.9653    2737207
hair             2017-10-12 9.92      2818561
hair             2017-10-13 9.6       294724
jnj              1970-01-02 0.5941    1468563
jnj              1970-01-05 0.5776    1185461
rosg             2011-08-05 181.8     183
rosg             2011-08-08 169.2     79
wbbw             2013-04-12 13.8      162747
wbbw             2013-04-15 13.67     126845
twow             2017-10-23 16.7      10045
twow             2017-10-24 16.682    850
gsjy             2016-03-07 25.238    14501
gsjy             2016-03-08 25.158    12457
<<U9>            <<U10>     <float64> <int64>

Finally, the Quilt represents the contained Frames as if they were a single, contiguous Frame. Calling head(2) returns the first two rows of that virtual Frame, labelled with a hierarchical index whose outer label is the Frame’s label (i.e., the ticker).

>>> quilt.head(2)[['Close', 'Volume']]
<Frame>
<Index>                     Close     Volume  <<U7>
<IndexHierarchy>
fljh             2017-11-07 26.189    1300
fljh             2017-11-08 26.3875   3600
<<U4>            <<U10>     <float64> <int64>
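The three behaviors of head(2) can be reproduced with toy data in a few lines of Python. These functions are hypothetical illustrations operating on lists of (row label, value) pairs, not StaticFrame code.

```python
from itertools import islice
from typing import Dict, Hashable, Iterator, List, Tuple

Rows = List[Tuple[str, float]]  # (row label, value) pairs
Key = Tuple[Hashable, str]      # (table label, row label)


def bus_head(tables: Dict[Hashable, Rows], n: int) -> Dict[Hashable, Rows]:
    """Bus-like: the first n tables."""
    return dict(islice(tables.items(), n))


def batch_head(tables: Dict[Hashable, Rows], n: int) -> Dict[Key, float]:
    """Batch-like: the first n rows of each table, then combined."""
    return {(label, row): value
            for label, rows in tables.items()
            for row, value in rows[:n]}


def quilt_head(tables: Dict[Hashable, Rows], n: int) -> Dict[Key, float]:
    """Quilt-like: the first n rows of the virtual concatenation."""
    def all_rows() -> Iterator[Tuple[Key, float]]:
        for label, rows in tables.items():
            for row, value in rows:
                yield (label, row), value
    return dict(islice(all_rows(), n))


tables = {
    't1': [('d1', 1.0), ('d2', 2.0), ('d3', 3.0)],
    't2': [('d1', 4.0), ('d2', 5.0)],
    't3': [('d9', 6.0)],  # a one-row table contributes one row to batch_head
}
```

The one-row table is consistent with the Batch result above having 14,316 rather than 2 × 7,163 rows: a Frame with fewer than two rows contributes fewer rows. Note also that quilt_head() touches only the first table here, because islice() stops consuming the generator once n rows are produced.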

Cross-Container Comparisons: Same Selections, Different Methods

Next, we will show how three approaches can be used with each container to return the same selection. While the head() method, used above, is a type of pre-configured selector, the full range of loc and iloc selection interfaces is supported by all containers. The following examples extract the “Open” and “Close” values for 1962-01-02 from every stock that has a record for that date.

To perform this selection with a Bus, we can iterate through each Frame and select the targeted records.

>>> for label, f in bus.items():
...     if '1962-01-02' in f.index:
...         print(f.loc['1962-01-02', ['Open', 'Close']].rename(label))
...
<Series: ge>
<Index>
Open         0.6277
Close        0.6201
<<U7>        <float64>
<Series: ibm>
<Index>
Open          6.413
Close         6.3378
<<U7>         <float64>

The Batch offers a more compact way to perform this selection. Without writing a loop, the Batch.apply_except() method can select row and column values from within each contained Frame while ignoring any KeyError raised by Frames that lack the selected date. Calling to_frame() concatenates the results together with their Frame labels.

>>> sf.Batch(bus.items()).apply_except(lambda f: f.loc['1962-01-02', ['Open', 'Close']], KeyError).to_frame()
<Frame>
<Index> Open      Close     <<U7>
<Index>
ge      0.6277    0.6201
ibm     6.413     6.3378
<<U3>   <float64> <float64>
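The behavior of apply_except() can be approximated with a try/except inside a loop. The sketch below is hypothetical and uses toy tables modeled as dictionaries from date to (open, close); exceptions other than those listed still propagate, so unexpected errors are not silently hidden.

```python
from typing import Callable, Dict, Hashable, Iterable, Tuple, Type, Union

Exc = Union[Type[BaseException], Tuple[Type[BaseException], ...]]


def apply_except(items: Iterable[Tuple[Hashable, object]],
                 func: Callable[[object], object],
                 exceptions: Exc) -> Dict[Hashable, object]:
    """Apply func to each (label, table) pair, dropping any pair whose call
    raises one of `exceptions` (hypothetical sketch, not StaticFrame code)."""
    results: Dict[Hashable, object] = {}
    for label, table in items:
        try:
            results[label] = func(table)
        except exceptions:
            continue  # e.g. a KeyError from a table without the requested date
    return results


# Toy tables: date -> (open, close); only two contain the requested date.
tables = {
    'aaa': {'1962-01-02': (1.0, 1.5)},
    'bbb': {'1999-06-01': (9.0, 9.5)},
    'ccc': {'1962-01-02': (3.0, 2.5), '1962-01-03': (2.5, 2.0)},
}
selected = apply_except(tables.items(), lambda t: t['1962-01-02'], KeyError)
```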

Finally, as a virtual concatenation of Frames, the Quilt permits selection as if from a single Frame. As shown below, a hierarchical selection on the inner label “1962-01-02” brings together any records for that date across all tickers.

>>> quilt.loc[sf.HLoc[:, '1962-01-02'], ['Open', 'Close']]
<Frame>
<Index>                     Open      Close     <<U7>
<IndexHierarchy>
ge               1962-01-02 0.6277    0.6201
ibm              1962-01-02 6.413     6.3378
<<U3>            <<U10>     <float64> <float64>

Minimizing Memory Usage

In previous examples, the Bus was shown to lazily load data as it was accessed. While this permits only loading what is needed, strong references to loaded Frames are retained in the Bus, keeping them in memory. For large collections of data this can result in undesirable data retention.

By using the max_persist argument on Bus initialization, we can fix the maximum number of Frames retained in the Bus. As shown below, with max_persist set to one, the number of loaded Frames remains one no matter how many different Frames are accessed:

>>> bus = sf.Bus.from_zip_pickle('stocks.zip', max_persist=1)
>>> bus['aapl'].shape
(8364, 6)
>>> bus.status['loaded'].sum()
1
>>> bus['ibm'].shape
(14059, 6)
>>> bus.status['loaded'].sum()
1
>>> bus['goog'].shape
(916, 6)
>>> bus.status['loaded'].sum()
1

With this configuration, a process could iterate through all 7,163 Frames, doing work on each Frame, while incurring the memory overhead of only a single Frame. While the same routine could be performed with a group-by on a single Frame, this approach explicitly favors minimizing memory usage over compute time. The example below demonstrates such an approach, finding, across all stocks, the largest difference between a single stock's maximum and minimum closing price.

>>> max_span = 0
>>> for _, f in bus.items():
...     close = f['Close']  # one Frame at a time
...     max_span = max(close.max() - close.min(), max_span)
...
>>> max_span
1437986239.4042
>>> bus.status['loaded'].sum()
1

As a Bus can be provided as input to a Batch, Quilt, and Yarn, the entire family of containers can benefit from this approach to reducing memory overhead.

Parallel Processing

Independently processing large numbers of Frames is an embarrassingly parallel problem. As such, these higher-order containers provide opportunities for parallel processing.

All constructors and exporters of zipped archives, such as from_zip_parquet() or to_zip_npz(), support a config argument that accepts a StoreConfig instance; its read_max_workers, read_chunksize, write_max_workers, and write_chunksize parameters set the number of workers and the chunk size used to deserialize or serialize Frames with multiprocessing.

Similarly, all Batch constructors expose max_workers, chunk_size, and use_threads parameters to permit processing Frames in parallel. Simply by setting these parameters, operations on vast numbers of Frames can be multi-processed or multi-threaded, potentially delivering significant performance improvements. While threads are generally inefficient for CPU-bound processing in Python, many NumPy operations release the global interpreter lock; for such operations, thread pools can outperform process pools, which must also pay the cost of transferring Frames between processes.
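A minimal sketch of the parallel pattern, using the standard library's concurrent.futures, is shown below. The parallel_apply function is hypothetical, and its max_workers, chunksize, and use_threads parameters only loosely echo the Batch options named above. Executor.map() preserves input order, so labels stay paired with their results.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, Dict, Hashable, Iterable, Optional, Tuple


def _apply_one(task: Tuple[Callable[[object], object], Hashable, object]
               ) -> Tuple[Hashable, object]:
    func, label, table = task
    return label, func(table)


def parallel_apply(items: Iterable[Tuple[Hashable, object]],
                   func: Callable[[object], object],
                   max_workers: Optional[int] = None,
                   chunksize: int = 1,
                   use_threads: bool = False) -> Dict[Hashable, object]:
    """Hypothetical sketch: apply func to each (label, table) pair in a pool."""
    pool_type = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
    tasks = ((func, label, table) for label, table in items)
    with pool_type(max_workers=max_workers) as pool:
        # map() preserves input order; chunksize only matters for process pools.
        return dict(pool.map(_apply_one, tasks, chunksize=chunksize))


tables = {'f1': [0.5, 0.5], 'f2': [2, 2]}
totals = parallel_apply(tables.items(), sum, max_workers=2, use_threads=True)
```

With use_threads=False a process pool is used; func must then be picklable (for example, a module-level function rather than a lambda), and on platforms that start processes with spawn the call should be placed under an if __name__ == '__main__': guard.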

Conclusion

While related tools for working with collections of Frames exist, the Bus, Batch, Quilt, and Yarn provide well-defined abstractions that cover common needs in working with potentially huge collections of tables. Combined with lazy loading, eager unloading, and lazy execution, as well as support for a variety of multi-table storage formats, these tools provide valuable resources for DataFrame processing.