Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions cmd/api/api/fork_mailbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package api

import (
"encoding/json"
"fmt"
"time"

"github.com/kernel/hypeman/lib/instances"
"github.com/kernel/hypeman/lib/oapi"
)

func toDomainForkMailboxes(mailboxes *[]oapi.ForkMailboxPayload) ([]instances.ForkMailboxPayload, error) {
if mailboxes == nil || len(*mailboxes) == 0 {
return nil, nil
}

out := make([]instances.ForkMailboxPayload, 0, len(*mailboxes))
for _, mailbox := range *mailboxes {
payload, err := json.Marshal(mailbox.Payload)
if err != nil {
return nil, fmt.Errorf("marshal mailbox %q payload: %w", mailbox.Name, err)
}
waitForAck := mailbox.WaitForAck != nil && *mailbox.WaitForAck
var ackTimeout time.Duration
if mailbox.AckTimeoutMs != nil {
ackTimeout = time.Duration(*mailbox.AckTimeoutMs) * time.Millisecond
}
out = append(out, instances.ForkMailboxPayload{
Name: mailbox.Name,
Token: mailbox.Token,
Payload: payload,
WaitForAck: waitForAck,
AckTimeout: ackTimeout,
})
}
return out, nil
}
8 changes: 8 additions & 0 deletions cmd/api/api/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,11 +655,19 @@ func (s *ApiService) ForkInstance(ctx context.Context, request oapi.ForkInstance
if request.Body.TargetState != nil {
targetState = instances.State(*request.Body.TargetState)
}
mailboxes, err := toDomainForkMailboxes(request.Body.Mailboxes)
if err != nil {
return oapi.ForkInstance400JSONResponse{
Code: "invalid_request",
Message: err.Error(),
}, nil
}

result, err := s.InstanceManager.ForkInstance(ctx, inst.Id, instances.ForkInstanceRequest{
Name: request.Body.Name,
FromRunning: request.Body.FromRunning != nil && *request.Body.FromRunning,
TargetState: targetState,
Mailboxes: mailboxes,
})
if err != nil {
switch {
Expand Down
15 changes: 15 additions & 0 deletions cmd/api/api/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,13 +1165,22 @@ func TestForkInstance_Success(t *testing.T) {
result: forked,
}
svc.InstanceManager = mockMgr
waitForAck := true
ackTimeoutMS := 1500

resp, err := svc.ForkInstance(
mw.WithResolvedInstance(ctx(), source.Id, source),
oapi.ForkInstanceRequestObject{
Id: source.Id,
Body: &oapi.ForkInstanceRequest{
Name: "forked-instance",
Mailboxes: &[]oapi.ForkMailboxPayload{{
Name: "kernel.identity.v1",
Token: "template-token",
Payload: map[string]interface{}{"instance_name": "forked-instance"},
WaitForAck: &waitForAck,
AckTimeoutMs: &ackTimeoutMS,
}},
},
},
)
Expand All @@ -1185,6 +1194,12 @@ func TestForkInstance_Success(t *testing.T) {
assert.Equal(t, "forked-instance", mockMgr.lastReq.Name)
assert.False(t, mockMgr.lastReq.FromRunning)
assert.Equal(t, instances.State(""), mockMgr.lastReq.TargetState)
require.Len(t, mockMgr.lastReq.Mailboxes, 1)
assert.Equal(t, "kernel.identity.v1", mockMgr.lastReq.Mailboxes[0].Name)
assert.Equal(t, "template-token", mockMgr.lastReq.Mailboxes[0].Token)
assert.True(t, mockMgr.lastReq.Mailboxes[0].WaitForAck)
assert.Equal(t, 1500*time.Millisecond, mockMgr.lastReq.Mailboxes[0].AckTimeout)
assert.JSONEq(t, `{"instance_name":"forked-instance"}`, string(mockMgr.lastReq.Mailboxes[0].Payload))
}

func TestForkInstance_NotSupported(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions cmd/api/api/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func (s *ApiService) ForkSnapshot(ctx context.Context, request oapi.ForkSnapshot
domainReq := instances.ForkSnapshotRequest{
Name: request.Body.Name,
}
mailboxes, err := toDomainForkMailboxes(request.Body.Mailboxes)
if err != nil {
return oapi.ForkSnapshot400JSONResponse{Code: "invalid_request", Message: err.Error()}, nil
}
domainReq.Mailboxes = mailboxes
if request.Body.TargetState != nil {
domainReq.TargetState = instances.State(*request.Body.TargetState)
}
Expand Down
15 changes: 15 additions & 0 deletions cmd/api/api/snapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,20 @@ func TestForkSnapshotSuccess(t *testing.T) {
result: forked,
}
svc.InstanceManager = mockMgr
waitForAck := true
ackTimeoutMS := 2500

resp, err := svc.ForkSnapshot(ctx(), oapi.ForkSnapshotRequestObject{
SnapshotId: "snap-123",
Body: &oapi.ForkSnapshotRequest{
Name: "forked-instance",
Mailboxes: &[]oapi.ForkMailboxPayload{{
Name: "kernel.identity.v1",
Token: "template-token",
Payload: map[string]interface{}{"instance_name": "forked-instance"},
WaitForAck: &waitForAck,
AckTimeoutMs: &ackTimeoutMS,
}},
},
})
require.NoError(t, err)
Expand All @@ -86,4 +95,10 @@ func TestForkSnapshotSuccess(t *testing.T) {
assert.Equal(t, "snap-123", mockMgr.lastID)
require.NotNil(t, mockMgr.lastReq)
assert.Equal(t, "forked-instance", mockMgr.lastReq.Name)
require.Len(t, mockMgr.lastReq.Mailboxes, 1)
assert.Equal(t, "kernel.identity.v1", mockMgr.lastReq.Mailboxes[0].Name)
assert.Equal(t, "template-token", mockMgr.lastReq.Mailboxes[0].Token)
assert.True(t, mockMgr.lastReq.Mailboxes[0].WaitForAck)
assert.Equal(t, 2500*time.Millisecond, mockMgr.lastReq.Mailboxes[0].AckTimeout)
assert.JSONEq(t, `{"instance_name":"forked-instance"}`, string(mockMgr.lastReq.Mailboxes[0].Payload))
}
22 changes: 19 additions & 3 deletions lib/instances/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceR
if err != nil {
return nil, "", err
}
if len(req.Mailboxes) > 0 {
if source.State == StateStopped {
return nil, "", fmt.Errorf("%w: mailboxes require a standby snapshot fork", ErrInvalidRequest)
}
if targetState != StateRunning {
return nil, "", fmt.Errorf("%w: mailboxes require target_state %s", ErrInvalidRequest, StateRunning)
}
if err := validateForkMailboxHypervisor(source.HypervisorType); err != nil {
return nil, "", err
}
}

switch source.State {
case StateRunning:
Expand Down Expand Up @@ -81,7 +92,9 @@ func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceR
// the source data directory. Restore the fork while source remains standby and
// under lock, then restore the source.
if forkErr == nil && targetState == StateRunning {
restoredFork, err := m.applyForkTargetState(ctx, forked.Id, StateRunning)
restoredFork, err := m.applyForkTargetState(ctx, forked.Id, StateRunning, restoreInstanceOptions{
Mailboxes: req.Mailboxes,
})
if err != nil {
forkErr = fmt.Errorf("restore forked instance before source restore: %w", err)
if cleanupErr := m.cleanupForkInstanceOnError(ctx, forked.Id); cleanupErr != nil {
Expand Down Expand Up @@ -377,6 +390,9 @@ func validateForkRequest(req ForkInstanceRequest) error {
if req.TargetState != "" && req.TargetState != StateStopped && req.TargetState != StateStandby && req.TargetState != StateRunning {
return fmt.Errorf("%w: invalid fork target state %q (must be one of %s, %s, %s)", ErrInvalidRequest, req.TargetState, StateStopped, StateStandby, StateRunning)
}
if err := validateForkMailboxes(req.Mailboxes); err != nil {
return err
}
return nil
}

Expand All @@ -401,7 +417,7 @@ func resolveForkTargetState(requested State, sourceState State) (State, error) {
return requested, nil
}

func (m *manager) applyForkTargetState(ctx context.Context, forkID string, target State) (*Instance, error) {
func (m *manager) applyForkTargetState(ctx context.Context, forkID string, target State, opts restoreInstanceOptions) (*Instance, error) {
lock := m.getInstanceLock(forkID)
lock.Lock()
defer lock.Unlock()
Expand Down Expand Up @@ -442,7 +458,7 @@ func (m *manager) applyForkTargetState(ctx context.Context, forkID string, targe
case StateStandby:
switch target {
case StateRunning:
inst, err := m.restoreInstance(ctx, forkID)
inst, err := m.restoreInstanceWithOptions(ctx, forkID, opts)
return returnWithReadiness(inst, err, current.NetworkEnabled && !current.SkipGuestAgent)
case StateStopped:
if err := os.RemoveAll(m.paths.InstanceSnapshotLatest(forkID)); err != nil {
Expand Down
Loading
Loading