diff --git a/src/main/java/model/codec/EncodedEntity.java b/src/main/java/model/codec/EncodedEntity.java index 73e1afcb..8942af6a 100644 --- a/src/main/java/model/codec/EncodedEntity.java +++ b/src/main/java/model/codec/EncodedEntity.java @@ -1,18 +1,54 @@ package model.codec; +import java.io.Serializable; +import java.util.Arrays; + /** * Represents an encapsulation around the byte representation of an entity accompanied by its original type. */ -public class EncodedEntity { +public class EncodedEntity implements Serializable { private final byte[] bytes; private final String type; - // EncodedEntity(id.getBytes() || byte(i), "assignment") + /** + * Creates encoded entity. + * + * @param bytes bytes of entity. + * @param type types of entity. + */ public EncodedEntity(byte[] bytes, String type) { this.bytes = bytes.clone(); this.type = type; } + /** + * Hashcode of entity. + * + * @return hashcode of encodedentity. + */ + @Override + public int hashCode() { + return Arrays.hashCode(this.bytes); + } + + /** + * Check if objects are equal. + * + * @param o encodedentity. + * @return true if equals. + */ + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof EncodedEntity)) { + return false; + } + EncodedEntity that = (EncodedEntity) o; + return Arrays.equals(this.bytes, that.bytes); + } + public byte[] getBytes() { return bytes.clone(); } diff --git a/src/main/java/model/crypto/Sha3256Hash.java b/src/main/java/model/crypto/Sha3256Hash.java index 7943a858..ffe50277 100644 --- a/src/main/java/model/crypto/Sha3256Hash.java +++ b/src/main/java/model/crypto/Sha3256Hash.java @@ -1,5 +1,6 @@ package model.crypto; +import java.io.Serializable; import java.util.Arrays; import model.lightchain.Identifier; @@ -8,7 +9,7 @@ * Represents SHA3-256 data type which extends abstract Hash data type for * the cryptographic hash function used in LightChain. */ -public class Sha3256Hash extends Hash { +public class Sha3256Hash extends Hash implements Serializable { public static final int Size = 32; private final byte[] hashBytes; diff --git a/src/main/java/model/crypto/Signature.java b/src/main/java/model/crypto/Signature.java index dbeea7db..7d0cbc3c 100644 --- a/src/main/java/model/crypto/Signature.java +++ b/src/main/java/model/crypto/Signature.java @@ -30,4 +30,5 @@ public Identifier getSignerId() { public byte[] getBytes() { return bytes.clone(); } + } diff --git a/src/main/java/storage/Distributed.java b/src/main/java/storage/Distributed.java index 936a68d5..02487be9 100644 --- a/src/main/java/storage/Distributed.java +++ b/src/main/java/storage/Distributed.java @@ -3,7 +3,7 @@ import java.util.ArrayList; import model.Entity; -import model.lightchain.Block; +import model.exceptions.CodecException; import model.lightchain.Identifier; /** @@ -26,7 +26,7 @@ public interface Distributed { * @return true if entity did not exist on the database, false if entity is already in * database. */ - boolean add(Entity e); + boolean add(Entity e) throws CodecException; /** * Removes entity with given identifier. @@ -35,7 +35,7 @@ public interface Distributed { * @return true if entity exists on database and removed successfully, false if entity does not exist on * database. */ - boolean remove(Entity e); + boolean remove(Entity e) throws CodecException; /** * Returns the entity with given identifier. @@ -43,12 +43,12 @@ public interface Distributed { * @param e identifier of the entity. * @return the entity itself if exists and null otherwise. */ - Block get(Identifier e); + Entity get(Identifier e) throws CodecException; /** * Returns all entities stored in database. * * @return all stored entities in database. */ - ArrayList all(); + ArrayList all() throws CodecException; } diff --git a/src/main/java/storage/mapdb/DistributedMapDb.java b/src/main/java/storage/mapdb/DistributedMapDb.java new file mode 100644 index 00000000..e4301985 --- /dev/null +++ b/src/main/java/storage/mapdb/DistributedMapDb.java @@ -0,0 +1,151 @@ +package storage.mapdb; + +import java.util.ArrayList; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import model.Entity; +import model.codec.EncodedEntity; +import model.exceptions.CodecException; +import model.lightchain.Identifier; +import modules.codec.JsonEncoder; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.HTreeMap; +import org.mapdb.Serializer; + +/** + * Distributed databese that store encoded entities. + */ +public class DistributedMapDb implements storage.Distributed { + + private final DB db; + private final ReentrantReadWriteLock lock; + private static final String MAP_NAME = "distributed_map"; + private final HTreeMap distributedMap; + + /** + * Creates DistributedMapDb. + */ + public DistributedMapDb(String filePath) { + this.db = DBMaker.fileDB(filePath).make(); + this.lock = new ReentrantReadWriteLock(); + distributedMap = this.db.hashMap(MAP_NAME) + .keySerializer(Serializer.BYTE_ARRAY) + .createOrOpen(); + } + + /** + * Checks existence of entity on the database. + * + * @param entityId Identifier of entity. + * @return true if a entity with that identifier exists, false otherwise. + */ + @Override + public boolean has(Identifier entityId) { + boolean hasBoolean; + try { + lock.readLock().lock(); + hasBoolean = distributedMap.containsKey(entityId.getBytes()); + } finally { + lock.readLock().unlock(); + } + return hasBoolean; + } + + /** + * Adds entity to the database. + * + * @param e given entity to be added. + * @return true if entity did not exist on the database, false if entity is already in + * database. + */ + @Override + public boolean add(Entity e) throws CodecException { + JsonEncoder encoder = new JsonEncoder(); + boolean addBoolean; + try { + lock.writeLock().lock(); + addBoolean = distributedMap.putIfAbsentBoolean(e.id().getBytes(), encoder.encode(e)); + } catch (CodecException ex) { + throw new CodecException("could not encode the entity", ex); + } finally { + lock.writeLock().unlock(); + } + return addBoolean; + } + + /** + * Removes entity with given identifier. + * + * @param e identifier of the entity. + * @return true if entity exists on database and removed successfully, false if entity does not exist on + * database. + */ + @Override + public boolean remove(Entity e) throws CodecException { + JsonEncoder encoder = new JsonEncoder(); + boolean removeBoolean; + try { + lock.writeLock().lock(); + removeBoolean = distributedMap.remove(e.id().getBytes(), encoder.encode(e)); + } catch (CodecException exception) { + throw new CodecException("could not encode entity", exception); + } finally { + lock.writeLock().unlock(); + } + return removeBoolean; + } + + /** + * Returns the entity with given identifier. + * + * @param entityId identifier of the entity. + * @return the entity itself if exists and null otherwise. + */ + @Override + public Entity get(Identifier entityId) throws CodecException { + + Entity decodedEntity; + + try { + JsonEncoder encoder = new JsonEncoder(); + lock.readLock().lock(); + EncodedEntity encodedEntity = (EncodedEntity) distributedMap.get(entityId.getBytes()); + if (encodedEntity == null) { + return null; + } + decodedEntity = encoder.decode(encodedEntity); + } catch (CodecException e) { + throw new CodecException("could not found the class", e); + } finally { + lock.readLock().unlock(); + } + return decodedEntity; + } + + /** + * Returns all entities stored in database. + * + * @return all stored entities in database. + */ + @Override + public ArrayList all() throws CodecException { + JsonEncoder encoder = new JsonEncoder(); + ArrayList allEntities = new ArrayList<>(); + for (Object encodedEntity : distributedMap.values()) { + try { + allEntities.add(encoder.decode((EncodedEntity) encodedEntity)); + } catch (CodecException e) { + throw new CodecException("could not found the class", e); + } + } + return allEntities; + } + + /** + * It closes the database. + */ + public void closeDb() { + db.close(); + } +} diff --git a/src/test/java/modules/JsonEncoderTest.java b/src/test/java/modules/JsonEncoderTest.java index e390786f..9451bf16 100644 --- a/src/test/java/modules/JsonEncoderTest.java +++ b/src/test/java/modules/JsonEncoderTest.java @@ -18,7 +18,7 @@ public class JsonEncoderTest { @Test public void testEncodingRoundTrip() throws CodecException { JsonEncoder encoder = new JsonEncoder(); - EntityFixture entity = new EntityFixture(); + Entity entity = new EntityFixture(); Entity entityChanged = encoder.decode(encoder.encode(entity)); Assertions.assertEquals(entity, entityChanged); } diff --git a/src/test/java/storage/DistributedMapDbTest.java b/src/test/java/storage/DistributedMapDbTest.java new file mode 100644 index 00000000..1ea0c05a --- /dev/null +++ b/src/test/java/storage/DistributedMapDbTest.java @@ -0,0 +1,442 @@ +package storage; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import model.Entity; +import model.exceptions.CodecException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import storage.mapdb.DistributedMapDb; +import unittest.fixtures.BlockFixture; +import unittest.fixtures.TransactionFixture; + +/** + * Encapsulates tests for distributed storage. + */ +public class DistributedMapDbTest { + + private static final String TEMP_DIR = "tempdir"; + private static final String TEMP_FILE_ID = "tempfileID.db"; + private Path tempdir; + private ArrayList allEntities; + private DistributedMapDb db; + + /** + * Initialize database. + */ + @BeforeEach + void setUp() throws IOException { + Path currentRelativePath = Paths.get(""); + tempdir = Files.createTempDirectory(currentRelativePath, TEMP_DIR); + db = new DistributedMapDb(tempdir.toAbsolutePath() + "/" + TEMP_FILE_ID); + allEntities = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + allEntities.add(BlockFixture.newBlock(10)); + allEntities.add(TransactionFixture.newTransaction(10)); + } + } + + /** + * Closes database. + * + * @throws IOException if deleting temporary directory faces unhappy path. + */ + @AfterEach + void cleanup() throws IOException { + db.closeDb(); + FileUtils.deleteDirectory(new File(tempdir.toString())); + } + + /** + * When adding 20 new entities of different types (10 transactions and 10 blocks) sequentially, + * the Add method must return true for all of them. Moreover, after + * adding entities are done, querying the Has method for each of the entities should return true. + * After adding all entities + * are done, each entity must be retrievable using both its id (get). Also, when + * querying All method, list of all 20 entities must be returned. + * + * @throws IOException throw IOException. + */ + @Test + void sequentialAddTest() throws IOException, CodecException { + for (Entity entity : allEntities) { + Assertions.assertTrue(db.add(entity)); + } + for (Entity entity : allEntities) { + Assertions.assertTrue(db.has(entity.id())); + } + for (Entity entity : allEntities) { + Assertions.assertTrue(allEntities.contains(db.get(entity.id()))); + } + ArrayList all = db.all(); + Assertions.assertEquals(all.size(), 20); + for (Entity entity : allEntities) { + Assertions.assertTrue(all.contains(entity)); + } + db.closeDb(); + try { + FileUtils.deleteDirectory(new File(tempdir.toString())); + } catch (IOException e) { + throw new IOException("could not delete directory"); + } + } + + /** + * When adding 20 new entities of different types (10 transactions and 10 blocks) CONCURRENTLY, + * the Add method must return true for all of them. Moreover, after + * adding entities are done, querying the Has method for each of the entities should return true. + * After adding all entities + * are done, each entity must be retrievable using both its id (get). Also, when + * querying All method, list of all 20 entities must be returned. + */ + @Test + void concurrentAddTest() throws CodecException { + this.addAllEntitiesConcurrently(true); + this.checkForHasConcurrently(0); + this.checkForGetConcurrently(0); + this.checkForAllConcurrently(0); + } + + /** + * Add 20 new entities sequentially (10 transactions and 10 blocks), check that they are added correctly, i.e., + * while adding each entity Add must return + * true, Has returns true for each of them, each entity is retrievable by its identifier, + * and All returns list of all of them. + * Then Remove the first 10 entities (5 blocks and 5 transactions) sequentially. + * While Removing each of them, the Remove should return true. Then query all 20 entities using has, and get. + * Has should return false for the first 5 blocks amd 5 transactions that have been removed, + * and get should return null. But for the last 5 blocks and 5 transactions, has should return true, and get + * should successfully retrieve the exact entity. + * Also, All should return only the last 5 blocks and 5 transactions. + * + * @throws IOException for any unhappy path. + */ + @Test + void removeFirstTenTest() throws IOException, CodecException { + for (Entity entity : allEntities) { + Assertions.assertTrue(db.add(entity)); + } + for (int i = 0; i < 10; i++) { + Assertions.assertTrue(db.remove(allEntities.get(i))); + } + for (int i = 0; i < 20; i++) { + if (i < 10) { + Assertions.assertFalse(db.has(allEntities.get(i).id()) || db.all().contains(allEntities.get(i))); + } else { + Assertions.assertTrue(db.has(allEntities.get(i).id()) && db.all().contains(allEntities.get(i))); + } + } + db.closeDb(); + try { + FileUtils.deleteDirectory(new File(tempdir.toString())); + } catch (IOException e) { + throw new IOException("could not delete directory"); + } + } + + /** + * Add 20 new entities CONCURRENTLY (10 transactions and 10 blocks), check that they are added correctly, i.e., + * while adding each entity Add must return + * true, Has returns true for each of them, each entity is retrievable by its identifier, + * and All returns list of all of them. + * Then Remove the first 10 entities (5 blocks and 5 transactions) sequentially. + * While Removing each of them, the Remove should return true. Then query all 20 entities using has, and get. + * Has should return false for the first 5 blocks amd 5 transactions that have been removed, + * and get should return null. But for the last 5 blocks and 5 transactions, has should return true, and get + * should successfully retrieve the exact entity. + * Also, All should return only the last 5 blocks and 5 transactions. + */ + @Test + void concurrentRemoveFirstTenTest() throws CodecException { + this.addAllEntitiesConcurrently(true); + + this.checkForGetConcurrently(0); + this.checkForHasConcurrently(0); + this.checkForAllConcurrently(0); + + this.removeEntityTill(10); + + this.checkForGetConcurrently(10); + this.checkForHasConcurrently(10); + this.checkForAllConcurrently(10); + } + + /** + * Removes entities from distributed storage database till the given index concurrently. + * + * @param till exclusive index of the last entity being removed. + */ + private void removeEntityTill(int till) { + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch doneRemove = new CountDownLatch(till); + Thread[] removeThreads = new Thread[till]; + for (int i = 0; i < till; i++) { + int finalI = i; + removeThreads[i] = new Thread(() -> { + try { + if (!db.remove(allEntities.get(finalI))) { + threadError.getAndIncrement(); + } + } catch (CodecException e) { + threadError.getAndIncrement(); + } + doneRemove.countDown(); + }); + } + + for (Thread t : removeThreads) { + t.start(); + } + try { + boolean doneOneTime = doneRemove.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + + } + + /** + * Add 20 new entities sequentially (10 blocks, and 10 transactions) + * and check that all of them are added correctly, i.e., while adding each entity + * Add must return true, has returns true for each of them, and All returns list of all of them. Moreover, each + * entity is retrievable using its identifier (get). Then try Adding all of them again, and + * Add should return false for each of them, while has should still return true, and get should be + * able to retrieve the entity. + * + * @throws IOException for any unhappy path. + */ + @Test + void duplicationTest() throws IOException, CodecException { + for (Entity entity : allEntities) { + Assertions.assertTrue(db.add(entity)); + } + for (Entity entity : allEntities) { + Assertions.assertTrue(db.has(entity.id())); + } + ArrayList all = db.all(); + Assertions.assertEquals(all.size(), 20); + for (Entity entity : all) { + Assertions.assertTrue(allEntities.contains(entity)); + } + for (Entity entity : allEntities) { + Assertions.assertTrue(allEntities.contains(db.get(entity.id()))); + } + for (Entity entity : allEntities) { + Assertions.assertFalse(db.add(entity)); + } + /* + After trying duplication, check again. + */ + for (Entity entity : allEntities) { + Assertions.assertTrue(db.has(entity.id())); + } + for (Entity entity : allEntities) { + Assertions.assertTrue(allEntities.contains(db.get(entity.id()))); + } + db.closeDb(); + try { + FileUtils.deleteDirectory(new File(tempdir.toString())); + } catch (IOException e) { + throw new IOException("could not delete directory"); + } + } + + /** + * Add 20 new entities concurrently (10 blocks, and 10 transactions) + * and check that all of them are added correctly, i.e., while adding each entity + * Add must return true, has returns true for each of them, and All returns list of all of them. Moreover, each + * entity is retrievable using its identifier (get). Then try Adding all of them again, and + * Add should return false for each of them, while has should still return true, and get should be + * able to retrieve the entity. + */ + @Test + void concurrentDuplicationTest() throws CodecException { + this.addAllEntitiesConcurrently(true); + + this.checkForGetConcurrently(0); + this.checkForHasConcurrently(0); + this.checkForAllConcurrently(0); + + this.addAllEntitiesConcurrently(false); + + this.checkForGetConcurrently(0); + this.checkForHasConcurrently(0); + this.checkForAllConcurrently(0); + } + + /** + * Adds all entities to the distributed storage database till the given index concurrently. + * + * @param expectedResult expected boolean result after each insertion; true means entity added successfully, + * false means entity was not added successfully. + */ + private void addAllEntitiesConcurrently(boolean expectedResult) { + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch addDone = new CountDownLatch(allEntities.size()); + Thread[] addThreads = new Thread[allEntities.size()]; + + for (int i = 0; i < allEntities.size(); i++) { + int finalI = i; + addThreads[i] = new Thread(() -> { + try { + if (db.add(allEntities.get(finalI)) != expectedResult) { + threadError.getAndIncrement(); + } + } catch (CodecException e) { + threadError.getAndIncrement(); + } + addDone.countDown(); + }); + } + for (Thread t : addThreads) { + t.start(); + } + try { + boolean doneOneTime = addDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Checks existence of entities in the entity storage database starting from the given index. + * + * @param from inclusive index of the first entity to check. + */ + private void checkForHasConcurrently(int from) { + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch hasDone = new CountDownLatch(allEntities.size()); + Thread[] hasThreads = new Thread[allEntities.size()]; + for (int i = 0; i < allEntities.size(); i++) { + int finalI = i; + Entity entity = allEntities.get(i); + + hasThreads[i] = new Thread(() -> { + if (finalI < from) { + if (this.db.has(entity.id())) { + threadError.incrementAndGet(); + } + } else { + if (!this.db.has(entity.id())) { + threadError.getAndIncrement(); + } + } + + hasDone.countDown(); + }); + } + for (Thread t : hasThreads) { + t.start(); + } + try { + boolean doneOneTime = hasDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Checks retrievability of entity from the distributed storage database starting from the given index. + * + * @param from inclusive index of the first entity to check. + */ + private void checkForGetConcurrently(int from) { + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch getDone = new CountDownLatch(allEntities.size()); + Thread[] getThreads = new Thread[allEntities.size()]; + for (int i = 0; i < allEntities.size(); i++) { + int finalI = i; + Entity entity = allEntities.get(i); + getThreads[i] = new Thread(() -> { + Entity got = null; + try { + got = db.get(entity.id()); + } catch (CodecException e) { + threadError.incrementAndGet(); + } + if (finalI < from) { + if (got != null) { + threadError.incrementAndGet(); + } + } else { + if (!entity.equals(got)) { + threadError.getAndIncrement(); + } + if (!entity.id().equals(got.id())) { + threadError.getAndIncrement(); + } + } + getDone.countDown(); + }); + } + + for (Thread t : getThreads) { + t.start(); + } + try { + boolean doneOneTime = getDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + } + + /** + * Checks retrievability of entities from the distributed storage database starting from the given index. + * + * @param from inclusive index of the first transaction to check. + */ + private void checkForAllConcurrently(int from) throws CodecException { + AtomicInteger threadError = new AtomicInteger(); + CountDownLatch doneAll = new CountDownLatch(allEntities.size()); + Thread[] allThreads = new Thread[allEntities.size()]; + ArrayList all = db.all(); + for (int i = 0; i < allEntities.size(); i++) { + int finalI = i; + final Entity entity = allEntities.get(i); + allThreads[i] = new Thread(() -> { + if (finalI < from) { + if (all.contains(entity)) { + threadError.incrementAndGet(); + } + } else { + if (!all.contains(entity)) { + threadError.getAndIncrement(); + } + } + doneAll.countDown(); + }); + } + + for (Thread t : allThreads) { + t.start(); + } + try { + boolean doneOneTime = doneAll.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + Assertions.assertEquals(0, threadError.get()); + } +} diff --git a/src/test/java/storage/DistributedTest.java b/src/test/java/storage/DistributedTest.java deleted file mode 100644 index 8ce62150..00000000 --- a/src/test/java/storage/DistributedTest.java +++ /dev/null @@ -1,40 +0,0 @@ -package storage; - -/** - * Encapsulates tests for distributed storage. - */ -public class DistributedTest { - // TODO: implement a unit test for each of the following scenarios: - // IMPORTANT NOTE: each test must have a separate instance of database, and the database MUST only created on a - // temporary directory. - // In following tests by a "new" entity, we mean an entity that already does not exist in the database, - // and by a "duplicate" entity, we mean one that already exists in the database. - // 1. When adding 20 new entities of different types (10 transactions and 10 blocks) sequentially, - // the Add method must return true for all of them. Moreover, after - // adding entities are done, querying the Has method for each of the entities should return true. - // After adding all entities - // are done, each entity must be retrievable using both its id (get). Also, when - // querying All method, list of all 20 entities must be returned. - // 2. Repeat test case 1 for concurrently adding entities as well as concurrently querying the database for has, and - // get. - // 3. Add 20 new entities sequentially (10 transactions and 10 blocks), check that they are added correctly, i.e., - // while adding each entity Add must return - // true, Has returns true for each of them, each entity is retrievable by its identifier, - // and All returns list of all of them. - // Then Remove the first 10 entities (5 blocks and 5 transactions) sequentially. - // While Removing each of them, the Remove should return true. Then query all 20 entities using has, and get. - // Has should return false for the first 5 blocks amd 5 transactions that have been removed, - // and get should return null. But for the last 5 blocks and 5 transactions, has should return true, and get - // should successfully retrieve the exact entity. - // Also, All should return only the last 5 blocks and 5 transactions. - // 4. Repeat test case 3 for concurrently adding and removing entities as well as concurrently querying the - // database for has, and get. - // 5. Add 20 new entities (10 blocks, and 10 transactions) - // and check that all of them are added correctly, i.e., while adding each entity - // Add must return true, has returns true for each of them, and All returns list of all of them. Moreover, each - // entity is retrievable using its identifier (get). Then try Adding all of them again, and - // Add should return false for each of them, while has should still return true, and get should be - // able to retrieve the entity. - // 6. Repeat test case 5 for concurrently adding entities as well as concurrently querying the - // database for has, get. -}