在日常系统开发过程中,我们常常用到各种缓存,例如 Redis、memcache。当缓存服务器由于压力过大而崩溃时,很多核心业务会受到严重影响。这时候,我们需要考虑一些本地缓存策略。即当缓存服务器出现问题时,我们的核心服务也能正常运转一段时间,而不至于立即全面瘫痪。

基于Java算法的高效本地缓存代码实现-图片-1

通常一些核心接口,我们会对周边接口采取一些自动降级、异常隔离,从而保证核心业务的正常交易。但是核心业务中往往有些简单信息严重依赖缓存服务器。例如类似开关的一些配置信息。这类数据量不大的信息,我们完全可以放到本地缓存中。

以下是笔者在开发过程中简单实现的几个类。

一、缓存对象类

/**
 * 
 * @from www.ityuan.com
 * @date 2017年12月29日 下午2:19:32
 */
public class LocalCacheObject {
    /**
     * 缓存对象
     */
    private Object object;
    /**
     * 缓存的时间
     */
    private long time;
    public LocalCacheObject(Object obj, long time){
        this.object = obj;
        this.time = time;
    }
    
    public void setValueAndTime(Object obj, long time){
        this.object = obj;
        this.time = time;
    }
    
    public long getTime(){
        return time;
    }
    
    public void setTime(long time) {
        this.time = time;
    }

    public Object getObject(){
        return object;
    }

    @Override
    public String toString() {
        return "LocalCacheObject [object=" + object + ", time=" + time + "]";
    }
}

二、缓存容器工具类,提供过期异步更新数据策略

/**
 * @from www.ityuan.com
 * @date 2017年12月29日 下午2:21:17
 */
public class LocalCacheUtils {
    private static ConcurrentHashMap<String, LocalCacheObject> localCacheMap = new ConcurrentHashMap<String, LocalCacheObject>();
    private static final Logger logger = LoggerFactory.getLogger(LocalCacheUtils.class);
    /**
     * 本地缓存过期时间,2分钟
     */
    private static final long EXPIRE_TIME = 2 * 60 * 1000;
    
    public static Object getLocalCache(String key, GetValueCallback callback) {
        return getLocalCache(key, callback, EXPIRE_TIME);
    }
    
    public static Object getLocalCache(String key, GetValueCallback callback,long expireTime){
        LocalCacheObject obj = localCacheMap.get(key);
        long currentTime = System.currentTimeMillis();
        if(obj == null){
            Object val = callback.getValue();
            obj = new LocalCacheObject(val, currentTime);
            logger.info("####保存本地缓存:key="+key+", time="+currentTime+", Obj="+obj);
            localCacheMap.putIfAbsent(key, obj);
            return obj.getObject();
        } else {            
            if(currentTime - obj.getTime() > expireTime){
                // 防止重复多次调异步接口
                obj.setTime(currentTime);
                asynUpdateCache(key, callback);
            } 
            logger.info("####读取本地缓存:key="+key+", time="+currentTime+", Obj="+obj);
            return obj.getObject();
        }
    }
    
    private static void asynUpdateCache(final String key,final GetValueCallback callback) {
        ThreadUtils.execute(new Runnable() {
            @Override
            public void run() {
                long currentTime = System.currentTimeMillis();
                Object val = callback.getValue();
                LocalCacheObject obj = localCacheMap.get(key);
                logger.info("####异步更新本地缓存:key="+key+", time="+currentTime+", newValue="+val+", oldObj="+obj);
                obj.setValueAndTime(val, currentTime);
            }
        });
    }
    
}

三、缓存原数据获取回调接口类

/**
 * @from www.ityuan.com
 * @date 2017年12月29日 下午2:21:17
 */
public class LocalCacheUtils {
    private static ConcurrentHashMap<String, LocalCacheObject> localCacheMap = new ConcurrentHashMap<String, LocalCacheObject>();
    private static final Logger logger = LoggerFactory.getLogger(LocalCacheUtils.class);
    /**
     * 本地缓存过期时间,2分钟
     */
    private static final long EXPIRE_TIME = 2 * 60 * 1000;
    
