KafkaMetrics模块解析背景
Metrics是kafka内部使⽤的监控模块,主要有以下⼏个组成部分:
1. Measurable
2. Stat
3. Sensor
4. Metric
类结构
我们先来看⼀下这些类的继承关系和结构,有个⼤概的认识
Measurable
Stat
- Sensor
Metric
孔雀图片绘画
接⼝分析
1. Measurable
Measurable接⼝是度量类型最基础的接⼝,通过measure()⽅法获取被监控的值。
牛肉酱做法public interface Measurable extends MetricValueProvider<Double> {
double measure(MetricConfig config, long now);
}
2. Stat
Stat接⼝表⽰需要经过统计计算的度量类型,例如平均值、最⼤值、最⼩值等,通过record()⽅法记录某值并更新度量值。
public interface Stat {
public void record(MetricConfig config, double value, long timeMs);
}
MeasuleStat继承了Measureable接⼝和Stat接⼝,并没有添加新的⽅法。CompoundStat接⼝表⽰多个Stat的组合。
SampledStat是⼀个⽐较重要的抽象类,它表⽰⼀个抽样的度量值,除了Total外的其他MeasureableS
tat接⼝实现都依赖它功能。在SampleStat中可以有多个Sample并通过多个Sample完成对⼀个值的度量,在每个Sample中都记录了其对应的时间窗⼝和事件数
量,SampledStat在计算最终的结果值时,可以根据这两个值决定是否使⽤此sample中的数据。SampledStat实现了MeasuleStat接⼝的record()⽅法和measure()⽅法。在record()⽅法中会根据时间窗⼝和事件数使⽤合适的Sample对象进⾏记录。
public void record(MetricConfig config, double value, long timeMs) {
// 拿到当前时间的sample对象
Sample sample = current(timeMs);
// 检测当前sample是否已经完成取样
if (sample.isComplete(timeMs, config))
sample = advance(config, timeMs);
// 更新sample对象
update(sample, config, value, timeMs);
// smaple对象的事件数加1
sample.eventCount += 1;
}
measure()⽅法⾸先会将过期的sample重置,之后调⽤combine⽅法完成计算。combine⽅法是抽象⽅法,不同⼦类有不同的实现。
public double measure(MetricConfig config, long now) {
// 检查sample是否过期
purgeObsoleteSamples(config, now);
return combine(this.samples, config, now);
}
3. Sensor
在实际应⽤中,对同⼀个操作需要有多个不同⽅⾯的度量,例如需要监控请求的最⼤长度,同时也需要监控请求的平均长度等。kafka通过将多个相关的度量对象封装在进nsor中实现。
4. Metric
Metrics类,负责统⼀管理Sensor对象、KafkaMetric对象。
public class Metrics implements Cloable {
// 默认配置信息
private final MetricConfig config;
// 保存了添加到Metrics中的KafkaMetrics对象
private final ConcurrentMap<MetricName, KafkaMetric> metrics;
// 保存了添加到Metrics中的Sensor的集合
private final ConcurrentMap<String, Sensor> nsors;
// 记录了每个Sensor的⼦Sensor集合
法务实习报告
private final ConcurrentMap<Sensor, List<Sensor>> childrenSensors;
private final List<MetricsReporter> reporters;
private final Time time;
private final ScheduledThreadPoolExecutor metricsScheduler;
private static final Logger log = Logger(Metrics.class);
拌小根蒜// 从nsors集合中获取nsor对象,如果指定的Sensor不存在则创建新Sensor对象,并使⽤childrenSensors集合记录Sensor的层级关系
public synchronized Sensor nsor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel recordingLev el, parents) {
// 根据name从nsors集合中获取nsor对象
Sensor s = getSensor(name);
if (s == null) {
// 如果不存在则创建nsor对象
s = new Sensor(this, name, parents, config == null ? fig : config, time, inactiveSensorExpirationTimeSeconds, recordingLevel);
this.nsors.put(name, s);
if (parents != null) {
// 通过childrenSensors记录nsor的层级关系
for (Sensor parent : parents) {
List<Sensor> children = (parent);
if (children == null) {
children = new ArrayList<>();
childrenSensors.put(parent, children);
}
children.add(s);
}
}
log.debug("Added nsor with name {}", name);
}
return s;
}
}
注册会计师收入使⽤场景
Producer、Consumer、Broker都会⽤到。下⾯以Producer举例。
Producer的构造函数中会初始化Metrics。车昆怎么读
MetricConfig metricConfig = new MetricConfig().Int(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.Long(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.String(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = ConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX));
Producer主要⽤Metrics来度量和统计"produce-throttle-time"的相关指标。
public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
Sensor produceThrottleTimeSensor = metrics.nsor("produce-throttle-time");
produceThrottleTimeSensor.add(metrics.produceThrottleTimeAvg, new Avg());
produceThrottleTimeSensor.add(metrics.produceThrottleTimeMax, new Max());
return produceThrottleTimeSensor;
}
如上,metrics⾸先注册了名为“produce-throttle-time”的nsor。然后给这个nsor加了两个指标,分别是produceThrottleTimeAvg(平均值)和produceThrottleTimeMax(最⼤值)。这两个指标对应的度量⽅法分别是Avg的实例对象和Max的实例对象。
什么触发这些指标的统计呢?答案是在客户端收到发送消息的Respon后。如下:
<(CommonFields.THROTTLE_TIME_MS), now);
莲花诗词这个record⽅法解析如下:
public void record(double value, long timeMs, boolean checkQuotas) {
if (shouldRecord()) {
this.lastRecordTime = timeMs;
// 线程安全
synchronized (this) {
// 遍历所有stat,这⾥对应的是上⽂的Avg和Max
for (Stat stat : this.stats)
if (checkQuotas)
checkQuotas(timeMs);
}
for (Sensor parent : parents)
}
}
Avg和Max都继承了SampledStat的record()⽅法。
public void record(MetricConfig config, double value, long timeMs) {
Sample sample = current(timeMs);
if (sample.isComplete(timeMs, config))
根本上师sample = advance(config, timeMs);
// 这⾥的update就由各⼦类单独实现。
update(sample, config, value, timeMs);
sample.eventCount += 1;
}
/
/ Avg
@Override
protected void update(Sample sample, MetricConfig config, double value, long now) {
// 很简单,先求和
sample.value += value;
}
// Max
@Override
protected void update(Sample sample, MetricConfig config, double value, long now) {
// 直接取最⼤值
sample.value = Math.max(sample.value, value);
}
最后这两个指标的计算会由JmxReporter调⽤,最终的计算逻辑在SampledStat的combine()⽅法中。指标值最终会呈现在jmx中。