From a0e5530d2af1977b7bb2e77238a83b544e3cac81 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Sun, 31 May 2026 21:51:18 -0400 Subject: [PATCH 1/2] Add Firecracker UFFD snapshot pager --- cmd/api/config/config.go | 31 +- cmd/api/config/config_test.go | 29 + cmd/api/main.go | 5 + lib/hypervisor/firecracker/config.go | 73 ++- lib/hypervisor/firecracker/config_test.go | 46 +- lib/hypervisor/firecracker/firecracker.go | 4 +- lib/hypervisor/firecracker/process.go | 51 +- lib/instances/delete.go | 1 + lib/instances/firecracker_uffd.go | 150 +++++ lib/instances/fork.go | 5 + lib/instances/guest_resume_network.go | 106 +++- lib/instances/hypervisor_darwin.go | 12 + lib/instances/hypervisor_linux.go | 17 + lib/instances/manager.go | 89 ++- lib/instances/query.go | 8 + lib/instances/restore.go | 22 +- lib/instances/restore_egress_test.go | 41 ++ lib/instances/snapshot.go | 5 + lib/instances/standby.go | 4 + lib/instances/stop.go | 2 + lib/instances/types.go | 5 + lib/paths/paths.go | 25 + lib/providers/providers.go | 10 +- lib/uffdpager/cache.go | 95 +++ lib/uffdpager/cache_test.go | 46 ++ lib/uffdpager/server_linux.go | 675 ++++++++++++++++++++++ lib/uffdpager/supervisor_linux.go | 270 +++++++++ lib/uffdpager/supervisor_unsupported.go | 42 ++ lib/uffdpager/types.go | 62 ++ 29 files changed, 1848 insertions(+), 83 deletions(-) create mode 100644 lib/instances/firecracker_uffd.go create mode 100644 lib/uffdpager/cache.go create mode 100644 lib/uffdpager/cache_test.go create mode 100644 lib/uffdpager/server_linux.go create mode 100644 lib/uffdpager/supervisor_linux.go create mode 100644 lib/uffdpager/supervisor_unsupported.go create mode 100644 lib/uffdpager/types.go diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index a9ae8311..e315c7bb 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -193,10 +193,12 @@ type CapacityConfig struct { // HypervisorConfig holds hypervisor settings. 
type HypervisorConfig struct { - Default string `koanf:"default"` - CloudHypervisorDefaultVersion string `koanf:"cloud_hypervisor_default_version"` - FirecrackerBinaryPath string `koanf:"firecracker_binary_path"` - Memory HypervisorMemoryConfig `koanf:"memory"` + Default string `koanf:"default"` + CloudHypervisorDefaultVersion string `koanf:"cloud_hypervisor_default_version"` + FirecrackerBinaryPath string `koanf:"firecracker_binary_path"` + FirecrackerSnapshotMemoryBackend string `koanf:"firecracker_snapshot_memory_backend"` + FirecrackerUFFDCacheMaxBytes string `koanf:"firecracker_uffd_cache_max_bytes"` + Memory HypervisorMemoryConfig `koanf:"memory"` } // HypervisorMemoryConfig holds guest memory management settings. @@ -404,9 +406,11 @@ func defaultConfig() *Config { }, Hypervisor: HypervisorConfig{ - Default: "cloud-hypervisor", - CloudHypervisorDefaultVersion: "", - FirecrackerBinaryPath: "", + Default: "cloud-hypervisor", + CloudHypervisorDefaultVersion: "", + FirecrackerBinaryPath: "", + FirecrackerSnapshotMemoryBackend: "file", + FirecrackerUFFDCacheMaxBytes: "4294967296", Memory: HypervisorMemoryConfig{ Enabled: false, KernelPageInitMode: "hardened", @@ -618,6 +622,19 @@ func (c *Config) Validate() error { if c.Hypervisor.Memory.KernelPageInitMode != "performance" && c.Hypervisor.Memory.KernelPageInitMode != "hardened" { return fmt.Errorf("hypervisor.memory.kernel_page_init_mode must be one of {performance,hardened}, got %q", c.Hypervisor.Memory.KernelPageInitMode) } + backend := strings.ToLower(strings.TrimSpace(c.Hypervisor.FirecrackerSnapshotMemoryBackend)) + if backend == "" { + backend = "file" + } + switch backend { + case "file", "uffd": + c.Hypervisor.FirecrackerSnapshotMemoryBackend = backend + default: + return fmt.Errorf("hypervisor.firecracker_snapshot_memory_backend must be one of {file,uffd}, got %q", c.Hypervisor.FirecrackerSnapshotMemoryBackend) + } + if err := validateByteSize("hypervisor.firecracker_uffd_cache_max_bytes", 
c.Hypervisor.FirecrackerUFFDCacheMaxBytes); err != nil { + return err + } if err := validateDuration("hypervisor.memory.active_ballooning.poll_interval", c.Hypervisor.Memory.ActiveBallooning.PollInterval); err != nil { return err } diff --git a/cmd/api/config/config_test.go b/cmd/api/config/config_test.go index e7183504..f627c628 100644 --- a/cmd/api/config/config_test.go +++ b/cmd/api/config/config_test.go @@ -55,6 +55,35 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) { if cfg.Instances.LifecycleEventBufferSize != 256 { t.Fatalf("expected default instances.lifecycle_event_buffer_size to be 256, got %d", cfg.Instances.LifecycleEventBufferSize) } + if cfg.Hypervisor.FirecrackerSnapshotMemoryBackend != "file" { + t.Fatalf("expected default firecracker snapshot backend to be file, got %q", cfg.Hypervisor.FirecrackerSnapshotMemoryBackend) + } + if cfg.Hypervisor.FirecrackerUFFDCacheMaxBytes != "4294967296" { + t.Fatalf("expected default firecracker uffd cache size to be 4294967296, got %q", cfg.Hypervisor.FirecrackerUFFDCacheMaxBytes) + } +} + +func TestValidateFirecrackerSnapshotMemoryBackend(t *testing.T) { + cfg := defaultConfig() + cfg.Hypervisor.FirecrackerSnapshotMemoryBackend = "UFFD" + if err := cfg.Validate(); err != nil { + t.Fatalf("expected UFFD backend to validate, got %v", err) + } + if cfg.Hypervisor.FirecrackerSnapshotMemoryBackend != "uffd" { + t.Fatalf("expected backend to normalize to uffd, got %q", cfg.Hypervisor.FirecrackerSnapshotMemoryBackend) + } + + cfg = defaultConfig() + cfg.Hypervisor.FirecrackerSnapshotMemoryBackend = "bad" + if err := cfg.Validate(); err == nil { + t.Fatalf("expected invalid firecracker snapshot backend validation error") + } + + cfg = defaultConfig() + cfg.Hypervisor.FirecrackerUFFDCacheMaxBytes = "not-a-size" + if err := cfg.Validate(); err == nil { + t.Fatalf("expected invalid firecracker uffd cache size validation error") + } } func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) { diff --git 
a/cmd/api/main.go b/cmd/api/main.go index da2a2326..969a1f89 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -37,6 +37,7 @@ import ( "github.com/kernel/hypeman/lib/paths" "github.com/kernel/hypeman/lib/registry" "github.com/kernel/hypeman/lib/scopes" + "github.com/kernel/hypeman/lib/uffdpager" "github.com/kernel/hypeman/lib/vmm" nethttpmiddleware "github.com/oapi-codegen/nethttp-middleware" "github.com/riandyrn/otelchi" @@ -132,6 +133,10 @@ func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGC } func run() error { + if len(os.Args) > 1 && os.Args[1] == "--internal-uffd-pager" { + return uffdpager.Main(os.Args[2:]) + } + // Load config early for OTel initialization // Config path can be specified via CONFIG_PATH env var or defaults to platform-specific locations configPath := os.Getenv("CONFIG_PATH") diff --git a/lib/hypervisor/firecracker/config.go b/lib/hypervisor/firecracker/config.go index 5b6b48cc..07e7d4d8 100644 --- a/lib/hypervisor/firecracker/config.go +++ b/lib/hypervisor/firecracker/config.go @@ -5,8 +5,10 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/uffdpager" ) const ( @@ -74,11 +76,16 @@ type snapshotCreateParams struct { } type snapshotLoadParams struct { - MemFilePath string `json:"mem_file_path,omitempty"` - SnapshotPath string `json:"snapshot_path"` - EnableDiffSnapshots bool `json:"enable_diff_snapshots,omitempty"` - ResumeVM bool `json:"resume_vm,omitempty"` - NetworkOverrides []networkOverride `json:"network_overrides,omitempty"` + MemBackend *snapshotMemBackend `json:"mem_backend,omitempty"` + SnapshotPath string `json:"snapshot_path"` + EnableDiffSnapshots bool `json:"enable_diff_snapshots,omitempty"` + ResumeVM bool `json:"resume_vm,omitempty"` + NetworkOverrides []networkOverride `json:"network_overrides,omitempty"` +} + +type snapshotMemBackend struct { + BackendType string `json:"backend_type"` + BackendPath string 
`json:"backend_path"` } type networkOverride struct { @@ -101,9 +108,18 @@ type instanceInfo struct { } type restoreMetadata struct { - NetworkOverrides []networkOverride `json:"network_overrides,omitempty"` - SnapshotSourceDataDir string `json:"snapshot_source_data_dir,omitempty"` - RetainSnapshotSourceDataDirAlias bool `json:"retain_snapshot_source_data_dir_alias,omitempty"` + NetworkOverrides []networkOverride `json:"network_overrides,omitempty"` + SnapshotSourceDataDir string `json:"snapshot_source_data_dir,omitempty"` + RetainSnapshotSourceDataDirAlias bool `json:"retain_snapshot_source_data_dir_alias,omitempty"` + SnapshotMemoryBackend string `json:"snapshot_memory_backend,omitempty"` + UFFDCacheKey string `json:"uffd_cache_key,omitempty"` + UFFDOverlays []uffdpager.OverlayPage `json:"uffd_overlays,omitempty"` +} + +type SnapshotMemoryBackendConfig struct { + Backend string + CacheKey string + Overlays []uffdpager.OverlayPage } func toBootSource(cfg hypervisor.VMConfig) bootSource { @@ -213,9 +229,9 @@ func toSnapshotCreateParams(snapshotDir string) snapshotCreateParams { } } -func toSnapshotLoadParams(snapshotDir string, networkOverrides []networkOverride, resumeVM bool) snapshotLoadParams { +func toSnapshotLoadParams(snapshotDir string, networkOverrides []networkOverride, resumeVM bool, backend snapshotMemBackend) snapshotLoadParams { return snapshotLoadParams{ - MemFilePath: snapshotMemoryPath(snapshotDir), + MemBackend: &backend, SnapshotPath: snapshotStatePath(snapshotDir), EnableDiffSnapshots: true, ResumeVM: resumeVM, @@ -223,6 +239,13 @@ func toSnapshotLoadParams(snapshotDir string, networkOverrides []networkOverride } } +func fileSnapshotMemBackend(snapshotDir string) snapshotMemBackend { + return snapshotMemBackend{ + BackendType: "File", + BackendPath: snapshotMemoryPath(snapshotDir), + } +} + func snapshotStatePath(snapshotDir string) string { return filepath.Join(snapshotDir, snapshotStateFile) } @@ -233,7 +256,8 @@ func 
snapshotMemoryPath(snapshotDir string) string { func saveRestoreMetadata(instanceDir string, networkConfigs []networkInterface) error { meta := restoreMetadata{ - NetworkOverrides: make([]networkOverride, 0, len(networkConfigs)), + NetworkOverrides: make([]networkOverride, 0, len(networkConfigs)), + SnapshotMemoryBackend: uffdpager.BackendFile, } for _, netCfg := range networkConfigs { meta.NetworkOverrides = append(meta.NetworkOverrides, networkOverride{ @@ -245,6 +269,33 @@ func saveRestoreMetadata(instanceDir string, networkConfigs []networkInterface) return saveRestoreMetadataState(instanceDir, &meta) } +func ConfigureSnapshotMemoryBackend(instanceDir string, cfg SnapshotMemoryBackendConfig) error { + meta, err := loadRestoreMetadata(instanceDir) + if err != nil { + return err + } + backend := strings.ToLower(strings.TrimSpace(cfg.Backend)) + if backend == "" { + backend = uffdpager.BackendFile + } + switch backend { + case uffdpager.BackendFile: + meta.SnapshotMemoryBackend = uffdpager.BackendFile + meta.UFFDCacheKey = "" + meta.UFFDOverlays = nil + case uffdpager.BackendUFFD: + if strings.TrimSpace(cfg.CacheKey) == "" { + return fmt.Errorf("uffd cache key is required") + } + meta.SnapshotMemoryBackend = uffdpager.BackendUFFD + meta.UFFDCacheKey = strings.TrimSpace(cfg.CacheKey) + meta.UFFDOverlays = append([]uffdpager.OverlayPage(nil), cfg.Overlays...) 
+ default: + return fmt.Errorf("unsupported snapshot memory backend %q", cfg.Backend) + } + return saveRestoreMetadataState(instanceDir, meta) +} + func saveRestoreMetadataState(instanceDir string, meta *restoreMetadata) error { data, err := json.MarshalIndent(meta, "", " ") if err != nil { diff --git a/lib/hypervisor/firecracker/config_test.go b/lib/hypervisor/firecracker/config_test.go index 66adf83e..ede3abf8 100644 --- a/lib/hypervisor/firecracker/config_test.go +++ b/lib/hypervisor/firecracker/config_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/uffdpager" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -82,14 +83,55 @@ func TestSnapshotParamPaths(t *testing.T) { load := toSnapshotLoadParams("/tmp/snapshot-latest", []networkOverride{ {IfaceID: "eth0", HostDevName: "hype-abc123"}, - }, true) + }, true, fileSnapshotMemBackend("/tmp/snapshot-latest")) assert.Equal(t, "/tmp/snapshot-latest/state", load.SnapshotPath) - assert.Equal(t, "/tmp/snapshot-latest/memory", load.MemFilePath) + require.NotNil(t, load.MemBackend) + assert.Equal(t, "File", load.MemBackend.BackendType) + assert.Equal(t, "/tmp/snapshot-latest/memory", load.MemBackend.BackendPath) assert.True(t, load.EnableDiffSnapshots) assert.True(t, load.ResumeVM) require.Len(t, load.NetworkOverrides, 1) } +func TestSnapshotLoadParamsSupportsUFFDBackend(t *testing.T) { + load := toSnapshotLoadParams("/tmp/snapshot-latest", nil, true, snapshotMemBackend{ + BackendType: "Uffd", + BackendPath: "/tmp/pager.sock", + }) + require.NotNil(t, load.MemBackend) + assert.Equal(t, "Uffd", load.MemBackend.BackendType) + assert.Equal(t, "/tmp/pager.sock", load.MemBackend.BackendPath) +} + +func TestConfigureSnapshotMemoryBackendPersistsUFFDAndClearsForFile(t *testing.T) { + dir := t.TempDir() + require.NoError(t, saveRestoreMetadata(dir, []networkInterface{{IfaceID: "eth0", HostDevName: "tap0"}})) + + require.NoError(t, 
ConfigureSnapshotMemoryBackend(dir, SnapshotMemoryBackendConfig{ + Backend: uffdpager.BackendUFFD, + CacheKey: "cache-key", + Overlays: []uffdpager.OverlayPage{{ + GuestMemoryOffset: 4096, + Path: "/tmp/overlay.page", + }}, + })) + meta, err := loadRestoreMetadata(dir) + require.NoError(t, err) + assert.Equal(t, uffdpager.BackendUFFD, meta.SnapshotMemoryBackend) + assert.Equal(t, "cache-key", meta.UFFDCacheKey) + require.Len(t, meta.UFFDOverlays, 1) + assert.Equal(t, int64(4096), meta.UFFDOverlays[0].GuestMemoryOffset) + require.Len(t, meta.NetworkOverrides, 1) + + require.NoError(t, ConfigureSnapshotMemoryBackend(dir, SnapshotMemoryBackendConfig{Backend: uffdpager.BackendFile})) + meta, err = loadRestoreMetadata(dir) + require.NoError(t, err) + assert.Equal(t, uffdpager.BackendFile, meta.SnapshotMemoryBackend) + assert.Empty(t, meta.UFFDCacheKey) + assert.Empty(t, meta.UFFDOverlays) + require.Len(t, meta.NetworkOverrides, 1) +} + func TestToBalloonConfig(t *testing.T) { cfg := hypervisor.VMConfig{ GuestMemory: hypervisor.GuestMemoryConfig{ diff --git a/lib/hypervisor/firecracker/firecracker.go b/lib/hypervisor/firecracker/firecracker.go index ff5c44d6..09a2d5ab 100644 --- a/lib/hypervisor/firecracker/firecracker.go +++ b/lib/hypervisor/firecracker/firecracker.go @@ -228,8 +228,8 @@ func (f *Firecracker) instanceStart(ctx context.Context) error { return f.postAction(ctx, "InstanceStart") } -func (f *Firecracker) loadSnapshot(ctx context.Context, snapshotDir string, networkOverrides []networkOverride, resumeVM bool) error { - params := toSnapshotLoadParams(snapshotDir, networkOverrides, resumeVM) +func (f *Firecracker) loadSnapshot(ctx context.Context, snapshotDir string, networkOverrides []networkOverride, resumeVM bool, backend snapshotMemBackend) error { + params := toSnapshotLoadParams(snapshotDir, networkOverrides, resumeVM, backend) if _, err := f.do(ctx, http.MethodPut, "/snapshot/load", params, http.StatusNoContent); err != nil { return err } diff --git 
a/lib/hypervisor/firecracker/process.go b/lib/hypervisor/firecracker/process.go index 5044fa2b..64bddf69 100644 --- a/lib/hypervisor/firecracker/process.go +++ b/lib/hypervisor/firecracker/process.go @@ -14,6 +14,7 @@ import ( "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/paths" + "github.com/kernel/hypeman/lib/uffdpager" "gvisor.dev/gvisor/pkg/cleanup" ) @@ -33,11 +34,30 @@ func init() { }) } +type UFFDClient interface { + CreateSession(ctx context.Context, req uffdpager.CreateSessionRequest) (*uffdpager.CreateSessionResponse, error) + CloseSession(ctx context.Context, sessionID string) error +} + +type StarterOption func(*Starter) + // Starter implements hypervisor.VMStarter for Firecracker. -type Starter struct{} +type Starter struct { + uffd UFFDClient +} + +func NewStarter(opts ...StarterOption) *Starter { + s := &Starter{} + for _, opt := range opts { + opt(s) + } + return s +} -func NewStarter() *Starter { - return &Starter{} +func WithUFFDClient(client UFFDClient) StarterOption { + return func(s *Starter) { + s.uffd = client + } } var _ hypervisor.VMStarter = (*Starter)(nil) @@ -118,14 +138,37 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, return 0, nil, fmt.Errorf("load firecracker restore metadata: %w", err) } resumeOnLoad := shouldResumeOnSnapshotLoad() + backend := fileSnapshotMemBackend(snapshotPath) + createdUFFDSession := "" + if strings.EqualFold(strings.TrimSpace(meta.SnapshotMemoryBackend), uffdpager.BackendUFFD) { + if s.uffd == nil { + return 0, nil, fmt.Errorf("uffd snapshot restore requested but no uffd pager is configured") + } + sessionID := filepath.Base(filepath.Dir(socketPath)) + resp, err := s.uffd.CreateSession(ctx, uffdpager.CreateSessionRequest{ + SessionID: sessionID, + InstanceID: sessionID, + BackingMemoryPath: snapshotMemoryPath(snapshotPath), + CacheKey: meta.UFFDCacheKey, + Overlays: meta.UFFDOverlays, + }) + if err != nil { + return 0, nil, fmt.Errorf("create uffd 
pager session: %w", err) + } + createdUFFDSession = resp.SessionID + backend = snapshotMemBackend{BackendType: "Uffd", BackendPath: resp.UFFDSocketPath} + } err = func() error { snapshotSourceAliasMu.Lock() defer snapshotSourceAliasMu.Unlock() return withSnapshotSourceDirAlias(meta, filepath.Dir(socketPath), func() error { - return hv.loadSnapshot(ctx, snapshotPath, meta.NetworkOverrides, resumeOnLoad) + return hv.loadSnapshot(ctx, snapshotPath, meta.NetworkOverrides, resumeOnLoad, backend) }) }() if err != nil { + if createdUFFDSession != "" { + _ = s.uffd.CloseSession(context.Background(), createdUFFDSession) + } return 0, nil, fmt.Errorf("load firecracker snapshot: %w", err) } hv.restoredResumed = resumeOnLoad diff --git a/lib/instances/delete.go b/lib/instances/delete.go index 283e0b19..90236809 100644 --- a/lib/instances/delete.go +++ b/lib/instances/delete.go @@ -81,6 +81,7 @@ func (m *manager) deleteInstance( log.WarnContext(ctx, "failed to kill hypervisor, continuing with cleanup", "instance_id", id, "error", err) } } + m.closeFirecrackerUFFDSession(ctx, stored) // 6. 
Release network allocation if inst.NetworkEnabled { diff --git a/lib/instances/firecracker_uffd.go b/lib/instances/firecracker_uffd.go new file mode 100644 index 00000000..dc731f55 --- /dev/null +++ b/lib/instances/firecracker_uffd.go @@ -0,0 +1,150 @@ +package instances + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "os" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/hypervisor/firecracker" + "github.com/kernel/hypeman/lib/logger" + "github.com/kernel/hypeman/lib/uffdpager" +) + +func (m *manager) useFirecrackerUFFD(stored *StoredMetadata) bool { + return stored != nil && + stored.HypervisorType == hypervisor.TypeFirecracker && + m.firecrackerSnapshotMemoryBackend == uffdpager.BackendUFFD +} + +func (m *manager) configureFirecrackerSnapshotRestore(stored *StoredMetadata, snapshotDir string, overlays []uffdpager.OverlayPage) error { + if stored == nil || stored.HypervisorType != hypervisor.TypeFirecracker { + return nil + } + if !m.useFirecrackerUFFD(stored) { + stored.FirecrackerUFFDSessionID = "" + stored.FirecrackerUFFDPagerVersion = "" + return firecracker.ConfigureSnapshotMemoryBackend(stored.DataDir, firecracker.SnapshotMemoryBackendConfig{ + Backend: uffdpager.BackendFile, + }) + } + if m.firecrackerUFFDPager == nil { + return fmt.Errorf("firecracker uffd snapshot restore is enabled but the pager is not configured") + } + + cacheKey := strings.TrimSpace(stored.FirecrackerSnapshotCacheKey) + if cacheKey == "" { + var err error + cacheKey, err = firecrackerSnapshotCacheKey(stored, snapshotDir) + if err != nil { + return err + } + stored.FirecrackerSnapshotCacheKey = cacheKey + } + stored.FirecrackerUFFDSessionID = stored.Id + stored.FirecrackerUFFDPagerVersion = m.firecrackerUFFDPager.VersionKey() + return firecracker.ConfigureSnapshotMemoryBackend(stored.DataDir, firecracker.SnapshotMemoryBackendConfig{ + Backend: uffdpager.BackendUFFD, + CacheKey: 
cacheKey, + Overlays: overlays, + }) +} + +func (m *manager) refreshFirecrackerSnapshotCacheKey(stored *StoredMetadata, snapshotDir string) error { + if stored == nil || stored.HypervisorType != hypervisor.TypeFirecracker { + return nil + } + key, err := firecrackerSnapshotCacheKey(stored, snapshotDir) + if err != nil { + return err + } + stored.FirecrackerSnapshotCacheKey = key + return nil +} + +func (m *manager) closeFirecrackerUFFDSession(ctx context.Context, stored *StoredMetadata) { + if stored == nil || stored.HypervisorType != hypervisor.TypeFirecracker || stored.FirecrackerUFFDSessionID == "" { + return + } + log := logger.FromContext(ctx) + if m.firecrackerUFFDPager == nil { + log.WarnContext(ctx, "cannot close firecracker uffd session; pager is not configured", + "instance_id", stored.Id, + "session_id", stored.FirecrackerUFFDSessionID, + "pager_version", stored.FirecrackerUFFDPagerVersion) + stored.FirecrackerUFFDSessionID = "" + stored.FirecrackerUFFDPagerVersion = "" + return + } + version := stored.FirecrackerUFFDPagerVersion + if version == "" { + version = m.firecrackerUFFDPager.VersionKey() + } + closeCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + if err := m.firecrackerUFFDPager.CloseSessionVersion(closeCtx, version, stored.FirecrackerUFFDSessionID); err != nil { + log.WarnContext(ctx, "failed to close firecracker uffd session", + "instance_id", stored.Id, + "session_id", stored.FirecrackerUFFDSessionID, + "pager_version", version, + "error", err) + } + stored.FirecrackerUFFDSessionID = "" + stored.FirecrackerUFFDPagerVersion = "" +} + +func (m *manager) checkFirecrackerUFFDSessionHealth(ctx context.Context, stored *StoredMetadata) error { + if stored == nil || stored.HypervisorType != hypervisor.TypeFirecracker || stored.FirecrackerUFFDSessionID == "" { + return nil + } + if m.firecrackerUFFDPager == nil { + return fmt.Errorf("firecracker uffd session %s has no configured pager", stored.FirecrackerUFFDSessionID) + } + 
version := stored.FirecrackerUFFDPagerVersion + if version == "" { + version = m.firecrackerUFFDPager.VersionKey() + } + healthCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + if _, err := m.firecrackerUFFDPager.HealthVersion(healthCtx, version); err != nil { + return fmt.Errorf("firecracker uffd pager %s for session %s is unhealthy: %w", version, stored.FirecrackerUFFDSessionID, err) + } + return nil +} + +func firecrackerSnapshotCacheKey(stored *StoredMetadata, snapshotDir string) (string, error) { + memoryInfo, err := os.Stat(filepath.Join(snapshotDir, firecrackerSnapshotMemoryFile)) + if err != nil { + return "", fmt.Errorf("stat firecracker snapshot memory for uffd cache key: %w", err) + } + stateInfo, err := os.Stat(filepath.Join(snapshotDir, "state")) + if err != nil { + return "", fmt.Errorf("stat firecracker snapshot state for uffd cache key: %w", err) + } + sum := sha256.Sum256([]byte(fmt.Sprintf( + "%s:%s:%d:%d:%d:%d", + stored.Id, + fileStatFingerprint(memoryInfo), + memoryInfo.Size(), + memoryInfo.ModTime().UnixNano(), + stateInfo.Size(), + stateInfo.ModTime().UnixNano(), + ))) + return hex.EncodeToString(sum[:])[:24], nil +} + +func fileStatFingerprint(info os.FileInfo) string { + if info == nil { + return "" + } + if stat, ok := info.Sys().(*syscall.Stat_t); ok { + return fmt.Sprintf("%s:%d:%d:%d", info.Name(), info.Mode(), stat.Dev, stat.Ino) + } + return fmt.Sprintf("%s:%d", info.Name(), info.Mode()) +} diff --git a/lib/instances/fork.go b/lib/instances/fork.go index d74a75a9..308d94dc 100644 --- a/lib/instances/fork.go +++ b/lib/instances/fork.go @@ -285,6 +285,11 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin forkMeta.ExitCode = nil forkMeta.ExitMessage = "" forkMeta.RestartStatus = restartpolicy.Status{} + forkMeta.FirecrackerUFFDSessionID = "" + forkMeta.FirecrackerUFFDPagerVersion = "" + if source.State != StateStandby { + forkMeta.FirecrackerSnapshotCacheKey = "" + } // 
Forks are new instances; phase accounting must not inherit the source's // cumulative durations. The first transition into the fork's runtime // phase (Standby for snapshot forks, Stopped for stopped forks) will be diff --git a/lib/instances/guest_resume_network.go b/lib/instances/guest_resume_network.go index 46488560..50fb7261 100644 --- a/lib/instances/guest_resume_network.go +++ b/lib/instances/guest_resume_network.go @@ -5,7 +5,9 @@ import ( "context" "encoding/binary" "encoding/json" + "errors" "fmt" + "io" stdnet "net" "os" "path/filepath" @@ -15,6 +17,7 @@ import ( "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/logger" + "github.com/kernel/hypeman/lib/uffdpager" "go.opentelemetry.io/otel/attribute" "golang.org/x/sys/unix" ) @@ -199,36 +202,98 @@ func (m *manager) waitForGuestResumeNetworkUDPAck(ctx context.Context, waiter *g } func patchGuestResumeNetworkMailbox(snapshotDir, token string, payload *guestResumeNetworkPayload) error { + payloadBytes, idx, file, err := prepareGuestResumeNetworkMailbox(snapshotDir, token, payload, os.O_RDWR) + if err != nil { + return err + } + defer file.Close() + + if _, err := file.WriteAt(payloadBytes, idx+int64(guestResumeNetworkMailboxPayloadOffset)); err != nil { + return fmt.Errorf("write resume network mailbox payload: %w", err) + } + var u32 [4]byte + binary.LittleEndian.PutUint32(u32[:], uint32(len(payloadBytes))) + if _, err := file.WriteAt(u32[:], idx+int64(guestResumeNetworkMailboxLengthOffset)); err != nil { + return fmt.Errorf("write resume network mailbox payload length: %w", err) + } + binary.LittleEndian.PutUint32(u32[:], 1) + if _, err := file.WriteAt(u32[:], idx+int64(guestResumeNetworkMailboxSeqOffset)); err != nil { + return fmt.Errorf("write resume network mailbox sequence: %w", err) + } + return nil +} + +func buildGuestResumeNetworkMailboxOverlay(snapshotDir, token string, payload *guestResumeNetworkPayload) (uffdpager.OverlayPage, error) { + payloadBytes, idx, file, err := 
prepareGuestResumeNetworkMailbox(snapshotDir, token, payload, os.O_RDONLY) + if err != nil { + return uffdpager.OverlayPage{}, err + } + defer file.Close() + + const pageSize = 4096 + pageOffset := (idx / pageSize) * pageSize + pageRelative := idx - pageOffset + if pageRelative+int64(guestResumeNetworkMailboxPayloadOffset)+int64(len(payloadBytes)) > pageSize { + return uffdpager.OverlayPage{}, fmt.Errorf("resume network mailbox crosses a page boundary") + } + + page := make([]byte, pageSize) + n, err := file.ReadAt(page, pageOffset) + if err != nil && !errors.Is(err, io.EOF) { + return uffdpager.OverlayPage{}, fmt.Errorf("read resume network mailbox overlay source page: %w", err) + } + if n == 0 && err != nil { + return uffdpager.OverlayPage{}, fmt.Errorf("read resume network mailbox overlay source page: %w", err) + } + + base := int(pageRelative) + copy(page[base+guestResumeNetworkMailboxPayloadOffset:], payloadBytes) + binary.LittleEndian.PutUint32(page[base+guestResumeNetworkMailboxLengthOffset:], uint32(len(payloadBytes))) + binary.LittleEndian.PutUint32(page[base+guestResumeNetworkMailboxSeqOffset:], 1) + + overlayDir := filepath.Join(snapshotDir, "uffd-overlays") + if err := os.MkdirAll(overlayDir, 0755); err != nil { + return uffdpager.OverlayPage{}, fmt.Errorf("create uffd overlay directory: %w", err) + } + overlayPath := filepath.Join(overlayDir, fmt.Sprintf("resume-network-mailbox-%d.page", pageOffset)) + if err := os.WriteFile(overlayPath, page, 0644); err != nil { + return uffdpager.OverlayPage{}, fmt.Errorf("write resume network mailbox overlay page: %w", err) + } + return uffdpager.OverlayPage{GuestMemoryOffset: pageOffset, Path: overlayPath}, nil +} + +func prepareGuestResumeNetworkMailbox(snapshotDir, token string, payload *guestResumeNetworkPayload, flag int) ([]byte, int64, *os.File, error) { if token == "" { - return fmt.Errorf("resume network mailbox token is empty") + return nil, 0, nil, fmt.Errorf("resume network mailbox token is empty") } if 
len(token) > guestResumeNetworkMailboxSeqOffset-len(guestResumeNetworkMailboxMagic) { - return fmt.Errorf("resume network mailbox token is too long") + return nil, 0, nil, fmt.Errorf("resume network mailbox token is too long") } if payload == nil { - return fmt.Errorf("resume network mailbox payload is nil") + return nil, 0, nil, fmt.Errorf("resume network mailbox payload is nil") } payloadBytes, err := json.Marshal(payload) if err != nil { - return fmt.Errorf("marshal resume network mailbox payload: %w", err) + return nil, 0, nil, fmt.Errorf("marshal resume network mailbox payload: %w", err) } if len(payloadBytes) > 4096-guestResumeNetworkMailboxPayloadOffset { - return fmt.Errorf("resume network mailbox payload too large: %d bytes", len(payloadBytes)) + return nil, 0, nil, fmt.Errorf("resume network mailbox payload too large: %d bytes", len(payloadBytes)) } - file, err := os.OpenFile(filepath.Join(snapshotDir, firecrackerSnapshotMemoryFile), os.O_RDWR, 0) + file, err := os.OpenFile(filepath.Join(snapshotDir, firecrackerSnapshotMemoryFile), flag, 0) if err != nil { - return fmt.Errorf("open snapshot memory for resume network mailbox: %w", err) + return nil, 0, nil, fmt.Errorf("open snapshot memory for resume network mailbox: %w", err) } - defer file.Close() info, err := file.Stat() if err != nil { - return fmt.Errorf("stat snapshot memory for resume network mailbox: %w", err) + file.Close() + return nil, 0, nil, fmt.Errorf("stat snapshot memory for resume network mailbox: %w", err) } if info.Size() <= 0 { - return fmt.Errorf("resume network mailbox memory file is empty") + file.Close() + return nil, 0, nil, fmt.Errorf("resume network mailbox memory file is empty") } marker := make([]byte, 0, len(guestResumeNetworkMailboxMagic)+len(token)) @@ -237,25 +302,14 @@ func patchGuestResumeNetworkMailbox(snapshotDir, token string, payload *guestRes idx, err := findGuestResumeNetworkMailbox(file, info.Size(), marker, token) if err != nil { - return err + file.Close() + 
return nil, 0, nil, err } if idx+int64(guestResumeNetworkMailboxPayloadOffset)+int64(len(payloadBytes)) > info.Size() { - return fmt.Errorf("resume network mailbox marker is too close to end of memory file") + file.Close() + return nil, 0, nil, fmt.Errorf("resume network mailbox marker is too close to end of memory file") } - - if _, err := file.WriteAt(payloadBytes, idx+int64(guestResumeNetworkMailboxPayloadOffset)); err != nil { - return fmt.Errorf("write resume network mailbox payload: %w", err) - } - var u32 [4]byte - binary.LittleEndian.PutUint32(u32[:], uint32(len(payloadBytes))) - if _, err := file.WriteAt(u32[:], idx+int64(guestResumeNetworkMailboxLengthOffset)); err != nil { - return fmt.Errorf("write resume network mailbox payload length: %w", err) - } - binary.LittleEndian.PutUint32(u32[:], 1) - if _, err := file.WriteAt(u32[:], idx+int64(guestResumeNetworkMailboxSeqOffset)); err != nil { - return fmt.Errorf("write resume network mailbox sequence: %w", err) - } - return nil + return payloadBytes, idx, file, nil } func findGuestResumeNetworkMailbox(file *os.File, size int64, marker []byte, token string) (int64, error) { diff --git a/lib/instances/hypervisor_darwin.go b/lib/instances/hypervisor_darwin.go index 183a928e..1cd97abe 100644 --- a/lib/instances/hypervisor_darwin.go +++ b/lib/instances/hypervisor_darwin.go @@ -3,10 +3,15 @@ package instances import ( + "context" + "fmt" + "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/hypervisor/cloudhypervisor" "github.com/kernel/hypeman/lib/hypervisor/qemu" "github.com/kernel/hypeman/lib/hypervisor/vz" + "github.com/kernel/hypeman/lib/paths" + "github.com/kernel/hypeman/lib/uffdpager" ) func init() { @@ -14,3 +19,10 @@ func init() { platformStarters[hypervisor.TypeQEMU] = qemu.NewStarter() platformStarters[hypervisor.TypeVZ] = vz.NewStarter() } + +func configurePlatformStarters(_ context.Context, _ *paths.Paths, _ map[hypervisor.Type]hypervisor.VMStarter, cfg ManagerConfig) 
(*uffdpager.Supervisor, error) { + if cfg.FirecrackerSnapshotMemoryBackend == uffdpager.BackendUFFD { + return nil, fmt.Errorf("firecracker uffd snapshot restore is only supported on linux") + } + return nil, nil +} diff --git a/lib/instances/hypervisor_linux.go b/lib/instances/hypervisor_linux.go index 3e2269f0..149f1718 100644 --- a/lib/instances/hypervisor_linux.go +++ b/lib/instances/hypervisor_linux.go @@ -3,10 +3,15 @@ package instances import ( + "context" + "fmt" + "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/hypervisor/cloudhypervisor" "github.com/kernel/hypeman/lib/hypervisor/firecracker" "github.com/kernel/hypeman/lib/hypervisor/qemu" + "github.com/kernel/hypeman/lib/paths" + "github.com/kernel/hypeman/lib/uffdpager" ) func init() { @@ -14,3 +19,15 @@ func init() { platformStarters[hypervisor.TypeFirecracker] = firecracker.NewStarter() platformStarters[hypervisor.TypeQEMU] = qemu.NewStarter() } + +func configurePlatformStarters(ctx context.Context, p *paths.Paths, starters map[hypervisor.Type]hypervisor.VMStarter, cfg ManagerConfig) (*uffdpager.Supervisor, error) { + if cfg.FirecrackerSnapshotMemoryBackend != uffdpager.BackendUFFD { + return nil, nil + } + pager, err := uffdpager.NewSupervisor(ctx, p.DataDir(), cfg.FirecrackerUFFDCacheMaxBytes) + if err != nil { + return nil, fmt.Errorf("start firecracker uffd pager: %w", err) + } + starters[hypervisor.TypeFirecracker] = firecracker.NewStarter(firecracker.WithUFFDClient(pager)) + return pager, nil +} diff --git a/lib/instances/manager.go b/lib/instances/manager.go index c55ba93a..599dee75 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/kernel/hypeman/lib/paths" "github.com/kernel/hypeman/lib/resources" "github.com/kernel/hypeman/lib/system" + "github.com/kernel/hypeman/lib/uffdpager" "github.com/kernel/hypeman/lib/volumes" 
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -81,7 +83,9 @@ type ResourceLimits struct { // ManagerConfig holds non-resource manager behavior settings. type ManagerConfig struct { - LifecycleEventBufferSize int + LifecycleEventBufferSize int + FirecrackerSnapshotMemoryBackend string + FirecrackerUFFDCacheMaxBytes int64 } // Normalize applies defaults to manager config values. @@ -89,6 +93,13 @@ func (c ManagerConfig) Normalize() ManagerConfig { if c.LifecycleEventBufferSize <= 0 { c.LifecycleEventBufferSize = defaultLifecycleEventBufferSize } + c.FirecrackerSnapshotMemoryBackend = strings.ToLower(strings.TrimSpace(c.FirecrackerSnapshotMemoryBackend)) + if c.FirecrackerSnapshotMemoryBackend == "" { + c.FirecrackerSnapshotMemoryBackend = uffdpager.BackendFile + } + if c.FirecrackerUFFDCacheMaxBytes <= 0 { + c.FirecrackerUFFDCacheMaxBytes = 4 << 30 + } return c } @@ -149,9 +160,11 @@ type manager struct { tapGCOnce sync.Once // Hypervisor support - vmStarters map[hypervisor.Type]hypervisor.VMStarter - defaultHypervisor hypervisor.Type // Default hypervisor type when not specified in request - guestMemoryPolicy guestmemory.Policy + vmStarters map[hypervisor.Type]hypervisor.VMStarter + defaultHypervisor hypervisor.Type // Default hypervisor type when not specified in request + guestMemoryPolicy guestmemory.Policy + firecrackerSnapshotMemoryBackend string + firecrackerUFFDPager *uffdpager.Supervisor } // platformStarters is populated by platform-specific init functions. @@ -166,6 +179,16 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste // NewManagerWithConfig creates a new instances manager with additional manager settings. 
func NewManagerWithConfig(p *paths.Paths, imageManager images.Manager, systemManager system.Manager, networkManager network.Manager, deviceManager devices.Manager, volumeManager volumes.Manager, limits ResourceLimits, defaultHypervisor hypervisor.Type, snapshotDefaults SnapshotPolicy, managerConfig ManagerConfig, meter metric.Meter, tracer trace.Tracer, memoryPolicy ...guestmemory.Policy) Manager { + m, err := NewManagerWithConfigE(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, defaultHypervisor, snapshotDefaults, managerConfig, meter, tracer, memoryPolicy...) + if err != nil { + panic(err) + } + return m +} + +// NewManagerWithConfigE creates a new instances manager and returns startup +// errors for optional host services such as the Firecracker UFFD pager. +func NewManagerWithConfigE(p *paths.Paths, imageManager images.Manager, systemManager system.Manager, networkManager network.Manager, deviceManager devices.Manager, volumeManager volumes.Manager, limits ResourceLimits, defaultHypervisor hypervisor.Type, snapshotDefaults SnapshotPolicy, managerConfig ManagerConfig, meter metric.Meter, tracer trace.Tracer, memoryPolicy ...guestmemory.Policy) (Manager, error) { // Validate and default the hypervisor type if defaultHypervisor == "" { defaultHypervisor = hypervisor.TypeCloudHypervisor @@ -179,34 +202,44 @@ func NewManagerWithConfig(p *paths.Paths, imageManager images.Manager, systemMan managerConfig = managerConfig.Normalize() // Initialize VM starters from platform-specific init functions - vmStarters := make(map[hypervisor.Type]hypervisor.VMStarter, len(platformStarters)) + rawStarters := make(map[hypervisor.Type]hypervisor.VMStarter, len(platformStarters)) for hvType, starter := range platformStarters { + rawStarters[hvType] = starter + } + firecrackerUFFDPager, err := configurePlatformStarters(context.Background(), p, rawStarters, managerConfig) + if err != nil { + return nil, err + } + vmStarters := 
make(map[hypervisor.Type]hypervisor.VMStarter, len(rawStarters)) + for hvType, starter := range rawStarters { vmStarters[hvType] = hypervisor.WrapVMStarter(hvType, starter) } m := &manager{ - paths: p, - imageManager: imageManager, - systemManager: systemManager, - networkManager: networkManager, - deviceManager: deviceManager, - volumeManager: volumeManager, - limits: limits, - instanceLocks: sync.Map{}, - bootMarkerScans: sync.Map{}, - hostTopology: detectHostTopology(), // Detect and cache host topology - vmStarters: vmStarters, - defaultHypervisor: defaultHypervisor, - now: time.Now, - writeFile: os.WriteFile, - meter: meter, - tracer: tracer, - guestMemoryPolicy: policy, - snapshotDefaults: snapshotDefaults, - compressionJobs: make(map[string]*compressionJob), - nativeCodecPaths: make(map[string]string), - lifecycleEvents: newLifecycleSubscribersWithBufferSize(managerConfig.LifecycleEventBufferSize), - guestAgentReadyProbe: probeGuestAgentReady, + paths: p, + imageManager: imageManager, + systemManager: systemManager, + networkManager: networkManager, + deviceManager: deviceManager, + volumeManager: volumeManager, + limits: limits, + instanceLocks: sync.Map{}, + bootMarkerScans: sync.Map{}, + hostTopology: detectHostTopology(), // Detect and cache host topology + vmStarters: vmStarters, + defaultHypervisor: defaultHypervisor, + now: time.Now, + writeFile: os.WriteFile, + meter: meter, + tracer: tracer, + guestMemoryPolicy: policy, + firecrackerSnapshotMemoryBackend: managerConfig.FirecrackerSnapshotMemoryBackend, + firecrackerUFFDPager: firecrackerUFFDPager, + snapshotDefaults: snapshotDefaults, + compressionJobs: make(map[string]*compressionJob), + nativeCodecPaths: make(map[string]string), + lifecycleEvents: newLifecycleSubscribersWithBufferSize(managerConfig.LifecycleEventBufferSize), + guestAgentReadyProbe: probeGuestAgentReady, } m.deleteSnapshotFn = m.deleteSnapshot @@ -224,7 +257,7 @@ func NewManagerWithConfig(p *paths.Paths, imageManager 
images.Manager, systemMan logger.FromContext(context.Background()).WarnContext(context.Background(), "failed to recover pending standby compression jobs", "error", err) } - return m + return m, nil } // SetResourceValidator sets the resource validator for aggregate limit checking. diff --git a/lib/instances/query.go b/lib/instances/query.go index 99d67d48..45dc13fc 100644 --- a/lib/instances/query.go +++ b/lib/instances/query.go @@ -83,6 +83,14 @@ func (m *manager) deriveStateWithOptions(ctx context.Context, stored *StoredMeta } return stateResult{State: StateStopped} } + if err := m.checkFirecrackerUFFDSessionHealth(ctx, stored); err != nil { + errMsg := err.Error() + log.WarnContext(ctx, "firecracker uffd session is unhealthy", + "instance_id", stored.Id, + "error", err, + ) + return stateResult{State: StateUnknown, Error: &errMsg} + } // 2. Socket exists - resolve hypervisor state, preferring the in-memory // cache (populated by lifecycle events and prior queries) and falling diff --git a/lib/instances/restore.go b/lib/instances/restore.go index d169242a..f04f8fe8 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -15,6 +15,7 @@ import ( "github.com/kernel/hypeman/lib/logger" "github.com/kernel/hypeman/lib/network" snapshotstore "github.com/kernel/hypeman/lib/snapshot" + "github.com/kernel/hypeman/lib/uffdpager" "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -241,6 +242,7 @@ func (m *manager) restoreInstance( var resumeNetworkAckWaiter *guestResumeNetworkUDPWaiter var resumeNetworkAckCfg *guestNetworkConfig resumeNetworkMailboxPatched := false + var uffdOverlays []uffdpager.OverlayPage if allocatedNet != nil && !stored.SkipGuestAgent && guestInitiatedResumeNetworkMailbox(stored) { resumeNetworkCfg, cfgErr := guestNetworkReconfigureConfig(allocatedNet) if cfgErr != nil { @@ -258,7 +260,17 @@ func (m *manager) restoreInstance( resumeNetworkAckCfg = resumeNetworkCfg payload.AckPort = 
resumeNetworkAckWaiter.Port() } - if patchErr := patchGuestResumeNetworkMailbox(snapshotDir, guestInitiatedResumeNetworkMailboxToken(stored), &payload); patchErr != nil { + var patchErr error + if m.useFirecrackerUFFD(stored) { + var overlay uffdpager.OverlayPage + overlay, patchErr = buildGuestResumeNetworkMailboxOverlay(snapshotDir, guestInitiatedResumeNetworkMailboxToken(stored), &payload) + if patchErr == nil { + uffdOverlays = append(uffdOverlays, overlay) + } + } else { + patchErr = patchGuestResumeNetworkMailbox(snapshotDir, guestInitiatedResumeNetworkMailboxToken(stored), &payload) + } + if patchErr != nil { if resumeNetworkAckWaiter != nil { resumeNetworkAckWaiter.Close() resumeNetworkAckWaiter = nil @@ -274,6 +286,11 @@ func (m *manager) restoreInstance( defer resumeNetworkAckWaiter.Close() } + if err := m.configureFirecrackerSnapshotRestore(stored, snapshotDir, uffdOverlays); err != nil { + releaseNetwork() + return nil, fmt.Errorf("configure firecracker snapshot memory backend: %w", err) + } + // 5. 
Transition: Standby → Paused (start hypervisor + restore) restoreCtx, restoreSpanEnd := m.startLifecycleStep(ctx, "restore_from_snapshot", attribute.String("instance_id", id), @@ -285,6 +302,7 @@ func (m *manager) restoreInstance( restoreSpanEnd(err) if err != nil { log.ErrorContext(ctx, "failed to restore from snapshot", "instance_id", id, "error", err) + m.closeFirecrackerUFFDSession(ctx, stored) // Cleanup network on failure releaseNetwork() return nil, err @@ -327,6 +345,7 @@ func (m *manager) restoreInstance( log.ErrorContext(ctx, "failed to resume VM", "instance_id", id, "error", err) // Cleanup on failure hv.Shutdown(ctx) + m.closeFirecrackerUFFDSession(ctx, stored) releaseNetwork() return nil, fmt.Errorf("resume vm failed: %w", err) } @@ -392,6 +411,7 @@ func (m *manager) restoreInstance( if reconfigureErr != nil { log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", reconfigureErr) _ = hv.Shutdown(ctx) + m.closeFirecrackerUFFDSession(ctx, stored) m.rollbackAdmissionAllocationActive(stored) releaseNetwork() return nil, fmt.Errorf("configure guest network after restore: %w", reconfigureErr) diff --git a/lib/instances/restore_egress_test.go b/lib/instances/restore_egress_test.go index b5fcdb4e..30f92b01 100644 --- a/lib/instances/restore_egress_test.go +++ b/lib/instances/restore_egress_test.go @@ -114,6 +114,47 @@ func TestPatchGuestResumeNetworkMailbox(t *testing.T) { assert.Equal(t, *payload, decoded) } +func TestBuildGuestResumeNetworkMailboxOverlayDoesNotMutateMemory(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + token := "overlay-token" + mem := make([]byte, 8192) + copy(mem[512:], guestResumeNetworkMailboxMagic) + copy(mem[512+len(guestResumeNetworkMailboxMagic):], token) + require.NoError(t, os.WriteFile(dir+"/"+firecrackerSnapshotMemoryFile, mem, 0644)) + + payload := &guestResumeNetworkPayload{ + InterfaceName: "eth0", + MAC: "02:00:00:85:17:c8", + IPv4: "10.102.146.62", + Prefix: 16, + 
Gateway: "10.102.0.1", + AckPort: 43210, + } + overlay, err := buildGuestResumeNetworkMailboxOverlay(dir, token, payload) + require.NoError(t, err) + assert.Equal(t, int64(0), overlay.GuestMemoryOffset) + + unchanged, err := os.ReadFile(dir + "/" + firecrackerSnapshotMemoryFile) + require.NoError(t, err) + assert.Equal(t, mem, unchanged) + + page, err := os.ReadFile(overlay.Path) + require.NoError(t, err) + require.Len(t, page, 4096) + + offset := 512 + require.Equal(t, uint32(1), binary.LittleEndian.Uint32(page[offset+guestResumeNetworkMailboxSeqOffset:])) + payloadLen := binary.LittleEndian.Uint32(page[offset+guestResumeNetworkMailboxLengthOffset:]) + require.NotZero(t, payloadLen) + + var decoded guestResumeNetworkPayload + err = json.Unmarshal(page[offset+guestResumeNetworkMailboxPayloadOffset:offset+guestResumeNetworkMailboxPayloadOffset+int(payloadLen)], &decoded) + require.NoError(t, err) + assert.Equal(t, *payload, decoded) +} + func TestRequiresRestoreConfigDiskRefresh(t *testing.T) { t.Parallel() diff --git a/lib/instances/snapshot.go b/lib/instances/snapshot.go index 523917b4..606ec8f6 100644 --- a/lib/instances/snapshot.go +++ b/lib/instances/snapshot.go @@ -450,6 +450,11 @@ func (m *manager) forkSnapshot(ctx context.Context, snapshotID string, req ForkS forkMeta.ExitCode = nil forkMeta.ExitMessage = "" forkMeta.RestartStatus = restartpolicy.Status{} + forkMeta.FirecrackerUFFDSessionID = "" + forkMeta.FirecrackerUFFDPagerVersion = "" + if rec.Snapshot.Kind != SnapshotKindStandby { + forkMeta.FirecrackerSnapshotCacheKey = "" + } if rec.Snapshot.Kind == SnapshotKindStandby { forkMeta.VsockCID = rec.StoredMetadata.VsockCID } else { diff --git a/lib/instances/standby.go b/lib/instances/standby.go index 03b27410..33328600 100644 --- a/lib/instances/standby.go +++ b/lib/instances/standby.go @@ -190,6 +190,7 @@ func (m *manager) standbyInstance( if dialer, err := hypervisor.NewVsockDialer(inst.HypervisorType, inst.VsockSocket, inst.VsockCID); err == nil { 
guest.CloseConn(dialer.Key()) } + m.closeFirecrackerUFFDSession(ctx, stored) // 9. Release network allocation (delete TAP device) // TAP devices with explicit Owner/Group fields do NOT auto-delete when VMM exits @@ -216,6 +217,9 @@ func (m *manager) standbyInstance( stored.StoppedAt = &now stored.HypervisorPID = nil stored.PendingStandbyCompression = nil + if err := m.refreshFirecrackerSnapshotCacheKey(stored, snapshotDir); err != nil { + log.WarnContext(ctx, "failed to refresh firecracker snapshot cache key", "instance_id", id, "error", err) + } if compressionPolicy != nil { stored.PendingStandbyCompression = &PendingStandbyCompression{ Policy: *cloneCompressionConfig(compressionPolicy), diff --git a/lib/instances/stop.go b/lib/instances/stop.go index 3613323b..8c128351 100644 --- a/lib/instances/stop.go +++ b/lib/instances/stop.go @@ -282,6 +282,7 @@ func (m *manager) stopInstance( _ = os.Remove(match) } } + m.closeFirecrackerUFFDSession(ctx, stored) // 9. Ensure terminal stop semantics: no snapshot should remain in Stopped state. // This prevents stale snapshot directories from deriving state as Standby and @@ -305,6 +306,7 @@ func (m *manager) stopInstance( // Boot markers are per-boot-run and must not carry across stop/restore/start. stored.ProgramStartedAt = nil stored.GuestAgentReadyAt = nil + stored.FirecrackerSnapshotCacheKey = "" stored.Phases.Record(phasetracking.PhaseStopped, now) meta = &metadata{StoredMetadata: *stored} diff --git a/lib/instances/types.go b/lib/instances/types.go index e38f5499..1c8e2917 100644 --- a/lib/instances/types.go +++ b/lib/instances/types.go @@ -117,6 +117,11 @@ type StoredMetadata struct { HypervisorVersion string // Hypervisor version (e.g., "v51.1") HypervisorPID *int // Hypervisor process ID (may be stale after host restart) + // Firecracker UFFD snapshot restore metadata. 
+ FirecrackerSnapshotCacheKey string + FirecrackerUFFDSessionID string + FirecrackerUFFDPagerVersion string + // Paths SocketPath string // Path to API socket DataDir string // Instance data directory diff --git a/lib/paths/paths.go b/lib/paths/paths.go index adc070c4..7f22bf6b 100644 --- a/lib/paths/paths.go +++ b/lib/paths/paths.go @@ -103,6 +103,31 @@ func (p *Paths) FirecrackerBinary(version, arch string) string { return filepath.Join(p.dataDir, "system", "binaries", "firecracker", version, arch, "firecracker") } +// UFFDDir returns the root directory for Firecracker UFFD pager state. +func (p *Paths) UFFDDir() string { + return filepath.Join(p.dataDir, "uffd") +} + +func (p *Paths) UFFDPagerDir(versionKey string) string { + return filepath.Join(p.UFFDDir(), versionKey) +} + +func (p *Paths) UFFDControlSocket(versionKey string) string { + return filepath.Join(p.UFFDPagerDir(versionKey), "control.sock") +} + +func (p *Paths) UFFDPagerPID(versionKey string) string { + return filepath.Join(p.UFFDPagerDir(versionKey), "pager.pid") +} + +func (p *Paths) UFFDPagerLog(versionKey string) string { + return filepath.Join(p.UFFDPagerDir(versionKey), "pager.log") +} + +func (p *Paths) UFFDSessionsDir(versionKey string) string { + return filepath.Join(p.UFFDPagerDir(versionKey), "sessions") +} + // Image path methods // ImageDigestDir returns the directory for a specific image digest. 
diff --git a/lib/providers/providers.go b/lib/providers/providers.go index 72b8a426..33a909d5 100644 --- a/lib/providers/providers.go +++ b/lib/providers/providers.go @@ -138,10 +138,16 @@ func ProvideInstanceManager(p *paths.Paths, cfg *config.Config, imageManager ima ReclaimEnabled: cfg.Hypervisor.Memory.ReclaimEnabled, VZBalloonRequired: cfg.Hypervisor.Memory.VZBalloonRequired, } + var firecrackerUFFDCacheMaxBytes datasize.ByteSize + if err := firecrackerUFFDCacheMaxBytes.UnmarshalText([]byte(cfg.Hypervisor.FirecrackerUFFDCacheMaxBytes)); err != nil { + return nil, fmt.Errorf("failed to parse hypervisor.firecracker_uffd_cache_max_bytes %q: %w", cfg.Hypervisor.FirecrackerUFFDCacheMaxBytes, err) + } managerConfig := instances.ManagerConfig{ - LifecycleEventBufferSize: cfg.Instances.LifecycleEventBufferSize, + LifecycleEventBufferSize: cfg.Instances.LifecycleEventBufferSize, + FirecrackerSnapshotMemoryBackend: cfg.Hypervisor.FirecrackerSnapshotMemoryBackend, + FirecrackerUFFDCacheMaxBytes: int64(firecrackerUFFDCacheMaxBytes), } - return instances.NewManagerWithConfig(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, defaultHypervisor, snapshotDefaults, managerConfig, meter, tracer, memoryPolicy), nil + return instances.NewManagerWithConfigE(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, defaultHypervisor, snapshotDefaults, managerConfig, meter, tracer, memoryPolicy) } func snapshotDefaultsFromConfig(cfg *config.Config) instances.SnapshotPolicy { diff --git a/lib/uffdpager/cache.go b/lib/uffdpager/cache.go new file mode 100644 index 00000000..01aedc19 --- /dev/null +++ b/lib/uffdpager/cache.go @@ -0,0 +1,95 @@ +package uffdpager + +import ( + "container/list" + "sync" +) + +type pageKey struct { + cacheKey string + offset int64 + size int +} + +type cacheEntry struct { + key pageKey + data []byte +} + +type PageCache struct { + mu sync.Mutex + maxBytes int64 + bytes int64 + items 
map[pageKey]*list.Element + lru *list.List + hits int64 + misses int64 +} + +func NewPageCache(maxBytes int64) *PageCache { + return &PageCache{ + maxBytes: normalizeCacheMaxBytes(maxBytes), + items: make(map[pageKey]*list.Element), + lru: list.New(), + } +} + +func (c *PageCache) Get(cacheKey string, offset int64, size int) ([]byte, bool) { + key := pageKey{cacheKey: cacheKey, offset: offset, size: size} + + c.mu.Lock() + defer c.mu.Unlock() + + elem, ok := c.items[key] + if !ok { + c.misses++ + return nil, false + } + c.hits++ + c.lru.MoveToFront(elem) + entry := elem.Value.(*cacheEntry) + data := append([]byte(nil), entry.data...) + return data, true +} + +func (c *PageCache) Add(cacheKey string, offset int64, data []byte) { + if len(data) == 0 { + return + } + key := pageKey{cacheKey: cacheKey, offset: offset, size: len(data)} + value := append([]byte(nil), data...) + + c.mu.Lock() + defer c.mu.Unlock() + + if elem, ok := c.items[key]; ok { + entry := elem.Value.(*cacheEntry) + c.bytes -= int64(len(entry.data)) + entry.data = value + c.bytes += int64(len(entry.data)) + c.lru.MoveToFront(elem) + c.evictLocked() + return + } + + elem := c.lru.PushFront(&cacheEntry{key: key, data: value}) + c.items[key] = elem + c.bytes += int64(len(value)) + c.evictLocked() +} + +func (c *PageCache) SnapshotStats() (bytes, maxBytes int64, items int, hits, misses int64) { + c.mu.Lock() + defer c.mu.Unlock() + return c.bytes, c.maxBytes, len(c.items), c.hits, c.misses +} + +func (c *PageCache) evictLocked() { + for c.bytes > c.maxBytes && c.lru.Len() > 0 { + elem := c.lru.Back() + entry := elem.Value.(*cacheEntry) + delete(c.items, entry.key) + c.bytes -= int64(len(entry.data)) + c.lru.Remove(elem) + } +} diff --git a/lib/uffdpager/cache_test.go b/lib/uffdpager/cache_test.go new file mode 100644 index 00000000..f360b8e3 --- /dev/null +++ b/lib/uffdpager/cache_test.go @@ -0,0 +1,46 @@ +package uffdpager + +import ( + "bytes" + "testing" +) + +func 
TestPageCacheSharesPagesByCacheKeyAndOffset(t *testing.T) { + cache := NewPageCache(8192) + page := bytes.Repeat([]byte{7}, 4096) + + cache.Add("snapshot-a", 0, page) + got, ok := cache.Get("snapshot-a", 0, 4096) + if !ok { + t.Fatalf("expected cache hit") + } + if !bytes.Equal(got, page) { + t.Fatalf("cached page mismatch") + } + + got[0] = 1 + again, ok := cache.Get("snapshot-a", 0, 4096) + if !ok { + t.Fatalf("expected second cache hit") + } + if again[0] != 7 { + t.Fatalf("cache returned mutable backing slice") + } +} + +func TestPageCacheEvictsLRUWhenBounded(t *testing.T) { + cache := NewPageCache(8192) + cache.Add("snapshot-a", 0, bytes.Repeat([]byte{1}, 4096)) + cache.Add("snapshot-a", 4096, bytes.Repeat([]byte{2}, 4096)) + if _, ok := cache.Get("snapshot-a", 0, 4096); !ok { + t.Fatalf("expected first page before eviction") + } + + cache.Add("snapshot-a", 8192, bytes.Repeat([]byte{3}, 4096)) + if _, ok := cache.Get("snapshot-a", 4096, 4096); ok { + t.Fatalf("expected least recently used page to be evicted") + } + if _, ok := cache.Get("snapshot-a", 0, 4096); !ok { + t.Fatalf("expected recently used page to remain") + } +} diff --git a/lib/uffdpager/server_linux.go b/lib/uffdpager/server_linux.go new file mode 100644 index 00000000..7ce2a87a --- /dev/null +++ b/lib/uffdpager/server_linux.go @@ -0,0 +1,675 @@ +//go:build linux + +package uffdpager + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + "unsafe" + + "github.com/go-chi/chi/v5" + "golang.org/x/sys/unix" +) + +const ( + uffdEventPagefault = 0x12 + uffdEventRemove = 0x15 + uffdEventUnmap = 0x16 + uffdioCopy = 0xc028aa03 +) + +type guestRegionUffdMapping struct { + BaseHostVirtAddr uint64 `json:"base_host_virt_addr"` + Size uint64 `json:"size"` + Offset uint64 `json:"offset"` + PageSize uint64 `json:"page_size"` + PageSizeKiB uint64 
`json:"page_size_kib,omitempty"` +} + +type uffdioCopyArgs struct { + dst uint64 + src uint64 + len uint64 + mode uint64 + copy int64 +} + +type server struct { + dataDir string + versionKey string + cache *PageCache + controlSocket string + sessionRoot string + httpServer *http.Server + + mu sync.Mutex + sessions map[string]*session + draining bool + + faults atomic.Int64 + overlayFaults atomic.Int64 + backingBytesRead atomic.Int64 + copies atomic.Int64 + copyErrors atomic.Int64 +} + +type session struct { + id string + instanceID string + backingMemoryPath string + cacheKey string + socketPath string + listener *net.UnixListener + backingFile *os.File + overlays map[int64][]byte + server *server + done chan struct{} + closeOnce sync.Once + uffdFD int + conn *net.UnixConn +} + +type uffdEvent struct { + kind byte + addr int64 +} + +func Main(args []string) error { + fs := flag.NewFlagSet("internal-uffd-pager", flag.ContinueOnError) + dataDir := fs.String("data-dir", "", "hypeman data directory") + versionKey := fs.String("version-key", "", "pager version key") + cacheMaxBytes := fs.Int64("cache-max-bytes", defaultCacheMaxBytes, "maximum shared page cache bytes") + if err := fs.Parse(args); err != nil { + return err + } + if strings.TrimSpace(*dataDir) == "" { + return fmt.Errorf("--data-dir is required") + } + if strings.TrimSpace(*versionKey) == "" { + return fmt.Errorf("--version-key is required") + } + + s := newServer(*dataDir, *versionKey, *cacheMaxBytes) + return s.run() +} + +func newServer(dataDir, versionKey string, cacheMaxBytes int64) *server { + dir := pagerVersionDir(dataDir, versionKey) + return &server{ + dataDir: dataDir, + versionKey: versionKey, + cache: NewPageCache(cacheMaxBytes), + controlSocket: filepath.Join(dir, controlSocketFile), + sessionRoot: filepath.Join(dir, sessionsDir), + sessions: make(map[string]*session), + } +} + +func (s *server) run() error { + if err := os.MkdirAll(s.sessionRoot, 0755); err != nil { + return 
fmt.Errorf("create uffd session directory: %w", err) + } + _ = os.Remove(s.controlSocket) + listener, err := net.Listen("unix", s.controlSocket) + if err != nil { + return fmt.Errorf("listen on uffd control socket: %w", err) + } + defer listener.Close() + + _ = os.WriteFile(filepath.Join(filepath.Dir(s.controlSocket), pagerPIDFile), []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644) + + router := chi.NewRouter() + router.Get("/health", s.handleHealth) + router.Get("/stats", s.handleStats) + router.Post("/sessions", s.handleCreateSession) + router.Post("/sessions/{id}/close", s.handleCloseSession) + router.Post("/drain", s.handleDrain) + + s.httpServer = &http.Server{Handler: router} + err = s.httpServer.Serve(listener) + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return err +} + +func (s *server) handleHealth(w http.ResponseWriter, r *http.Request) { + s.writeJSON(w, http.StatusOK, HealthResponse{ + Version: s.versionKey, + Draining: s.isDraining(), + ActiveSessions: s.activeSessions(), + }) +} + +func (s *server) handleStats(w http.ResponseWriter, r *http.Request) { + cacheBytes, cacheMax, cacheItems, hits, misses := s.cache.SnapshotStats() + s.writeJSON(w, http.StatusOK, Stats{ + Version: s.versionKey, + Draining: s.isDraining(), + ActiveSessions: s.activeSessions(), + CacheBytes: cacheBytes, + CacheMax: cacheMax, + CacheItems: cacheItems, + CacheHits: hits, + CacheMisses: misses, + Faults: s.faults.Load(), + OverlayFaults: s.overlayFaults.Load(), + BackingBytesRead: s.backingBytesRead.Load(), + Copies: s.copies.Load(), + CopyErrors: s.copyErrors.Load(), + }) +} + +func (s *server) handleCreateSession(w http.ResponseWriter, r *http.Request) { + if s.isDraining() { + http.Error(w, "pager is draining", http.StatusServiceUnavailable) + return + } + + var req CreateSessionRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<20)).Decode(&req); err != nil { + http.Error(w, fmt.Sprintf("decode request: %v", err), http.StatusBadRequest) + 
return + } + if strings.TrimSpace(req.SessionID) == "" { + req.SessionID = req.InstanceID + } + if strings.TrimSpace(req.SessionID) == "" { + http.Error(w, "session_id or instance_id is required", http.StatusBadRequest) + return + } + if strings.TrimSpace(req.BackingMemoryPath) == "" { + http.Error(w, "backing_memory_path is required", http.StatusBadRequest) + return + } + if strings.TrimSpace(req.CacheKey) == "" { + http.Error(w, "cache_key is required", http.StatusBadRequest) + return + } + + created, err := s.createSession(req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.writeJSON(w, http.StatusOK, CreateSessionResponse{ + SessionID: created.id, + UFFDSocketPath: created.socketPath, + PagerVersion: s.versionKey, + }) +} + +func (s *server) handleCloseSession(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + s.closeSession(id) + w.WriteHeader(http.StatusNoContent) +} + +func (s *server) handleDrain(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + s.draining = true + shouldExit := len(s.sessions) == 0 + s.mu.Unlock() + + if shouldExit { + s.shutdownSoon() + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *server) createSession(req CreateSessionRequest) (*session, error) { + id := sanitizeSessionID(req.SessionID) + socketPath := filepath.Join(s.sessionRoot, id+".sock") + _ = os.Remove(socketPath) + addr := net.UnixAddr{Name: socketPath, Net: "unix"} + listener, err := net.ListenUnix("unix", &addr) + if err != nil { + return nil, fmt.Errorf("listen for uffd session %s: %w", id, err) + } + + overlays := make(map[int64][]byte, len(req.Overlays)) + for _, overlay := range req.Overlays { + if overlay.GuestMemoryOffset < 0 || strings.TrimSpace(overlay.Path) == "" { + listener.Close() + return nil, fmt.Errorf("invalid overlay page: offset=%d path=%q", overlay.GuestMemoryOffset, overlay.Path) + } + data, err := os.ReadFile(overlay.Path) + if err != nil { + listener.Close() + return 
nil, fmt.Errorf("read overlay page %q: %w", overlay.Path, err) + } + overlays[overlay.GuestMemoryOffset] = data + } + + sess := &session{ + id: id, + instanceID: req.InstanceID, + backingMemoryPath: req.BackingMemoryPath, + cacheKey: req.CacheKey, + socketPath: socketPath, + listener: listener, + overlays: overlays, + server: s, + done: make(chan struct{}), + uffdFD: -1, + } + + s.mu.Lock() + if existing := s.sessions[id]; existing != nil { + existing.close() + } + s.sessions[id] = sess + s.mu.Unlock() + + go sess.run() + return sess, nil +} + +func (s *server) closeSession(id string) { + id = sanitizeSessionID(id) + s.mu.Lock() + sess := s.sessions[id] + delete(s.sessions, id) + shouldExit := s.draining && len(s.sessions) == 0 + s.mu.Unlock() + if sess != nil { + sess.close() + } + if shouldExit { + s.shutdownSoon() + } +} + +func (s *server) removeSession(sess *session) { + s.mu.Lock() + if current := s.sessions[sess.id]; current == sess { + delete(s.sessions, sess.id) + } + shouldExit := s.draining && len(s.sessions) == 0 + s.mu.Unlock() + if shouldExit { + s.shutdownSoon() + } +} + +func (s *server) shutdownSoon() { + go func() { + time.Sleep(50 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _ = s.httpServer.Shutdown(ctx) + }() +} + +func (s *server) activeSessions() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.sessions) +} + +func (s *server) isDraining() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.draining +} + +func (s *server) writeJSON(w http.ResponseWriter, status int, value any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(value); err != nil { + log.Printf("write json response: %v", err) + } +} + +func (s *session) run() { + defer func() { + s.close() + s.server.removeSession(s) + }() + + conn, err := s.listener.AcceptUnix() + if err != nil { + return + } + s.conn = conn + + file, err := 
// handleFaults is the session's event loop. It waits for the userfaultfd
// descriptor to become readable, drains every pending event, and serves the
// page faults among them. It returns when polling fails (other than EINTR),
// when the descriptor reports HUP/ERR/NVAL without readable data, or when
// reading an event fails.
//
// Page faults whose copy fails with EAGAIN are kept in deferred and retried
// on the next iteration; while any are outstanding the loop skips the
// blocking poll and pauses for one millisecond between attempts instead.
func (s *session) handleFaults(mappings []guestRegionUffdMapping) {
	fd := s.uffdFD
	// Non-blocking mode lets the drain loop below stop on EAGAIN instead of
	// blocking once the queue is empty. The error is deliberately ignored.
	_ = unix.SetNonblock(fd, true)
	pollFDs := []unix.PollFd{{Fd: int32(fd), Events: unix.POLLIN}}
	// NOTE(review): readUFFDEvent decodes a single event from the start of
	// buf and only requires 32 bytes; confirm a 64-byte read can never return
	// more than one event, otherwise the second one is dropped.
	buf := make([]byte, 64)
	var deferred []uffdEvent
	for {
		// Only block in poll when there is nothing left over to retry.
		if len(deferred) == 0 {
			n, err := unix.Poll(pollFDs, -1)
			if err != nil {
				if err == unix.EINTR {
					continue
				}
				return
			}
			if n == 0 || pollFDs[0].Revents&unix.POLLIN == 0 {
				if pollFDs[0].Revents&(unix.POLLHUP|unix.POLLERR|unix.POLLNVAL) != 0 {
					return
				}
				continue
			}
		}

		// Retried faults go first, followed by everything currently queued
		// on the descriptor.
		events := deferred
		deferred = nil
		for {
			event, ok, err := readUFFDEvent(fd, buf)
			if err != nil {
				log.Printf("uffd session %s read uffd event: %v", s.id, err)
				return
			}
			if !ok {
				break
			}
			events = append(events, event)
		}

		for _, event := range events {
			switch event.kind {
			case uffdEventPagefault:
				if err := s.servePageFault(mappings, event.addr); err != nil {
					if errors.Is(err, unix.EAGAIN) {
						deferred = append(deferred, event)
						continue
					}
					// Any other failure is counted and logged; the loop
					// keeps serving the remaining events.
					s.server.copyErrors.Add(1)
					log.Printf("uffd session %s page fault at %#x: %v", s.id, event.addr, err)
				}
			case uffdEventRemove, uffdEventUnmap:
				// Reading remove/unmap events clears the pending state that can
				// make UFFDIO_COPY return EAGAIN.
			default:
				log.Printf("uffd session %s ignoring unexpected uffd event %#x", s.id, event.kind)
			}
		}

		if len(deferred) > 0 {
			time.Sleep(time.Millisecond)
		}
	}
}

// readUFFDEvent reads one event from fd into buf and decodes it.
//
// It returns (event, true, nil) for a decoded event and (zero, false, nil)
// when the read was interrupted (EINTR) or would block (EAGAIN/EWOULDBLOCK).
// A read of zero bytes yields io.EOF, a read shorter than 32 bytes yields a
// "short uffd event read" error, and any other read error is returned as is.
//
// Only the event kind (first byte) is decoded for every event; for page
// faults the fault address is additionally taken from bytes 16..24 in native
// byte order.
func readUFFDEvent(fd int, buf []byte) (uffdEvent, bool, error) {
	read, err := unix.Read(fd, buf)
	if err != nil {
		if err == unix.EINTR {
			return uffdEvent{}, false, nil
		}
		if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
			return uffdEvent{}, false, nil
		}
		return uffdEvent{}, false, err
	}
	if read == 0 {
		return uffdEvent{}, false, io.EOF
	}
	if read < 32 {
		return uffdEvent{}, false, fmt.Errorf("short uffd event read: %d bytes", read)
	}
	event := uffdEvent{kind: buf[0]}
	if event.kind == uffdEventPagefault {
		event.addr = int64(nativeEndian.Uint64(buf[16:24]))
	}
	return event, true, nil
}

// servePageFault resolves faultAddr to a page through findMapping, obtains
// the page contents from readPage and installs them with uffdCopy.
//
// The faults counter is bumped once a mapping has been found, overlayFaults
// when readPage reports the page came from an overlay, and copies after a
// successful copy. Errors from readPage and uffdCopy are returned unchanged,
// which lets the caller recognise EAGAIN.
func (s *session) servePageFault(mappings []guestRegionUffdMapping, faultAddr int64) error {
	mapping, pageAddr, pageOffset, pageSize, ok := findMapping(mappings, faultAddr)
	if !ok {
		return fmt.Errorf("fault address %#x outside guest mappings", faultAddr)
	}
	// The mapping itself is not used further; only the derived values are.
	_ = mapping

	s.server.faults.Add(1)
	page, overlay, err := s.readPage(pageOffset, pageSize)
	if err != nil {
		return err
	}
	if overlay {
		s.server.overlayFaults.Add(1)
	}
	if err := uffdCopy(s.uffdFD, uint64(pageAddr), page); err != nil {
		return err
	}
	s.server.copies.Add(1)
	return nil
}
if n > 0 { + s.server.backingBytesRead.Add(int64(n)) + } + s.server.cache.Add(s.cacheKey, offset, page) + return page, false, nil +} + +func (s *session) close() { + s.closeOnce.Do(func() { + if s.listener != nil { + _ = s.listener.Close() + } + if s.conn != nil { + _ = s.conn.Close() + } + if s.uffdFD >= 0 { + _ = unix.Close(s.uffdFD) + } + if s.backingFile != nil { + _ = s.backingFile.Close() + } + _ = os.Remove(s.socketPath) + close(s.done) + }) +} + +func recvMappingsAndFD(conn *net.UnixConn) ([]guestRegionUffdMapping, int, error) { + buf := make([]byte, 128<<10) + oob := make([]byte, unix.CmsgSpace(4)) + n, oobn, _, _, err := conn.ReadMsgUnix(buf, oob) + if err != nil { + return nil, -1, err + } + if n == 0 { + return nil, -1, fmt.Errorf("empty mapping payload") + } + + msgs, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + return nil, -1, err + } + var fds []int + for _, msg := range msgs { + rights, err := unix.ParseUnixRights(&msg) + if err != nil { + continue + } + fds = append(fds, rights...) 
+ } + if len(fds) == 0 { + return nil, -1, fmt.Errorf("no uffd file descriptor received") + } + for _, extra := range fds[1:] { + _ = unix.Close(extra) + } + + mappings, err := decodeMappings(buf[:n]) + if err != nil { + _ = unix.Close(fds[0]) + return nil, -1, err + } + return mappings, fds[0], nil +} + +func decodeMappings(data []byte) ([]guestRegionUffdMapping, error) { + var mappings []guestRegionUffdMapping + if err := json.Unmarshal(data, &mappings); err == nil { + return normalizeMappings(mappings) + } + + var wrapped struct { + Mappings []guestRegionUffdMapping `json:"mappings"` + } + if err := json.Unmarshal(data, &wrapped); err != nil { + return nil, fmt.Errorf("decode uffd mappings: %w", err) + } + return normalizeMappings(wrapped.Mappings) +} + +func normalizeMappings(mappings []guestRegionUffdMapping) ([]guestRegionUffdMapping, error) { + if len(mappings) == 0 { + return nil, fmt.Errorf("no uffd mappings received") + } + for i := range mappings { + if mappings[i].PageSize == 0 { + mappings[i].PageSize = mappings[i].PageSizeKiB + } + if mappings[i].PageSize == 0 { + mappings[i].PageSize = uint64(os.Getpagesize()) + } + if mappings[i].Size == 0 { + return nil, fmt.Errorf("mapping %d has zero size", i) + } + if mappings[i].PageSize&(mappings[i].PageSize-1) != 0 { + return nil, fmt.Errorf("mapping %d page size %d is not a power of two", i, mappings[i].PageSize) + } + } + return mappings, nil +} + +func findMapping(mappings []guestRegionUffdMapping, faultAddr int64) (guestRegionUffdMapping, int64, int64, int, bool) { + for _, mapping := range mappings { + start := int64(mapping.BaseHostVirtAddr) + end := start + int64(mapping.Size) + if faultAddr < start || faultAddr >= end { + continue + } + pageSize := int64(mapping.PageSize) + pageAddr := faultAddr &^ (pageSize - 1) + pageOffset := int64(mapping.Offset) + (pageAddr - start) + return mapping, pageAddr, pageOffset, int(pageSize), true + } + return guestRegionUffdMapping{}, 0, 0, 0, false +} + +func 
// sanitizeSessionID turns an arbitrary identifier into one made only of
// ASCII letters, digits, '-', '_' and '.'. Surrounding whitespace is trimmed
// and every other rune is replaced by '-'. An identifier that is empty after
// trimming is replaced by a generated "session-<unix nanoseconds>" value.
func sanitizeSessionID(id string) string {
	trimmed := strings.TrimSpace(id)
	if trimmed == "" {
		return fmt.Sprintf("session-%d", time.Now().UnixNano())
	}

	allowed := func(r rune) bool {
		return (r >= 'a' && r <= 'z') ||
			(r >= 'A' && r <= 'Z') ||
			(r >= '0' && r <= '9') ||
			r == '-' || r == '_' || r == '.'
	}
	return strings.Map(func(r rune) rune {
		if allowed(r) {
			return r
		}
		return '-'
	}, trimmed)
}

// nativeEndian decodes integers in the byte order of the running machine.
var nativeEndian = nativeByteOrder()

// byteOrder is the single decoding operation this file needs.
type byteOrder interface {
	Uint64([]byte) uint64
}

// nativeByteOrder detects the machine's byte order by looking at the first
// byte in memory of a 16-bit value holding 1.
func nativeByteOrder() byteOrder {
	probe := uint16(0x1)
	if *(*byte)(unsafe.Pointer(&probe)) == 0x1 {
		return littleEndian{}
	}
	return bigEndian{}
}

// littleEndian decodes with the least significant byte first.
type littleEndian struct{}

// Uint64 decodes the first eight bytes of b as a little-endian value.
func (littleEndian) Uint64(b []byte) uint64 {
	var value uint64
	for i := 0; i < 8; i++ {
		value |= uint64(b[i]) << (8 * i)
	}
	return value
}

// bigEndian decodes with the most significant byte first.
type bigEndian struct{}

// Uint64 decodes the first eight bytes of b as a big-endian value.
func (bigEndian) Uint64(b []byte) uint64 {
	var value uint64
	for i := 7; i >= 0; i-- {
		value |= uint64(b[i]) << (8 * (7 - i))
	}
	return value
}
"sync" + "syscall" + "time" +) + +const ( + controlSocketFile = "control.sock" + pagerPIDFile = "pager.pid" + pagerLogFile = "pager.log" + sessionsDir = "sessions" +) + +type Supervisor struct { + dataDir string + versionKey string + cacheMaxBytes int64 + controlSocket string + executable string + + mu sync.Mutex + clients map[string]*http.Client +} + +func NewSupervisor(ctx context.Context, dataDir string, cacheMaxBytes int64) (*Supervisor, error) { + exe, err := os.Executable() + if err != nil { + return nil, fmt.Errorf("resolve executable path: %w", err) + } + versionKey, err := executableVersionKey(exe) + if err != nil { + return nil, err + } + s := &Supervisor{ + dataDir: dataDir, + versionKey: versionKey, + cacheMaxBytes: normalizeCacheMaxBytes(cacheMaxBytes), + controlSocket: pagerControlSocket(dataDir, versionKey), + executable: exe, + clients: make(map[string]*http.Client), + } + if err := s.ensureRunning(ctx); err != nil { + return nil, err + } + s.drainOlderPagers(ctx) + return s, nil +} + +func (s *Supervisor) VersionKey() string { + if s == nil { + return "" + } + return s.versionKey +} + +func (s *Supervisor) CreateSession(ctx context.Context, req CreateSessionRequest) (*CreateSessionResponse, error) { + if s == nil { + return nil, fmt.Errorf("uffd pager supervisor is nil") + } + if strings.TrimSpace(req.SessionID) == "" { + req.SessionID = req.InstanceID + } + var resp CreateSessionResponse + if err := s.doJSON(ctx, s.versionKey, http.MethodPost, "/sessions", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (s *Supervisor) CloseSession(ctx context.Context, sessionID string) error { + if s == nil || strings.TrimSpace(sessionID) == "" { + return nil + } + return s.CloseSessionVersion(ctx, s.versionKey, sessionID) +} + +func (s *Supervisor) CloseSessionVersion(ctx context.Context, versionKey, sessionID string) error { + if s == nil || strings.TrimSpace(versionKey) == "" || strings.TrimSpace(sessionID) == "" { + return nil + 
} + path := "/sessions/" + urlPathEscape(sessionID) + "/close" + return s.doJSON(ctx, versionKey, http.MethodPost, path, nil, nil) +} + +func (s *Supervisor) Stats(ctx context.Context) (*Stats, error) { + if s == nil { + return nil, fmt.Errorf("uffd pager supervisor is nil") + } + var stats Stats + if err := s.doJSON(ctx, s.versionKey, http.MethodGet, "/stats", nil, &stats); err != nil { + return nil, err + } + return &stats, nil +} + +func (s *Supervisor) HealthVersion(ctx context.Context, versionKey string) (*HealthResponse, error) { + if s == nil { + return nil, fmt.Errorf("uffd pager supervisor is nil") + } + var health HealthResponse + if err := s.doJSON(ctx, versionKey, http.MethodGet, "/health", nil, &health); err != nil { + return nil, err + } + return &health, nil +} + +func (s *Supervisor) ensureRunning(ctx context.Context) error { + if s.isHealthy(ctx, s.versionKey) { + return nil + } + + dir := pagerVersionDir(s.dataDir, s.versionKey) + if err := os.MkdirAll(filepath.Join(dir, sessionsDir), 0755); err != nil { + return fmt.Errorf("create uffd pager directory: %w", err) + } + _ = os.Remove(s.controlSocket) + + logFile, err := os.OpenFile(filepath.Join(dir, pagerLogFile), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("open uffd pager log: %w", err) + } + defer logFile.Close() + + cmd := exec.Command( + s.executable, + "--internal-uffd-pager", + "--data-dir", s.dataDir, + "--version-key", s.versionKey, + "--cache-max-bytes", strconv.FormatInt(s.cacheMaxBytes, 10), + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} + cmd.Stdout = logFile + cmd.Stderr = logFile + if err := cmd.Start(); err != nil { + return fmt.Errorf("start uffd pager: %w", err) + } + _ = os.WriteFile(filepath.Join(dir, pagerPIDFile), []byte(strconv.Itoa(cmd.Process.Pid)+"\n"), 0644) + _ = cmd.Process.Release() + + deadline := time.Now().Add(5 * time.Second) + var lastErr error + for time.Now().Before(deadline) { + if s.isHealthy(ctx, s.versionKey) 
{ + return nil + } + lastErr = ctx.Err() + if lastErr != nil { + break + } + time.Sleep(20 * time.Millisecond) + } + if lastErr != nil { + return fmt.Errorf("wait for uffd pager health: %w", lastErr) + } + return fmt.Errorf("uffd pager did not become healthy at %s", s.controlSocket) +} + +func (s *Supervisor) isHealthy(ctx context.Context, versionKey string) bool { + var health HealthResponse + return s.doJSON(ctx, versionKey, http.MethodGet, "/health", nil, &health) == nil +} + +func (s *Supervisor) drainOlderPagers(ctx context.Context) { + root := filepath.Join(s.dataDir, "uffd") + entries, err := os.ReadDir(root) + if err != nil { + return + } + for _, entry := range entries { + if !entry.IsDir() || entry.Name() == s.versionKey { + continue + } + _ = s.doJSON(ctx, entry.Name(), http.MethodPost, "/drain", nil, nil) + } +} + +func (s *Supervisor) doJSON(ctx context.Context, versionKey, method, path string, body any, out any) error { + client := s.clientForVersion(versionKey) + var reader io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("marshal uffd pager request: %w", err) + } + reader = bytes.NewReader(data) + } + req, err := http.NewRequestWithContext(ctx, method, "http://unix"+path, reader) + if err != nil { + return err + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + data, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10)) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("uffd pager %s %s returned %s: %s", method, path, resp.Status, strings.TrimSpace(string(data))) + } + if out == nil || len(data) == 0 { + return nil + } + if err := json.Unmarshal(data, out); err != nil { + return fmt.Errorf("decode uffd pager response: %w", err) + } + return nil +} + +func (s *Supervisor) clientForVersion(versionKey string) *http.Client { + s.mu.Lock() + defer s.mu.Unlock() + 
// executableVersionKey derives a short version identifier from the contents
// of the file at path: the first 12 hex characters of its SHA-256 digest.
//
// The file is streamed through the hash rather than loaded into memory, so
// the cost does not grow with the size of the executable. Failing to open or
// read the file yields an error wrapping the underlying cause.
func executableVersionKey(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", fmt.Errorf("hash executable %q: %w", path, err)
	}
	defer file.Close()

	digest := sha256.New()
	if _, err := io.Copy(digest, file); err != nil {
		return "", fmt.Errorf("hash executable %q: %w", path, err)
	}
	return hex.EncodeToString(digest.Sum(nil))[:12], nil
}
// Stats is the stub used on platforms other than Linux; it always returns
// an error.
func (s *Supervisor) Stats(context.Context) (*Stats, error) {
	return nil, fmt.Errorf("uffd pager is only supported on linux")
}

// HealthVersion is the stub used on platforms other than Linux; it always
// returns an error.
func (s *Supervisor) HealthVersion(context.Context, string) (*HealthResponse, error) {
	return nil, fmt.Errorf("uffd pager is only supported on linux")
}

// Main is the pager entry point stub used on platforms other than Linux; it
// always returns an error and ignores its arguments.
func Main([]string) error {
	return fmt.Errorf("uffd pager is only supported on linux")
}

const (
	// BackendFile and BackendUFFD are the names of the snapshot memory
	// backends.
	BackendFile = "file"
	BackendUFFD = "uffd"

	// defaultCacheMaxBytes is the cache limit (4 GiB) that
	// normalizeCacheMaxBytes substitutes for non-positive values.
	defaultCacheMaxBytes = int64(4 << 30)
)

// OverlayPage replaces a single snapshot memory page for one restore session.
type OverlayPage struct {
	GuestMemoryOffset int64  `json:"guest_memory_offset"` // offset of the replaced page in guest memory
	Path              string `json:"path"`                // file holding the replacement page contents
}

// CreateSessionRequest describes one Firecracker UFFD restore session.
type CreateSessionRequest struct {
	SessionID         string        `json:"session_id,omitempty"` // optional session identifier
	InstanceID        string        `json:"instance_id"`
	BackingMemoryPath string        `json:"backing_memory_path"` // snapshot memory file that pages are read from
	CacheKey          string        `json:"cache_key"`           // key under which pages read from the backing file are cached
	Overlays          []OverlayPage `json:"overlays,omitempty"`  // pages that take precedence over the backing file
}

// CreateSessionResponse returns the per-session socket Firecracker should use
// as mem_backend.backend_path.
type CreateSessionResponse struct {
	SessionID      string `json:"session_id"`
	UFFDSocketPath string `json:"uffd_socket_path"` // the per-session socket path
	PagerVersion   string `json:"pager_version"`
}

// HealthResponse is the JSON document decoded from a pager's health endpoint.
type HealthResponse struct {
	Version        string `json:"version"`
	Draining       bool   `json:"draining"`
	ActiveSessions int    `json:"active_sessions"`
}

// Stats is the JSON document decoded from a pager's stats endpoint.
//
// NOTE(review): the counter fields presumably mirror the pager's internal
// counters of the same names; the code filling them in is not part of this
// section - confirm before relying on exact semantics.
type Stats struct {
	Version        string `json:"version"`
	Draining       bool   `json:"draining"`
	ActiveSessions int    `json:"active_sessions"`

	CacheBytes  int64 `json:"cache_bytes"`
	CacheMax    int64 `json:"cache_max"`
	CacheItems  int   `json:"cache_items"`
	CacheHits   int64 `json:"cache_hits"`
	CacheMisses int64 `json:"cache_misses"`

	Faults           int64 `json:"faults"`
	OverlayFaults    int64 `json:"overlay_faults"`
	BackingBytesRead int64 `json:"backing_bytes_read"`
	Copies           int64 `json:"copies"`
	CopyErrors       int64 `json:"copy_errors"`
}

// normalizeCacheMaxBytes returns v unchanged when it is positive and
// defaultCacheMaxBytes when it is zero or negative.
func normalizeCacheMaxBytes(v int64) int64 {
	if v <= 0 {
		return defaultCacheMaxBytes
	}
	return v
}