diff --git a/src/main/java/model/exceptions/LightChainDistributedStorageException.java b/src/main/java/model/exceptions/LightChainDistributedStorageException.java index 5e5713a6..a505f641 100644 --- a/src/main/java/model/exceptions/LightChainDistributedStorageException.java +++ b/src/main/java/model/exceptions/LightChainDistributedStorageException.java @@ -3,4 +3,7 @@ /** * Represents a runtime exception happens on distributed storage layer of LightChain. */ -public class LightChainDistributedStorageException extends Exception{} +public class LightChainDistributedStorageException extends Exception { + public LightChainDistributedStorageException(String s) { + } +} diff --git a/src/main/java/network/NetworkAdapter.java b/src/main/java/network/NetworkAdapter.java index e5151104..956f435c 100644 --- a/src/main/java/network/NetworkAdapter.java +++ b/src/main/java/network/NetworkAdapter.java @@ -1,5 +1,7 @@ package network; +import java.util.ArrayList; + import model.Entity; import model.exceptions.LightChainDistributedStorageException; import model.exceptions.LightChainNetworkingException; @@ -38,4 +40,13 @@ public interface NetworkAdapter { * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entity. */ Entity get(Identifier identifier, String namespace) throws LightChainDistributedStorageException; + + /** + * Retrieves all entities stored on the underlying DHT of nodes that stored on this channel. + * + * @param namespace the namespace on which this query is resolved. + * @return list of all entities stored on this channel from underlying DHT. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entities. + */ + ArrayList allEntities(String namespace) throws LightChainDistributedStorageException; } diff --git a/src/test/java/networking/MockConduit.java b/src/test/java/networking/MockConduit.java index 86f2191e..a135ba65 100644 --- a/src/test/java/networking/MockConduit.java +++ b/src/test/java/networking/MockConduit.java @@ -53,7 +53,7 @@ public void unicast(Entity e, Identifier target) throws LightChainNetworkingExce */ @Override public void put(Entity e) throws LightChainDistributedStorageException { - + this.networkAdapter.put(e, channel); } /** @@ -66,12 +66,12 @@ public void put(Entity e) throws LightChainDistributedStorageException { */ @Override public Entity get(Identifier identifier) throws LightChainDistributedStorageException { - return null; + return this.networkAdapter.get(identifier, channel); } @Override public ArrayList allEntities() throws LightChainDistributedStorageException { - return null; + return this.networkAdapter.allEntities(channel); } public boolean hasSent(Identifier entityId) { diff --git a/src/test/java/networking/stub/Hub.java b/src/test/java/networking/stub/Hub.java index 3a53543c..bc46d6ca 100644 --- a/src/test/java/networking/stub/Hub.java +++ b/src/test/java/networking/stub/Hub.java @@ -1,8 +1,11 @@ package networking.stub; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import model.Entity; +import model.exceptions.LightChainDistributedStorageException; import model.lightchain.Identifier; import network.Network; @@ -10,20 +13,91 @@ * Models the core communication part of the networking layer that allows stub network instances to talk to each other. */ public class Hub { + private final ReentrantReadWriteLock lock; + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; private final ConcurrentHashMap networks; + private final ConcurrentHashMap channelMap1; + private final ConcurrentHashMap channelMap2; /** * Create a hub. */ public Hub() { this.networks = new ConcurrentHashMap<>(); + this.channelMap1 = new ConcurrentHashMap<>(); + this.channelMap2 = new ConcurrentHashMap<>(); + this.lock = new ReentrantReadWriteLock(); + } + + /** + * Put Entity to channel. + * + * @param e entitiy. + * @param namespace channel name. + * @throws LightChainDistributedStorageException any unhappy path taken on storing the Entity. + */ + public void putEntityToChannel(Entity e, String namespace) throws LightChainDistributedStorageException { + try { + lock.writeLock().lock(); + if (namespace.equals(channel1)) { + channelMap1.put(e.id(), e); + } else if (namespace.equals(channel2)) { + channelMap2.put(e.id(), e); + } else { + throw new LightChainDistributedStorageException("entity could not be put the given channel"); + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Get entity from channel. + * + * @param identifier of entity. + * @param namespace channel name. + * @return entity. + * @throws LightChainDistributedStorageException any unhappy path taken on storing the Entity. + */ + public Entity getEntityFromChannel(Identifier identifier, String namespace) + throws LightChainDistributedStorageException { + try { + lock.readLock().lock(); + if (namespace.equals(channel1)) { + return channelMap1.get(identifier); + } else if (namespace.equals(channel2)) { + return channelMap2.get(identifier); + } else { + throw new LightChainDistributedStorageException("could not get the entity"); + } + } finally { + lock.readLock().unlock(); + } + } + + /** + * Retrieves all entities stored on the underlying DHT of nodes that stored on this channel. + * + * @param namespace the namespace on which this query is resolved. + * @return list of all entities stored on this channel from underlying DHT. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entities. + */ + public ArrayList getAllEntities(String namespace) throws LightChainDistributedStorageException { + if (namespace.equals(channel1)) { + return new ArrayList<>(channelMap1.values()); + } else if (namespace.equals(channel2)) { + return new ArrayList<>(channelMap2.values()); + } else { + throw new LightChainDistributedStorageException("could not get the entities"); + } } /** * Registeration of a network to the Hub. * * @param identifier identifier of network. - * @param network to be registered. + * @param network to be registered. */ public void registerNetwork(Identifier identifier, Network network) { networks.put(identifier, network); @@ -32,8 +106,8 @@ public void registerNetwork(Identifier identifier, Network network) { /** * Transfer entity from to another network on the same channel. * - * @param entity entity to be transferred. - * @param target identifier of target. + * @param entity entity to be transferred. + * @param target identifier of target. * @param channel channel on which the entity is delivered to target. */ public void transferEntity(Entity entity, Identifier target, String channel) throws IllegalStateException { diff --git a/src/test/java/networking/stub/StubNetwork.java b/src/test/java/networking/stub/StubNetwork.java index 7eed4979..13e14352 100644 --- a/src/test/java/networking/stub/StubNetwork.java +++ b/src/test/java/networking/stub/StubNetwork.java @@ -1,5 +1,6 @@ package networking.stub; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; import model.Entity; @@ -110,7 +111,7 @@ public void unicast(Entity e, Identifier target, String channel) throws LightCha */ @Override public void put(Entity e, String namespace) throws LightChainDistributedStorageException { - + this.hub.putEntityToChannel(e, namespace); } /** @@ -124,6 +125,18 @@ public void put(Entity e, String namespace) throws LightChainDistributedStorageE */ @Override public Entity get(Identifier identifier, String namespace) throws LightChainDistributedStorageException { - return null; + return this.hub.getEntityFromChannel(identifier, namespace); + } + + /** + * Retrieves all entities stored on the underlying DHT of nodes that stored on this channel. + * + * @param namespace the namespace on which this query is resolved. + * @return list of all entities stored on this channel from underlying DHT. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entities. + */ + @Override + public ArrayList allEntities(String namespace) throws LightChainDistributedStorageException { + return this.hub.getAllEntities(namespace); } } \ No newline at end of file diff --git a/src/test/java/networking/stub/StubNetworkStorageTest.java b/src/test/java/networking/stub/StubNetworkStorageTest.java index bb73a1bb..1062b307 100644 --- a/src/test/java/networking/stub/StubNetworkStorageTest.java +++ b/src/test/java/networking/stub/StubNetworkStorageTest.java @@ -1,17 +1,195 @@ package networking.stub; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import model.Entity; +import model.exceptions.LightChainDistributedStorageException; +import network.Conduit; +import networking.MockEngine; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import unittest.fixtures.EntityFixture; + /** * Encapsulates tests for the storage side of the stub network. */ public class StubNetworkStorageTest { - // TODO: implement test scenarios - // Use mock engines with stub network. - // 1. Engine A1 (on one network) puts an entity on channel1 and Engine B1 on another network can get it on the - // same channel1 successfully, while Engine B2 on another channel2 can't get it successfully. - // 2. Engine A1 (on one network) can CONCURRENTLY put 100 different entities on channel1, and - // Engine B1 on another network can get each entity using its entity id only the the same channel, - // while Engine B2 on another channel2 can't get it any of them successfully. - // 3. Engine A1 (on one network) can CONCURRENTLY put 100 different entities on channel1, and - // Engine B1 on another network can get all of them at once using allEntities method, - // while Engine B2 on another channel2 can't get none of them using all. + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; + private Hub hub; + private StubNetwork stubNetwork1; + private StubNetwork stubNetwork2; + private MockEngine a1; + private MockEngine b1; + private MockEngine b2; + private Conduit ca1; + private Conduit cb1; + private Conduit cb2; + private ArrayList allEntities; + private ArrayList firstChannelEntities; + private ArrayList secondChannelEntities; + + @BeforeEach + void setUp() { + this.hub = new Hub(); + stubNetwork1 = new StubNetwork(hub); + a1 = new MockEngine(); + ca1 = stubNetwork1.register(a1, channel1); + stubNetwork2 = new StubNetwork(hub); + b1 = new MockEngine(); + cb1 = stubNetwork2.register(b1, channel1); + b2 = new MockEngine(); + cb2 = stubNetwork2.register(b2, channel2); + } + + /** + * Engine A1 (on one network) puts an entity on channel1 and Engine B1 on another network can get it on the + * same channel1 successfully, while Engine B2 on another channel2 can't get it successfully. + */ + @Test + void testPutOneEntity() { + + Entity entity = new EntityFixture(); + try { + ca1.put(entity); + if (!cb1.get(entity.id()).equals(entity) || cb2.get(entity.id()) != null) { + Assertions.fail(); + } + } catch (LightChainDistributedStorageException e) { + Assertions.fail(); + } + } + + /** + * Engine A1 (on one network) can CONCURRENTLY put 100 different entities on channel1, and + * Engine B1 on another network can get each entity using its entity id only the the same channel, + * while Engine B2 on another channel2 can't get it any of them successfully. + */ + @Test + void putEntityConcurrently() { + this.putEntityConcurrentlyFunction(); + this.getEntityConcurrentlyFunction(); + } + + /** + * Engine A1 (on one network) can CONCURRENTLY put 100 different entities on channel1, and + * Engine B1 on another network can get all of them at once using allEntities method, + * while Engine B2 on another channel2 can't get none of them using all. + */ + @Test + void testAllMethodConcurrently() { + this.putEntityConcurrentlyFunction(); + this.checkAllConcurrently(); + } + + /** + * Engine B1 on another network can get all of them at once using allEntities method, + * while Engine B2 on another channel2 can't get none of them using all. + */ + private void checkAllConcurrently() { + int concurrencyDegree = 2; + AtomicInteger threadError = new AtomicInteger(); + + CountDownLatch allDone = new CountDownLatch(concurrencyDegree); + Thread[] entityThreads = new Thread[concurrencyDegree]; + for (int i = 0; i < concurrencyDegree; i++) { + int finalI = i; + entityThreads[i] = new Thread(() -> { + try { + if (finalI == 0) { + firstChannelEntities = cb1.allEntities(); + } else { + secondChannelEntities = cb2.allEntities(); + } + allDone.countDown(); + } catch (LightChainDistributedStorageException e) { + threadError.getAndIncrement(); + } + }); + } + for (Thread t : entityThreads) { + t.start(); + } + try { + boolean doneOneTime = allDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + Assertions.assertTrue(firstChannelEntities.containsAll(allEntities)); + Assertions.assertTrue(secondChannelEntities.isEmpty()); + } + + /** + * Engine A1 (on one network) can CONCURRENTLY put 100 different entities on channel1. + */ + private void putEntityConcurrentlyFunction() { + int concurrencyDegree = 100; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch putDone = new CountDownLatch(concurrencyDegree); + Thread[] entityThreads = new Thread[concurrencyDegree]; + allEntities = new ArrayList<>(); + for (int i = 0; i < concurrencyDegree; i++) { + entityThreads[i] = new Thread(() -> { + Entity entity = new EntityFixture(); + allEntities.add(entity); + try { + ca1.put(entity); + putDone.countDown(); + } catch (LightChainDistributedStorageException e) { + threadError.getAndIncrement(); + } + }); + } + for (Thread t : entityThreads) { + t.start(); + } + try { + boolean doneOneTime = putDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Engine B1 on another network can get it on the + * same channel1 successfully, while Engine B2 on another channel2 can't get it successfully. + */ + private void getEntityConcurrentlyFunction() { + int concurrencyDegree = 100; + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch getDone = new CountDownLatch(concurrencyDegree); + Thread[] entityThreads = new Thread[concurrencyDegree]; + for (int i = 0; i < concurrencyDegree; i++) { + int finalI = i; + entityThreads[i] = new Thread(() -> { + Entity entity = allEntities.get(finalI); + try { + if (!cb1.get(entity.id()).equals(entity) || cb2.get(entity.id()) != null) { + threadError.getAndIncrement(); + } + getDone.countDown(); + } catch (LightChainDistributedStorageException e) { + threadError.getAndIncrement(); + } + }); + } + for (Thread t : entityThreads) { + t.start(); + } + try { + boolean doneOneTime = getDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + } }