public class CacheImpl implements ICache{
/**
 * Upper bound on the number of entries for the whole cache. The constructor divides this value
 * by the number of segments to obtain each segment's own limit.
 */
static final private int MAXCACHESIZE = 1000000;
/**
 * Default initial capacity of the hash table (16 buckets), used by the no-argument constructor.
 */
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
 * Default load factor (0.75), used by the no-argument constructor. A segment's rehash threshold
 * is its table length multiplied by the load factor; once the number of entries passes that
 * threshold the table is rehashed.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
 * Default concurrency level (16), i.e. the estimated number of concurrently updating threads,
 * used by the no-argument constructor.
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
 * Largest capacity the hash table may have.
 */
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
 * Largest number of segments; larger concurrency levels are clamped to this value.
 */
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/* ---------------- Fields -------------- */
/**
 * Mask applied to the shifted hash to obtain a segment index; the high bits of a key's hash
 * select the segment.
 */
final int segmentMask;
/**
 * Number of bits the hash is shifted right before {@code segmentMask} is applied.
 */
final int segmentShift;
/**
 * Scrambles a raw {@code hashCode} so that both the segment choice and the bucket index are
 * spread evenly (variant of the single-word Wang/Jenkins hash).
 *
 * @param h the raw hash code
 * @return the spread hash
 */
private static int hash(int h) {
    int mixed = h;
    mixed = mixed + ((mixed << 15) ^ 0xffffcd7d);
    mixed = mixed ^ (mixed >>> 10);
    mixed = mixed + (mixed << 3);
    mixed = mixed ^ (mixed >>> 6);
    mixed = mixed + ((mixed << 2) + (mixed << 14));
    return mixed ^ (mixed >>> 16);
}
/**
 * Picks the segment responsible for the given spread hash.
 *
 * @param hash spread hash of a key
 * @return the segment selected by the shifted and masked hash
 */
final Cache cacheFor(int hash) {
    final int segmentIndex = (hash >>> segmentShift) & segmentMask;
    return caches[segmentIndex];
}
/**
 * A single cached key/value pair, chained to the other entries of its bucket.
 * The key, hash and {@code next} link never change after construction; the value and the
 * expiry timestamp are volatile and may be updated in place.
 */
static final class CacheEntry implements Serializable {
    private static final long serialVersionUID = 7664619266911312995L;

    /** Cache key. */
    final String key;
    /** Spread hash of the key. */
    final int hash;
    /** Cached value. */
    volatile Object value;
    /** Expiry time in epoch milliseconds. */
    volatile long livetime;
    /** Next entry in the same bucket, or {@code null}. */
    final CacheEntry next;

    CacheEntry(String key, int hash, CacheEntry next, Object value, long livetime) {
        this.key = key;
        this.hash = hash;
        this.value = value;
        this.livetime = livetime;
        this.next = next;
    }

