Apache Arrow Flight: A Primer

David Li, Tom Drabas Mar 24, 2022
Geese in flight

Anyone who has moved houses at least once knows that packing and unpacking boxes is no fun and extremely time-consuming, not to mention expensive if you hire someone to do it. Unfortunately, transferring data can be just as fun as moving.

Sending data from one computer–or device or database or server–to another requires serializing data into an intermediate format that both parties can understand, just like packing belongings into boxes. Serialized data is sent to the receiver, corresponding to transporting the boxes to a new place. Then, the receiver needs to deserialize the data into a format it understands, just like unpacking the boxes at the new destination. Serializing and deserializing data on each end, however, can be responsible for as much as 90% of the total time that it takes to move the data, slowing things down and driving up costs.

To move data quickly, we need to get rid of serialization and deserialization, which requires two things: a data standard that both the sender and receiver understand, so we don’t have to pack and unpack, and an efficient protocol for moving the data. The initial release of Apache Arrow provided the foundation–a columnar, cross-language format for laying out data in memory. However, there was no clear framework for sending Arrow data between computers. Until now.

Enter Arrow Flight

Arrow Flight is a framework for transporting Arrow data efficiently over the network. Arrow eliminates the need to serialize and deserialize data (to pack and unpack the boxes), and Arrow Flight brings those benefits to the network by replacing the rented truck with a conveyor belt: simply put your (data) belongings on one end and pick them up at their destination.

Flight simplifies the job of application developers by providing the ability to send and receive data quickly: a Flight client can stream Arrow data to and from a remote Flight server. No serialization and deserialization, no writing a one-off solution to move the data to another computer. Flight builds on top of multiple underlying technologies, but applies optimizations that make it more performant than custom implementations. In addition, Flight’s implementation is flexible and allows developers to customize server-side methods to accommodate more complex use cases.

At a high level, Flight is a remote procedure call (RPC) framework. RPC is a paradigm for structuring inter-process communications, whether it’s between two processes on the same machine or across the network. As the name implies, RPC models communication as calling remote functions and getting back results, just like in procedural programming—except the code being executed is in some other process.

Accordingly, Flight provides functions that fetch Arrow data from (or send data to) some remote process. It integrates tightly with Arrow libraries to make it easy to use and to find optimization opportunities. First introduced in Arrow 0.11, Flight currently ships with the Arrow libraries in Python, Java, Rust, and many other languages.

Now, let’s dive in a little deeper.

Arrow Flight Stack

A diagram of the Arrow Flight tech stack
The Arrow Flight “tech stack”

Arrow Flight builds on top of several open source libraries. Although Flight is itself an RPC framework, it is built on gRPC, the RPC framework from Google and the Cloud Native Computing Foundation. Adopted by organizations like Dropbox and Netflix, gRPC is fast, open source, and supported across a wide variety of programming languages. Flight lets gRPC handle all the low-level details around network communications and layers its own optimizations and integration with Arrow data on top.

As we’ve described, sending messages between processes, especially ones written in different languages, requires some way to describe the format of the data so both parties can understand each other. Protocol Buffers (“Protobuf”), a serialization format and library from Google, often fills that role for applications using gRPC, and it’s also used in Flight for metadata, alongside Arrow itself for data, of course.

Using Arrow Flight

Normally, to use gRPC, we have to agree on RPC methods and their input and output types ahead of time so that the server and client can communicate. Flight does this for you, defining abstract methods for things like describing the schema of a dataset, uploading data to or downloading data from a server, and more. A Flight server overrides only the methods it wants to support, and a client using Flight can connect to a server and call these methods.

Flight’s methods deal with either data or metadata. Everything starts with a FlightDescriptor, which describes some dataset. For instance, you can pass it to the GetFlightInfo method to get FlightInfo metadata describing how to download a dataset. You can then “redeem” the Tickets inside the FlightInfo for the data by calling the DoGet method, which streams the data back to the client.

If you want to upload data instead, just pass the FlightDescriptor to the DoPut method and start writing data to the server. And for advanced use cases, use DoExchange instead, which lets you read and write data simultaneously. Note that all of the “Do” methods are streaming, so you can work with datasets that don’t even fit in memory.

Flight also provides several methods to query various metadata about datasets, like fetching the schema without reading the data, as well as to let applications extend it with custom “actions” that don’t naturally fit into any other method. You can get the full details from the Flight documentation.

Given these methods, you can imagine a variety of ways to put them together into an application. For instance, requests in a simple key-value store might look like this, where the client directly constructs a Ticket message and sends it to the server:

Diagram of a simple key-value store

For a simple key-value store, the client can call DoPut to write data, then DoGet to retrieve it.

