Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.llamalad7.mixinextras.sugar.Local;
import dev.ryanhcode.sable.Sable;
import dev.ryanhcode.sable.SableClient;
import dev.ryanhcode.sable.SableConfig;
import dev.ryanhcode.sable.mixinterface.udp.ConnectionExtension;
import dev.ryanhcode.sable.network.udp.SableUDPPacket;
import dev.ryanhcode.sable.network.udp.handler.SableUDPChannelHandlerClient;
Expand All @@ -24,6 +25,7 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

@Mixin(Connection.class)
public abstract class ConnectionMixin implements ConnectionExtension {
Expand All @@ -42,11 +44,14 @@ public abstract class ConnectionMixin implements ConnectionExtension {
if (this.sable$udpChannel != null && this.sable$udpChannel.isOpen()) {
this.sable$udpChannel = null;

Sable.LOGGER.debug("[sable-udp] closing UDP channel on disconnect, reason={}",
disconnectionDetails != null ? disconnectionDetails.reason().getString() : "<no details>");

channel.close().awaitUninterruptibly().addListener((x) -> {
if (x.isSuccess()) {
Sable.LOGGER.info("Closed UDP channel!");
} else {
Sable.LOGGER.info("Failed to close UDP channel", x.cause());
Sable.LOGGER.warn("Failed to close UDP channel", x.cause());
}
});
}
Expand All @@ -57,8 +62,17 @@ public abstract class ConnectionMixin implements ConnectionExtension {
return this.sable$udpChannel;
}

@Unique
private static final long SABLE$UDP_CONNECT_TIMEOUT_MS = 5_000L;

@Inject(method = "connect", at = @At("TAIL"))
private static void sable$connect(final InetSocketAddress inetSocketAddress, final boolean bl, final Connection connection, final CallbackInfoReturnable<ChannelFuture> cir) {
if (SableConfig.DISABLE_UDP_PIPELINE.get()) {
Sable.LOGGER.debug("[sable-udp] client UDP pipeline disabled via config; skipping bootstrap for {}", inetSocketAddress);
return;
}

final long startNs = System.nanoTime();
final boolean useNativeTransport = SableClient.useNativeTransport();

final Class<? extends Channel> channelClass;
Expand All @@ -72,35 +86,96 @@ public abstract class ConnectionMixin implements ConnectionExtension {
eventLoopGroup = Connection.NETWORK_WORKER_GROUP.get();
}

Sable.LOGGER.info("Starting remote client UDP channel future");
Sable.LOGGER.info("Starting remote client UDP channel future (remote={}, transport={})",
inetSocketAddress, channelClass.getSimpleName());

final ChannelFuture channelFuture;
try {
channelFuture = new Bootstrap().group(eventLoopGroup).handler(new ChannelInitializer<>() {
@Override
protected void initChannel(final Channel channel) {
channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
SableUDPPacket.configureSerialization(channel.pipeline(), PacketFlow.CLIENTBOUND, false, null);
sable$setupChannel(channel, connection);
}
})
.channel(channelClass)
.connect(inetSocketAddress.getAddress(), inetSocketAddress.getPort());
} catch (final Throwable t) {
Sable.LOGGER.error("[sable-udp] Bootstrap.connect threw for {} (elapsedMs={}); continuing without UDP",
inetSocketAddress, (System.nanoTime() - startNs) / 1_000_000L, t);
return;
}

final ChannelFuture channelFuture = new Bootstrap().group(eventLoopGroup).handler(new ChannelInitializer<>() {
@Override
protected void initChannel(final Channel channel) {
channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
SableUDPPacket.configureSerialization(channel.pipeline(), PacketFlow.CLIENTBOUND, false, null);
sable$setupChannel(channel, connection);
}
})
.channel(channelClass)
.connect(inetSocketAddress.getAddress(), inetSocketAddress.getPort());
final boolean completed = channelFuture.awaitUninterruptibly(SABLE$UDP_CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
final long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;

channelFuture.syncUninterruptibly();
if (!completed) {
Sable.LOGGER.warn("[sable-udp] UDP connect did not complete within {}ms for remote={} (elapsedMs={}); cancelling future and falling back to TCP-only. This usually indicates the OS-level UDP connect() call is blocking - check antivirus/firewall outbound UDP rules.",
SABLE$UDP_CONNECT_TIMEOUT_MS, inetSocketAddress, elapsedMs);
channelFuture.cancel(true);
channelFuture.addListener((ChannelFutureListener) f -> {
if (f.isCancelled()) {
return;
}
if (f.cause() != null) {
Sable.LOGGER.warn("[sable-udp] late UDP connect failure for {}: {}", inetSocketAddress, f.cause().toString());
} else if (f.isSuccess() && f.channel() != null) {
Sable.LOGGER.debug("[sable-udp] late UDP connect success for {} after timeout; closing channel", inetSocketAddress);
f.channel().close();
}
});
return;
}

if (!channelFuture.isSuccess()) {
Sable.LOGGER.warn("[sable-udp] UDP connect failed for remote={} (elapsedMs={}); falling back to TCP-only. cause={}",
inetSocketAddress, elapsedMs, channelFuture.cause() != null ? channelFuture.cause().toString() : "<no cause>");
return;
}

Sable.LOGGER.debug("[sable-udp] UDP connect succeeded for remote={} (elapsedMs={})", inetSocketAddress, elapsedMs);
}

@Inject(method = "connectToLocalServer", at = @At("TAIL"))
private static void sable$connectToLocalServer(final SocketAddress socketAddress, final CallbackInfoReturnable<Connection> cir, @Local final Connection connection) {
if (SableConfig.DISABLE_UDP_PIPELINE.get()) {
Sable.LOGGER.debug("[sable-udp] local UDP pipeline disabled via config; skipping bootstrap for {}", socketAddress);
return;
}

final long startNs = System.nanoTime();
Sable.LOGGER.info("Starting local client UDP channel future");

final ChannelFuture channelFuture = new Bootstrap().group(Connection.LOCAL_WORKER_GROUP.get()).handler(new ChannelInitializer<>() {
@Override
protected void initChannel(final Channel channel) {
SableUDPPacket.configureInMemoryPipeline(channel.pipeline(), PacketFlow.CLIENTBOUND);
sable$setupChannel(channel, connection);
}
}).channel(LocalChannel.class).connect(socketAddress).syncUninterruptibly();
final ChannelFuture channelFuture;
try {
channelFuture = new Bootstrap().group(Connection.LOCAL_WORKER_GROUP.get()).handler(new ChannelInitializer<>() {
@Override
protected void initChannel(final Channel channel) {
SableUDPPacket.configureInMemoryPipeline(channel.pipeline(), PacketFlow.CLIENTBOUND);
sable$setupChannel(channel, connection);
}
}).channel(LocalChannel.class).connect(socketAddress);
} catch (final Throwable t) {
Sable.LOGGER.error("[sable-udp] local Bootstrap.connect threw for {} (elapsedMs={})",
socketAddress, (System.nanoTime() - startNs) / 1_000_000L, t);
return;
}

channelFuture.syncUninterruptibly();
final boolean completed = channelFuture.awaitUninterruptibly(SABLE$UDP_CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
final long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;
if (!completed) {
Sable.LOGGER.warn("[sable-udp] local UDP connect timed out after {}ms for {} (elapsedMs={}); cancelling",
SABLE$UDP_CONNECT_TIMEOUT_MS, socketAddress, elapsedMs);
channelFuture.cancel(true);
return;
}
if (!channelFuture.isSuccess()) {
Sable.LOGGER.warn("[sable-udp] local UDP connect failed for {} (elapsedMs={}): {}",
socketAddress, elapsedMs, channelFuture.cause() != null ? channelFuture.cause().toString() : "<no cause>");
return;
}
Sable.LOGGER.debug("[sable-udp] local UDP connect succeeded for {} (elapsedMs={})", socketAddress, elapsedMs);
}

@Unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ public void handle(final PacketContext context) {
final ConnectionExtension connectionExtension = (ConnectionExtension) connection;
final Channel channel = connectionExtension.sable$getUDPChannel();

final InetSocketAddress baseAddress = ((InetSocketAddress) connection.getRemoteAddress());
if (channel == null) {
Sable.LOGGER.warn("[sable-udp] Received UDP activation packet but no UDP channel is available on this connection; remaining on TCP-only");
return;
}

if (!(connection.getRemoteAddress() instanceof final InetSocketAddress baseAddress)) {
Sable.LOGGER.warn("[sable-udp] Received UDP activation packet but remote address is not an InetSocketAddress ({}); skipping UDP auth", connection.getRemoteAddress());
return;
}

final InetSocketAddress remoteAddress = new InetSocketAddress(baseAddress.getAddress(), baseAddress.getPort());

Sable.LOGGER.info("Received authentication request, sending response over UDP to {}", remoteAddress);
Expand All @@ -54,7 +63,12 @@ public void handle(final PacketContext context) {
final AddressedSableUDPPacket envelope = new AddressedSableUDPPacket(packet, remoteAddress);
final ChannelFuture writeFuture = channel.writeAndFlush(envelope);

writeFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
writeFuture.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
Sable.LOGGER.warn("[sable-udp] Failed to send UDP auth response to {}: {}",
remoteAddress, f.cause() != null ? f.cause().toString() : "<no cause>");
}
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ public SableUDPChannelHandlerClient(final Connection connection) {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Sable.LOGGER.error("UDP channel exception caught", cause);
Sable.LOGGER.error("UDP channel exception caught (local={}, remote={})",
ctx.channel().localAddress(), ctx.channel().remoteAddress(), cause);
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
Sable.LOGGER.info("Client UDP channel active");
Sable.LOGGER.debug("[sable-udp] channel id={}, local={}, remote={}",
ctx.channel().id(), ctx.channel().localAddress(), ctx.channel().remoteAddress());

this.channel = ctx.channel();
((ConnectionExtension) this.connection).sable$setUDPChannel(this.channel);
Expand Down