From 67286dbb237ddef7a21c111a761fd2c8d86da43d Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 28 Jun 2026 14:52:29 +0000 Subject: [PATCH 1/5] Add differentiable-SQL demos: ARCO-ERA5 and gradient descent Stacked demo branch (on the autograd feature) holding the runnable benchmark scripts, kept out of the core branch so it stays reviewable. * grad_era5.py: symbolic grad over real ARCO-ERA5 data (wind-speed sensitivity checked exactly; saturation vapour pressure checked against the closed-form Clausius-Clapeyron slope). The queries ORDER BY latitude DESC, longitude to match ERA5's native order, so results line up with the xarray reference with no sorting on either side (single partition, so the order survives to_dataset). * grad_descent.py: gradient descent as ONE declarative recursive-CTE query. differentiate_sql compiles the per-row update rule to SQL once; a recursive CTE then iterates it. No Python loop. Fit matches numpy least-squares. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_017mDoFJgsm9kS7SicGoCVF6 --- benchmarks/README.md | 64 ++++++++++++++ benchmarks/grad_descent.py | 115 +++++++++++++++++++++++++ benchmarks/grad_era5.py | 171 +++++++++++++++++++++++++++++++++++++ 3 files changed, 350 insertions(+) create mode 100644 benchmarks/README.md create mode 100644 benchmarks/grad_descent.py create mode 100644 benchmarks/grad_era5.py diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000..5a0188c --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,64 @@ +# Benchmarks & demos + +Standalone scripts that exercise xarray-sql against real data. Each declares its +own dependencies inline (PEP 723) and points `xarray_sql` at this checkout, so +they run with no setup: + +```bash +uv run benchmarks/grad_era5.py +``` + +## `grad_era5.py` — differentiable SQL over ARCO-ERA5 + +Demonstrates the autograd feature on a real climate archive +([ARCO-ERA5](https://github.com/google-research/arco-era5), read anonymously +from GCS — needs `gcsfs` and network access). + +The key idea: a physical quantity is written as an **analytic SQL formula** over +ERA5 variables, and `grad(...)` differentiates that formula **symbolically**, +evaluated at every grid cell. Because each row is an independent point, this is +the relational equivalent of `jax.vmap(jax.grad(f))`. It is *not* a finite- +difference spatial gradient — `grad(f(u, v), u)` is the exact partial derivative +of `f`. + +Two worked cases, each checked against an analytic reference: + +| Quantity | SQL | Derivative | Check | +| --- | --- | --- | --- | +| Wind speed | `sqrt(power(u,2) + power(v,2))` | `grad(speed, u) = u/speed` | exact | +| Saturation vapour pressure | `A*exp(B*tc/(tc+C))` | `grad(e_s, T)` | closed-form Clausius-Clapeyron slope | + +Each query round-trips back to an `xarray.Dataset` via `.to_dataset(...)`. + +## `grad_descent.py` — gradient descent as one declarative SQL query + +Fits a line `y ~= a*x + b` by minimising the mean squared error, with the +**entire training loop expressed as a single recursive CTE** — no Python +iteration. Two pieces: + +- **`grad` compiles the update rule.** `xql.differentiate_sql(loss, "a", cols)` + turns the per-row loss into its symbolic derivative *as SQL text* — the + autograd engine as a calculus compiler. +- **A recursive CTE is the optimiser.** `params(step, a, b)` starts at one row + and each recursion appends the next generation, descending along the gradient + (`AVG` of the compiled rule over the data): + + ```sql + WITH RECURSIVE params(step, a, b) AS ( + SELECT 0, 0.0, 0.0 + UNION ALL + SELECT params.step + 1, params.a - lr*AVG(da), params.b - lr*AVG(db) + FROM params CROSS JOIN d WHERE params.step < STEPS + GROUP BY params.step, params.a, params.b) + SELECT * FROM params ORDER BY step + ``` + +So gradient, update, and iteration are all declarative SQL; the trajectory is +the rows of one query. The fit matches numpy's least-squares solution. +Self-contained (no network). + +(Why differentiate to text instead of `grad(...)` inside the recursion? `grad` +needs the Substrait round-trip, and Substrait has no recursion — so a `grad` +marker can't live inside a recursive CTE. Differentiating once to plain SQL +sidesteps that.) + diff --git a/benchmarks/grad_descent.py b/benchmarks/grad_descent.py new file mode 100644 index 0000000..daff207 --- /dev/null +++ b/benchmarks/grad_descent.py @@ -0,0 +1,115 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray_sql", +# "xarray", +# "numpy", +# ] +# +# [tool.uv.sources] +# xarray_sql = { path = "..", editable = true } +# /// +"""Gradient descent as a single declarative SQL query. + +Fits a line ``y ~= a*x + b`` by minimising the mean squared error — with the +**entire training loop expressed as one recursive CTE**, no Python iteration. + +Two pieces: + +1. **grad compiles the update rule.** ``differentiate_sql`` turns the per-row + loss into the symbolic derivative *as SQL text* — the autograd engine acting + as a calculus compiler: + + da = differentiate_sql("(y-(a*x+b))^2", "a") # -> "-2*((a*x+b)-y)*x", etc. + +2. **A recursive CTE is the optimiser.** ``params(step, a, b)`` starts at one + row and each recursion appends the next generation, descending along the + gradient (``AVG`` of the compiled rule over the data): + + params.a - lr * AVG(da) + + So the whole loop — gradient, update, and iteration — is declarative SQL; + the optimisation trajectory is the rows of one query. + +Why two pieces instead of ``grad(...)`` directly inside the recursion? ``grad`` +needs the Substrait round-trip, and Substrait has no recursion — so ``grad`` +can't live inside a recursive CTE (tracked in #194 / a follow-up). Differentiating +once to plain SQL sidesteps that: the recursive query contains no ``grad`` marker. + +Run standalone: + + uv run benchmarks/grad_descent.py +""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + +import xarray_sql as xql + +# Per-row loss r^2 with residual r = y - (a*x + b), over columns a, b, x, y. +RESIDUAL = "(y - (a * x + b))" +LOSS = f"{RESIDUAL} * {RESIDUAL}" +COLUMNS = ["a", "b", "x", "y"] +LR = 0.4 +STEPS = 200 + + +def main() -> None: + rng = np.random.default_rng(0) + n = 500 + x = rng.uniform(0.0, 1.0, n) + a_true, b_true = 2.0, -1.0 + y = a_true * x + b_true + rng.normal(0.0, 0.01, n) + + ctx = xql.XarrayContext() + ctx.from_dataset( + "d", + xr.Dataset( + {"x": (("i",), x), "y": (("i",), y)}, coords={"i": np.arange(n)} + ), + chunks={"i": n}, + ) + + # grad compiles the per-row update rule to SQL, once. + da = xql.differentiate_sql(LOSS, "a", COLUMNS) + db = xql.differentiate_sql(LOSS, "b", COLUMNS) + print(f"d(loss)/da = {da}") + print(f"d(loss)/db = {db}\n") + + # The entire training loop is one declarative recursive query: each step + # appends the next generation, descending along the SQL-computed gradient. + trajectory = ctx.sql( + f""" + WITH RECURSIVE params(step, a, b) AS ( + SELECT 0 AS step, CAST(0.0 AS DOUBLE) AS a, CAST(0.0 AS DOUBLE) AS b + UNION ALL + SELECT params.step + 1 AS step, + params.a - {LR} * AVG({da}) AS a, + params.b - {LR} * AVG({db}) AS b + FROM params CROSS JOIN d + WHERE params.step < {STEPS} + GROUP BY params.step, params.a, params.b + ) + SELECT step, a, b FROM params ORDER BY step + """ + ).to_pandas() + + print("trajectory (every 40th generation):") + print(trajectory.iloc[::40].to_string(index=False)) + + a, b = float(trajectory["a"].iloc[-1]), float(trajectory["b"].iloc[-1]) + a_ols, b_ols = np.polyfit(x, y, 1) + print( + f"\nSQL gradient descent: a={a:.4f} b={b:.4f} ({len(trajectory)} generations)" + ) + print(f"least-squares (numpy): a={a_ols:.4f} b={b_ols:.4f}") + assert abs(a - a_ols) < 1e-2 and abs(b - b_ols) < 1e-2 + print( + "\nOK: a single recursive-CTE query fit the line to the OLS solution." + ) + + +if __name__ == "__main__": + main() diff --git a/benchmarks/grad_era5.py b/benchmarks/grad_era5.py new file mode 100644 index 0000000..866f066 --- /dev/null +++ b/benchmarks/grad_era5.py @@ -0,0 +1,171 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray_sql", +# "xarray[io]", +# "gcsfs", +# "numpy", +# ] +# +# [tool.uv.sources] +# xarray_sql = { path = "..", editable = true } +# /// +"""Differentiable SQL over ARCO-ERA5. + +A minimal demonstration of xarray-sql's autograd: take a real climate archive +(ARCO-ERA5, read anonymously from GCS), express a physical quantity as an +*analytic* SQL formula over its variables, and let ``grad(...)`` differentiate +that formula symbolically — evaluated per grid cell, which is the relational +equivalent of ``jax.vmap(jax.grad(f))`` (each row is an independent point). + +Note this is *symbolic* differentiation of an expression, not a finite- +difference spatial gradient: ``grad(f(u, v), u)`` is the exact partial +derivative of the formula ``f``, evaluated at every cell's values. + +Two cases: + +1. Wind-speed magnitude ``speed = sqrt(u^2 + v^2)``. Its sensitivity to the + eastward wind is ``d(speed)/du = u / speed`` — checked exactly. + +2. Saturation vapour pressure ``e_s(T)`` (August-Roche-Magnus form of the + Clausius-Clapeyron relation). ``d(e_s)/dT`` governs how fast the atmosphere's + moisture capacity grows with temperature — checked against the closed-form + slope. + +Run standalone (builds the local extension on first use): + + uv run benchmarks/grad_era5.py +""" + +from __future__ import annotations + +import time + +import numpy as np +import xarray as xr + +import xarray_sql as xql + +ARCO_ERA5 = ( + "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3" +) + +# ERA5 variable names start with a digit, so they must be double-quoted in SQL. +U = '"10m_u_component_of_wind"' +V = '"10m_v_component_of_wind"' +T = '"2m_temperature"' + + +def load_era5_block() -> xr.Dataset: + """Open ARCO-ERA5 and pull one timestamp over a small region. + + Lazy open of the whole archive; only the requested block is read. We keep + it to a few thousand cells so the demo runs in seconds. + """ + full = xr.open_zarr( + ARCO_ERA5, chunks=None, storage_options={"token": "anon"} + ) + block = ( + full[ + [ + "10m_u_component_of_wind", + "10m_v_component_of_wind", + "2m_temperature", + ] + ] + .sel(time="2020-01-01T00") + # A ~North-America box (index-based to avoid lat-orientation pitfalls). + .isel(latitude=slice(120, 200), longitude=slice(900, 1000)) + .load() + ) + # One partition, so a SQL `ORDER BY latitude DESC` survives the round-trip + # back to xarray (across multiple partitions, to_dataset reconstructs + # coordinates in ascending order regardless of ORDER BY). + return block.chunk() + + +def wind_speed_sensitivity(ctx: xql.XarrayContext, ref: xr.Dataset) -> None: + """grad(sqrt(u^2 + v^2)) checked against the exact u / speed, v / speed.""" + speed = f"sqrt(power({U}, 2) + power({V}, 2))" + out = ctx.sql( + f""" + SELECT + latitude, + longitude, + {speed} AS wind_speed, + grad({speed}, {U}) AS d_speed_d_u, + grad({speed}, {V}) AS d_speed_d_v + FROM era5 + ORDER BY latitude DESC, longitude + """ + ).to_dataset(dims=["latitude", "longitude"]) + + u = ref["10m_u_component_of_wind"] + v = ref["10m_v_component_of_wind"] + speed_ref = np.sqrt(u**2 + v**2) + + xr.testing.assert_allclose( + out["wind_speed"], speed_ref.rename("wind_speed") + ) + xr.testing.assert_allclose( + out["d_speed_d_u"], (u / speed_ref).rename("d_speed_d_u") + ) + xr.testing.assert_allclose( + out["d_speed_d_v"], (v / speed_ref).rename("d_speed_d_v") + ) + print(" wind-speed sensitivity matches u/|w|, v/|w| exactly") + print(out) + + +def clausius_clapeyron(ctx: xql.XarrayContext, ref: xr.Dataset) -> None: + """grad(e_s(T)) checked against the closed-form Clausius-Clapeyron slope.""" + # August-Roche-Magnus: e_s(T) = A * exp(B * tc / (tc + C)), tc = T - 273.15. + a, b, c = 6.1094, 17.625, 243.04 + tc = f"({T} - 273.15)" + es = f"{a} * exp({b} * {tc} / ({tc} + {c}))" + out = ctx.sql( + f""" + SELECT + latitude, + longitude, + {es} AS e_s, + grad({es}, {T}) AS de_s_dt + FROM era5 + ORDER BY latitude DESC, longitude + """ + ).to_dataset(dims=["latitude", "longitude"]) + + # Reference in float64 (the columns are float32): the exact derivative is + # d(e_s)/dT = e_s * B*C / (tc + C)^2. + temp = ref["2m_temperature"].astype("float64") + tc_ref = temp - 273.15 + es_ref = a * np.exp(b * tc_ref / (tc_ref + c)) + des_dt_ref = es_ref * (b * c) / (tc_ref + c) ** 2 + + xr.testing.assert_allclose(out["e_s"], es_ref.rename("e_s"), rtol=1e-5) + xr.testing.assert_allclose( + out["de_s_dt"], des_dt_ref.rename("de_s_dt"), rtol=1e-5 + ) + print(" d(e_s)/dT matches the closed-form Clausius-Clapeyron slope") + print(out) + + +def main() -> None: + t0 = time.time() + ds = load_era5_block() + print(f"loaded ERA5 block {dict(ds.sizes)} in {time.time() - t0:.1f}s") + + ctx = xql.XarrayContext() + ctx.from_dataset("era5", ds) + + print("\n== wind-speed sensitivity: grad(sqrt(u^2 + v^2)) ==") + wind_speed_sensitivity(ctx, ds) + + print("\n== Clausius-Clapeyron: grad(e_s(T)) ==") + clausius_clapeyron(ctx, ds) + + print("\nOK: symbolic SQL gradients match the analytic references.") + + +if __name__ == "__main__": + main() From b8d3e83da8e802de1afc2e3ab4e8acd7e4503d0a Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 28 Jun 2026 14:46:27 +0000 Subject: [PATCH 2/5] Add MNIST MLP trained in SQL (benchmarks/mnist_mlp.py) A one-hidden-layer MLP (196->32 tanh->10 softmax, on 2x2-pooled 14x14 MNIST) trained by gradient descent with every gradient computed in SQL. The images are registered as xarray (the library's core); the model weights and per-step intermediates are DataFusion in-memory tables (register_record_batches), so a matmul is a join over them and there's no xarray pivot per step. Reverse-mode autodiff as relational algebra: matmul = join + GROUP BY SUM; the hidden activation's local Jacobian = grad(tanh(z), z); cotangent propagation = join; parameter gradients = join + GROUP BY AVG. The only hand-written gradient is softmax + cross-entropy's delta = softmax - onehot. ~83% test accuracy in ~20s. Adds a benchmarks README entry. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_017mDoFJgsm9kS7SicGoCVF6 --- benchmarks/README.md | 21 +++ benchmarks/mnist_mlp.py | 311 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 332 insertions(+) create mode 100644 benchmarks/mnist_mlp.py diff --git a/benchmarks/README.md b/benchmarks/README.md index 5a0188c..f1c6283 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -62,3 +62,24 @@ needs the Substrait round-trip, and Substrait has no recursion — so a `grad` marker can't live inside a recursive CTE. Differentiating once to plain SQL sidesteps that.) +## `mnist_mlp.py` — train an MNIST MLP classifier in SQL + +A one-hidden-layer neural network (196 -> 32 tanh -> 10 softmax, on 2x2-pooled +14x14 MNIST) trained by gradient descent where **every gradient is computed in +SQL**; the optimisation loop is plain Python. It is reverse-mode autodiff +expressed as relational algebra: + +- **matmul = join + `GROUP BY SUM`** — a layer's pre-activation is + `SUM(input * weight)` grouped by (sample, unit). +- **local derivatives = `grad()`** — the hidden activation's Jacobian is + `grad(tanh(z), z)`, the autograd feature doing the calculus per (sample, unit). +- **cotangent propagation = join**, **parameter gradients = join + `GROUP BY + AVG`**. + +The MNIST images are registered as xarray (the library's core); the model +weights and per-step intermediates are DataFusion in-memory tables (a matmul is +a join over them). The only hand-written gradient is softmax + cross-entropy's +`delta = softmax - onehot` (softmax couples classes through a per-sample +normaliser, an aggregate `grad` does not cross). Reaches ~83% test accuracy in +~20s. Downloads MNIST on first run. + diff --git a/benchmarks/mnist_mlp.py b/benchmarks/mnist_mlp.py new file mode 100644 index 0000000..516ee83 --- /dev/null +++ b/benchmarks/mnist_mlp.py @@ -0,0 +1,311 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "xarray_sql", +# "xarray", +# "numpy", +# "pyarrow", +# ] +# +# [tool.uv.sources] +# xarray_sql = { path = "..", editable = true } +# /// +"""Train an MNIST MLP classifier in SQL. + +A one-hidden-layer neural network (196->32 tanh->10 softmax, on 2x2-pooled +14x14 = 196-pixel images) trained by gradient descent where **every gradient is +computed in SQL**. The MNIST images are registered as xarray (the library's +core); the model weights and per-step intermediates live in DataFusion +in-memory tables. The optimisation loop is plain Python; all the math is +relational. + +The design is reverse-mode autodiff expressed in relational algebra: + +* **matmul = join + GROUP BY SUM.** A layer's pre-activation is + ``SUM(input * weight)`` grouped by (sample, unit), joining the data table to a + weight table on the shared index. +* **local derivatives = grad().** The hidden activation's Jacobian is + ``grad(tanh(z), z)`` — the engine differentiates the nonlinearity for us, + evaluated per (sample, unit). This is where the autograd feature does its + work; the rest is ordinary SQL. +* **cotangent propagation = join.** The output error is pushed back through the + second weight matrix by another join + SUM, then multiplied by the local + ``grad`` factor to get the hidden-layer error. +* **parameter gradients = join + GROUP BY AVG.** ``dW = AVG(input * delta)`` + grouped by the weight's indices. + +The only hand-written gradient is softmax + cross-entropy's ``delta = softmax - +onehot`` (softmax couples classes through a per-sample normaliser, an aggregate +``grad`` does not cross — staying faithful to SQL). Everything else is grad and +joins. + +Run standalone (builds the local extension on first use): + + uv run benchmarks/mnist_mlp.py +""" + +from __future__ import annotations + +import gzip +import struct +import tempfile +import time +import urllib.request +from pathlib import Path + +import numpy as np +import pyarrow as pa +import xarray as xr + +import xarray_sql as xql + +MIRROR = "https://storage.googleapis.com/cvdf-datasets/mnist" +CACHE = Path(tempfile.gettempdir()) / "mnist-xql" + +# Network dimensions: 14x14 pooled pixels -> 32 hidden (tanh) -> 10 classes. +N_TRAIN, N_TEST, N_PIX, N_HID, N_CLS = 1000, 500, 196, 32, 10 + + +def _download(url: str, dest: Path, tries: int = 5) -> None: + """Fetch a URL to dest, reading the whole body (retries on truncation).""" + last = None + for attempt in range(tries): + try: + with urllib.request.urlopen(url, timeout=120) as resp: + data = resp.read() + if len(data) < 1024: + raise OSError(f"suspiciously small download: {len(data)} bytes") + dest.write_bytes(data) + return + except Exception as exc: # noqa: BLE001 - retry any transient failure + last = exc + raise OSError(f"failed to download {url}: {last}") + + +def _read_idx(path: Path) -> np.ndarray: + with gzip.open(path, "rb") as f: + (magic,) = struct.unpack(">I", f.read(4)) + if magic == 2051: # images + n, r, c = struct.unpack(">III", f.read(12)) + return np.frombuffer(f.read(), np.uint8).reshape(n, r, c) + (n,) = struct.unpack(">I", f.read(4)) # labels + return np.frombuffer(f.read(), np.uint8) + + +def load_mnist(): + """Download (and cache) MNIST, 2x2 mean-pool to 14x14, subsample.""" + CACHE.mkdir(exist_ok=True) + files = { + "images": "train-images-idx3-ubyte.gz", + "labels": "train-labels-idx1-ubyte.gz", + } + paths = {} + for key, name in files.items(): + dest = CACHE / name + if not dest.exists(): + _download(f"{MIRROR}/{name}", dest) + paths[key] = dest + + imgs = _read_idx(paths["images"]).astype(np.float32) / 255.0 + labs = _read_idx(paths["labels"]).astype(np.int64) + pooled = imgs.reshape(-1, 14, 2, 14, 2).mean(axis=(2, 4)).reshape(-1, N_PIX) + + rng = np.random.default_rng(0) + idx = rng.permutation(len(pooled)) + tr, te = idx[:N_TRAIN], idx[N_TRAIN : N_TRAIN + N_TEST] + return pooled[tr], labs[tr], pooled[te], labs[te] + + +class SqlTables: + """Model parameters and intermediates as DataFusion in-memory tables. + + The MNIST data stays registered as xarray (the library's core); the model + weights and the per-step intermediate results (hidden activations, errors) + are plain in-memory tables, rebuilt from Arrow each step. Matrices are stored + in long form — a weight ``W[i, j]`` is a row ``(i, j, w)`` — so a matmul is a + join + ``GROUP BY``. + """ + + def __init__(self, ctx: xql.XarrayContext): + self.ctx = ctx + + def _replace(self, name: str, batches: list[pa.RecordBatch]) -> None: + if self.ctx.table_exist(name): + self.ctx.deregister_table(name) + self.ctx.register_record_batches(name, [batches]) + + def matrix( + self, name: str, var: str, arr: np.ndarray, di: str, dj: str + ) -> None: + """Register a 2-D array as a long ``(di, dj, var)`` in-memory table.""" + ni, nj = arr.shape + ii, jj = np.meshgrid(np.arange(ni), np.arange(nj), indexing="ij") + batch = pa.RecordBatch.from_pydict( + {di: ii.ravel(), dj: jj.ravel(), var: arr.ravel()} + ) + self._replace(name, [batch]) + + def vector(self, name: str, var: str, arr: np.ndarray, d0: str) -> None: + """Register a 1-D array as a ``(d0, var)`` in-memory table.""" + batch = pa.RecordBatch.from_pydict( + {d0: np.arange(len(arr)), var: np.asarray(arr, dtype=np.float64)} + ) + self._replace(name, [batch]) + + def materialize(self, name: str, sql: str) -> None: + """Run a query and register its Arrow result as the next stage's table.""" + self._replace(name, self.ctx.sql(sql).collect()) + + +def main() -> None: + Xtr, ytr, Xte, yte = load_mnist() + print( + f"MNIST: train {Xtr.shape}, test {Xte.shape} ({N_PIX} pix, {N_HID} hidden)" + ) + + ctx = xql.XarrayContext() + # The data is registered as xarray (the library's core); model state below + # lives in DataFusion in-memory tables. + ctx.from_dataset( + "imgs", + xr.Dataset( + {"val": (("sample", "pix"), Xtr)}, + coords={"sample": np.arange(N_TRAIN), "pix": np.arange(N_PIX)}, + ), + chunks={"sample": N_TRAIN}, + ) + ctx.from_dataset( + "labels", + xr.Dataset( + {"label": (("sample",), ytr.astype(np.float64))}, + coords={"sample": np.arange(N_TRAIN)}, + ), + chunks={"sample": N_TRAIN}, + ) + t = SqlTables(ctx) + + rng = np.random.default_rng(1) + W1 = rng.standard_normal((N_PIX, N_HID)) * 0.1 + b1 = np.zeros(N_HID) + W2 = rng.standard_normal((N_HID, N_CLS)) * 0.1 + b2 = np.zeros(N_CLS) + + def dense_to(df, ni, nj, ci, cj): + g = np.zeros((ni, nj)) + g[df[ci].to_numpy(), df[cj].to_numpy()] = df["g"].to_numpy() + return g + + def step(lr: float) -> None: + nonlocal W1, b1, W2, b2 + t.matrix("w1", "w", W1, "pix", "hid") + t.vector("b1", "b", b1, "hid") + t.matrix("w2", "w", W2, "hid", "cls") + t.vector("b2", "b", b2, "cls") + + # Forward: hidden pre-activation z and activation a = tanh(z). + t.materialize( + "h", + """ + WITH z AS ( + SELECT i.sample, w.hid, SUM(i.val * w.w) + MAX(bb.b) AS z + FROM imgs i JOIN w1 w ON i.pix = w.pix + JOIN b1 bb ON w.hid = bb.hid + GROUP BY i.sample, w.hid) + SELECT sample, hid, z, tanh(z) AS a FROM z + """, + ) + # Output softmax, then output error delta2 = softmax - onehot(label). + t.materialize( + "delta2", + """ + WITH logit AS ( + SELECT h.sample, w.cls, SUM(h.a * w.w) + MAX(bb.b) AS z + FROM h JOIN w2 w ON h.hid = w.hid + JOIN b2 bb ON w.cls = bb.cls + GROUP BY h.sample, w.cls), + mx AS (SELECT sample, MAX(z) AS m FROM logit GROUP BY sample), + ex AS (SELECT l.sample, l.cls, exp(l.z - mx.m) AS e + FROM logit l JOIN mx ON l.sample = mx.sample), + zsum AS (SELECT sample, SUM(e) AS z FROM ex GROUP BY sample) + SELECT ex.sample, ex.cls, + ex.e / zsum.z + - CASE WHEN ex.cls = lb.label THEN 1.0 ELSE 0.0 END AS d + FROM ex JOIN zsum ON ex.sample = zsum.sample + JOIN labels lb ON ex.sample = lb.sample + """, + ) + # Backprop to the hidden layer: push delta2 back through W2 (join + SUM), + # then multiply by the LOCAL activation derivative grad(tanh(z), z). + t.materialize( + "delta1", + """ + WITH da AS ( + SELECT d.sample, w.hid, SUM(d.d * w.w) AS da + FROM delta2 d JOIN w2 w ON d.cls = w.cls + GROUP BY d.sample, w.hid) + SELECT da.sample, da.hid, da.da * grad(tanh(h.z), h.z) AS d + FROM da JOIN h ON da.sample = h.sample AND da.hid = h.hid + """, + ) + + # Parameter gradients: dW = AVG(input * delta) over the batch. + gW2 = dense_to( + ctx.sql( + f"SELECT h.hid, d.cls, SUM(h.a * d.d) / {N_TRAIN}.0 AS g " + "FROM h JOIN delta2 d ON h.sample = d.sample " + "GROUP BY h.hid, d.cls" + ).to_pandas(), + N_HID, + N_CLS, + "hid", + "cls", + ) + gW1 = dense_to( + ctx.sql( + f"SELECT i.pix, d.hid, SUM(i.val * d.d) / {N_TRAIN}.0 AS g " + "FROM imgs i JOIN delta1 d ON i.sample = d.sample " + "GROUP BY i.pix, d.hid" + ).to_pandas(), + N_PIX, + N_HID, + "pix", + "hid", + ) + gb2 = ctx.sql( + f"SELECT cls, SUM(d) / {N_TRAIN}.0 AS g FROM delta2 GROUP BY cls" + ).to_pandas() + gb1 = ctx.sql( + f"SELECT hid, SUM(d) / {N_TRAIN}.0 AS g FROM delta1 GROUP BY hid" + ).to_pandas() + vb2 = np.zeros(N_CLS) + vb2[gb2["cls"].to_numpy()] = gb2["g"].to_numpy() + vb1 = np.zeros(N_HID) + vb1[gb1["hid"].to_numpy()] = gb1["g"].to_numpy() + + W2 -= lr * gW2 + b2 -= lr * vb2 + W1 -= lr * gW1 + b1 -= lr * vb1 + + def accuracy(X, y) -> float: + a = np.tanh(X @ W1 + b1) + return float(((a @ W2 + b2).argmax(1) == y).mean()) + + print(f"init: test acc {accuracy(Xte, yte):.3f}") + t0 = time.time() + steps = 60 + for s in range(steps): + step(lr=0.5) + if s % 10 == 0 or s == steps - 1: + print( + f"step {s:2d}: train {accuracy(Xtr, ytr):.3f} " + f"test {accuracy(Xte, yte):.3f}" + ) + print( + f"\ntrained an MNIST MLP in SQL: test accuracy " + f"{accuracy(Xte, yte):.3f} in {time.time() - t0:.0f}s" + ) + + +if __name__ == "__main__": + main() From 06fabbc66473e33200ecde05dc8a45af094b2aa8 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 30 Jun 2026 17:09:39 +0300 Subject: [PATCH 3/5] demo: train the MNIST MLP as one append-only model table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrite mnist_mlp.py so the whole model and its entire training history live in a single append-only table model(step, layer, i, j, val): every parameter is a row tagged by generation, and a training step appends the next generation's rows rather than mutating anything. Each step is a single SQL statement (forward, grad(tanh(z),z) backprop, parameter update); evaluation is SQL too (a forward pass with ROW_NUMBER() for the argmax). Python no longer holds the weights or computes any gradients — it only sequences the steps. A 2-layer net can't be one recursive CTE (the recursive relation may be referenced only once, but W1/W2 are used several times per step) and unrolling the steps as non-recursive CTEs blows up exponentially (DataFusion inlines CTEs; no MATERIALIZED). Materialising between steps is therefore host-driven; the thin loop does exactly that. Reaches ~83% test accuracy over 60 steps. Co-Authored-By: Claude Opus 4.8 --- benchmarks/README.md | 40 ++-- benchmarks/mnist_mlp.py | 424 ++++++++++++++++++++++------------------ 2 files changed, 265 insertions(+), 199 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index f1c6283..e89e8f5 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -65,21 +65,37 @@ sidesteps that.) ## `mnist_mlp.py` — train an MNIST MLP classifier in SQL A one-hidden-layer neural network (196 -> 32 tanh -> 10 softmax, on 2x2-pooled -14x14 MNIST) trained by gradient descent where **every gradient is computed in -SQL**; the optimisation loop is plain Python. It is reverse-mode autodiff -expressed as relational algebra: +14x14 MNIST) where **every gradient is computed in SQL** and the whole model — +with its entire training history — lives in a single table. + +The model is one append-only table `model(step, layer, i, j, val)`: every +parameter is a row, tagged by which generation (`step`) it belongs to. **A +training step never mutates anything; it appends the next generation's rows.** +`WHERE step = N` is the model at iteration N, and the full trajectory is the +table. Each step is a *single* SQL statement that reads the current generation +and writes the next — reverse-mode autodiff as relational algebra: - **matmul = join + `GROUP BY SUM`** — a layer's pre-activation is `SUM(input * weight)` grouped by (sample, unit). - **local derivatives = `grad()`** — the hidden activation's Jacobian is `grad(tanh(z), z)`, the autograd feature doing the calculus per (sample, unit). - **cotangent propagation = join**, **parameter gradients = join + `GROUP BY - AVG`**. - -The MNIST images are registered as xarray (the library's core); the model -weights and per-step intermediates are DataFusion in-memory tables (a matmul is -a join over them). The only hand-written gradient is softmax + cross-entropy's -`delta = softmax - onehot` (softmax couples classes through a per-sample -normaliser, an aggregate `grad` does not cross). Reaches ~83% test accuracy in -~20s. Downloads MNIST on first run. - + AVG`**, and the update `w - lr*g` is emitted as the next generation's rows. + +The images are registered as xarray (the library's core); evaluation is SQL too +(a forward pass with `ROW_NUMBER()` for the argmax). The only hand-written +gradient is softmax + cross-entropy's `delta = softmax - onehot` (softmax couples +classes through a per-sample normaliser, which an aggregate `grad` does not +cross). Reaches ~83% test accuracy over 60 steps (~140s on a laptop — the +parameter updates run in SQL and every generation is kept as rows, so it trades +speed for a fully relational, fully inspectable training history). Downloads +MNIST on first run. + +Why is the *outer* loop still Python rather than one recursive query (like +`grad_descent.py`)? A recursive CTE may reference the recursive relation only +once, but a 2-layer net uses the current weights several times per step (W1 and +W2 forward, W2 again in backprop), so it can't be a single recursive statement. +Training is also sequential and reuses each step's result, so steps must be +*materialised* between iterations — which is exactly what the thin loop does +(append a generation, then query it). All the maths stays in SQL; Python only +sequences the steps. diff --git a/benchmarks/mnist_mlp.py b/benchmarks/mnist_mlp.py index 516ee83..4e1d81b 100644 --- a/benchmarks/mnist_mlp.py +++ b/benchmarks/mnist_mlp.py @@ -12,32 +12,45 @@ # /// """Train an MNIST MLP classifier in SQL. -A one-hidden-layer neural network (196->32 tanh->10 softmax, on 2x2-pooled -14x14 = 196-pixel images) trained by gradient descent where **every gradient is -computed in SQL**. The MNIST images are registered as xarray (the library's -core); the model weights and per-step intermediates live in DataFusion -in-memory tables. The optimisation loop is plain Python; all the math is -relational. +A one-hidden-layer network (196->32 tanh->10 softmax, on 2x2-pooled 14x14 = +196-pixel images) trained by gradient descent where **every gradient is computed +in SQL** — and the whole model, with its entire training history, lives in a +single table. -The design is reverse-mode autodiff expressed in relational algebra: +The model is one append-only table ``model(step, layer, i, j, val)``: every +parameter ``W1[i, j]`` / ``b1[i]`` / ``W2`` / ``b2`` is a row, tagged by which +generation (``step``) it belongs to. **A training step never mutates anything — +it appends the next generation's rows.** The full optimisation trajectory is the +table; ``WHERE step = N`` is the model at iteration N. + +Each step is a *single* SQL statement (``STEP`` below) that reads the current +generation and writes the next. It is reverse-mode autodiff as relational +algebra: * **matmul = join + GROUP BY SUM.** A layer's pre-activation is - ``SUM(input * weight)`` grouped by (sample, unit), joining the data table to a - weight table on the shared index. -* **local derivatives = grad().** The hidden activation's Jacobian is - ``grad(tanh(z), z)`` — the engine differentiates the nonlinearity for us, - evaluated per (sample, unit). This is where the autograd feature does its - work; the rest is ordinary SQL. -* **cotangent propagation = join.** The output error is pushed back through the - second weight matrix by another join + SUM, then multiplied by the local - ``grad`` factor to get the hidden-layer error. -* **parameter gradients = join + GROUP BY AVG.** ``dW = AVG(input * delta)`` - grouped by the weight's indices. + ``SUM(input * weight)`` grouped by (sample, unit), joining the data to the + current weight rows. +* **local derivatives = grad().** The hidden Jacobian is ``grad(tanh(z), z)`` — + the autograd feature differentiates the nonlinearity, per (sample, unit). +* **cotangent propagation = join.** The output error is pushed back through W2 by + another join + SUM, then scaled by the local ``grad`` factor. +* **parameter gradients = join + GROUP BY AVG**, and the update is ``w - lr*g``, + emitted as the next generation's rows. The only hand-written gradient is softmax + cross-entropy's ``delta = softmax - -onehot`` (softmax couples classes through a per-sample normaliser, an aggregate -``grad`` does not cross — staying faithful to SQL). Everything else is grad and -joins. +onehot`` (softmax couples classes through a per-sample normaliser, which an +aggregate ``grad`` does not cross — staying faithful to SQL). Everything else is +grad and joins. Evaluation is SQL too: a forward pass with ``ROW_NUMBER()`` for +the argmax. + +Why is the *outer* loop still Python rather than one recursive query (like +``grad_descent.py``)? A recursive CTE may reference the recursive relation only +once, but a 2-layer net uses the current weights several times per step (W1 and +W2 in the forward pass, W2 again in backprop), so it cannot be a single recursive +statement. Training is also inherently sequential and reuses each step's result, +so the steps must be *materialised* between iterations — which is exactly what the +thin Python loop does (append a generation, then query it). All the maths stays +in SQL; Python only sequences the steps. Run standalone (builds the local extension on first use): @@ -64,12 +77,13 @@ # Network dimensions: 14x14 pooled pixels -> 32 hidden (tanh) -> 10 classes. N_TRAIN, N_TEST, N_PIX, N_HID, N_CLS = 1000, 500, 196, 32, 10 +LR, STEPS = 0.5, 60 def _download(url: str, dest: Path, tries: int = 5) -> None: """Fetch a URL to dest, reading the whole body (retries on truncation).""" last = None - for attempt in range(tries): + for _ in range(tries): try: with urllib.request.urlopen(url, timeout=120) as resp: data = resp.read() @@ -116,194 +130,230 @@ def load_mnist(): return pooled[tr], labs[tr], pooled[te], labs[te] -class SqlTables: - """Model parameters and intermediates as DataFusion in-memory tables. +# --- the model as rows -------------------------------------------------------- - The MNIST data stays registered as xarray (the library's core); the model - weights and the per-step intermediate results (hidden activations, errors) - are plain in-memory tables, rebuilt from Arrow each step. Matrices are stored - in long form — a weight ``W[i, j]`` is a row ``(i, j, w)`` — so a matmul is a - join + ``GROUP BY``. - """ +_MODEL_SCHEMA = pa.schema( + [ + ("step", pa.int64()), + ("layer", pa.utf8()), + ("i", pa.int64()), + ("j", pa.int64()), + ("val", pa.float64()), + ] +) - def __init__(self, ctx: xql.XarrayContext): - self.ctx = ctx - - def _replace(self, name: str, batches: list[pa.RecordBatch]) -> None: - if self.ctx.table_exist(name): - self.ctx.deregister_table(name) - self.ctx.register_record_batches(name, [batches]) - - def matrix( - self, name: str, var: str, arr: np.ndarray, di: str, dj: str - ) -> None: - """Register a 2-D array as a long ``(di, dj, var)`` in-memory table.""" - ni, nj = arr.shape - ii, jj = np.meshgrid(np.arange(ni), np.arange(nj), indexing="ij") - batch = pa.RecordBatch.from_pydict( - {di: ii.ravel(), dj: jj.ravel(), var: arr.ravel()} - ) - self._replace(name, [batch]) - def vector(self, name: str, var: str, arr: np.ndarray, d0: str) -> None: - """Register a 1-D array as a ``(d0, var)`` in-memory table.""" - batch = pa.RecordBatch.from_pydict( - {d0: np.arange(len(arr)), var: np.asarray(arr, dtype=np.float64)} - ) - self._replace(name, [batch]) +def _param_rows(step: int, layer: str, arr: np.ndarray) -> dict: + """One layer's parameters as ``(step, layer, i, j, val)`` columns. - def materialize(self, name: str, sql: str) -> None: - """Run a query and register its Arrow result as the next stage's table.""" - self._replace(name, self.ctx.sql(sql).collect()) + A matrix ``W[i, j]`` becomes rows ``(i, j, w)``; a bias vector ``b[i]`` + becomes ``(i, 0, b)``. + """ + if arr.ndim == 2: + ii, jj = np.meshgrid( + np.arange(arr.shape[0]), np.arange(arr.shape[1]), indexing="ij" + ) + ii, jj = ii.ravel(), jj.ravel() + else: + ii, jj = np.arange(arr.size), np.zeros(arr.size, np.int64) + n = arr.size + return { + "step": np.full(n, step, np.int64), + "layer": [layer] * n, + "i": ii.astype(np.int64), + "j": jj.astype(np.int64), + "val": arr.ravel().astype(np.float64), + } -def main() -> None: - Xtr, ytr, Xte, yte = load_mnist() - print( - f"MNIST: train {Xtr.shape}, test {Xte.shape} ({N_PIX} pix, {N_HID} hidden)" +def _generation_batch(step, w1, b1, w2, b2) -> pa.RecordBatch: + """All four layers of one generation as a single RecordBatch.""" + cols: dict[str, list] = {k: [] for k in ("step", "layer", "i", "j", "val")} + for layer, arr in (("w1", w1), ("b1", b1), ("w2", w2), ("b2", b2)): + for k, v in _param_rows(step, layer, arr).items(): + cols[k].extend(list(v)) + return pa.RecordBatch.from_arrays( + [ + pa.array(cols["step"], pa.int64()), + pa.array(cols["layer"], pa.utf8()), + pa.array(cols["i"], pa.int64()), + pa.array(cols["j"], pa.int64()), + pa.array(cols["val"], pa.float64()), + ], + schema=_MODEL_SCHEMA, ) - ctx = xql.XarrayContext() - # The data is registered as xarray (the library's core); model state below - # lives in DataFusion in-memory tables. + +# One training step, as one SQL statement: read the current generation of the +# model table, run the forward + backward pass over the data, and SELECT the next +# generation's parameter rows (which the loop appends to the model table). +STEP = f""" +WITH cur AS (SELECT max(step) AS s FROM model), + w1 AS (SELECT i AS pix, j AS hid, val AS w FROM model, cur + WHERE step = cur.s AND layer = 'w1'), + b1 AS (SELECT i AS hid, val AS b FROM model, cur + WHERE step = cur.s AND layer = 'b1'), + w2 AS (SELECT i AS hid, j AS cls, val AS w FROM model, cur + WHERE step = cur.s AND layer = 'w2'), + b2 AS (SELECT i AS cls, val AS b FROM model, cur + WHERE step = cur.s AND layer = 'b2'), + -- forward: hidden pre-activation z and activation a = tanh(z) + zt AS (SELECT i.sample, w.hid, SUM(i.val * w.w) + MAX(bb.b) AS z + FROM imgs i JOIN w1 w ON i.pix = w.pix JOIN b1 bb ON w.hid = bb.hid + GROUP BY i.sample, w.hid), + h AS (SELECT sample, hid, z, tanh(z) AS a FROM zt), + -- output logits, then a stable softmax + lg AS (SELECT h.sample, w.cls, SUM(h.a * w.w) + MAX(bb.b) AS z + FROM h JOIN w2 w ON h.hid = w.hid JOIN b2 bb ON w.cls = bb.cls + GROUP BY h.sample, w.cls), + mx AS (SELECT sample, MAX(z) AS m FROM lg GROUP BY sample), + ex AS (SELECT l.sample, l.cls, exp(l.z - mx.m) AS e + FROM lg l JOIN mx ON l.sample = mx.sample), + zs AS (SELECT sample, SUM(e) AS z FROM ex GROUP BY sample), + -- output error delta2 = softmax - onehot(label) + d2 AS (SELECT ex.sample, ex.cls, + ex.e / zs.z + - CASE WHEN ex.cls = lb.label THEN 1.0 ELSE 0.0 END AS d + FROM ex JOIN zs ON ex.sample = zs.sample + JOIN labels lb ON lb.sample = ex.sample), + -- backprop to hidden: push delta2 through W2, scale by grad(tanh(z), z) + da AS (SELECT d.sample, w.hid, SUM(d.d * w.w) AS da + FROM d2 d JOIN w2 w ON d.cls = w.cls GROUP BY d.sample, w.hid), + d1 AS (SELECT da.sample, da.hid, da.da * grad(tanh(h.z), h.z) AS d + FROM da JOIN h ON da.sample = h.sample AND da.hid = h.hid), + -- parameter gradients: dW = AVG(input * delta) over the batch + gw1 AS (SELECT i.pix, d.hid, AVG(i.val * d.d) AS g + FROM imgs i JOIN d1 d ON i.sample = d.sample GROUP BY i.pix, d.hid), + gb1 AS (SELECT hid, AVG(d) AS g FROM d1 GROUP BY hid), + gw2 AS (SELECT h.hid, d.cls, AVG(h.a * d.d) AS g + FROM h JOIN d2 d ON h.sample = d.sample GROUP BY h.hid, d.cls), + gb2 AS (SELECT cls, AVG(d) AS g FROM d2 GROUP BY cls) +-- the next generation: w - lr*grad, tagged step+1, as model rows +SELECT (SELECT s FROM cur) + 1 AS step, 'w1' AS layer, + w.pix AS i, w.hid AS j, w.w - {LR} * g.g AS val +FROM w1 w JOIN gw1 g ON w.pix = g.pix AND w.hid = g.hid +UNION ALL +SELECT (SELECT s FROM cur) + 1, 'b1', b.hid, CAST(0 AS BIGINT), b.b - {LR} * g.g +FROM b1 b JOIN gb1 g ON b.hid = g.hid +UNION ALL +SELECT (SELECT s FROM cur) + 1, 'w2', w.hid, w.cls, w.w - {LR} * g.g +FROM w2 w JOIN gw2 g ON w.hid = g.hid AND w.cls = g.cls +UNION ALL +SELECT (SELECT s FROM cur) + 1, 'b2', b.cls, CAST(0 AS BIGINT), b.b - {LR} * g.g +FROM b2 b JOIN gb2 g ON b.cls = g.cls +""" + + +def eval_sql(imgs_table: str, labels_table: str) -> str: + """Accuracy of the latest model on a dataset — a forward pass in SQL. + + ``ROW_NUMBER()`` picks each sample's argmax class; it is compared to the + label. No softmax needed at inference: the argmax of the logits is the + prediction. + """ + return f""" + WITH cur AS (SELECT max(step) AS s FROM model), + w1 AS (SELECT i AS pix, j AS hid, val AS w FROM model, cur + WHERE step = cur.s AND layer = 'w1'), + b1 AS (SELECT i AS hid, val AS b FROM model, cur + WHERE step = cur.s AND layer = 'b1'), + w2 AS (SELECT i AS hid, j AS cls, val AS w FROM model, cur + WHERE step = cur.s AND layer = 'w2'), + b2 AS (SELECT i AS cls, val AS b FROM model, cur + WHERE step = cur.s AND layer = 'b2'), + h AS (SELECT i.sample, w.hid, + tanh(SUM(i.val * w.w) + MAX(bb.b)) AS a + FROM {imgs_table} i JOIN w1 w ON i.pix = w.pix + JOIN b1 bb ON w.hid = bb.hid + GROUP BY i.sample, w.hid), + lg AS (SELECT h.sample, w.cls, SUM(h.a * w.w) + MAX(bb.b) AS z + FROM h JOIN w2 w ON h.hid = w.hid JOIN b2 bb ON w.cls = bb.cls + GROUP BY h.sample, w.cls), + pred AS (SELECT sample, cls, + ROW_NUMBER() OVER (PARTITION BY sample ORDER BY z DESC) AS rk + FROM lg) + SELECT AVG(CASE WHEN p.cls = l.label THEN 1.0 ELSE 0.0 END) AS acc + FROM pred p JOIN {labels_table} l ON p.sample = l.sample + WHERE p.rk = 1 + """ + + +def _register_images(ctx, name, X): ctx.from_dataset( - "imgs", + name, xr.Dataset( - {"val": (("sample", "pix"), Xtr)}, - coords={"sample": np.arange(N_TRAIN), "pix": np.arange(N_PIX)}, + {"val": (("sample", "pix"), X)}, + coords={ + "sample": np.arange(X.shape[0]), + "pix": np.arange(N_PIX), + }, ), - chunks={"sample": N_TRAIN}, + chunks={"sample": X.shape[0]}, ) + + +def _register_labels(ctx, name, y): ctx.from_dataset( - "labels", + name, xr.Dataset( - {"label": (("sample",), ytr.astype(np.float64))}, - coords={"sample": np.arange(N_TRAIN)}, + {"label": (("sample",), y.astype(np.float64))}, + coords={"sample": np.arange(len(y))}, ), - chunks={"sample": N_TRAIN}, + chunks={"sample": len(y)}, + ) + + +def main() -> None: + Xtr, ytr, Xte, yte = load_mnist() + print( + f"MNIST: train {Xtr.shape}, test {Xte.shape} " + f"({N_PIX} pix, {N_HID} hidden, {N_CLS} classes)" ) - t = SqlTables(ctx) + ctx = xql.XarrayContext() + # The data is registered as xarray (the library's core); the model below is + # the one append-only table that holds every layer and every generation. + _register_images(ctx, "imgs", Xtr) + _register_labels(ctx, "labels", ytr) + _register_images(ctx, "imgs_te", Xte) + _register_labels(ctx, "labels_te", yte) + + # Generation 0: small random weights, zero biases. rng = np.random.default_rng(1) - W1 = rng.standard_normal((N_PIX, N_HID)) * 0.1 - b1 = np.zeros(N_HID) - W2 = rng.standard_normal((N_HID, N_CLS)) * 0.1 - b2 = np.zeros(N_CLS) - - def dense_to(df, ni, nj, ci, cj): - g = np.zeros((ni, nj)) - g[df[ci].to_numpy(), df[cj].to_numpy()] = df["g"].to_numpy() - return g - - def step(lr: float) -> None: - nonlocal W1, b1, W2, b2 - t.matrix("w1", "w", W1, "pix", "hid") - t.vector("b1", "b", b1, "hid") - t.matrix("w2", "w", W2, "hid", "cls") - t.vector("b2", "b", b2, "cls") - - # Forward: hidden pre-activation z and activation a = tanh(z). - t.materialize( - "h", - """ - WITH z AS ( - SELECT i.sample, w.hid, SUM(i.val * w.w) + MAX(bb.b) AS z - FROM imgs i JOIN w1 w ON i.pix = w.pix - JOIN b1 bb ON w.hid = bb.hid - GROUP BY i.sample, w.hid) - SELECT sample, hid, z, tanh(z) AS a FROM z - """, - ) - # Output softmax, then output error delta2 = softmax - onehot(label). - t.materialize( - "delta2", - """ - WITH logit AS ( - SELECT h.sample, w.cls, SUM(h.a * w.w) + MAX(bb.b) AS z - FROM h JOIN w2 w ON h.hid = w.hid - JOIN b2 bb ON w.cls = bb.cls - GROUP BY h.sample, w.cls), - mx AS (SELECT sample, MAX(z) AS m FROM logit GROUP BY sample), - ex AS (SELECT l.sample, l.cls, exp(l.z - mx.m) AS e - FROM logit l JOIN mx ON l.sample = mx.sample), - zsum AS (SELECT sample, SUM(e) AS z FROM ex GROUP BY sample) - SELECT ex.sample, ex.cls, - ex.e / zsum.z - - CASE WHEN ex.cls = lb.label THEN 1.0 ELSE 0.0 END AS d - FROM ex JOIN zsum ON ex.sample = zsum.sample - JOIN labels lb ON ex.sample = lb.sample - """, - ) - # Backprop to the hidden layer: push delta2 back through W2 (join + SUM), - # then multiply by the LOCAL activation derivative grad(tanh(z), z). - t.materialize( - "delta1", - """ - WITH da AS ( - SELECT d.sample, w.hid, SUM(d.d * w.w) AS da - FROM delta2 d JOIN w2 w ON d.cls = w.cls - GROUP BY d.sample, w.hid) - SELECT da.sample, da.hid, da.da * grad(tanh(h.z), h.z) AS d - FROM da JOIN h ON da.sample = h.sample AND da.hid = h.hid - """, - ) + gen0 = _generation_batch( + 0, + rng.standard_normal((N_PIX, N_HID)) * 0.1, + np.zeros(N_HID), + rng.standard_normal((N_HID, N_CLS)) * 0.1, + np.zeros(N_CLS), + ) + generations = [gen0] + ctx.register_record_batches("model", [generations]) - # Parameter gradients: dW = AVG(input * delta) over the batch. - gW2 = dense_to( - ctx.sql( - f"SELECT h.hid, d.cls, SUM(h.a * d.d) / {N_TRAIN}.0 AS g " - "FROM h JOIN delta2 d ON h.sample = d.sample " - "GROUP BY h.hid, d.cls" - ).to_pandas(), - N_HID, - N_CLS, - "hid", - "cls", - ) - gW1 = dense_to( - ctx.sql( - f"SELECT i.pix, d.hid, SUM(i.val * d.d) / {N_TRAIN}.0 AS g " - "FROM imgs i JOIN delta1 d ON i.sample = d.sample " - "GROUP BY i.pix, d.hid" - ).to_pandas(), - N_PIX, - N_HID, - "pix", - "hid", + def test_acc() -> float: + return float( + ctx.sql(eval_sql("imgs_te", "labels_te")).to_pandas()["acc"][0] ) - gb2 = ctx.sql( - f"SELECT cls, SUM(d) / {N_TRAIN}.0 AS g FROM delta2 GROUP BY cls" - ).to_pandas() - gb1 = ctx.sql( - f"SELECT hid, SUM(d) / {N_TRAIN}.0 AS g FROM delta1 GROUP BY hid" - ).to_pandas() - vb2 = np.zeros(N_CLS) - vb2[gb2["cls"].to_numpy()] = gb2["g"].to_numpy() - vb1 = np.zeros(N_HID) - vb1[gb1["hid"].to_numpy()] = gb1["g"].to_numpy() - - W2 -= lr * gW2 - b2 -= lr * vb2 - W1 -= lr * gW1 - b1 -= lr * vb1 - - def accuracy(X, y) -> float: - a = np.tanh(X @ W1 + b1) - return float(((a @ W2 + b2).argmax(1) == y).mean()) - - print(f"init: test acc {accuracy(Xte, yte):.3f}") + + print(f"init: test acc {test_acc():.3f}") t0 = time.time() - steps = 60 - for s in range(steps): - step(lr=0.5) - if s % 10 == 0 or s == steps - 1: - print( - f"step {s:2d}: train {accuracy(Xtr, ytr):.3f} " - f"test {accuracy(Xte, yte):.3f}" + for s in range(STEPS): + # One SQL statement computes the next generation; appending its rows to + # the model table *is* the parameter update. + generations.extend(ctx.sql(STEP).collect()) + ctx.deregister_table("model") + ctx.register_record_batches("model", [generations]) + if s % 10 == 0 or s == STEPS - 1: + tr = float( + ctx.sql(eval_sql("imgs", "labels")).to_pandas()["acc"][0] ) + print(f"step {s:2d}: train {tr:.3f} test {test_acc():.3f}") + + n_rows = ctx.sql("SELECT count(*) AS n FROM model").to_pandas()["n"][0] print( - f"\ntrained an MNIST MLP in SQL: test accuracy " - f"{accuracy(Xte, yte):.3f} in {time.time() - t0:.0f}s" + f"\ntrained an MNIST MLP in SQL: test accuracy {test_acc():.3f} " + f"in {time.time() - t0:.0f}s.\nThe model and its entire training " + f"history are one table of {n_rows} rows ({STEPS + 1} generations)." ) From c92447ae8ee1d95ee2be4fca0fb74cf056ff9f3e Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 30 Jun 2026 17:45:00 +0300 Subject: [PATCH 4/5] demo: data-driven deep MLP with the model and metrics as relations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the architecture itself data. The whole model is one xr.Dataset: each layer's weight is a data_var w{L} over its boundary dims (u{L}, u{L+1}), sharing the dims that connect adjacent layers (the join keys). The dim sizes are the layer widths and the number of weights is the depth, so differing neuron counts are just differing dim sizes — no padding, because the relational long form is naturally ragged. from_dataset splits the one Dataset into a table per weight; changing WIDTHS trains a different network with the same code. One generic contract()-based loop trains a net of any depth: forward contracts each layer, backward is the same contraction transposed (VJP of a contraction is a contraction) with grad(tanh(z), z) for the local derivative. Validated exact against numpy at depth 3. Training metrics are a relation too: each logged step appends a (step, loss, train_acc, test_acc) row to a metrics table rather than a Python list. The trained model, predictions, and metrics all come back out as xarray via to_dataset. ~83% test accuracy in ~13s. Co-Authored-By: Claude Opus 4.8 --- benchmarks/README.md | 91 +++--- benchmarks/mnist_mlp.py | 607 +++++++++++++++++++++++----------------- 2 files changed, 398 insertions(+), 300 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index e89e8f5..10b4fea 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -62,40 +62,57 @@ needs the Substrait round-trip, and Substrait has no recursion — so a `grad` marker can't live inside a recursive CTE. Differentiating once to plain SQL sidesteps that.) -## `mnist_mlp.py` — train an MNIST MLP classifier in SQL - -A one-hidden-layer neural network (196 -> 32 tanh -> 10 softmax, on 2x2-pooled -14x14 MNIST) where **every gradient is computed in SQL** and the whole model — -with its entire training history — lives in a single table. - -The model is one append-only table `model(step, layer, i, j, val)`: every -parameter is a row, tagged by which generation (`step`) it belongs to. **A -training step never mutates anything; it appends the next generation's rows.** -`WHERE step = N` is the model at iteration N, and the full trajectory is the -table. Each step is a *single* SQL statement that reads the current generation -and writes the next — reverse-mode autodiff as relational algebra: - -- **matmul = join + `GROUP BY SUM`** — a layer's pre-activation is - `SUM(input * weight)` grouped by (sample, unit). -- **local derivatives = `grad()`** — the hidden activation's Jacobian is - `grad(tanh(z), z)`, the autograd feature doing the calculus per (sample, unit). -- **cotangent propagation = join**, **parameter gradients = join + `GROUP BY - AVG`**, and the update `w - lr*g` is emitted as the next generation's rows. - -The images are registered as xarray (the library's core); evaluation is SQL too -(a forward pass with `ROW_NUMBER()` for the argmax). The only hand-written -gradient is softmax + cross-entropy's `delta = softmax - onehot` (softmax couples -classes through a per-sample normaliser, which an aggregate `grad` does not -cross). Reaches ~83% test accuracy over 60 steps (~140s on a laptop — the -parameter updates run in SQL and every generation is kept as rows, so it trades -speed for a fully relational, fully inspectable training history). Downloads -MNIST on first run. - -Why is the *outer* loop still Python rather than one recursive query (like -`grad_descent.py`)? A recursive CTE may reference the recursive relation only -once, but a 2-layer net uses the current weights several times per step (W1 and -W2 forward, W2 again in backprop), so it can't be a single recursive statement. -Training is also sequential and reuses each step's result, so steps must be -*materialised* between iterations — which is exactly what the thin loop does -(append a generation, then query it). All the maths stays in SQL; Python only -sequences the steps. +## `mnist_mlp.py` — an MNIST MLP as relational tensor algebra + +An MLP (196 -> 32 tanh -> 10 softmax on 2x2-pooled 14x14 MNIST) built on one +idea: **a neural net is a chain of tensor contractions (einsums), and an einsum +over coordinate-indexed arrays *is* relational algebra.** + +``` +C[i,k] = sum_j A[i,j] * B[j,k] <=> JOIN A, B ON A.j = B.j + GROUP BY i, k -> SUM(A.val * B.val) +``` + +Contracting a shared index is a join on it followed by a grouped `SUM` over the +indices that survive. In xarray-sql an array indexed by named dims is a table +keyed by those dims, so **the dimension names are the join keys**. + +**The architecture is data.** The whole model is *one* `xr.Dataset`: each layer's +weight is a data variable `w{L}` over dims `(u{L}, u{L+1})`, the widths it +connects, sharing the boundary dims (`u1` is layer 0's output and layer 1's +input, so it is the join key between them). The dim sizes *are* the layer widths, +and the number of weights is the depth — differing neuron counts per layer are +just differing dim sizes, no padding, because the relational (long) form is +naturally ragged. `from_dataset` splits that one Dataset into a table per weight +automatically. Change `WIDTHS` (e.g. `196, 64, 32, 10`) and the same code trains +the deeper net. + +A small `contract()` helper turns an einsum spec into one query, and a single +generic loop trains a net of any shape: + +- **forward** contracts the activation with each layer's weight, `+ bias`, + `tanh` (softmax on the last layer). +- **backward is the *same* operator with indices transposed** — the VJP of a + contraction is a contraction — and `grad(tanh(z), z)` supplies the only + genuinely-calculus part. Linear algebra is joins; the derivatives of the + nonlinearities are `grad`. + +Everything stays relational: every stage is an inspectable table (`a1`, `delta2`, +`gw0`, …); the only hand-written gradient is softmax + cross-entropy's `delta = +softmax - onehot`. Even the training metrics are a table — each logged step +appends a `(step, loss, train_acc, test_acc)` row to a `metrics` relation rather +than a Python list (NN training produces a lot of such data; it belongs in +rows). Evaluation is SQL too (a forward pass + `ROW_NUMBER()` argmax), and the +trained model, predictions, and metrics all come **back out as xarray** via +`to_dataset`. Reaches ~83% test accuracy over 60 steps. Downloads MNIST on first +run. + +This is not a numpy replacement — relational matmul carries join overhead a BLAS +inner product doesn't. What it buys is a fully declarative, inspectable pipeline +whose data side is chunked xarray (parallel over the batch, larger-than-memory). +The *outer* training loop stays in Python because the steps must be materialised +between iterations: a multi-layer net can't be one recursive CTE (the recursive +relation may be referenced only once, but the weights are used several times per +step), and unrolling the steps as non-recursive CTEs blows up exponentially +(DataFusion inlines CTEs). The thin loop does exactly that materialisation; all +the maths stays in SQL. diff --git a/benchmarks/mnist_mlp.py b/benchmarks/mnist_mlp.py index 4e1d81b..d7d97aa 100644 --- a/benchmarks/mnist_mlp.py +++ b/benchmarks/mnist_mlp.py @@ -4,53 +4,48 @@ # "xarray_sql", # "xarray", # "numpy", -# "pyarrow", # ] # # [tool.uv.sources] # xarray_sql = { path = "..", editable = true } # /// -"""Train an MNIST MLP classifier in SQL. - -A one-hidden-layer network (196->32 tanh->10 softmax, on 2x2-pooled 14x14 = -196-pixel images) trained by gradient descent where **every gradient is computed -in SQL** — and the whole model, with its entire training history, lives in a -single table. - -The model is one append-only table ``model(step, layer, i, j, val)``: every -parameter ``W1[i, j]`` / ``b1[i]`` / ``W2`` / ``b2`` is a row, tagged by which -generation (``step``) it belongs to. **A training step never mutates anything — -it appends the next generation's rows.** The full optimisation trajectory is the -table; ``WHERE step = N`` is the model at iteration N. - -Each step is a *single* SQL statement (``STEP`` below) that reads the current -generation and writes the next. It is reverse-mode autodiff as relational -algebra: - -* **matmul = join + GROUP BY SUM.** A layer's pre-activation is - ``SUM(input * weight)`` grouped by (sample, unit), joining the data to the - current weight rows. -* **local derivatives = grad().** The hidden Jacobian is ``grad(tanh(z), z)`` — - the autograd feature differentiates the nonlinearity, per (sample, unit). -* **cotangent propagation = join.** The output error is pushed back through W2 by - another join + SUM, then scaled by the local ``grad`` factor. -* **parameter gradients = join + GROUP BY AVG**, and the update is ``w - lr*g``, - emitted as the next generation's rows. - -The only hand-written gradient is softmax + cross-entropy's ``delta = softmax - -onehot`` (softmax couples classes through a per-sample normaliser, which an -aggregate ``grad`` does not cross — staying faithful to SQL). Everything else is -grad and joins. Evaluation is SQL too: a forward pass with ``ROW_NUMBER()`` for -the argmax. - -Why is the *outer* loop still Python rather than one recursive query (like -``grad_descent.py``)? A recursive CTE may reference the recursive relation only -once, but a 2-layer net uses the current weights several times per step (W1 and -W2 in the forward pass, W2 again in backprop), so it cannot be a single recursive -statement. Training is also inherently sequential and reuses each step's result, -so the steps must be *materialised* between iterations — which is exactly what the -thin Python loop does (append a generation, then query it). All the maths stays -in SQL; Python only sequences the steps. +"""Train an MNIST MLP as relational tensor algebra — with the architecture as data. + +A neural network is a chain of **tensor contractions** (einsums), and an einsum +over coordinate-indexed arrays *is* relational algebra: + + C[i,k] = sum_j A[i,j] * B[j,k] <=> JOIN A, B ON A.j = B.j + GROUP BY i, k -> SUM(A.val * B.val) + +Contracting a shared index is a join on it followed by a grouped SUM over the +indices that survive. In xarray-sql an array indexed by named dims is a table +keyed by those dims, so **the dimension names are the join keys**. + +The whole model is **one ``xr.Dataset``**. Each layer's weight is a data variable +whose two dims are the widths it connects — ``w0(u0, u1)``, ``w1(u1, u2)``, … — +sharing the boundary dims (``u1`` is the output of layer 0 and the input of layer +1, so it is the join key between them). **The architecture is therefore data: the +Dataset's dim sizes are the layer widths, and the number of layers is how many +weights it holds.** Differing neuron counts per layer are just differing dim +sizes — no padding, because the relational (long) form is naturally ragged. +``from_dataset`` splits that one Dataset into a table per weight automatically. + +A single ``contract()`` turns an einsum spec into one query, and a single generic +loop trains a net of any depth/width: + +* **forward** — contract the activation with each layer's weight, add bias, tanh + (softmax on the last layer). +* **backward is the same operator transposed** — the VJP of a contraction is a + contraction — with ``grad(tanh(z), z)`` for the one local-derivative step. + Linear algebra is joins; the derivatives of the nonlinearities are ``grad``. + +Every stage is an inspectable relation; the trained model, predictions, and loss +curve come back out as ``xarray`` via ``to_dataset``. Change ``WIDTHS`` and the +same code trains a different network. + +This is not a numpy replacement — relational matmul carries join overhead a BLAS +inner product doesn't. What it buys is a declarative, inspectable pipeline whose +data side is chunked xarray (parallel over the batch, larger-than-memory). Run standalone (builds the local extension on first use): @@ -67,7 +62,6 @@ from pathlib import Path import numpy as np -import pyarrow as pa import xarray as xr import xarray_sql as xql @@ -75,13 +69,252 @@ MIRROR = "https://storage.googleapis.com/cvdf-datasets/mnist" CACHE = Path(tempfile.gettempdir()) / "mnist-xql" -# Network dimensions: 14x14 pooled pixels -> 32 hidden (tanh) -> 10 classes. -N_TRAIN, N_TEST, N_PIX, N_HID, N_CLS = 1000, 500, 196, 32, 10 -LR, STEPS = 0.5, 60 +# The architecture, as data: layer widths. 196 pooled pixels -> 32 tanh -> 10. +# Add an entry (e.g. 196, 64, 32, 10) and the same code trains the deeper net. +WIDTHS = [196, 32, 10] +DEPTH = len(WIDTHS) - 1 # number of weight layers +N_TRAIN, N_TEST = 1000, 500 +LR, STEPS, CHUNK = 0.5, 60, 250 + + +# --- the one idea: a tensor contraction is a relational query ----------------- + + +def contract(spec: str, left: str, right: str) -> str: + """An einsum over two coordinate-indexed tables, as one SQL query. + + ``contract("sample,u0 * u0,u1 -> sample,u1", "x", "w0")`` joins ``x`` and + ``w0`` on their shared dim ``u0``, groups by the output dims, and sums the + product of values — a matmul. Every table has its dims as columns plus a + ``val`` column. Indices in the inputs but not the output are contracted; the + same helper expresses the transposed contractions of backprop. + """ + spec = spec.replace(" ", "") + lhs, out = spec.split("->") + da, db = (operand.split(",") for operand in lhs.split("*")) + out_dims = out.split(",") + shared = [d for d in da if d in db] + join = ( + f"JOIN {right} r ON " + " AND ".join(f"l.{d} = r.{d}" for d in shared) + if shared + else f"CROSS JOIN {right} r" + ) + pick = ", ".join(f"{'l' if d in da else 'r'}.{d} AS {d}" for d in out_dims) + return ( + f"SELECT {pick}, SUM(l.val * r.val) AS val " + f"FROM {left} l {join} GROUP BY {', '.join(out_dims)}" + ) + + +def register_tensor( + ctx: xql.XarrayContext, + name: str, + arr: np.ndarray, + dims: tuple[str, ...], + var: str = "val", + chunk: int | None = None, +) -> None: + """Register a numpy array as a relation, the array-relational way: wrap it as + an ``xr.Dataset`` whose named dims become the table's key columns, then hand + it to ``from_dataset``. A tensor is an array at the edge and a relation + inside; ``from_dataset`` is the bridge, and the dims become the join keys.""" + arr = np.asarray(arr, dtype=np.float64) + ds = xr.Dataset( + {var: (dims, arr)}, + coords={d: np.arange(n) for d, n in zip(dims, arr.shape)}, + ) + ctx.from_dataset(name, ds, chunks={dims[0]: chunk or arr.shape[0]}) + + +class Tensors: + """A step rewrites a handful of relations; ``put`` materialises a query as a + named table (the stages of the forward/backward pass).""" + + def __init__(self, ctx: xql.XarrayContext): + self.ctx = ctx + + def put(self, name: str, sql: str) -> None: + batches = self.ctx.sql(sql).collect() + if self.ctx.table_exist(name): + self.ctx.deregister_table(name) + self.ctx.register_record_batches(name, [batches]) + + +# --- the model as one xarray Dataset ------------------------------------------ + + +def build_model(rng: np.random.Generator) -> xr.Dataset: + """The whole model as one Dataset: weight ``w{L}`` over dims ``(u{L}, u{L+1})`` + and bias ``b{L}`` over ``(u{L+1},)``. The shared boundary dims tie the layers + together; the dim sizes *are* the architecture.""" + data_vars: dict = {} + for layer in range(DEPTH): + n_in, n_out = WIDTHS[layer], WIDTHS[layer + 1] + data_vars[f"w{layer}"] = ( + (f"u{layer}", f"u{layer + 1}"), + rng.standard_normal((n_in, n_out)) * 0.1, + ) + data_vars[f"b{layer}"] = ((f"u{layer + 1}",), np.zeros(n_out)) + coords = {f"u{i}": np.arange(w) for i, w in enumerate(WIDTHS)} + return xr.Dataset(data_vars, coords=coords) + + +def seed_weights(t: Tensors) -> None: + """Unpack the one model Dataset (registered as the ``model`` schema) into + working weight/bias relations with a uniform ``val`` column.""" + for layer in range(DEPTH): + i, o = f"u{layer}", f"u{layer + 1}" + t.put( + f"w{layer}", f"SELECT {i}, {o}, w{layer} AS val FROM model.w{layer}" + ) + t.put(f"b{layer}", f"SELECT {o}, b{layer} AS val FROM model.b{layer}") + + +# --- the network, as contractions (generic over depth) ------------------------ + + +def forward(t: Tensors, inp: str = "x") -> None: + """Forward pass from ``inp``: a contraction + bias + tanh per layer, leaving + the pre-activations ``a{L}.z`` for backprop and the output ``logits``.""" + prev = inp + for layer in range(DEPTH): + i, o = f"u{layer}", f"u{layer + 1}" + zc = contract(f"sample,{i} * {i},{o} -> sample,{o}", prev, f"w{layer}") + if layer < DEPTH - 1: + t.put( + f"a{layer + 1}", + f"""WITH zc AS ({zc}) + SELECT zc.sample, zc.{o}, zc.val + b{layer}.val AS z, + tanh(zc.val + b{layer}.val) AS val + FROM zc JOIN b{layer} ON zc.{o} = b{layer}.{o}""", + ) + prev = f"a{layer + 1}" + else: + t.put( + "logits", + f"""WITH zc AS ({zc}) + SELECT zc.sample, zc.{o}, zc.val + b{layer}.val AS z + FROM zc JOIN b{layer} ON zc.{o} = b{layer}.{o}""", + ) + + +def softmax_delta_sql() -> str: + """Output error delta = softmax(logits) - onehot(label). The one hand-derived + rule: softmax couples classes through a per-sample normaliser an aggregate + grad() does not cross.""" + o = f"u{DEPTH}" + return f""" + WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), + e AS (SELECT logits.sample, logits.{o}, exp(logits.z - m.m) AS e + FROM logits JOIN m ON logits.sample = m.sample), + s AS (SELECT sample, SUM(e) AS s FROM e GROUP BY sample) + SELECT e.sample, e.{o}, + e.e / s.s - CASE WHEN e.{o} = y.label THEN 1.0 ELSE 0.0 END AS val + FROM e JOIN s ON e.sample = s.sample JOIN y ON y.sample = e.sample""" + + +def train_step(t: Tensors) -> None: + """Forward, backward (the same contraction transposed), SGD update.""" + forward(t) + t.put(f"delta{DEPTH}", softmax_delta_sql()) + # Backward: walk the layers in reverse, the gradients are contractions. + for layer in reversed(range(DEPTH)): + i, o = f"u{layer}", f"u{layer + 1}" + a_in = "x" if layer == 0 else f"a{layer}" + gw = contract( + f"sample,{i} * sample,{o} -> {i},{o}", a_in, f"delta{layer + 1}" + ) + t.put( + f"gw{layer}", f"SELECT {i}, {o}, val / {N_TRAIN} AS val FROM ({gw})" + ) + t.put( + f"gb{layer}", + f"SELECT {o}, AVG(val) AS val FROM delta{layer + 1} GROUP BY {o}", + ) + if layer > 0: # propagate the cotangent, scaled by the local derivative + dc = contract( + f"sample,{o} * {i},{o} -> sample,{i}", + f"delta{layer + 1}", + f"w{layer}", + ) + t.put( + f"delta{layer}", + f"""WITH dh AS ({dc}) + SELECT dh.sample, dh.{i}, dh.val * grad(tanh(a{layer}.z), a{layer}.z) AS val + FROM dh JOIN a{layer} ON dh.sample = a{layer}.sample AND dh.{i} = a{layer}.{i}""", + ) + # SGD: each weight relation becomes w - lr * grad. + for layer in range(DEPTH): + i, o = f"u{layer}", f"u{layer + 1}" + t.put( + f"w{layer}", + f"SELECT w{layer}.{i}, w{layer}.{o}, w{layer}.val - {LR} * gw{layer}.val AS val " + f"FROM w{layer} JOIN gw{layer} ON w{layer}.{i} = gw{layer}.{i} " + f"AND w{layer}.{o} = gw{layer}.{o}", + ) + t.put( + f"b{layer}", + f"SELECT b{layer}.{o}, b{layer}.val - {LR} * gb{layer}.val AS val " + f"FROM b{layer} JOIN gb{layer} ON b{layer}.{o} = gb{layer}.{o}", + ) + + +def accuracy(t: Tensors, inp: str, lab: str) -> float: + """A forward pass over ``inp`` + argmax, compared to ``lab`` — all in SQL.""" + forward(t, inp) + o = f"u{DEPTH}" + return float( + t.ctx.sql( + f"""WITH pred AS ( + SELECT sample, {o}, + ROW_NUMBER() OVER (PARTITION BY sample ORDER BY z DESC) AS rk + FROM logits) + SELECT AVG(CASE WHEN p.{o} = l.label THEN 1.0 ELSE 0.0 END) AS acc + FROM pred p JOIN {lab} l ON p.sample = l.sample WHERE p.rk = 1""" + ).to_pandas()["acc"][0] + ) + + +def record_metrics(t: Tensors, step: int) -> None: + """Append a (step, loss, train_acc, test_acc) row to the ``metrics`` table. + + NN training emits a lot of data — loss curves, per-step accuracies — and like + everything else here it lives as rows in a relation, grown each time, not a + Python list. Read it back at the end as a tidy ``(step,)`` xarray. + """ + o = f"u{DEPTH}" + train = accuracy(t, "x", "y") # leaves the training forward in `logits` + loss = float( + t.ctx.sql( + f"""WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), + e AS (SELECT logits.sample, logits.{o}, exp(logits.z - m.m) AS e + FROM logits JOIN m ON logits.sample = m.sample), + s AS (SELECT sample, SUM(e) AS s FROM e GROUP BY sample) + SELECT -AVG(ln(e.e / s.s)) AS loss + FROM e JOIN s ON e.sample = s.sample JOIN y ON y.sample = e.sample + WHERE e.{o} = y.label""" + ).to_pandas()["loss"][0] + ) + test = accuracy(t, "x_te", "y_te") + row = ( + f"SELECT CAST({step} AS BIGINT) AS step, CAST({loss} AS DOUBLE) AS loss, " + f"CAST({train} AS DOUBLE) AS train_acc, CAST({test} AS DOUBLE) AS test_acc" + ) + t.put( + "metrics", + f"SELECT * FROM metrics UNION ALL {row}" + if t.ctx.table_exist("metrics") + else row, + ) + print( + f"step {step:2d}: loss {loss:.3f} train {train:.3f} test {test:.3f}" + ) + + +# --- MNIST loading ------------------------------------------------------------ def _download(url: str, dest: Path, tries: int = 5) -> None: - """Fetch a URL to dest, reading the whole body (retries on truncation).""" last = None for _ in range(tries): try: @@ -102,12 +335,11 @@ def _read_idx(path: Path) -> np.ndarray: if magic == 2051: # images n, r, c = struct.unpack(">III", f.read(12)) return np.frombuffer(f.read(), np.uint8).reshape(n, r, c) - (n,) = struct.unpack(">I", f.read(4)) # labels + struct.unpack(">I", f.read(4)) # labels: skip the count return np.frombuffer(f.read(), np.uint8) def load_mnist(): - """Download (and cache) MNIST, 2x2 mean-pool to 14x14, subsample.""" CACHE.mkdir(exist_ok=True) files = { "images": "train-images-idx3-ubyte.gz", @@ -119,241 +351,90 @@ def load_mnist(): if not dest.exists(): _download(f"{MIRROR}/{name}", dest) paths[key] = dest - imgs = _read_idx(paths["images"]).astype(np.float32) / 255.0 labs = _read_idx(paths["labels"]).astype(np.int64) - pooled = imgs.reshape(-1, 14, 2, 14, 2).mean(axis=(2, 4)).reshape(-1, N_PIX) - + side = WIDTHS[0] # pooled pixels per image + pool = int(round((28 * 28 / side) ** 0.5)) # 2 for 196 pixels + k = 28 // pool + pooled = ( + imgs.reshape(-1, k, pool, k, pool).mean(axis=(2, 4)).reshape(-1, side) + ) rng = np.random.default_rng(0) idx = rng.permutation(len(pooled)) tr, te = idx[:N_TRAIN], idx[N_TRAIN : N_TRAIN + N_TEST] return pooled[tr], labs[tr], pooled[te], labs[te] -# --- the model as rows -------------------------------------------------------- - -_MODEL_SCHEMA = pa.schema( - [ - ("step", pa.int64()), - ("layer", pa.utf8()), - ("i", pa.int64()), - ("j", pa.int64()), - ("val", pa.float64()), - ] -) - - -def _param_rows(step: int, layer: str, arr: np.ndarray) -> dict: - """One layer's parameters as ``(step, layer, i, j, val)`` columns. - - A matrix ``W[i, j]`` becomes rows ``(i, j, w)``; a bias vector ``b[i]`` - becomes ``(i, 0, b)``. - """ - if arr.ndim == 2: - ii, jj = np.meshgrid( - np.arange(arr.shape[0]), np.arange(arr.shape[1]), indexing="ij" - ) - ii, jj = ii.ravel(), jj.ravel() - else: - ii, jj = np.arange(arr.size), np.zeros(arr.size, np.int64) - n = arr.size - return { - "step": np.full(n, step, np.int64), - "layer": [layer] * n, - "i": ii.astype(np.int64), - "j": jj.astype(np.int64), - "val": arr.ravel().astype(np.float64), - } - - -def _generation_batch(step, w1, b1, w2, b2) -> pa.RecordBatch: - """All four layers of one generation as a single RecordBatch.""" - cols: dict[str, list] = {k: [] for k in ("step", "layer", "i", "j", "val")} - for layer, arr in (("w1", w1), ("b1", b1), ("w2", w2), ("b2", b2)): - for k, v in _param_rows(step, layer, arr).items(): - cols[k].extend(list(v)) - return pa.RecordBatch.from_arrays( - [ - pa.array(cols["step"], pa.int64()), - pa.array(cols["layer"], pa.utf8()), - pa.array(cols["i"], pa.int64()), - pa.array(cols["j"], pa.int64()), - pa.array(cols["val"], pa.float64()), - ], - schema=_MODEL_SCHEMA, - ) - - -# One training step, as one SQL statement: read the current generation of the -# model table, run the forward + backward pass over the data, and SELECT the next -# generation's parameter rows (which the loop appends to the model table). -STEP = f""" -WITH cur AS (SELECT max(step) AS s FROM model), - w1 AS (SELECT i AS pix, j AS hid, val AS w FROM model, cur - WHERE step = cur.s AND layer = 'w1'), - b1 AS (SELECT i AS hid, val AS b FROM model, cur - WHERE step = cur.s AND layer = 'b1'), - w2 AS (SELECT i AS hid, j AS cls, val AS w FROM model, cur - WHERE step = cur.s AND layer = 'w2'), - b2 AS (SELECT i AS cls, val AS b FROM model, cur - WHERE step = cur.s AND layer = 'b2'), - -- forward: hidden pre-activation z and activation a = tanh(z) - zt AS (SELECT i.sample, w.hid, SUM(i.val * w.w) + MAX(bb.b) AS z - FROM imgs i JOIN w1 w ON i.pix = w.pix JOIN b1 bb ON w.hid = bb.hid - GROUP BY i.sample, w.hid), - h AS (SELECT sample, hid, z, tanh(z) AS a FROM zt), - -- output logits, then a stable softmax - lg AS (SELECT h.sample, w.cls, SUM(h.a * w.w) + MAX(bb.b) AS z - FROM h JOIN w2 w ON h.hid = w.hid JOIN b2 bb ON w.cls = bb.cls - GROUP BY h.sample, w.cls), - mx AS (SELECT sample, MAX(z) AS m FROM lg GROUP BY sample), - ex AS (SELECT l.sample, l.cls, exp(l.z - mx.m) AS e - FROM lg l JOIN mx ON l.sample = mx.sample), - zs AS (SELECT sample, SUM(e) AS z FROM ex GROUP BY sample), - -- output error delta2 = softmax - onehot(label) - d2 AS (SELECT ex.sample, ex.cls, - ex.e / zs.z - - CASE WHEN ex.cls = lb.label THEN 1.0 ELSE 0.0 END AS d - FROM ex JOIN zs ON ex.sample = zs.sample - JOIN labels lb ON lb.sample = ex.sample), - -- backprop to hidden: push delta2 through W2, scale by grad(tanh(z), z) - da AS (SELECT d.sample, w.hid, SUM(d.d * w.w) AS da - FROM d2 d JOIN w2 w ON d.cls = w.cls GROUP BY d.sample, w.hid), - d1 AS (SELECT da.sample, da.hid, da.da * grad(tanh(h.z), h.z) AS d - FROM da JOIN h ON da.sample = h.sample AND da.hid = h.hid), - -- parameter gradients: dW = AVG(input * delta) over the batch - gw1 AS (SELECT i.pix, d.hid, AVG(i.val * d.d) AS g - FROM imgs i JOIN d1 d ON i.sample = d.sample GROUP BY i.pix, d.hid), - gb1 AS (SELECT hid, AVG(d) AS g FROM d1 GROUP BY hid), - gw2 AS (SELECT h.hid, d.cls, AVG(h.a * d.d) AS g - FROM h JOIN d2 d ON h.sample = d.sample GROUP BY h.hid, d.cls), - gb2 AS (SELECT cls, AVG(d) AS g FROM d2 GROUP BY cls) --- the next generation: w - lr*grad, tagged step+1, as model rows -SELECT (SELECT s FROM cur) + 1 AS step, 'w1' AS layer, - w.pix AS i, w.hid AS j, w.w - {LR} * g.g AS val -FROM w1 w JOIN gw1 g ON w.pix = g.pix AND w.hid = g.hid -UNION ALL -SELECT (SELECT s FROM cur) + 1, 'b1', b.hid, CAST(0 AS BIGINT), b.b - {LR} * g.g -FROM b1 b JOIN gb1 g ON b.hid = g.hid -UNION ALL -SELECT (SELECT s FROM cur) + 1, 'w2', w.hid, w.cls, w.w - {LR} * g.g -FROM w2 w JOIN gw2 g ON w.hid = g.hid AND w.cls = g.cls -UNION ALL -SELECT (SELECT s FROM cur) + 1, 'b2', b.cls, CAST(0 AS BIGINT), b.b - {LR} * g.g -FROM b2 b JOIN gb2 g ON b.cls = g.cls -""" - - -def eval_sql(imgs_table: str, labels_table: str) -> str: - """Accuracy of the latest model on a dataset — a forward pass in SQL. - - ``ROW_NUMBER()`` picks each sample's argmax class; it is compared to the - label. No softmax needed at inference: the argmax of the logits is the - prediction. - """ - return f""" - WITH cur AS (SELECT max(step) AS s FROM model), - w1 AS (SELECT i AS pix, j AS hid, val AS w FROM model, cur - WHERE step = cur.s AND layer = 'w1'), - b1 AS (SELECT i AS hid, val AS b FROM model, cur - WHERE step = cur.s AND layer = 'b1'), - w2 AS (SELECT i AS hid, j AS cls, val AS w FROM model, cur - WHERE step = cur.s AND layer = 'w2'), - b2 AS (SELECT i AS cls, val AS b FROM model, cur - WHERE step = cur.s AND layer = 'b2'), - h AS (SELECT i.sample, w.hid, - tanh(SUM(i.val * w.w) + MAX(bb.b)) AS a - FROM {imgs_table} i JOIN w1 w ON i.pix = w.pix - JOIN b1 bb ON w.hid = bb.hid - GROUP BY i.sample, w.hid), - lg AS (SELECT h.sample, w.cls, SUM(h.a * w.w) + MAX(bb.b) AS z - FROM h JOIN w2 w ON h.hid = w.hid JOIN b2 bb ON w.cls = bb.cls - GROUP BY h.sample, w.cls), - pred AS (SELECT sample, cls, - ROW_NUMBER() OVER (PARTITION BY sample ORDER BY z DESC) AS rk - FROM lg) - SELECT AVG(CASE WHEN p.cls = l.label THEN 1.0 ELSE 0.0 END) AS acc - FROM pred p JOIN {labels_table} l ON p.sample = l.sample - WHERE p.rk = 1 - """ - - -def _register_images(ctx, name, X): - ctx.from_dataset( - name, - xr.Dataset( - {"val": (("sample", "pix"), X)}, - coords={ - "sample": np.arange(X.shape[0]), - "pix": np.arange(N_PIX), - }, - ), - chunks={"sample": X.shape[0]}, - ) - - -def _register_labels(ctx, name, y): - ctx.from_dataset( - name, - xr.Dataset( - {"label": (("sample",), y.astype(np.float64))}, - coords={"sample": np.arange(len(y))}, - ), - chunks={"sample": len(y)}, - ) +# --- driver ------------------------------------------------------------------- def main() -> None: Xtr, ytr, Xte, yte = load_mnist() - print( - f"MNIST: train {Xtr.shape}, test {Xte.shape} " - f"({N_PIX} pix, {N_HID} hidden, {N_CLS} classes)" - ) + print(f"MNIST: train {Xtr.shape}, test {Xte.shape} architecture {WIDTHS}") ctx = xql.XarrayContext() - # The data is registered as xarray (the library's core); the model below is - # the one append-only table that holds every layer and every generation. - _register_images(ctx, "imgs", Xtr) - _register_labels(ctx, "labels", ytr) - _register_images(ctx, "imgs_te", Xte) - _register_labels(ctx, "labels_te", yte) - - # Generation 0: small random weights, zero biases. + # The whole model is one Dataset; from_dataset splits it into a table per + # weight (the shared boundary dims become the join keys). rng = np.random.default_rng(1) - gen0 = _generation_batch( - 0, - rng.standard_normal((N_PIX, N_HID)) * 0.1, - np.zeros(N_HID), - rng.standard_normal((N_HID, N_CLS)) * 0.1, - np.zeros(N_CLS), + model = build_model(rng) + ctx.from_dataset( + "model", + model, + table_names={ + (f"u{layer}", f"u{layer + 1}"): f"w{layer}" + for layer in range(DEPTH) + } + | {(f"u{layer + 1}",): f"b{layer}" for layer in range(DEPTH)}, + chunks={f"u{i}": w for i, w in enumerate(WIDTHS)}, ) - generations = [gen0] - ctx.register_record_batches("model", [generations]) + t = Tensors(ctx) + seed_weights(t) - def test_acc() -> float: - return float( - ctx.sql(eval_sql("imgs_te", "labels_te")).to_pandas()["acc"][0] - ) + # Inputs and labels, registered once; the queries read x / x_te by name. + register_tensor(ctx, "x", Xtr, ("sample", "u0"), chunk=CHUNK) + register_tensor(ctx, "y", ytr, ("sample",), var="label") + register_tensor(ctx, "x_te", Xte, ("sample", "u0")) + register_tensor(ctx, "y_te", yte, ("sample",), var="label") + + print(f"init: test acc {accuracy(t, 'x_te', 'y_te'):.3f}") - print(f"init: test acc {test_acc():.3f}") t0 = time.time() - for s in range(STEPS): - # One SQL statement computes the next generation; appending its rows to - # the model table *is* the parameter update. - generations.extend(ctx.sql(STEP).collect()) - ctx.deregister_table("model") - ctx.register_record_batches("model", [generations]) - if s % 10 == 0 or s == STEPS - 1: - tr = float( - ctx.sql(eval_sql("imgs", "labels")).to_pandas()["acc"][0] - ) - print(f"step {s:2d}: train {tr:.3f} test {test_acc():.3f}") + for step in range(STEPS): + train_step(t) + if step % 10 == 0 or step == STEPS - 1: + record_metrics(t, step) + dt = time.time() - t0 + + # The trained model comes back out as one xarray Dataset. + parts = [] + for layer in range(DEPTH): + i, o = f"u{layer}", f"u{layer + 1}" + parts.append( + ctx.sql(f"SELECT {i}, {o}, val FROM w{layer}") + .to_dataset(dims=[i, o]) + .rename({"val": f"w{layer}"}) + ) + parts.append( + ctx.sql(f"SELECT {o}, val FROM b{layer}") + .to_dataset(dims=[o]) + .rename({"val": f"b{layer}"}) + ) + trained = xr.merge(parts) + # The loss curve and accuracies were recorded as rows; read them back as a + # tidy (step,) xarray of training metrics. + metrics = ctx.sql("SELECT * FROM metrics ORDER BY step").to_dataset( + dims=["step"] + ) - n_rows = ctx.sql("SELECT count(*) AS n FROM model").to_pandas()["n"][0] print( - f"\ntrained an MNIST MLP in SQL: test accuracy {test_acc():.3f} " - f"in {time.time() - t0:.0f}s.\nThe model and its entire training " - f"history are one table of {n_rows} rows ({STEPS + 1} generations)." + f"\ntrained a {WIDTHS} MLP as relational tensor algebra in {dt:.0f}s: " + f"test accuracy {accuracy(t, 'x_te', 'y_te'):.3f}." + ) + print( + f"the model is one xarray Dataset again " + f"(vars {list(trained.data_vars)}, dims {dict(trained.sizes)}); " + f"metrics are a table -> xarray {list(metrics.data_vars)} over " + f"{dict(metrics.sizes)}." ) From 3b365f6656f887706f4dfdb8ef589eddbad0a201 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 30 Jun 2026 18:15:36 +0300 Subject: [PATCH 5/5] demo: the whole MLP as one weight relation (bias folded, layer dim) Two simplifications collapse the model to a single relation: - Bias folded into the weights (an nn.Linear): each layer's bias is the weight of a constant-1 input, kept as the row inp=width of the same weight array, so a layer is one matrix. - A layer dimension: every layer's weight lives in one weight(layer, inp, out) array, so forward/backward filter on the layer COLUMN instead of referencing a table per layer. The model is one xr.Dataset with a layer dim (NaN-padded for the ragged pyramid, dropped on seed); from_dataset registers it; the update is one query over the whole weight relation. A single contract() and a generic loop train a net of any depth (validated exact against numpy at depth 3). Tensors.put now unifies batch nullability so UNION results register cleanly. Faster too (~6s vs ~13s) at the same ~83% test accuracy; model and metrics still round-trip to xarray. Co-Authored-By: Claude Opus 4.8 --- benchmarks/README.md | 43 +++--- benchmarks/mnist_mlp.py | 297 +++++++++++++++++++++------------------- 2 files changed, 182 insertions(+), 158 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 10b4fea..5fa3fcd 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -77,31 +77,38 @@ Contracting a shared index is a join on it followed by a grouped `SUM` over the indices that survive. In xarray-sql an array indexed by named dims is a table keyed by those dims, so **the dimension names are the join keys**. -**The architecture is data.** The whole model is *one* `xr.Dataset`: each layer's -weight is a data variable `w{L}` over dims `(u{L}, u{L+1})`, the widths it -connects, sharing the boundary dims (`u1` is layer 0's output and layer 1's -input, so it is the join key between them). The dim sizes *are* the layer widths, -and the number of weights is the depth — differing neuron counts per layer are -just differing dim sizes, no padding, because the relational (long) form is -naturally ragged. `from_dataset` splits that one Dataset into a table per weight -automatically. Change `WIDTHS` (e.g. `196, 64, 32, 10`) and the same code trains -the deeper net. +**The whole network is one relation.** Two moves get there: + +- **Bias folded into the weights (an `nn.Linear`).** Each layer's bias is the + weight of a constant-`1` input, kept as the extra row `inp = width` of the same + weight array — so a layer is a single matrix. +- **A `layer` dimension.** Every layer's weight lives in one + `weight(layer, inp, out)` array, so the forward/backward filter on the `layer` + *column* instead of referencing a table per layer. + +So **the architecture is data**: the whole model is one `xr.Dataset` with a +`layer` dim, registered via `from_dataset`. The dim sizes are the layer widths +and the number of layers is the depth — differing neuron counts are just +differing sizes, NaN-padded in the dense array and dropped on the way in (the +relational form is naturally ragged). Change `WIDTHS` (e.g. `196, 64, 32, 10`) +and the same code trains the deeper net. A small `contract()` helper turns an einsum spec into one query, and a single generic loop trains a net of any shape: -- **forward** contracts the activation with each layer's weight, `+ bias`, - `tanh` (softmax on the last layer). +- **forward** contracts the activation with `weight WHERE layer = L`, adds the + bias row, `tanh` (softmax on the last layer). - **backward is the *same* operator with indices transposed** — the VJP of a - contraction is a contraction — and `grad(tanh(z), z)` supplies the only - genuinely-calculus part. Linear algebra is joins; the derivatives of the - nonlinearities are `grad`. + contraction is a contraction — accumulated into one `gweight` relation, with + `grad(tanh(z), z)` for the only genuinely-calculus part. Even the update is one + query over the whole `weight` relation. Linear algebra is joins; the + derivatives of the nonlinearities are `grad`. Everything stays relational: every stage is an inspectable table (`a1`, `delta2`, -`gw0`, …); the only hand-written gradient is softmax + cross-entropy's `delta = -softmax - onehot`. Even the training metrics are a table — each logged step -appends a `(step, loss, train_acc, test_acc)` row to a `metrics` relation rather -than a Python list (NN training produces a lot of such data; it belongs in +`gweight`, …); the only hand-written gradient is softmax + cross-entropy's +`delta = softmax - onehot`. Even the training metrics are a table — each logged +step appends a `(step, loss, train_acc, test_acc)` row to a `metrics` relation +rather than a Python list (NN training produces a lot of such data; it belongs in rows). Evaluation is SQL too (a forward pass + `ROW_NUMBER()` argmax), and the trained model, predictions, and metrics all come **back out as xarray** via `to_dataset`. Reaches ~83% test accuracy over 60 steps. Downloads MNIST on first diff --git a/benchmarks/mnist_mlp.py b/benchmarks/mnist_mlp.py index d7d97aa..fe31cee 100644 --- a/benchmarks/mnist_mlp.py +++ b/benchmarks/mnist_mlp.py @@ -9,7 +9,7 @@ # [tool.uv.sources] # xarray_sql = { path = "..", editable = true } # /// -"""Train an MNIST MLP as relational tensor algebra — with the architecture as data. +"""Train an MNIST MLP as relational tensor algebra — the whole net is one table. A neural network is a chain of **tensor contractions** (einsums), and an einsum over coordinate-indexed arrays *is* relational algebra: @@ -17,35 +17,41 @@ C[i,k] = sum_j A[i,j] * B[j,k] <=> JOIN A, B ON A.j = B.j GROUP BY i, k -> SUM(A.val * B.val) -Contracting a shared index is a join on it followed by a grouped SUM over the -indices that survive. In xarray-sql an array indexed by named dims is a table -keyed by those dims, so **the dimension names are the join keys**. - -The whole model is **one ``xr.Dataset``**. Each layer's weight is a data variable -whose two dims are the widths it connects — ``w0(u0, u1)``, ``w1(u1, u2)``, … — -sharing the boundary dims (``u1`` is the output of layer 0 and the input of layer -1, so it is the join key between them). **The architecture is therefore data: the -Dataset's dim sizes are the layer widths, and the number of layers is how many -weights it holds.** Differing neuron counts per layer are just differing dim -sizes — no padding, because the relational (long) form is naturally ragged. -``from_dataset`` splits that one Dataset into a table per weight automatically. - -A single ``contract()`` turns an einsum spec into one query, and a single generic -loop trains a net of any depth/width: - -* **forward** — contract the activation with each layer's weight, add bias, tanh - (softmax on the last layer). -* **backward is the same operator transposed** — the VJP of a contraction is a - contraction — with ``grad(tanh(z), z)`` for the one local-derivative step. - Linear algebra is joins; the derivatives of the nonlinearities are ``grad``. - -Every stage is an inspectable relation; the trained model, predictions, and loss -curve come back out as ``xarray`` via ``to_dataset``. Change ``WIDTHS`` and the -same code trains a different network. - -This is not a numpy replacement — relational matmul carries join overhead a BLAS -inner product doesn't. What it buys is a declarative, inspectable pipeline whose -data side is chunked xarray (parallel over the batch, larger-than-memory). +Contracting a shared index is a join on it followed by a grouped SUM. In +xarray-sql an array indexed by named dims is a table keyed by those dims, so the +dim names are the join keys. + +Two simplifications make the whole model **one relation**: + +* **Bias folded into the weights (an ``nn.Linear``).** Each layer's bias is the + weight of a constant-``1`` input, stored as the extra row ``inp = width`` in the + same weight array — so a layer is a single matrix. The forward reads the matmul + rows and that bias row from the one relation (no separate bias table). +* **A ``layer`` dimension.** Every layer's weight lives in one + ``weight(layer, inp, out)`` array, so the forward/backward filter on the + ``layer`` *column* instead of referencing a table per layer. The whole network + is one ``xr.Dataset`` registered with ``from_dataset``; differing layer widths + are NaN-padded in the dense array and dropped on the way in (the relational + form is naturally ragged). The architecture is data — change ``WIDTHS`` and the + same code trains a different net. + +A single ``contract()`` and one generic loop train a net of any depth: forward +contracts the activation with ``weight WHERE layer = L``; backward is the same +contraction transposed (the VJP of a contraction is a contraction), with +``grad(tanh(z), z)`` for the one local-derivative step. Even the weight update is +one query over the whole ``weight`` relation. Linear algebra is joins; the +derivatives of the nonlinearities are ``grad``. + +Everything stays relational and inspectable: activations, errors, gradients, and +the per-step training metrics are all tables; the trained model, predictions, and +metrics come back out as ``xarray`` via ``to_dataset``. + +This is not a numpy replacement — the long form puts one matrix entry per row, so +the matmul-as-join carries overhead a BLAS inner product doesn't. What it buys is +a declarative, inspectable pipeline whose data side is chunked xarray (parallel +over the batch, larger-than-memory). Recovering BLAS speed would mean storing +dense *tiles* per cell and contracting them with a tile-matmul — a future +direction, not done here. Run standalone (builds the local extension on first use): @@ -62,6 +68,7 @@ from pathlib import Path import numpy as np +import pyarrow as pa import xarray as xr import xarray_sql as xql @@ -81,13 +88,13 @@ def contract(spec: str, left: str, right: str) -> str: - """An einsum over two coordinate-indexed tables, as one SQL query. + """An einsum over two coordinate-indexed relations, as one SQL query. - ``contract("sample,u0 * u0,u1 -> sample,u1", "x", "w0")`` joins ``x`` and - ``w0`` on their shared dim ``u0``, groups by the output dims, and sums the - product of values — a matmul. Every table has its dims as columns plus a - ``val`` column. Indices in the inputs but not the output are contracted; the - same helper expresses the transposed contractions of backprop. + ``contract("sample,inp * inp,out -> sample,out", "x", w)`` joins ``x`` and + ``w`` on their shared dim ``inp``, groups by the output dims, and sums the + product of values — a matmul. ``left`` / ``right`` are table names or + parenthesised subqueries; each exposes its dims plus a ``val`` column. + Indices in the inputs but not the output are contracted (summed over). """ spec = spec.replace(" ", "") lhs, out = spec.split("->") @@ -135,66 +142,85 @@ def __init__(self, ctx: xql.XarrayContext): def put(self, name: str, sql: str) -> None: batches = self.ctx.sql(sql).collect() + # UNION branches can yield batches that differ only in field nullability; + # cast them all to one (nullable) schema so registration accepts them. + if batches: + target = pa.schema( + [pa.field(f.name, f.type) for f in batches[0].schema] + ) + batches = [b.cast(target) for b in batches] if self.ctx.table_exist(name): self.ctx.deregister_table(name) self.ctx.register_record_batches(name, [batches]) -# --- the model as one xarray Dataset ------------------------------------------ +# --- the model: one weight relation, bias folded in --------------------------- def build_model(rng: np.random.Generator) -> xr.Dataset: - """The whole model as one Dataset: weight ``w{L}`` over dims ``(u{L}, u{L+1})`` - and bias ``b{L}`` over ``(u{L+1},)``. The shared boundary dims tie the layers - together; the dim sizes *are* the architecture.""" - data_vars: dict = {} + """The whole network as one ``weight(layer, inp, out)`` Dataset. + + Layer ``L`` connects ``WIDTHS[L]`` inputs (plus a constant-1 bias input, index + ``WIDTHS[L]``) to ``WIDTHS[L+1]`` outputs. The dense array is NaN-padded to the + widest layer; the padding is dropped when the relation is seeded, so the live + table is the ragged set of real weights. + """ + max_in = max(WIDTHS[layer] + 1 for layer in range(DEPTH)) + max_out = max(WIDTHS[layer + 1] for layer in range(DEPTH)) + arr = np.full((DEPTH, max_in, max_out), np.nan) for layer in range(DEPTH): n_in, n_out = WIDTHS[layer], WIDTHS[layer + 1] - data_vars[f"w{layer}"] = ( - (f"u{layer}", f"u{layer + 1}"), - rng.standard_normal((n_in, n_out)) * 0.1, + arr[layer, :n_in, :n_out] = rng.standard_normal((n_in, n_out)) * 0.1 + arr[layer, n_in, :n_out] = ( + 0.0 # bias row (weight of the constant input) ) - data_vars[f"b{layer}"] = ((f"u{layer + 1}",), np.zeros(n_out)) - coords = {f"u{i}": np.arange(w) for i, w in enumerate(WIDTHS)} - return xr.Dataset(data_vars, coords=coords) + return xr.Dataset( + {"weight": (("layer", "inp", "out"), arr)}, + coords={ + "layer": np.arange(DEPTH), + "inp": np.arange(max_in), + "out": np.arange(max_out), + }, + ) -def seed_weights(t: Tensors) -> None: - """Unpack the one model Dataset (registered as the ``model`` schema) into - working weight/bias relations with a uniform ``val`` column.""" - for layer in range(DEPTH): - i, o = f"u{layer}", f"u{layer + 1}" - t.put( - f"w{layer}", f"SELECT {i}, {o}, w{layer} AS val FROM model.w{layer}" - ) - t.put(f"b{layer}", f"SELECT {o}, b{layer} AS val FROM model.b{layer}") +def matmul_rows(layer: int) -> str: + """The matmul (non-bias) rows of one layer's weight, as a subquery.""" + return f"(SELECT inp, out, val FROM weight WHERE layer = {layer} AND inp < {WIDTHS[layer]})" + + +def bias_row(layer: int) -> str: + """The bias row (inp = width) of one layer's weight, as a subquery over out.""" + return f"(SELECT out, val FROM weight WHERE layer = {layer} AND inp = {WIDTHS[layer]})" # --- the network, as contractions (generic over depth) ------------------------ def forward(t: Tensors, inp: str = "x") -> None: - """Forward pass from ``inp``: a contraction + bias + tanh per layer, leaving - the pre-activations ``a{L}.z`` for backprop and the output ``logits``.""" + """Forward pass from ``inp``: per layer, contract with the matmul rows and add + the bias row (both from the one weight relation), then tanh on the hidden + layers. Leaves ``a{L}.z`` for backprop and the output ``logits``.""" prev = inp for layer in range(DEPTH): - i, o = f"u{layer}", f"u{layer + 1}" - zc = contract(f"sample,{i} * {i},{o} -> sample,{o}", prev, f"w{layer}") + zc = contract( + "sample,inp * inp,out -> sample,out", prev, matmul_rows(layer) + ) if layer < DEPTH - 1: t.put( f"a{layer + 1}", f"""WITH zc AS ({zc}) - SELECT zc.sample, zc.{o}, zc.val + b{layer}.val AS z, - tanh(zc.val + b{layer}.val) AS val - FROM zc JOIN b{layer} ON zc.{o} = b{layer}.{o}""", + SELECT zc.sample, zc.out AS inp, zc.val + b.val AS z, + tanh(zc.val + b.val) AS val + FROM zc JOIN {bias_row(layer)} b ON zc.out = b.out""", ) prev = f"a{layer + 1}" else: t.put( "logits", f"""WITH zc AS ({zc}) - SELECT zc.sample, zc.{o}, zc.val + b{layer}.val AS z - FROM zc JOIN b{layer} ON zc.{o} = b{layer}.{o}""", + SELECT zc.sample, zc.out, zc.val + b.val AS z + FROM zc JOIN {bias_row(layer)} b ON zc.out = b.out""", ) @@ -202,74 +228,77 @@ def softmax_delta_sql() -> str: """Output error delta = softmax(logits) - onehot(label). The one hand-derived rule: softmax couples classes through a per-sample normaliser an aggregate grad() does not cross.""" - o = f"u{DEPTH}" - return f""" + return """ WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), - e AS (SELECT logits.sample, logits.{o}, exp(logits.z - m.m) AS e + e AS (SELECT logits.sample, logits.out, exp(logits.z - m.m) AS e FROM logits JOIN m ON logits.sample = m.sample), s AS (SELECT sample, SUM(e) AS s FROM e GROUP BY sample) - SELECT e.sample, e.{o}, - e.e / s.s - CASE WHEN e.{o} = y.label THEN 1.0 ELSE 0.0 END AS val + SELECT e.sample, e.out, + e.e / s.s - CASE WHEN e.out = y.label THEN 1.0 ELSE 0.0 END AS val FROM e JOIN s ON e.sample = s.sample JOIN y ON y.sample = e.sample""" def train_step(t: Tensors) -> None: - """Forward, backward (the same contraction transposed), SGD update.""" + """Forward, backward (the same contraction transposed), one SGD update.""" forward(t) t.put(f"delta{DEPTH}", softmax_delta_sql()) - # Backward: walk the layers in reverse, the gradients are contractions. + # Backward: gradients are contractions over the batch, accumulated into one + # gweight relation tagged by layer. delta{L} is the error at layer L's units. for layer in reversed(range(DEPTH)): - i, o = f"u{layer}", f"u{layer + 1}" a_in = "x" if layer == 0 else f"a{layer}" + # matmul gradient (mean over batch) + bias gradient (mean of delta), + # both tagged with this layer, as rows of one gweight relation. gw = contract( - f"sample,{i} * sample,{o} -> {i},{o}", a_in, f"delta{layer + 1}" + "sample,inp * sample,out -> inp,out", a_in, f"delta{layer + 1}" ) - t.put( - f"gw{layer}", f"SELECT {i}, {o}, val / {N_TRAIN} AS val FROM ({gw})" + rows = ( + f"SELECT CAST({layer} AS BIGINT) AS layer, inp, out, " + f"val / {N_TRAIN} AS val FROM ({gw}) " + f"UNION ALL " + f"SELECT CAST({layer} AS BIGINT) AS layer, " + f"CAST({WIDTHS[layer]} AS BIGINT) AS inp, out, AVG(val) AS val " + f"FROM delta{layer + 1} GROUP BY out" ) t.put( - f"gb{layer}", - f"SELECT {o}, AVG(val) AS val FROM delta{layer + 1} GROUP BY {o}", + "gweight", + f"SELECT * FROM gweight UNION ALL {rows}" + if t.ctx.table_exist("gweight") + else rows, ) if layer > 0: # propagate the cotangent, scaled by the local derivative dc = contract( - f"sample,{o} * {i},{o} -> sample,{i}", + "sample,out * inp,out -> sample,inp", f"delta{layer + 1}", - f"w{layer}", + matmul_rows(layer), ) t.put( f"delta{layer}", - f"""WITH dh AS ({dc}) - SELECT dh.sample, dh.{i}, dh.val * grad(tanh(a{layer}.z), a{layer}.z) AS val - FROM dh JOIN a{layer} ON dh.sample = a{layer}.sample AND dh.{i} = a{layer}.{i}""", + f"""WITH dc AS ({dc}) + SELECT dc.sample, dc.inp AS out, + dc.val * grad(tanh(a{layer}.z), a{layer}.z) AS val + FROM dc JOIN a{layer} + ON dc.sample = a{layer}.sample AND dc.inp = a{layer}.inp""", ) - # SGD: each weight relation becomes w - lr * grad. - for layer in range(DEPTH): - i, o = f"u{layer}", f"u{layer + 1}" - t.put( - f"w{layer}", - f"SELECT w{layer}.{i}, w{layer}.{o}, w{layer}.val - {LR} * gw{layer}.val AS val " - f"FROM w{layer} JOIN gw{layer} ON w{layer}.{i} = gw{layer}.{i} " - f"AND w{layer}.{o} = gw{layer}.{o}", - ) - t.put( - f"b{layer}", - f"SELECT b{layer}.{o}, b{layer}.val - {LR} * gb{layer}.val AS val " - f"FROM b{layer} JOIN gb{layer} ON b{layer}.{o} = gb{layer}.{o}", - ) + # One SGD update for the whole network: weight <- weight - lr * gweight. + t.put( + "weight", + f"""SELECT w.layer, w.inp, w.out, w.val - {LR} * g.val AS val + FROM weight w JOIN gweight g + ON w.layer = g.layer AND w.inp = g.inp AND w.out = g.out""", + ) + t.ctx.deregister_table("gweight") def accuracy(t: Tensors, inp: str, lab: str) -> float: """A forward pass over ``inp`` + argmax, compared to ``lab`` — all in SQL.""" forward(t, inp) - o = f"u{DEPTH}" return float( t.ctx.sql( f"""WITH pred AS ( - SELECT sample, {o}, + SELECT sample, out, ROW_NUMBER() OVER (PARTITION BY sample ORDER BY z DESC) AS rk FROM logits) - SELECT AVG(CASE WHEN p.{o} = l.label THEN 1.0 ELSE 0.0 END) AS acc + SELECT AVG(CASE WHEN p.out = l.label THEN 1.0 ELSE 0.0 END) AS acc FROM pred p JOIN {lab} l ON p.sample = l.sample WHERE p.rk = 1""" ).to_pandas()["acc"][0] ) @@ -282,17 +311,16 @@ def record_metrics(t: Tensors, step: int) -> None: everything else here it lives as rows in a relation, grown each time, not a Python list. Read it back at the end as a tidy ``(step,)`` xarray. """ - o = f"u{DEPTH}" train = accuracy(t, "x", "y") # leaves the training forward in `logits` loss = float( t.ctx.sql( - f"""WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), - e AS (SELECT logits.sample, logits.{o}, exp(logits.z - m.m) AS e + """WITH m AS (SELECT sample, MAX(z) AS m FROM logits GROUP BY sample), + e AS (SELECT logits.sample, logits.out, exp(logits.z - m.m) AS e FROM logits JOIN m ON logits.sample = m.sample), s AS (SELECT sample, SUM(e) AS s FROM e GROUP BY sample) SELECT -AVG(ln(e.e / s.s)) AS loss FROM e JOIN s ON e.sample = s.sample JOIN y ON y.sample = e.sample - WHERE e.{o} = y.label""" + WHERE e.out = y.label""" ).to_pandas()["loss"][0] ) test = accuracy(t, "x_te", "y_te") @@ -354,7 +382,7 @@ def load_mnist(): imgs = _read_idx(paths["images"]).astype(np.float32) / 255.0 labs = _read_idx(paths["labels"]).astype(np.int64) side = WIDTHS[0] # pooled pixels per image - pool = int(round((28 * 28 / side) ** 0.5)) # 2 for 196 pixels + pool = 28 // int(round(side**0.5)) # 2 for 196 pixels (14x14) k = 28 // pool pooled = ( imgs.reshape(-1, k, pool, k, pool).mean(axis=(2, 4)).reshape(-1, side) @@ -373,31 +401,32 @@ def main() -> None: print(f"MNIST: train {Xtr.shape}, test {Xte.shape} architecture {WIDTHS}") ctx = xql.XarrayContext() - # The whole model is one Dataset; from_dataset splits it into a table per - # weight (the shared boundary dims become the join keys). + # The whole model is one Dataset with a layer dim; from_dataset gives one + # `net` table, and seeding drops the NaN padding to the live `weight` relation. rng = np.random.default_rng(1) model = build_model(rng) ctx.from_dataset( - "model", + "net", model, - table_names={ - (f"u{layer}", f"u{layer + 1}"): f"w{layer}" - for layer in range(DEPTH) - } - | {(f"u{layer + 1}",): f"b{layer}" for layer in range(DEPTH)}, - chunks={f"u{i}": w for i, w in enumerate(WIDTHS)}, + chunks={ + "layer": DEPTH, + "inp": model.sizes["inp"], + "out": model.sizes["out"], + }, ) t = Tensors(ctx) - seed_weights(t) + t.put( + "weight", + "SELECT layer, inp, out, weight AS val FROM net WHERE weight IS NOT NULL", + ) - # Inputs and labels, registered once; the queries read x / x_te by name. - register_tensor(ctx, "x", Xtr, ("sample", "u0"), chunk=CHUNK) + # Inputs and labels (the bias is in the weight relation, so no augmentation). + register_tensor(ctx, "x", Xtr, ("sample", "inp"), chunk=CHUNK) register_tensor(ctx, "y", ytr, ("sample",), var="label") - register_tensor(ctx, "x_te", Xte, ("sample", "u0")) + register_tensor(ctx, "x_te", Xte, ("sample", "inp")) register_tensor(ctx, "y_te", yte, ("sample",), var="label") print(f"init: test acc {accuracy(t, 'x_te', 'y_te'):.3f}") - t0 = time.time() for step in range(STEPS): train_step(t) @@ -405,23 +434,12 @@ def main() -> None: record_metrics(t, step) dt = time.time() - t0 - # The trained model comes back out as one xarray Dataset. - parts = [] - for layer in range(DEPTH): - i, o = f"u{layer}", f"u{layer + 1}" - parts.append( - ctx.sql(f"SELECT {i}, {o}, val FROM w{layer}") - .to_dataset(dims=[i, o]) - .rename({"val": f"w{layer}"}) - ) - parts.append( - ctx.sql(f"SELECT {o}, val FROM b{layer}") - .to_dataset(dims=[o]) - .rename({"val": f"b{layer}"}) - ) - trained = xr.merge(parts) - # The loss curve and accuracies were recorded as rows; read them back as a - # tidy (step,) xarray of training metrics. + # The trained model, predictions, and metrics all come back out as xarray. + weights = ( + ctx.sql("SELECT layer, inp, out, val FROM weight") + .to_dataset(dims=["layer", "inp", "out"]) + .rename({"val": "weight"}) + ) metrics = ctx.sql("SELECT * FROM metrics ORDER BY step").to_dataset( dims=["step"] ) @@ -431,10 +449,9 @@ def main() -> None: f"test accuracy {accuracy(t, 'x_te', 'y_te'):.3f}." ) print( - f"the model is one xarray Dataset again " - f"(vars {list(trained.data_vars)}, dims {dict(trained.sizes)}); " - f"metrics are a table -> xarray {list(metrics.data_vars)} over " - f"{dict(metrics.sizes)}." + f"the whole model is one weight relation -> xarray " + f"{dict(weights.sizes)}; metrics are a table -> xarray " + f"{list(metrics.data_vars)} over {dict(metrics.sizes)}." )