diff --git a/api/src/main/java/io/grpc/ForwardingServerBuilder.java b/api/src/main/java/io/grpc/ForwardingServerBuilder.java index 9cef7cfa331..d1f183dd824 100644 --- a/api/src/main/java/io/grpc/ForwardingServerBuilder.java +++ b/api/src/main/java/io/grpc/ForwardingServerBuilder.java @@ -201,6 +201,12 @@ public Server build() { return delegate().build(); } + @Override + public T addMetricSink(MetricSink metricSink) { + delegate().addMetricSink(metricSink); + return thisT(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); diff --git a/api/src/main/java/io/grpc/InternalTcpMetrics.java b/api/src/main/java/io/grpc/InternalTcpMetrics.java new file mode 100644 index 00000000000..3dd89b6f76c --- /dev/null +++ b/api/src/main/java/io/grpc/InternalTcpMetrics.java @@ -0,0 +1,98 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * TCP Metrics defined to be shared across transport implementations. + * These metrics and their definitions are specified in + * gRFC + * A80. + */ +@Internal +public final class InternalTcpMetrics { + + private InternalTcpMetrics() { + } + + private static final List OPTIONAL_LABELS = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port"); + + public static final DoubleHistogramMetricInstrument MIN_RTT_INSTRUMENT = + MetricInstrumentRegistry.getDefaultRegistry() + .registerDoubleHistogram( + "grpc.tcp.min_rtt", + "Minimum round-trip time of a TCP connection", + "s", + Collections.emptyList(), + Collections.emptyList(), + OPTIONAL_LABELS, + false); + + public static final LongCounterMetricInstrument CONNECTIONS_CREATED_INSTRUMENT = + MetricInstrumentRegistry + .getDefaultRegistry() + .registerLongCounter( + "grpc.tcp.connections_created", + "The total number of TCP connections established.", + "{connection}", + Collections.emptyList(), + OPTIONAL_LABELS, + false); + + public static final LongUpDownCounterMetricInstrument CONNECTION_COUNT_INSTRUMENT = + MetricInstrumentRegistry + .getDefaultRegistry() + .registerLongUpDownCounter( + "grpc.tcp.connection_count", + "The current number of active TCP connections.", + "{connection}", + Collections.emptyList(), + OPTIONAL_LABELS, + false); + + public static final LongCounterMetricInstrument PACKETS_RETRANSMITTED_INSTRUMENT = + MetricInstrumentRegistry + .getDefaultRegistry() + .registerLongCounter( + "grpc.tcp.packets_retransmitted", + "The total number of packets retransmitted for all TCP connections.", + "{packet}", + Collections.emptyList(), + OPTIONAL_LABELS, + false); + + public static final LongCounterMetricInstrument RECURRING_RETRANSMITS_INSTRUMENT = + MetricInstrumentRegistry + .getDefaultRegistry() + .registerLongCounter( + "grpc.tcp.recurring_retransmits", + "The total number of times the retransmit timer " + + "popped for all TCP connections.", + "{timeout}", + Collections.emptyList(), + OPTIONAL_LABELS, + false); + +} diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index 53dbc5d6888..b47bd93332b 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -355,7 +355,7 @@ public static final class Args { @Nullable private final ChannelLogger channelLogger; @Nullable private final Executor executor; @Nullable private final String overrideAuthority; - @Nullable private final MetricRecorder metricRecorder; + private final MetricRecorder metricRecorder; @Nullable private final NameResolverRegistry nameResolverRegistry; @Nullable private final IdentityHashMap, Object> customArgs; @@ -369,7 +369,8 @@ private Args(Builder builder) { this.channelLogger = builder.channelLogger; this.executor = builder.executor; this.overrideAuthority = builder.overrideAuthority; - this.metricRecorder = builder.metricRecorder; + this.metricRecorder = builder.metricRecorder != null ? builder.metricRecorder + : new MetricRecorder() {}; this.nameResolverRegistry = builder.nameResolverRegistry; this.customArgs = cloneCustomArgs(builder.customArgs); } @@ -497,7 +498,6 @@ public String getOverrideAuthority() { /** * Returns the {@link MetricRecorder} that the channel uses to record metrics. */ - @Nullable public MetricRecorder getMetricRecorder() { return metricRecorder; } @@ -680,7 +680,7 @@ public Builder setArg(Key key, T value) { * See {@link Args#getMetricRecorder()}. This is an optional field. */ public Builder setMetricRecorder(MetricRecorder metricRecorder) { - this.metricRecorder = metricRecorder; + this.metricRecorder = checkNotNull(metricRecorder); return this; } diff --git a/api/src/main/java/io/grpc/ServerBuilder.java b/api/src/main/java/io/grpc/ServerBuilder.java index cd1cddbb93f..3effe593e57 100644 --- a/api/src/main/java/io/grpc/ServerBuilder.java +++ b/api/src/main/java/io/grpc/ServerBuilder.java @@ -435,6 +435,17 @@ public T setBinaryLog(BinaryLog binaryLog) { */ public abstract Server build(); + /** + * Adds a metric sink to the server. + * + * @param metricSink the metric sink to add. + * @return this + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/12693") + public T addMetricSink(MetricSink metricSink) { + return thisT(); + } + /** * Returns the correctly typed version of the builder. */ diff --git a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java index c926c853472..5f0885883a5 100644 --- a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java +++ b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java @@ -68,7 +68,7 @@ private BinderServerBuilder( serverImplBuilder = new ServerImplBuilder( - streamTracerFactories -> { + (streamTracerFactories, metricRecorder) -> { internalBuilder.setStreamTracerFactories(streamTracerFactories); BinderServer server = internalBuilder.build(); BinderInternal.setIBinder(binderReceiver, server.getHostBinder()); diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java index 6c10ced4652..6023fb14aa9 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java @@ -24,6 +24,7 @@ import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.MetricRecorder; import java.io.Closeable; import java.net.SocketAddress; import java.util.Collection; @@ -91,6 +92,8 @@ final class ClientTransportOptions { private Attributes eagAttributes = Attributes.EMPTY; @Nullable private String userAgent; @Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr; + private MetricRecorder metricRecorder = new MetricRecorder() { + }; public ChannelLogger getChannelLogger() { return channelLogger; @@ -101,6 +104,15 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) { return this; } + public MetricRecorder getMetricRecorder() { + return metricRecorder; + } + + public ClientTransportOptions setMetricRecorder(MetricRecorder metricRecorder) { + this.metricRecorder = Preconditions.checkNotNull(metricRecorder, "metricRecorder"); + return this; + } + public String getAuthority() { return authority; } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 7a48bf642fe..ce31921e316 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr private final InternalChannelz channelz; private final CallTracer callsTracer; private final ChannelTracer channelTracer; + private final MetricRecorder metricRecorder; private final ChannelLogger channelLogger; private final boolean reconnectDisabled; @@ -191,6 +192,7 @@ protected void handleNotInUse() { this.scheduledExecutor = scheduledExecutor; this.connectingTimer = stopwatchSupplier.get(); this.syncContext = syncContext; + this.metricRecorder = metricRecorder; this.callback = callback; this.channelz = channelz; this.callsTracer = callsTracer; @@ -265,6 +267,7 @@ private void startNewTransport() { .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority) .setEagAttributes(currentEagAttributes) .setUserAgent(userAgent) + .setMetricRecorder(metricRecorder) .setHttpConnectProxiedSocketAddress(proxiedAddr); TransportLogger transportLogger = new TransportLogger(); // In case the transport logs in the constructor, use the subchannel logId diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java index f6566e067db..62a0e66f314 100644 --- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java @@ -31,6 +31,9 @@ import io.grpc.HandlerRegistry; import io.grpc.InternalChannelz; import io.grpc.InternalConfiguratorRegistry; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; +import io.grpc.MetricSink; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCallExecutorSupplier; @@ -80,6 +83,7 @@ public static ServerBuilder forPort(int port) { final List transportFilters = new ArrayList<>(); final List interceptors = new ArrayList<>(); private final List streamTracerFactories = new ArrayList<>(); + final List metricSinks = new ArrayList<>(); private final ClientTransportServersBuilder clientTransportServersBuilder; HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY; ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; @@ -104,7 +108,8 @@ public static ServerBuilder forPort(int port) { */ public interface ClientTransportServersBuilder { InternalServer buildClientTransportServers( - List streamTracerFactories); + List streamTracerFactories, + MetricRecorder metricRecorder); } /** @@ -157,6 +162,15 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) { return this; } + /** + * Adds a MetricSink to the server. + */ + @Override + public ServerImplBuilder addMetricSink(MetricSink metricSink) { + metricSinks.add(checkNotNull(metricSink, "metricSink")); + return this; + } + @Override public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) { streamTracerFactories.add(checkNotNull(factory, "factory")); @@ -241,8 +255,11 @@ public void setDeadlineTicker(Deadline.Ticker ticker) { @Override public Server build() { + MetricRecorder metricRecorder = new MetricRecorderImpl(metricSinks, + MetricInstrumentRegistry.getDefaultRegistry()); return new ServerImpl(this, - clientTransportServersBuilder.buildClientTransportServers(getTracerFactories()), + clientTransportServersBuilder.buildClientTransportServers( + getTracerFactories(), metricRecorder), Context.ROOT); } diff --git a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java index 7ad7f15f358..54c2d6ef8b1 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java @@ -18,10 +18,13 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import io.grpc.InternalConfigurator; import io.grpc.InternalConfiguratorRegistry; import io.grpc.Metadata; +import io.grpc.MetricRecorder; +import io.grpc.MetricSink; import io.grpc.ServerBuilder; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; @@ -73,7 +76,8 @@ public void setUp() throws Exception { new ClientTransportServersBuilder() { @Override public InternalServer buildClientTransportServers( - List streamTracerFactories) { + List streamTracerFactories, + MetricRecorder metricRecorder) { throw new UnsupportedOperationException(); } }); @@ -128,6 +132,13 @@ public void getTracerFactories_disableBoth() { assertThat(factories).containsExactly(DUMMY_USER_TRACER); } + @Test + public void addMetricSink_addsToSinks() { + MetricSink mockSink = mock(MetricSink.class); + builder.addMetricSink(mockSink); + assertThat(builder.metricSinks).containsExactly(mockSink); + } + @Test public void getTracerFactories_callsGet() throws Exception { Class runnable = classLoader.loadClass(StaticTestingClassLoaderCallsGet.class.getName()); @@ -139,7 +150,7 @@ public static final class StaticTestingClassLoaderCallsGet implements Runnable { public void run() { ServerImplBuilder builder = new ServerImplBuilder( - streamTracerFactories -> { + (streamTracerFactories, metricRecorder) -> { throw new UnsupportedOperationException(); }); assertThat(builder.getTracerFactories()).hasSize(2); @@ -169,7 +180,7 @@ public void configureServerBuilder(ServerBuilder builder) { })); ServerImplBuilder builder = new ServerImplBuilder( - streamTracerFactories -> { + (streamTracerFactories, metricRecorder) -> { throw new UnsupportedOperationException(); }); assertThat(builder.getTracerFactories()).containsExactly(DUMMY_USER_TRACER); @@ -192,7 +203,7 @@ public void run() { InternalConfiguratorRegistry.setConfigurators(Collections.emptyList()); ServerImplBuilder builder = new ServerImplBuilder( - streamTracerFactories -> { + (streamTracerFactories, metricRecorder) -> { throw new UnsupportedOperationException(); }); assertThat(builder.getTracerFactories()).isEmpty(); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 0f18efe078c..3405cb9bb0c 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -65,6 +65,7 @@ import io.grpc.InternalServerInterceptors; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.ServerCall; import io.grpc.ServerCall.Listener; import io.grpc.ServerCallExecutorSupplier; @@ -206,7 +207,8 @@ public void startUp() throws IOException { new ClientTransportServersBuilder() { @Override public InternalServer buildClientTransportServers( - List streamTracerFactories) { + List streamTracerFactories, + MetricRecorder metricRecorder) { throw new UnsupportedOperationException(); } }); diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index 190f67603c3..b2004426aae 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -24,6 +24,7 @@ import io.grpc.ExperimentalApi; import io.grpc.ForwardingServerBuilder; import io.grpc.Internal; +import io.grpc.MetricRecorder; import io.grpc.ServerBuilder; import io.grpc.ServerStreamTracer; import io.grpc.internal.FixedObjectPool; @@ -120,7 +121,8 @@ private InProcessServerBuilder(SocketAddress listenAddress) { final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder { @Override public InternalServer buildClientTransportServers( - List streamTracerFactories) { + List streamTracerFactories, + MetricRecorder metricRecorder) { return buildTransportServers(streamTracerFactories); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 258aa15b005..e64f1065681 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -856,6 +856,7 @@ public void run() { localSocketPicker, channelLogger, useGetForSafeMethods, + options.getMetricRecorder(), Ticker.systemTicker()); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 8ebf89842ad..5615f5ed75a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -30,6 +30,7 @@ import io.grpc.InternalChannelz; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.internal.ClientStreamListener.RpcProgress; @@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final Supplier stopwatchFactory; private final TransportTracer transportTracer; private final Attributes eagAttributes; + private final TcpMetrics.Tracker tcpMetrics; private final String authority; private final InUseStateAggregator inUseState = new InUseStateAggregator() { @@ -164,7 +166,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -194,7 +197,8 @@ static NettyClientHandler newHandler( eagAttributes, authority, negotiationLogger, - ticker); + ticker, + metricRecorder); } @VisibleForTesting @@ -214,7 +218,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -269,7 +274,8 @@ static NettyClientHandler newHandler( pingCounter, ticker, maxHeaderListSize, - softLimitHeaderListSize); + softLimitHeaderListSize, + metricRecorder); } private NettyClientHandler( @@ -288,7 +294,8 @@ private NettyClientHandler( PingLimiter pingLimiter, Ticker ticker, int maxHeaderListSize, - int softLimitHeaderListSize) { + int softLimitHeaderListSize, + MetricRecorder metricRecorder) { super( /* channelUnused= */ null, decoder, @@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) { } } }); + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder); } /** @@ -478,6 +486,7 @@ private void onRstStreamRead(int streamId, long errorCode) { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + tcpMetrics.recordTcpInfo(ctx.channel()); logger.fine("Network channel being closed by the application."); if (ctx.channel().isActive()) { // Ignore notification that the socket was closed lifecycleManager.notifyShutdown( @@ -490,10 +499,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce /** * Handler for the Channel shutting down. */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelActive(ctx.channel()); + super.channelActive(ctx); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { logger.fine("Network channel is closed"); + tcpMetrics.channelInactive(ctx.channel()); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN); final Status streamStatus; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 53914b3c877..6585df42df3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -34,6 +34,7 @@ import io.grpc.InternalLogId; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ConnectionClientTransport; @@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport, private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; private final Ticker ticker; + private final MetricRecorder metricRecorder; NettyClientTransport( @@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, boolean useGetForSafeMethods, + MetricRecorder metricRecorder, Ticker ticker) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); @@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport, this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.useGetForSafeMethods = useGetForSafeMethods; + this.metricRecorder = metricRecorder; this.ticker = Preconditions.checkNotNull(ticker, "ticker"); } @@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) { eagAttributes, authorityString, channelLogger, - ticker); + ticker, + metricRecorder); ChannelHandler negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 1cf67ea25ca..2bb6b2c5921 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -31,6 +31,7 @@ import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; import io.grpc.internal.ObjectPool; @@ -93,6 +94,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxMessageSize; private final int maxHeaderListSize; private final int softLimitHeaderListSize; + private MetricRecorder metricRecorder; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionIdleInNanos; @@ -136,8 +138,10 @@ class NettyServer implements InternalServer, InternalWithLogId { long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes, InternalChannelz channelz) { + Attributes eagAttributes, InternalChannelz channelz, + MetricRecorder metricRecorder) { this.addresses = checkNotNull(addresses, "addresses"); + this.metricRecorder = metricRecorder; this.channelFactory = checkNotNull(channelFactory, "channelFactory"); checkNotNull(channelOptions, "channelOptions"); this.channelOptions = new HashMap, Object>(channelOptions); @@ -174,6 +178,7 @@ class NettyServer implements InternalServer, InternalWithLogId { String.valueOf(addresses)); } + @Override public SocketAddress getListenSocketAddress() { Iterator it = channelGroup.iterator(); @@ -272,7 +277,8 @@ public void initChannel(Channel ch) { permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index eb3a6d9b538..21e1a063700 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -32,6 +32,7 @@ import io.grpc.ExperimentalApi; import io.grpc.ForwardingServerBuilder; import io.grpc.Internal; +import io.grpc.MetricSink; import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; import io.grpc.ServerStreamTracer; @@ -164,8 +165,9 @@ public static NettyServerBuilder forAddress(SocketAddress address, ServerCredent private final class NettyClientTransportServersBuilder implements ClientTransportServersBuilder { @Override public InternalServer buildClientTransportServers( - List streamTracerFactories) { - return buildTransportServers(streamTracerFactories); + List streamTracerFactories, + io.grpc.MetricRecorder metricRecorder) { + return buildTransportServers(streamTracerFactories, metricRecorder); } } @@ -703,8 +705,10 @@ void eagAttributes(Attributes eagAttributes) { this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes"); } + @VisibleForTesting NettyServer buildTransportServers( - List streamTracerFactories) { + List streamTracerFactories, + io.grpc.MetricRecorder metricRecorder) { assertEventLoopsAndChannelType(); ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator( @@ -737,7 +741,8 @@ NettyServer buildTransportServers( maxRstCount, maxRstPeriodNanos, eagAttributes, - this.serverImplBuilder.getChannelz()); + this.serverImplBuilder.getChannelz(), + metricRecorder); } @VisibleForTesting @@ -760,6 +765,13 @@ NettyServerBuilder setTransportTracerFactory(TransportTracer.Factory transportTr return this; } + @CanIgnoreReturnValue + @Override + public NettyServerBuilder addMetricSink(MetricSink metricSink) { + serverImplBuilder.addMetricSink(metricSink); + return this; + } + @CanIgnoreReturnValue @Override public NettyServerBuilder useTransportSecurity(File certChain, File privateKey) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 036fde55e2c..53b0f3e0dfd 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -42,6 +42,7 @@ import io.grpc.InternalMetadata; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.GrpcUtil; @@ -127,6 +128,7 @@ class NettyServerHandler extends AbstractNettyHandler { private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; private final int maxMessageSize; + private final TcpMetrics.Tracker tcpMetrics; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionAgeInNanos; @@ -174,7 +176,8 @@ static NettyServerHandler newHandler( long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s", maxHeaderListSize); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); @@ -208,7 +211,8 @@ static NettyServerHandler newHandler( maxRstCount, maxRstPeriodNanos, eagAttributes, - Ticker.systemTicker()); + Ticker.systemTicker(), + metricRecorder); } static NettyServerHandler newHandler( @@ -234,7 +238,8 @@ static NettyServerHandler newHandler( int maxRstCount, long maxRstPeriodNanos, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -294,7 +299,8 @@ static NettyServerHandler newHandler( keepAliveEnforcer, autoFlowControl, rstStreamCounter, - eagAttributes, ticker); + eagAttributes, ticker, + metricRecorder); } private NettyServerHandler( @@ -318,7 +324,8 @@ private NettyServerHandler( boolean autoFlowControl, RstStreamCounter rstStreamCounter, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { super( channelUnused, decoder, @@ -362,6 +369,7 @@ public void onStreamClosed(Http2Stream stream) { checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize); this.maxMessageSize = maxMessageSize; + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder); this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.maxConnectionIdleManager = maxConnectionIdleManager; @@ -661,8 +669,15 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { /** * Handler for the Channel shutting down. */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelActive(ctx.channel()); + super.channelActive(ctx); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelInactive(ctx.channel()); try { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 758ffeee5b1..c0e52b75876 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -25,6 +25,7 @@ import io.grpc.Attributes; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.ServerTransport; @@ -81,6 +82,7 @@ class NettyServerTransport implements ServerTransport { private final int maxRstCount; private final long maxRstPeriodNanos; private final Attributes eagAttributes; + private final MetricRecorder metricRecorder; private final List streamTracerFactories; private final TransportTracer transportTracer; @@ -105,7 +107,8 @@ class NettyServerTransport implements ServerTransport { long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -128,6 +131,7 @@ class NettyServerTransport implements ServerTransport { this.maxRstCount = maxRstCount; this.maxRstPeriodNanos = maxRstPeriodNanos; this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); + this.metricRecorder = metricRecorder; SocketAddress remote = channel.remoteAddress(); this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); } @@ -289,6 +293,7 @@ private NettyServerHandler createHandler( permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); } } diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java new file mode 100644 index 00000000000..0123c774771 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -0,0 +1,231 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import io.grpc.InternalTcpMetrics; +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import io.netty.util.concurrent.ScheduledFuture; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Utility for collecting TCP metrics from Netty channels. + */ +final class TcpMetrics { + private static final Logger log = Logger.getLogger(TcpMetrics.class.getName()); + + static EpollInfo epollInfo = loadEpollInfo(); + + static final class EpollInfo { + final Class channelClass; + final Class infoClass; + final java.lang.reflect.Constructor infoConstructor; + final Method tcpInfo; + final Method totalRetrans; + final Method retransmits; + final Method rtt; + + EpollInfo( + Class channelClass, + Class infoClass, + java.lang.reflect.Constructor infoConstructor, + Method tcpInfo, + Method totalRetrans, + Method retransmits, + Method rtt) { + this.channelClass = channelClass; + this.infoClass = infoClass; + this.infoConstructor = infoConstructor; + this.tcpInfo = tcpInfo; + this.totalRetrans = totalRetrans; + this.retransmits = retransmits; + this.rtt = rtt; + } + } + + private static EpollInfo loadEpollInfo() { + boolean epollAvailable = false; + try { + Class epollClass = Class.forName("io.netty.channel.epoll.Epoll"); + Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable"); + epollAvailable = (Boolean) isAvailableMethod.invoke(null); + if (epollAvailable) { + Class channelClass = Class.forName("io.netty.channel.epoll.EpollSocketChannel"); + Class infoClass = Class.forName("io.netty.channel.epoll.EpollTcpInfo"); + return new EpollInfo( + channelClass, + infoClass, + infoClass.getDeclaredConstructor(), + channelClass.getMethod("tcpInfo", infoClass), + infoClass.getMethod("totalRetrans"), + infoClass.getMethod("retrans"), + infoClass.getMethod("rtt")); + } + } catch (ReflectiveOperationException | RuntimeException e) { + log.log(Level.FINE, "Failed to initialize Epoll tcp_info reflection", e); + } catch (Error e) { + log.log(Level.FINE, "Failed to load native Epoll library", e); + } finally { + log.log(Level.INFO, "Epoll available during static init of TcpMetrics:" + + "{0}", epollAvailable); + } + return null; + } + + static final class Tracker { + private final MetricRecorder metricRecorder; + private final Object tcpInfo; + + private long lastTotalRetrans = 0; + + Tracker(MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + + Object tcpInfo = null; + if (epollInfo != null) { + try { + tcpInfo = epollInfo.infoConstructor.newInstance(); + } catch (ReflectiveOperationException e) { + log.log(Level.FINE, "Failed to instantiate EpollTcpInfo", e); + } + } + this.tcpInfo = tcpInfo; + } + + private static final long RECORD_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(5); + private ScheduledFuture reportTimer; + + void channelActive(Channel channel) { + List labelValues = getLabelValues(channel); + metricRecorder.addLongCounter(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT, 1, + Collections.emptyList(), labelValues); + metricRecorder.addLongUpDownCounter(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT, 1, + Collections.emptyList(), labelValues); + scheduleNextReport(channel, true); + } + + private void scheduleNextReport(final Channel channel, boolean isInitial) { + if (!channel.isActive()) { + return; + } + + double jitter = isInitial + ? 0.1 + ThreadLocalRandom.current().nextDouble() // 10% to 110% + : 0.9 + ThreadLocalRandom.current().nextDouble() * 0.2; // 90% to 110% + long rearmingDelay = (long) (RECORD_INTERVAL_MILLIS * jitter); + + reportTimer = channel.eventLoop().schedule(() -> { + if (channel.isActive()) { + Tracker.this.recordTcpInfo(channel, false); + scheduleNextReport(channel, false); // Re-arm + } + }, rearmingDelay, TimeUnit.MILLISECONDS); + } + + void channelInactive(Channel channel) { + if (reportTimer != null) { + reportTimer.cancel(false); + } + List labelValues = getLabelValues(channel); + metricRecorder.addLongUpDownCounter(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT, -1, + Collections.emptyList(), labelValues); + // Final collection on close + recordTcpInfo(channel, true); + } + + void recordTcpInfo(Channel channel) { + recordTcpInfo(channel, false); + } + + private void recordTcpInfo(Channel channel, boolean isClose) { + if (epollInfo == null) { + log.log(Level.FINE, "Skipping recordTcpInfo because" + + "epollInfo is null"); + return; + } + if (!epollInfo.channelClass.isInstance(channel)) { + log.log(Level.FINE, "Skipping recordTcpInfo because channel is not an" + + "instance of epollSocketChannelClass: {0}", + channel.getClass() + .getName()); + return; + } + List labelValues = getLabelValues(channel); + long totalRetrans; + long retransmits; + long rtt; + try { + epollInfo.tcpInfo.invoke(channel, tcpInfo); + totalRetrans = (Long) epollInfo.totalRetrans.invoke(tcpInfo); + retransmits = (Long) epollInfo.retransmits.invoke(tcpInfo); + rtt = (Long) epollInfo.rtt.invoke(tcpInfo); + } catch (ReflectiveOperationException e) { + log.log(Level.FINE, "Error computing TCP metrics", e); + return; + } + + long deltaTotal = totalRetrans - lastTotalRetrans; + if (deltaTotal > 0) { + metricRecorder.addLongCounter(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT, + deltaTotal, Collections.emptyList(), labelValues); + lastTotalRetrans = totalRetrans; + } + if (isClose && retransmits > 0) { + metricRecorder.addLongCounter(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT, + retransmits, Collections.emptyList(), labelValues); + } + metricRecorder.recordDoubleHistogram(InternalTcpMetrics.MIN_RTT_INSTRUMENT, + rtt / 1000000.0, // Convert microseconds to seconds + Collections.emptyList(), labelValues); + } + } + + private static List getLabelValues(Channel channel) { + String localAddress = ""; + String localPort = ""; + String peerAddress = ""; + String peerPort = ""; + + SocketAddress local = channel.localAddress(); + if (local instanceof InetSocketAddress) { + InetSocketAddress inetLocal = (InetSocketAddress) local; + localAddress = inetLocal.getAddress().getHostAddress(); + localPort = String.valueOf(inetLocal.getPort()); + } + + SocketAddress remote = channel.remoteAddress(); + if (remote instanceof InetSocketAddress) { + InetSocketAddress inetRemote = (InetSocketAddress) remote; + peerAddress = inetRemote.getAddress().getHostAddress(); + peerPort = String.valueOf(inetRemote.getPort()); + } + + return Arrays.asList(localAddress, localPort, peerAddress, peerPort); + } + + private TcpMetrics() { + } +} diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 53598727efd..9f6be9a2f3e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -57,6 +57,7 @@ import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.AbstractStream; import io.grpc.internal.ClientStreamListener; @@ -1165,7 +1166,8 @@ public Stopwatch get() { Attributes.EMPTY, "someauthority", null, - fakeClock().getTicker()); + fakeClock().getTicker(), + new MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index db44c8f50fd..7023acc947c 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -56,6 +56,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.Status.Code; @@ -228,30 +229,31 @@ public void setSoLingerChannelOption() throws IOException, GeneralSecurityExcept // set SO_LINGER option int soLinger = 123; channelOptions.put(ChannelOption.SO_LINGER, soLinger); - NettyClientTransport transport = - new NettyClientTransport( - address, - new ReflectiveChannelFactory<>(NioSocketChannel.class), - channelOptions, - group, - newNegotiator(), - false, - DEFAULT_WINDOW_SIZE, - DEFAULT_MAX_MESSAGE_SIZE, - GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, - GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, - KEEPALIVE_TIME_NANOS_DISABLED, - 1L, - false, - authority, - null /* user agent */, - tooManyPingsRunnable, - new TransportTracer(), - Attributes.EMPTY, - new SocketPicker(), - new FakeChannelLogger(), - false, - Ticker.systemTicker()); + NettyClientTransport transport = new NettyClientTransport( + address, + new ReflectiveChannelFactory<>(NioSocketChannel.class), + channelOptions, + group, + newNegotiator(), + false, + DEFAULT_WINDOW_SIZE, + DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + KEEPALIVE_TIME_NANOS_DISABLED, + 1L, + false, + authority, + null /* user agent */, + tooManyPingsRunnable, + new TransportTracer(), + Attributes.EMPTY, + new SocketPicker(), + new FakeChannelLogger(), + false, + new MetricRecorder() { + }, + Ticker.systemTicker()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -503,30 +505,31 @@ private static class CantConstructChannelError extends Error {} public void failingToConstructChannelShouldFailGracefully() throws Exception { address = TestUtils.testServerAddress(new InetSocketAddress(12345)); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); - NettyClientTransport transport = - new NettyClientTransport( - address, - new ReflectiveChannelFactory<>(CantConstructChannel.class), - new HashMap, Object>(), - group, - newNegotiator(), - false, - DEFAULT_WINDOW_SIZE, - DEFAULT_MAX_MESSAGE_SIZE, - GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, - GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, - KEEPALIVE_TIME_NANOS_DISABLED, - 1, - false, - authority, - null, - tooManyPingsRunnable, - new TransportTracer(), - Attributes.EMPTY, - new SocketPicker(), - new FakeChannelLogger(), - false, - Ticker.systemTicker()); + NettyClientTransport transport = new NettyClientTransport( + address, + new ReflectiveChannelFactory<>(CantConstructChannel.class), + new HashMap, Object>(), + group, + newNegotiator(), + false, + DEFAULT_WINDOW_SIZE, + DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + KEEPALIVE_TIME_NANOS_DISABLED, + 1, + false, + authority, + null, + tooManyPingsRunnable, + new TransportTracer(), + Attributes.EMPTY, + new SocketPicker(), + new FakeChannelLogger(), + false, + new MetricRecorder() { + }, + Ticker.systemTicker()); transports.add(transport); // Should not throw @@ -989,7 +992,7 @@ public void authorityOverrideInCallOptions_matchesServerPeerHost_newStreamCreati new Rpc(transport, new Metadata(), "foo.test.google.fr").waitForResponse(); } finally { - NettyClientHandler.enablePerRpcAuthorityCheck = false;; + NettyClientHandler.enablePerRpcAuthorityCheck = false; } } @@ -1012,7 +1015,7 @@ public void authorityOverrideInCallOptions_portNumberInAuthority_isStrippedForPe new Rpc(transport, new Metadata(), "foo.test.google.fr:12345").waitForResponse(); } finally { - NettyClientHandler.enablePerRpcAuthorityCheck = false;; + NettyClientHandler.enablePerRpcAuthorityCheck = false; } } @@ -1046,7 +1049,7 @@ public void authorityOverrideInCallOptions_portNumberAndIpv6_isStrippedForPeerVe "No subject alternative names matching IP address 2001:db8:3333:4444:5555:6666:1.2.3.4 " + "found"); } finally { - NettyClientHandler.enablePerRpcAuthorityCheck = false;; + NettyClientHandler.enablePerRpcAuthorityCheck = false; } } @@ -1125,30 +1128,31 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max if (!enableKeepAlive) { keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; } - NettyClientTransport transport = - new NettyClientTransport( - address, - channelFactory, - new HashMap, Object>(), - group, - negotiator, - false, - DEFAULT_WINDOW_SIZE, - maxMsgSize, - maxHeaderListSize, - maxHeaderListSize, - keepAliveTimeNano, - keepAliveTimeoutNano, - false, - authority, - userAgent, - tooManyPingsRunnable, - new TransportTracer(), - eagAttributes, - new SocketPicker(), - new FakeChannelLogger(), - false, - Ticker.systemTicker()); + NettyClientTransport transport = new NettyClientTransport( + address, + channelFactory, + new HashMap, Object>(), + group, + negotiator, + false, + DEFAULT_WINDOW_SIZE, + maxMsgSize, + maxHeaderListSize, + maxHeaderListSize, + keepAliveTimeNano, + keepAliveTimeoutNano, + false, + authority, + userAgent, + tooManyPingsRunnable, + new TransportTracer(), + eagAttributes, + new SocketPicker(), + new FakeChannelLogger(), + false, + new MetricRecorder() { + }, + Ticker.systemTicker()); transports.add(transport); return transport; } @@ -1167,35 +1171,35 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr private void startServer(int maxStreamsPerConnection, int maxHeaderListSize, ServerListener serverListener) throws IOException { - server = - new NettyServer( - TestUtils.testServerAddresses(new InetSocketAddress(0)), - new ReflectiveChannelFactory<>(NioServerSocketChannel.class), - new HashMap, Object>(), - new HashMap, Object>(), - new FixedObjectPool<>(group), - new FixedObjectPool<>(group), - false, - negotiator, - Collections.emptyList(), - TransportTracer.getDefaultFactory(), - maxStreamsPerConnection, - false, - DEFAULT_WINDOW_SIZE, - DEFAULT_MAX_MESSAGE_SIZE, - maxHeaderListSize, - maxHeaderListSize, - DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, - DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, - MAX_CONNECTION_IDLE_NANOS_DISABLED, - MAX_CONNECTION_AGE_NANOS_DISABLED, - MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, - true, - 0, - MAX_RST_COUNT_DISABLED, - 0, - Attributes.EMPTY, - channelz); + server = new NettyServer( + TestUtils.testServerAddresses(new InetSocketAddress(0)), + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), + new HashMap, Object>(), + new HashMap, Object>(), + new FixedObjectPool<>(group), + new FixedObjectPool<>(group), + false, + negotiator, + Collections.emptyList(), + TransportTracer.getDefaultFactory(), + maxStreamsPerConnection, + false, + DEFAULT_WINDOW_SIZE, + DEFAULT_MAX_MESSAGE_SIZE, + maxHeaderListSize, + maxHeaderListSize, + DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, + DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, + MAX_CONNECTION_IDLE_NANOS_DISABLED, + MAX_CONNECTION_AGE_NANOS_DISABLED, + MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, + true, + 0, + MAX_RST_COUNT_DISABLED, + 0, + Attributes.EMPTY, + channelz, + new MetricRecorder() {}); server.start(serverListener); address = TestUtils.testServerAddress((InetSocketAddress) server.getListenSocketAddress()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java index 797cfa95c0e..f3b73a515b5 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.grpc.ServerStreamTracer; +import io.grpc.MetricRecorder; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.handler.ssl.SslContext; @@ -43,8 +43,9 @@ public class NettyServerBuilderTest { @Test public void addMultipleListenAddresses() { builder.addListenAddress(new InetSocketAddress(8081)); - NettyServer server = - builder.buildTransportServers(ImmutableList.of()); + NettyServer server = builder.buildTransportServers( + ImmutableList.of(), + new MetricRecorder() {}); assertThat(server.getListenSocketAddresses()).hasSize(2); } @@ -189,4 +190,5 @@ public void useNioTransport_shouldNotThrow() { builder.assertEventLoopsAndChannelType(); } + } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0d5a9bab176..2c0ab21cb56 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -59,6 +59,7 @@ import io.grpc.Attributes; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.Status.Code; @@ -129,6 +130,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase streamListenerMessageQueue = new LinkedList<>(); @@ -205,6 +207,20 @@ protected void manualSetUp() throws Exception { channel().releaseOutbound(); } + @Test + public void tcpMetrics_recorded() throws Exception { + manualSetUp(); + handler().channelActive(ctx()); + // Verify that channelActive triggered TcpMetrics + ArgumentCaptor countCaptor = ArgumentCaptor.forClass(Long.class); + verify(metricRecorder, atLeastOnce()).addLongCounter( + eq(io.grpc.InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), + countCaptor.capture(), + any(), + any()); + assertEquals(1L, countCaptor.getValue().longValue()); + } + @Test public void transportReadyDelayedUntilConnectionPreface() throws Exception { initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE)); @@ -1416,7 +1432,8 @@ protected NettyServerHandler newHandler() { maxRstCount, maxRstPeriodNanos, Attributes.EMPTY, - fakeClock().getTicker()); + fakeClock().getTicker(), + metricRecorder); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index f9bda4c5af1..61c3f9e219e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -37,6 +37,7 @@ import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ServerListener; @@ -161,7 +162,7 @@ class NoHandlerProtocolNegotiator implements ProtocolNegotiator { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, mock(MetricRecorder.class)); final SettableFuture serverShutdownCalled = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -218,7 +219,7 @@ public void multiPortStartStopGet() throws Exception { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, mock(MetricRecorder.class)); final SettableFuture shutdownCompleted = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -298,7 +299,7 @@ public void multiPortConnections() throws Exception { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, mock(MetricRecorder.class)); final SettableFuture shutdownCompleted = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -366,7 +367,7 @@ public void getPort_notStarted() { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, mock(MetricRecorder.class)); assertThat(ns.getListenSocketAddress()).isEqualTo(addr); assertThat(ns.getListenSocketAddresses()).isEqualTo(addresses); @@ -447,7 +448,7 @@ class TestProtocolNegotiator implements ProtocolNegotiator { 0, 0, // ignore eagAttributes, - channelz); + channelz, mock(MetricRecorder.class)); ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { @@ -501,7 +502,7 @@ public void channelzListenSocket() throws Exception { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, mock(MetricRecorder.class)); final SettableFuture shutdownCompleted = SettableFuture.create(); ns.start(new ServerListener() { @Override @@ -649,7 +650,7 @@ private NettyServer getServer(List addr, EventLoopGroup ev) { 0, 0, // ignore Attributes.EMPTY, - channelz); + channelz, mock(MetricRecorder.class)); } private static class NoopServerTransportListener implements ServerTransportListener { diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java index b779dfbe980..22758a8b727 100644 --- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.ChannelLogger; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.AbstractTransportTest; @@ -71,7 +72,7 @@ protected InternalServer newServer( .forAddress(new InetSocketAddress("localhost", 0)) .flowControlWindow(AbstractTransportTest.TEST_FLOW_CONTROL_WINDOW) .setTransportTracerFactory(fakeClockTransportTracer) - .buildTransportServers(streamTracerFactories); + .buildTransportServers(streamTracerFactories, new MetricRecorder() {}); } @Override @@ -81,7 +82,7 @@ protected InternalServer newServer( .forAddress(new InetSocketAddress("localhost", port)) .flowControlWindow(AbstractTransportTest.TEST_FLOW_CONTROL_WINDOW) .setTransportTracerFactory(fakeClockTransportTracer) - .buildTransportServers(streamTracerFactories); + .buildTransportServers(streamTracerFactories, new MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java index 80438532172..403b1b64329 100644 --- a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java +++ b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java @@ -46,6 +46,7 @@ import io.grpc.InternalChannelz; import io.grpc.InternalChannelz.Security; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.SecurityLevel; import io.grpc.ServerCredentials; import io.grpc.ServerStreamTracer; @@ -389,7 +390,9 @@ private Object expectHandshake( .buildTransportFactory(); InternalServer server = NettyServerBuilder .forPort(0, serverCreds) - .buildTransportServers(Collections.emptyList()); + .buildTransportServers( + Collections.emptyList(), + new MetricRecorder() {}); server.start(serverListener); ManagedClientTransport.Listener clientTransportListener = diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java new file mode 100644 index 00000000000..874025369ee --- /dev/null +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -0,0 +1,579 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.InternalTcpMetrics; +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.ScheduledFuture; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class TcpMetricsTest { + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock + private MetricRecorder metricRecorder; + @Mock + private Channel channel; + @Mock + private EventLoop eventLoop; + @Mock + private ScheduledFuture scheduledFuture; + + private TcpMetrics.Tracker metrics; + + @Before + public void setUp() throws Exception { + when(channel.eventLoop()).thenReturn(eventLoop); + when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer(invocation -> scheduledFuture); + metrics = new TcpMetrics.Tracker(metricRecorder); + } + + @Test + public void metricsInitialization() throws Exception { + + org.junit.Assert.assertNotNull(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT); + org.junit.Assert.assertNotNull(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT); + org.junit.Assert.assertNotNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT); + org.junit.Assert.assertNotNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT); + org.junit.Assert.assertNotNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT); + } + + public static class FakeEpollTcpInfo { + long totalRetrans; + long retransmits; + long rtt; + + public void setValues(long totalRetrans, long retransmits, long rtt) { + this.totalRetrans = totalRetrans; + this.retransmits = retransmits; + this.rtt = rtt; + } + + @SuppressWarnings("unused") + public long totalRetrans() { + return totalRetrans; + } + + @SuppressWarnings("unused") + public long retrans() { + return retransmits; + } + + @SuppressWarnings("unused") + public long rtt() { + return rtt; + } + } + + @Test + public void tracker_recordTcpInfo_reflectionSuccess() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + TcpMetrics.epollInfo = new TcpMetrics.EpollInfo( + ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class, + FakeEpollTcpInfo.class.getConstructor(), + ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class), + FakeEpollTcpInfo.class.getMethod("totalRetrans"), + FakeEpollTcpInfo.class.getMethod("retrans"), + FakeEpollTcpInfo.class.getMethod("rtt")); + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + infoSource.setValues(123, 4, 5000); + ConfigurableFakeWithTcpInfo channel = new ConfigurableFakeWithTcpInfo(infoSource); + channel.writeInbound("dummy"); + + tracker.channelInactive(channel); + + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(123L), any(), any()); + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)), + eq(4L), any(), any()); + verify(recorder).recordDoubleHistogram( + eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)), + eq(0.005), any(), any()); + } + + @Test + public void tracker_periodicRecord_doesNotRecordRecurringRetransmits() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + TcpMetrics.epollInfo = new TcpMetrics.EpollInfo( + ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class, + FakeEpollTcpInfo.class.getConstructor(), + ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class), + FakeEpollTcpInfo.class.getMethod("totalRetrans"), + FakeEpollTcpInfo.class.getMethod("retrans"), + FakeEpollTcpInfo.class.getMethod("rtt")); + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + infoSource.setValues(123, 4, 5000); + ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy( + new ConfigurableFakeWithTcpInfo(infoSource)); + when(channel.eventLoop()).thenReturn(eventLoop); + when(channel.isActive()).thenReturn(true); + + tracker.channelActive(channel); + + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS)); + Runnable periodicTask = runnableCaptor.getValue(); + + org.mockito.Mockito.clearInvocations(recorder); + periodicTask.run(); + + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(123L), any(), any()); + verify(recorder).recordDoubleHistogram( + eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)), + eq(0.005), any(), any()); + // Should NOT record recurring retransmits during periodic polling + verify(recorder, org.mockito.Mockito.never()) + .addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)), + anyLong(), any(), any()); + } + + @Test + public void tracker_channelInactive_recordsRecurringRetransmits_raw_notDelta() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + TcpMetrics.epollInfo = new TcpMetrics.EpollInfo( + ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class, + FakeEpollTcpInfo.class.getConstructor(), + ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class), + FakeEpollTcpInfo.class.getMethod("totalRetrans"), + FakeEpollTcpInfo.class.getMethod("retrans"), + FakeEpollTcpInfo.class.getMethod("rtt")); + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + infoSource.setValues(123, 4, 5000); + ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy( + new ConfigurableFakeWithTcpInfo(infoSource)); + when(channel.eventLoop()).thenReturn(eventLoop); + when(channel.isActive()).thenReturn(true); + + // Mimic the periodic schedule invocation + tracker.channelActive(channel); + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS)); + + // Fire periodic task once. TotalRetrans=123, retransmits=4. + runnableCaptor.getValue().run(); + + org.mockito.Mockito.clearInvocations(recorder); + + // Let's just create a new channel instance where tcpInfo sets retrans=5. + FakeEpollTcpInfo infoSource2 = new FakeEpollTcpInfo(); + infoSource2.setValues(130, 5, 5000); + ConfigurableFakeWithTcpInfo channel2 = org.mockito.Mockito.spy( + new ConfigurableFakeWithTcpInfo(infoSource2)); + when(channel2.eventLoop()).thenReturn(eventLoop); + + tracker.channelInactive(channel2); + + // It should record delta for totalRetrans (130 - 123 = 7) + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(7L), any(), any()); + // But for recurringRetransmits it MUST record the raw value 5, not the delta! + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)), + eq(5L), any(), any()); + } + + @Test + public void tracker_periodicRecord_reportsDeltaForTotalRetrans() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + TcpMetrics.epollInfo = new TcpMetrics.EpollInfo( + ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class, + FakeEpollTcpInfo.class.getConstructor(), + ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class), + FakeEpollTcpInfo.class.getMethod("totalRetrans"), + FakeEpollTcpInfo.class.getMethod("retrans"), + FakeEpollTcpInfo.class.getMethod("rtt")); + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + infoSource.setValues(123, 4, 5000); + ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy( + new ConfigurableFakeWithTcpInfo(infoSource)); + when(channel.eventLoop()).thenReturn(eventLoop); + when(channel.isActive()).thenReturn(true); + + // Initial Active Trigger + tracker.channelActive(channel); + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS)); + Runnable periodicTask = runnableCaptor.getValue(); + + // First periodic record + org.mockito.Mockito.clearInvocations(recorder); + periodicTask.run(); + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(123L), any(), any()); + + // Change tcpInfo for second periodic record + org.mockito.Mockito.doAnswer(invocation -> { + FakeEpollTcpInfo info = invocation.getArgument(0); + info.totalRetrans = 150; + info.retransmits = 2; // Should not be recorded + info.rtt = 6000; + return null; + }).when(channel).tcpInfo(any(FakeEpollTcpInfo.class)); + + org.mockito.Mockito.clearInvocations(recorder); + periodicTask.run(); + + // Only the delta (150 - 123 = 27) should be recorded + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(27L), any(), any()); + verify(recorder).recordDoubleHistogram( + eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)), + eq(0.006), any(), any()); + verify(recorder, org.mockito.Mockito.never()) + .addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)), + anyLong(), any(), any()); + } + + @Test + public void tracker_periodicRecord_doesNotReportZeroDeltaForTotalRetrans() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + TcpMetrics.epollInfo = new TcpMetrics.EpollInfo( + ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class, + FakeEpollTcpInfo.class.getConstructor(), + ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class), + FakeEpollTcpInfo.class.getMethod("totalRetrans"), + FakeEpollTcpInfo.class.getMethod("retrans"), + FakeEpollTcpInfo.class.getMethod("rtt")); + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + infoSource.setValues(123, 4, 5000); + ConfigurableFakeWithTcpInfo channel = org.mockito.Mockito.spy( + new ConfigurableFakeWithTcpInfo(infoSource)); + when(channel.eventLoop()).thenReturn(eventLoop); + when(channel.isActive()).thenReturn(true); + + // Initial Active Trigger + tracker.channelActive(channel); + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(eventLoop).schedule(runnableCaptor.capture(), anyLong(), eq(TimeUnit.MILLISECONDS)); + Runnable periodicTask = runnableCaptor.getValue(); + + // First periodic record + periodicTask.run(); + org.mockito.Mockito.clearInvocations(recorder); + + // Keep tcpInfo the same for second periodic record + periodicTask.run(); + + // NO delta (123 - 123 = 0), so it should not be recorded + verify(recorder, org.mockito.Mockito.never()) + .addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + anyLong(), any(), any()); + verify(recorder).recordDoubleHistogram( + eq(Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT)), + eq(0.005), any(), any()); + } + + public static class ConfigurableFakeWithTcpInfo extends + io.netty.channel.embedded.EmbeddedChannel { + private final FakeEpollTcpInfo infoToCopy; + + public ConfigurableFakeWithTcpInfo(FakeEpollTcpInfo infoToCopy) { + this.infoToCopy = infoToCopy; + } + + public void tcpInfo(FakeEpollTcpInfo info) { + info.totalRetrans = infoToCopy.totalRetrans; + info.retransmits = infoToCopy.retransmits; + info.rtt = infoToCopy.rtt; + } + } + + @Test + public void tracker_reportsDeltas_correctly() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + + TcpMetrics.epollInfo = new TcpMetrics.EpollInfo( + ConfigurableFakeWithTcpInfo.class, FakeEpollTcpInfo.class, + FakeEpollTcpInfo.class.getConstructor(), + ConfigurableFakeWithTcpInfo.class.getMethod("tcpInfo", FakeEpollTcpInfo.class), + FakeEpollTcpInfo.class.getMethod("totalRetrans"), + FakeEpollTcpInfo.class.getMethod("retrans"), + FakeEpollTcpInfo.class.getMethod("rtt")); + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + FakeEpollTcpInfo infoSource = new FakeEpollTcpInfo(); + ConfigurableFakeWithTcpInfo channel = new ConfigurableFakeWithTcpInfo(infoSource); + + // 10 retransmits total + infoSource.setValues(10, 2, 1000); + tracker.recordTcpInfo(channel); + + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(10L), any(), any()); + + // 15 retransmits total (delta 5) + infoSource.setValues(15, 0, 1000); + tracker.recordTcpInfo(channel); + + verify(recorder).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(5L), any(), any()); + + // 15 retransmits total (delta 0) - should NOT report + // also set retransmits to 1 + infoSource.setValues(15, 1, 1000); + tracker.recordTcpInfo(channel); + // Verify no new interactions with this specific metric and value + // We can't easily verify "no interaction" for specific value without capturing. + verify(recorder, org.mockito.Mockito.times(1)).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(10L), any(), any()); + verify(recorder, org.mockito.Mockito.times(1)).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + eq(5L), any(), any()); + // Total interactions for packetsRetransmitted should be 2 + verify(recorder, org.mockito.Mockito.times(2)).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT)), + anyLong(), any(), any()); + + // recurringRetransmits should NOT have been reported yet (periodic calls) + verify(recorder, org.mockito.Mockito.times(0)).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)), + anyLong(), any(), any()); + + // Close channel - should report recurringRetransmits + tracker.channelInactive(channel); + verify(recorder, org.mockito.Mockito.times(1)).addLongCounter( + eq(Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT)), + eq(1L), // From last infoSource setValues(15, 1, 1000) + any(), any()); + } + + @Test + public void tracker_recordTcpInfo_reflectionFailure() throws Exception { + MetricRecorder recorder = mock(MetricRecorder.class); + + TcpMetrics.epollInfo = null; + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder); + + Channel channel = org.mockito.Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + + // Should catch exception and ignore + tracker.channelInactive(channel); + } + + @Test + public void registeredMetrics_haveCorrectOptionalLabels() throws Exception { + List expectedOptionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port"); + + org.junit.Assert.assertEquals( + expectedOptionalLabels, + InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT.getOptionalLabelKeys()); + + if (InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT != null) { + org.junit.Assert.assertEquals( + expectedOptionalLabels, + Objects.requireNonNull(InternalTcpMetrics.PACKETS_RETRANSMITTED_INSTRUMENT) + .getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + Objects.requireNonNull(InternalTcpMetrics.RECURRING_RETRANSMITS_INSTRUMENT) + .getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + Objects.requireNonNull(InternalTcpMetrics.MIN_RTT_INSTRUMENT).getOptionalLabelKeys()); + } + } + + @Test + public void channelActive_extractsLabels_ipv4() throws Exception { + + InetAddress localInet = InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), eq(1L), + eq(Collections.emptyList()), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verify(metricRecorder).addLongUpDownCounter( + eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(1L), + eq(Collections.emptyList()), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_extractsLabels_ipv6() throws Exception { + + InetAddress localInet = InetAddress.getByAddress( + new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress( + new byte[] { 32, 1, 13, -72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelInactive(channel); + + verify(metricRecorder).addLongUpDownCounter( + eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(-1L), + eq(Collections.emptyList()), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_extractsLabels_nonInetAddress() throws Exception { + SocketAddress dummyAddress = new SocketAddress() { + }; + when(channel.localAddress()).thenReturn(dummyAddress); + when(channel.remoteAddress()).thenReturn(dummyAddress); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), eq(1L), + eq(Collections.emptyList()), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(1L), + eq(Collections.emptyList()), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_incrementsCounts() throws Exception { + metrics.channelActive(channel); + verify(metricRecorder).addLongCounter( + eq(InternalTcpMetrics.CONNECTIONS_CREATED_INSTRUMENT), eq(1L), + eq(Collections.emptyList()), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(1L), + eq(Collections.emptyList()), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_decrementsCount_noEpoll_noError() throws Exception { + metrics.channelInactive(channel); + verify(metricRecorder).addLongUpDownCounter( + eq(InternalTcpMetrics.CONNECTION_COUNT_INSTRUMENT), eq(-1L), + eq(Collections.emptyList()), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_schedulesReportTimer() throws Exception { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(eventLoop).schedule( + runnableCaptor.capture(), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + Runnable task = runnableCaptor.getValue(); + long delay = delayCaptor.getValue(); + + // Default RECORD_INTERVAL_MILLIS is 5 minutes (300,000 ms) + // Initial jitter is 10% to 110%, so 30,000 ms to 330,000 ms + org.junit.Assert.assertTrue("Delay should be >= 30000 but was " + delay, delay >= 30_000); + org.junit.Assert.assertTrue("Delay should be <= 330000 but was " + delay, delay <= 330_000); + + // Run the task to verify rescheduling + task.run(); + + verify(eventLoop, org.mockito.Mockito.times(2)) + .schedule(any(Runnable.class), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + // Re-arming jitter is 90% to 110%, so 270,000 ms to 330,000 ms + long rearmDelay = delayCaptor.getValue(); + org.junit.Assert.assertTrue( + "Delay should be >= 270000 but was " + rearmDelay, rearmDelay >= 270_000); + org.junit.Assert.assertTrue( + "Delay should be <= 330000 but was " + rearmDelay, rearmDelay <= 330_000); + } + + @Test + public void channelInactive_cancelsReportTimer() throws Exception { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + metrics.channelInactive(channel); + + verify(scheduledFuture).cancel(false); + } +} diff --git a/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java index 78a409a3f85..0032972756d 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/InternalOkHttpServerBuilder.java @@ -17,6 +17,7 @@ package io.grpc.okhttp; import io.grpc.Internal; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; import io.grpc.internal.TransportTracer; @@ -29,7 +30,8 @@ @Internal public final class InternalOkHttpServerBuilder { public static InternalServer buildTransportServers(OkHttpServerBuilder builder, - List streamTracerFactories) { + List streamTracerFactories, + MetricRecorder metricRecorder) { return builder.buildTransportServers(streamTracerFactories); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java index 8daeed42a8c..163d2023b1c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java @@ -27,6 +27,7 @@ import io.grpc.ForwardingServerBuilder; import io.grpc.InsecureServerCredentials; import io.grpc.Internal; +import io.grpc.MetricRecorder; import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; import io.grpc.ServerStreamTracer; @@ -111,7 +112,15 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia return new OkHttpServerBuilder(address, result.factory); } - final ServerImplBuilder serverImplBuilder = new ServerImplBuilder(this::buildTransportServers); + final ServerImplBuilder serverImplBuilder = new ServerImplBuilder( + new ServerImplBuilder.ClientTransportServersBuilder() { + @Override + public InternalServer buildClientTransportServers( + List streamTracerFactories, + MetricRecorder metricRecorder) { + return buildTransportServers(streamTracerFactories); + } + }); final SocketAddress listenAddress; final HandshakerSocketFactory handshakerSocketFactory; TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory(); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java index 076eea3349a..9317ca96639 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java @@ -17,6 +17,7 @@ package io.grpc.okhttp; import io.grpc.InsecureServerCredentials; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.AbstractTransportTest; import io.grpc.internal.ClientTransportFactory; @@ -58,11 +59,12 @@ protected InternalServer newServer( @Override protected InternalServer newServer( int port, List streamTracerFactories) { - return OkHttpServerBuilder + OkHttpServerBuilder builder = OkHttpServerBuilder .forPort(port, InsecureServerCredentials.create()) .flowControlWindow(AbstractTransportTest.TEST_FLOW_CONTROL_WINDOW) - .setTransportTracerFactory(fakeClockTransportTracer) - .buildTransportServers(streamTracerFactories); + .setTransportTracerFactory(fakeClockTransportTracer); + return InternalOkHttpServerBuilder + .buildTransportServers(builder, streamTracerFactories, new MetricRecorder() {}); } @Override diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 6904340ac74..87ad61c9f27 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -196,6 +196,7 @@ public void configureServerBuilder(ServerBuilder serverBuilder) { serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); } serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory()); + serverBuilder.addMetricSink(sink); } @VisibleForTesting diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java index 16cb02c61c2..f0bd6f93098 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannelBuilder; -import io.grpc.MetricSink; import io.grpc.ServerBuilder; import io.grpc.internal.GrpcUtil; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; @@ -98,6 +97,7 @@ public void buildTracer() { grpcOpenTelemetry.configureServerBuilder(mockServerBuiler); verify(mockServerBuiler, times(2)).addStreamTracerFactory(any()); verify(mockServerBuiler).intercept(any()); + verify(mockServerBuiler).addMetricSink(any()); verifyNoMoreInteractions(mockServerBuiler); ManagedChannelBuilder mockChannelBuilder = mock(ManagedChannelBuilder.class); @@ -121,7 +121,6 @@ public void builderDefaults() { .build()); assertThat(module.getEnableMetrics()).isEmpty(); assertThat(module.getOptionalLabels()).isEmpty(); - assertThat(module.getSink()).isInstanceOf(MetricSink.class); assertThat(module.getTracer()).isSameInstanceAs(noopOpenTelemetry .getTracerProvider() diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java index 58143a8516c..59936cdd485 100644 --- a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java +++ b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java @@ -59,7 +59,7 @@ public class JettyTransportTest extends AbstractTransportTest { protected InternalServer newServer(List streamTracerFactories) { return new InternalServer() { final InternalServer delegate = - new ServletServerBuilder().buildTransportServers(streamTracerFactories); + new ServletServerBuilder().buildTransportServers(streamTracerFactories); @Override public void start(ServerListener listener) throws IOException { diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java index aee25de01ad..5bea4c6e03b 100644 --- a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java @@ -78,7 +78,9 @@ public final class ServletServerBuilder extends ForwardingServerBuilder + buildTransportServers(streamTracerFactories)); } /** diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 196d51fb5a6..06bd66008f4 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -1052,18 +1052,18 @@ private static final class BootstrappingXdsClientPool implements XdsClientPool { private final XdsClientPoolFactory xdsClientPoolFactory; private final String target; private final @Nullable Map bootstrapOverride; - private final @Nullable MetricRecorder metricRecorder; + private final MetricRecorder metricRecorder; private ObjectPool xdsClientPool; BootstrappingXdsClientPool( XdsClientPoolFactory xdsClientPoolFactory, String target, @Nullable Map bootstrapOverride, - @Nullable MetricRecorder metricRecorder) { + MetricRecorder metricRecorder) { this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.target = checkNotNull(target, "target"); this.bootstrapOverride = bootstrapOverride; - this.metricRecorder = metricRecorder; + this.metricRecorder = checkNotNull(metricRecorder, "metricRecorder"); } @Override