Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: unit-tests
on:
pull_request:
branches: [main]
push:
branches: [main]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
test:
runs-on: ubuntu-latest
# fluence-base carries flux-sched 0.52.0 (installed to /usr) + Go, so the
# cgo matcher (pkg/graph) links and `make test` can build every package.
container:
image: ghcr.io/converged-computing/fluence-base:latest
env:
FLUX_SCHED_ROOT: /opt/flux-sched
LD_LIBRARY_PATH: /usr/lib:/opt/flux-sched/resource:/opt/flux-sched/resource/reapi/bindings:/opt/flux-sched/resource/libjobspec
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Versions
run: |
export PATH=$PATH:/usr/local/go/bin
go version
ls /opt/flux-sched >/dev/null && echo "flux-sched present"

- name: Go test
run: |
export PATH=$PATH:/usr/local/go/bin
make test
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ COPY . .
RUN CGO_ENABLED=1 \
CGO_CFLAGS="-I/opt/flux-sched" \
CGO_LDFLAGS="-L/opt/flux-sched/resource -L/opt/flux-sched/resource/libjobspec -L/opt/flux-sched/resource/reapi/bindings -lresource -ljobspec_conv -lreapi_cli -lflux-idset -lstdc++ -lczmq -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" \
go build -ldflags '-w' -o /bin/fluence ./cmd/fluence && \
CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-deviceplugin ./cmd/deviceplugin && \
CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-webhook ./cmd/webhook
go build -ldflags '-w' -o /bin/fluence ./cmd/fluence
RUN CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-deviceplugin ./cmd/deviceplugin
RUN CGO_ENABLED=0 go build -ldflags '-w' -o /bin/fluence-webhook ./cmd/webhook

FROM fluxrm/flux-core:noble AS runtime

Expand Down
14 changes: 5 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

FLUX_SCHED_ROOT ?= /opt/flux-sched
IMG ?= ghcr.io/converged-computing/fluence:latest
TEST_IMG ?= ghcr.io/converged-computing/fluence:test

# cgo flags for the scheduler binary: flux-sched only.
CGO_CFLAGS = -I$(FLUX_SCHED_ROOT)
Expand All @@ -25,14 +26,9 @@ build: ## Build all binaries (scheduler needs flux-sched; helpers are pure Go)
CGO_ENABLED=0 go build -o bin/fluence-webhook ./cmd/webhook

.PHONY: test
test: ## Pure-Go unit tests (no flux, no k8s scheduler libs, no cluster)
go test ./pkg/jgf/... ./pkg/cluster/... ./pkg/jobspec/... ./pkg/placement/... \
./pkg/quantum/... ./pkg/webhook/... ./pkg/deviceplugin/...

.PHONY: test-graph
test-graph: ## Matcher tests (needs flux-sched)
test:
CGO_ENABLED=1 CGO_CFLAGS="$(CGO_CFLAGS)" CGO_LDFLAGS="$(CGO_LDFLAGS)" \
go test ./pkg/graph/...
go test ./...

.PHONY: test-restore
test-restore:
Expand All @@ -45,8 +41,8 @@ image: ## Build the scheduler container image

.PHONY: test-image
test-image: ## Build the scheduler container image
docker build -t $(IMG)-test .
docker push $(IMG)-test
docker build -t $(TEST_IMG)-test .
docker push $(TEST_IMG)

