diff --git a/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java b/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java index 50ae358a4d9e0..d6f1816ba6bb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java +++ b/modules/core/src/test/java/org/apache/ignite/client/LocalIgniteCluster.java @@ -41,7 +41,7 @@ public class LocalIgniteCluster implements AutoCloseable { private static final Random rnd = new Random(); /** Servers. */ - private final List srvs = new ArrayList<>(); + public final List srvs = new ArrayList<>(); /** Configurations of the failed servers. */ private final List failedCfgs = new ArrayList<>(); diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java index 86e00d30137a6..edb67bd3117be 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java @@ -17,9 +17,9 @@ package org.apache.ignite.client; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,11 +46,15 @@ import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.client.thin.AbstractThinClientTest; import org.apache.ignite.internal.client.thin.ClientOperation; import org.apache.ignite.internal.client.thin.ClientServerError; import org.apache.ignite.internal.client.thin.ServicesTest; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.services.Service; @@ -58,13 +62,17 @@ import org.apache.ignite.services.ServiceContext; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Assume; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static java.util.Arrays.asList; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; import static org.apache.ignite.events.EventType.EVTS_CACHE; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * High Availability tests. @@ -82,19 +90,20 @@ public class ReliabilityTest extends AbstractThinClientTest { @Parameterized.Parameter(1) public boolean async; + /** Async operations. */ + @Parameterized.Parameter(2) + public int idx; + /** * @return List of parameters to test. */ - @Parameterized.Parameters(name = "partitionAware={0}, async={1}") + @Parameterized.Parameters(name = "partitionAware={0}, async={1}, idx={2}") public static Collection testData() { - List res = new ArrayList<>(); - - res.add(new Object[] {false, false}); - res.add(new Object[] {false, true}); - res.add(new Object[] {true, false}); - res.add(new Object[] {true, true}); - - return res; + return GridTestUtils.cartesianProduct( + asList(false, true), + asList(false, true), + IntStream.range(0, 50).boxed().collect(Collectors.toList()) + ); } /** @@ -120,23 +129,24 @@ public void testFailover() throws Exception { final ClientCache cache = client.getOrCreateCache( new ClientCacheConfiguration() .setName("testFailover") + .setPartitionLossPolicy(READ_ONLY_SAFE) .setCacheMode(CacheMode.REPLICATED) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) ); - // Simple operation failover: put/get - assertOnUnstableCluster(cluster, () -> { - Integer key = rnd.nextInt(); - String val = key.toString(); - - cachePut(cache, key, val); +// // Simple operation failover: put/get +// assertOnUnstableCluster(cluster, () -> { +// Integer key = rnd.nextInt(); +// String val = key.toString(); +// +// cachePut(cache, key, val); +// +// String cachedVal = cache.get(key); +// +// assertEquals(val, cachedVal); +// }); - String cachedVal = cache.get(key); - - assertEquals(val, cachedVal); - }); - - cache.clear(); +// cache.clear(); // Composite operation failover: query Map data = IntStream.rangeClosed(1, 1000).boxed() @@ -152,10 +162,36 @@ public void testFailover() throws Exception { try (QueryCursor> cur = cache.query(qry)) { List> res = cur.getAll(); - assertEquals("Unexpected number of entries", data.size(), res.size()); - Map act = res.stream() - .collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue)); + .collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue)); + + Map failover0 = null; + Map failover1 = null; + Map failover2 = null; + + if (data.size() != res.size()) { + synchronized (this) { + failover0 = new HashMap<>(); + failover1 = new HashMap<>(); + failover2 = new HashMap<>(); + + for (int i : data.keySet()) { + failover0.put(i, cluster.srvs.get(0).cache("testFailover").get(i)); + + if (cluster.srvs.get(1) != null) + failover1.put(i, cluster.srvs.get(1).cache("testFailover").get(i)); + + if (cluster.srvs.get(2) != null) + failover2.put(i, cluster.srvs.get(2).cache("testFailover").get(i)); + } + } + } + + assertEquals( + "Unexpected number of entries " + act + " " + failover0 + " " + failover1 + " " + failover2, + data.size(), + res.size() + ); assertEquals("Unexpected entries", data, act); } @@ -192,6 +228,7 @@ public void testFailover() throws Exception { * Test single server failover. */ @Test + @Ignore public void testSingleServerFailover() throws Exception { try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1); IgniteClient client = Ignition.startClient(getClientConfiguration() @@ -214,6 +251,7 @@ public void testSingleServerFailover() throws Exception { * Test single server can be used multiple times in configuration. */ @Test + @Ignore public void testSingleServerDuplicatedFailover() throws Exception { try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1); IgniteClient client = Ignition.startClient(getClientConfiguration() @@ -237,6 +275,7 @@ public void testSingleServerDuplicatedFailover() throws Exception { * Test single server can be used multiple times in configuration. */ @Test + @Ignore public void testRetryReadPolicyRetriesCacheGet() { try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1); IgniteClient client = Ignition.startClient(getClientConfiguration() @@ -261,6 +300,7 @@ public void testRetryReadPolicyRetriesCacheGet() { * Tests retry policy exception handling. */ @Test + @Ignore public void testExceptionInRetryPolicyPropagatesToCaller() { Assume.assumeFalse(partitionAware); @@ -294,6 +334,7 @@ public void testExceptionInRetryPolicyPropagatesToCaller() { */ @SuppressWarnings("ThrowableNotThrown") @Test + @Ignore public void testNullRetryPolicyDisablesFailover() { try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1); IgniteClient client = Ignition.startClient(getClientConfiguration() @@ -322,6 +363,7 @@ public void testNullRetryPolicyDisablesFailover() { */ @SuppressWarnings("ThrowableNotThrown") @Test + @Ignore public void testRetryNonePolicyDisablesFailover() { try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1); IgniteClient client = Ignition.startClient(getClientConfiguration() @@ -349,6 +391,7 @@ public void testRetryNonePolicyDisablesFailover() { * Tests that {@link ClientOperationType} is updated accordingly when {@link ClientOperation} is added. */ @Test + @Ignore public void testRetryPolicyConvertOpAllOperationsSupported() { List nullOps = Arrays.stream(ClientOperation.values()) .filter(o -> o.toPublicOperationType() == null) @@ -369,6 +412,7 @@ public void testRetryPolicyConvertOpAllOperationsSupported() { * Test that failover doesn't lead to silent query inconsistency. */ @Test + @Ignore public void testQueryConsistencyOnFailover() throws Exception { int CLUSTER_SIZE = 2; @@ -407,6 +451,7 @@ public void testQueryConsistencyOnFailover() throws Exception { * Test that client works properly with servers txId intersection. */ @Test + @Ignore @SuppressWarnings("ThrowableNotThrown") public void testTxWithIdIntersection() throws Exception { // Partition-aware client connects to all known servers at the start, and dropAllThinClientConnections @@ -477,6 +522,7 @@ public void testTxWithIdIntersection() throws Exception { * Test reconnection throttling. */ @Test + @Ignore @SuppressWarnings("ThrowableNotThrown") public void testReconnectionThrottling() throws Exception { // If partition awareness is enabled, channels are restored asynchronously without applying throttling. @@ -506,7 +552,7 @@ public void testReconnectionThrottling() throws Exception { doSleep(throttlingPeriod); // Attempt to reconnect after throttlingPeriod should pass. - assertTrue(GridTestUtils.waitForCondition(() -> { + assertTrue(waitForCondition(() -> { try { cachePut(cache, 0, 0); @@ -523,6 +569,7 @@ public void testReconnectionThrottling() throws Exception { * Test server-side critical error. */ @Test + @Ignore public void testServerCriticalError() throws Exception { AtomicBoolean failure = new AtomicBoolean(); @@ -552,7 +599,7 @@ public void testServerCriticalError() throws Exception { GridTestUtils.assertThrowsAnyCause(log, () -> cache.remove(0), ClientServerError.class, msg); - assertTrue(GridTestUtils.waitForCondition(failure::get, 1_000L)); + assertTrue(waitForCondition(failure::get, 1_000L)); } } @@ -561,6 +608,7 @@ public void testServerCriticalError() throws Exception { * cluster failover. */ @Test + @Ignore public void testServiceMethodInvocationAfterFailover() throws Exception { PersonExternalizable person = new PersonExternalizable("Person 1"); @@ -627,6 +675,7 @@ public void testServiceMethodInvocationAfterFailover() throws Exception { * Tests that server does not disconnect idle clients when heartbeats are enabled. */ @Test + @Ignore public void testServerDoesNotDisconnectIdleClientWithHeartbeats() throws Exception { IgniteConfiguration serverCfg = getConfiguration().setClientConnectorConfiguration( new ClientConnectorConfiguration().setIdleTimeout(2000)); @@ -646,6 +695,7 @@ public void testServerDoesNotDisconnectIdleClientWithHeartbeats() throws Excepti * Tests service proxy failover. */ @Test + @Ignore public void testServiceProxyFailover() throws Exception { Assume.assumeTrue(partitionAware); @@ -703,20 +753,27 @@ private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable clo) t Future topChangeFut = Executors.newSingleThreadExecutor().submit(() -> { try { for (int i = 0; i < 5 && !stopFlag.get(); i++) { - while (cluster.size() != 1) - cluster.failNode(); + while (cluster.size() != 1) { + synchronized (this) { + cluster.failNode(); + } + } - while (cluster.size() != cluster.getInitialSize()) - cluster.restoreNode(); + while (cluster.size() != cluster.getInitialSize()) { + synchronized (this) { + cluster.restoreNode(); + } + } - awaitPartitionMapExchange(); + waitRebalanceFinished(); } } - catch (InterruptedException ignore) { + catch (InterruptedException | IgniteInterruptedCheckedException ignore) { // No-op. } - - stopFlag.set(true); + finally { + stopFlag.set(true); + } }); // Use Ignite while nodes keep failing. @@ -731,6 +788,23 @@ private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable clo) t } } + /** */ + private void waitRebalanceFinished() throws IgniteInterruptedCheckedException, InterruptedException { + awaitPartitionMapExchange(true, true, null); + + for (Ignite ignite : G.allGrids()) { + GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache(); + + for (String cacheName : cacheProc.cacheNames()) { + assertTrue(waitForCondition( + () -> cacheProc.internalCache(cacheName).context().topology().rebalanceFinished( + cacheProc.context().exchange().readyAffinityVersion() + ), 5000 + )); + } + } + } + /** {@inheritDoc} */ @Override protected boolean isClientPartitionAwarenessEnabled() { return partitionAware;