May 30, 2023

Data Transfer with Apache Arrow and Golang

Matthew Topol

Voltron Data is building open source standards that support a multi-language future. Central to this vision is the Apache Arrow project.

To demonstrate Arrow’s cross-language capabilities, Matt Topol, Staff Software Engineer and author of "In-Memory Analytics with Apache Arrow", wrote a series of blogs covering the Apache Arrow Golang (Go) module. This series will help you get started with Arrow and Go and show how you can use both to build effective data science workflows.

In our first post, we introduced the Go module for Arrow and walked through the basics of using builders and creating arrays.

In our second post, we discussed using the Go module for Arrow to read and write several data storage formats: CSV, JSON, and Parquet.

Next up in the series:
- Zero-Copy Sharing Using Apache Arrow and Go



Source: Marcus Olsson via GitHub


And we’re back with our next post covering how to get started with common use cases for Apache Arrow and Golang! In our previous post, we covered how to read and write common storage formats using Arrow. When working with data, handling those storage formats is, of course, extremely important to data pipelines and workflows. But it’s just as important to be able to send data back and forth across the wire between different services, machines, and processes.

Efficiently Moving Data Across the Network

One of the benefits Arrow provides is that the format of the data on the wire is identical to its in-memory representation. This means there’s no need to serialize and deserialize the data at network boundaries: the receiver can use the raw buffers as they arrive instead of converting them into a different layout, so passing the data avoids the cost of format conversion. If you’re building data services or pipelines, this can translate into significant performance benefits.

Arrow IPC

To remove the need for serialization and deserialization, the Arrow project defines a columnar interprocess communication (IPC) protocol, which is a one-way stream of binary messages. Each message consists of a small Flatbuffers-encoded metadata header followed by an optional body containing the raw data buffers. As with the other Arrow implementations, the Go Arrow module provides an ipc package containing readers and writers for the IPC format. Both flavors of the format are handled: the IPC Stream format and the IPC File format (generally called Arrow Files).

The IPC File format is nearly identical to the Stream format, just with the following additions:

  1. The file starts and ends with the magic string “ARROW1”
  2. Before the ending magic string, the File format has a footer containing a redundant copy of the schema, plus memory offsets and sizes for each of the data blocks in the file to enable random access to any record batch.

Let’s look at an example of a simple HTTP server that streams Arrow IPC data to a client:

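package examples

import (
    "log"
    "net/http"
    // ...
    "github.com/apache/arrow/go/v12/arrow/array"
    "github.com/apache/arrow/go/v12/arrow/ipc"
    // ...
)

func serve() {
    http.HandleFunc("/arrow", func(w http.ResponseWriter, r *http.Request) {
        // do something that gets a stream of record batches
        var rr array.RecordReader = ...
        defer rr.Release()

        // advertise the registered media type for the IPC stream format
        w.Header().Set("Content-Type",
            "application/vnd.apache.arrow.stream")
        // the schema message is sent ahead of the first record batch,
        // and Close writes the end-of-stream marker
        wr := ipc.NewWriter(w, ipc.WithSchema(rr.Schema()))
        defer wr.Close()

        for rr.Next() {
            if err := wr.Write(rr.Record()); err != nil {
                // the response has already started, so log and stop
                // instead of panicking inside the handler
                log.Printf("error writing record: %v", err)
                return
            }
        }

        // Next returns false on error as well as at the end of the data
        if err := rr.Err(); err != nil {
            log.Printf("error reading records: %v", err)
        }
    })

    log.Fatal(http.ListenAndServe(":8080", nil))
}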

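Note the “Content-Type” header! Arrow data has officially registered IANA media types (MIME types) for the IPC formats:
- Stream format: application/vnd.apache.arrow.stream
- File format: application/vnd.apache.arrow.file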

And now, a simple example of a client using the ipc.Reader:

func client() {
    resp, err := http.Get("http://localhost:8080/arrow")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        panic("bad status: " + resp.Status)
    }

    // initialize the reader with the stream of data;
    // this reads the schema message at the start of the stream
    rdr, err := ipc.NewReader(resp.Body)
    if err != nil {
        panic(err)
    }
    defer rdr.Release()

    for rdr.Next() {
        rec := rdr.Record()
        // do something with the record here.
        // every call to Next will release the current record
        // so make sure you call Retain if you need it to live
        // past the loop iteration.
        _ = rec
    }

    // if there's an error, Next will return false and Err will be non-nil
    if rdr.Err() != nil {
        panic(rdr.Err())
    }
}

Because ipc.Reader and ipc.Writer are built on Go's io.Reader and io.Writer interfaces, anything that implements those interfaces will work with the ipc package: for example, reading from or writing to Amazon S3, Microsoft Azure ADLS, or any other source or sink you need.

Sometimes, though, it’s convenient to use an existing protocol rather than having to come up with one yourself. To this end, the Arrow project defines an RPC protocol named Arrow Flight.

Apache Arrow Flight RPC



Source: Renee French, Creative Commons Attribution 3.0 License


Apache Arrow Flight is an RPC framework for data services based on Arrow formatted data, built on top of gRPC and the Arrow IPC stream format. Arrow Flight defines methods and message formats using Protobuf, so that any client using gRPC and Arrow can interact with a Flight service without needing the official Flight implementations. As with other implementations, the Go implementation provides an Arrow Flight server and client, along with utilities for both.

At its simplest, setting up an Arrow Flight server in Go looks like this:

package main

import (
    // ...
    "github.com/apache/arrow/go/v12/arrow/flight"
    // ...
)

// It's required to embed the flight.BaseFlightServer, which
// by default will return an "unimplemented" error for all methods.
// Implement the functions as pointer receiver methods on this
// and they will be used to service requests.
type server struct {
    flight.BaseFlightServer
}

func main() {
    svr := flight.NewServerWithMiddleware(nil)
    // using "localhost:0" will pick a random available port
    if err := svr.Init("localhost:8888"); err != nil {
        panic(err)
    }
    svr.RegisterFlightService(&server{})

    // Serve blocks until a fatal error occurs, or it receives one
    // of the signals set with SetShutdownOnSignals or Shutdown is called
    if err := svr.Serve(); err != nil {
        panic(err)
    }
}

A DoGet method can use a flight.Writer (created with flight.NewRecordWriter) to write a stream of messages:

func (s *server) DoGet(tkt *flight.Ticket, fs flight.FlightService_DoGetServer) error {
    var (
        schema = arrow.NewSchema( /* ... */ )
        reclist []arrow.Record = // ...
    )

    w := flight.NewRecordWriter(fs, ipc.WithSchema(schema))
    defer w.Close()

    for _, rec := range reclist {
        // release each record once it has been written,
        // whether or not the write succeeded
        err := w.Write(rec)
        rec.Release()
        if err != nil {
            return err
        }
    }
    return nil
}

The simplest client then looks like the following:

package main

import (
    // ...
    "context"
    // ...
    "github.com/apache/arrow/go/v12/arrow/flight"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    // ...
)

func main() {
    // to use TLS replace the usage of insecure.NewCredentials with the appropriate
    // calls to set up your certificates with x509 etc.
    client, err := flight.NewClientWithMiddleware("localhost:8888", nil,
        nil, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // this is an example of calling DoGet; the other Flight
    // methods work in a similar way
    stream, err := client.DoGet(context.Background(),
        &flight.Ticket{Ticket: []byte(...)})
    if err != nil {
        panic(err)
    }

    rr, err := flight.NewRecordReader(stream)
    if err != nil {
        panic(err)
    }
    defer rr.Release()

    // flight.Reader embeds an ipc.Reader and so you can retrieve
    // the records the same way
    for rr.Next() {
        rec := rr.Record()
        // do something with the record here
        _ = rec
    }
    if err := rr.Err(); err != nil {
        panic(err)
    }
}

For a much more in-depth description of Arrow Flight, check out the Arrow documentation page. Another great resource is this blog, which discusses Arrow Flight and walks through building an Arrow Flight server in both R and Python. As an exercise, you can follow along there and then use the Go Flight client to query it. For the full documentation of the Arrow Go flight package, just follow this link.

Sometimes though, you need to share your data locally instead of across a network. Well, Arrow has you covered! Our next post will cover the zero-copy sharing of local memory to pass your data around.

In the meantime, if you’re working within the Arrow ecosystem and want to accelerate your success, see how Voltron Data Enterprise Support can benefit you.