    /** Creates an empty bucket array of the given length. */
    static final CacheEntry[] newArray(int i) {
        CacheEntry[] buckets = new CacheEntry[i];
        return buckets;
    }
}
/**
 * The segments, each of which is a specialized hash table guarded by its own lock.
 * The array is allocated and fully populated by the constructor.
 */
final Cache[] caches;
/**
 * One cache segment: a lock-guarded hash table holding part of the cache's entries.
 */
static final class Cache extends ReentrantLock implements Serializable {
/**
 * Serialization version identifier.
 */
private static final long serialVersionUID = 3015261008227415326L;
/**
 * Maximum number of entries this segment may hold.
 */
transient int maxcache = 0;
/**
 * Number of CacheEntry elements in this segment. Declared volatile so that readers see
 * updates without taking the lock.
 */
transient volatile int count;
/**
 * Set while a clear pass is running on this segment; put() refuses writes while it is true.
 */
transient volatile boolean startclearcache = false;
/**
 * Number of times the table has been structurally modified.
 */
transient int modCount;
/**
 * When the number of entries in the table exceeds this value, the table is rehashed.
 */
transient int threshold;
/**
 * Bucket array of this segment. Entries whose hashes collide are linked into a chain hanging
 * off the same array slot. Each segment holds only a share of the cache's buckets (for
 * example 1/16 of them at concurrency level 16).
 */
transient volatile CacheEntry[] table;
/**
 * Total number of put operations that reached this segment's locked section.
 */
private final AtomicLong totalput = new AtomicLong(0);
/**
 * Total number of get operations on this segment.
 */
private final AtomicLong totalget = new AtomicLong(0);
/**
 * Total number of get operations reported as successful via setGetSuccesses().
 */
private final AtomicLong totalgetsuccesses = new AtomicLong(0);
/**
 * Load factor used to compute the rehash threshold from the table length.
 */
final float loadFactor;
/**
 * Position of this segment in the owning cache's segment array; used in log output.
 */
final int index;
/**
 * Creates an empty segment.
 *
 * @param initialCapacity length of the initial bucket array
 * @param lf load factor used to derive the rehash threshold
 * @param maxcache maximum number of entries this segment may hold
 * @param index position of this segment in the owning cache
 */
Cache(int initialCapacity, float lf, int maxcache, int index) {
    this.loadFactor = lf;
    this.maxcache = maxcache;
    this.index = index;
    // The load factor must be assigned before the table so the threshold is computed correctly.
    this.setTable(new CacheEntry[initialCapacity]);
}
/** Creates an empty segment array of the given length. */
static final Cache[] newArray(int i) {
    Cache[] segments = new Cache[i];
    return segments;
}
/**
 * Installs a new bucket array and recomputes the rehash threshold from its length.
 * Call only from the constructor or while holding the segment lock.
 */
void setTable(CacheEntry[] newTable) {
    final int capacity = newTable.length;
    this.threshold = (int) (capacity * this.loadFactor);
    this.table = newTable;
}
/**
 * Returns the first entry of the bucket that the given hash maps to, or {@code null} if the
 * bucket is empty.
 */
CacheEntry getFirst(int hash) {
    final CacheEntry[] buckets = this.table;
    final int slot = (buckets.length - 1) & hash;
    return buckets[slot];
}
/** Returns the current value of the put counter of this segment. */
public long getTotalput() {
    return totalput.longValue();
}
/** Returns the current value of the get counter of this segment. */
public long getTotalget() {
    return totalget.longValue();
}
/** Returns the current value of the successful-get counter of this segment. */
public long getTotalGetSuccesses() {
    return totalgetsuccesses.longValue();
}
/**
 * Re-reads an entry's value while holding the segment lock. Used by {@code get} when it found
 * the key but saw a {@code null} value.
 *
 * @param e the entry to read
 * @return the entry's value as seen under the lock
 */
Object readValueUnderLock(CacheEntry e) {
    lock();
    try {
        final Object lockedRead = e.value;
        return lockedRead;
    } finally {
        unlock();
    }
}
/**
 * Records one successful lookup in this segment's statistics.
 */
public void setGetSuccesses() {
    totalgetsuccesses.getAndIncrement();
}
/**
 * Looks up {@code key} without taking the segment lock and counts the attempt.
 *
 * @param key the cache key
 * @param hash spread hash of {@code key}
 * @return the cached value, or {@code null} when the key is absent or its entry has expired
 */
Object get(String key, int hash) {
    totalget.incrementAndGet();
    if (count == 0) { // read-volatile
        return null;
    }
    for (CacheEntry node = getFirst(hash); node != null; node = node.next) {
        if (node.hash != hash || !key.equals(node.key)) {
            continue;
        }
        // Key found: an expired entry is reported as a miss.
        if (System.currentTimeMillis() > node.livetime) {
            return null;
        }
        final Object seen = node.value;
        // A null value is re-read under the lock before being returned.
        return seen != null ? seen : readValueUnderLock(node);
    }
    return null;
}
// Oldest node of the insertion-ordered key list; put() evicts the key stored here first.
private KeyLinked head;
// Node that put() appends new keys after (the tail of the key list).
public KeyLinked current;
/**
 * Node of a singly linked list of cache keys.
 * NOTE(review): this is a non-static inner class, so every node also references its
 * enclosing segment instance — confirm that is intended before serializing it.
 */
public final class KeyLinked implements Serializable {
/**
 * Serialization version identifier.
 */
private static final long serialVersionUID = -2057372648832421472L;
// Cache key recorded by this node.
public String key;
// Following node in the list, or null for the last node.
public KeyLinked next;
// public KeyLinked head;
}
/**
 * Stores {@code value} under {@code key} in this segment and enforces the segment's size cap.
 *
 * <p>The call is refused (nothing is stored and {@code null} is returned) while
 * {@code startclearcache} is set. Otherwise the segment lock is taken, the table is grown when
 * the entry count has passed {@code threshold}, and the key is either updated in place or
 * added to its bucket and appended to the FIFO key list. Afterwards the oldest keys are
 * evicted until the entry count is below {@code maxcache} again.
 *
 * @param key cache key; must not be {@code null}
 * @param hash spread hash of {@code key}
 * @param value value to store
 * @param onlyIfAbsent when {@code true}, an existing entry that has not expired is left
 *        untouched
 * @param livetime time to live in seconds, counted from now
 * @return the value previously stored under {@code key}, or {@code null} if there was none or
 *         the put was refused
 */
Object put(String key, int hash, Object value, boolean onlyIfAbsent, long livetime) {
    // Refuse all writes while a clear pass is marked as running.
    if (startclearcache) {
        System.out.println("cache" + this.index + "put refuse");
        return null;
    }
    lock();
    try {
        totalput.incrementAndGet();
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        CacheEntry[] tab = table;
        int index = hash & (tab.length - 1);
        CacheEntry first = tab[index];
        CacheEntry e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
        Object oldValue;
        if (e != null) {
            oldValue = e.value;
            // Honour onlyIfAbsent, but treat an expired entry as absent because get() does too.
            if (!onlyIfAbsent || System.currentTimeMillis() > e.livetime) {
                e.value = value;
                e.livetime = System.currentTimeMillis() + livetime * 1000;
            }
        } else {
            oldValue = null;
            ++modCount;
            tab[index] = new CacheEntry(key, hash, first, value, System.currentTimeMillis() + (livetime * 1000));
            count = c; // write-volatile
            // Append the new key to the FIFO key list.
            KeyLinked k = new KeyLinked();
            k.key = key;
            if (head == null) {
                head = k;
            } else {
                if (current == null) {
                    // Tail pointer was lost: find the real tail instead of overwriting head.next.
                    current = head;
                    while (current.next != null)
                        current = current.next;
                }
                current.next = k;
            }
            current = k;
        }
        // FIFO eviction: drop the oldest keys until the segment is below its cap. List nodes
        // whose key is null or no longer in the table are skipped, so they cannot stall
        // eviction.
        while (count >= this.maxcache && head != null) {
            KeyLinked oldest = head;
            head = oldest.next;
            oldest.next = null;
            if (head == null)
                current = null;
            if (oldest.key != null)
                this.remove(oldest.key, hash(oldest.key.hashCode()), null);
        }
        if (modCount % 10000 == 0)
            System.out.println("concurrentcache" + this.index + " size:" + count);
        if (modCount == 100000000)
            modCount = 0;
        return oldValue;
    } finally {
        unlock();
    }
}
/**
 * Doubles the bucket array and redistributes all entries into it. Does nothing when the
 * table has already reached MAXIMUM_CAPACITY. The only caller visible in this file is put(),
 * which invokes it while holding the segment lock.
 */
void rehash() {
CacheEntry[] oldTable = table;
int oldCapacity = oldTable.length;
// Table cannot grow any further: keep the current one.
if (oldCapacity >= MAXIMUM_CAPACITY)
return;
CacheEntry[] newTable = CacheEntry.newArray(oldCapacity << 1);
threshold = (int) (newTable.length * loadFactor);
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity; i++) {
CacheEntry e = oldTable[i];
if (e != null) {
CacheEntry next = e.next;
int idx = e.hash & sizeMask;
// Single-entry bucket: the entry is moved as is.
if (next == null)
newTable[idx] = e;
else {
// Find the trailing run of entries that all map to the same new bucket;
// because next links are final, that run can be reused without copying.
CacheEntry lastRun = e;
int lastIdx = idx;
for (CacheEntry last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone all remaining nodes
for (CacheEntry p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
CacheEntry n = newTable[k];
newTable[k] = new CacheEntry(p.key, p.hash, n, p.value, p.livetime);
}
}
}
}
// Publish the new table through the volatile field.
table = newTable;
}
/**
 * Removes the entry for {@code key} while holding the segment lock.
 *
 * @param key the cache key
 * @param hash spread hash of {@code key}
 * @return {@code true} when the removal ran without throwing (whether or not an entry
 *         existed), {@code false} when an exception was caught
 */
boolean remove(Object key, int hash) {
    boolean completed = false;
    lock();
    try {
        remove(key, hash, null);
        completed = true;
    } catch (Exception t) {
        t.printStackTrace();
    } finally {
        unlock();
    }
    return completed;
}
/**
 * Removes an entry from its bucket. When {@code value} is {@code null} the match is on the
 * key only; otherwise the stored value must also equal {@code value}. This method does not
 * take the segment lock itself.
 *
 * @param key the cache key
 * @param hash spread hash of {@code key}
 * @param value expected value, or {@code null} to match on key only
 * @return the removed value, or {@code null} when nothing was removed
 */
Object remove(Object key, int hash, Object value) {
    final int countAfterRemoval = count - 1;
    final CacheEntry[] buckets = table;
    final int slot = hash & (buckets.length - 1);
    final CacheEntry chainHead = buckets[slot];

    CacheEntry target = chainHead;
    while (target != null && (target.hash != hash || !key.equals(target.key))) {
        target = target.next;
    }
    if (target == null) {
        return null;
    }
    final Object stored = target.value;
    if (value != null && !value.equals(stored)) {
        return null;
    }
    // Entries after the removed node stay in the list as they are; the ones before it
    // have to be cloned because their next links are final.
    ++modCount;
    CacheEntry rebuilt = target.next;
    for (CacheEntry p = chainHead; p != target; p = p.next) {
        rebuilt = new CacheEntry(p.key, p.hash, rebuilt, p.value, p.livetime);
    }
    buckets[slot] = rebuilt;
    count = countAfterRemoval; // write-volatile
    return stored;
}
/** Returns the number of entries currently recorded for this segment. */
public int getCount() {
    final int snapshot = this.count;
    return snapshot;
}
/** Returns the length of this segment's bucket array. */
public int getCacheArrayLength() {
    final CacheEntry[] buckets = table;
    return buckets.length;
}
/**
 * Entry point used by the cleaner thread: runs one clear pass over this segment by
 * delegating to {@code startClearCacheTimer()}.
 */
public void startClearCache() {
    this.startClearCacheTimer();
}
/**
 * Walks this segment's key list and removes matching entries while holding the segment lock.
 *
 * <p>When {@code blockName} is {@code null}, expired entries are removed. Otherwise
 * {@code blockName} is split on {@code '|'} and every entry whose key contains one of the
 * resulting names followed by {@code "."} is removed. In both modes, list nodes whose key is
 * {@code null} or no longer present in the table are unlinked, and entries beyond the first
 * {@code maxcache} retained keys are removed as well.
 *
 * @param blockName block names separated by {@code '|'}, or {@code null} to clear expired
 *        entries
 */
private void startClearCacheWithBlockName(String blockName) {
    // Make put() bail out early instead of queueing on the lock.
    startclearcache = true;
    lock();
    try {
        final String[] blockNames = blockName == null ? null : blockName.split("\\|");
        final long now = System.currentTimeMillis();
        final CacheEntry[] tab = table;
        KeyLinked prv = null; // last node kept so far
        KeyLinked k = head;
        int retained = 0;
        while (k != null) {
            final KeyLinked next = k.next;
            boolean unlink = true; // null or stale keys are always dropped from the list
            if (k.key != null) {
                int hash = hash(k.key.hashCode());
                CacheEntry e = tab[hash & (tab.length - 1)];
                while (e != null && (e.hash != hash || !k.key.equals(e.key)))
                    e = e.next;
                if (e != null) {
                    boolean match;
                    if (retained >= this.maxcache) {
                        match = true; // over the segment cap
                    } else if (blockNames == null) {
                        match = e.livetime < now;
                    } else {
                        match = false;
                        for (String bn : blockNames) {
                            if (e.key.contains(bn + ".")) {
                                match = true;
                                break;
                            }
                        }
                    }
                    if (match) {
                        // remove() splices the bucket chain and updates count and modCount.
                        remove(k.key, hash, null);
                    } else {
                        unlink = false;
                    }
                }
            }
            if (unlink) {
                k.next = null;
                if (prv == null)
                    head = next;
                else
                    prv.next = next;
            } else {
                retained++;
                prv = k;
            }
            k = next;
        }
        // Keep the tail pointer on a node that is still linked (or null for an empty list).
        current = prv;
    } catch (RuntimeException te) {
        te.printStackTrace();
    } finally {
        startclearcache = false;
        unlock();
    }
}
/**
 * Sweeps the whole segment while holding the segment lock: expired entries are removed,
 * entries beyond the segment cap {@code maxcache} are dropped, and the key list is rebuilt
 * from the entries that survive.
 *
 * <p>The rebuilt key list follows bucket order, not insertion order. Entries that have not
 * expired are kept even when they share a bucket with an expired one.
 */
private void startClearCacheTimer() {
    // Make put() bail out early instead of queueing on the lock.
    startclearcache = true;
    lock();
    try {
        CacheEntry[] tab = table;
        if (tab == null || tab.length == 0)
            return;
        final long now = System.currentTimeMillis();
        KeyLinked newHead = null;
        KeyLinked newTail = null;
        int kept = 0;
        for (int i = 0; i < tab.length; i++) {
            CacheEntry first = tab[i];
            if (first == null)
                continue;
            CacheEntry survivors = null;
            int dropped = 0;
            for (CacheEntry e = first; e != null; e = e.next) {
                if (e.livetime < now || kept >= this.maxcache) {
                    dropped++;
                    continue;
                }
                // next links are final, so surviving entries are copied into a fresh chain.
                survivors = new CacheEntry(e.key, e.hash, survivors, e.value, e.livetime);
                kept++;
                KeyLinked node = new KeyLinked();
                node.key = e.key;
                if (newHead == null)
                    newHead = node;
                else
                    newTail.next = node;
                newTail = node;
            }
            // Only replace the bucket when something was actually removed from it.
            if (dropped > 0) {
                modCount += dropped;
                tab[i] = survivors;
            }
        }
        this.head = newHead;
        this.current = newTail;
        count = kept; // write-volatile
    } catch (RuntimeException te) {
        te.printStackTrace();
    } finally {
        startclearcache = false;
        unlock();
    }
}
/**
 * Removes every entry from this segment and resets its key list, entry count and
 * modification counter. Runs under the segment lock so that it cannot interleave with a put
 * or remove that is already in progress.
 */
private void clearAllCache() {
    // Make put() bail out early instead of queueing on the lock.
    startclearcache = true;
    lock();
    try {
        CacheEntry[] tab = table;
        if (tab == null || tab.length == 0)
            return;
        for (int i = 0; i < tab.length; i++) {
            tab[i] = null;
        }
        this.head = null;
        this.current = null;
        modCount = 0;
        count = 0; // write-volatile, last so readers see the emptied table
    } finally {
        startclearcache = false;
        unlock();
    }
}
}
/**
 * Creates a cache with the default initial capacity, load factor and concurrency level.
 */
public CacheImpl() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
 * Creates a cache with the given sizing parameters and starts the background thread that
 * periodically clears the segments.
 *
 * @param initialCapacity total initial capacity, spread over the segments
 * @param loadFactor load factor of every segment's table
 * @param concurrencyLevel estimated number of concurrently updating threads; rounded up to a
 *        power of two and capped at {@code MAX_SEGMENTS} to obtain the segment count
 * @throws IllegalArgumentException if {@code loadFactor} is not positive,
 *         {@code initialCapacity} is negative or {@code concurrencyLevel} is not positive
 */
public CacheImpl(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
        throw new IllegalArgumentException();
    }
    final int wantedSegments = Math.min(concurrencyLevel, MAX_SEGMENTS);