    public static Object getLocalCache(String key, GetValueCallback callback) {
        return getLocalCache(key, callback, EXPIRE_TIME);
    }
    
    public static Object getLocalCache(String key, GetValueCallback callback,long expireTime){
        LocalCacheObject obj = localCacheMap.get(key);
        long currentTime = System.currentTimeMillis();
        if(obj == null){
            Object val = callback.getValue();
            obj = new LocalCacheObject(val, currentTime);
            logger.info("####保存本地缓存:key="+key+", time="+currentTime+", Obj="+obj);
            localCacheMap.putIfAbsent(key, obj);
            return obj.getObject();
        } else {            
            if(currentTime - obj.getTime() > expireTime){
                // 防止重复多次调异步接口
                obj.setTime(currentTime);
                asynUpdateCache(key, callback);
            } 
            logger.info("####读取本地缓存:key="+key+", time="+currentTime+", Obj="+obj);
            return obj.getObject();
        }
    }
    
    private static void asynUpdateCache(final String key,final GetValueCallback callback) {
        ThreadUtils.execute(new Runnable() {
            @Override
            public void run() {
                long currentTime = System.currentTimeMillis();
                Object val = callback.getValue();
                LocalCacheObject obj = localCacheMap.get(key);
                logger.info("####异步更新本地缓存:key="+key+", time="+currentTime+", newValue="+val+", oldObj="+obj);
                obj.setValueAndTime(val, currentTime);
            }
        });
    }
    
}

四、简单的异步执行工具类,需要spring.xml配置注入多线池:bean commonExecutor

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ThreadUtils {
    
    private static ThreadPoolTaskExecutor commonExecutor;
    
    
    public void setCommonExecutor(ThreadPoolTaskExecutor commonExecutor){
        ThreadUtils.commonExecutor = commonExecutor;
    }
    
    /**
     * 直接在公共线程池中执行线程
     * @param runnable 可运行对象
     */
    public static void execute(Runnable runnable) {
        try {
            commonExecutor.execute(runnable);
        } catch (Exception e) {
        }
    }
    
    
    /**
     * 新建一个线程池
     * @param threadSize 同时执行的线程数大小
     * @return ExecutorService
     */
    public static ExecutorService newExecutor(int threadSize){
        return Executors.newFixedThreadPool(threadSize);
    }
    
    /**
     * 获得一个新的线程池
     * @return ExecutorService
     */
    public static ExecutorService newExecutor(){
        return Executors.newCachedThreadPool();
    }
    
    /**
     * 获得一个新的线程池,只有单个线程
     * @return ExecutorService
     */
    public static ExecutorService newSingleExecutor(){
        return Executors.newSingleThreadExecutor();
    }
    
    /**
     * 执行异步方法
     * @param runnable 需要执行的方法体
     * @return 执行的方法体
     */
    public static Runnable excAsync(final Runnable runnable){
        Thread thread = new Thread(){
            @Override
            public void run() {
                runnable.run();
            }
        };
        thread.start();
        
        return runnable;
    }
    
    /**
     * 执行有返回值的异步方法<br/>
     * Future代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞
     * @return Future
     */
    public static <T> Future<T> execAsync(Callable<T> task){
        return commonExecutor.submit(task);
    }
}

五、应用实例

Object obj = LocalCacheUtils.getLocalCache(OP_SYS_KEY, new GetValueCallback() {
                @Override
                public Object getValue() {
                    SysConfig config = RedisUtils.getByKey(OP_SYS_KEY);
                    if (config == null) {
                        return null;
                    } else {
                        return config.getContent();
                    }
                }

}, 2 * 60 * 1000);

以上应用读取Key:OP_SYS_KEY的本地缓存,如果本地有,则从本地获取,不存在则从Redis缓存获取。Redis取不到,则从数据库端获取。如果获取本地缓存时,时间过期2分钟了,则重新异步从Redis获取最新的缓存数据,确保本地缓存能一定频率的自动更新最新数据。