Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class LocalIgniteCluster implements AutoCloseable {
private static final Random rnd = new Random();

/** Servers. */
private final List<Ignite> srvs = new ArrayList<>();
public final List<Ignite> srvs = new ArrayList<>();

/** Configurations of the failed servers. */
private final List<NodeConfiguration> failedCfgs = new ArrayList<>();
Expand Down
144 changes: 109 additions & 35 deletions modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.ignite.client;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -46,25 +46,33 @@
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
import org.apache.ignite.internal.client.thin.ClientOperation;
import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.client.thin.ServicesTest;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static java.util.Arrays.asList;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.events.EventType.EVTS_CACHE;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

/**
* High Availability tests.
Expand All @@ -82,19 +90,20 @@ public class ReliabilityTest extends AbstractThinClientTest {
@Parameterized.Parameter(1)
public boolean async;

/** Async operations. */
@Parameterized.Parameter(2)
public int idx;

/**
* @return List of parameters to test.
*/
@Parameterized.Parameters(name = "partitionAware={0}, async={1}")
@Parameterized.Parameters(name = "partitionAware={0}, async={1}, idx={2}")
public static Collection<Object[]> testData() {
List<Object[]> res = new ArrayList<>();

res.add(new Object[] {false, false});
res.add(new Object[] {false, true});
res.add(new Object[] {true, false});
res.add(new Object[] {true, true});

return res;
return GridTestUtils.cartesianProduct(
asList(false, true),
asList(false, true),
IntStream.range(0, 50).boxed().collect(Collectors.toList())
);
}

/**
Expand All @@ -120,23 +129,24 @@ public void testFailover() throws Exception {
final ClientCache<Integer, String> cache = client.getOrCreateCache(
new ClientCacheConfiguration()
.setName("testFailover")
.setPartitionLossPolicy(READ_ONLY_SAFE)
.setCacheMode(CacheMode.REPLICATED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
);

// Simple operation failover: put/get
assertOnUnstableCluster(cluster, () -> {
Integer key = rnd.nextInt();
String val = key.toString();

cachePut(cache, key, val);
// // Simple operation failover: put/get
// assertOnUnstableCluster(cluster, () -> {
// Integer key = rnd.nextInt();
// String val = key.toString();
//
// cachePut(cache, key, val);
//
// String cachedVal = cache.get(key);
//
// assertEquals(val, cachedVal);
// });

String cachedVal = cache.get(key);

assertEquals(val, cachedVal);
});

cache.clear();
// cache.clear();

// Composite operation failover: query
Map<Integer, String> data = IntStream.rangeClosed(1, 1000).boxed()
Expand All @@ -152,10 +162,36 @@ public void testFailover() throws Exception {
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
List<Cache.Entry<Integer, String>> res = cur.getAll();

assertEquals("Unexpected number of entries", data.size(), res.size());

Map<Integer, String> act = res.stream()
.collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue));
.collect(Collectors.toMap(Cache.Entry::getKey, Cache.Entry::getValue));

Map<Integer, Object> failover0 = null;
Map<Integer, Object> failover1 = null;
Map<Integer, Object> failover2 = null;

if (data.size() != res.size()) {
synchronized (this) {
failover0 = new HashMap<>();
failover1 = new HashMap<>();
failover2 = new HashMap<>();

for (int i : data.keySet()) {
failover0.put(i, cluster.srvs.get(0).cache("testFailover").get(i));

if (cluster.srvs.get(1) != null)
failover1.put(i, cluster.srvs.get(1).cache("testFailover").get(i));

if (cluster.srvs.get(2) != null)
failover2.put(i, cluster.srvs.get(2).cache("testFailover").get(i));
}
}
}

assertEquals(
"Unexpected number of entries " + act + " " + failover0 + " " + failover1 + " " + failover2,
data.size(),
res.size()
);

assertEquals("Unexpected entries", data, act);
}
Expand Down Expand Up @@ -192,6 +228,7 @@ public void testFailover() throws Exception {
* Test single server failover.
*/
@Test
@Ignore
public void testSingleServerFailover() throws Exception {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
Expand All @@ -214,6 +251,7 @@ public void testSingleServerFailover() throws Exception {
* Test single server can be used multiple times in configuration.
*/
@Test
@Ignore
public void testSingleServerDuplicatedFailover() throws Exception {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
Expand All @@ -237,6 +275,7 @@ public void testSingleServerDuplicatedFailover() throws Exception {
* Test single server can be used multiple times in configuration.
*/
@Test
@Ignore
public void testRetryReadPolicyRetriesCacheGet() {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
Expand All @@ -261,6 +300,7 @@ public void testRetryReadPolicyRetriesCacheGet() {
* Tests retry policy exception handling.
*/
@Test
@Ignore
public void testExceptionInRetryPolicyPropagatesToCaller() {
Assume.assumeFalse(partitionAware);

Expand Down Expand Up @@ -294,6 +334,7 @@ public void testExceptionInRetryPolicyPropagatesToCaller() {
*/
@SuppressWarnings("ThrowableNotThrown")
@Test
@Ignore
public void testNullRetryPolicyDisablesFailover() {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
Expand Down Expand Up @@ -322,6 +363,7 @@ public void testNullRetryPolicyDisablesFailover() {
*/
@SuppressWarnings("ThrowableNotThrown")
@Test
@Ignore
public void testRetryNonePolicyDisablesFailover() {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
Expand Down Expand Up @@ -349,6 +391,7 @@ public void testRetryNonePolicyDisablesFailover() {
* Tests that {@link ClientOperationType} is updated accordingly when {@link ClientOperation} is added.
*/
@Test
@Ignore
public void testRetryPolicyConvertOpAllOperationsSupported() {
List<ClientOperation> nullOps = Arrays.stream(ClientOperation.values())
.filter(o -> o.toPublicOperationType() == null)
Expand All @@ -369,6 +412,7 @@ public void testRetryPolicyConvertOpAllOperationsSupported() {
* Test that failover doesn't lead to silent query inconsistency.
*/
@Test
@Ignore
public void testQueryConsistencyOnFailover() throws Exception {
int CLUSTER_SIZE = 2;

Expand Down Expand Up @@ -407,6 +451,7 @@ public void testQueryConsistencyOnFailover() throws Exception {
* Test that client works properly with servers txId intersection.
*/
@Test
@Ignore
@SuppressWarnings("ThrowableNotThrown")
public void testTxWithIdIntersection() throws Exception {
// Partition-aware client connects to all known servers at the start, and dropAllThinClientConnections
Expand Down Expand Up @@ -477,6 +522,7 @@ public void testTxWithIdIntersection() throws Exception {
* Test reconnection throttling.
*/
@Test
@Ignore
@SuppressWarnings("ThrowableNotThrown")
public void testReconnectionThrottling() throws Exception {
// If partition awareness is enabled, channels are restored asynchronously without applying throttling.
Expand Down Expand Up @@ -506,7 +552,7 @@ public void testReconnectionThrottling() throws Exception {
doSleep(throttlingPeriod);

// Attempt to reconnect after throttlingPeriod should pass.
assertTrue(GridTestUtils.waitForCondition(() -> {
assertTrue(waitForCondition(() -> {
try {
cachePut(cache, 0, 0);

Expand All @@ -523,6 +569,7 @@ public void testReconnectionThrottling() throws Exception {
* Test server-side critical error.
*/
@Test
@Ignore
public void testServerCriticalError() throws Exception {
AtomicBoolean failure = new AtomicBoolean();

Expand Down Expand Up @@ -552,7 +599,7 @@ public void testServerCriticalError() throws Exception {

GridTestUtils.assertThrowsAnyCause(log, () -> cache.remove(0), ClientServerError.class, msg);

assertTrue(GridTestUtils.waitForCondition(failure::get, 1_000L));
assertTrue(waitForCondition(failure::get, 1_000L));
}
}

Expand All @@ -561,6 +608,7 @@ public void testServerCriticalError() throws Exception {
* cluster failover.
*/
@Test
@Ignore
public void testServiceMethodInvocationAfterFailover() throws Exception {
PersonExternalizable person = new PersonExternalizable("Person 1");

Expand Down Expand Up @@ -627,6 +675,7 @@ public void testServiceMethodInvocationAfterFailover() throws Exception {
* Tests that server does not disconnect idle clients when heartbeats are enabled.
*/
@Test
@Ignore
public void testServerDoesNotDisconnectIdleClientWithHeartbeats() throws Exception {
IgniteConfiguration serverCfg = getConfiguration().setClientConnectorConfiguration(
new ClientConnectorConfiguration().setIdleTimeout(2000));
Expand All @@ -646,6 +695,7 @@ public void testServerDoesNotDisconnectIdleClientWithHeartbeats() throws Excepti
* Tests service proxy failover.
*/
@Test
@Ignore
public void testServiceProxyFailover() throws Exception {
Assume.assumeTrue(partitionAware);

Expand Down Expand Up @@ -703,20 +753,27 @@ private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable clo) t
Future<?> topChangeFut = Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int i = 0; i < 5 && !stopFlag.get(); i++) {
while (cluster.size() != 1)
cluster.failNode();
while (cluster.size() != 1) {
synchronized (this) {
cluster.failNode();
}
}

while (cluster.size() != cluster.getInitialSize())
cluster.restoreNode();
while (cluster.size() != cluster.getInitialSize()) {
synchronized (this) {
cluster.restoreNode();
}
}

awaitPartitionMapExchange();
waitRebalanceFinished();
}
}
catch (InterruptedException ignore) {
catch (InterruptedException | IgniteInterruptedCheckedException ignore) {
// No-op.
}

stopFlag.set(true);
finally {
stopFlag.set(true);
}
});

// Use Ignite while nodes keep failing.
Expand All @@ -731,6 +788,23 @@ private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable clo) t
}
}

/** */
private void waitRebalanceFinished() throws IgniteInterruptedCheckedException, InterruptedException {
awaitPartitionMapExchange(true, true, null);

for (Ignite ignite : G.allGrids()) {
GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache();

for (String cacheName : cacheProc.cacheNames()) {
assertTrue(waitForCondition(
() -> cacheProc.internalCache(cacheName).context().topology().rebalanceFinished(
cacheProc.context().exchange().readyAffinityVersion()
), 5000
));
}
}
}

/** {@inheritDoc} */
@Override protected boolean isClientPartitionAwarenessEnabled() {
return partitionAware;
Expand Down
Loading