    // Smallest power of two that is >= the wanted number of segments.
    int shiftBits = 0;
    int segmentCount = 1;
    for (; segmentCount < wantedSegments; segmentCount <<= 1) {
        ++shiftBits;
    }
    this.segmentShift = 32 - shiftBits;
    this.segmentMask = segmentCount - 1;
    this.caches = Cache.newArray(segmentCount);

    // Per-segment table size: total capacity divided by segments, rounded up to a power of two.
    final int totalCapacity = Math.min(initialCapacity, MAXIMUM_CAPACITY);
    int perSegment = totalCapacity / segmentCount;
    if (perSegment * segmentCount < totalCapacity) {
        perSegment++;
    }
    int tableSize = 1;
    while (tableSize < perSegment) {
        tableSize <<= 1;
    }

    final int perSegmentLimit = MAXCACHESIZE / this.caches.length;
    for (int i = 0; i < this.caches.length; ++i) {
        this.caches[i] = new Cache(tableSize, loadFactor, perSegmentLimit, i);
    }

    // Start the cleaner thread.
    if (!startclear) {
        clearCacheThread = clearCacheThread();
        clearCacheThread.start();
    }
}
/**
 * Set to true by the cleaner thread once it has started running; a cleaner that finds it
 * already set returns immediately.
 */
