Spark and Storlets

One of the most compelling use cases for Storlets (at least as I see it) is boosting Spark analytics workloads by offloading compute to the Openstack Swift object store. While not all types of workloads can be offloaded, some definitely can. This blog post introduces one such computation and presents Spark-Storlets [1], which implements its offload. When implementing Spark-Storlets it was tempting to leverage the existing stack that connects Spark with Swift (especially Hadoop RDD and Hadoop I/O). This, however, proved to have some issues that are also described in this post.

Storlets Essentials

Openstack Storlets allow executing dockerized computations inside the Openstack Swift object store in a serverless fashion. With Storlets, an end user can upload code as if it were just another data object, and then execute it over data in the object store without having to take care of any server side settings. We refer to the code being invoked on data objects as a storlet. Storlets can be invoked during the upload, download or copy of an object. Invoking a storlet during download means that rather than downloading the object data as kept in the store, the user downloads a transformed object (transformed by the storlet). Thus, the storlet computation is done while the data is "in-flight", and so storlets are best for "streaming" computations where, typically, the output should start flowing out before all the input is read. Filtering is a good example of such a computation.

Spark Essentials

Spark is a highly popular distributed in-memory compute engine that can ingest data from a variety of storage systems, including Openstack Swift. Object stores can serve as cheap, massively scalable secondary storage for data that is no longer affordable to keep in primary storage. However, moving the data to secondary storage does not mean it should no longer be queryable. Ideally, the data should not have to be copied back to the primary storage tier in order to be queried. Spark-Storlets allows just that.

On the Spark side of things there is the Data Sources API. The basic functionality of this API is to import structured data that is stored in various formats. One example is Databricks' Spark-CSV package [2], which implements the Data Sources API to import CSV formatted data into Spark SQL. The basic API (known as Scan) takes no parameters and is expected to return an RDD whose entries are table rows that can be further manipulated using Spark SQL. More sophisticated versions of the API take selection and/or projection filters (PrunedScan and PrunedFilteredScan). This means that Spark SQL can leverage the Data Sources API implementation not only to parse a certain format but also to do some filtering while parsing the data. Thus, by calling the PrunedFilteredScan API, which takes both selection and projection filters, Spark-CSV returns an RDD consisting only of the selected rows, with each row consisting only of the projected columns.
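
To make the hook concrete, here is a minimal sketch in Scala of a PrunedFilteredScan relation against the Spark 1.x Data Sources API; the class name and the empty scan body are illustrative only, not the Spark-CSV or Spark-Storlets code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
    import org.apache.spark.sql.types.StructType

    // A relation that can prune columns and push down selection filters.
    class ExampleCsvRelation(val location: String,
                             override val schema: StructType,
                             @transient override val sqlContext: SQLContext)
      extends BaseRelation with PrunedFilteredScan {

      // Spark SQL hands us the projected columns and the selection filters,
      // so the relation can return only the needed rows and columns.
      override def buildScan(requiredColumns: Array[String],
                             filters: Array[Filter]): RDD[Row] = {
        // A real implementation parses the data at 'location' and applies the
        // pruning/filtering; with Spark-Storlets that work is pushed to the storlet.
        sqlContext.sparkContext.emptyRDD[Row]
      }
    }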

Now this is a low hanging fruit for Storlets: filtering is a classic storlet computation, and the Data Sources API is essentially an offloading hook. Indeed, the Spark-Storlets repo starts with an implementation of the Data Sources API for CSV formatted data. The implementation leverages the CSVStorlet, which can perform selection and projection filtering within the object store (the storlet is part of the Openstack Storlets repo).
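
From the Spark SQL user's perspective, reading through such a storlet-backed data source would look roughly like the snippet below; the format name, option key and Swift URL are placeholders I chose for illustration, not necessarily the actual Spark-Storlets API:

    // Hypothetical usage of a storlet-backed CSV data source.
    val df = sqlContext.read
      .format("org.apache.storlets.spark.csv")        // assumed data source package
      .option("storlet.name", "csvstorlet-1.0.jar")   // assumed option key
      .load("swift://mycontainer.provider/meter.csv") // placeholder Swift path

    // The selection and projection below can be pushed down to the CSVStorlet,
    // so only the matching rows/columns ever leave the object store.
    df.filter(df("temperature") > 30).select("station", "temperature").show()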

Some Design Considerations

Independently of Spark SQL and the Data Sources API, Spark has the ability to read data from Swift using the Hadoop I/O layer. Moreover, there is a choice of drivers to do so: IBM's stocator [3] and Hadoop's native Openstack Swift file system driver [4]. It is tempting to leverage those existing 'connectors' for Spark-Storlets as well. Attempting to do so, however, reveals quite a few issues.

Partition Discovery

The first issue has to do with partition discovery. In the context of loading data from external storage, partition discovery allocates different partitions of the data to different Spark worker nodes. This essentially parallelizes the reading and processing of the data amongst the Spark nodes. When using Hadoop I/O, data partitioning is done according to the chunk size reported by the underlying driver. In HDFS the chunk size is a fundamental system wide parameter: every file is divided into chunks of that size, and these chunks are spread over the HDFS cluster. Thus, in HDFS it only makes sense that data would be consumed in chunks of that size. Object stores, however, are a different story: when we offload work to the object store it makes sense to partition the data taking into account the "amount of parallelism" that exists in the object store, e.g. the replication factor and the number of workers in each replica node.
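
For example, an object-store-aware partitioner might compute byte ranges along the lines of the sketch below; the replica count and workers-per-node values are illustrative parameters, not something Swift reports:

    // Derive partitions from the parallelism available in the object store
    // rather than from a filesystem chunk size.
    def partitionRanges(objectSize: Long,
                        replicaCount: Int,
                        workersPerNode: Int): Seq[(Long, Long)] = {
      val numPartitions = math.max(1, replicaCount * workersPerNode)
      val chunk = math.max(1L, math.ceil(objectSize.toDouble / numPartitions).toLong)
      (0L until objectSize by chunk).map { start =>
        (start, math.min(start + chunk, objectSize) - 1)  // inclusive byte range
      }
    }

    // A 1 GB object on a 3-replica cluster with 4 storlet workers per node
    // yields 12 ranges that can be processed in parallel with ranged requests.
    val ranges = partitionRanges(1L << 30, replicaCount = 3, workersPerNode = 4)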

The Broken Records Problem

Partitioning the data amongst different workers surfaces a classic distributed computing problem: imagine a read-only shared memory containing all the data. Different workers are assigned different partitions of the data, and are not allowed to communicate. How can we make sure that records crossing a partition boundary are not lost? As it turns out, knowing the maximum length of a record is enough to solve the problem: in case a worker reaches the end of its partition while still in the middle of a record, it reads past the end, up to the maximum record length, until it reaches a record separator. In addition, all workers except the one working on the first partition skip the beginning of their data chunk up to the first record separator.
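
A rough sketch of this boundary handling, assuming newline-separated records and a stream already positioned at the start of the worker's partition:

    import java.io.InputStream
    import scala.collection.mutable.ArrayBuffer

    // Every worker except the first skips up to the first separator, and a
    // worker may read past its partition end by at most maxRecordLen bytes to
    // finish the record it is in. When filtering is offloaded, the same logic
    // has to live inside the storlet.
    def readRecords(in: InputStream,
                    partitionLen: Long,       // bytes assigned to this worker
                    maxRecordLen: Int,
                    isFirstPartition: Boolean,
                    sep: Byte = '\n'.toByte): Iterator[String] = {
      var consumed = 0L
      def nextByte(): Int = { val b = in.read(); if (b >= 0) consumed += 1; b }

      // Skip the (possibly partial) first record; the previous worker owns it.
      if (!isFirstPartition) {
        var b = nextByte()
        while (b >= 0 && b != sep) b = nextByte()
      }

      new Iterator[String] {
        private var eof = false
        def hasNext: Boolean = !eof && consumed < partitionLen
        def next(): String = {
          val buf = new ArrayBuffer[Byte]()
          var b = nextByte()
          // Read past the partition end if needed, bounded by maxRecordLen,
          // until the record separator (or end of stream) is reached.
          while (b >= 0 && b != sep && buf.size < maxRecordLen) {
            buf += b.toByte
            b = nextByte()
          }
          if (b < 0) eof = true
          new String(buf.toArray, "UTF-8")
        }
      }
    }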

This solution is implemented inside the Hadoop I/O layer. When offloading the filtering to the object store, different storlet instances will be processing different ranges of the data object, so we clearly need to solve this within the storlet as well. Thus, if we are to use Hadoop I/O, it would have to be made storlet-aware.

Data Format

Relying on Hadoop I/O also limits the data formats that can be used to text based data. Consider the use case of EXIF metadata extraction from JPEGs. One can imagine a Data Sources API implementation for JPEGs that would import a large collection of JPEGs as a table of their EXIF metadata. As JPEGs are binary data, they cannot be consumed using Hadoop I/O. Also, partitioning the dataset in this case calls for a totally different approach than using a chunk size.
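
As a rough illustration, such a data source might expose a schema along the following lines (the field names are purely hypothetical):

    import org.apache.spark.sql.types._

    // One row per JPEG object, with columns taken from its EXIF metadata.
    val exifSchema = StructType(Seq(
      StructField("objectName",   StringType,    nullable = false),
      StructField("cameraMake",   StringType,    nullable = true),
      StructField("cameraModel",  StringType,    nullable = true),
      StructField("dateTaken",    TimestampType, nullable = true),
      StructField("gpsLatitude",  DoubleType,    nullable = true),
      StructField("gpsLongitude", DoubleType,    nullable = true)
    ))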

Beyond Data Sources API

As mentioned in the background, Storlets can do more than filtering, and there is no apparent reason to limit the offload to the Data Sources API. Why not give the Spark programmer the means to offload certain tasks explicitly? Practically, this means that for any type of "storlet compatible" functionality we need an RDD-storlet pair, where the RDD serves as an iterator over the storlet output. It would be interesting to find machine learning algorithms that can benefit from the type of computations that can be done using storlets.
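
As a sketch of what such an RDD-storlet pair could look like, here is a minimal RDD whose partitions map to byte ranges of a single Swift object, each fetched with a storlet invoked on the GET path; the endpoint, token handling and range header treatment are simplifying assumptions, not the Spark-Storlets implementation:

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source
    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    case class RangePartition(index: Int, first: Long, last: Long) extends Partition

    // Each partition streams the storlet output for its byte range.
    class StorletRDD(sc: SparkContext,
                     objectUrl: String,      // e.g. http://swift:8080/v1/AUTH_acct/cont/obj
                     authToken: String,
                     storletName: String,
                     ranges: Seq[(Long, Long)]) extends RDD[String](sc, Nil) {

      override protected def getPartitions: Array[Partition] =
        ranges.zipWithIndex.map { case ((f, l), i) => RangePartition(i, f, l) }.toArray

      override def compute(split: Partition, context: TaskContext): Iterator[String] = {
        val p = split.asInstanceOf[RangePartition]
        val conn = new URL(objectUrl).openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestProperty("X-Auth-Token", authToken)
        conn.setRequestProperty("X-Run-Storlet", storletName)           // invoke the storlet on GET
        conn.setRequestProperty("Range", s"bytes=${p.first}-${p.last}") // assumed range handling
        // The storlet transforms/filters inside Swift; we just iterate over its output.
        Source.fromInputStream(conn.getInputStream, "UTF-8").getLines()
      }
    }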

Getting Further

While not directly related to Spark and Storlets, importing a container listing into Spark as structured data seems like a compelling idea (i.e. a Swift container as a data source), specifically if and when Swift gets metadata search support. Having a queryable container listing (by object name or object metadata) seems most useful. It can help reduce the number of objects on which we actually want to run storlets, or do other operations.

There is, however, more to the idea, which has to do with another partition discovery issue: when loading data from external storage it is possible to load many objects. Unlike filesystems, object stores typically have flat namespaces that can contain millions of objects. Current partition discovery for multiple objects relies on the ability to hold the container listing in memory while calculating the partitions. This does not scale. Having an RDD serve as the container listing might help. This implies, however, lazy or dynamic partitioning, which currently seems to be supported only with Spark Streaming. It would be fun to work on that one.
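
As a building block, the listing itself can already be consumed page by page using Swift's marker and limit query parameters, as in the sketch below (placeholder URL and token, no error handling, object names assumed URL-safe); turning such pages lazily into partitions is the missing Spark-side piece:

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source

    // Fetch one page of a container listing (plain-text object names).
    def fetchPage(containerUrl: String, authToken: String,
                  marker: Option[String], limit: Int): Seq[String] = {
      val markerParam = marker.map(m => s"&marker=$m").getOrElse("")
      val conn = new URL(s"$containerUrl?limit=$limit$markerParam")
        .openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestProperty("X-Auth-Token", authToken)
      Source.fromInputStream(conn.getInputStream, "UTF-8").getLines().toVector
    }

    // Lazily page through the whole listing without holding it all in memory.
    def listContainer(containerUrl: String, authToken: String,
                      pageSize: Int = 1000): Iterator[String] =
      Iterator.iterate(fetchPage(containerUrl, authToken, None, pageSize)) { page =>
        fetchPage(containerUrl, authToken, page.lastOption, pageSize)
      }.takeWhile(_.nonEmpty).flatMap(_.iterator)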

 

[1] https://github.com/eranr/spark-storlets

[2] https://github.com/databricks/spark-csv

[3] https://github.com/SparkTC/stocator

[4] https://github.com/apache/hadoop/tree/trunk/hadoop-tools/hadoop-openstack/src/main/java/org/apache/hadoop/fs/swift
