diff --git a/LICENSES/github.com/google/nftables/LICENSE b/LICENSES/github.com/google/nftables/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSES/github.com/google/nftables/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/LICENSES/github.com/mdlayher/netlink/LICENSE.md b/LICENSES/github.com/mdlayher/netlink/LICENSE.md new file mode 100644 index 0000000..12f7105 --- /dev/null +++ b/LICENSES/github.com/mdlayher/netlink/LICENSE.md @@ -0,0 +1,9 @@ +# MIT License + +Copyright (C) 2016-2022 Matt Layher + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/LICENSES/github.com/mdlayher/socket/LICENSE.md b/LICENSES/github.com/mdlayher/socket/LICENSE.md new file mode 100644 index 0000000..3ccdb75 --- /dev/null +++ b/LICENSES/github.com/mdlayher/socket/LICENSE.md @@ -0,0 +1,9 @@ +# MIT License + +Copyright (C) 2021 Matt Layher + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
// Fixed names and addresses for the per-actor veth pair that links the worker
// pod network namespace to the gVisor interior network namespace, plus the
// name of the nftables table that holds the actor's NAT/forward rules.
const (
	// hostVethName is the veth end that stays in the worker pod netns and
	// serves as the actor's gateway.
	hostVethName = "ateom0"
	// actorVethName is the name the actor-side veth end is given once it is
	// inside the interior netns.
	actorVethName = "eth0"
	// actorVethTempName is the name the actor-side veth end is created with,
	// before it is moved into the interior netns and renamed to actorVethName.
	actorVethTempName = "ateom1"
	// hostVethCIDR is the address (with /30 prefix) assigned to hostVethName.
	hostVethCIDR = "10.200.0.1/30"
	// actorVethCIDR is the address (with /30 prefix) assigned to the actor-side
	// veth end.
	actorVethCIDR = "10.200.0.2/30"
	// actorVethGateway is the next hop of the actor's default route; it is the
	// address part of hostVethCIDR.
	actorVethGateway = "10.200.0.1"
	// actorVethIP is the address part of actorVethCIDR, used as the DNAT target
	// and as the masquerade source match.
	actorVethIP = "10.200.0.2"
	// actorNftTableName is the ateom-owned nftables table that is created per
	// activation and deleted on cleanup.
	actorNftTableName = "ateom_actor"
)
- eth0Link, err := netlink.LinkByName("eth0") - if err != nil { - return fmt.Errorf("while getting netlink link for eth0: %w", err) - } - - eth0LinkInfo, err := scrapeLink(eth0Link) - if err != nil { - return fmt.Errorf("while scraping info from eth0: %w", err) - } - slog.InfoContext(ctx, "Eth0 link info", slog.Any("eth0", eth0LinkInfo)) - // Create a new network namespace that we will pass to gVisor. gVisor will // read the addresses and routes off of every link in the namespace, then // remove all the addresses and handle injecting packets into the interfaces @@ -136,7 +133,7 @@ func do(ctx context.Context) error { } actorLogger := ateom.NewActorLogger(syncedWriter, metadata.OnGCE()) - ateomService := NewService(interiorNetNS, eth0LinkInfo, actorLogger) + ateomService := NewService(interiorNetNS, actorLogger) svr := grpc.NewServer( grpc.StatsHandler(otelgrpc.NewServerHandler()), @@ -162,51 +159,20 @@ type AteomService struct { lock sync.Mutex interiorNetNS netns.NsHandle - eth0LinkInfo *SaveLinkInfo actorLogger *ateom.ActorLogger } var _ ateompb.AteomServer = (*AteomService)(nil) // NewService creates a new AteomService. -func NewService(interiorNetNS netns.NsHandle, eth0LinkInfo *SaveLinkInfo, actorLogger *ateom.ActorLogger) *AteomService { +func NewService(interiorNetNS netns.NsHandle, actorLogger *ateom.ActorLogger) *AteomService { svc := &AteomService{ interiorNetNS: interiorNetNS, - eth0LinkInfo: eth0LinkInfo, actorLogger: actorLogger, } return svc } -// ensureEth0InPodNetns moves eth0 back to the pod netns if a prior -// Run/Restore left it stuck in the interior netns. Idempotent: returns -// nil if eth0 is already in the pod netns or absent from both. 
-func ensureEth0InPodNetns(ctx context.Context, s *AteomService) error { - if _, err := netlink.LinkByName("eth0"); err == nil { - return nil - } - podNetNS, err := netns.Get() - if err != nil { - return fmt.Errorf("while getting pod netns: %w", err) - } - var moved bool - err = netNSDo(ctx, s.interiorNetNS, func(_ context.Context) error { - link, lookupErr := netlink.LinkByName("eth0") - if lookupErr != nil { - return nil - } - if mvErr := netlink.LinkSetNsFd(link, int(podNetNS)); mvErr != nil { - return fmt.Errorf("while moving eth0 to pod netns: %w", mvErr) - } - moved = true - return nil - }) - if moved { - slog.WarnContext(ctx, "Recovered eth0 from interior netns to pod netns") - } - return err -} - func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkloadRequest) (resp *ateompb.RunWorkloadResponse, retErr error) { s.lock.Lock() defer s.lock.Unlock() @@ -218,57 +184,17 @@ func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkload // * Correct runsc version is downloaded and placed on disk. // * All OCI bundles are set up, including for "pause" container. - if err := ensureEth0InPodNetns(ctx, s); err != nil { - return nil, fmt.Errorf("while recovering eth0 from prior failure: %w", err) + if err := s.setupActorNetwork(ctx); err != nil { + return nil, fmt.Errorf("while setting up actor network: %w", err) } - - // Move pod eth0 into interior netns - eth0Link, err := netlink.LinkByName("eth0") - if err != nil { - return nil, fmt.Errorf("while getting netlink link for eth0: %w", err) - } - if err := netlink.LinkSetNsFd(eth0Link, int(s.interiorNetNS)); err != nil { - return nil, fmt.Errorf("while moving eth0 into interior network namespace: %w", err) - } - // Roll eth0 back to the pod netns if any subsequent step errors, - // otherwise the worker pod is bricked for the next actor. 
defer func() { if retErr != nil { - if cleanupErr := ensureEth0InPodNetns(ctx, s); cleanupErr != nil { - slog.WarnContext(ctx, "Failed to roll back eth0 after Run failure", "err", cleanupErr) + if cleanupErr := s.cleanupActorNetwork(ctx); cleanupErr != nil { + slog.WarnContext(ctx, "Failed to clean up actor network after Run failure", "err", cleanupErr) } } }() - slog.InfoContext(ctx, "Restoring eth0 routes/addresses in interior netns") - err = netNSDo(ctx, s.interiorNetNS, func(ctx context.Context) error { - loLink, err := netlink.LinkByName("lo") - if err != nil { - return fmt.Errorf("while acquiring lo in interior netns: %w", err) - } - if err := netlink.LinkSetUp(loLink); err != nil { - return fmt.Errorf("while bringing up lo in interior netns: %w", err) - } - - eth0Link, err := netlink.LinkByName("eth0") - if err != nil { - return fmt.Errorf("while acquiring eth0 in interior netns: %w", err) - } - if err := netlink.LinkSetUp(eth0Link); err != nil { - return fmt.Errorf("while bringing up eth0 in interior netns: %w", err) - } - if err := restoreLink(ctx, eth0Link, s.eth0LinkInfo); err != nil { - return fmt.Errorf("while restoring eth0 routes and addresses in interior netns: %w", err) - } - if err := dumpNetInfo(ctx, "Interior NetNS "); err != nil { - return fmt.Errorf("while dumping links of interior netns: %w", err) - } - return nil - }) - if err != nil { - return nil, fmt.Errorf("while restoring eth0 in interior netns: %w", err) - } - rcmd := &runsc{ path: req.GetRunscPath(), actorTemplateNamespace: req.GetActorTemplateNamespace(), @@ -357,23 +283,8 @@ func (s *AteomService) CheckpointWorkload(ctx context.Context, req *ateompb.Chec return nil, fmt.Errorf("while deleting pause container: %w", err) } - // Yoink eth0 back to the pod netns. 
- podNetNS, err := netns.Get() - if err != nil { - return nil, fmt.Errorf("while getting pod netns: %w", err) - } - err = netNSDo(ctx, s.interiorNetNS, func(ctx context.Context) error { - eth0Link, err := netlink.LinkByName("eth0") - if err != nil { - return fmt.Errorf("while acquiring eth0 in interior netns: %w", err) - } - if err := netlink.LinkSetNsFd(eth0Link, int(podNetNS)); err != nil { - return fmt.Errorf("while sending eth0 back to pod netns: %w", err) - } - return nil - }) - if err != nil { - return nil, fmt.Errorf("while restoring eth0 in interior netns: %w", err) + if err := s.cleanupActorNetwork(ctx); err != nil { + return nil, fmt.Errorf("while cleaning up actor network: %w", err) } s.actorLogger.EmitLifecycleLog("Actor checkpointed", req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace()) @@ -393,60 +304,17 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore // * All OCI bundles are set up, including for "pause" container. // * Checkpoint downloaded and placed on disk - if err := ensureEth0InPodNetns(ctx, s); err != nil { - return nil, fmt.Errorf("while recovering eth0 from prior failure: %w", err) - } - - // Move pod eth0 into interior netns - eth0Link, err := netlink.LinkByName("eth0") - if err != nil { - return nil, fmt.Errorf("while getting netlink link for eth0: %w", err) + if err := s.setupActorNetwork(ctx); err != nil { + return nil, fmt.Errorf("while setting up actor network: %w", err) } - if err := netlink.LinkSetNsFd(eth0Link, int(s.interiorNetNS)); err != nil { - return nil, fmt.Errorf("while moving eth0 into interior network namespace: %w", err) - } - // Roll eth0 back to the pod netns if any subsequent step errors, - // otherwise the worker pod is bricked for the next actor. 
defer func() { if retErr != nil { - if cleanupErr := ensureEth0InPodNetns(ctx, s); cleanupErr != nil { - slog.WarnContext(ctx, "Failed to roll back eth0 after Restore failure", "err", cleanupErr) + if cleanupErr := s.cleanupActorNetwork(ctx); cleanupErr != nil { + slog.WarnContext(ctx, "Failed to clean up actor network after Restore failure", "err", cleanupErr) } } }() - // Restore route and IP information from save onto eth0. - slog.InfoContext(ctx, "Restoring eth0 routes/addresses in interior netns") - err = netNSDo(ctx, s.interiorNetNS, func(ctx context.Context) error { - loLink, err := netlink.LinkByName("lo") - if err != nil { - return fmt.Errorf("while acquiring lo in interior netns: %w", err) - } - if err := netlink.LinkSetUp(loLink); err != nil { - return fmt.Errorf("while bringing up lo in interior netns: %w", err) - } - - eth0Link, err := netlink.LinkByName("eth0") - if err != nil { - return fmt.Errorf("while acquiring eth0 in interior netns: %w", err) - } - if err := netlink.LinkSetUp(eth0Link); err != nil { - return fmt.Errorf("while bringing up eth0 in interior netns: %w", err) - } - if err := restoreLink(ctx, eth0Link, s.eth0LinkInfo); err != nil { - return fmt.Errorf("while restoring eth0 routes and addresses in interior netns: %w", err) - } - - if err := dumpNetInfo(ctx, "Interior NetNS "); err != nil { - return fmt.Errorf("while dumping links of interior netns: %w", err) - } - - return nil - }) - if err != nil { - return nil, fmt.Errorf("while restoring eth0 in interior netns: %w", err) - } - rcmd := &runsc{ path: req.GetRunscPath(), actorTemplateNamespace: req.GetActorTemplateNamespace(), @@ -485,6 +353,399 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore return &ateompb.RestoreWorkloadResponse{}, nil } +func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) { + // Build a fresh point-to-point network between the worker pod netns and the + // gVisor interior netns. 
The worker side keeps the pod's real eth0, creates + // ateom0 as the gateway, and moves only the veth peer into the actor netns. + // The actor side renames that peer to eth0 and installs a default route via + // the worker-side veth address. This replaces the old behavior of moving the + // Kubernetes-provided eth0 out of the worker pod. + // + // The nftables rules installed here are a compatibility bridge for the + // current router assumptions: actor egress is masqueraded behind the worker + // pod IP, and inbound traffic to the worker pod's HTTP port is DNAT'd to the + // actor veth IP. + // + // Clean up stale state from a failed prior activation before creating the + // next actor-side network. The worker currently runs one actor at a time. + if err := s.cleanupActorNetwork(ctx); err != nil { + return fmt.Errorf("while removing stale actor network: %w", err) + } + defer func() { + if retErr != nil { + if cleanupErr := s.cleanupActorNetwork(ctx); cleanupErr != nil { + slog.WarnContext(ctx, "Failed to clean up partially configured actor network", "err", cleanupErr) + } + } + }() + + podIP, err := podIPv4() + if err != nil { + return fmt.Errorf("while resolving pod IPv4 address: %w", err) + } + + hostAddr, err := parseAddr(hostVethCIDR) + if err != nil { + return err + } + + veth := &netlink.Veth{ + LinkAttrs: netlink.LinkAttrs{ + Name: hostVethName, + }, + PeerName: actorVethTempName, + } + if err := netlink.LinkAdd(veth); err != nil { + return fmt.Errorf("while creating actor veth pair: %w", err) + } + + hostLink, err := netlink.LinkByName(hostVethName) + if err != nil { + return fmt.Errorf("while getting host veth: %w", err) + } + if err := netlink.AddrReplace(hostLink, hostAddr); err != nil { + return fmt.Errorf("while assigning host veth address: %w", err) + } + if err := netlink.LinkSetUp(hostLink); err != nil { + return fmt.Errorf("while bringing up host veth: %w", err) + } + + actorLink, err := netlink.LinkByName(actorVethTempName) + if err != nil { + 
return fmt.Errorf("while getting actor veth peer: %w", err) + } + if err := netlink.LinkSetNsFd(actorLink, int(s.interiorNetNS)); err != nil { + return fmt.Errorf("while moving actor veth peer into interior netns: %w", err) + } + + if err := netNSDo(ctx, s.interiorNetNS, configureActorVeth); err != nil { + return fmt.Errorf("while configuring actor veth in interior netns: %w", err) + } + + if err := enableIPv4Forwarding(); err != nil { + return err + } + if err := installActorNftablesRules(podIP); err != nil { + return err + } + + if err := dumpNetInfo(ctx, "Pod NetNS "); err != nil { + return fmt.Errorf("while dumping pod netns links: %w", err) + } + if err := netNSDo(ctx, s.interiorNetNS, func(ctx context.Context) error { + return dumpNetInfo(ctx, "Interior NetNS ") + }); err != nil { + return fmt.Errorf("while dumping interior netns links: %w", err) + } + + return nil +} + +func configureActorVeth(ctx context.Context) error { + // Run inside the gVisor interior netns after setupActorNetwork moves the + // veth peer there. gVisor reads link names, addresses, and routes from this + // namespace when the workload starts, so the peer is deliberately renamed to + // eth0 and configured like a normal container interface: + // + // * lo is brought up for localhost behavior. + // * the temporary veth peer is renamed to eth0. + // * eth0 receives the actor-side /30 address. + // * the default route points to the worker-side veth gateway. 
+ loLink, err := netlink.LinkByName("lo") + if err != nil { + return fmt.Errorf("while acquiring lo in interior netns: %w", err) + } + if err := netlink.LinkSetUp(loLink); err != nil { + return fmt.Errorf("while bringing up lo in interior netns: %w", err) + } + + actorLink, err := netlink.LinkByName(actorVethTempName) + if err != nil { + return fmt.Errorf("while acquiring actor veth in interior netns: %w", err) + } + if err := netlink.LinkSetName(actorLink, actorVethName); err != nil { + return fmt.Errorf("while renaming actor veth to %q: %w", actorVethName, err) + } + actorLink, err = netlink.LinkByName(actorVethName) + if err != nil { + return fmt.Errorf("while reacquiring actor veth in interior netns: %w", err) + } + + actorAddr, err := parseAddr(actorVethCIDR) + if err != nil { + return err + } + if err := netlink.AddrReplace(actorLink, actorAddr); err != nil { + return fmt.Errorf("while assigning actor veth address: %w", err) + } + if err := netlink.LinkSetUp(actorLink); err != nil { + return fmt.Errorf("while bringing up actor veth: %w", err) + } + + gw := net.ParseIP(actorVethGateway).To4() + if gw == nil { + return fmt.Errorf("invalid actor veth gateway %q", actorVethGateway) + } + if err := netlink.RouteReplace(&netlink.Route{ + LinkIndex: actorLink.Attrs().Index, + Gw: gw, + }); err != nil { + return fmt.Errorf("while installing actor default route: %w", err) + } + + return nil +} + +func (s *AteomService) cleanupActorNetwork(ctx context.Context) error { + // Remove all per-activation network state owned by ateom. Deleting the + // worker-side veth also deletes its peer when the pair is still connected, + // but failed setup can leave the peer already moved into the actor netns. + // For that reason cleanup also enters the interior netns and deletes either + // the final actor interface name or the temporary peer name if present. 
+ // + // This function is intentionally idempotent so it can run before setup, after + // checkpoint, and from setup failure cleanup without requiring the caller to + // know how far network initialization progressed. + if err := removeActorNftablesRules(); err != nil { + return err + } + + if link, err := netlink.LinkByName(hostVethName); err == nil { + if err := netlink.LinkDel(link); err != nil { + return fmt.Errorf("while deleting host veth: %w", err) + } + } else if _, ok := err.(netlink.LinkNotFoundError); !ok { + return fmt.Errorf("while looking up host veth: %w", err) + } + + if err := netNSDo(ctx, s.interiorNetNS, func(_ context.Context) error { + for _, name := range []string{actorVethName, actorVethTempName} { + link, err := netlink.LinkByName(name) + if err == nil { + if err := netlink.LinkDel(link); err != nil { + return fmt.Errorf("while deleting interior veth %q: %w", name, err) + } + continue + } + if _, ok := err.(netlink.LinkNotFoundError); !ok { + return fmt.Errorf("while looking up interior veth %q: %w", name, err) + } + } + return nil + }); err != nil { + return fmt.Errorf("while cleaning interior netns links: %w", err) + } + + return nil +} + +func podIPv4() (net.IP, error) { + // Resolve the worker pod IPv4 address from the pod namespace's real eth0. + // Because eth0 now stays in the pod namespace, this IP remains available for + // both normal worker connectivity and the temporary inbound DNAT rule. 
+ eth0Link, err := netlink.LinkByName("eth0") + if err != nil { + return nil, fmt.Errorf("while getting pod eth0: %w", err) + } + addrs, err := netlink.AddrList(eth0Link, netlink.FAMILY_V4) + if err != nil { + return nil, fmt.Errorf("while listing pod eth0 addresses: %w", err) + } + for _, addr := range addrs { + if addr.IP == nil { + continue + } + if ip := addr.IP.To4(); ip != nil { + return ip, nil + } + } + return nil, fmt.Errorf("pod eth0 has no IPv4 address") +} + +func parseAddr(cidr string) (*netlink.Addr, error) { + addr, err := netlink.ParseAddr(cidr) + if err != nil { + return nil, fmt.Errorf("while parsing address %q: %w", cidr, err) + } + return addr, nil +} + +func enableIPv4Forwarding() error { + // Forwarding is required because actor packets now enter the worker pod via + // the host-side veth and then leave through the pod's eth0. Without this, the + // kernel would not route traffic between those interfaces even though both + // live in the worker pod network namespace. + if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0o644); err != nil { + return fmt.Errorf("while enabling IPv4 forwarding in worker pod netns: %w", err) + } + return nil +} + +func installActorNftablesRules(podIP net.IP) error { + // Install a dedicated nftables table for the active actor. Keeping all + // rules in an ateom-owned table makes cleanup simple and avoids mutating + // Kubernetes or CNI-managed chains directly. + // + // The temporary compatibility rules do three things: + // + // * postrouting: masquerade actor egress from 10.200.0.2 behind the worker + // pod IP so replies route back to the pod. + // * prerouting: DNAT traffic sent to the worker pod IP on TCP/80 to the + // actor veth IP on TCP/80, preserving existing inbound behavior. + // * forward: accept forwarded packets between the actor veth and pod eth0. + // + // This is not the final egress policy path. 
The later AgentGateway phase + // should replace the broad masquerade path with transparent TCP capture and + // default-deny rules. + if err := removeActorNftablesRules(); err != nil { + return err + } + + c := &nftables.Conn{} + table := &nftables.Table{ + Family: nftables.TableFamilyIPv4, + Name: actorNftTableName, + } + c.AddTable(table) + + prerouting := c.AddChain(&nftables.Chain{ + Name: "prerouting", + Table: table, + Type: nftables.ChainTypeNAT, + Hooknum: nftables.ChainHookPrerouting, + Priority: nftables.ChainPriorityNATDest, + }) + // TODO: Support inbound UDP DNAT for actors that expose UDP protocols such + // as QUIC. + // TODO: Replace the hard-coded HTTP port with the actor's configured + // inbound ports, either by adding one rule per port or by matching a set. + preroutingExprs := append(ipDestinationEqual(podIP.String()), tcpDestinationPortEqual(80)...) + preroutingExprs = append(preroutingExprs, + &expr.Immediate{ + Register: 1, + Data: net.ParseIP(actorVethIP).To4(), + }, + &expr.Immediate{ + Register: 2, + Data: binaryutil.BigEndian.PutUint16(80), + }, + &expr.NAT{ + Type: expr.NATTypeDestNAT, + Family: unix.NFPROTO_IPV4, + RegAddrMin: 1, + RegProtoMin: 2, + }, + ) + c.AddRule(&nftables.Rule{ + Table: table, + Chain: prerouting, + Exprs: preroutingExprs, + }) + + postrouting := c.AddChain(&nftables.Chain{ + Name: "postrouting", + Table: table, + Type: nftables.ChainTypeNAT, + Hooknum: nftables.ChainHookPostrouting, + Priority: nftables.ChainPriorityNATSource, + }) + c.AddRule(&nftables.Rule{ + Table: table, + Chain: postrouting, + Exprs: append(ipSourceEqual(actorVethIP), &expr.Masq{}), + }) + + acceptPolicy := nftables.ChainPolicyAccept + forward := c.AddChain(&nftables.Chain{ + Name: "forward", + Table: table, + Type: nftables.ChainTypeFilter, + Hooknum: nftables.ChainHookForward, + Priority: nftables.ChainPriorityFilter, + Policy: &acceptPolicy, + }) + c.AddRule(&nftables.Rule{ + Table: table, + Chain: forward, + Exprs: []expr.Any{ + 
&expr.Verdict{Kind: expr.VerdictAccept}, + }, + }) + + if err := c.Flush(); err != nil { + return fmt.Errorf("while installing actor nftables rules: %w", err) + } + return nil +} + +func removeActorNftablesRules() error { + // Delete the whole ateom nftables table if it exists. The table is + // per-worker and currently per-active-actor because this worker path runs at + // most one actor at a time. Missing tables are treated as already clean. + c := &nftables.Conn{} + tables, err := c.ListTablesOfFamily(nftables.TableFamilyIPv4) + if err != nil { + return fmt.Errorf("while listing nftables tables: %w", err) + } + for _, table := range tables { + if table.Name != actorNftTableName { + continue + } + c.DelTable(table) + if err := c.Flush(); err != nil { + return fmt.Errorf("while deleting actor nftables table: %w", err) + } + return nil + } + return nil +} + +func ipSourceEqual(ip string) []expr.Any { + return ipPayloadEqual(12, ip) +} + +func ipDestinationEqual(ip string) []expr.Any { + return ipPayloadEqual(16, ip) +} + +func ipPayloadEqual(offset uint32, ip string) []expr.Any { + return []expr.Any{ + &expr.Payload{ + DestRegister: 1, + Base: expr.PayloadBaseNetworkHeader, + Offset: offset, + Len: 4, + }, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: net.ParseIP(ip).To4(), + }, + } +} + +func tcpDestinationPortEqual(port uint16) []expr.Any { + return []expr.Any{ + &expr.Meta{Key: expr.MetaKeyL4PROTO, Register: 1}, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: []byte{unix.IPPROTO_TCP}, + }, + &expr.Payload{ + DestRegister: 1, + Base: expr.PayloadBaseTransportHeader, + Offset: 2, + Len: 2, + }, + &expr.Cmp{ + Op: expr.CmpOpEq, + Register: 1, + Data: binaryutil.BigEndian.PutUint16(port), + }, + } +} + func createNetNSWithoutSwitching(ctx context.Context, name string) (netns.NsHandle, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -538,101 +799,6 @@ func netNSDo(ctx context.Context, targetNS netns.NsHandle, do func(context.Conte 
return nil } -type SaveLinkInfo struct { - Addresses []SaveAddr - Routes []SaveRoute -} - -type SaveAddr struct { - Addr net.IPNet - Scope int - Broadcast net.IP -} - -type SaveRoute struct { - Scope uint8 - Dst net.IPNet - Src net.IP - Gateway net.IP - Protocol int - Type int -} - -func scrapeLink(link netlink.Link) (*SaveLinkInfo, error) { - rawAddrs, err := netlink.AddrList(link, netlink.FAMILY_ALL) - if err != nil { - return nil, fmt.Errorf("while scraping addresses: %w", err) - } - - var addrs []SaveAddr - for _, rawAddr := range rawAddrs { - addr := SaveAddr{ - Addr: *rawAddr.IPNet, - Scope: rawAddr.Scope, - Broadcast: rawAddr.Broadcast, - } - addrs = append(addrs, addr) - } - - rawRoutes, err := netlink.RouteList(link, netlink.FAMILY_ALL) - if err != nil { - return nil, fmt.Errorf("while scraping routes: %w", err) - } - - var routes []SaveRoute - for _, rawRoute := range rawRoutes { - route := SaveRoute{ - Scope: uint8(rawRoute.Scope), - Dst: *rawRoute.Dst, - Src: rawRoute.Src, - Gateway: rawRoute.Gw, - Protocol: int(rawRoute.Protocol), - Type: rawRoute.Type, - } - routes = append(routes, route) - } - - return &SaveLinkInfo{ - Addresses: addrs, - Routes: routes, - }, nil -} - -func restoreLink(ctx context.Context, link netlink.Link, info *SaveLinkInfo) error { - for i, saveAddr := range info.Addresses { - addr := &netlink.Addr{ - IPNet: &saveAddr.Addr, - Scope: saveAddr.Scope, - Broadcast: saveAddr.Broadcast, - } - if err := netlink.AddrReplace(link, addr); err != nil { - return fmt.Errorf("while restoring addr %d onto link: %w", i, err) - } - } - // Link-scope routes must be installed before gateway routes so the - // kernel can resolve each gateway's nexthop (fib_check_nh_v4_gw). - routes := append([]SaveRoute(nil), info.Routes...) 
- sort.SliceStable(routes, func(i, j int) bool { - return routes[i].Gateway == nil && routes[j].Gateway != nil - }) - for i, saveRoute := range routes { - route := &netlink.Route{ - LinkIndex: link.Attrs().Index, - Scope: netlink.Scope(saveRoute.Scope), - Dst: &saveRoute.Dst, - Src: saveRoute.Src, - Gw: saveRoute.Gateway, - Protocol: netlink.RouteProtocol(saveRoute.Protocol), - Type: saveRoute.Type, - } - slog.InfoContext(ctx, "Restoring route", slog.String("dst", saveRoute.Dst.String()), slog.String("src", saveRoute.Src.String()), slog.String("gateway", saveRoute.Gateway.String())) - if err := netlink.RouteReplace(route); err != nil { - return fmt.Errorf("while restoring route %d: %w", i, err) - } - } - return nil -} - func dumpNetInfo(ctx context.Context, prefix string) error { links, err := netlink.LinkList() if err != nil { diff --git a/go.mod b/go.mod index 901317d..82603ec 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/envoyproxy/go-control-plane/envoy v1.37.0 github.com/google/go-cmp v0.7.0 github.com/google/go-containerregistry v0.21.5 + github.com/google/nftables v0.3.0 github.com/google/uuid v1.6.0 github.com/hashicorp/go-reap v0.0.0-20260220095743-4e27870b4f51 github.com/klauspost/compress v1.18.5 @@ -118,6 +119,8 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/mdlayher/netlink v1.7.3-0.20250113171957-fbb4dce95f42 // indirect + github.com/mdlayher/socket v0.5.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/moby/spdystream v0.5.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index cbfa6fb..1b7e609 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= +github.com/google/nftables v0.3.0 h1:bkyZ0cbpVeMHXOrtlFc8ISmfVqq5gPJukoYieyVmITg= +github.com/google/nftables v0.3.0/go.mod h1:BCp9FsrbF1Fn/Yu6CLUc9GGZFw/+hsxfluNXXmxBfRM= github.com/google/pprof v0.0.0-20250602020802-c6617b811d0e h1:FJta/0WsADCe1r9vQjdHbd3KuiLPu7Y9WlyLGwMUNyE= github.com/google/pprof v0.0.0-20250602020802-c6617b811d0e/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= @@ -209,6 +211,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mdlayher/netlink v1.7.3-0.20250113171957-fbb4dce95f42 h1:A1Cq6Ysb0GM0tpKMbdCXCIfBclan4oHk1Jb+Hrejirg= +github.com/mdlayher/netlink v1.7.3-0.20250113171957-fbb4dce95f42/go.mod h1:BB4YCPDOzfy7FniQ/lxuYQ3dgmM2cZumHbK8RpTjN2o= +github.com/mdlayher/socket v0.5.0 h1:ilICZmJcQz70vrWVes1MFera4jGiWNocSkykwwoy3XI= +github.com/mdlayher/socket v0.5.0/go.mod h1:WkcBFfvyG8QENs5+hfQPl1X6Jpd2yeLIYgrGFmJiJxI= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/moby/spdystream v0.5.1 h1:9sNYeYZUcci9R6/w7KDaFWEWeV4LStVG78Mpyq/Zm/Y= diff --git a/internal/serverboot/serverboot.go b/internal/serverboot/serverboot.go index 275e474..e43e815 100644 --- a/internal/serverboot/serverboot.go +++ b/internal/serverboot/serverboot.go @@ -51,10 +51,6 @@ type TracingOptions struct { // Sampler is required. 
ateapi typically uses ParentBased(AlwaysSample); // atelet/ateom-gvisor use ParentBased(NeverSample). Sampler sdktrace.Sampler - // NoExporter skips the OTLP exporter. Used by binaries with no - // network egress (ateom-gvisor, after eth0 moves into the gvisor - // netns). - NoExporter bool } // InitTracing registers a global TracerProvider with the given options @@ -74,16 +70,14 @@ func InitTracing(ctx context.Context, opts TracingOptions) (*sdktrace.TracerProv sdktrace.WithResource(res), sdktrace.WithSampler(opts.Sampler), } - if !opts.NoExporter { - exporter, err := otlptracegrpc.New(ctx, - // GKE managed traces doesn't support validating the TLS certs of the collector. - otlptracegrpc.WithInsecure(), - ) - if err != nil { - return nil, fmt.Errorf("create OTLP exporter: %w", err) - } - tpOpts = append(tpOpts, sdktrace.WithBatcher(exporter)) + exporter, err := otlptracegrpc.New(ctx, + // GKE managed traces doesn't support validating the TLS certs of the collector. + otlptracegrpc.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("create OTLP exporter: %w", err) } + tpOpts = append(tpOpts, sdktrace.WithBatcher(exporter)) tp := sdktrace.NewTracerProvider(tpOpts...) otel.SetTracerProvider(tp)