KafkaServer端异常和报错分析
Server启动异常
[2020-06-02 16:00:45,898] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
[2020-06-02 16:00:45,900] FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
at kafka.log.ProducerStateManager$.readSnapshot(ProducerStateManager.scala:289)
at kafka.log.ProducerStateManager.loadFromSnapshot(ProducerStateManager.scala:440)
at kafka.log.ProducerStateManager.truncateAndReload(ProducerStateManager.scala:499)
at kafka.log.Log.loadProducerState(Log.scala:467)
at kafka.log.Log.<init>(Log.scala:191)
at kafka.log.Log$.apply(Log.scala:1580)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$5$$anonfun$apply$12$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:172)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
// 源码位置
LogManager.loadLogs(){
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
CoreUtils.runnable {
val current = Log.apply(){
val topicPartition = Log.parseTopicPartitionName(dir)
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
new Log(){
private val lastflushedTime = new AtomicLong(time.milliseconds)
val leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
loadSegments()
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile){
if (producerStateManager.latestSnapshotOffset.isEmpty && (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown)) {
//
} else {
if (logEndOffset != mapEndOffset) {
producers.clear()
ongoingTxns.clear()
unreplicatedTxns.clear()
loadFromSnapshot(logStartOffset, currentTimeMs);{
while (true) {
latestSnapshotFile match {
case Some(file) =>{
info(s"Loading producer state from snapshot file ${file.getName} for partition $topicPartition")
val loadedProducers = readSnapshot(file){//ProducerStateManager$.readSnapshot()
val buffer = Files.readAllBytes(file.toPath)
val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer));{//Read a struct from the buffer: 从缓存buff中读取一个Struct对象;
Object[] objects = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
try {
objects[i] = fields[i].type.read(buffer);
} catch (Exception e) {// 是下面这里抛异常;
throw new SchemaException("Error reading field '" + fields[i].name + "': " +(e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
}
}
}
}
.filter { producerEntry => isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)}
loadedProducers.foreach(loadProducerEntry)
}
}
}
}
}
}
}
}
}
}