But maybe you want to support sharding for resiliency, and distribute a dataset across multiple servers. First, use GetFlightInfo to ask a server where the data is located before requesting it, instead of assuming the data is on the server we’re talking to.

Diagram of advanced key value
The client can use GetFlightInfo to ask a server for a request payload, which it then uses with DoGet to fetch the data.

Then, split the service into microservices. One server can focus on handling metadata requests, and a fleet of servers can scale up independently to serve data:

Diagram of separate services
The server may tell the client that the data is located on a different server.

Since the data is now split across multiple servers, you can make DoGet requests in parallel for more throughput:

Diagram of parallel requests

The first server may tell the client the data is located on multiple servers, and the client can then fetch data in parallel.

Making Things Fast

As mentioned, Flight combines the Arrow libraries, gRPC, and Protobuf, so in theory, Flight has to deploy a lot of machinery to send data. Without any optimizations, one way to do this would be as follows: start with some Arrow data in memory, represented by a RecordBatch object in C++ or Python, which is just a bunch of arrays. Those arrays then get copied into an IPC RecordBatch structure. This is different from the RecordBatch object because the IPC structure is a single, self-contained buffer, completely contiguous in memory and ready to be sent across the network—it’s not something we normally use directly. The resulting data is copied into a Protobuf message, called FlightData, that itself gets serialized and handed off to gRPC to send across the network. On the receiving end, the steps are reversed, and the message is deserialized back into the Arrow data.

A diagram showing the conceptual steps in Flight: from RecordBatch to IPC RecordBatch to FlightData.

This sounds inefficient—we talked about how Flight doesn’t need serialization, but a quick count shows a lot of (conceptual) boxes getting packed and unpacked! While the details vary between implementations, Flight can actually skip a lot of steps here because it’s tightly integrated with Arrow. But a “naive” Flight implementation can follow this outline to get a correct, interoperable implementation using out-of-the-box Arrow and gRPC methods.

We’ll focus on C++ here (which underlies PyArrow, R, and other Arrow implementations), but in short, Flight bypasses Protobuf entirely for messages carrying Arrow data and hooks into gRPC’s serialization layer to avoid copies between gRPC and Arrow. So really, when Flight sends Arrow data, what happens is:

  1. A pointer to the data is passed to gRPC and eventually to a custom serialization handler. Nothing gets serialized or copied, and no boxes get packed!
  2. The custom serialization handler synthesizes the surrounding Protobuf message metadata and Arrow RecordBatch metadata. (Recall that an IPC RecordBatch is a message that is contiguous in memory, in contrast to the RecordBatch object in the API.) Again, none of the data itself is copied or serialized; we’re just prepping the bytes needed to describe the data to the other side.
  3. The handler gives that data, as well as pointers to the underlying Arrow data buffers, to gRPC. Still no copying or serialization!
  4. gRPC then writes out the Protobuf and RecordBatch metadata to the network, followed by the data in the Arrow buffers. Once all of this is written to the network, we have sent the same exact data as if Flight had separately serialized to an intermediary RecordBatch first, copied that to a Protobuf, and then serialized the Protobuf—except without any copies!

No extra copies are created, and Flight avoids the Protobuf serializer. Going back to the conveyor belt metaphor, it’s like putting stuff on the belt in the order it would have been packed, so that the person on the other end knows what to do with the things they’re getting—but without actually having to pack anything.

Diagram of optimizations

On the flip side, when data is received:

  1. gRPC hands a list of buffers containing the raw message data to a custom deserialization handler.
  2. The handler parses the Protobuf and RecordBatch metadata, then reconstructs Arrow buffers from the gRPC buffers, reusing the gRPC-allocated memory where possible.
  3. The handler reconstructs the in-memory Arrow RecordBatch objects and hands them back to the application.

Again, the standard Protobuf deserializer is bypassed (the handler reads the small metadata fields itself), and no data is copied, whether it’s from gRPC to Protobuf, from Protobuf to Arrow, or from gRPC to Arrow!

Note one caveat: Arrow buffers need to be contiguous in memory, since that makes it more efficient to process the data. Unfortunately, gRPC often gives discontiguous buffers to the deserialization handler. In this case, Flight does have to copy the data once to get a contiguous buffer.

Summary

Apache Arrow Flight combines gRPC, Protocol Buffers, and the Arrow libraries to provide a performant, easy-to-use RPC framework specialized for transferring Arrow data. Because Flight provides lots of building blocks as well as low-level optimizations, we can avoid wasting our time packing and unpacking (boxes of) data and get maximum bandwidth even while building high-level applications. For more information, check out the cookbook recipe that introduces this framework.

 

Photo credit: “Cinematic moment of geese departing during sunset” by Jeff S. PhotoArt at HDCanvas.ca is marked with CC BY-NC-ND 2.0.