
Serving Dataframes Over the Wire with Arrow Flight SQL and DuckDB

Tom Drabas, David Li, Philip Moore, Hussain Sultan Jul 21, 2022

Most databases have their own internal data representations that require serializing data to an intermediary format or API to extract the data, especially if one wants to export the data into a columnar format like Parquet. This inefficiency is frequently a limiting factor for data engineering or data science pipelines that require moving data between databases or systems.

Back in 2016, the Apache Arrow project introduced an in-memory columnar data standard and inter-process communication framework to unify different systems’ data representations behind a common format. Building on that foundation, Apache Arrow Flight was introduced as a framework for exchanging Arrow data over a network. Arrow Flight by itself, though, cannot talk directly to databases: clients and servers still need to agree on a protocol for expressing database operations on top of Flight. Arrow Flight SQL fills this gap and allows conveniently querying and retrieving Arrow data tables from remote databases.

What is Arrow Flight SQL?

In an earlier post we introduced Arrow Flight SQL as a framework to alleviate data serialization pains and streamline access to databases.

Flight SQL builds on top of Arrow Flight and defines RPC endpoints for executing SQL queries, fetching database metadata, and creating prepared statements. This enables any client that uses the Flight SQL client to work with any database that conforms to the Flight SQL specification. More importantly, it lets databases expose Arrow data directly, minimizing expensive serialization and deserialization: in an ideal scenario, if both the database and the client are Arrow-native, no data transformation has to take place. In addition, a database that supports Flight SQL could expose the results of a query at multiple endpoints that an Arrow Flight client can then retrieve in parallel.
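To make the parallel-retrieval idea concrete, here is a minimal sketch that uses only the C++ standard library. It is not Arrow code: Endpoint, FetchEndpoint, and FetchAllParallel are hypothetical stand-ins, and a real client would issue a DoGet call for each endpoint's ticket instead of copying rows from memory.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for a Flight endpoint: a ticket plus the rows an
// imaginary server would stream back for that ticket.
struct Endpoint {
  std::string ticket;
  std::vector<int64_t> rows;
};

// Stand-in for a blocking DoGet call against a single endpoint.
std::vector<int64_t> FetchEndpoint(const Endpoint& endpoint) {
  return endpoint.rows;
}

// Start one asynchronous task per endpoint, then gather the partial results
// in endpoint order so that the combined result is deterministic.
std::vector<int64_t> FetchAllParallel(const std::vector<Endpoint>& endpoints) {
  std::vector<std::future<std::vector<int64_t>>> pending;
  pending.reserve(endpoints.size());
  for (const Endpoint& endpoint : endpoints) {
    const Endpoint* ep = &endpoint;  // the vector outlives every task
    pending.push_back(
        std::async(std::launch::async, [ep] { return FetchEndpoint(*ep); }));
  }

  std::vector<int64_t> combined;
  for (auto& task : pending) {
    std::vector<int64_t> part = task.get();  // blocks until this task is done
    combined.insert(combined.end(), part.begin(), part.end());
  }
  return combined;
}
```

The tasks may finish in any order, but collecting them in endpoint order keeps the combined result stable from run to run.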

The Tech Stack

At a high level, Arrow Flight SQL can act as a unified interface between a client and any database that exposes Arrow Flight SQL endpoints. Compared to current solutions that require developing and maintaining ODBC and/or JDBC drivers conforming to individual database protocols, Flight SQL’s unified interface on both sides of the data transport (client and server) significantly improves the experience of both users and developers.

Figure 1. A theoretical illustration of how a single client could interact with various databases via the Arrow Flight SQL interface.

In the remainder of this post we will focus on how to implement Flight SQL with DuckDB. DuckDB is a free and open source, highly performant database management system, and is quickly gaining popularity thanks to its lightweight design, fast execution, rich feature set, and flexibility. As of last December, DuckDB also supports zero-copy data exchange with Arrow via the Arrow C Data Interface, enabling fast and convenient analysis of larger-than-memory datasets, which makes it a natural candidate for adopting Arrow Flight SQL.

With the C Data Interface, returning Arrow data to the client is easy and efficient. Following the simplified process from the image above, the client first sends a request to DuckDB via the Arrow Flight SQL interface: this can be executing a SQL query, listing tables, or listing catalogs (among many other calls). DuckDB executes the request and returns Arrow data (via the zero-copy C Data Interface), which we then send back to the client over Arrow Flight, as described by the Flight SQL protocol.

How To: Flight SQL and DuckDB

Arrow Flight SQL is currently available in C++ and Java. All the code for this example (and more, as we also showcase a SQLite Flight SQL example!) can be found in this repository. We strongly encourage you to build and run the Docker image to explore these examples.

Create a Server

First, we need to create an Arrow Flight SQL server. Similarly to Arrow Flight, the Flight SQL API provides abstract classes and methods that the developer implements.

DuckDBFlightSqlServer

To implement a Flight SQL server, just derive from the FlightSqlServerBase class. Apart from a few methods to manage starting up and shutting down the server, almost all of the methods it defines are meant to be overridden to implement various Flight SQL endpoints by the derived class. In our case, we create a helper method that initializes the DuckDB Flight SQL server.

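arrow::Result<std::shared_ptr<DuckDBFlightSqlServer>> DuckDBFlightSqlServer::Create(
   const std::string &path,
   const duckdb::DBConfig &config
) {
   std::shared_ptr<duckdb::DuckDB> db;
   std::shared_ptr<duckdb::Connection> con;

   // Open the database file, then a connection to it
   db = std::make_shared<duckdb::DuckDB>(path);
   con = std::make_shared<duckdb::Connection>(*db);

   // Impl is the server's private implementation class
   std::shared_ptr<Impl> impl = std::make_shared<Impl>(db, con);
   std::shared_ptr<DuckDBFlightSqlServer> result(new DuckDBFlightSqlServer(impl));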

   for (const auto& id_to_result : GetSqlInfoResultMap()) {
      result->RegisterSqlInfo(id_to_result.first, id_to_result.second);
   }
   return result;
}

First, we open the DuckDB database at the given path and then instantiate a connection, following the instructions found in the official DuckDB documentation. The remainder of the code sets up the private implementation class of the DuckDB Flight SQL server and registers the SQL info entries that describe the functionality DuckDB supported at the time this example was created.
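The pattern at work here (a base class whose endpoints are overridden selectively, plus a static factory that hands resources to a private implementation object) can be sketched without Arrow or DuckDB. Everything below is hypothetical: ToyStatus, ToyServerBase, and ToyDatabaseServer are illustrative names, and the "not implemented" default is an assumption about how such a base class typically behaves rather than a statement about FlightSqlServerBase.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified status type (Arrow has its own arrow::Status).
struct ToyStatus {
  bool ok;
  std::string message;
};

// Stand-in for a server base class: every endpoint has a default that reports
// "not implemented"; a concrete server overrides only what it supports.
class ToyServerBase {
 public:
  virtual ~ToyServerBase() = default;
  virtual ToyStatus GetTables() { return {false, "GetTables not implemented"}; }
  virtual ToyStatus ExecuteQuery(const std::string&) {
    return {false, "ExecuteQuery not implemented"};
  }
};

class ToyDatabaseServer : public ToyServerBase {
 public:
  // Factory in the spirit of Create(): acquire the resources first, then hand
  // them to a private implementation object owned by the server.
  static std::shared_ptr<ToyDatabaseServer> Create(const std::string& path) {
    auto impl = std::make_shared<Impl>(Impl{path});
    return std::shared_ptr<ToyDatabaseServer>(
        new ToyDatabaseServer(std::move(impl)));
  }

  // Only this endpoint is overridden; GetTables keeps the base-class default.
  ToyStatus ExecuteQuery(const std::string& sql) override {
    return {true, "ran '" + sql + "' against " + impl_->path};
  }

 private:
  struct Impl {
    std::string path;  // a real server would hold the database and connection
  };

  explicit ToyDatabaseServer(std::shared_ptr<Impl> impl)
      : impl_(std::move(impl)) {}

  std::shared_ptr<Impl> impl_;
};
```

Keeping the constructor private forces callers through Create, so a server object never exists without its implementation state.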

Interacting with the server starts with creating a FlightInfo by calling the appropriate GetFlightInfoXXX method: in our example, we implement GetFlightInfoTables, which we will use to retrieve a list of tables from the database, and GetFlightInfoStatement, which we will use to create a ticket for executing a query against those tables. The ticket can then be used to retrieve the actual data.

To create the FlightInfo object we call the Make method and pass the Schema for the call, the FlightDescriptor, and a vector of FlightEndpoint objects; we also pass -1 in place of total_records and total_bytes, which marks both totals as unknown, since we do not compute them before the query runs.

arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
    const ServerCallContext& context, const StatementQuery& command,
    const FlightDescriptor& descriptor
) {
    const std::string& query = command.query;

    ARROW_ASSIGN_OR_RAISE(auto statement, DuckDBStatement::Create(db_conn_, query));
    ARROW_ASSIGN_OR_RAISE(auto schema, statement->GetSchema());
    ARROW_ASSIGN_OR_RAISE(auto ticket_string, CreateStatementQueryTicket(query));
    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{ticket_string}, {}}};
    ARROW_ASSIGN_OR_RAISE(auto result,
        FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
}
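The two-step exchange (first describe the result and hand out a ticket, then redeem the ticket for data) can be sketched with plain C++ types. This is a toy model under stated assumptions: ToyEndpoint, ToyFlightInfo, CreateTicket, DecodeTicket, and the "stmt:" prefix are all hypothetical, and the real Flight SQL ticket encoding is different.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified counterparts of FlightEndpoint and FlightInfo.
struct ToyEndpoint {
  std::string ticket;
};

struct ToyFlightInfo {
  std::vector<ToyEndpoint> endpoints;
  int64_t total_records;  // -1 when the count is not known up front
  int64_t total_bytes;    // -1 when the size is not known up front
};

// The ticket carries enough information for the server to re-create the
// statement later; in this sketch it is just the query text with a prefix.
std::string CreateTicket(const std::string& query) { return "stmt:" + query; }

// Recovers the query from a ticket; returns false for tickets that were not
// produced by CreateTicket.
bool DecodeTicket(const std::string& ticket, std::string* query) {
  const std::string prefix = "stmt:";
  if (ticket.compare(0, prefix.size(), prefix) != 0) return false;
  *query = ticket.substr(prefix.size());
  return true;
}

// Step 1: describe where the result can be fetched, without producing it.
ToyFlightInfo GetFlightInfoForQuery(const std::string& query) {
  return ToyFlightInfo{{ToyEndpoint{CreateTicket(query)}}, -1, -1};
}
```

In step 2, the server decodes the ticket it receives and only then prepares and runs the statement.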

The DuckDBStatement object supplies everything needed to build the FlightInfo; we discuss it in the next section.

DuckDBStatement

In order to execute a SQL statement against a DuckDB database and retrieve the results, we first create the DuckDBStatement wrapper that allows us to prepare the DuckDB PreparedStatement, and later execute it and return the results as RecordBatches.

arrow::Result<std::shared_ptr<DuckDBStatement>> DuckDBStatement::Create(
    std::shared_ptr<duckdb::Connection> con, const std::string& sql
) {
    std::shared_ptr<duckdb::PreparedStatement> stmt = con->Prepare(sql);
    std::shared_ptr<DuckDBStatement> result(new DuckDBStatement(con, stmt));

    return result;
}

The Prepare call to con returns a PreparedStatement. Subsequently, upon calling the Execute method of the DuckDBStatement we use the C Data Interface to zero-copy move the data from DuckDB’s internal format to Arrow’s RecordBatch.

arrow::Result<int> DuckDBStatement::Execute() {
    auto res = stmt_->Execute();
    
    ArrowArray res_arr;
    ArrowSchema res_schema;
    
    QueryResult::ToArrowSchema(&res_schema, res->types, res->names);
    res->Fetch()->ToArrowArray(&res_arr);
    ARROW_ASSIGN_OR_RAISE(result_, arrow::ImportRecordBatch(&res_arr, &res_schema));
    schema_ = result_->schema();

    return 0;
}

Once the statement stmt_ is executed, we use the ImportRecordBatch to create a RecordBatch by passing ArrowArray and ArrowSchema, both of which can conveniently be zero-copy moved from DuckDB using ToArrowSchema and ToArrowArray calls.
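To show what "zero-copy" means at this boundary, the sketch below exports a single int64 column through the two structs defined by the Arrow C Data Interface and reads it back on the consumer side. The struct layouts follow the specification (real code would include arrow/c/abi.h instead of redeclaring them); ExportedColumn, ExportInt64Column, and SumAndRelease are hypothetical helpers and do not reflect how DuckDB or Arrow implement their exporters.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Layouts as defined by the Arrow C Data Interface specification.
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

// Producer-side bookkeeping kept alive until the consumer calls release().
struct ExportedColumn {
  std::shared_ptr<std::vector<int64_t>> values;  // owner of the data buffer
  const void* buffers[2];                        // [validity bitmap, data]
};

void ReleaseSchema(ArrowSchema* schema) { schema->release = nullptr; }

void ReleaseArray(ArrowArray* array) {
  delete static_cast<ExportedColumn*>(array->private_data);
  array->release = nullptr;  // marks the struct as released
}

// Hypothetical producer: exposes the vector's memory without copying it.
void ExportInt64Column(std::shared_ptr<std::vector<int64_t>> values,
                       ArrowSchema* out_schema, ArrowArray* out_array) {
  *out_schema = ArrowSchema{};
  out_schema->format = "l";  // format string for int64
  out_schema->name = "value";
  out_schema->release = ReleaseSchema;

  auto* column = new ExportedColumn{std::move(values), {nullptr, nullptr}};
  column->buffers[1] = column->values->data();  // validity stays null: no nulls

  *out_array = ArrowArray{};
  out_array->length = static_cast<int64_t>(column->values->size());
  out_array->n_buffers = 2;
  out_array->buffers = column->buffers;
  out_array->release = ReleaseArray;
  out_array->private_data = column;
}

// Hypothetical consumer: reads straight from the producer's buffer, then
// gives ownership back by calling the release callbacks.
int64_t SumAndRelease(ArrowSchema* schema, ArrowArray* array) {
  const auto* data = static_cast<const int64_t*>(array->buffers[1]);
  int64_t sum = 0;
  for (int64_t i = 0; i < array->length; ++i) sum += data[array->offset + i];
  array->release(array);
  schema->release(schema);
  return sum;
}
```

Only pointers and a release callback cross the boundary, which is why no values are copied; the consumer must call release exactly once when it is done.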

DuckDBStatementBatchReader

When a Flight SQL client retrieves the results, the framework calls the DoGetStatement method on the server side.

arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
    const ServerCallContext& context, const StatementQueryTicket& command
) {
    const std::string& sql = command.statement_handle;

    ARROW_ASSIGN_OR_RAISE(auto statement, DuckDBStatement::Create(db_conn_, sql));
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<DuckDBStatementBatchReader> reader,
        DuckDBStatementBatchReader::Create(statement));

    return std::unique_ptr<FlightDataStream>(new RecordBatchStream(reader));
}

The Flight SQL client uses a FlightStreamReader to retrieve data from the server. On the server side, once the stream is ready, the framework calls the reader's ReadNext method to obtain each batch of data.

Status DuckDBStatementBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
    if (!already_executed_) {
        ARROW_ASSIGN_OR_RAISE(rc_, statement_->Execute());
        already_executed_ = true;
    }
    
    if(!results_read_) {
        ARROW_ASSIGN_OR_RAISE(*out, statement_->GetResult());
        results_read_ = true;
    } else {
        *out = nullptr;
    }

    return Status::OK();
}

First, we check whether the statement has already been executed by DuckDB and, if not, execute it. The call to Execute, as shown earlier, also caches the results. Once the results are pushed onto the stream, we set the results_read_ flag to true so that the next call terminates the stream; the end of the stream is signaled by setting the output batch to nullptr.
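The same pull-based contract can be exercised in isolation. The sketch below is a toy reader built only on the standard library: ToyBatch, SingleBatchReader, and Drain are hypothetical names, and a vector of integers stands in for a RecordBatch.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

using ToyBatch = std::vector<int>;  // stand-in for a RecordBatch

class SingleBatchReader {
 public:
  explicit SingleBatchReader(ToyBatch rows) : source_(std::move(rows)) {}

  // Writes the next batch to *out, or nullptr once the stream is exhausted.
  void ReadNext(std::shared_ptr<ToyBatch>* out) {
    if (!already_executed_) {
      // Stand-in for statement_->Execute(): run once and cache the result.
      cached_ = std::make_shared<ToyBatch>(source_);
      already_executed_ = true;
      ++executions_;
    }
    if (!results_read_) {
      *out = cached_;
      results_read_ = true;
    } else {
      *out = nullptr;  // signals the end of the stream
    }
  }

  int executions() const { return executions_; }

 private:
  ToyBatch source_;
  std::shared_ptr<ToyBatch> cached_;
  bool already_executed_ = false;
  bool results_read_ = false;
  int executions_ = 0;
};

// Consumer loop: keep pulling until the reader hands back nullptr.
std::vector<int> Drain(SingleBatchReader& reader) {
  std::vector<int> all;
  std::shared_ptr<ToyBatch> batch;
  while (true) {
    reader.ReadNext(&batch);
    if (!batch) break;
    all.insert(all.end(), batch->begin(), batch->end());
  }
  return all;
}
```

A reader that serves larger results would return one batch per call and emit nullptr only after the last one; the single-batch version mirrors the simplification made in this example.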

Query the Server

Having prepared all the machinery to interact with DuckDB using Arrow Flight SQL, we can now start using it.

Create Server and Client

To create a server we need to pass the path to the database and a configuration object.

duckdb::DBConfig config;
ARROW_ASSIGN_OR_RAISE(server,
    arrow::flight::sql::duckdbflight::DuckDBFlightSqlServer::Create("sample.duckdb", config)
);

If the above call is successful and returns a valid DuckDBFlightSqlServer object, we then initialize it and set the shutdown signal.

ARROW_ASSIGN_OR_RAISE(auto location,
    arrow::flight::Location::ForGrpcTcp("0.0.0.0", 6667));
arrow::flight::FlightServerOptions options(location);

ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));
std::cout << "Server listening on localhost:" << server->port() << std::endl;

Now we can create a Flight SQL client.

ARROW_ASSIGN_OR_RAISE(auto location,
    arrow::flight::Location::ForGrpcTcp("localhost", 6667));

ARROW_ASSIGN_OR_RAISE(auto flight_client, flight::FlightClient::Connect(location));
std::cout << "Connected to server: localhost:" << port << std::endl;

std::unique_ptr<flightsql::FlightSqlClient> client(
    new flightsql::FlightSqlClient(std::move(flight_client)));
std::cout << "Client created." << std::endl;

The FlightSqlClient uses FlightClient under the hood to communicate with the Flight SQL server.

List Available Tables

Now that we have the server up and running, and a client connected to the server, we can list the tables stored in the database.

flight::FlightCallOptions call_options;
// No filters; the fifth argument (include_schema) is a bool, not a pointer
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<flight::FlightInfo> tables,
    client->GetTables(call_options, nullptr, nullptr, nullptr, false, nullptr));

Here, we’re using a default call to the server to retrieve a list of tables. At a high level, the GetTables method of the client calls GetFlightInfoTables on the server to retrieve the FlightInfo object, which can then be used to retrieve the results from its endpoints.

To retrieve and print the results we can loop over all the endpoints and call the DoGet method that returns the stream reader.

for (const flight::FlightEndpoint& endpoint : tables->endpoints()) {
    ARROW_ASSIGN_OR_RAISE(auto stream, client->DoGet(call_options, endpoint.ticket));
    ARROW_ASSIGN_OR_RAISE(auto table, stream->ToTable());
    
    if(print_results_flag) std::cout << table->ToString() << std::endl;
}

The FlightStreamReader object can then be converted to an Arrow table.

The above code would produce a result similar to the one below (abbreviated).

catalog_name: string
db_schema_name: string
table_name: string not null
table_type: string not null
----
catalog_name:
...
db_schema_name:
...
table_name:
[
[
"TABLE_1",
...
]
]
table_type:
...

Run a Query

To execute a query we use the Execute method of the client. Under the hood, the client runs the GetFlightInfoStatement call on the server to retrieve the FlightInfo object specific to our query.

ARROW_ASSIGN_OR_RAISE(auto flight_info, client->Execute(call_options, kQuery));

If successful, the returned FlightInfo object can be used to retrieve the results using the DoGet call as in the example above. The output may look similar to this (again, abbreviated).

c_count: int64
custdist: int64
----
c_count:
[
[
0,
44,
...
]
]
custdist:
[
[
500,
68,
...
]
]

Simplifying Database Connections

The code samples in this post should be treated as one way of implementing a Flight SQL interface for a database. We skipped much of the available functionality, since the aim of this article was to show a minimal example of a Flight SQL enabled server. The examples also show implicitly how any Flight SQL client could talk to any database implementing the protocol: the only difference would be connecting to a different host or passing a different set of parameters.