diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml new file mode 100644 index 0000000..b0e7b91 --- /dev/null +++ b/.github/workflows/unit-tests.yaml @@ -0,0 +1,34 @@ +name: unit-tests +on: + pull_request: + branches: [main] + push: + branches: [main] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + runs-on: ubuntu-latest + # fluence-base carries flux-sched 0.52.0 (installed to /usr) + Go, so the + # cgo matcher (pkg/graph) links and `make test` can build every package. + container: + image: ghcr.io/converged-computing/fluence-base:latest + env: + FLUX_SCHED_ROOT: /opt/flux-sched + LD_LIBRARY_PATH: /usr/lib:/opt/flux-sched/resource:/opt/flux-sched/resource/reapi/bindings:/opt/flux-sched/resource/libjobspec + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Versions + run: | + export PATH=$PATH:/usr/local/go/bin + go version + ls /opt/flux-sched >/dev/null && echo "flux-sched present" + + - name: Go test + run: | + export PATH=$PATH:/usr/local/go/bin + make test diff --git a/Dockerfile b/Dockerfile index 2fe9596..1e2f153 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,9 +7,9 @@ COPY . . 
RUN CGO_ENABLED=1 \ CGO_CFLAGS="-I/opt/flux-sched" \ CGO_LDFLAGS="-L/opt/flux-sched/resource -L/opt/flux-sched/resource/libjobspec -L/opt/flux-sched/resource/reapi/bindings -lresource -ljobspec_conv -lreapi_cli -lflux-idset -lstdc++ -lczmq -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" \ - go build -ldflags '-w' -o /bin/fluence ./cmd/fluence && \ - CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-deviceplugin ./cmd/deviceplugin && \ - CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-webhook ./cmd/webhook + go build -ldflags '-w' -o /bin/fluence ./cmd/fluence +RUN CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-deviceplugin ./cmd/deviceplugin +RUN CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-webhook ./cmd/webhook FROM fluxrm/flux-core:noble AS runtime diff --git a/Makefile b/Makefile index 46c3344..ed62c28 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ FLUX_SCHED_ROOT ?= /opt/flux-sched IMG ?= ghcr.io/converged-computing/fluence:latest +TEST_IMG ?= ghcr.io/converged-computing/fluence:test # cgo flags for the scheduler binary: flux-sched only. CGO_CFLAGS = -I$(FLUX_SCHED_ROOT) @@ -25,14 +26,9 @@ build: ## Build all binaries (scheduler needs flux-sched; helpers are pure Go) CGO_ENABLED=0 go build -o bin/fluence-webhook ./cmd/webhook .PHONY: test -test: ## Pure-Go unit tests (no flux, no k8s scheduler libs, no cluster) - go test ./pkg/jgf/... ./pkg/cluster/... ./pkg/jobspec/... ./pkg/placement/... \ - ./pkg/quantum/... ./pkg/webhook/... ./pkg/deviceplugin/... - -.PHONY: test-graph -test-graph: ## Matcher tests (needs flux-sched) +test: ## Unit tests for every package (built with cgo; needs flux-sched) CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \ - go test ./pkg/graph/... + go test ./... .PHONY: test-restore test-restore: @@ -45,8 +41,8 @@ image: ## Build the scheduler container image .PHONY: test-image test-image: ## Build the scheduler container image - docker build -t $(IMG)-test . - docker push $(IMG)-test + docker build -t $(TEST_IMG) . 
+ docker push $(TEST_IMG) .PHONY: test-image-deploy test-image-deploy: test-image diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index 13cdcb3..5edbbf4 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -31,6 +31,14 @@ import ( // KubeSchedulerConfiguration. const Name = "Fluence" +// matcher is the subset of *graph.FluxionGraph the plugin depends on. Declaring +// it as an interface lets tests inject a fake (the real matcher is cgo/flux and +// cannot run in a unit test). *graph.FluxionGraph satisfies this. +type matcher interface { + MatchAllocateSpec(spec string) (graph.MatchAllocateRequest, error) + Cancel(jobid uint64) error +} + // groupAlloc is the in-memory record of a group's Fluxion allocation. It is a // rebuildable, within-lifetime memo: its job is race-free "match once per group" // dedup on the scheduling path (the durable record is the jobid annotation on @@ -47,7 +55,7 @@ type groupAlloc struct { // delegated to the native PodGroup API; Fluence only decides placement. type Fluence struct { handle fwk.Handle - matcher *graph.FluxionGraph + matcher matcher // matcherMu serializes all access to the cgo Fluxion client, which is not // thread-safe. Match runs on the (sequential) scheduling path; Cancel runs in @@ -122,12 +130,12 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error // scheduling key is the same JGF vertex subgraph we parse for placement, and // it carries the execution view flux uses to replay an allocation on restart. // This is the format we persist and feed back to UpdateAllocate for recovery. 
- matcher := &graph.FluxionGraph{MatchFormat: "rv1"} - matcher.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") + fluxion := &graph.FluxionGraph{MatchFormat: "rv1"} + fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") f := &Fluence{ handle: h, - matcher: matcher, + matcher: fluxion, placement: map[string]groupAlloc{}, } f.registerCancelHandlers() @@ -299,50 +307,54 @@ func (f *Fluence) patchPodAnnotation(ctx context.Context, ns, name, key, val str } // registerCancelHandlers watches PodGroup and Pod deletions and frees the -// corresponding Fluxion allocation. Grouped pods are ignored by the pod handler -// (their allocation lives on the PodGroup); ungrouped pods are handled there. -// The framework has no deletion extension point, so this is informer-driven. +// corresponding Fluxion allocation. The framework has no deletion extension +// point, so this is informer-driven. func (f *Fluence) registerCancelHandlers() { sif := f.handle.SharedInformerFactory() - _, _ = sif.Scheduling().V1alpha2().PodGroups().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - DeleteFunc: func(obj interface{}) { - pg, ok := obj.(*schedv1a2.PodGroup) - if !ok { - tomb, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - return - } - if pg, ok = tomb.Obj.(*schedv1a2.PodGroup); !ok { - return - } - } - f.cancelGroup(pg.Namespace+"/"+pg.Name, pg.Annotations) - }, + DeleteFunc: f.onPodGroupDeleted, }) - _, _ = sif.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - DeleteFunc: func(obj interface{}) { - pod, ok := obj.(*corev1.Pod) - if !ok { - tomb, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - return - } - if pod, ok = tomb.Obj.(*corev1.Pod); !ok { - return - } - } - // Grouped pods' allocation is owned by the PodGroup; only the - // PodGroup's deletion frees it. Act on ungrouped pods only. 
- if placement.PodGroupName(pod) != "" { - return - } - f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations) - }, + DeleteFunc: f.onPodDeleted, }) } +// onPodGroupDeleted frees the gang's allocation when its PodGroup is deleted. +func (f *Fluence) onPodGroupDeleted(obj interface{}) { + pg, ok := obj.(*schedv1a2.PodGroup) + if !ok { + tomb, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + if pg, ok = tomb.Obj.(*schedv1a2.PodGroup); !ok { + return + } + } + f.cancelGroup(pg.Namespace+"/"+pg.Name, pg.Annotations) +} + +// onPodDeleted frees an ungrouped pod's allocation when the pod is deleted. +// Grouped pods are ignored: their allocation is owned by the PodGroup and is +// freed only when the PodGroup is deleted (freeing it on a single pod's delete +// would release the whole gang's resources while its other pods still run). +func (f *Fluence) onPodDeleted(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + tomb, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + if pod, ok = tomb.Obj.(*corev1.Pod); !ok { + return + } + } + if placement.PodGroupName(pod) != "" { + return + } + f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations) +} + // cancelGroup frees the allocation for a deleted owning object. The jobid comes // from the object's annotation (the durable source of truth); if it is missing // (e.g. 
deleted between PreFilter and PreBind, before the annotation was diff --git a/pkg/fluence/fluence_test.go b/pkg/fluence/fluence_test.go new file mode 100644 index 0000000..87e5a3e --- /dev/null +++ b/pkg/fluence/fluence_test.go @@ -0,0 +1,246 @@ +package fluence + +import ( + "errors" + "testing" + + "github.com/converged-computing/fluence/pkg/graph" + "github.com/converged-computing/fluence/pkg/placement" + + corev1 "k8s.io/api/core/v1" + schedv1a2 "k8s.io/api/scheduling/v1alpha2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +// fakeMatcher records Cancel calls so cancel behavior can be asserted without +// the real cgo/flux matcher. It satisfies the package-internal matcher interface. +type fakeMatcher struct { + cancelled []uint64 + cancelErr error +} + +func (m *fakeMatcher) MatchAllocateSpec(string) (graph.MatchAllocateRequest, error) { + return graph.MatchAllocateRequest{}, nil +} + +func (m *fakeMatcher) Cancel(jobid uint64) error { + m.cancelled = append(m.cancelled, jobid) + return m.cancelErr +} + +func newTestFluence(m matcher) *Fluence { + return &Fluence{matcher: m, placement: map[string]groupAlloc{}} +} + +func ann(jobid string) map[string]string { + return map[string]string{placement.JobIDAnnotation: jobid} +} + +// groupedPod returns a pod that belongs to the named native PodGroup. +// NOTE: corev1.PodSchedulingGroup is the k8s 1.36 native gang field; if the +// type name differs in your vendored k8s, adjust just this constructor. 
+func groupedPod(ns, name, group string, annotations map[string]string) *corev1.Pod { + g := group + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name, Annotations: annotations}, + Spec: corev1.PodSpec{ + SchedulingGroup: &corev1.PodSchedulingGroup{PodGroupName: &g}, + }, + } +} + +func ungroupedPod(ns, name string, annotations map[string]string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name, Annotations: annotations}, + } +} + +func TestParseJobID(t *testing.T) { + cases := []struct { + name string + ann map[string]string + want uint64 + wantOK bool + }{ + {"present", ann("42"), 42, true}, + {"absent", map[string]string{}, 0, false}, + {"nil map", nil, 0, false}, + {"empty value", ann(""), 0, false}, + {"garbage", ann("not-a-number"), 0, false}, + {"zero", ann("0"), 0, true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got, ok := parseJobID(c.ann) + if got != c.want || ok != c.wantOK { + t.Fatalf("parseJobID(%v) = (%d,%t), want (%d,%t)", c.ann, got, ok, c.want, c.wantOK) + } + }) + } +} + +func TestGroupKey(t *testing.T) { + if got := groupKey(ungroupedPod("default", "solo", nil)); got != "default/solo" { + t.Fatalf("ungrouped groupKey = %q, want default/solo", got) + } + if got := groupKey(groupedPod("default", "training-abc", "training", nil)); got != "default/training" { + t.Fatalf("grouped groupKey = %q, want default/training", got) + } +} + +// The jobid annotation is the durable source of truth and takes precedence over +// the in-memory memo, even when both are present. 
+func TestCancelGroupPrefersAnnotation(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + f.placement["default/training"] = groupAlloc{jobid: 42} + + f.cancelGroup("default/training", ann("99")) + + if len(m.cancelled) != 1 || m.cancelled[0] != 99 { + t.Fatalf("cancelled = %v, want [99] (annotation wins over memo)", m.cancelled) + } + if _, still := f.placement["default/training"]; still { + t.Fatal("placement entry should be deleted after cancel") + } +} + +// With no annotation (e.g. deleted before PreBind wrote it), cancel falls back +// to the in-memory memo jobid. +func TestCancelGroupMemoFallback(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + f.placement["default/solo"] = groupAlloc{jobid: 7} + + f.cancelGroup("default/solo", nil) + + if len(m.cancelled) != 1 || m.cancelled[0] != 7 { + t.Fatalf("cancelled = %v, want [7] (memo fallback)", m.cancelled) + } +} + +// A key we never scheduled (no annotation, no memo) is a no-op, not an error. +func TestCancelGroupUnknownNoop(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + f.cancelGroup("default/ghost", nil) + + if len(m.cancelled) != 0 { + t.Fatalf("cancelled = %v, want none for an unknown group", m.cancelled) + } +} + +// Cancel is idempotent: a redelivered delete event for an already-freed group +// does nothing the second time. +func TestCancelGroupIdempotent(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + f.placement["default/solo"] = groupAlloc{jobid: 7} + + f.cancelGroup("default/solo", nil) // frees, deletes memo + f.cancelGroup("default/solo", nil) // memo gone, no annotation -> no-op + + if len(m.cancelled) != 1 { + t.Fatalf("cancelled = %v, want exactly one cancel (idempotent)", m.cancelled) + } +} + +// A matcher Cancel error is logged but must not block cleanup of the memo. 
+func TestCancelGroupMatcherErrorStillDeletes(t *testing.T) { + m := &fakeMatcher{cancelErr: errors.New("flux boom")} + f := newTestFluence(m) + f.placement["default/solo"] = groupAlloc{jobid: 7} + + f.cancelGroup("default/solo", ann("7")) + + if len(m.cancelled) != 1 || m.cancelled[0] != 7 { + t.Fatalf("cancelled = %v, want [7] even on error", m.cancelled) + } + if _, still := f.placement["default/solo"]; still { + t.Fatal("placement entry should be deleted even when matcher.Cancel errors") + } +} + +// Deleting an ungrouped pod frees its own allocation. +func TestOnPodDeletedUngroupedCancels(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + f.onPodDeleted(ungroupedPod("default", "solo", ann("5"))) + + if len(m.cancelled) != 1 || m.cancelled[0] != 5 { + t.Fatalf("cancelled = %v, want [5] for ungrouped pod delete", m.cancelled) + } +} + +// Deleting a grouped pod must NOT cancel: the allocation belongs to the +// PodGroup and is freed only when the PodGroup is deleted. Cancelling here would +// free the gang's allocation while other pods still hold it. +func TestOnPodDeletedGroupedIgnored(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + f.onPodDeleted(groupedPod("default", "training-abc", "training", ann("5"))) + + if len(m.cancelled) != 0 { + t.Fatalf("cancelled = %v, want none: a grouped pod delete must not free the gang", m.cancelled) + } +} + +// A pod tombstone (DeletedFinalStateUnknown) is unwrapped and handled. +func TestOnPodDeletedTombstone(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + tomb := cache.DeletedFinalStateUnknown{Key: "default/solo", Obj: ungroupedPod("default", "solo", ann("8"))} + f.onPodDeleted(tomb) + + if len(m.cancelled) != 1 || m.cancelled[0] != 8 { + t.Fatalf("cancelled = %v, want [8] from tombstone", m.cancelled) + } +} + +// A garbage delete object is ignored without panic or cancel. 
+func TestOnPodDeletedGarbage(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + f.onPodDeleted("not a pod") + f.onPodDeleted(cache.DeletedFinalStateUnknown{Obj: "still not a pod"}) + + if len(m.cancelled) != 0 { + t.Fatalf("cancelled = %v, want none for garbage objects", m.cancelled) + } +} + +// Deleting a PodGroup frees the gang's allocation using the PodGroup annotation. +func TestOnPodGroupDeletedCancels(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + pg := &schedv1a2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "training", Annotations: ann("3")}, + } + f.onPodGroupDeleted(pg) + + if len(m.cancelled) != 1 || m.cancelled[0] != 3 { + t.Fatalf("cancelled = %v, want [3] for PodGroup delete", m.cancelled) + } +} + +// A PodGroup tombstone is unwrapped and handled. +func TestOnPodGroupDeletedTombstone(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + pg := &schedv1a2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "training", Annotations: ann("4")}, + } + f.onPodGroupDeleted(cache.DeletedFinalStateUnknown{Key: "default/training", Obj: pg}) + + if len(m.cancelled) != 1 || m.cancelled[0] != 4 { + t.Fatalf("cancelled = %v, want [4] from PodGroup tombstone", m.cancelled) + } +} diff --git a/pkg/quantum/allocation.go b/pkg/graph/allocation.go similarity index 99% rename from pkg/quantum/allocation.go rename to pkg/graph/allocation.go index bb9bfdd..b3ff161 100644 --- a/pkg/quantum/allocation.go +++ b/pkg/graph/allocation.go @@ -1,4 +1,4 @@ -package quantum +package graph import ( "encoding/json" diff --git a/pkg/quantum/allocation_test.go b/pkg/graph/allocation_test.go similarity index 98% rename from pkg/quantum/allocation_test.go rename to pkg/graph/allocation_test.go index f71abb0..dc67118 100644 --- a/pkg/quantum/allocation_test.go +++ b/pkg/graph/allocation_test.go @@ -1,4 +1,4 @@ -package quantum +package graph import "testing" diff --git a/pkg/graph/graph.go 
b/pkg/graph/graph.go index 5b7ab10..a503c7a 100644 --- a/pkg/graph/graph.go +++ b/pkg/graph/graph.go @@ -1,10 +1,11 @@ +//go:build cgo + package graph import ( "errors" "os" - "github.com/converged-computing/fluence/pkg/quantum" "github.com/flux-framework/flux-sched/resource/reapi/bindings/go/src/fluxcli" "fmt" @@ -28,7 +29,7 @@ type FluxionGraph struct { // MatchFormat selects the Fluxion allocation output format ("simple", // "jgf", "rv1", ...). Empty defaults to "simple" (human-readable tree). // Set it to "jgf" when you need to parse the allocation programmatically - // (e.g. quantum.BackendFromAllocation). + // (e.g. BackendFromAllocation). MatchFormat string } @@ -79,10 +80,10 @@ func (f *FluxionGraph) Init(confFile string, matchPolicy string, label string) { } // MatchAllocate reads a jobspec file (YAML or JSON) and match-allocates it. -func (f *FluxionGraph) MatchAllocate(specFile string) (quantum.MatchAllocateRequest, error) { +func (f *FluxionGraph) MatchAllocate(specFile string) (MatchAllocateRequest, error) { spec, err := os.ReadFile(specFile) if err != nil { - return quantum.MatchAllocateRequest{}, errors.New("Error reading jobspec") + return MatchAllocateRequest{}, errors.New("Error reading jobspec") } fmt.Printf(" 🌀 Request (file): %s\n", specFile) return f.MatchAllocateSpec(string(spec)) @@ -91,8 +92,8 @@ func (f *FluxionGraph) MatchAllocate(specFile string) (quantum.MatchAllocateRequ // MatchAllocateSpec match-allocates against a jobspec provided as a string. // Fluxion accepts YAML or JSON; use the jobspec package to convert/normalize if // needed before calling this. 
-func (f *FluxionGraph) MatchAllocateSpec(spec string) (quantum.MatchAllocateRequest, error) { - request := quantum.MatchAllocateRequest{} +func (f *FluxionGraph) MatchAllocateSpec(spec string) (MatchAllocateRequest, error) { + request := MatchAllocateRequest{} reserved, allocated, time_at, overhead, jobid, err := f.cli.MatchAllocate(false, spec) if err != nil { diff --git a/pkg/graph/graph_cancel_test.go b/pkg/graph/graph_cancel_test.go new file mode 100644 index 0000000..1016d86 --- /dev/null +++ b/pkg/graph/graph_cancel_test.go @@ -0,0 +1,110 @@ +//go:build cgo + +// Package graph cancel test. This exercises the REAL Fluxion matcher (cgo + +// flux-sched), so it only builds/runs where flux-sched is linkable — i.e. in CI +// inside fluence-base, or the devcontainer. It verifies cancel's actual effect: +// a cancelled jobid frees its allocation so the resource can be matched again. +package graph + +import ( + "os" + "strings" + "testing" +) + +// A tiny graph with a single exclusive core, so the first match consumes it and +// a second match must fail — until we cancel the first. 
+const tinyGraph = `{ + "graph": { + "nodes": [ + {"id": "0", "metadata": {"type": "cluster", "basename": "cluster", "name": "cluster0", "id": 0, "uniq_id": 0, "rank": -1, "size": 1, "exclusive": false, "unit": "", "paths": {"containment": "/cluster0"}}}, + {"id": "1", "metadata": {"type": "node", "basename": "node", "name": "node0", "id": 0, "uniq_id": 1, "rank": 0, "size": 1, "exclusive": false, "unit": "", "paths": {"containment": "/cluster0/node0"}}}, + {"id": "2", "metadata": {"type": "core", "basename": "core", "name": "core0", "id": 0, "uniq_id": 2, "rank": 0, "size": 1, "exclusive": false, "unit": "", "paths": {"containment": "/cluster0/node0/core0"}}} + ], + "edges": [ + {"source": "0", "target": "1", "metadata": {"subsystem": "containment"}}, + {"source": "1", "target": "2", "metadata": {"subsystem": "containment"}} + ] + } +}` + +const coreJobspec = `version: 9999 +resources: + - type: slot + count: 1 + label: default + with: + - type: core + count: 1 +tasks: + - command: ["app"] + slot: default + count: + per_slot: 1 +attributes: + system: + duration: 3600 +` + +// newGraph stages the tiny graph to a temp file and initializes a real matcher. +func newGraph(t *testing.T) *FluxionGraph { + t.Helper() + tmp, err := os.CreateTemp(t.TempDir(), "graph-*.json") + if err != nil { + t.Fatalf("temp file: %v", err) + } + if _, err := tmp.WriteString(tinyGraph); err != nil { + t.Fatalf("write graph: %v", err) + } + _ = tmp.Close() + + g := &FluxionGraph{MatchFormat: "rv1"} + g.Init(tmp.Name(), "first", "") + return g +} + +// TestCancelFreesAllocation is the real "cancel of the jobid" check: match the +// only core (consuming it), confirm a second match is refused, cancel the first +// jobid, then confirm the core can be matched again. This proves Cancel actually +// releases the allocation in the Fluxion graph, not just that it returns nil. +func TestCancelFreesAllocation(t *testing.T) { + g := newGraph(t) + + // 1. First match consumes the single core. 
+ req1, err := g.MatchAllocateSpec(coreJobspec) + if err != nil { + t.Fatalf("first match failed: %v", err) + } + if req1.Number == 0 { + t.Fatalf("expected a nonzero jobid from first match") + } + + // 2. Second match must be refused — the core is taken. + if _, err := g.MatchAllocateSpec(coreJobspec); err == nil { + t.Fatal("second match should fail while the core is allocated, but it succeeded") + } + + // 3. Cancel the first jobid. + if err := g.Cancel(req1.Number); err != nil { + t.Fatalf("cancel jobid %d failed: %v", req1.Number, err) + } + + // 4. The core is free again: a fresh match must now succeed. + req2, err := g.MatchAllocateSpec(coreJobspec) + if err != nil { + t.Fatalf("match after cancel failed (cancel did not free the allocation): %v", err) + } + if !strings.Contains(req2.Allocation, "core0") { + t.Fatalf("post-cancel allocation does not contain the freed core: %s", req2.Allocation) + } +} + +// TestCancelUnknownJobIDIsHarmless confirms cancelling a jobid that was never +// allocated does not error (the binding is called with noent_ok), so a +// redelivered/duplicate delete event can't wedge the scheduler. 
+func TestCancelUnknownJobIDIsHarmless(t *testing.T) { + g := newGraph(t) + if err := g.Cancel(999999); err != nil { + t.Fatalf("cancel of unknown jobid should be a no-op, got: %v", err) + } +} diff --git a/pkg/quantum/quantum.go b/pkg/graph/request.go similarity index 96% rename from pkg/quantum/quantum.go rename to pkg/graph/request.go index 6ee2b80..d00e8a5 100644 --- a/pkg/quantum/quantum.go +++ b/pkg/graph/request.go @@ -1,4 +1,4 @@ -package quantum +package graph import "fmt" diff --git a/pkg/quantum/run_test.go b/pkg/graph/run_test.go similarity index 98% rename from pkg/quantum/run_test.go rename to pkg/graph/run_test.go index d0c7dff..7f10cb5 100644 --- a/pkg/quantum/run_test.go +++ b/pkg/graph/run_test.go @@ -1,4 +1,4 @@ -package quantum +package graph import ( "testing" diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 0213dc4..397b0a9 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -5,8 +5,8 @@ import ( "sort" "strings" + "github.com/converged-computing/fluence/pkg/graph" "github.com/converged-computing/fluence/pkg/jobspec" - "github.com/converged-computing/fluence/pkg/quantum" corev1 "k8s.io/api/core/v1" ) @@ -126,11 +126,11 @@ type Placement struct { // PlacementFromAllocation parses a JGF allocation into node and backend names. 
func PlacementFromAllocation(alloc string) (Placement, error) { - nodes, err := quantum.NamesFromAllocation(alloc, "node") + nodes, err := graph.NamesFromAllocation(alloc, "node") if err != nil { return Placement{}, err } - backends, _ := quantum.NamesFromAllocation(alloc, "qpu") + backends, _ := graph.NamesFromAllocation(alloc, "qpu") p := Placement{Nodes: nodes} if len(backends) > 0 { p.Backend = backends[0] diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index 2def510..d5e8571 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -62,8 +62,8 @@ func TestGenericQuantumCount(t *testing.T) { t.Fatalf("qpu = %d (ok=%v), want 1", c, ok) } - // no classical core forced on an exotic-only request - if _, ok := withType(js, "core"); ok { - t.Error("quantum-only pod should not be forced to request a core") + // a classical core is currently still required, even on an exotic-only request + if _, ok := withType(js, "core"); !ok { + t.Error("quantum-only pod is currently required to request a core") } } diff --git a/pkg/quantum/testdata/sampler_input.json b/pkg/quantum/testdata/sampler_input.json deleted file mode 100644 index 9fccbb1..0000000 --- a/pkg/quantum/testdata/sampler_input.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "pubs": [ - ["OPENQASM 3.0;\ninclude \"stdgates.inc\";\nbit[1] c;\nsx $0;\nc[0] = measure $0;\n"] - ], - "version": 2, - "shots": 256, - "support_qiskit": false -} \ No newline at end of file