.PHONY: test-image-deploy
test-image-deploy: test-image
Expand Down
92 changes: 52 additions & 40 deletions pkg/fluence/fluence.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ import (
// KubeSchedulerConfiguration.
const Name = "Fluence"

// matcher is the subset of *graph.FluxionGraph the plugin depends on. Declaring
// it as an interface lets tests inject a fake (the real matcher is cgo/flux and
// cannot run in a unit test). *graph.FluxionGraph satisfies this.
type matcher interface {
MatchAllocateSpec(spec string) (graph.MatchAllocateRequest, error)
Cancel(jobid uint64) error
}

// groupAlloc is the in-memory record of a group's Fluxion allocation. It is a
// rebuildable, within-lifetime memo: its job is race-free "match once per group"
// dedup on the scheduling path (the durable record is the jobid annotation on
Expand All @@ -47,7 +55,7 @@ type groupAlloc struct {
// delegated to the native PodGroup API; Fluence only decides placement.
type Fluence struct {
handle fwk.Handle
matcher *graph.FluxionGraph
matcher matcher

// matcherMu serializes all access to the cgo Fluxion client, which is not
// thread-safe. Match runs on the (sequential) scheduling path; Cancel runs in
Expand Down Expand Up @@ -122,12 +130,12 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error
// scheduling key is the same JGF vertex subgraph we parse for placement, and
// it carries the execution view flux uses to replay an allocation on restart.
// This is the format we persist and feed back to UpdateAllocate for recovery.
matcher := &graph.FluxionGraph{MatchFormat: "rv1"}
matcher.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "")
fluxion := &graph.FluxionGraph{MatchFormat: "rv1"}
fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "")

f := &Fluence{
handle: h,
matcher: matcher,
matcher: fluxion,
placement: map[string]groupAlloc{},
}
f.registerCancelHandlers()
Expand Down Expand Up @@ -299,50 +307,54 @@ func (f *Fluence) patchPodAnnotation(ctx context.Context, ns, name, key, val str
}

// registerCancelHandlers watches PodGroup and Pod deletions and frees the
// corresponding Fluxion allocation. Grouped pods are ignored by the pod handler
// (their allocation lives on the PodGroup); ungrouped pods are handled there.
// The framework has no deletion extension point, so this is informer-driven.
// corresponding Fluxion allocation. The framework has no deletion extension
// point, so this is informer-driven.
func (f *Fluence) registerCancelHandlers() {
sif := f.handle.SharedInformerFactory()

_, _ = sif.Scheduling().V1alpha2().PodGroups().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
pg, ok := obj.(*schedv1a2.PodGroup)
if !ok {
tomb, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
if pg, ok = tomb.Obj.(*schedv1a2.PodGroup); !ok {
return
}
}
f.cancelGroup(pg.Namespace+"/"+pg.Name, pg.Annotations)
},
DeleteFunc: f.onPodGroupDeleted,
})

_, _ = sif.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tomb, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
if pod, ok = tomb.Obj.(*corev1.Pod); !ok {
return
}
}
// Grouped pods' allocation is owned by the PodGroup; only the
// PodGroup's deletion frees it. Act on ungrouped pods only.
if placement.PodGroupName(pod) != "" {
return
}
f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations)
},
DeleteFunc: f.onPodDeleted,
})
}

// onPodGroupDeleted frees the gang's allocation when its PodGroup is deleted.
func (f *Fluence) onPodGroupDeleted(obj interface{}) {
pg, ok := obj.(*schedv1a2.PodGroup)
if !ok {
tomb, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
if pg, ok = tomb.Obj.(*schedv1a2.PodGroup); !ok {
return
}
}
f.cancelGroup(pg.Namespace+"/"+pg.Name, pg.Annotations)
}

// onPodDeleted frees an ungrouped pod's allocation when the pod is deleted.
// Grouped pods are ignored: their allocation is owned by the PodGroup and is
// freed only when the PodGroup is deleted (freeing it on a single pod's delete
// would release the whole gang's resources while its other pods still run).
func (f *Fluence) onPodDeleted(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tomb, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
if pod, ok = tomb.Obj.(*corev1.Pod); !ok {
return
}
}
if placement.PodGroupName(pod) != "" {
return
}
f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations)
}

// cancelGroup frees the allocation for a deleted owning object. The jobid comes
// from the object's annotation (the durable source of truth); if it is missing
// (e.g. deleted between PreFilter and PreBind, before the annotation was
Expand Down
Loading
Loading