Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions src/test/java/networking/p2p/NetworkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,201 @@ public class NetworkTest {
private final String channel1 = "test-network-channel-1";
private final String channel2 = "test-network-channel-2";

/**
*Create 10 networks, all having a mock engine registering to the same channels. The first network unicasts an entity
* fixture concurrently to all other engines, and other engines should receive it.
*/
@Test
void testTenP2pNetworksOneToAll() {
int concurrencyDegree = 9;
AtomicInteger threadError = new AtomicInteger();
CountDownLatch unicastDone = new CountDownLatch(concurrencyDegree);

P2pNetwork[] p2pNetworks = new P2pNetwork[concurrencyDegree + 1];
ArrayList<MockEngine> enginesChannel1 = new ArrayList<>();
P2pNetwork network1 = new P2pNetwork(IdentifierFixture.newIdentifier(), PORT_ZERO);
MockEngine engine1 = new MockEngine();
p2pNetworks[0] = network1;
Conduit conduit = network1.register(engine1, channel1);
Thread[] unicastThreads = new Thread[concurrencyDegree];
for (int i = 1; i <= concurrencyDegree; i++) {
P2pNetwork network = new P2pNetwork(IdentifierFixture.newIdentifier(), PORT_ZERO);
MockEngine engine = new MockEngine();
network.register(engine, channel1);
enginesChannel1.add(engine);
p2pNetworks[i] = network;
}
startNetworks(p2pNetworks);
Entity entity = new EntityFixture();
for (int i = 0; i < concurrencyDegree; i++) {
int finalI = i;
unicastThreads[i] = new Thread(() -> {
try {
conduit.unicast(entity, p2pNetworks[finalI + 1].getId());
unicastDone.countDown();
} catch (LightChainNetworkingException e) {
threadError.getAndIncrement();
}
});
}
for (Thread t : unicastThreads) {
t.start();
}
try {
boolean doneOneTime = unicastDone.await(60, TimeUnit.SECONDS);
Assertions.assertTrue(doneOneTime);
} catch (InterruptedException e) {
Assertions.fail();
}
for (MockEngine engine : enginesChannel1) {

Assertions.assertTrue(engine.hasReceived(entity));
}
Assertions.assertEquals(0, threadError.get());
}

/**
* Create 10 networks, all having a mock engine registering to the same channels.
* Each network unicasts an entity fixture concurrently to all other engines, and other engines should receive it.
*/
@Test
void testTenP2pNetworksAllToAll() {
int concurrencyDegree = 100;
AtomicInteger threadError = new AtomicInteger();
CountDownLatch unicastDone = new CountDownLatch(concurrencyDegree);
P2pNetwork[] p2pNetworks = new P2pNetwork[10];
ArrayList<MockEngine> enginesChannel1 = new ArrayList<>();
Thread[] unicastThreads = new Thread[concurrencyDegree];
ArrayList<Conduit> conduits = new ArrayList<>();
for (int i = 0; i < 10; i++) {
P2pNetwork network = new P2pNetwork(IdentifierFixture.newIdentifier(), PORT_ZERO);
MockEngine engine = new MockEngine();
Conduit conduit = network.register(engine, channel1);
conduits.add(conduit);
enginesChannel1.add(engine);
p2pNetworks[i] = network;
}
startNetworks(p2pNetworks);
int counter = 0;
for (int j = 0; j < 10; j++) {
int finalJ = j;
for (int i = 0; i < 10; i++) {
int finalI = i;
unicastThreads[counter] = new Thread(() -> {
try {
Entity entity = new EntityFixture();
if (finalI != finalJ) {
conduits.get(finalJ).unicast(entity, p2pNetworks[finalI].getId());
if (!enginesChannel1.get(finalI).hasReceived(entity)) {
threadError.getAndIncrement();
}
}
unicastDone.countDown();
} catch (LightChainNetworkingException e) {
threadError.getAndIncrement();
}
});
counter++;
}
}
for (Thread t : unicastThreads) {
t.start();
}
try {
boolean doneOneTime = unicastDone.await(60, TimeUnit.SECONDS);
Assertions.assertTrue(doneOneTime);
} catch (InterruptedException e) {
Assertions.fail();
}
Assertions.assertEquals(0, threadError.get());
}

/**
* Create 10 networks, all having two mock engine registering one registering on channel 1 and the other on channel
* 2. Each network unicasts an entity fixture concurrently to all other engines, and other engines should receive it.
* Engines registering on different channel should not receive each others’ unicasts.
*/
@Test
void testTenP2pNetworksWithTwoEnginesAllToAll() {
int concurrencyDegree = 200;
AtomicInteger threadError = new AtomicInteger();
CountDownLatch unicastDone = new CountDownLatch(concurrencyDegree);
P2pNetwork[] p2pNetworks = new P2pNetwork[10];
ArrayList<MockEngine> enginesChannel1 = new ArrayList<>();
ArrayList<MockEngine> enginesChannel2 = new ArrayList<>();
Thread[] unicastThreads = new Thread[concurrencyDegree];
ArrayList<Conduit> conduits1 = new ArrayList<>();
ArrayList<Conduit> conduits2 = new ArrayList<>();
for (int i = 0; i < 10; i++) {
P2pNetwork network = new P2pNetwork(IdentifierFixture.newIdentifier(), PORT_ZERO);
MockEngine engine1 = new MockEngine();
MockEngine engine2 = new MockEngine();
Conduit conduit1 = network.register(engine1, channel1);
Conduit conduit2 = network.register(engine2, channel2);
conduits1.add(conduit1);
conduits2.add(conduit2);
enginesChannel1.add(engine1);
enginesChannel2.add(engine2);
p2pNetworks[i] = network;
}
startNetworks(p2pNetworks);
int counter = 0;
for (int k = 0; k < 2; k++) {
int finalK = k;
for (int j = 0; j < 10; j++) {
int finalJ = j;
for (int i = 0; i < 10; i++) {
int finalI = i;
unicastThreads[counter] = new Thread(() -> {
try {
Entity entity1 = new EntityFixture();
Conduit conduit;
MockEngine engine;
if (finalK == 0) {
conduit = conduits1.get(finalJ);
engine = enginesChannel1.get(finalI);
} else {
conduit = conduits2.get(finalJ);
engine = enginesChannel2.get(finalI);
}

if (finalI != finalJ) {
conduit.unicast(entity1, p2pNetworks[finalI].getId());

if (!engine.hasReceived(entity1)) {
threadError.getAndIncrement();
}
}
unicastDone.countDown();
} catch (LightChainNetworkingException e) {
threadError.getAndIncrement();
}
});
counter++;
}
}
}
for (Thread t : unicastThreads) {
t.start();
}
try {
boolean doneOneTime = unicastDone.await(60, TimeUnit.SECONDS);
Assertions.assertTrue(doneOneTime);
} catch (InterruptedException e) {
Assertions.fail();
}
/*
We know that they received the correct ones. Check if there is redundancy.
*/
for (MockEngine mockEngine : enginesChannel1) {
Assertions.assertEquals(9, mockEngine.totalReceived());
}
for (MockEngine mockEngine : enginesChannel2) {
Assertions.assertEquals(9, mockEngine.totalReceived());
}
Assertions.assertEquals(0, threadError.get());
}

/**
* Engine A1 (on one network) can send message to Engine A2
* (on another network), and the message is received by Engine A2.
Expand Down