Jul 21, 2022
Serving Dataframes Over the Wire with Arrow Flight SQL and DuckDB
Tom Drabas, David Li, Philip Moore, Hussain Sultan
Most databases have their own internal data representations, so extracting data requires serializing it to an intermediary format or through an API. This is especially true when exporting the data to a columnar format like Parquet. This inefficiency is frequently a limiting factor for data engineering and data science pipelines that need to move data between databases or systems.
Back in 2016, the Apache Arrow project introduced an in-memory columnar data standard and an inter-process communication framework. Together they help unify the data representations of different systems and give those systems a common format. Building on that foundation, Apache Arrow Flight was introduced as a framework for exchanging Arrow data over a network. On its own, though, Arrow Flight does not define how to talk to a database: clients and servers still need to agree on a protocol on top of Flight for issuing queries and requesting metadata. Arrow Flight SQL fills this gap. It makes it convenient to query remote databases and retrieve the results as Arrow tables.
What is Arrow Flight SQL?
In an earlier post, we introduced Arrow Flight SQL as a framework to alleviate data serialization pains and streamline access to databases.
Flight SQL builds on top of Arrow Flight and defines RPC endpoints for executing SQL queries, fetching database metadata, and creating prepared statements. This enables any client built on a Flight SQL client library to work with any database that implements the Flight SQL specification. More importantly, it lets databases expose Arrow data directly, which minimizes expensive serialization and deserialization. In the ideal scenario, where both the database and the client are Arrow-native, no data transformation has to take place at all. In addition, a database that supports Flight SQL can expose the results of a query at multiple endpoints, and an Arrow Flight client can then retrieve those endpoints in parallel.
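The following minimal C++17 sketch makes the parallel-retrieval idea concrete. It does not depend on the Arrow libraries. ToyEndpoint and FetchEndpoint are hypothetical stand-ins for a FlightEndpoint and a DoGet call, and each endpoint’s data is just a vector of integers. The client starts one std::async task per endpoint and concatenates the partial results in endpoint order.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <future>
#include <vector>

// Hypothetical stand-in for a FlightEndpoint: a real endpoint carries a Ticket
// (and optionally locations); here it simply owns its slice of the result.
struct ToyEndpoint {
  std::vector<int64_t> rows;
};

// Hypothetical stand-in for a DoGet call against a single endpoint.
std::vector<int64_t> FetchEndpoint(const ToyEndpoint& endpoint) {
  return endpoint.rows;  // a real client would stream record batches here
}

// Fetch every endpoint concurrently, then stitch the partial results together
// in endpoint order so the combined result does not depend on thread timing.
std::vector<int64_t> FetchAllInParallel(const std::vector<ToyEndpoint>& endpoints) {
  std::vector<std::future<std::vector<int64_t>>> pending;
  pending.reserve(endpoints.size());
  for (const ToyEndpoint& endpoint : endpoints) {
    pending.push_back(std::async(std::launch::async, FetchEndpoint, std::cref(endpoint)));
  }
  std::vector<int64_t> combined;
  for (auto& part : pending) {
    std::vector<int64_t> rows = part.get();  // blocks until this endpoint is done
    combined.insert(combined.end(), rows.begin(), rows.end());
  }
  return combined;
}
```

The fetches may finish in any order. Waiting on the futures in endpoint order still keeps the combined result deterministic.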
The Tech Stack
At a high level, Arrow Flight SQL can act as a unified interface between a client and any database that exposes Arrow Flight SQL endpoints. Current solutions require developing and maintaining ODBC and/or JDBC drivers that conform to each database’s own protocol. Flight SQL instead offers a single interface on both sides of the data transport (client and server), which significantly improves the experience for both users and developers.
Figure 1. A theoretical illustration of how a single client could interact with various databases via the Arrow Flight SQL interface.
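On the client side, the benefit comes from writing code once against a protocol rather than against a vendor driver. SqlServer, MapBackedServer, and TotalRows below are hypothetical names. The two MapBackedServer instances merely stand in for two different databases that speak the same protocol. In real Flight SQL, the shared operations are RPCs defined by the specification, not a C++ interface.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical: the operations a shared protocol standardizes for every server.
class SqlServer {
 public:
  virtual ~SqlServer() = default;
  virtual std::vector<std::string> ListTables() const = 0;
  virtual int64_t CountRows(const std::string& table) const = 0;
};

// Hypothetical backend; each instance plays the role of a different database.
class MapBackedServer : public SqlServer {
 public:
  explicit MapBackedServer(std::map<std::string, int64_t> tables) : tables_(std::move(tables)) {}
  std::vector<std::string> ListTables() const override {
    std::vector<std::string> names;
    for (const auto& entry : tables_) names.push_back(entry.first);
    return names;
  }
  int64_t CountRows(const std::string& table) const override {
    auto it = tables_.find(table);
    return it == tables_.end() ? 0 : it->second;
  }

 private:
  std::map<std::string, int64_t> tables_;
};

// Client logic written once; it never needs to know which database it talks to.
int64_t TotalRows(const SqlServer& server) {
  int64_t total = 0;
  for (const auto& table : server.ListTables()) total += server.CountRows(table);
  return total;
}
```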
In the remainder of this post, we focus on how to implement Flight SQL with DuckDB. DuckDB is a free, open-source, high-performance database management system. It is quickly gaining popularity thanks to its lightweight design, fast execution, rich feature set, and flexibility. Since December 2021, DuckDB has also supported zero-copy data exchange with Arrow via the Arrow C Data Interface, which enables fast and convenient analysis of larger-than-memory datasets. This makes DuckDB an ideal candidate for an Arrow Flight SQL server.
The C Data Interface makes returning Arrow data to the client straightforward and efficient. The simplified process in the image above works as follows:

- The client sends a request to DuckDB via the Arrow Flight SQL interface. This can be executing a SQL query, listing tables, or listing catalogs, among many other calls.
- DuckDB executes the query and returns Arrow data through the zero-copy C Data Interface.
- We send that data back to the client over Arrow Flight, as described by the Flight SQL protocol.
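The zero-copy handoff rests on two plain C structs, ArrowSchema and ArrowArray. The Arrow C Data Interface specification publishes them as free-standing definitions for projects to copy. The sketch below copies those layouts and plays both roles for a single int64 column. The "producer" (standing in for DuckDB) exports a std::vector without copying it. The "consumer" (standing in for the Flight SQL server) reads the values straight out of the exported buffer and then calls the release callbacks. ExportInt64Column, SumInt64Column, and ExportState are hypothetical helpers. Only the struct layouts, the "l" format string for int64, and the release-callback contract come from the specification.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Struct layouts as given in the Arrow C Data Interface specification.
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

// Hypothetical producer-side bookkeeping: the buffer pointer table.
struct ExportState {
  const void* buffers[2];  // [0] validity bitmap, [1] values
};

// Release callbacks must mark the struct as released by nulling `release`.
void ReleaseSchema(ArrowSchema* schema) { schema->release = nullptr; }
void ReleaseArray(ArrowArray* array) {
  delete static_cast<ExportState*>(array->private_data);
  array->release = nullptr;
}

// Producer: describe `values` as an int64 column without copying the data.
// The caller must keep `values` alive until the consumer releases the array.
void ExportInt64Column(const std::vector<int64_t>& values, ArrowSchema* schema, ArrowArray* array) {
  *schema = ArrowSchema{"l", "value", nullptr, 0, 0, nullptr, nullptr, &ReleaseSchema, nullptr};
  auto* state = new ExportState{{nullptr, values.data()}};  // null_count == 0: no bitmap needed
  *array = ArrowArray{static_cast<int64_t>(values.size()), 0, 0, 2, 0, state->buffers,
                      nullptr, nullptr, &ReleaseArray, state};
}

// Consumer: read the values in place, then hand ownership back to the producer.
int64_t SumInt64Column(ArrowSchema* schema, ArrowArray* array) {
  assert(schema->format[0] == 'l' && schema->format[1] == '\0');
  const auto* data = static_cast<const int64_t*>(array->buffers[1]);
  int64_t sum = 0;
  for (int64_t i = 0; i < array->length; ++i) sum += data[array->offset + i];
  array->release(array);
  schema->release(schema);
  return sum;
}
```

The consumer sees exactly the producer’s memory (buffers[1] == values.data()). That is what "zero-copy" means here.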
How To: Flight SQL and DuckDB
Arrow Flight SQL is currently available in C++ and Java. All the code for this example can be found in this repository, along with more (including a SQLite Flight SQL example!). We strongly encourage you to build and run a Docker image to explore and run the examples.
Create a Server
First, we need to create an Arrow Flight SQL server. As with Arrow Flight, the Flight SQL API provides abstract classes and methods that the developer implements.
To implement a Flight SQL server, just derive from the FlightSqlServerBase class. The class has a few methods that manage starting up and shutting down the server. Almost all of its other methods are meant to be overridden by the derived class to implement the various Flight SQL endpoints. In our case, we create a helper method that initializes the DuckDB Flight SQL server.
First, we open the DuckDB database at the specified path and then instantiate a connection, following the instructions in the official DuckDB documentation. The remainder of the code sets up the private implementation class of the DuckDB Flight SQL server. It also registers all the functionality that DuckDB supported at the time this example was created.
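The derive-and-override pattern can be sketched without the Arrow libraries. This hypothetical model mirrors the idea of FlightSqlServerBase, not its real signatures. In ToyFlightSqlServerBase, each endpoint is a virtual method whose default answer is "not implemented", and ToyDuckDBServer overrides only the endpoints it supports. The static Create helper plays the role of the initialization helper described above: it rejects an unusable database path and keeps its state behind a private implementation pointer. ToyStatus, the messages, and the empty-path check are illustrative assumptions.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical status type; real Arrow code uses arrow::Status / arrow::Result.
struct ToyStatus {
  bool ok;
  std::string message;
};

// Models the shape of a Flight SQL server base class: each endpoint is a
// virtual method that reports "not implemented" unless a subclass overrides it.
class ToyFlightSqlServerBase {
 public:
  virtual ~ToyFlightSqlServerBase() = default;
  virtual ToyStatus GetFlightInfoStatement(const std::string& /*query*/) {
    return {false, "NotImplemented: GetFlightInfoStatement"};
  }
  virtual ToyStatus GetFlightInfoTables() { return {false, "NotImplemented: GetFlightInfoTables"}; }
  virtual ToyStatus GetFlightInfoCatalogs() { return {false, "NotImplemented: GetFlightInfoCatalogs"}; }
};

class ToyDuckDBServer : public ToyFlightSqlServerBase {
 public:
  // Factory helper: hands out a fully initialized server or nothing at all.
  static std::unique_ptr<ToyDuckDBServer> Create(const std::string& db_path) {
    if (db_path.empty()) return nullptr;  // stand-in for "could not open the database"
    auto impl = std::make_unique<Impl>();
    impl->db_path = db_path;  // a real Impl would also hold the database and connection
    return std::unique_ptr<ToyDuckDBServer>(new ToyDuckDBServer(std::move(impl)));
  }

  // Only the endpoints this backend supports are overridden.
  ToyStatus GetFlightInfoStatement(const std::string& query) override {
    return {true, "prepared on " + impl_->db_path + ": " + query};
  }
  ToyStatus GetFlightInfoTables() override { return {true, "tables of " + impl_->db_path}; }
  // GetFlightInfoCatalogs keeps the base-class "not implemented" default.

 private:
  struct Impl {  // private implementation class
    std::string db_path;
  };
  explicit ToyDuckDBServer(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
  std::unique_ptr<Impl> impl_;
};
```

Because the factory returns nullptr on failure, callers never hold a half-initialized server.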
The actual protocol for interacting with the server requires creating a FlightInfo by calling an appropriate GetFlightInfoXXX method. In our example, we implement two of them:

- GetFlightInfoTables, which we will use to retrieve a list of tables from the database.
- GetFlightInfoStatement, which we will use to create a ticket to execute a query against those tables. The ticket can then be used to retrieve the actual data.
To create the FlightInfo object, we call the Make method and pass it the Schema for the call, the FlightDescriptor, and a vector of FlightEndpoint objects. We also pass -1 in place of total_bytes; by Flight convention, -1 signals that the size of the result is not known in advance.
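As a rough model of what a FlightInfo carries, the sketch below uses hypothetical ToyFlightInfo, ToyField, and ToyEndpoint structs rather than Arrow’s classes. The model has four parts:

- a schema description,
- the descriptor that identifies the request,
- endpoints whose tickets the client later redeems,
- two size hints, where -1 means "unknown" as in the Flight protocol.

MakeToyFlightInfo and its "ticket:" encoding are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

struct ToyField {
  std::string name;
  std::string type;
};

// An endpoint's ticket is opaque to the client; it is handed back verbatim in DoGet.
struct ToyEndpoint {
  std::string ticket;
};

struct ToyFlightInfo {
  std::vector<ToyField> schema;        // shape of the result set
  std::string descriptor;              // the command that produced this info
  std::vector<ToyEndpoint> endpoints;  // where (and how) to fetch the data
  int64_t total_records;               // -1: unknown
  int64_t total_bytes;                 // -1: unknown

  bool SizeKnown() const { return total_records >= 0 && total_bytes >= 0; }
};

// Nothing has been executed yet, so both size hints are reported as unknown.
ToyFlightInfo MakeToyFlightInfo(const std::vector<ToyField>& schema, const std::string& query) {
  return ToyFlightInfo{schema, query, {ToyEndpoint{"ticket:" + query}}, -1, -1};
}
```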
The key object for gathering all the information needed to build the FlightInfo is the DuckDBStatement, which we discuss in the next section.
To execute a SQL statement against a DuckDB database and retrieve the results, we first create the DuckDBStatement wrapper. The wrapper lets us prepare a DuckDB PreparedStatement, later execute it, and return the results as an Arrow RecordBatch. The Prepare call on the connection con returns a PreparedStatement. When the Execute method of the DuckDBStatement is called, we use the C Data Interface to move the data from DuckDB’s internal format to Arrow’s columnar format without copying it.

Once the statement stmt_ is executed, we use ImportRecordBatch to create a RecordBatch by passing it an ArrowArray and an ArrowSchema. DuckDB can conveniently export both of these with zero copy through the C Data Interface.
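The statement wrapper follows a simple lifecycle: prepare once, execute lazily, reuse the cached result. ToyStatement below is a hypothetical sketch of that lifecycle. Its "prepare" step only rejects an empty query, and its "execute" step fabricates a fixed batch. The real DuckDBStatement prepares a DuckDB PreparedStatement and imports the result through the C Data Interface. The execution counter exists only to show that the result is cached.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using ToyBatch = std::vector<int64_t>;  // hypothetical stand-in for an Arrow RecordBatch

class ToyStatement {
 public:
  // Prepares up front so that errors surface before any data is requested.
  static std::unique_ptr<ToyStatement> Create(const std::string& sql) {
    if (sql.empty()) return nullptr;  // stand-in for a prepare error
    return std::unique_ptr<ToyStatement>(new ToyStatement(sql));
  }

  // Runs the statement at most once; later calls return the cached batch.
  const ToyBatch& Execute() {
    if (!result_) {
      ++executions_;
      result_ = std::make_unique<ToyBatch>(ToyBatch{1, 2, 3});  // pretend query output
    }
    return *result_;
  }

  int executions() const { return executions_; }
  const std::string& sql() const { return sql_; }

 private:
  explicit ToyStatement(std::string sql) : sql_(std::move(sql)) {}
  std::string sql_;
  std::unique_ptr<ToyBatch> result_;
  int executions_ = 0;
};
```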
When a Flight SQL client retrieves the results, the framework actually calls the DoGetStatement method on the server side. The Flight SQL client uses a FlightStreamReader to retrieve data from the server. Once the stream is ready, the reader calls the ReadNext method to retrieve the next batch of data.

First, we check whether the statement has already been executed by the DuckDB database and, if not, execute it. As shown earlier, the call to Execute also caches the results. Once the results are pushed onto the stream, we set the results_read_ flag to true so that the next call terminates the stream. The end of the stream is signaled by handing back a nullptr in place of a batch.
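The read-once logic described above can be sketched with a hypothetical ToyBatchReader. The first ReadNext executes the statement (if needed), hands out the cached batch, and sets results_read_. Every later call yields nullptr, which the consuming loop treats as the end of the stream. This mirrors the convention of Arrow’s RecordBatchReader, where a null batch marks the end of the stream. The types here are illustrative, not Arrow’s.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

using ToyBatch = std::vector<int64_t>;  // hypothetical stand-in for an Arrow RecordBatch

class ToyBatchReader {
 public:
  // Returns the next batch, or nullptr once the stream is exhausted.
  std::shared_ptr<ToyBatch> ReadNext() {
    if (results_read_) return nullptr;  // results already sent: end the stream
    if (!cached_) {                     // execute lazily and cache the result
      ++executions_;
      cached_ = std::make_shared<ToyBatch>(ToyBatch{10, 20, 30});
    }
    results_read_ = true;
    return cached_;
  }

  int executions() const { return executions_; }

 private:
  std::shared_ptr<ToyBatch> cached_;
  bool results_read_ = false;
  int executions_ = 0;
};

// Consumer loop: keep pulling batches until the reader signals the end.
int64_t DrainAndSum(ToyBatchReader& reader) {
  int64_t sum = 0;
  while (auto batch = reader.ReadNext()) {
    for (int64_t value : *batch) sum += value;
  }
  return sum;
}
```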
Query the Server
Having prepared all the machinery to interact with DuckDB using Arrow Flight SQL, we can now start using it.
Create Server and Client
To create a server, we need to pass the path to the database and the configuration.
If the above call is successful and returns a valid DuckDBFlightSqlServer object, we then initialize it and set the shutdown signal.
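Setting a shutdown signal means that the serving loop exits cleanly when the process receives a signal such as SIGTERM. Arrow’s FlightServerBase provides this through SetShutdownOnSignals. The sketch below models only the idea, using standard C++. A signal handler flips a flag, and a hypothetical ServeUntilShutdown loop checks that flag between requests. SetShutdownOnSignal, ShutdownRequested, and ServeUntilShutdown are illustrative names; this is not how Arrow implements the feature.

```cpp
#include <cassert>
#include <csignal>

// Signal handlers may only do async-signal-safe work, so this one just records the request.
volatile std::sig_atomic_t g_shutdown_requested = 0;

extern "C" void HandleShutdownSignal(int /*signum*/) { g_shutdown_requested = 1; }

// Hypothetical stand-in for registering the shutdown signal on a server.
void SetShutdownOnSignal(int signum) { std::signal(signum, HandleShutdownSignal); }

bool ShutdownRequested() { return g_shutdown_requested != 0; }

// Hypothetical serving loop: handles one "request" per iteration until shutdown.
template <typename HandleOneRequest>
int ServeUntilShutdown(HandleOneRequest handle_one) {
  int handled = 0;
  while (!ShutdownRequested()) {
    handle_one();  // a real server would block waiting for RPCs here
    ++handled;
  }
  return handled;
}
```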
Now we can create a Flight SQL client.
FlightSqlClient uses FlightClient under the hood to communicate with the Flight SQL server.
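This layering is a design choice based on composition. The SQL-level client does not reimplement the transport. It wraps a Flight client and translates SQL-shaped calls into generic Flight RPCs. The sketch below models that layering with hypothetical ToyFlightClient and ToyFlightSqlClient classes. The command strings it records are illustrative; they are not the real wire format. The grpc+tcp://host:port location string follows the URI scheme Arrow Flight uses for gRPC over TCP, and the host and port are placeholders.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical transport-level client: knows about locations and generic RPCs only.
class ToyFlightClient {
 public:
  explicit ToyFlightClient(std::string location) : location_(std::move(location)) {}
  // Records each generic RPC so we can see what the SQL layer sent.
  void GetFlightInfo(const std::string& command) { sent_.push_back("GetFlightInfo(" + command + ")"); }
  const std::vector<std::string>& sent() const { return sent_; }
  const std::string& location() const { return location_; }

 private:
  std::string location_;
  std::vector<std::string> sent_;
};

// Hypothetical SQL-level client: adds SQL-shaped methods on top of the transport.
class ToyFlightSqlClient {
 public:
  explicit ToyFlightSqlClient(std::shared_ptr<ToyFlightClient> client) : client_(std::move(client)) {}
  void Execute(const std::string& query) { client_->GetFlightInfo("CommandStatementQuery{" + query + "}"); }
  void GetTables() { client_->GetFlightInfo("CommandGetTables{}"); }

 private:
  std::shared_ptr<ToyFlightClient> client_;
};

std::string GrpcTcpLocation(const std::string& host, int port) {
  return "grpc+tcp://" + host + ":" + std::to_string(port);
}
```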
List Available Tables
Now that we have the server up and running, and a client connected to the server, we can list the tables stored in the database.
Here, we’re using a default call to the server to retrieve a list of tables. At a high level, the client’s GetTables method calls GetFlightInfoTables to retrieve the FlightInfo object. That object can then be used to retrieve the results from the endpoints.
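On the wire, the client’s GetTables does not name a server method directly. Flight SQL packs a command message into the FlightDescriptor. CommandGetTables and CommandStatementQuery are two such messages, both defined in the Flight SQL protobuf specification. The server’s generic GetFlightInfo unpacks the command and routes it to the matching GetFlightInfoXXX handler. The sketch below models that routing with std::variant. The structs are simplified, hypothetical stand-ins for the protobuf messages, and the handlers just return strings.

```cpp
#include <cassert>
#include <string>
#include <variant>

// Simplified stand-ins for two Flight SQL command messages.
struct CommandStatementQuery {
  std::string query;
};
struct CommandGetTables {
  bool include_schema = false;
};

using Command = std::variant<CommandStatementQuery, CommandGetTables>;

// Hypothetical handlers, one per GetFlightInfoXXX endpoint.
std::string GetFlightInfoStatement(const CommandStatementQuery& cmd) { return "statement: " + cmd.query; }
std::string GetFlightInfoTables(const CommandGetTables& cmd) {
  return cmd.include_schema ? "tables+schema" : "tables";
}

// One overload per command type; std::visit fails to compile if a type is unhandled.
struct Router {
  std::string operator()(const CommandStatementQuery& cmd) const { return GetFlightInfoStatement(cmd); }
  std::string operator()(const CommandGetTables& cmd) const { return GetFlightInfoTables(cmd); }
};

// Generic entry point: route the decoded command to the matching handler.
std::string GetFlightInfo(const Command& command) { return std::visit(Router{}, command); }
```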
To retrieve and print the results, we can loop over all the endpoints and call the DoGet method, which returns a stream reader. The FlightStreamReader object can then be converted to an Arrow table.
The above code would produce a result similar to the one below (abbreviated).
Run a Query
To execute a query, we use the client’s Execute call. Under the hood, the client runs the GetFlightInfoStatement call on the server to retrieve the FlightInfo object specific to our query.
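Between Execute and DoGet, the client holds only the ticket inside the returned FlightInfo. The server therefore has to map that opaque ticket back to the statement it prepared. Flight SQL defines a TicketStatementQuery message that carries a statement handle for this purpose. The minimal sketch below assumes a hypothetical in-memory registry keyed by a numeric handle with a "stmt-" prefix. How the DuckDB example actually encodes its tickets may differ, and a real server would also need to expire old entries.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical server-side registry that turns queries into tickets and back.
class ToyStatementRegistry {
 public:
  // While answering GetFlightInfoStatement: remember the query, hand out a ticket.
  std::string Register(const std::string& query) {
    const uint64_t handle = next_handle_++;
    statements_[handle] = query;
    return kPrefix + std::to_string(handle);
  }

  // While answering DoGet: map the ticket back to its query, rejecting foreign tickets.
  std::optional<std::string> Resolve(const std::string& ticket) const {
    if (ticket.compare(0, kPrefix.size(), kPrefix) != 0) return std::nullopt;
    const std::string digits = ticket.substr(kPrefix.size());
    if (digits.empty() || digits.find_first_not_of("0123456789") != std::string::npos) {
      return std::nullopt;
    }
    uint64_t handle = 0;
    try {
      handle = std::stoull(digits);
    } catch (const std::out_of_range&) {
      return std::nullopt;  // too many digits to be a handle we issued
    }
    const auto it = statements_.find(handle);
    if (it == statements_.end()) return std::nullopt;
    return it->second;
  }

 private:
  inline static const std::string kPrefix = "stmt-";
  uint64_t next_handle_ = 1;
  std::map<uint64_t, std::string> statements_;
};
```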
If the call succeeds, the returned FlightInfo object can be used to retrieve the results with the DoGet call, as in the table-listing example above. The output may look similar to this (again, abbreviated).
Simplifying Database Connections
The code samples in this post should be treated as one way of implementing the Flight SQL interface for a database. We skipped much of the available functionality, because the aim of this article was to show a minimal example of a Flight SQL-enabled server. The example also shows how any Flight SQL client could talk to any database that implements the protocol. The only difference would be connecting to a different host or passing a different set of parameters.
If you’re working within the Apache Arrow ecosystem, we’re here to support you. Check out Voltron Data Enterprise Support subscription options today.