Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,26 @@ public ArcusFuture<Map<String, Boolean>> multiSet(List<String> keys, int exp, T
return multiStore(StoreType.set, keys, exp, value);
}

public ArcusFuture<Map<String, Boolean>> multiSet(Map<String, T> elements, int exp) {
return multiStore(StoreType.set, elements, exp);
}

public ArcusFuture<Map<String, Boolean>> multiAdd(List<String> keys, int exp, T value) {
return multiStore(StoreType.add, keys, exp, value);
}

public ArcusFuture<Map<String, Boolean>> multiAdd(Map<String, T> elements, int exp) {
return multiStore(StoreType.add, elements, exp);
}

public ArcusFuture<Map<String, Boolean>> multiReplace(List<String> keys, int exp, T value) {
return multiStore(StoreType.replace, keys, exp, value);
}

public ArcusFuture<Map<String, Boolean>> multiReplace(Map<String, T> elements, int exp) {
return multiStore(StoreType.replace, elements, exp);
}

/**
* @param type store type
* @param keys key list to store
Expand All @@ -258,6 +270,38 @@ private ArcusFuture<Map<String, Boolean>> multiStore(StoreType type,
keyToFuture.put(key, future);
}

return buildMultiFuture(keyToFuture);
}

/**
* @param type store type
* @param elements map of key to value to store
* @param exp expiration time
* @return ArcusFuture with Map of key to Boolean result. If an operation fails exceptionally,
* the corresponding value in the map will be null.
*/
private ArcusFuture<Map<String, Boolean>> multiStore(StoreType type,
Map<String, T> elements,
int exp) {
Map<String, CompletableFuture<?>> keyToFuture = new HashMap<>(elements.size());

for (Map.Entry<String, T> entry : elements.entrySet()) {
CompletableFuture<Boolean> future =
store(type, entry.getKey(), exp, entry.getValue()).toCompletableFuture();
keyToFuture.put(entry.getKey(), future);
}

return buildMultiFuture(keyToFuture);
}

