KafkaServer端异常和报错分析
Server启动异常
[2020-06-02 16:00:45,898] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
[2020-06-02 16:00:45,900] FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
at kafka.log.ProducerStateManager$.readSnapshot(ProducerStateManager.scala:289)
at kafka.log.ProducerStateManager.loadFromSnapshot(ProducerStateManager.scala:440)
at kafka.log.ProducerStateManager.truncateAndReload(ProducerStateManager.scala:499)
at kafka.log.Log.loadProducerState(Log.scala:467)
at kafka.log.Log.<init>(Log.scala:191)
at kafka.log.Log$.apply(Log.scala:1580)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$5$$anonfun$apply$12$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:172)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
// 源码位置
LogManager.loadLogs(){
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
CoreUtils.runnable {
val current = Log.apply(){
val topicPartition = Log.parseTopicPartitionName(dir)
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
new Log(){
private val lastflushedTime = new AtomicLong(time.milliseconds)
val leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
loadSegments()
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile){
if (producerStateManager.latestSnapshotOffset.isEmpty && (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown)) {
//
} else {
if (logEndOffset != mapEndOffset) {
producers.clear()
ongoingTxns.clear()
unreplicatedTxns.clear()
loadFromSnapshot(logStartOffset, currentTimeMs);{
while (true) {
latestSnapshotFile match {
case Some(file) =>{
info(s"Loading producer state from snapshot file ${file.getName} for partition $topicPartition")
val loadedProducers = readSnapshot(file){//ProducerStateManager.readSnapshot()
val buffer = Files.readAllBytes(file.toPath)
val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer));{//Read a struct from the buffer: 从缓存buff中读取一个Struct对象;
Object[] objects = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
try {
objects[i] = fields[i].type.read(buffer);
} catch (Exception e) {// 是下面这里抛异常;
throw new SchemaException("Error reading field '" + fields[i].name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
}
}
.filter { producerEntry => isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)}
loadedProducers.foreach(loadProducerEntry)
}
}
}
}
}
}
}
}
}
}