private volatile Boolean startclear = false;
/**
 * Time, in epoch milliseconds, at which the cleaner thread last finished a clear pass
 * (initially the time this object was created).
 */
private volatile long lastclearTime = System.currentTimeMillis();
/**
 * Interval between clear passes, in milliseconds (one hour).
 */
private static final int LOAD_TIME_SPAN = 1000 * 60 * 60;
/**
 * Stop flag: the cleaner thread and the background clear tasks return when it is true.
 * NOTE(review): nothing in the visible part of this file ever sets it to true — confirm
 * whether a setter exists elsewhere.
 */
private volatile Boolean closeThread = false;
/**
 * The cleaner thread, created and started by the constructor.
 */
private transient Thread clearCacheThread;
/**
 * Builds (but does not start) the thread that periodically clears every segment.
 *
 * <p>The thread waits until {@code LOAD_TIME_SPAN} milliseconds have passed since the last
 * pass, calls {@code startClearCache()} on each segment, records the time, and repeats. It
 * exits when {@code closeThread} is set or when it is interrupted; the interrupt status is
 * restored before exiting.
 *
 * @return the new, not yet started cleaner thread
 */
private Thread clearCacheThread() {
    return new Thread(new Runnable() {
        public void run() {
            if (startclear)
                return;
            startclear = true;
            while (!closeThread) {
                long remaining = LOAD_TIME_SPAN - (System.currentTimeMillis() - lastclearTime);
                if (remaining > 0) {
                    try {
                        Thread.sleep(remaining);
                    } catch (InterruptedException e) {
                        // Treat interruption as a request to stop and keep the flag set.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (closeThread)
                        return;
                }
                System.out.println("start clear cache");
                // Visit every segment, whatever the configured concurrency level is.
                for (int i = 0; i < caches.length; i++) {
                    caches[i].startClearCache();
                }
                System.out.println("end clear cache");
                lastclearTime = System.currentTimeMillis();
            }
        }
    });
}
/**
 * Adds a value to the cache under the key built from {@code cacheName} and {@code elmkey}.
 * A {@code null} value is ignored.
 *
 * @param cacheName cache (block) name
 * @param elmkey element key within that block
 * @param value value to store
 * @param times time to live, passed to the segment's {@code put} as its {@code livetime}
 */
public void putCache(String cacheName, String elmkey, Object value, long times) {
    if (value != null) {
        final String fullKey = this.getKey(cacheName, elmkey);
        final int spread = hash(fullKey.hashCode());
        final Cache segment = cacheFor(spread);
        if (segment != null) {
            segment.put(fullKey, spread, value, false, times);
        } else {
            System.out.println("key:" + elmkey + " not find cache");
        }
    }
}
/**
 * Stores the value through {@code putCache} and always reports success.
 *
 * @return always {@code true}
 */
public Boolean PutAndUpdateCache(String cacheName, String elmkey, Object value, long times) {
    putCache(cacheName, elmkey, value, times);
    return Boolean.TRUE;
}
/**
 * Builds the full cache key: the cache name, a dot, then the element key.
 *
 * @param cacheName cache (block) name
 * @param elmkey element key within that block
 * @return {@code cacheName + "." + elmkey}
 */
public String getKey(String cacheName, String elmkey) {
    final StringBuilder fullKey = new StringBuilder();
    fullKey.append(cacheName).append(".").append(elmkey);
    return fullKey.toString();
}
/**
 * Looks up a cached value by cache name and element key, recording a hit in the segment's
 * statistics when a value is found.
 *
 * @param cacheName cache (block) name
 * @param elmkey element key within that block
 * @return the cached value, or {@code null} when the segment's lookup returns nothing
 */
public Object getValue(String cacheName, String elmkey) {
    final String fullKey = getKey(cacheName, elmkey);
    final int spread = hash(fullKey.hashCode());
    final Cache segment = cacheFor(spread);
    if (segment == null) {
        System.out.println("key:" + elmkey + " not find cache");
        return null;
    }
    final Object found = segment.get(fullKey, spread);
    if (found == null) {
        return null;
    }
    segment.setGetSuccesses();
    return found;
}
/**
 * Looks up a cached value by its already assembled full key, recording a hit in the segment's
 * statistics when a value is found.
 *
 * @param cacheKey the full cache key
 * @return the cached value, or {@code null} when the segment's lookup returns nothing
 */
public Object getValue(String cacheKey) {
    final int spread = hash(cacheKey.hashCode());
    final Cache segment = cacheFor(spread);
    if (segment == null) {
        System.out.println("key:" + cacheKey + " not find cache");
        return null;
    }
    final Object found = segment.get(cacheKey, spread);
    if (found == null) {
        return null;
    }
    segment.setGetSuccesses();
    return found;
}
/**
 * Collects statistics about the cache contents.
 *
 * <p>The returned sorted map contains: one counter per value class name (for collections and
 * maps the class name of the first non-null element is appended after a dot), the entry count
 * of every segment under {@code "cacheCache" + index}, the summed put/get/successful-get
 * counters, and the hit rate as an integer percentage.
 *
 * <p>Values are fetched through each segment's {@code get}, so calling this method increases
 * the segments' get counters. The key lists are read without locking.
 *
 * @return the statistics map
 */
@SuppressWarnings("rawtypes")
public Map<String, Long> getCacheDetail() {
    Map<String, Long> cacheMap = new TreeMap<String, Long>();
    long totalput = 0;
    long totalget = 0;
    long totalsuc = 0;
    for (int i = 0; i < this.caches.length; i++) {
        Cache cache = this.caches[i];
        if (cache == null) {
            continue;
        }
        totalput += cache.getTotalput();
        totalget += cache.getTotalget();
        totalsuc += cache.getTotalGetSuccesses();
        // The for-update always advances, so a node with a null key cannot stall the walk.
        for (KeyLinked k = cache.head; k != null; k = k.next) {
            if (k.key == null) {
                continue;
            }
            Object o = cache.get(k.key, hash(k.key.hashCode()));
            if (o == null) {
                continue;
            }
            String className = o.getClass().getName();
            Iterator it = null;
            if (o instanceof Collection)
                it = ((Collection) o).iterator();
            else if (o instanceof Map)
                it = ((Map) o).values().iterator();
            if (it != null) {
                try {
                    if (it.hasNext()) {
                        Object cc = it.next();
                        if (cc != null) {
                            className = className + "." + cc.getClass().getName();
                        }
                    }
                } catch (Exception ex) {
                    // Best effort: the container may be modified concurrently.
                    ex.printStackTrace();
                }
            }
            Long seen = cacheMap.get(className);
            cacheMap.put(className, seen == null ? 1L : seen + 1);
        }
        cacheMap.put("cacheCache" + i, (long) (cache.count));
    }
    cacheMap.put("totalput", totalput);
    cacheMap.put("totalget", totalget);
    cacheMap.put("totalGetSuccesses", totalsuc);
    if (totalsuc == 0 || totalget == 0) {
        cacheMap.put("缓存命中率:", 0L);
    } else {
        cacheMap.put("缓存命中率(去掉了%):", totalsuc * 100 / totalget);
    }
    return cacheMap;
}
/**
 * Returns an iterator over the elements of a collection or over the values of a map.
 *
 * @param obj the object to iterate
 * @return the iterator, or {@code null} when {@code obj} is neither a collection nor a map
 */
@SuppressWarnings("rawtypes")
public Iterator getIterator(Object obj) throws Exception {
    Iterator result = null;
    if (obj instanceof Collection) {
        result = ((Collection) obj).iterator();
    } else if (obj instanceof Map) {
        result = ((Map) obj).values().iterator();
    }
    return result;
}
/**
 * Removes the entry stored under the given full cache key.
 *
 * @param key the full cache key
 * @return the result of the owning segment's {@code remove(key, hash)}
 */
public boolean remove(String key) {
    final int spread = hash(key.hashCode());
    final Cache segment = cacheFor(spread);
    return segment.remove(key, spread);
}
/**
 * Removes the entry identified by a block name and an element key.
 *
 * @param block cache (block) name
 * @param key element key within that block
 * @return the result of {@code remove(String)} for the assembled full key
 */
public boolean remove(String block, String key) {
    return remove(getKey(block, key));
}
/**
 * Asynchronously removes the cached entries that belong to the given block name(s). A new
 * thread is started that runs {@code startClearCacheWithBlockName} on every segment in turn;
 * it does nothing when {@code closeThread} is set.
 *
 * @param blockName block name(s) handed unchanged to each segment
 */
public void removeCacheByBlockName(String blockName) {
    final String bn = blockName;
    System.out.println("start clear blockcache:" + blockName);
    new Thread(new Runnable() {
        public void run() {
            if (closeThread)
                return;
            // Visit every segment, whatever the configured concurrency level is.
            for (int i = 0; i < caches.length; i++) {
                System.out.println("start clear blockcache:" + i);
                caches[i].startClearCacheWithBlockName(bn);
                System.out.println("end clear blockcache:" + i);
            }
            System.out.println("end clear blockcache");
        }
    }).start();
}
/**
 * Asynchronously empties the whole cache. A new thread is started that runs
 * {@code clearAllCache} on every segment in turn; it does nothing when {@code closeThread} is
 * set.
 *
 * <p>NOTE(review): despite its name this method neither sets {@code closeThread} nor stops the
 * cleaner thread — confirm whether that is intended.
 */
public void stopCache() {
    new Thread(new Runnable() {
        public void run() {
            if (closeThread)
                return;
            // Visit every segment, whatever the configured concurrency level is.
            for (int i = 0; i < caches.length; i++) {
                System.out.println("start clear blockcache:" + i);
                caches[i].clearAllCache();
                System.out.println("end clear blockcache:" + i);
            }
            System.out.println("end clear blockcache");
        }
    }).start();
}
}