diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000..5fa3fcd --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,125 @@ +# Benchmarks & demos + +Standalone scripts that exercise xarray-sql against real data. Each declares its +own dependencies inline (PEP 723) and points `xarray_sql` at this checkout, so +they run with no setup: + +```bash +uv run benchmarks/grad_era5.py +``` + +## `grad_era5.py` — differentiable SQL over ARCO-ERA5 + +Demonstrates the autograd feature on a real climate archive +([ARCO-ERA5](https://github.com/google-research/arco-era5), read anonymously +from GCS — needs `gcsfs` and network access). + +The key idea: a physical quantity is written as an **analytic SQL formula** over +ERA5 variables, and `grad(...)` differentiates that formula **symbolically**, +evaluated at every grid cell. Because each row is an independent point, this is +the relational equivalent of `jax.vmap(jax.grad(f))`. It is *not* a finite- +difference spatial gradient — `grad(f(u, v), u)` is the exact partial derivative +of `f`. + +Two worked cases, each checked against an analytic reference: + +| Quantity | SQL | Derivative | Check | +| --- | --- | --- | --- | +| Wind speed | `sqrt(power(u,2) + power(v,2))` | `grad(speed, u) = u/speed` | exact | +| Saturation vapour pressure | `A*exp(B*tc/(tc+C))` | `grad(e_s, T)` | closed-form Clausius-Clapeyron slope | + +Each query round-trips back to an `xarray.Dataset` via `.to_dataset(...)`. + +## `grad_descent.py` — gradient descent as one declarative SQL query + +Fits a line `y ~= a*x + b` by minimising the mean squared error, with the +**entire training loop expressed as a single recursive CTE** — no Python +iteration. Two pieces: + +- **`grad` compiles the update rule.** `xql.differentiate_sql(loss, "a", cols)` + turns the per-row loss into its symbolic derivative *as SQL text* — the + autograd engine as a calculus compiler. +- **A recursive CTE is the optimiser.** `params(step, a, b)` starts at one row + and each recursion appends the next generation, descending along the gradient + (`AVG` of the compiled rule over the data): + + ```sql + WITH RECURSIVE params(step, a, b) AS ( + SELECT 0, 0.0, 0.0 + UNION ALL + SELECT params.step + 1, params.a - lr*AVG(da), params.b - lr*AVG(db) + FROM params CROSS JOIN d WHERE params.step < STEPS + GROUP BY params.step, params.a, params.b) + SELECT * FROM params ORDER BY step + ``` + +So gradient, update, and iteration are all declarative SQL; the trajectory is +the rows of one query. The fit matches numpy's least-squares solution. +Self-contained (no network). + +(Why differentiate to text instead of `grad(...)` inside the recursion? `grad` +needs the Substrait round-trip, and Substrait has no recursion — so a `grad` +marker can't live inside a recursive CTE. Differentiating once to plain SQL +sidesteps that.) + +## `mnist_mlp.py` — an MNIST MLP as relational tensor algebra + +An MLP (196 -> 32 tanh -> 10 softmax on 2x2-pooled 14x14 MNIST) built on one +idea: **a neural net is a chain of tensor contractions (einsums), and an einsum +over coordinate-indexed arrays *is* relational algebra.** + +``` +C[i,k] = sum_j A[i,j] * B[j,k] <=> JOIN A, B ON A.j = B.j + GROUP BY i, k -> SUM(A.val * B.val) +``` + +Contracting a shared index is a join on it followed by a grouped `SUM` over the +indices that survive. In xarray-sql an array indexed by named dims is a table +keyed by those dims, so **the dimension names are the join keys**. + +**The whole network is one relation.** Two moves get there: + +- **Bias folded into the weights (an `nn.Linear`).** Each layer's bias is the + weight of a constant-`1` input, kept as the extra row `inp = width` of the same + weight array — so a layer is a single matrix. +- **A `layer` dimension.** Every layer's weight lives in one + `weight(layer, inp, out)` array, so the forward/backward filter on the `layer` + *column* instead of referencing a table per layer. + +So **the architecture is data**: the whole model is one `xr.Dataset` with a +`layer` dim, registered via `from_dataset`. The dim sizes are the layer widths +and the number of layers is the depth — differing neuron counts are just +differing sizes, NaN-padded in the dense array and dropped on the way in (the +relational form is naturally ragged). Change `WIDTHS` (e.g. `196, 64, 32, 10`) +and the same code trains the deeper net. + +A small `contract()` helper turns an einsum spec into one query, and a single +generic loop trains a net of any shape: + +- **forward** contracts the activation with `weight WHERE layer = L`, adds the + bias row, `tanh` (softmax on the last layer). +- **backward is the *same* operator with indices transposed** — the VJP of a + contraction is a contraction — accumulated into one `gweight` relation, with + `grad(tanh(z), z)` for the only genuinely-calculus part. Even the update is one + query over the whole `weight` relation. Linear algebra is joins; the + derivatives of the nonlinearities are `grad`. + +Everything stays relational: every stage is an inspectable table (`a1`, `delta2`, +`gweight`, …); the only hand-written gradient is softmax + cross-entropy's +`delta = softmax - onehot`. Even the training metrics are a table — each logged +step appends a `(step, loss, train_acc, test_acc)` row to a `metrics` relation +rather than a Python list (NN training produces a lot of such data; it belongs in +rows). Evaluation is SQL too (a forward pass + `ROW_NUMBER()` argmax), and the +trained model, predictions, and metrics all come **back out as xarray** via +`to_dataset`. Reaches ~83% test accuracy over 60 steps. Downloads MNIST on first +run. + +This is not a numpy replacement — relational matmul carries join overhead a BLAS +inner product doesn't. What it buys is a fully declarative, inspectable pipeline +whose data side is chunked xarray (parallel over the batch, larger-than-memory). +The *outer* training loop stays in Python because the steps must be materialised +between iterations: a multi-layer net can't be one recursive CTE (the recursive +relation may be referenced only once, but the weights are used several times per +step), and unrolling the steps as non-recursive CTEs blows up exponentially +(DataFusion inlines CTEs). The thin loop does exactly that materialisation; all +the maths stays in SQL. diff --git a/benchmarks/grad_descent.py b/benchmarks/grad_descent.py new file mode 100644 index 0000000..daff207 --- /dev/null +++ b/benchmarks/grad_descent.py @@ -0,0 +1,115 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray_sql", +# "xarray", +# "numpy", +# ] +# +# [tool.uv.sources] +# xarray_sql = { path = "..", editable = true } +# /// +"""Gradient descent as a single declarative SQL query. + +Fits a line ``y ~= a*x + b`` by minimising the mean squared error — with the +**entire training loop expressed as one recursive CTE**, no Python iteration. + +Two pieces: + +1. **grad compiles the update rule.** ``differentiate_sql`` turns the per-row + loss into the symbolic derivative *as SQL text* — the autograd engine acting + as a calculus compiler: + + da = differentiate_sql("(y-(a*x+b))^2", "a") # -> "-2*((a*x+b)-y)*x", etc. + +2. **A recursive CTE is the optimiser.** ``params(step, a, b)`` starts at one + row and each recursion appends the next generation, descending along the + gradient (``AVG`` of the compiled rule over the data): + + params.a - lr * AVG(da) + + So the whole loop — gradient, update, and iteration — is declarative SQL; + the optimisation trajectory is the rows of one query. + +Why two pieces instead of ``grad(...)`` directly inside the recursion? ``grad`` +needs the Substrait round-trip, and Substrait has no recursion — so ``grad`` +can't live inside a recursive CTE (tracked in #194 / a follow-up). Differentiating +once to plain SQL sidesteps that: the recursive query contains no ``grad`` marker. + +Run standalone: + + uv run benchmarks/grad_descent.py +""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + +import xarray_sql as xql + +# Per-row loss r^2 with residual r = y - (a*x + b), over columns a, b, x, y. +RESIDUAL = "(y - (a * x + b))" +LOSS = f"{RESIDUAL} * {RESIDUAL}" +COLUMNS = ["a", "b", "x", "y"] +LR = 0.4 +STEPS = 200 + + +def main() -> None: + rng = np.random.default_rng(0) + n = 500 + x = rng.uniform(0.0, 1.0, n) + a_true, b_true = 2.0, -1.0 + y = a_true * x + b_true + rng.normal(0.0, 0.01, n) + + ctx = xql.XarrayContext() + ctx.from_dataset( + "d", + xr.Dataset( + {"x": (("i",), x), "y": (("i",), y)}, coords={"i": np.arange(n)} + ), + chunks={"i": n}, + ) + + # grad compiles the per-row update rule to SQL, once. + da = xql.differentiate_sql(LOSS, "a", COLUMNS) + db = xql.differentiate_sql(LOSS, "b", COLUMNS) + print(f"d(loss)/da = {da}") + print(f"d(loss)/db = {db}\n") + + # The entire training loop is one declarative recursive query: each step + # appends the next generation, descending along the SQL-computed gradient. + trajectory = ctx.sql( + f""" + WITH RECURSIVE params(step, a, b) AS ( + SELECT 0 AS step, CAST(0.0 AS DOUBLE) AS a, CAST(0.0 AS DOUBLE) AS b + UNION ALL + SELECT params.step + 1 AS step, + params.a - {LR} * AVG({da}) AS a, + params.b - {LR} * AVG({db}) AS b + FROM params CROSS JOIN d + WHERE params.step < {STEPS} + GROUP BY params.step, params.a, params.b + ) + SELECT step, a, b FROM params ORDER BY step + """ + ).to_pandas() + + print("trajectory (every 40th generation):") + print(trajectory.iloc[::40].to_string(index=False)) + + a, b = float(trajectory["a"].iloc[-1]), float(trajectory["b"].iloc[-1]) + a_ols, b_ols = np.polyfit(x, y, 1) + print( + f"\nSQL gradient descent: a={a:.4f} b={b:.4f} ({len(trajectory)} generations)" + ) + print(f"least-squares (numpy): a={a_ols:.4f} b={b_ols:.4f}") + assert abs(a - a_ols) < 1e-2 and abs(b - b_ols) < 1e-2 + print( + "\nOK: a single recursive-CTE query fit the line to the OLS solution." + ) + + +if __name__ == "__main__": + main() diff --git a/benchmarks/grad_era5.py b/benchmarks/grad_era5.py new file mode 100644 index 0000000..866f066 --- /dev/null +++ b/benchmarks/grad_era5.py @@ -0,0 +1,171 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray_sql", +# "xarray[io]", +# "gcsfs", +# "numpy", +# ] +# +# [tool.uv.sources] +# xarray_sql = { path = "..", editable = true } +# /// +"""Differentiable SQL over ARCO-ERA5. + +A minimal demonstration of xarray-sql's autograd: take a real climate archive +(ARCO-ERA5, read anonymously from GCS), express a physical quantity as an +*analytic* SQL formula over its variables, and let ``grad(...)`` differentiate +that formula symbolically — evaluated per grid cell, which is the relational +equivalent of ``jax.vmap(jax.grad(f))`` (each row is an independent point). + +Note this is *symbolic* differentiation of an expression, not a finite- +difference spatial gradient: ``grad(f(u, v), u)`` is the exact partial +derivative of the formula ``f``, evaluated at every cell's values. + +Two cases: + +1. Wind-speed magnitude ``speed = sqrt(u^2 + v^2)``. Its sensitivity to the + eastward wind is ``d(speed)/du = u / speed`` — checked exactly. + +2. Saturation vapour pressure ``e_s(T)`` (August-Roche-Magnus form of the + Clausius-Clapeyron relation). ``d(e_s)/dT`` governs how fast the atmosphere's + moisture capacity grows with temperature — checked against the closed-form + slope. + +Run standalone (builds the local extension on first use): + + uv run benchmarks/grad_era5.py +""" + +from __future__ import annotations + +import time + +import numpy as np +import xarray as xr + +import xarray_sql as xql + +ARCO_ERA5 = ( + "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3" +) + +# ERA5 variable names start with a digit, so they must be double-quoted in SQL. +U = '"10m_u_component_of_wind"' +V = '"10m_v_component_of_wind"' +T = '"2m_temperature"' + + +def load_era5_block() -> xr.Dataset: + """Open ARCO-ERA5 and pull one timestamp over a small region. + + Lazy open of the whole archive; only the requested block is read. We keep + it to a few thousand cells so the demo runs in seconds. + """ + full = xr.open_zarr( + ARCO_ERA5, chunks=None, storage_options={"token": "anon"} + ) + block = ( + full[ + [ + "10m_u_component_of_wind", + "10m_v_component_of_wind", + "2m_temperature", + ] + ] + .sel(time="2020-01-01T00") + # A ~North-America box (index-based to avoid lat-orientation pitfalls). + .isel(latitude=slice(120, 200), longitude=slice(900, 1000)) + .load() + ) + # One partition, so a SQL `ORDER BY latitude DESC` survives the round-trip + # back to xarray (across multiple partitions, to_dataset reconstructs + # coordinates in ascending order regardless of ORDER BY). + return block.chunk() + + +def wind_speed_sensitivity(ctx: xql.XarrayContext, ref: xr.Dataset) -> None: + """grad(sqrt(u^2 + v^2)) checked against the exact u / speed, v / speed.""" + speed = f"sqrt(power({U}, 2) + power({V}, 2))" + out = ctx.sql( + f""" + SELECT + latitude, + longitude, + {speed} AS wind_speed, + grad({speed}, {U}) AS d_speed_d_u, + grad({speed}, {V}) AS d_speed_d_v + FROM era5 + ORDER BY latitude DESC, longitude + """ + ).to_dataset(dims=["latitude", "longitude"]) + + u = ref["10m_u_component_of_wind"] + v = ref["10m_v_component_of_wind"] + speed_ref = np.sqrt(u**2 + v**2) + + xr.testing.assert_allclose( + out["wind_speed"], speed_ref.rename("wind_speed") + ) + xr.testing.assert_allclose( + out["d_speed_d_u"], (u / speed_ref).rename("d_speed_d_u") + ) + xr.testing.assert_allclose( + out["d_speed_d_v"], (v / speed_ref).rename("d_speed_d_v") + ) + print(" wind-speed sensitivity matches u/|w|, v/|w| exactly") + print(out) + + +def clausius_clapeyron(ctx: xql.XarrayContext, ref: xr.Dataset) -> None: + """grad(e_s(T)) checked against the closed-form Clausius-Clapeyron slope.""" + # August-Roche-Magnus: e_s(T) = A * exp(B * tc / (tc + C)), tc = T - 273.15. + a, b, c = 6.1094, 17.625, 243.04 + tc = f"({T} - 273.15)" + es = f"{a} * exp({b} * {tc} / ({tc} + {c}))" + out = ctx.sql( + f""" + SELECT + latitude, + longitude, + {es} AS e_s, + grad({es}, {T}) AS de_s_dt + FROM era5 + ORDER BY latitude DESC, longitude + """ + ).to_dataset(dims=["latitude", "longitude"]) + + # Reference in float64 (the columns are float32): the exact derivative is + # d(e_s)/dT = e_s * B*C / (tc + C)^2. + temp = ref["2m_temperature"].astype("float64") + tc_ref = temp - 273.15 + es_ref = a * np.exp(b * tc_ref / (tc_ref + c)) + des_dt_ref = es_ref * (b * c) / (tc_ref + c) ** 2 + + xr.testing.assert_allclose(out["e_s"], es_ref.rename("e_s"), rtol=1e-5) + xr.testing.assert_allclose( + out["de_s_dt"], des_dt_ref.rename("de_s_dt"), rtol=1e-5 + ) + print(" d(e_s)/dT matches the closed-form Clausius-Clapeyron slope") + print(out) + + +def main() -> None: + t0 = time.time() + ds = load_era5_block() + print(f"loaded ERA5 block {dict(ds.sizes)} in {time.time() - t0:.1f}s") + + ctx = xql.XarrayContext() + ctx.from_dataset("era5", ds) + + print("\n== wind-speed sensitivity: grad(sqrt(u^2 + v^2)) ==") + wind_speed_sensitivity(ctx, ds) + + print("\n== Clausius-Clapeyron: grad(e_s(T)) ==") + clausius_clapeyron(ctx, ds) + + print("\nOK: symbolic SQL gradients match the analytic references.") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/mnist_mlp.py b/benchmarks/mnist_mlp.py new file mode 100644 index 0000000..fe31cee --- /dev/null +++ b/benchmarks/mnist_mlp.py @@ -0,0 +1,459 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray_sql", +# "xarray", +# "numpy", +# ] +# +# [tool.uv.sources] +# xarray_sql = { path = "..", editable = true } +# /// +"""Train an MNIST MLP as relational tensor algebra — the whole net is one table. + +A neural network is a chain of **tensor contractions** (einsums), and an einsum +over coordinate-indexed arrays *is* relational algebra: + + C[i,k] = sum_j A[i,j] * B[j,k] <=> JOIN A, B ON A.j = B.j + GROUP BY i, k -> SUM(A.val * B.val) + +Contracting a shared index is a join on it followed by a grouped SUM. In +xarray-sql an array indexed by named dims is a table keyed by those dims, so the +dim names are the join keys. + +Two simplifications make the whole model **one relation**: + +* **Bias folded into the weights (an ``nn.Linear``).** Each layer's bias is the + weight of a constant-``1`` input, stored as the extra row ``inp = width`` in the + same weight array — so a layer is a single matrix. The forward reads the matmul + rows and that bias row from the one relation (no separate bias table). +* **A ``layer`` dimension.** Every layer's weight lives in one + ``weight(layer, inp, out)`` array, so the forward/backward filter on the + ``layer`` *column* instead of referencing a table per layer. The whole network + is one ``xr.Dataset`` registered with ``from_dataset``; differing layer widths + are NaN-padded in the dense array and dropped on the way in (the relational + form is naturally ragged). The architecture is data — change ``WIDTHS`` and the + same code trains a different net. + +A single ``contract()`` and one generic loop train a net of any depth: forward +contracts the activation with ``weight WHERE layer = L``; backward is the same +contraction transposed (the VJP of a contraction is a contraction), with +``grad(tanh(z), z)`` for the one local-derivative step. Even the weight update is +one query over the whole ``weight`` relation. Linear algebra is joins; the +derivatives of the nonlinearities are ``grad``. + +Everything stays relational and inspectable: activations, errors, gradients, and +the per-step training metrics are all tables; the trained model, predictions, and +metrics come back out as ``xarray`` via ``to_dataset``. + +This is not a numpy replacement — the long form puts one matrix entry per row, so +the matmul-as-join carries overhead a BLAS inner product doesn't. What it buys is +a declarative, inspectable pipeline whose data side is chunked xarray (parallel +over the batch, larger-than-memory). Recovering BLAS speed would mean storing +dense *tiles* per cell and contracting them with a tile-matmul — a future +direction, not done here. + +Run standalone (builds the local extension on first use): + + uv run benchmarks/mnist_mlp.py +""" + +from __future__ import annotations + +import gzip +import struct +import tempfile +import time +import urllib.request +from pathlib import Path + +import numpy as np +import pyarrow as pa +import xarray as xr + +import xarray_sql as xql + +MIRROR = "https://storage.googleapis.com/cvdf-datasets/mnist" +CACHE = Path(tempfile.gettempdir()) / "mnist-xql" + +# The architecture, as data: layer widths. 196 pooled pixels -> 32 tanh -> 10. +# Add an entry (e.g. 196, 64, 32, 10) and the same code trains the deeper net. +WIDTHS = [196, 32, 10] +DEPTH = len(WIDTHS) - 1 # number of weight layers +N_TRAIN, N_TEST = 1000, 500 +LR, STEPS, CHUNK = 0.5, 60, 250 + + +# --- the one idea: a tensor contraction is a relational query ----------------- + + +def contract(spec: str, left: str, right: str) -> str: + """An einsum over two coordinate-indexed relations, as one SQL query. + + ``contract("sample,inp * inp,out -> sample,out", "x", w)`` joins ``x`` and + ``w`` on their shared dim ``inp``, groups by the output dims, and sums the + product of values — a matmul. ``left`` / ``right`` are table names or + parenthesised subqueries; each exposes its dims plus a ``val`` column. + Indices in the inputs but not the output are contracted (summed over). + """ + spec = spec.replace(" ", "") + lhs, out = spec.split("->") + da, db = (operand.split(",") for operand in lhs.split("*")) + out_dims = out.split(",") + shared = [d for d in da if d in db] + join = ( + f"JOIN {right} r ON " + " AND ".join(f"l.{d} = r.{d}" for d in shared) + if shared + else f"CROSS JOIN {right} r" + ) + pick = ", ".join(f"{'l' if d in da else 'r'}.{d} AS {d}" for d in out_dims) + return ( + f"SELECT {pick}, SUM(l.val * r.val) AS val " + f"FROM {left} l {join} GROUP BY {', '.join(out_dims)}" + ) + + +def register_tensor( + ctx: xql.XarrayContext, + name: str, + arr: np.ndarray, + dims: tuple[str, ...], + var: str = "val", + chunk: int | None = None, +) -> None: + """Register a numpy array as a relation, the array-relational way: wrap it as + an ``xr.Dataset`` whose named dims become the table's key columns, then hand + it to ``from_dataset``. A tensor is an array at the edge and a relation + inside; ``from_dataset`` is the bridge, and the dims become the join keys.""" + arr = np.asarray(arr, dtype=np.float64) + ds = xr.Dataset( + {var: (dims, arr)}, + coords={d: np.arange(n) for d, n in zip(dims, arr.shape)}, + ) + ctx.from_dataset(name, ds, chunks={dims[0]: chunk or arr.shape[0]}) + + +class Tensors: + """A step rewrites a handful of relations; ``put`` materialises a query as a + named table (the stages of the forward/backward pass).""" + + def __init__(self, ctx: xql.XarrayContext): + self.ctx = ctx + + def put(self, name: str, sql: str) -> None: + batches = self.ctx.sql(sql).collect() + # UNION branches can yield batches that differ only in field nullability; + # cast them all to one (nullable) schema so registration accepts them. + if batches: + target = pa.schema( + [pa.field(f.name, f.type) for f in batches[0].schema] + ) + batches = [b.cast(target) for b in batches] + if self.ctx.table_exist(name): + self.ctx.deregister_table(name) + self.ctx.register_record_batches(name, [batches]) + + +# --- the model: one weight relation, bias folded in --------------------------- + + +def build_model(rng: np.random.Generator) -> xr.Dataset: + """The whole network as one ``weight(layer, inp, out)`` Dataset. + + Layer ``L`` connects ``WIDTHS[L]`` inputs (plus a constant-1 bias input, index + ``WIDTHS[L]``) to ``WIDTHS[L+1]`` outputs. The dense array is NaN-padded to the + widest layer; the padding is dropped when the relation is seeded, so the live + table is the ragged set of real weights. + """ + max_in = max(WIDTHS[layer] + 1 for layer in range(DEPTH)) + max_out = max(WIDTHS[layer + 1] for layer in range(DEPTH)) + arr = np.full((DEPTH, max_in, max_out), np.nan) + for layer in range(DEPTH): + n_in, n_out = WIDTHS[layer], WIDTHS[layer + 1] + arr[layer, :n_in, :n_out] = rng.standard_normal((n_in, n_out)) * 0.1 + arr[layer, n_in, :n_out] = ( + 0.0 # bias row (weight of the constant input) + ) + return xr.Dataset( + {"weight": (("layer", "inp", "out"), arr)}, + coords={ + "layer": np.arange(DEPTH), + "inp": np.arange(max_in), + "out": np.arange(max_out), + }, + ) + + +def matmul_rows(layer: int) -> str: + """The matmul (non-bias) rows of one layer's weight, as a subquery.""" + return f"(SELECT inp, out, val FROM weight WHERE layer = {layer} AND inp < {WIDTHS[layer]})" + + +def bias_row(layer: int) -> str: + """The bias row (inp = width) of one layer's weight, as a subquery over out.""" + return f"(SELECT out, val FROM weight WHERE layer = {layer} AND inp = {WIDTHS[layer]})" + + +# --- the network, as contractions (generic over depth) ------------------------ + + +def forward(t: Tensors, inp: str = "x") -> None: + """Forward pass from ``inp``: per layer, contract with the matmul rows and add + the bias row (both from the one weight relation), then tanh on the hidden + layers. Leaves ``a{L}.z`` for backprop and the output ``logits``.""" + prev = inp + for layer in range(DEPTH): + zc = contract( + "sample,inp * inp,out -> sample,out", prev, matmul_rows(layer) + ) + if layer < DEPTH - 1: + t.put( + f"a{layer + 1}", + f"""WITH zc AS ({zc}) + SELECT zc.sample, zc.out AS inp, zc.val + b.val AS z, + tanh(zc.val + b.val) AS val + FROM zc JOIN {bias_row(layer)} b ON zc.out = b.out""", + ) + prev = f"a{layer + 1}" + else: + t.put( + "logits", + f"""WITH zc AS ({zc}) + SELECT zc.sample, zc.out, zc.val + b.val AS z + FROM zc JOIN {bias_row(layer)} b ON zc.out = b.out""", + ) + + +def softmax_delta_sql() -> str: + """Output error delta = softmax(logits) - onehot(label). The one hand-derived + rule: softmax couples classes through a per-sample normaliser an aggregate + grad() does not cross.""" + return """ + WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), + e AS (SELECT logits.sample, logits.out, exp(logits.z - m.m) AS e + FROM logits JOIN m ON logits.sample = m.sample), + s AS (SELECT sample, SUM(e) AS s FROM e GROUP BY sample) + SELECT e.sample, e.out, + e.e / s.s - CASE WHEN e.out = y.label THEN 1.0 ELSE 0.0 END AS val + FROM e JOIN s ON e.sample = s.sample JOIN y ON y.sample = e.sample""" + + +def train_step(t: Tensors) -> None: + """Forward, backward (the same contraction transposed), one SGD update.""" + forward(t) + t.put(f"delta{DEPTH}", softmax_delta_sql()) + # Backward: gradients are contractions over the batch, accumulated into one + # gweight relation tagged by layer. delta{L} is the error at layer L's units. + for layer in reversed(range(DEPTH)): + a_in = "x" if layer == 0 else f"a{layer}" + # matmul gradient (mean over batch) + bias gradient (mean of delta), + # both tagged with this layer, as rows of one gweight relation. + gw = contract( + "sample,inp * sample,out -> inp,out", a_in, f"delta{layer + 1}" + ) + rows = ( + f"SELECT CAST({layer} AS BIGINT) AS layer, inp, out, " + f"val / {N_TRAIN} AS val FROM ({gw}) " + f"UNION ALL " + f"SELECT CAST({layer} AS BIGINT) AS layer, " + f"CAST({WIDTHS[layer]} AS BIGINT) AS inp, out, AVG(val) AS val " + f"FROM delta{layer + 1} GROUP BY out" + ) + t.put( + "gweight", + f"SELECT * FROM gweight UNION ALL {rows}" + if t.ctx.table_exist("gweight") + else rows, + ) + if layer > 0: # propagate the cotangent, scaled by the local derivative + dc = contract( + "sample,out * inp,out -> sample,inp", + f"delta{layer + 1}", + matmul_rows(layer), + ) + t.put( + f"delta{layer}", + f"""WITH dc AS ({dc}) + SELECT dc.sample, dc.inp AS out, + dc.val * grad(tanh(a{layer}.z), a{layer}.z) AS val + FROM dc JOIN a{layer} + ON dc.sample = a{layer}.sample AND dc.inp = a{layer}.inp""", + ) + # One SGD update for the whole network: weight <- weight - lr * gweight. + t.put( + "weight", + f"""SELECT w.layer, w.inp, w.out, w.val - {LR} * g.val AS val + FROM weight w JOIN gweight g + ON w.layer = g.layer AND w.inp = g.inp AND w.out = g.out""", + ) + t.ctx.deregister_table("gweight") + + +def accuracy(t: Tensors, inp: str, lab: str) -> float: + """A forward pass over ``inp`` + argmax, compared to ``lab`` — all in SQL.""" + forward(t, inp) + return float( + t.ctx.sql( + f"""WITH pred AS ( + SELECT sample, out, + ROW_NUMBER() OVER (PARTITION BY sample ORDER BY z DESC) AS rk + FROM logits) + SELECT AVG(CASE WHEN p.out = l.label THEN 1.0 ELSE 0.0 END) AS acc + FROM pred p JOIN {lab} l ON p.sample = l.sample WHERE p.rk = 1""" + ).to_pandas()["acc"][0] + ) + + +def record_metrics(t: Tensors, step: int) -> None: + """Append a (step, loss, train_acc, test_acc) row to the ``metrics`` table. + + NN training emits a lot of data — loss curves, per-step accuracies — and like + everything else here it lives as rows in a relation, grown each time, not a + Python list. Read it back at the end as a tidy ``(step,)`` xarray. + """ + train = accuracy(t, "x", "y") # leaves the training forward in `logits` + loss = float( + t.ctx.sql( + """WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), + e AS (SELECT logits.sample, logits.out, exp(logits.z - m.m) AS e + FROM logits JOIN m ON logits.sample = m.sample), + s AS (SELECT sample, SUM(e) AS s FROM e GROUP BY sample) + SELECT -AVG(ln(e.e / s.s)) AS loss + FROM e JOIN s ON e.sample = s.sample JOIN y ON y.sample = e.sample + WHERE e.out = y.label""" + ).to_pandas()["loss"][0] + ) + test = accuracy(t, "x_te", "y_te") + row = ( + f"SELECT CAST({step} AS BIGINT) AS step, CAST({loss} AS DOUBLE) AS loss, " + f"CAST({train} AS DOUBLE) AS train_acc, CAST({test} AS DOUBLE) AS test_acc" + ) + t.put( + "metrics", + f"SELECT * FROM metrics UNION ALL {row}" + if t.ctx.table_exist("metrics") + else row, + ) + print( + f"step {step:2d}: loss {loss:.3f} train {train:.3f} test {test:.3f}" + ) + + +# --- MNIST loading ------------------------------------------------------------ + + +def _download(url: str, dest: Path, tries: int = 5) -> None: + last = None + for _ in range(tries): + try: + with urllib.request.urlopen(url, timeout=120) as resp: + data = resp.read() + if len(data) < 1024: + raise OSError(f"suspiciously small download: {len(data)} bytes") + dest.write_bytes(data) + return + except Exception as exc: # noqa: BLE001 - retry any transient failure + last = exc + raise OSError(f"failed to download {url}: {last}") + + +def _read_idx(path: Path) -> np.ndarray: + with gzip.open(path, "rb") as f: + (magic,) = struct.unpack(">I", f.read(4)) + if magic == 2051: # images + n, r, c = struct.unpack(">III", f.read(12)) + return np.frombuffer(f.read(), np.uint8).reshape(n, r, c) + struct.unpack(">I", f.read(4)) # labels: skip the count + return np.frombuffer(f.read(), np.uint8) + + +def load_mnist(): + CACHE.mkdir(exist_ok=True) + files = { + "images": "train-images-idx3-ubyte.gz", + "labels": "train-labels-idx1-ubyte.gz", + } + paths = {} + for key, name in files.items(): + dest = CACHE / name + if not dest.exists(): + _download(f"{MIRROR}/{name}", dest) + paths[key] = dest + imgs = _read_idx(paths["images"]).astype(np.float32) / 255.0 + labs = _read_idx(paths["labels"]).astype(np.int64) + side = WIDTHS[0] # pooled pixels per image + pool = 28 // int(round(side**0.5)) # 2 for 196 pixels (14x14) + k = 28 // pool + pooled = ( + imgs.reshape(-1, k, pool, k, pool).mean(axis=(2, 4)).reshape(-1, side) + ) + rng = np.random.default_rng(0) + idx = rng.permutation(len(pooled)) + tr, te = idx[:N_TRAIN], idx[N_TRAIN : N_TRAIN + N_TEST] + return pooled[tr], labs[tr], pooled[te], labs[te] + + +# --- driver ------------------------------------------------------------------- + + +def main() -> None: + Xtr, ytr, Xte, yte = load_mnist() + print(f"MNIST: train {Xtr.shape}, test {Xte.shape} architecture {WIDTHS}") + + ctx = xql.XarrayContext() + # The whole model is one Dataset with a layer dim; from_dataset gives one + # `net` table, and seeding drops the NaN padding to the live `weight` relation. + rng = np.random.default_rng(1) + model = build_model(rng) + ctx.from_dataset( + "net", + model, + chunks={ + "layer": DEPTH, + "inp": model.sizes["inp"], + "out": model.sizes["out"], + }, + ) + t = Tensors(ctx) + t.put( + "weight", + "SELECT layer, inp, out, weight AS val FROM net WHERE weight IS NOT NULL", + ) + + # Inputs and labels (the bias is in the weight relation, so no augmentation). + register_tensor(ctx, "x", Xtr, ("sample", "inp"), chunk=CHUNK) + register_tensor(ctx, "y", ytr, ("sample",), var="label") + register_tensor(ctx, "x_te", Xte, ("sample", "inp")) + register_tensor(ctx, "y_te", yte, ("sample",), var="label") + + print(f"init: test acc {accuracy(t, 'x_te', 'y_te'):.3f}") + t0 = time.time() + for step in range(STEPS): + train_step(t) + if step % 10 == 0 or step == STEPS - 1: + record_metrics(t, step) + dt = time.time() - t0 + + # The trained model, predictions, and metrics all come back out as xarray. + weights = ( + ctx.sql("SELECT layer, inp, out, val FROM weight") + .to_dataset(dims=["layer", "inp", "out"]) + .rename({"val": "weight"}) + ) + metrics = ctx.sql("SELECT * FROM metrics ORDER BY step").to_dataset( + dims=["step"] + ) + + print( + f"\ntrained a {WIDTHS} MLP as relational tensor algebra in {dt:.0f}s: " + f"test accuracy {accuracy(t, 'x_te', 'y_te'):.3f}." + ) + print( + f"the whole model is one weight relation -> xarray " + f"{dict(weights.sizes)}; metrics are a table -> xarray " + f"{list(metrics.data_vars)} over {dict(metrics.sizes)}." + ) + + +if __name__ == "__main__": + main()