From 3158c9e6cf1a1d5f27100229d1d677ba96c70f13 Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Thu, 18 Jun 2026 13:57:54 +0200 Subject: [PATCH] fix: cap onRequestError retries at 1 in DefaultRetryPolicy DefaultRetryPolicy.onRequestError() now retries on the next host only once (nbRetry == 0), then rethrows. Previously, it returned tryNextHost unconditionally, allowing retries across all nodes in the query plan. In large clusters, a single timed-out request could cascade through every node. With a 6-second client timeout and 12 nodes, this means a single thread could be blocked for up to 72 seconds, leading to thread pool exhaustion (RejectedExecutionException). The same fix is applied to DowngradingConsistencyRetryPolicy. This makes onRequestError consistent with onUnavailable, onReadTimeout, and onWriteTimeout, all of which cap retries at 1. Motivated-by: CUSTOMER-331 --- .../core/policies/DefaultRetryPolicy.java | 23 ++- .../DowngradingConsistencyRetryPolicy.java | 19 ++- .../DefaultRetryPolicyIntegrationTest.java | 142 ++++++++++------ ...ConsistencyRetryPolicyIntegrationTest.java | 152 +++++++++++------- 4 files changed, 220 insertions(+), 116 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java index dc8fed4262b..33262030be5 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java @@ -34,8 +34,8 @@ *
  • On a write timeout, retries once on the same host if we timeout while writing the * distributed log used by batch statements. *
  • On an unavailable exception, retries once on the next host. - *
  • On a request error, such as a client timeout, the query is retried on the next host. Do not - * retry on read or write failures. + *
  • On a request error, such as a client timeout, retries once on the next host. Do not retry + * on read or write failures. * * *

    This retry policy is conservative in that it will never retry with a different consistency @@ -136,16 +136,27 @@ public RetryDecision onUnavailable( return (nbRetry == 0) ? RetryDecision.tryNextHost(null) : RetryDecision.rethrow(); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * + *

    This implementation triggers a maximum of one retry on the next host in the query plan. The + * rationale is that the first coordinator might have been network-isolated or overloaded, and + * moving to the next host might resolve the issue. If the retry also fails, the exception is + * rethrown. + * + *

    Read and write failures are never retried, as they generally indicate a data problem that is + * unlikely to be resolved by a retry. + * + * @return {@code RetryDecision.tryNextHost(cl)} if no retry attempt has yet been tried and the + * error is not a read/write failure, {@code RetryDecision.rethrow()} otherwise. + */ @Override public RetryDecision onRequestError( Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) { - // do not retry these by default as they generally indicate a data problem or - // other issue that is unlikely to be resolved by a retry. if (e instanceof WriteFailureException || e instanceof ReadFailureException) { return RetryDecision.rethrow(); } - return RetryDecision.tryNextHost(cl); + return (nbRetry == 0) ? RetryDecision.tryNextHost(cl) : RetryDecision.rethrow(); } @Override diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java index 27eaf72066c..af3a0fc4a1e 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java @@ -208,21 +208,24 @@ public RetryDecision onUnavailable( /** * {@inheritDoc} * - *

    For historical reasons, this implementation triggers a retry on the next host in the query - * plan with the same consistency level, regardless of the statement's idempotence. Note that this - * breaks the general rule stated in {@link RetryPolicy#onRequestError(Statement, - * ConsistencyLevel, DriverException, int)}: "a retry should only be attempted if the request is - * known to be idempotent".` + *

    This implementation triggers a maximum of one retry on the next host in the query plan. The + * rationale is that the first coordinator might have been network-isolated or overloaded, and + * moving to the next host might resolve the issue. If the retry also fails, the exception is + * rethrown. + * + *

    Read and write failures are never retried, as they generally indicate a data problem that is + * unlikely to be resolved by a retry. + * + * @return {@code RetryDecision.tryNextHost(cl)} if no retry attempt has yet been tried and the + * error is not a read/write failure, {@code RetryDecision.rethrow()} otherwise. */ @Override public RetryDecision onRequestError( Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) { - // do not retry these by default as they generally indicate a data problem or - // other issue that is unlikely to be resolved by a retry. if (e instanceof WriteFailureException || e instanceof ReadFailureException) { return RetryDecision.rethrow(); } - return RetryDecision.tryNextHost(cl); + return (nbRetry == 0) ? RetryDecision.tryNextHost(cl) : RetryDecision.rethrow(); } @Override diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/DefaultRetryPolicyIntegrationTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/DefaultRetryPolicyIntegrationTest.java index aaa25e1d9d4..3c14fbd8f2f 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/DefaultRetryPolicyIntegrationTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/DefaultRetryPolicyIntegrationTest.java @@ -179,7 +179,38 @@ public void should_rethrow_unavailable_in_no_host_available_exception() { } @Test(groups = "short") - public void should_try_next_host_on_client_timeouts() { + public void should_try_next_host_on_first_client_timeout() { + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(1); + try { + scassandras + .node(1) + .primingClient() + .prime( + PrimingRequest.queryBuilder() + .withQuery("mock query") + .withThen(then().withFixedDelay(1000L).withRows(row("result", "result1"))) + .build()); + simulateNormalResponse(2); + + query(); + + assertOnRequestErrorWasCalled(1, OperationTimedOutException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getClientTimeouts().getCount()).isEqualTo(1); + assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(1); + assertQueried(1, 1); + assertQueried(2, 1); + assertQueried(3, 0); + } finally { + cluster + .getConfiguration() + .getSocketOptions() + .setReadTimeoutMillis(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS); + } + } + + @Test(groups = "short") + public void should_rethrow_on_second_client_timeout() { cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(1); try { scassandras @@ -208,32 +239,17 @@ public void should_try_next_host_on_client_timeouts() { .build()); try { query(); - fail("expected a NoHostAvailableException"); - } catch (NoHostAvailableException e) { - assertThat(e.getErrors().keySet()) - .hasSize(3) - .containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint()); - assertThat(e.getErrors().values()).hasOnlyElementsOfType(OperationTimedOutException.class); - assertThat( - ((OperationTimedOutException) e.getErrors().get(host1.getEndPoint())).getMessage()) - .contains( - String.format("[%s] Timed out waiting for server response", host1.getEndPoint())); - assertThat( - ((OperationTimedOutException) e.getErrors().get(host2.getEndPoint())).getMessage()) - .contains( - String.format("[%s] Timed out waiting for server response", host2.getEndPoint())); - assertThat( - ((OperationTimedOutException) e.getErrors().get(host3.getEndPoint())).getMessage()) - .contains( - String.format("[%s] Timed out waiting for server response", host3.getEndPoint())); + fail("expected an OperationTimedOutException"); + } catch (OperationTimedOutException e) { + assertThat(e.getMessage()).contains("Timed out waiting for server response"); } - assertOnRequestErrorWasCalled(3, OperationTimedOutException.class); - assertThat(errors.getRetries().getCount()).isEqualTo(3); - assertThat(errors.getClientTimeouts().getCount()).isEqualTo(3); - assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(3); + assertOnRequestErrorWasCalled(2, OperationTimedOutException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getClientTimeouts().getCount()).isEqualTo(2); + assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(1); assertQueried(1, 1); assertQueried(2, 1); - assertQueried(3, 1); + assertQueried(3, 0); } finally { cluster .getConfiguration() @@ -243,27 +259,41 @@ public void should_try_next_host_on_client_timeouts() { } @Test(groups = "short", dataProvider = "serverSideErrors") - public void should_try_next_host_on_server_side_error( + public void should_try_next_host_on_first_server_side_error( + Result error, Class exception) { + simulateError(1, error); + simulateNormalResponse(2); + + query(); + + assertOnRequestErrorWasCalled(1, exception); + assertThat(errors.getOthers().getCount()).isEqualTo(1); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(1); + assertQueried(1, 1); + assertQueried(2, 1); + assertQueried(3, 0); + } + + @Test(groups = "short", dataProvider = "serverSideErrors") + public void should_rethrow_on_second_server_side_error( Result error, Class exception) { simulateError(1, error); simulateError(2, error); simulateError(3, error); try { query(); - Fail.fail("expected a NoHostAvailableException"); - } catch (NoHostAvailableException e) { - assertThat(e.getErrors().keySet()) - .hasSize(3) - .containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint()); - assertThat(e.getErrors().values()).hasOnlyElementsOfType(exception); + Fail.fail("expected a " + exception.getSimpleName()); + } catch (DriverException e) { + assertThat(e).isInstanceOf(exception); } - assertOnRequestErrorWasCalled(3, exception); - assertThat(errors.getOthers().getCount()).isEqualTo(3); - assertThat(errors.getRetries().getCount()).isEqualTo(3); - assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(3); + assertOnRequestErrorWasCalled(2, exception); + assertThat(errors.getOthers().getCount()).isEqualTo(2); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(1); assertQueried(1, 1); assertQueried(2, 1); - assertQueried(3, 1); + assertQueried(3, 0); } @Test(groups = "short") @@ -307,27 +337,43 @@ public void should_rethrow_on_write_failure() { } @Test(groups = "short", dataProvider = "connectionErrors") - public void should_try_next_host_on_connection_error(ClosedConnectionConfig.CloseType closeType) { + public void should_try_next_host_on_first_connection_error( + ClosedConnectionConfig.CloseType closeType) { + simulateError(1, closed_connection, new ClosedConnectionConfig(closeType)); + simulateNormalResponse(2); + + query(); + + assertOnRequestErrorWasCalled(1, TransportException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getConnectionErrors().getCount()).isEqualTo(1); + assertThat(errors.getIgnoresOnConnectionError().getCount()).isEqualTo(0); + assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(1); + assertQueried(1, 1); + assertQueried(2, 1); + assertQueried(3, 0); + } + + @Test(groups = "short", dataProvider = "connectionErrors") + public void should_rethrow_on_second_connection_error( + ClosedConnectionConfig.CloseType closeType) { simulateError(1, closed_connection, new ClosedConnectionConfig(closeType)); simulateError(2, closed_connection, new ClosedConnectionConfig(closeType)); simulateError(3, closed_connection, new ClosedConnectionConfig(closeType)); try { query(); - Fail.fail("expected a NoHostAvailableException"); - } catch (NoHostAvailableException e) { - assertThat(e.getErrors().keySet()) - .hasSize(3) - .containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint()); - assertThat(e.getErrors().values()).hasOnlyElementsOfType(TransportException.class); + Fail.fail("expected a TransportException"); + } catch (TransportException e) { + // expected — rethrown after one retry } - assertOnRequestErrorWasCalled(3, TransportException.class); - assertThat(errors.getRetries().getCount()).isEqualTo(3); - assertThat(errors.getConnectionErrors().getCount()).isEqualTo(3); + assertOnRequestErrorWasCalled(2, TransportException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getConnectionErrors().getCount()).isEqualTo(2); assertThat(errors.getIgnoresOnConnectionError().getCount()).isEqualTo(0); - assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(3); + assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(1); assertQueried(1, 1); assertQueried(2, 1); - assertQueried(3, 1); + assertQueried(3, 0); } @Test(groups = "short") diff --git a/driver-core/src/test/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicyIntegrationTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicyIntegrationTest.java index 7a305358ec2..7745c7e2216 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicyIntegrationTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicyIntegrationTest.java @@ -31,7 +31,6 @@ import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.WriteType; import com.datastax.driver.core.exceptions.DriverException; -import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.exceptions.OperationTimedOutException; import com.datastax.driver.core.exceptions.ReadFailureException; import com.datastax.driver.core.exceptions.ReadTimeoutException; @@ -370,13 +369,43 @@ public void should_rethrow_if_no_hosts_alive_on_unavailable() { /** * Ensures that when handling a client timeout with {@link DowngradingConsistencyRetryPolicy} that - * a retry is attempted on the next host until all hosts are tried at which point a {@link - * NoHostAvailableException} is returned. + * a retry is attempted on the next host. If the retry also times out, the exception is rethrown. * * @test_category retry_policy */ @Test(groups = "short") - public void should_try_next_host_on_client_timeouts() { + public void should_try_next_host_on_first_client_timeout() { + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(1); + try { + scassandras + .node(1) + .primingClient() + .prime( + PrimingRequest.queryBuilder() + .withQuery("mock query") + .withThen(then().withFixedDelay(1000L).withRows(row("result", "result1"))) + .build()); + simulateNormalResponse(2); + + query(); + + assertOnRequestErrorWasCalled(1, OperationTimedOutException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getClientTimeouts().getCount()).isEqualTo(1); + assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(1); + assertQueried(1, 1); + assertQueried(2, 1); + assertQueried(3, 0); + } finally { + cluster + .getConfiguration() + .getSocketOptions() + .setReadTimeoutMillis(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS); + } + } + + @Test(groups = "short") + public void should_rethrow_on_second_client_timeout() { cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(1); try { scassandras @@ -405,32 +434,17 @@ public void should_try_next_host_on_client_timeouts() { .build()); try { query(); - Assertions.fail("expected a NoHostAvailableException"); - } catch (NoHostAvailableException e) { - assertThat(e.getErrors().keySet()) - .hasSize(3) - .containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint()); - assertThat(e.getErrors().values()).hasOnlyElementsOfType(OperationTimedOutException.class); - assertThat( - ((OperationTimedOutException) e.getErrors().get(host1.getEndPoint())).getMessage()) - .contains( - String.format("[%s] Timed out waiting for server response", host1.getEndPoint())); - assertThat( - ((OperationTimedOutException) e.getErrors().get(host2.getEndPoint())).getMessage()) - .contains( - String.format("[%s] Timed out waiting for server response", host2.getEndPoint())); - assertThat( - ((OperationTimedOutException) e.getErrors().get(host3.getEndPoint())).getMessage()) - .contains( - String.format("[%s] Timed out waiting for server response", host3.getEndPoint())); + Assertions.fail("expected an OperationTimedOutException"); + } catch (OperationTimedOutException e) { + assertThat(e.getMessage()).contains("Timed out waiting for server response"); } - assertOnRequestErrorWasCalled(3, OperationTimedOutException.class); - assertThat(errors.getRetries().getCount()).isEqualTo(3); - assertThat(errors.getClientTimeouts().getCount()).isEqualTo(3); - assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(3); + assertOnRequestErrorWasCalled(2, OperationTimedOutException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getClientTimeouts().getCount()).isEqualTo(2); + assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(1); assertQueried(1, 1); assertQueried(2, 1); - assertQueried(3, 1); + assertQueried(3, 0); } finally { cluster .getConfiguration() @@ -441,66 +455,96 @@ public void should_try_next_host_on_client_timeouts() { /** * Ensures that when handling a server error defined in {@link #serverSideErrors} with {@link - * DowngradingConsistencyRetryPolicy} that a retry is attempted on the next host until all hosts - * are tried at which point a {@link NoHostAvailableException} is raised and it's errors include - * the expected exception. + * DowngradingConsistencyRetryPolicy} that a retry is attempted on the next host. If the retry + * also fails, the exception is rethrown. * * @param error Server side error to be produced. * @param exception The exception we expect to be raised. * @test_category retry_policy */ @Test(groups = "short", dataProvider = "serverSideErrors") - public void should_try_next_host_on_server_side_error( + public void should_try_next_host_on_first_server_side_error( + Result error, Class exception) { + simulateError(1, error); + simulateNormalResponse(2); + + query(); + + assertOnRequestErrorWasCalled(1, exception); + assertThat(errors.getOthers().getCount()).isEqualTo(1); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(1); + assertQueried(1, 1); + assertQueried(2, 1); + assertQueried(3, 0); + } + + @Test(groups = "short", dataProvider = "serverSideErrors") + public void should_rethrow_on_second_server_side_error( Result error, Class exception) { simulateError(1, error); simulateError(2, error); simulateError(3, error); try { query(); - Fail.fail("expected a NoHostAvailableException"); - } catch (NoHostAvailableException e) { - assertThat(e.getErrors().keySet()) - .hasSize(3) - .containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint()); - assertThat(e.getErrors().values()).hasOnlyElementsOfType(exception); + Fail.fail("expected a " + exception.getSimpleName()); + } catch (DriverException e) { + assertThat(e).isInstanceOf(exception); } - assertOnRequestErrorWasCalled(3, exception); - assertThat(errors.getOthers().getCount()).isEqualTo(3); - assertThat(errors.getRetries().getCount()).isEqualTo(3); - assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(3); + assertOnRequestErrorWasCalled(2, exception); + assertThat(errors.getOthers().getCount()).isEqualTo(2); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(1); assertQueried(1, 1); assertQueried(2, 1); - assertQueried(3, 1); + assertQueried(3, 0); } /** * Ensures that when handling a connection error caused by the connection closing during a request - * in a way described by {@link #connectionErrors} that the next host is tried. + * in a way described by {@link #connectionErrors} that the next host is tried. If the retry also + * fails, the exception is rethrown. * * @param closeType The way the connection should be closed during the request. */ @Test(groups = "short", dataProvider = "connectionErrors") - public void should_try_next_host_on_connection_error(ClosedConnectionConfig.CloseType closeType) { + public void should_try_next_host_on_first_connection_error( + ClosedConnectionConfig.CloseType closeType) { + simulateError(1, closed_connection, new ClosedConnectionConfig(closeType)); + simulateNormalResponse(2); + + query(); + + assertOnRequestErrorWasCalled(1, TransportException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getConnectionErrors().getCount()).isEqualTo(1); + assertThat(errors.getIgnoresOnConnectionError().getCount()).isEqualTo(0); + assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(1); + assertQueried(1, 1); + assertQueried(2, 1); + assertQueried(3, 0); + } + + @Test(groups = "short", dataProvider = "connectionErrors") + public void should_rethrow_on_second_connection_error( + ClosedConnectionConfig.CloseType closeType) { simulateError(1, closed_connection, new ClosedConnectionConfig(closeType)); simulateError(2, closed_connection, new ClosedConnectionConfig(closeType)); simulateError(3, closed_connection, new ClosedConnectionConfig(closeType)); try { query(); Fail.fail("expected a TransportException"); - } catch (NoHostAvailableException e) { - assertThat(e.getErrors().keySet()) - .hasSize(3) - .containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint()); - assertThat(e.getErrors().values()).hasOnlyElementsOfType(TransportException.class); + } catch (TransportException e) { + // expected — rethrown after one retry } - assertOnRequestErrorWasCalled(3, TransportException.class); - assertThat(errors.getRetries().getCount()).isEqualTo(3); - assertThat(errors.getConnectionErrors().getCount()).isEqualTo(3); + assertOnRequestErrorWasCalled(2, TransportException.class); + assertThat(errors.getRetries().getCount()).isEqualTo(1); + assertThat(errors.getConnectionErrors().getCount()).isEqualTo(2); assertThat(errors.getIgnoresOnConnectionError().getCount()).isEqualTo(0); - assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(3); + assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(1); assertQueried(1, 1); assertQueried(2, 1); - assertQueried(3, 1); + assertQueried(3, 0); } @Test(groups = "short")