[Change] CacheStore-Redis 修复 RedisConnectionPool 为所有 Factory 共用而导致多实例情况下会出现连接错误的问题;

[Change] RedisConnectionPool 将该类从静态方法类转变为实例类;
[Change] RedisCacheStore, RedisMapCacheStore, RedisSingleCacheStore 引入 RedisConnectionPool 实例以进行具体操作;
[Change] RedisMapCacheStoreTest, RedisSingleCacheStoreTest 调整 CacheStore 实例的获取, 不再自行创建具体实现, 转为通过 Factory 创建实例;
[Change] RedisCacheStoreFactory 单独创建一个 RedisConnectionPool 以提供给通过 Factory 自身创建的 CacheStore 实现;
This commit is contained in:
LamGC 2020-11-06 20:08:46 +08:00
parent dd7ac015f9
commit 1eee7d19c6
Signed by: LamGC
GPG Key ID: 6C5AE2A913941E1D
7 changed files with 91 additions and 38 deletions

View File

@ -30,6 +30,12 @@ import java.util.Set;
*/ */
public abstract class RedisCacheStore<V> implements CacheStore<V> { public abstract class RedisCacheStore<V> implements CacheStore<V> {
private final RedisConnectionPool connectionPool;
protected RedisCacheStore(RedisConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
/** /**
* 获取 Key 前缀. * 获取 Key 前缀.
* <p>key = getKeyPrefix() + key * <p>key = getKeyPrefix() + key
@ -49,7 +55,7 @@ public abstract class RedisCacheStore<V> implements CacheStore<V> {
@Override @Override
public boolean setTimeToLive(CacheKey key, long ttl) { public boolean setTimeToLive(CacheKey key, long ttl) {
String keyString = getKeyString(key); String keyString = getKeyString(key);
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
Long result; Long result;
if (ttl >= 0) { if (ttl >= 0) {
result = jedis.pexpire(keyString, ttl); result = jedis.pexpire(keyString, ttl);
@ -62,7 +68,7 @@ public abstract class RedisCacheStore<V> implements CacheStore<V> {
@Override @Override
public long getTimeToLive(CacheKey key) { public long getTimeToLive(CacheKey key) {
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
Long ttl = jedis.pttl(getKeyString(key)); Long ttl = jedis.pttl(getKeyString(key));
return ttl < 0 ? -1 : ttl; return ttl < 0 ? -1 : ttl;
}); });
@ -70,27 +76,27 @@ public abstract class RedisCacheStore<V> implements CacheStore<V> {
@Override @Override
public long size() { public long size() {
return RedisConnectionPool.executeRedis(Jedis::dbSize); return connectionPool.executeRedis(Jedis::dbSize);
} }
@Override @Override
public boolean clear() { public boolean clear() {
return RedisConnectionPool.executeRedis(jedis -> RedisUtils.isOk(jedis.flushDB())); return connectionPool.executeRedis(jedis -> RedisUtils.isOk(jedis.flushDB()));
} }
@Override @Override
public boolean exists(CacheKey key) { public boolean exists(CacheKey key) {
return RedisConnectionPool.executeRedis(jedis -> jedis.exists(getKeyString(key))); return connectionPool.executeRedis(jedis -> jedis.exists(getKeyString(key)));
} }
@Override @Override
public boolean remove(CacheKey key) { public boolean remove(CacheKey key) {
return RedisConnectionPool.executeRedis(jedis -> jedis.del(getKeyString(key)) == RedisUtils.RETURN_CODE_OK); return connectionPool.executeRedis(jedis -> jedis.del(getKeyString(key)) == RedisUtils.RETURN_CODE_OK);
} }
@Override @Override
public Set<String> keySet() { public Set<String> keySet() {
Set<String> keys = RedisConnectionPool.executeRedis(jedis -> jedis.keys(RedisUtils.KEY_PATTERN_ALL)); Set<String> keys = connectionPool.executeRedis(jedis -> jedis.keys(RedisUtils.KEY_PATTERN_ALL));
final int prefixLength = getKeyPrefix().length(); final int prefixLength = getKeyPrefix().length();
Set<String> newKeys = new HashSet<>(); Set<String> newKeys = new HashSet<>();
for (String key : keys) { for (String key : keys) {

View File

@ -47,6 +47,8 @@ public class RedisCacheStoreFactory implements CacheStoreFactory {
private final static String PROP_DATABASE = "redis.databaseId"; private final static String PROP_DATABASE = "redis.databaseId";
private final static String PROP_CLIENT_NAME = "redis.clientName"; private final static String PROP_CLIENT_NAME = "redis.clientName";
private final RedisConnectionPool connectionPool = new RedisConnectionPool();
@Override @Override
public void initial(File dataDirectory) { public void initial(File dataDirectory) {
final File propertiesFile = new File(dataDirectory, "redis.properties"); final File propertiesFile = new File(dataDirectory, "redis.properties");
@ -75,7 +77,7 @@ public class RedisCacheStoreFactory implements CacheStoreFactory {
Integer.parseInt(properties.getProperty(PROP_PORT, "6379")), Integer.parseInt(properties.getProperty(PROP_PORT, "6379")),
queryString); queryString);
RedisConnectionPool.setConnectionUrl(url); connectionPool.setConnectionUrl(url);
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
log.error("构造连接 URL 时发生异常", e); log.error("构造连接 URL 时发生异常", e);
} }
@ -83,7 +85,7 @@ public class RedisCacheStoreFactory implements CacheStoreFactory {
@Override @Override
public <V> SingleCacheStore<V> newSingleCacheStore(String identify, StringConverter<V> converter) { public <V> SingleCacheStore<V> newSingleCacheStore(String identify, StringConverter<V> converter) {
return new RedisSingleCacheStore<>(identify, converter); return new RedisSingleCacheStore<>(connectionPool, identify, converter);
} }
@Override @Override
@ -98,11 +100,11 @@ public class RedisCacheStoreFactory implements CacheStoreFactory {
@Override @Override
public <V> MapCacheStore<V> newMapCacheStore(String identify, StringConverter<V> converter) { public <V> MapCacheStore<V> newMapCacheStore(String identify, StringConverter<V> converter) {
return new RedisMapCacheStore<>(identify, converter); return new RedisMapCacheStore<>(connectionPool, identify, converter);
} }
@Override @Override
public boolean canGetCacheStore() { public boolean canGetCacheStore() {
return RedisConnectionPool.available(); return connectionPool.available();
} }
} }

View File

@ -34,16 +34,17 @@ import java.util.function.Function;
class RedisConnectionPool { class RedisConnectionPool {
private final static Logger log = LoggerFactory.getLogger(RedisConnectionPool.class); private final static Logger log = LoggerFactory.getLogger(RedisConnectionPool.class);
private final static AtomicReference<JedisPool> POOL = new AtomicReference<>();
private final static AtomicReference<URL> CONNECTION_URL = new AtomicReference<>();
public static synchronized void setConnectionUrl(URL connectionUrl) { private final AtomicReference<JedisPool> POOL = new AtomicReference<>();
private final AtomicReference<URL> CONNECTION_URL = new AtomicReference<>();
public synchronized void setConnectionUrl(URL connectionUrl) {
if(CONNECTION_URL.get() != null) { if(CONNECTION_URL.get() != null) {
CONNECTION_URL.set(connectionUrl); CONNECTION_URL.set(connectionUrl);
} }
} }
public static synchronized void reconnectRedis() { public synchronized void reconnectRedis() {
JedisPool jedisPool = POOL.get(); JedisPool jedisPool = POOL.get();
if (jedisPool != null && !jedisPool.isClosed()) { if (jedisPool != null && !jedisPool.isClosed()) {
return; return;
@ -66,7 +67,7 @@ class RedisConnectionPool {
* <p>注意, 需回收 Jedis 对象, 否则可能会耗尽连接池导致后续操作受到影响. * <p>注意, 需回收 Jedis 对象, 否则可能会耗尽连接池导致后续操作受到影响.
* @return 返回可用的 Jedis 连接. * @return 返回可用的 Jedis 连接.
*/ */
public static Jedis getConnection() { public Jedis getConnection() {
JedisPool pool = POOL.get(); JedisPool pool = POOL.get();
if (pool == null || pool.isClosed()) { if (pool == null || pool.isClosed()) {
reconnectRedis(); reconnectRedis();
@ -85,7 +86,7 @@ class RedisConnectionPool {
* @param <R> 返回值类型. * @param <R> 返回值类型.
* @return 返回 function 返回的内容. * @return 返回 function 返回的内容.
*/ */
public static <R> R executeRedis(Function<Jedis, R> function) { public <R> R executeRedis(Function<Jedis, R> function) {
try (Jedis jedis = getConnection()) { try (Jedis jedis = getConnection()) {
return function.apply(jedis); return function.apply(jedis);
} }
@ -95,7 +96,7 @@ class RedisConnectionPool {
* 检查 Redis 连接池是否有可用的资源. * 检查 Redis 连接池是否有可用的资源.
* @return 如果连接池依然活跃, 返回 true. * @return 如果连接池依然活跃, 返回 true.
*/ */
public static boolean available() { public boolean available() {
JedisPool jedisPool = POOL.get(); JedisPool jedisPool = POOL.get();
if (jedisPool == null || jedisPool.isClosed()) { if (jedisPool == null || jedisPool.isClosed()) {
reconnectRedis(); reconnectRedis();

View File

@ -33,8 +33,11 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
private final String keyPrefix; private final String keyPrefix;
private final StringConverter<V> converter; private final StringConverter<V> converter;
private final RedisConnectionPool connectionPool;
public RedisMapCacheStore(String keyPrefix, StringConverter<V> converter) { public RedisMapCacheStore(RedisConnectionPool connectionPool, String keyPrefix, StringConverter<V> converter) {
super(connectionPool);
this.connectionPool = connectionPool;
keyPrefix = Strings.nullToEmpty(keyPrefix).trim(); keyPrefix = Strings.nullToEmpty(keyPrefix).trim();
if (!keyPrefix.isEmpty() && keyPrefix.endsWith(RedisUtils.KEY_SEPARATOR)) { if (!keyPrefix.isEmpty() && keyPrefix.endsWith(RedisUtils.KEY_SEPARATOR)) {
this.keyPrefix = keyPrefix; this.keyPrefix = keyPrefix;
@ -52,7 +55,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
@Override @Override
public int mapSize(CacheKey key) { public int mapSize(CacheKey key) {
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
String keyString = getKeyString(key); String keyString = getKeyString(key);
if (jedis.exists(keyString)) { if (jedis.exists(keyString)) {
return jedis.hlen(keyString).intValue(); return jedis.hlen(keyString).intValue();
@ -63,7 +66,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
@Override @Override
public Set<String> mapFieldSet(CacheKey key) { public Set<String> mapFieldSet(CacheKey key) {
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
String keyString = getKeyString(key); String keyString = getKeyString(key);
if (jedis.exists(keyString)) { if (jedis.exists(keyString)) {
return jedis.hkeys(keyString); return jedis.hkeys(keyString);
@ -74,7 +77,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
@Override @Override
public Set<V> mapValueSet(CacheKey key) { public Set<V> mapValueSet(CacheKey key) {
List<String> rawValueSet = RedisConnectionPool.executeRedis(jedis -> { List<String> rawValueSet = connectionPool.executeRedis(jedis -> {
String keyString = getKeyString(key); String keyString = getKeyString(key);
if (jedis.exists(keyString)) { if (jedis.exists(keyString)) {
return jedis.hvals(keyString); return jedis.hvals(keyString);
@ -97,7 +100,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
public boolean put(CacheKey key, String field, V value) { public boolean put(CacheKey key, String field, V value) {
Objects.requireNonNull(field); Objects.requireNonNull(field);
Objects.requireNonNull(value); Objects.requireNonNull(value);
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
String keyString = getKeyString(key); String keyString = getKeyString(key);
return jedis.hset(keyString, field, converter.to(value)) == RedisUtils.RETURN_CODE_OK; return jedis.hset(keyString, field, converter.to(value)) == RedisUtils.RETURN_CODE_OK;
}); });
@ -113,7 +116,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
final Map<String, String> targetMap = new HashMap<>(map.size()); final Map<String, String> targetMap = new HashMap<>(map.size());
map.forEach((k, v) -> targetMap.put(k, converter.to(v))); map.forEach((k, v) -> targetMap.put(k, converter.to(v)));
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
String keyString = getKeyString(key); String keyString = getKeyString(key);
return RedisUtils.isOk(jedis.hmset(keyString, targetMap)); return RedisUtils.isOk(jedis.hmset(keyString, targetMap));
}); });
@ -123,7 +126,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
public boolean putIfNotExist(CacheKey key, String field, V value) { public boolean putIfNotExist(CacheKey key, String field, V value) {
Objects.requireNonNull(field); Objects.requireNonNull(field);
Objects.requireNonNull(value); Objects.requireNonNull(value);
return RedisConnectionPool.executeRedis(jedis -> { return connectionPool.executeRedis(jedis -> {
String keyString = getKeyString(key); String keyString = getKeyString(key);
return jedis.hsetnx(keyString, field, converter.to(value)) == RedisUtils.RETURN_CODE_OK; return jedis.hsetnx(keyString, field, converter.to(value)) == RedisUtils.RETURN_CODE_OK;
}); });
@ -132,7 +135,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
@Override @Override
public V get(CacheKey key, String field) { public V get(CacheKey key, String field) {
Objects.requireNonNull(field); Objects.requireNonNull(field);
String value = RedisConnectionPool.executeRedis(jedis -> jedis.hget(getKeyString(key), field)); String value = connectionPool.executeRedis(jedis -> jedis.hget(getKeyString(key), field));
if (value == null) { if (value == null) {
return null; return null;
} }
@ -142,14 +145,14 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
@Override @Override
public boolean removeField(CacheKey key, String field) { public boolean removeField(CacheKey key, String field) {
Objects.requireNonNull(field); Objects.requireNonNull(field);
return RedisConnectionPool.executeRedis(jedis -> return connectionPool.executeRedis(jedis ->
jedis.hdel(getKeyString(key), field) == RedisUtils.RETURN_CODE_OK); jedis.hdel(getKeyString(key), field) == RedisUtils.RETURN_CODE_OK);
} }
@Override @Override
public boolean containsField(CacheKey key, String field) { public boolean containsField(CacheKey key, String field) {
Objects.requireNonNull(field); Objects.requireNonNull(field);
return RedisConnectionPool.executeRedis(jedis -> jedis.hexists(getKeyString(key), field)); return connectionPool.executeRedis(jedis -> jedis.hexists(getKeyString(key), field));
} }
@Override @Override
@ -168,7 +171,7 @@ public class RedisMapCacheStore<V> extends RedisCacheStore<Map<String, V>> imple
String[] fieldsArray = new String[fields.size()]; String[] fieldsArray = new String[fields.size()];
fields.toArray(fieldsArray); fields.toArray(fieldsArray);
return RedisConnectionPool.executeRedis(jedis -> return connectionPool.executeRedis(jedis ->
jedis.hdel(getKeyString(key), fieldsArray) != RedisUtils.RETURN_CODE_FAILED); jedis.hdel(getKeyString(key), fieldsArray) != RedisUtils.RETURN_CODE_FAILED);
} }

View File

@ -36,8 +36,11 @@ public class RedisSingleCacheStore<V> extends RedisCacheStore<V> implements Sing
private final String keyPrefix; private final String keyPrefix;
private final StringConverter<V> converter; private final StringConverter<V> converter;
private final RedisConnectionPool connectionPool;
public RedisSingleCacheStore(String keyPrefix, StringConverter<V> converter) { public RedisSingleCacheStore(RedisConnectionPool connectionPool, String keyPrefix, StringConverter<V> converter) {
super(connectionPool);
this.connectionPool = connectionPool;
keyPrefix = Strings.nullToEmpty(keyPrefix).trim(); keyPrefix = Strings.nullToEmpty(keyPrefix).trim();
if (!keyPrefix.isEmpty() && keyPrefix.endsWith(RedisUtils.KEY_SEPARATOR)) { if (!keyPrefix.isEmpty() && keyPrefix.endsWith(RedisUtils.KEY_SEPARATOR)) {
this.keyPrefix = keyPrefix; this.keyPrefix = keyPrefix;
@ -50,20 +53,20 @@ public class RedisSingleCacheStore<V> extends RedisCacheStore<V> implements Sing
@Override @Override
public boolean set(CacheKey key, V value) { public boolean set(CacheKey key, V value) {
return RedisConnectionPool.executeRedis(jedis -> return connectionPool.executeRedis(jedis ->
RedisUtils.isOk(jedis.set(getKeyString(key), converter.to(Objects.requireNonNull(value))))); RedisUtils.isOk(jedis.set(getKeyString(key), converter.to(Objects.requireNonNull(value)))));
} }
@Override @Override
public boolean setIfNotExist(CacheKey key, V value) { public boolean setIfNotExist(CacheKey key, V value) {
return RedisConnectionPool.executeRedis(jedis -> return connectionPool.executeRedis(jedis ->
jedis.setnx(getKeyString(key), converter.to(Objects.requireNonNull(value))) jedis.setnx(getKeyString(key), converter.to(Objects.requireNonNull(value)))
== RedisUtils.RETURN_CODE_OK); == RedisUtils.RETURN_CODE_OK);
} }
@Override @Override
public V get(CacheKey key) { public V get(CacheKey key) {
String value = RedisConnectionPool.executeRedis(jedis -> jedis.get(getKeyString(key))); String value = connectionPool.executeRedis(jedis -> jedis.get(getKeyString(key)));
if (value == null) { if (value == null) {
return null; return null;
} }

View File

@ -18,6 +18,7 @@
package net.lamgc.cgj.bot.cache.redis; package net.lamgc.cgj.bot.cache.redis;
import com.google.common.base.Throwables;
import net.lamgc.cgj.bot.cache.CacheKey; import net.lamgc.cgj.bot.cache.CacheKey;
import net.lamgc.cgj.bot.cache.MapCacheStore; import net.lamgc.cgj.bot.cache.MapCacheStore;
import net.lamgc.cgj.bot.cache.convert.StringToStringConverter; import net.lamgc.cgj.bot.cache.convert.StringToStringConverter;
@ -25,7 +26,9 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -34,8 +37,24 @@ import java.util.Map;
*/ */
public class RedisMapCacheStoreTest { public class RedisMapCacheStoreTest {
private final static MapCacheStore<String> cacheStore = private final static RedisCacheStoreFactory factory;
new RedisMapCacheStore<>("test", new StringToStringConverter()); private final static TemporaryFolder tempFolder = TemporaryFolder.builder().build();
static {
try {
tempFolder.create();
} catch (IOException e) {
Assert.fail(Throwables.getStackTraceAsString(e));
}
factory = new RedisCacheStoreFactory();
try {
factory.initial(tempFolder.newFolder("cache-redis"));
} catch (IOException e) {
Assert.fail(Throwables.getStackTraceAsString(e));
}
}
private final static MapCacheStore<String> cacheStore = factory.newMapCacheStore("test", new StringToStringConverter());
@Before @Before
public void before() { public void before() {

View File

@ -17,13 +17,16 @@
package net.lamgc.cgj.bot.cache.redis; package net.lamgc.cgj.bot.cache.redis;
import com.google.common.base.Throwables;
import net.lamgc.cgj.bot.cache.CacheKey; import net.lamgc.cgj.bot.cache.CacheKey;
import net.lamgc.cgj.bot.cache.SingleCacheStore; import net.lamgc.cgj.bot.cache.SingleCacheStore;
import net.lamgc.cgj.bot.cache.convert.StringToStringConverter; import net.lamgc.cgj.bot.cache.convert.StringToStringConverter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -33,7 +36,24 @@ import java.util.Map;
*/ */
public class RedisSingleCacheStoreTest { public class RedisSingleCacheStoreTest {
private final static SingleCacheStore<String> cacheStore = new RedisSingleCacheStore<>("test", new StringToStringConverter()); private final static RedisCacheStoreFactory factory;
private final static TemporaryFolder tempFolder = TemporaryFolder.builder().build();
static {
try {
tempFolder.create();
} catch (IOException e) {
Assert.fail(Throwables.getStackTraceAsString(e));
}
factory = new RedisCacheStoreFactory();
try {
factory.initial(tempFolder.newFolder("cache-redis"));
} catch (IOException e) {
Assert.fail(Throwables.getStackTraceAsString(e));
}
}
private final static SingleCacheStore<String> cacheStore = factory.newSingleCacheStore("test", new StringToStringConverter());
@Before @Before
public void before() { public void before() {
@ -42,8 +62,7 @@ public class RedisSingleCacheStoreTest {
@Test @Test
public void nullThrowTest() { public void nullThrowTest() {
final SingleCacheStore<String> tempCacheStore = final SingleCacheStore<String> tempCacheStore = factory.newSingleCacheStore("test" + RedisUtils.KEY_SEPARATOR, new StringToStringConverter());
new RedisSingleCacheStore<>("test" + RedisUtils.KEY_SEPARATOR, new StringToStringConverter());
final CacheKey key = new CacheKey("testKey"); final CacheKey key = new CacheKey("testKey");
// RedisSingleCacheStore // RedisSingleCacheStore