/**
* Combine multiple CompletableFutures into a single ArcusFuture.
*
* @param keyToFuture a map of keys to their corresponding CompletableFutures
* @return an ArcusFuture that completes with a map of keys to their Boolean results.
*/
private ArcusMultiFuture<Map<String, Boolean>> buildMultiFuture(
Map<String, CompletableFuture<?>> keyToFuture) {
return new ArcusMultiFuture<>(keyToFuture.values(), () -> {
Map<String, Boolean> results = new HashMap<>();
for (Map.Entry<String, CompletableFuture<?>> entry : keyToFuture.entrySet()) {
Expand Down Expand Up @@ -848,7 +892,7 @@ public ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,

for (Map.Entry<MemcachedNode, List<String>> entry : arrangedKeys) {
BTreeGetBulk<T> getBulk =
createBTreeGetBulk(entry.getKey(), entry.getValue(), from, to, args);
createBTreeGetBulk(entry.getKey(), entry.getValue(), from, to, args);
CompletableFuture<Map<String, BTreeElements<T>>> future =
bopMultiGetPerNode(client, getBulk).toCompletableFuture();
futureToKeys.put(future, entry.getValue());
Expand Down
34 changes: 31 additions & 3 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public interface AsyncArcusCommandsIF<T> {
ArcusFuture<Boolean> prepend(String key, T val);

/**
* Set values for multiple keys.
* Sets the same value for multiple keys.
*
* @param keys list of keys to store
* @param exp expiration time in seconds
Expand All @@ -103,7 +103,16 @@ public interface AsyncArcusCommandsIF<T> {
ArcusFuture<Map<String, Boolean>> multiSet(List<String> keys, int exp, T value);

/**
* Add values for multiple keys if they do not exist.
* Sets multiple key-value pairs.
*
* @param elements map of keys and values to store
* @param exp expiration time in seconds
* @return Map of key to Boolean result
*/
ArcusFuture<Map<String, Boolean>> multiSet(Map<String, T> elements, int exp);

/**
* Add the same value for multiple keys if they do not exist.
*
* @param keys list of keys to store
* @param exp expiration time in seconds
Expand All @@ -113,7 +122,17 @@ public interface AsyncArcusCommandsIF<T> {
ArcusFuture<Map<String, Boolean>> multiAdd(List<String> keys, int exp, T value);

/**
* Replace values for multiple keys if they exist.
* Add multiple key-value pairs if they do not exist.
*
* @param elements map of keys and values to store
* @param exp expiration time in seconds
* @return Map of key to Boolean result
*/
ArcusFuture<Map<String, Boolean>> multiAdd(Map<String, T> elements, int exp);


/**
* Replace the same value for multiple keys if they exist.
*
* @param keys list of keys to store
* @param exp expiration time in seconds
Expand All @@ -122,6 +141,15 @@ public interface AsyncArcusCommandsIF<T> {
*/
ArcusFuture<Map<String, Boolean>> multiReplace(List<String> keys, int exp, T value);

/**
* Replace multiple key-value pairs if they exist.
*
* @param elements map of keys and values to store
* @param exp expiration time in seconds
* @return Map of key to Boolean result
*/
ArcusFuture<Map<String, Boolean>> multiReplace(Map<String, T> elements, int exp);

/**
* Get a value for the given key.
*
Expand Down
197 changes: 196 additions & 1 deletion src/test/java/net/spy/memcached/v2/KVAsyncArcusCommandsTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.spy.memcached.v2;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
Expand All @@ -19,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -90,7 +92,7 @@ void multiSetAndGet() throws ExecutionException, InterruptedException, TimeoutEx
}

@Test
void multiSetFail() throws ExecutionException, InterruptedException, TimeoutException {
void multiAddFail() throws ExecutionException, InterruptedException, TimeoutException {
// given
async.set(keys.get(0), 0, VALUE)
// when
Expand Down Expand Up @@ -365,4 +367,197 @@ void casExists() throws ExecutionException, InterruptedException, TimeoutExcepti
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);
}

@Test
void multiSetMapSuccess() throws ExecutionException, InterruptedException, TimeoutException {
// given
Map<String, Object> elements = new HashMap<>();

for (int i = 0; i < keys.size(); i++) {
elements.put(keys.get(i), VALUE + i);
}

// when
async.multiSet(elements, 60)
.thenCompose(result -> {
assertEquals(keys.size(), result.size());
for (Boolean b : result.values()) {
assertTrue(b);
}
return async.multiGet(keys);
})
// then
.thenAccept(result -> {
assertEquals(keys.size(), result.size());
for (int i = 0; i < keys.size(); i++) {
assertEquals(VALUE + i, result.get(keys.get(i)));
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);
}

@Test
void multiAddMapSuccess() throws ExecutionException, InterruptedException, TimeoutException {
// given
Map<String, Object> elements = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
elements.put(keys.get(i), VALUE + i);
}

// when
async.multiAdd(elements, 60)
.thenCompose(result -> {
assertEquals(keys.size(), result.size());
for (Boolean b : result.values()) {
assertTrue(b);
}
return async.multiGet(keys);
})
// then
.thenAccept(result -> {
assertEquals(keys.size(), result.size());
for (int i = 0; i < keys.size(); i++) {
assertEquals(VALUE + i, result.get(keys.get(i)));
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);
}

@Test
void multiAddMapPartialSuccess() throws ExecutionException, InterruptedException,
TimeoutException {

// given
Map<String, Object> elements = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
elements.put(keys.get(i), VALUE + i);
}

/* 0th key is added before multiAdd, so it should fail. */
async.set(keys.get(0), 60, VALUE + "-old")
.thenAccept(Assertions::assertTrue)
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);

// when
async.multiAdd(elements, 60)
.thenCompose(result -> {
assertEquals(keys.size(), result.size());
assertFalse(result.get(keys.get(0)));
for (int i = 1; i < keys.size(); i++) {
assertTrue(result.get(keys.get(i)));
}
return async.multiGet(keys);
})
// then
.thenAccept(result -> {
assertEquals(keys.size(), result.size());
assertEquals(VALUE + "-old", result.get(keys.get(0)));
for (int i = 1; i < keys.size(); i++) {
assertEquals(VALUE + i, result.get(keys.get(i)));
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);
}

@Test
void multiReplaceMapSuccess() throws ExecutionException, InterruptedException,
TimeoutException {

// given
Map<String, Object> oldElements = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
oldElements.put(keys.get(i), VALUE + "-old-" + i);
}

Map<String, Object> newElements = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
newElements.put(keys.get(i), VALUE + "-new-" + i);
}

async.multiSet(oldElements, 60)
.thenAccept(result -> {
assertEquals(keys.size(), result.size());
for (Boolean b : result.values()) {
assertTrue(b);
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);

// when
async.multiReplace(newElements, 60)
.thenCompose(result -> {
assertEquals(keys.size(), result.size());
for (Boolean b : result.values()) {
assertTrue(b);
}
return async.multiGet(keys);
})
// then
.thenAccept(result -> {
assertEquals(keys.size(), result.size());
for (int i = 0; i < keys.size(); i++) {
assertEquals(VALUE + "-new-" + i, result.get(keys.get(i)));
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);
}

@Test
void multiReplaceMapPartialSuccess() throws ExecutionException, InterruptedException,
TimeoutException {

// given
Map<String, Object> oldElements = new HashMap<>();
for (int i = 0; i < 2; i++) {
oldElements.put(keys.get(i), VALUE + "-old-" + i);
}

Map<String, Object> newElements = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
newElements.put(keys.get(i), VALUE + "-new-" + i);
}

/* 0, 1st keys are added before multiReplace, so only they should succeed. */
async.multiSet(oldElements, 60)
.thenAccept(result -> {
assertEquals(oldElements.size(), result.size());
for (Boolean b : result.values()) {
assertTrue(b);
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);

// when
async.multiReplace(newElements, 60)
.thenCompose(result -> {
assertEquals(keys.size(), result.size());
for (int i = 0; i < 2; i++) {
assertTrue(result.get(keys.get(i)));
}

for (int i = 2; i < keys.size(); i++) {
assertFalse(result.get(keys.get(i)));
}
return async.multiGet(keys);
})
// then
.thenAccept(result -> {
assertEquals(2, result.size());
for (int i = 0; i < 2; i++) {
assertEquals(VALUE + "-new-" + i, result.get(keys.get(i)));
}
for (int i = 2; i < keys.size(); i++) {
assertNull(result.get(keys.get(i)));
}
})
.toCompletableFuture()
.get(300L, TimeUnit.MILLISECONDS);
}
}
Loading