May 01, 2023

Make Data Files Easier to Work With Using Golang and Apache Arrow

Matthew Topol

TL;DR Voltron Data is building open source standards that support a multi-language future. Central to this vision is the Apache Arrow project.
To demonstrate Arrow’s cross-language capabilities, Matt Topol, Staff Software Engineer and author of In-Memory Analytics with Apache Arrow, wrote a series of blogs covering the Apache Arrow Golang (Go) module. This series will help you get started with Arrow and Go and see how you can use both to build effective data science workflows.

In our first post, we introduced the Go Module for Arrow and walked through the basics of using builders and creating arrays.

Next up in the series:
  • Data Transfer Using Apache Arrow and Go
  • Zero-Copy Sharing Using Apache Arrow and Go

Welcome back to this series of posts covering the Apache Arrow Go module! In our last post, we covered the basics of using builders and creating arrays. This time we’re covering the first of our common use cases: working with data files.





    Reading and Writing Storage Formats

    While Apache Arrow defines an in-memory representation of columnar data, the Arrow package for most languages provides utilities to convert between that in-memory format and various common long-term storage formats. The Go package is no different and provides for conversions to and from Arrow IPC, CSV, JSON, and Parquet files. Let’s take a look at a few examples.

    JSON

    All the array builders have an UnmarshalJSON method which will add the values it unmarshals to the builder:

    package examples
    
    import (
        "encoding/json"
        // "errors" and "strings" are used by the FromJSON example further below
        "errors"
        "strings"

        "github.com/apache/arrow/go/v11/arrow"
        "github.com/apache/arrow/go/v11/arrow/array"
        "github.com/apache/arrow/go/v11/arrow/memory"
    )
    
    func build() (arrow.Array, error) {
        bldr := array.NewInt32Builder(memory.DefaultAllocator)
        defer bldr.Release()
    
        err := json.Unmarshal([]byte(`[1, 2, null, 4, 5]`), bldr)
        if err != nil {
            return nil, err
        }
    
        return bldr.NewArray(), nil
    }
    

    Since Unmarshal will append the values to a builder, values could be either manually appended before/after unmarshalling JSON, or multiple JSON documents could be unmarshalled into a single array.
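
    For instance, here’s a minimal sketch (using the same imports as above; the buildCombined name is just for illustration) that appends a value by hand and then unmarshals two separate JSON documents into the same builder:

    func buildCombined() (arrow.Array, error) {
        bldr := array.NewInt32Builder(memory.DefaultAllocator)
        defer bldr.Release()

        // manually append a value before unmarshalling anything
        bldr.Append(0)
        // unmarshal two separate JSON documents into the same builder
        if err := json.Unmarshal([]byte(`[1, 2, null]`), bldr); err != nil {
            return nil, err
        }
        if err := json.Unmarshal([]byte(`[4, 5]`), bldr); err != nil {
            return nil, err
        }

        // the resulting array is [0 1 2 (null) 4 5]
        return bldr.NewArray(), nil
    }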

    You can also unmarshal nested data types like structs and lists! Let’s look at a slightly more complex JSON example:

    func build() (arrow.Array, error) {
        const jsonData = `[
            { "hello": 1.5, "world": [1, null, 3, 4], "datetimes": [
                {"date": "2005-05-06", "time": "15:02:04.123"},
                {"date": "1956-01-02", "time": "02:10:00"}]},
            { "hello": null, "world": null, "datetimes":
                [ null, { "date": "2022-02-02", "time": "12:00:00" } ] }
        ]`
    
        dataType := arrow.StructOf(
            arrow.Field{Name: "hello",
                        Type: arrow.PrimitiveTypes.Float64, Nullable: true},
            arrow.Field{Name: "world",
                        Type: arrow.ListOf(arrow.PrimitiveTypes.Int32),
                        Nullable: true},
            arrow.Field{Name: "datetimes",
                Type: arrow.FixedSizeListOf(2, arrow.StructOf(
                     arrow.Field{Name: "date",
                                 Type: arrow.FixedWidthTypes.Date32},
                     arrow.Field{Name: "time",
                                 Type: arrow.FixedWidthTypes.Time32ms},
                 ))},
        )
    
        arr, pos, err := array.FromJSON(memory.DefaultAllocator, dataType,
                  strings.NewReader(jsonData))
        if err != nil {
            return nil, err
        }
    
        if pos != int64(len(jsonData)) {
            return nil, errors.New("did not use entire input")
        }
    
        return arr, nil
    }
    
    

    Note the usage of strings.NewReader in the FromJSON function. The function takes an io.Reader, and the documentation provides lots of examples of different JSON strings and data types. Have a look!
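
    Because the function just needs an io.Reader, the JSON doesn’t have to live in a string at all. Here’s a small sketch that reads a JSON array of numbers straight from a file (the fromFile name and the data.json path are purely illustrative, and it assumes the "os" import):

    func fromFile() (arrow.Array, error) {
        f, err := os.Open("data.json")
        if err != nil {
            return nil, err
        }
        defer f.Close()

        // any io.Reader works here; *os.File is just one example
        arr, _, err := array.FromJSON(memory.DefaultAllocator,
            arrow.PrimitiveTypes.Float64, f)
        return arr, err
    }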

    Of course, if we’re going to unmarshal from JSON into Arrow arrays, we should also be able to marshal data back to JSON from the array! To that end, the array interface also implements the json.Marshaler interface. Regardless of the data type of your array, or even a full record batch, you can easily encode it to JSON as needed:

    func build() string {
       bldr := array.NewRecordBuilder(memory.DefaultAllocator,
           arrow.NewSchema([]arrow.Field{
               {Name: "bin", Type: arrow.BinaryTypes.Binary, Nullable: true},
               {Name: "list_i64", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)},
               {Name: "dict", Type: &arrow.DictionaryType{
                   IndexType: arrow.PrimitiveTypes.Int8,
                   ValueType: arrow.BinaryTypes.String}},
           }, nil))
       defer bldr.Release()
    
       fbBin := bldr.Field(0).(*array.BinaryBuilder)
       fbList := bldr.Field(1).(*array.ListBuilder)
       vb := fbList.ValueBuilder().(*array.Int64Builder)
       fbDict := bldr.Field(2).(*array.BinaryDictionaryBuilder)
    
       // binary data will get output as base64 encoded, as per
       // regular Go JSON marshalling.
       fbBin.AppendValues([][]byte{
           []byte("dead"), []byte("beef"),
       }, []bool{true, true})
       fbList.Append(true)
       vb.AppendValues([]int64{1, 2, 3, 4, 5}, nil)
       fbList.Append(true)
       vb.AppendValues([]int64{6, 7, 8}, nil)
       vb.AppendNull()
    
       fbDict.AppendString("foo")
       fbDict.AppendString("bar")
    
       rec := bldr.NewRecord()
       defer rec.Release()
    
       var buf bytes.Buffer
       enc := json.NewEncoder(&buf)
       enc.SetIndent("", "  ")
       if err := enc.Encode(rec); err != nil {
           panic(err)
       }
    
       return buf.String()
    }
    
    

    The return value of the previous function is the following JSON:

    [
      {
        "bin": "ZGVhZA==",
        "dict": "foo",
        "list_i64": [
          1,
          2,
          3,
          4,
          5
        ]
      },
      {
        "bin": "YmVlZg==",
        "dict": "bar",
        "list_i64": [
          6,
          7,
          8,
          null
        ]
      }
    ]
    
    

    There are a few specifics to note about that JSON output:

    • As documented in Go’s encoding/json package, binary data ([]byte) is encoded as base64. Conversely, if you plan to unmarshal JSON into a Binary array, the input must be base64-encoded; otherwise, use the String data type instead (see the short sketch after this list).
    • Dictionary-encoded arrays get expanded out to their values when marshaled. You can unmarshal into a dictionary-encoded array and the dictionary will be created for you by the builder.
    • Null values appropriately translate to a JSON null.
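
    Here’s a short sketch illustrating the first two notes (the unmarshalNotes name is just for illustration; it reuses the imports from the earlier examples):

    func unmarshalNotes() error {
        mem := memory.DefaultAllocator

        // binary input must be base64-encoded ("ZGVhZA==" is "dead",
        // "YmVlZg==" is "beef")
        bin, _, err := array.FromJSON(mem, arrow.BinaryTypes.Binary,
            strings.NewReader(`["ZGVhZA==", "YmVlZg=="]`))
        if err != nil {
            return err
        }
        defer bin.Release()

        // plain strings for a dictionary type; the builder constructs
        // the dictionary ["foo", "bar"] for us
        dict, _, err := array.FromJSON(mem, &arrow.DictionaryType{
            IndexType: arrow.PrimitiveTypes.Int8,
            ValueType: arrow.BinaryTypes.String,
        }, strings.NewReader(`["foo", "bar", "foo"]`))
        if err != nil {
            return err
        }
        defer dict.Release()
        return nil
    }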

    Let’s move on to CSV files now…

    CSV

    There are two ways to read CSV data. The default CSV Reader requires you to specify the expected schema of your CSV file up front, as shown in the example in the documentation (a short sketch of it also follows the options list below). Alternatively, there is an inferring Reader that will work out the schema from the file data itself, and you can stream records out with a goroutine and a channel:

    package examples
    
    import (
        // ... other imports
        "github.com/apache/arrow/go/v11/arrow/csv"
        // ... other imports
    )
    
    func read(data io.ReadCloser) chan arrow.Record {
        // read 500 lines at a time to create record batches
        rdr := csv.NewInferringReader(data, csv.WithChunk(500),
             // strings can be null, and these are the values
             // to consider as null
             csv.WithNullReader(true, "", "null", "[]"),
             // assume the first line is a header line which names the columns
             csv.WithHeader(true))
    
        ch := make(chan arrow.Record, 10)
        go func() {
            defer data.Close()
            defer close(ch)
            for rdr.Next() {
                rec := rdr.Record()
                rec.Retain()
                ch <- rec
            }
    
            if rdr.Err() != nil {
                panic(rdr.Err())
            }
        }()
        return ch
    }
    
    

    There are a few parts of that example I want to highlight so they don’t get lost as mere comments: specifically, the various options you can pass to the CSV reader, regardless of whether you’re using the default reader or the inferring reader:

    • csv.WithChunk: How many lines to read from the CSV file for each record batch
    • csv.WithHeader: Whether or not the first line is a header with the name of each column. What is done with the header depends on the situation:
      • The default reader, with a provided schema, will error if there is the wrong number of columns. Otherwise, it’ll use the column names to map the columns to their appropriate schema field index.
      • The inferring reader will use the column names for generating the inferred schema.
      • The csv.Writer will write out a header line before any output if this option is true (default is false).
    • csv.WithNullReader: The first argument is a boolean controlling whether string columns can be null at all (if false, every string value is treated as non-null). The remaining variadic string arguments are the values that should be interpreted as null when reading from the CSV file.

    There are other possible options you can pass which can be found in the documentation, so read up!
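
    To make the contrast with the inferring reader concrete, here’s a minimal sketch of the default, schema-based csv.NewReader using the same options (the column names and types are purely illustrative):

    func readWithSchema(data io.Reader) *csv.Reader {
        // the schema is declared up front instead of being inferred
        schema := arrow.NewSchema([]arrow.Field{
            {Name: "id", Type: arrow.PrimitiveTypes.Int64},
            {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
        }, nil)

        return csv.NewReader(data, schema,
            csv.WithChunk(500),
            csv.WithHeader(true),
            csv.WithNullReader(true, "", "null"))
    }

    Iterating is identical to the inferring reader: call Next, grab the Record, and Retain anything you keep beyond the next call.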

    Similarly, there is also a csv.Writer which can be used to write CSV data from record batches; see the example in the docs! Because the reader and writer use the standard io.Reader and io.Writer interfaces, they work with anything that can stream the raw CSV data: not just local files, but also remote files or service requests that return CSV.
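
    As a rough sketch of the writer side (the writeCSV name and the channel-based plumbing are just for illustration; any source of records sharing the writer’s schema works):

    func writeCSV(w io.Writer, schema *arrow.Schema, ch chan arrow.Record) error {
        // write a header line naming the columns before any data
        wr := csv.NewWriter(w, schema, csv.WithHeader(true))
        for rec := range ch {
            if err := wr.Write(rec); err != nil {
                return err
            }
            rec.Release()
        }
        // flush anything still buffered to the underlying io.Writer
        return wr.Flush()
    }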

    Parquet


    Apache Parquet is an open source, columnar data file format that is commonly used for efficient storage and querying of large amounts of data. The Apache Arrow Go module also contains a Parquet package alongside the Arrow package. As with JSON and CSV, utilities are provided for easily reading and writing Parquet files using Arrow data structures.

    First and foremost, there are FromParquet and ToParquet functions to convert between an Arrow schema and a Parquet schema, if necessary. The easiest way to create a new Parquet file from Arrow data is to use an Arrow Table (a collection of chunked columns treated as a single large table) and call WriteTable; a short sketch of that appears after the streaming example. More commonly, though, you might have a stream of record batches, such as when querying and manipulating a stream of data. Incrementally writing a Parquet file from that stream of record batches could look like this:

    package examples
    
    import (
        // ...
        "github.com/apache/arrow/go/v11/parquet"
        "github.com/apache/arrow/go/v11/parquet/pqarrow"
        // ...
    )
    
    func write(ch chan arrow.Record) {
        // get first record
        rec := <-ch
    
        f, err := os.Create("test.parquet")
        if err != nil {
            panic(err)
        }
        defer f.Close()
        // we'll use the default writer properties, but you could easily pass
        // properties to customize the writer
        props := parquet.NewWriterProperties()
        writer, err := pqarrow.NewFileWriter(rec.Schema(), f, props,
            pqarrow.DefaultWriterProps())
        if err != nil {
            panic(err)
        }
        defer writer.Close()
    
        if err := writer.Write(rec); err != nil {
            panic(err)
        }
        rec.Release()
    
        for rec := range ch {
            // this will create a new record batch every time
            // you could alternately call WriteBuffered to buffer
            // an entire row group from multiple record batches
            if err := writer.Write(rec); err != nil {
                panic(err)
            }
            rec.Release()
        }
    }
    
    

    If you look at the documentation, you’ll see that the writer takes the humble io.Writer interface. This interface only requires a single Write method that takes a byte slice, so you can write files to remote locations like AWS S3 or Microsoft Azure ADLS just as easily as to a local file.
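
    And if your data is already materialized as an arrow.Table rather than arriving as a stream of record batches, the WriteTable helper mentioned earlier does everything in one call. A minimal sketch, assuming the same imports as above plus the core arrow package (the writeTable wrapper name and the 1024-row chunk size are arbitrary):

    func writeTable(tbl arrow.Table, w io.Writer) error {
        // chunkSize caps the number of rows per row group in the output file
        return pqarrow.WriteTable(tbl, w, 1024,
            parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
    }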

    Reading data from a Parquet file is similar. You can use ReadTable to read an entire Parquet file into an arrow.Table in one shot (a sketch of that follows the example below), or use the following example to get yourself a RecordReader to stream from:

    package examples
    
    import (
        // ...
        "github.com/apache/arrow/go/v11/arrow/array"
        "github.com/apache/arrow/go/v11/arrow/memory"
        "github.com/apache/arrow/go/v11/parquet"
        "github.com/apache/arrow/go/v11/parquet/file"
        "github.com/apache/arrow/go/v11/parquet/pqarrow"
        // ...
    )
    
    func getReader(fileName string) (array.RecordReader, io.Closer, error) {
        f, err := os.Open(fileName)
        if err != nil {
            return nil, nil, err
        }
        // you can also pass various read options if you desire, but we'll take
        // the defaults
        rdr, err := file.NewParquetReader(f)
        if err != nil {
            f.Close()
            return nil, nil, err
        }
        // the parquet reader is for reading/writing parquet data directly.
        // So let's wrap it with a pqarrow reader to set up converting to Arrow
        arrRdr, err := pqarrow.NewFileReader(rdr,
            pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
        if err != nil {
            rdr.Close()
            return nil, nil, err
        }
    
        // you could pass a list of column indices to limit which columns
        // are read, or a list of row group indices to limit which row
        // groups are read. We'll pass nil for both so it reads everything.
        rr, err := arrRdr.GetRecordReader(context.Background(), nil, nil)
        if err != nil {
            rdr.Close()
            return nil, nil, err
        }
        return rr, rdr, nil
    }
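
    If you don’t need to stream, the ReadTable path mentioned above is even shorter. A sketch using the same imports as the example above (the readTable name is just for illustration):

    func readTable(fileName string) (arrow.Table, error) {
        f, err := os.Open(fileName)
        if err != nil {
            return nil, err
        }
        defer f.Close()

        // reads the entire file into an arrow.Table in one shot,
        // using default reader properties
        return pqarrow.ReadTable(context.Background(), f,
            parquet.NewReaderProperties(memory.DefaultAllocator),
            pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
    }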
    
    

    Check out the documentation for the various options the package provides for getting data into and out of Parquet files. Some of the options provided are (a short sketch of a couple of them follows this list):

    • Controlling most aspects of how the file is read, such as skipping columns or even entire row groups.
    • Setting the batch size for reading or writing.
    • Choosing whether to buffer a row group in memory (so multiple columns can be written simultaneously) or write it serially (much lower memory usage, but each column must be written in its entirety before moving on).
    • Reading and/or writing encrypted Parquet files (both file-level and column-level encryption).
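
    As a flavor of those knobs, here’s a hedged sketch configuring a couple of them; the specific values are arbitrary, and it assumes the additional "github.com/apache/arrow/go/v11/parquet/compress" import:

    func exampleProps() (*parquet.WriterProperties, pqarrow.ArrowReadProperties) {
        writerProps := parquet.NewWriterProperties(
            // compress column chunks with Snappy and dictionary-encode by default
            parquet.WithCompression(compress.Codecs.Snappy),
            parquet.WithDictionaryDefault(true),
        )
        readProps := pqarrow.ArrowReadProperties{
            Parallel:  true, // decode columns concurrently
            BatchSize: 4096, // rows per record batch when streaming
        }
        return writerProps, readProps
    }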

    Lastly, you’ll notice in the documentation that all of the functions for reading Parquet files accept the same interface: parquet.ReaderAtSeeker. This interface is a simple combination of the io.ReaderAt and io.Seeker interfaces, which means anything implementing just these two methods can supply the raw Parquet data:

    ReadAt(p []byte, off int64) (n int, err error)
    Seek(offset int64, whence int) (int64, error)
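
    For instance, a *bytes.Reader from the standard library already satisfies both methods, so a Parquet file you have pulled into memory (say, the body of an object store response) can be handed straight to the reader. A tiny sketch (readFromMemory is just an illustrative name):

    func readFromMemory(buf []byte) (*file.Reader, error) {
        // bytes.Reader implements both ReadAt and Seek
        return file.NewParquetReader(bytes.NewReader(buf))
    }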
    
    

    As a result, it’s extremely easy to use this library to read Parquet files both locally and remotely from cloud storage services like Google Cloud Platform (GCP) or AWS S3.

    Moving right along…

    Moving data into and out of storage formats is probably one of the most common use cases for any workflow that utilizes that data. In the next post, we’ll cover the other most common use case: Moving data across a network between different processes and machines.