Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.launchdarkly.sdk.server;

/**
* Optional interface for data stores that can disable their internal cache.
* <p>
* This is currently for internal implementations only.
*/
interface DisableableCache {
/**
* Disables the internal cache. After this call, the cache is no longer
* consulted on reads and no longer populated by writes.
* <p>
* Implementations should release the cache contents so the memory can be
* reclaimed. The call must be idempotent: subsequent invocations should be
* safe and have no further effect.
*/
void disableCache();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* <p>
* This class is only constructed by {@link PersistentDataStoreBuilder}.
*/
final class PersistentDataStoreWrapper implements DataStore, SettableCache {
final class PersistentDataStoreWrapper implements DataStore, SettableCache, DisableableCache {
private final PersistentDataStore core;
private final LoadingCache<CacheKey, Optional<ItemDescriptor>> itemCache;
private final LoadingCache<DataKind, KeyedItems<ItemDescriptor>> allCache;
Expand All @@ -54,9 +54,15 @@ final class PersistentDataStoreWrapper implements DataStore, SettableCache {
private final AtomicBoolean inited = new AtomicBoolean(false);
private final ListeningExecutorService cacheExecutor;
private final LDLogger logger;

private final Object externalStoreLock = new Object();
private volatile CacheExporter externalCache;

// Once true, the cache is bypassed on reads and writes; entries already in
// the cache have been invalidated by disableCache(). The cache instances
// themselves remain alive until GC reclaims them; the LoadingCache loaders
// are short-circuited because every touch site checks this flag first.
private volatile boolean cacheDisabled;

PersistentDataStoreWrapper(
final PersistentDataStore core,
Expand Down Expand Up @@ -151,14 +157,26 @@ public void close() throws IOException {
core.close();
}

@Override
public void disableCache() {
if (cacheDisabled) return;
// Volatile write publishes the bypass flag before clearing cache contents.
// Future readers observe cacheDisabled == true and skip the cache call
// sites.
cacheDisabled = true;
if (itemCache != null) itemCache.invalidateAll();
if (allCache != null) allCache.invalidateAll();
if (initCache != null) initCache.invalidateAll();
}

@Override
public boolean isInitialized() {
if (inited.get()) {
return true;
}
boolean result;
try {
if (initCache != null) {
if (initCache != null && !cacheDisabled) {
result = initCache.get("");
} else {
result = core.isInitialized();
Expand Down Expand Up @@ -187,7 +205,7 @@ public void init(FullDataSet<ItemDescriptor> allData) {
allBuilder.add(new AbstractMap.SimpleEntry<>(kind, items));
}
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build(), allData.shouldPersist()));
if (itemCache != null && allCache != null) {
if (itemCache != null && allCache != null && !cacheDisabled) {
itemCache.invalidateAll();
allCache.invalidateAll();
if (failure != null && !cacheIndefinitely) {
Expand Down Expand Up @@ -228,7 +246,7 @@ private RuntimeException initCore(FullDataSet<SerializedItemDescriptor> allData)
@Override
public ItemDescriptor get(DataKind kind, String key) {
try {
ItemDescriptor ret = itemCache != null ? itemCache.get(CacheKey.forItem(kind, key)).orNull() :
ItemDescriptor ret = (itemCache != null && !cacheDisabled) ? itemCache.get(CacheKey.forItem(kind, key)).orNull() :
getAndDeserializeItem(kind, key);
processError(null);
return ret;
Expand All @@ -242,7 +260,7 @@ public ItemDescriptor get(DataKind kind, String key) {
public KeyedItems<ItemDescriptor> getAll(DataKind kind) {
try {
KeyedItems<ItemDescriptor> ret;
ret = allCache != null ? allCache.get(kind) : getAllAndDeserialize(kind);
ret = (allCache != null && !cacheDisabled) ? allCache.get(kind) : getAllAndDeserialize(kind);
processError(null);
return ret;
} catch (Exception e) {
Expand Down Expand Up @@ -281,7 +299,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
}
failure = e;
}
if (itemCache != null) {
if (itemCache != null && !cacheDisabled) {
CacheKey cacheKey = CacheKey.forItem(kind, key);
if (failure == null) {
if (updated) {
Expand All @@ -297,7 +315,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
}
}
}
if (allCache != null) {
if (allCache != null && !cacheDisabled) {
// If the cache has a finite TTL, then we should remove the "all items" cache entry to force
// a reread the next time All is called. However, if it's an infinite TTL, we need to just
// update the item within the existing "all items" entry (since we want things to still work
Expand Down Expand Up @@ -340,7 +358,7 @@ public void setCacheExporter(CacheExporter externalDataSource) {

@Override
public CacheStats getCacheStats() {
if (itemCache == null || allCache == null) {
if (itemCache == null || allCache == null || cacheDisabled) {
return null;
}
com.google.common.cache.CacheStats itemStats = itemCache.stats();
Expand Down Expand Up @@ -443,8 +461,9 @@ private boolean pollAvailabilityAfterOutage() {
}

// Fall back to cache-based recovery if external store is not available/initialized
// and we're in infinite cache mode
if (cacheIndefinitely && allCache != null) {
// and we're in infinite cache mode. Under FDv2 this branch is dead once
// disableCache has run: the externalCache path above supersedes it.
if (cacheIndefinitely && allCache != null && !cacheDisabled) {
// If we're in infinite cache mode, then we can assume the cache has a full set of current
// flag data (since presumably the data source has still been running) and we can just
// write the contents of the cache to the underlying data store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ private void maybeSwitchStore() {
}
synchronized (activeStoreLock) {
activeReadStore = memoryStore;
if (persistentStore instanceof DisableableCache) {
((DisableableCache) persistentStore).disableCache();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,19 @@
* </code></pre>
*
* In this example, {@code .url()} is an option specifically for the Redis integration, whereas
* {@code cacheSeconds()} is an option that can be used for any persistent data store.
* {@code cacheSeconds()} is an option that can be used for any persistent data store.
* <p>
* Note that this class is abstract; the actual implementation is created by calling
* {@link Components#persistentDataStore(ComponentConfigurer)}.
* <p>
* Under the FDv2 data system, the cache options configured here ({@link #cacheTime(Duration)},
* {@link #cacheSeconds(long)}, {@link #cacheMillis(long)}, {@link #cacheForever()},
* {@link #noCaching()}, {@link #staleValuesPolicy(StaleValuesPolicy)},
* {@link #recordCacheStats(boolean)}) only govern the brief bootstrap window before the in-memory
* store has received its first full payload. Once the in-memory store takes over as the active
* read source, the persistent-store cache is released and these settings have no further effect.
* These options are kept for backward compatibility and may be deprecated in a future major
* version.
* @since 4.12.0
*/
public abstract class PersistentDataStoreBuilder implements ComponentConfigurer<DataStore> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
Expand Down Expand Up @@ -713,6 +714,115 @@ public void statusRemainsUnavailableIfStoreSaysItIsAvailableButInitFails() throw
assertThat(core.initedCount.get(), greaterThan(initedCount));
}

@Test
public void disableCacheIsIdempotent() {
assumeThat(testMode.isCached(), is(true));
wrapper.disableCache();
wrapper.disableCache(); // must not throw
}

@Test
public void disableCacheIsSafeOnUncachedWrapper() {
assumeThat(testMode.isCached(), is(false));
wrapper.disableCache(); // must not throw
}

@Test
public void getAfterDisableCacheReturnsCurrentCoreState() {
assumeThat(testMode.isCached(), is(true));
TestItem item1v1 = new TestItem("key", 1);
TestItem item1v2 = new TestItem("key", 2);

core.forceSet(TEST_ITEMS, item1v1);
// Prime the cache.
assertThat(wrapper.get(TEST_ITEMS, item1v1.key), equalTo(item1v1.toItemDescriptor()));

wrapper.disableCache();

// Mutate the core behind the wrapper's back; if the cache were still
// serving reads we would see the stale v1.
core.forceSet(TEST_ITEMS, item1v2);
assertThat(wrapper.get(TEST_ITEMS, item1v2.key), equalTo(item1v2.toItemDescriptor()));
}

@Test
public void getAllAfterDisableCacheReturnsCurrentCoreState() {
assumeThat(testMode.isCached(), is(true));
TestItem item1 = new TestItem("keyA", 1);
TestItem item2 = new TestItem("keyB", 1);

core.forceSet(TEST_ITEMS, item1);
// Prime the cache.
Map<String, ItemDescriptor> primed = toItemsMap(wrapper.getAll(TEST_ITEMS));
assertThat(primed.size(), is(1));

wrapper.disableCache();

core.forceSet(TEST_ITEMS, item2);
Map<String, ItemDescriptor> afterDrop = toItemsMap(wrapper.getAll(TEST_ITEMS));
assertThat(afterDrop.size(), is(2));
}

@Test
public void upsertAfterDisableCacheWritesThroughToCoreOnly() {
assumeThat(testMode.isCached(), is(true));
TestItem item = new TestItem("key", 1);

wrapper.disableCache();

assertThat(wrapper.upsert(TEST_ITEMS, item.key, item.toItemDescriptor()), is(true));
// The write must have landed in the core.
assertThat(core.data.get(TEST_ITEMS).get(item.key), equalTo(item.toSerializedItemDescriptor()));
// And subsequent reads must reach the core (no repopulated cache).
assertThat(wrapper.get(TEST_ITEMS, item.key), equalTo(item.toItemDescriptor()));
}

@Test
public void initAfterDisableCacheWritesThroughToCoreWithoutRepopulatingCache() {
assumeThat(testMode.isCached(), is(true));
TestItem itemA = new TestItem("keyA", 1);
TestItem itemB = new TestItem("keyA", 2);

wrapper.disableCache();

wrapper.init(new DataBuilder().add(TEST_ITEMS, itemA).build());

assertThat(core.data.get(TEST_ITEMS).get(itemA.key), equalTo(itemA.toSerializedItemDescriptor()));

// Mutate the core behind the wrapper's back; if the cache had repopulated
// we would still see itemA on the next read.
core.forceSet(TEST_ITEMS, itemB);
assertThat(wrapper.get(TEST_ITEMS, itemB.key), equalTo(itemB.toItemDescriptor()));
}

@Test
public void getCacheStatsAfterDisableCacheReturnsNull() {
assumeThat(testMode.isCached(), is(true));
// Build a wrapper with stats recording enabled so getCacheStats is non-null pre-disable.
PersistentDataStoreWrapper w = new PersistentDataStoreWrapper(
new MockPersistentDataStore(),
testMode.getCacheTtl(),
PersistentDataStoreBuilder.StaleValuesPolicy.EVICT,
true,
this::updateStatus,
sharedExecutor,
testLogger);
try {
assertNotNull(w.getCacheStats());
w.disableCache();
assertNull(w.getCacheStats());
} finally {
try { w.close(); } catch (IOException e) { /* ignore */ }
}
}

@Test
public void closeAfterDisableCacheDoesNotThrow() throws IOException {
assumeThat(testMode.isCached(), is(true));
wrapper.disableCache();
wrapper.close(); // safety belt; tearDown will also call close, which must be safe
}

private void causeStoreError(MockPersistentDataStore core, PersistentDataStoreWrapper w) {
core.unavailable = true;
core.fakeError = new RuntimeException(FAKE_ERROR.getMessage());
Expand Down
Loading
Loading