diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index d03015069..344a60a80 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -233,14 +233,26 @@ public ArcusFuture> multiSet(List keys, int exp, T return multiStore(StoreType.set, keys, exp, value); } + public ArcusFuture> multiSet(Map elements, int exp) { + return multiStore(StoreType.set, elements, exp); + } + public ArcusFuture> multiAdd(List keys, int exp, T value) { return multiStore(StoreType.add, keys, exp, value); } + public ArcusFuture> multiAdd(Map elements, int exp) { + return multiStore(StoreType.add, elements, exp); + } + public ArcusFuture> multiReplace(List keys, int exp, T value) { return multiStore(StoreType.replace, keys, exp, value); } + public ArcusFuture> multiReplace(Map elements, int exp) { + return multiStore(StoreType.replace, elements, exp); + } + /** * @param type store type * @param keys key list to store @@ -258,6 +270,38 @@ private ArcusFuture> multiStore(StoreType type, keyToFuture.put(key, future); } + return buildMultiFuture(keyToFuture); + } + + /** + * @param type store type + * @param elements map of key to value to store + * @param exp expiration time + * @return ArcusFuture with Map of key to Boolean result. If an operation fails exceptionally, + * the corresponding value in the map will be null. + */ + private ArcusFuture> multiStore(StoreType type, + Map elements, + int exp) { + Map> keyToFuture = new HashMap<>(elements.size()); + + for (Map.Entry entry : elements.entrySet()) { + CompletableFuture future = + store(type, entry.getKey(), exp, entry.getValue()).toCompletableFuture(); + keyToFuture.put(entry.getKey(), future); + } + + return buildMultiFuture(keyToFuture); + } + + /** + * Combine multiple CompletableFutures into a single ArcusFuture. + * + * @param keyToFuture a map of keys to their corresponding CompletableFutures + * @return an ArcusFuture that completes with a map of keys to their Boolean results. + */ + private ArcusMultiFuture> buildMultiFuture( + Map> keyToFuture) { return new ArcusMultiFuture<>(keyToFuture.values(), () -> { Map results = new HashMap<>(); for (Map.Entry> entry : keyToFuture.entrySet()) { @@ -848,7 +892,7 @@ public ArcusFuture>> bopMultiGet(List keys, for (Map.Entry> entry : arrangedKeys) { BTreeGetBulk getBulk = - createBTreeGetBulk(entry.getKey(), entry.getValue(), from, to, args); + createBTreeGetBulk(entry.getKey(), entry.getValue(), from, to, args); CompletableFuture>> future = bopMultiGetPerNode(client, getBulk).toCompletableFuture(); futureToKeys.put(future, entry.getValue()); diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index e08959f38..2c9c0deb2 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -93,7 +93,7 @@ public interface AsyncArcusCommandsIF { ArcusFuture prepend(String key, T val); /** - * Set values for multiple keys. + * Sets the same value for multiple keys. * * @param keys list of keys to store * @param exp expiration time in seconds @@ -103,7 +103,16 @@ public interface AsyncArcusCommandsIF { ArcusFuture> multiSet(List keys, int exp, T value); /** - * Add values for multiple keys if they do not exist. + * Sets multiple key-value pairs. + * + * @param elements map of keys and values to store + * @param exp expiration time in seconds + * @return Map of key to Boolean result + */ + ArcusFuture> multiSet(Map elements, int exp); + + /** + * Add the same value for multiple keys if they do not exist. * * @param keys list of keys to store * @param exp expiration time in seconds @@ -113,7 +122,17 @@ public interface AsyncArcusCommandsIF { ArcusFuture> multiAdd(List keys, int exp, T value); /** - * Replace values for multiple keys if they exist. + * Add multiple key-value pairs if they do not exist. + * + * @param elements map of keys and values to store + * @param exp expiration time in seconds + * @return Map of key to Boolean result + */ + ArcusFuture> multiAdd(Map elements, int exp); + + + /** + * Replace the same value for multiple keys if they exist. * * @param keys list of keys to store * @param exp expiration time in seconds @@ -122,6 +141,15 @@ public interface AsyncArcusCommandsIF { */ ArcusFuture> multiReplace(List keys, int exp, T value); + /** + * Replace multiple key-value pairs if they exist. + * + * @param elements map of keys and values to store + * @param exp expiration time in seconds + * @return Map of key to Boolean result + */ + ArcusFuture> multiReplace(Map elements, int exp); + /** * Get a value for the given key. * diff --git a/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java index 5c51427bf..3f11a84aa 100644 --- a/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java +++ b/src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java @@ -1,5 +1,6 @@ package net.spy.memcached.v2; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; @@ -19,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -90,7 +92,7 @@ void multiSetAndGet() throws ExecutionException, InterruptedException, TimeoutEx } @Test - void multiSetFail() throws ExecutionException, InterruptedException, TimeoutException { + void multiAddFail() throws ExecutionException, InterruptedException, TimeoutException { // given async.set(keys.get(0), 0, VALUE) // when @@ -365,4 +367,197 @@ void casExists() throws ExecutionException, InterruptedException, TimeoutExcepti .toCompletableFuture() .get(300L, TimeUnit.MILLISECONDS); } + + @Test + void multiSetMapSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + Map elements = new HashMap<>(); + + for (int i = 0; i < keys.size(); i++) { + elements.put(keys.get(i), VALUE + i); + } + + // when + async.multiSet(elements, 60) + .thenCompose(result -> { + assertEquals(keys.size(), result.size()); + for (Boolean b : result.values()) { + assertTrue(b); + } + return async.multiGet(keys); + }) + // then + .thenAccept(result -> { + assertEquals(keys.size(), result.size()); + for (int i = 0; i < keys.size(); i++) { + assertEquals(VALUE + i, result.get(keys.get(i))); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void multiAddMapSuccess() throws ExecutionException, InterruptedException, TimeoutException { + // given + Map elements = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + elements.put(keys.get(i), VALUE + i); + } + + // when + async.multiAdd(elements, 60) + .thenCompose(result -> { + assertEquals(keys.size(), result.size()); + for (Boolean b : result.values()) { + assertTrue(b); + } + return async.multiGet(keys); + }) + // then + .thenAccept(result -> { + assertEquals(keys.size(), result.size()); + for (int i = 0; i < keys.size(); i++) { + assertEquals(VALUE + i, result.get(keys.get(i))); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void multiAddMapPartialSuccess() throws ExecutionException, InterruptedException, + TimeoutException { + + // given + Map elements = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + elements.put(keys.get(i), VALUE + i); + } + + /* 0th key is added before multiAdd, so it should fail. */ + async.set(keys.get(0), 60, VALUE + "-old") + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.multiAdd(elements, 60) + .thenCompose(result -> { + assertEquals(keys.size(), result.size()); + assertFalse(result.get(keys.get(0))); + for (int i = 1; i < keys.size(); i++) { + assertTrue(result.get(keys.get(i))); + } + return async.multiGet(keys); + }) + // then + .thenAccept(result -> { + assertEquals(keys.size(), result.size()); + assertEquals(VALUE + "-old", result.get(keys.get(0))); + for (int i = 1; i < keys.size(); i++) { + assertEquals(VALUE + i, result.get(keys.get(i))); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void multiReplaceMapSuccess() throws ExecutionException, InterruptedException, + TimeoutException { + + // given + Map oldElements = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + oldElements.put(keys.get(i), VALUE + "-old-" + i); + } + + Map newElements = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + newElements.put(keys.get(i), VALUE + "-new-" + i); + } + + async.multiSet(oldElements, 60) + .thenAccept(result -> { + assertEquals(keys.size(), result.size()); + for (Boolean b : result.values()) { + assertTrue(b); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.multiReplace(newElements, 60) + .thenCompose(result -> { + assertEquals(keys.size(), result.size()); + for (Boolean b : result.values()) { + assertTrue(b); + } + return async.multiGet(keys); + }) + // then + .thenAccept(result -> { + assertEquals(keys.size(), result.size()); + for (int i = 0; i < keys.size(); i++) { + assertEquals(VALUE + "-new-" + i, result.get(keys.get(i))); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } + + @Test + void multiReplaceMapPartialSuccess() throws ExecutionException, InterruptedException, + TimeoutException { + + // given + Map oldElements = new HashMap<>(); + for (int i = 0; i < 2; i++) { + oldElements.put(keys.get(i), VALUE + "-old-" + i); + } + + Map newElements = new HashMap<>(); + for (int i = 0; i < keys.size(); i++) { + newElements.put(keys.get(i), VALUE + "-new-" + i); + } + + /* 0, 1st keys are added before multiReplace, so only they should succeed. */ + async.multiSet(oldElements, 60) + .thenAccept(result -> { + assertEquals(oldElements.size(), result.size()); + for (Boolean b : result.values()) { + assertTrue(b); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + + // when + async.multiReplace(newElements, 60) + .thenCompose(result -> { + assertEquals(keys.size(), result.size()); + for (int i = 0; i < 2; i++) { + assertTrue(result.get(keys.get(i))); + } + + for (int i = 2; i < keys.size(); i++) { + assertFalse(result.get(keys.get(i))); + } + return async.multiGet(keys); + }) + // then + .thenAccept(result -> { + assertEquals(2, result.size()); + for (int i = 0; i < 2; i++) { + assertEquals(VALUE + "-new-" + i, result.get(keys.get(i))); + } + for (int i = 2; i < keys.size(); i++) { + assertNull(result.get(keys.get(i))); + } + }) + .toCompletableFuture() + .get(300L, TimeUnit.MILLISECONDS); + } }