//......
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (DefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;appreciate的用法
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
adLock().lockInterruptibly();
try {
今的组词
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.ConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) msg = msgTreeMap.firstEntry().getValue();
} el {
break;
}
考虑周全} finally {
翁读音
adLock().unlock();
周杰伦作曲的歌
}
} catch (InterruptedException e) {
<("getExpiredMsg exception", e);
}
try {
pushConsumer.ndMessageBack(msg, 3);
log.info("nd expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOfft={}", Topic(), MsgId(), StoreHost(), QueueId( try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && QueueOfft() == msgTreeMap.firstKey()) {
try {
removeMessage(Collections.singletonList(msg));
} catch (Exception e) {
<("nd expired msg exception", e);
}
}
文迪雅
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
<("getExpiredMsg exception", e);
}
} catch (Exception e) {
<("nd expired msg exception", e);
古墓探险
}
}
}
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
毕业论文大专if (!msgTreeMap.isEmpty()) {
result = this.queueOfftMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
MessageExt prev = QueueOfft());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - Body().length);
}
}
msgCount.addAndGet(removedCnt);
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}