OLAP – Phase 4 Table, Catalog, and Bulk Loading

Phases 1-3 built the storage primitives — vectors, row groups, column segments, and compression. But there’s no concept of a “table” yet. There’s no way to define a schema (CREATE TABLE), no way to add rows one at a time, no way to bulk-load from a CSV, and no way to persist and reload the database across restarts.

This phase adds the Table (schema + row groups + append state), the Catalog (table registry with JSON manifest), and a CSV loader for bulk ingestion.

In DuckDB, these are DataTable (src/storage/data_table.cpp), Catalog (src/catalog/catalog.cpp), and ReadCSV (src/function/table/read_csv.cpp).

Here is the roadmap from this phase onward:

  • Phase 4: Table, catalog, and bulk loading
  • Phase 5: Vectorized expressions and scan/filter/project
  • Phase 6: Hash aggregation
  • Phase 7: Hash join
  • Phase 8: SQL parser
  • Phase 9: Query planner and optimizer
  • Phase 10: Sorting, parallel execution, REPL, and server

Full Source Code

The code referenced in this post can be found in https://gitlab.com/kimserey.lam/olap-learn.

Table

A table is a named collection of row groups with a fixed schema:

type ColumnDef struct {
    Name string
    Type vector.LogicalType
}

type Table struct {
    Name      string
    Columns   []ColumnDef
    RowGroups []*storage.RowGroup
    append    *AppendState
}

The append field manages a two-stage buffer that converts incoming rows into columnar row groups.
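
Later snippets call t.ColumnTypes() and t.ColumnIndex(name), which are not shown in this post. As a rough sketch of what such helpers might look like, the version below uses a local LogicalType stand-in instead of the real vector package and leaves out row groups and append state; the actual implementations in the repository may differ.

```go
package main

import "fmt"

// LogicalType is a stand-in for vector.LogicalType so the sketch compiles
// on its own. The constants below are illustrative, not the real codes.
type LogicalType int

const (
	Int64 LogicalType = iota
	Float64
	Varchar
)

type ColumnDef struct {
	Name string
	Type LogicalType
}

// Table keeps only the schema here; row groups and append state are omitted.
type Table struct {
	Name    string
	Columns []ColumnDef
}

// ColumnTypes returns the column types in schema order.
func (t *Table) ColumnTypes() []LogicalType {
	types := make([]LogicalType, len(t.Columns))
	for i, c := range t.Columns {
		types[i] = c.Type
	}
	return types
}

// ColumnIndex returns the position of the named column, or -1 if absent.
func (t *Table) ColumnIndex(name string) int {
	for i, c := range t.Columns {
		if c.Name == name {
			return i
		}
	}
	return -1
}

func main() {
	orders := &Table{Name: "orders", Columns: []ColumnDef{
		{"id", Int64}, {"region", Varchar}, {"amount", Float64},
	}}
	fmt.Println(orders.ColumnIndex("amount"), orders.ColumnIndex("missing"))
}
```

A linear search is usually fine for ColumnIndex because tables have few columns and the lookup happens once per load, not once per row.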

Scanning a Table

The table provides an iterator that yields DataChunks with column projection:

func (t *Table) Scan(columnIndices []int) *TableScan {
    return &TableScan{
        table:         t,
        columnIndices: columnIndices,
        rgIdx:         0,
    }
}

func (s *TableScan) Next() *vector.DataChunk {
    for s.rgIdx < len(s.table.RowGroups) {
        rg := s.table.RowGroups[s.rgIdx]
        s.rgIdx++
        chunk := rg.ToDataChunk(s.columnIndices)
        if chunk.Count() > 0 {
            return chunk
        }
    }
    return nil
}

Passing columnIndices = []int{0, 3} means “only decompress columns 0 and 3” — the rest are skipped entirely.
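
To show how a caller might drain such a scan, here is a small self-contained sketch. DataChunk and RowGroup are simplified stand-ins (every column is a plain []int64 and nothing is compressed), and sumColumn is a hypothetical helper, so treat it as an illustration of the loop shape only.

```go
package main

import "fmt"

// DataChunk and RowGroup are simplified stand-ins for the real types.
type DataChunk struct{ Cols [][]int64 }

func (c *DataChunk) Count() int {
	if len(c.Cols) == 0 {
		return 0
	}
	return len(c.Cols[0])
}

type RowGroup struct{ cols [][]int64 }

// ToDataChunk materializes only the requested columns.
func (rg *RowGroup) ToDataChunk(indices []int) *DataChunk {
	out := &DataChunk{}
	for _, i := range indices {
		out.Cols = append(out.Cols, rg.cols[i])
	}
	return out
}

type TableScan struct {
	groups  []*RowGroup
	indices []int
	rgIdx   int
}

// Next returns the next non-empty chunk, or nil when the scan is exhausted.
func (s *TableScan) Next() *DataChunk {
	for s.rgIdx < len(s.groups) {
		rg := s.groups[s.rgIdx]
		s.rgIdx++
		if chunk := rg.ToDataChunk(s.indices); chunk.Count() > 0 {
			return chunk
		}
	}
	return nil
}

// sumColumn drains the scan and sums the first projected column.
func sumColumn(s *TableScan) int64 {
	var total int64
	for chunk := s.Next(); chunk != nil; chunk = s.Next() {
		for _, v := range chunk.Cols[0] {
			total += v
		}
	}
	return total
}

func main() {
	groups := []*RowGroup{
		{cols: [][]int64{{1, 2}, {10, 20}}},
		{cols: [][]int64{{}, {}}}, // empty group, skipped by Next
		{cols: [][]int64{{3}, {30}}},
	}
	// Project only table column 1; it becomes column 0 of each chunk.
	fmt.Println(sumColumn(&TableScan{groups: groups, indices: []int{1}}))
}
```

Note that a projected chunk is indexed by position in columnIndices, not by the original table column number.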


Append State

Data arrives row-by-row (from INSERT or CSV), but the storage format is columnar (DataChunks of 2048 rows, then RowGroups of 122,880 rows). The AppendState manages this conversion as a two-stage pipeline:

type AppendState struct {
    table       *Table
    chunk       *vector.DataChunk
    chunks      []*vector.DataChunk
    rowsInGroup int
}

Each appended row goes through these steps:

AppendRow(values)
    → buffer into current DataChunk
    → chunk full (2048 rows)? → move to chunks list, start new chunk
    → accumulated rows >= RowGroupSize (122,880)? → flush chunks into RowGroup

The same flow in code:

func (a *AppendState) AppendRow(values []any) error {
    if err := a.chunk.Append(values); err != nil {
        return err
    }
    if a.chunk.Count() >= vector.VectorSize {
        a.flushChunk()
    }
    return nil
}

func (a *AppendState) flushChunk() {
    if a.chunk.Count() == 0 {
        return
    }
    a.chunks = append(a.chunks, a.chunk)
    a.rowsInGroup += a.chunk.Count()
    a.chunk = vector.NewDataChunk(a.table.ColumnTypes())
    if a.rowsInGroup >= storage.RowGroupSize {
        a.flushRowGroup()
    }
}

func (a *AppendState) flushRowGroup() {
    if len(a.chunks) == 0 {
        return
    }
    rg := storage.NewRowGroupFromChunks(a.chunks)
    a.table.RowGroups = append(a.table.RowGroups, rg)
    a.chunks = nil
    a.rowsInGroup = 0
}

After all rows are appended, call Flush() to push any remaining buffered data into a final row group.
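
Flush() itself is not shown here. The sketch below is a guess at its shape, built on a counter-only model of the append state: it tracks row counts instead of column data and uses tiny stand-in sizes (4 and 8 instead of 2048 and 122,880) so the behavior is easy to trace by hand.

```go
package main

import "fmt"

// Small stand-in sizes; the post uses 2048 and 122,880.
const (
	vectorSize   = 4
	rowGroupSize = 8
)

// appendState tracks only row counts, not the actual column data.
type appendState struct {
	chunkRows   int   // rows in the current, partially filled chunk
	chunks      []int // row counts of completed chunks awaiting a row group
	rowsInGroup int   // total rows across the completed chunks
	rowGroups   []int // row counts of flushed row groups
}

func (a *appendState) appendRow() {
	a.chunkRows++
	if a.chunkRows >= vectorSize {
		a.flushChunk()
	}
}

func (a *appendState) flushChunk() {
	if a.chunkRows == 0 {
		return
	}
	a.chunks = append(a.chunks, a.chunkRows)
	a.rowsInGroup += a.chunkRows
	a.chunkRows = 0
	if a.rowsInGroup >= rowGroupSize {
		a.flushRowGroup()
	}
}

func (a *appendState) flushRowGroup() {
	if len(a.chunks) == 0 {
		return
	}
	a.rowGroups = append(a.rowGroups, a.rowsInGroup)
	a.chunks = nil
	a.rowsInGroup = 0
}

// flush is an assumed shape for Flush(): push the partial chunk, then emit
// whatever is buffered as a final, possibly short, row group.
func (a *appendState) flush() {
	a.flushChunk()
	a.flushRowGroup()
}

func main() {
	a := &appendState{}
	for i := 0; i < 19; i++ {
		a.appendRow()
	}
	fmt.Println("before flush:", a.rowGroups)
	a.flush()
	fmt.Println("after flush: ", a.rowGroups)
}
```

With 19 rows, two full groups of 8 are emitted during appending and the remaining 3 rows only become a row group once flush runs. Both guards (empty chunk, empty chunk list) make a second flush a no-op, which matters if Flush() is called by both the loader and Save().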


Catalog

The Catalog is the top-level container that tracks all tables:

type Catalog struct {
    mu     sync.RWMutex
    tables map[string]*Table
    dir    string
}

It supports in-memory mode (no persistence) and directory-backed mode:

func NewCatalog() *Catalog {
    return &Catalog{tables: make(map[string]*Table)}
}

func NewCatalogWithDir(dir string) (*Catalog, error) {
    if err := os.MkdirAll(dir, 0o755); err != nil {
        return nil, err
    }
    c := &Catalog{tables: make(map[string]*Table), dir: dir}
    if err := c.loadManifest(); err != nil && !os.IsNotExist(err) {
        return nil, err
    }
    return c, nil
}
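
The struct carries a sync.RWMutex, which suggests that lookups take the read lock and registrations take the write lock. The following sketch shows one way that could look. CreateTable and GetTable are hypothetical method names chosen for illustration, and Table is reduced to a name.

```go
package main

import (
	"fmt"
	"sync"
)

// Table is reduced to its name for this sketch.
type Table struct{ Name string }

type Catalog struct {
	mu     sync.RWMutex
	tables map[string]*Table
}

func NewCatalog() *Catalog {
	return &Catalog{tables: make(map[string]*Table)}
}

// CreateTable (hypothetical) registers a table and rejects duplicate names.
func (c *Catalog) CreateTable(name string) (*Table, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, exists := c.tables[name]; exists {
		return nil, fmt.Errorf("catalog: table %q already exists", name)
	}
	t := &Table{Name: name}
	c.tables[name] = t
	return t, nil
}

// GetTable (hypothetical) looks a table up under the read lock, so
// concurrent readers do not block each other.
func (c *Catalog) GetTable(name string) (*Table, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	t, ok := c.tables[name]
	return t, ok
}

func main() {
	c := NewCatalog()
	if _, err := c.CreateTable("orders"); err != nil {
		fmt.Println("unexpected:", err)
	}
	_, err := c.CreateTable("orders")
	fmt.Println(err)
	_, ok := c.GetTable("orders")
	fmt.Println(ok)
}
```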

Persistence Format

The catalog persists as one shared manifest plus one data file per table:

  1. catalog.json: a JSON manifest with table schemas:
[
  {
    "name": "orders",
    "columns": [
      {"name": "id", "type": 2},
      {"name": "region", "type": 4},
      {"name": "amount", "type": 3}
    ]
  }
]
  2. orders.dat: a binary file with the table's row groups (using the file format from Phase 2)

On Save(), the catalog flushes all tables’ append states, writes each table’s row groups to its .dat file, then writes the manifest:

func (c *Catalog) Save() error {
    for _, t := range c.tables {
        t.Flush()
        if err := c.saveTableData(t); err != nil {
            return err
        }
    }
    return c.saveManifest()
}
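
saveManifest and loadManifest are not shown. Assuming the manifest has exactly the JSON shape above, encoding and decoding it with encoding/json could look roughly like this. The struct and function names are illustrative, and the column type is kept as a plain integer code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// columnMeta and tableMeta mirror the manifest layout shown earlier.
// The names are illustrative, not taken from the repository.
type columnMeta struct {
	Name string `json:"name"`
	Type int    `json:"type"` // numeric LogicalType code
}

type tableMeta struct {
	Name    string       `json:"name"`
	Columns []columnMeta `json:"columns"`
}

// encodeManifest renders the table list as indented JSON.
func encodeManifest(tables []tableMeta) ([]byte, error) {
	return json.MarshalIndent(tables, "", "  ")
}

// decodeManifest parses manifest bytes back into table metadata.
func decodeManifest(data []byte) ([]tableMeta, error) {
	var tables []tableMeta
	if err := json.Unmarshal(data, &tables); err != nil {
		return nil, fmt.Errorf("catalog: invalid manifest: %w", err)
	}
	return tables, nil
}

func main() {
	data, err := encodeManifest([]tableMeta{{
		Name:    "orders",
		Columns: []columnMeta{{"id", 2}, {"region", 4}, {"amount", 3}},
	}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```

Storing the type as a bare number keeps the manifest compact, but it ties the file to the order of the LogicalType constants: reordering the enum would silently change the meaning of existing manifests.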

CSV Bulk Loading

The CSV loader maps CSV column headers to table columns by name, converts string values to typed values, and appends them through the table’s append pipeline:

func LoadCSVFromReader(t *Table, r io.Reader) (int, error) {
    reader := csv.NewReader(r)
    headers, err := reader.Read()
    if err != nil {
        return 0, fmt.Errorf("csv: failed to read header: %w", err)
    }

    colMap, err := buildColumnMapping(t, headers)
    if err != nil {
        return 0, err
    }

    rowCount := 0
    for {
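        record, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            // Without this check a malformed line leaves record nil and
            // the indexing below panics.
            return rowCount, fmt.Errorf("csv: failed to read record: %w", err)
        }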

        values := make([]any, len(t.Columns))
        for csvIdx, tableIdx := range colMap {
            raw := strings.TrimSpace(record[csvIdx])
            if raw == "" || strings.EqualFold(raw, "null") {
                values[tableIdx] = nil
                continue
            }
            val, err := parseValue(raw, t.Columns[tableIdx].Type)
            if err != nil {
                return rowCount, err
            }
            values[tableIdx] = val
        }

        if err := t.Append(values); err != nil {
            return rowCount, err
        }
        rowCount++
    }
    t.Flush()
    return rowCount, nil
}

Column order in the CSV doesn’t need to match the table — the mapping is built by header name:

func buildColumnMapping(t *Table, headers []string) (map[int]int, error) {
    colMap := make(map[int]int, len(headers))
    for csvIdx, header := range headers {
        tableIdx := t.ColumnIndex(strings.TrimSpace(header))
        if tableIdx < 0 {
            return nil, fmt.Errorf("csv: column %q not found in table", header)
        }
        colMap[csvIdx] = tableIdx
    }
    return colMap, nil
}
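
To make the name-based mapping concrete, the sketch below runs it against a CSV whose columns are reordered and incomplete. It is a standalone variant: mapHeaders takes the table's column names directly instead of a *Table, and reorder is a hypothetical helper that returns raw strings in table order without type conversion.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// mapHeaders maps each CSV column position to a table column position.
func mapHeaders(tableCols, headers []string) (map[int]int, error) {
	index := make(map[string]int, len(tableCols))
	for i, name := range tableCols {
		index[name] = i
	}
	colMap := make(map[int]int, len(headers))
	for csvIdx, h := range headers {
		tableIdx, ok := index[strings.TrimSpace(h)]
		if !ok {
			return nil, fmt.Errorf("csv: column %q not found in table", h)
		}
		colMap[csvIdx] = tableIdx
	}
	return colMap, nil
}

// reorder reads CSV text and returns each record rearranged into table
// order. Columns absent from the CSV stay as empty strings.
func reorder(tableCols []string, input string) ([][]string, error) {
	r := csv.NewReader(strings.NewReader(input))
	headers, err := r.Read()
	if err != nil {
		return nil, err
	}
	colMap, err := mapHeaders(tableCols, headers)
	if err != nil {
		return nil, err
	}
	records, err := r.ReadAll()
	if err != nil {
		return nil, err
	}
	rows := make([][]string, 0, len(records))
	for _, rec := range records {
		row := make([]string, len(tableCols))
		for csvIdx, tableIdx := range colMap {
			row[tableIdx] = rec[csvIdx]
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func main() {
	rows, err := reorder(
		[]string{"id", "region", "amount"},
		"amount,id\n9.5,1\n3.0,2\n",
	)
	fmt.Printf("%q %v\n", rows, err)
}
```

In the loader above, a table column that has no matching CSV header keeps its zero value in the values slice, which is nil, so it is appended as NULL.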

Type conversion is straightforward:

func parseValue(raw string, typ vector.LogicalType) (any, error) {
    switch typ {
    case vector.Int32:
        v, err := strconv.ParseInt(raw, 10, 32)
        return int32(v), err
    case vector.Int64:
        v, err := strconv.ParseInt(raw, 10, 64)
        return v, err
    case vector.Float64:
        v, err := strconv.ParseFloat(raw, 64)
        return v, err
    case vector.Varchar:
        return raw, nil
    case vector.Boolean:
        b, err := strconv.ParseBool(raw)
        return b, err
    }
    return nil, fmt.Errorf("unsupported type: %v", typ)
}
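
A few edge cases of the strconv calls are worth knowing before loading real data. The sketch below probes them through two hypothetical helpers, classifyInt32 and acceptsBool, which wrap the same calls that parseValue uses for its Int32 and Boolean branches.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// classifyInt32 reports how the Int32 branch of parseValue would treat raw.
func classifyInt32(raw string) string {
	_, err := strconv.ParseInt(raw, 10, 32)
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, strconv.ErrRange):
		return "out of range"
	case errors.Is(err, strconv.ErrSyntax):
		return "syntax error"
	}
	return "other error"
}

// acceptsBool reports whether the Boolean branch would accept raw.
func acceptsBool(raw string) bool {
	_, err := strconv.ParseBool(raw)
	return err == nil
}

func main() {
	for _, s := range []string{"42", "2147483648", "12.0", "1,000"} {
		fmt.Printf("int32 %q: %s\n", s, classifyInt32(s))
	}
	for _, s := range []string{"1", "t", "TRUE", "yes", "on"} {
		fmt.Printf("bool  %q: accepted=%v\n", s, acceptsBool(s))
	}
}
```

Passing a bit size of 32 to ParseInt is what makes the int32 conversion safe: values that do not fit are rejected with a range error instead of being silently truncated. ParseBool accepts only a fixed set of spellings such as "1", "t", and "TRUE", so CSV files that use "yes"/"no" would need a custom branch.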

Summary

With table, catalog, and CSV loading in place, we can now define schemas, ingest data, persist to disk, and reload on startup. The append pipeline transparently converts row-oriented input into compressed columnar storage. The catalog handles the bookkeeping — mapping names to tables, persisting schemas as JSON, and storing data as binary row group files.

The next phase adds the execution engine that actually queries this data — vectorized expressions, scan, filter, and projection operators.