Skywalking收集与发送链路数据部分源码解析
链路收集⼤体逻辑
这⾥先不分析skywalking是如何⾃动收集数据的,⽽是说⼀下agent在收集后如何存储与发送给collector,这部分的架构关系到性能开销与对服务的影响
⼤体逻辑如下:
agent内部缓存维护了⼀个⽣产消费者,收集数据时将⽣产的数据按分区放到缓存中,消费者⽤多线程消费数据,将缓存的数据封装成grpc 对象发送给collector
链路数据接收与发送
数据的接收与发送主要在类TraceSegmentServiceClient中处理
其中的⼀个重要属性是DataCarrier,它来实现的⽣产消费模式
private volatile DataCarrier<TraceSegment> carrier;
⼤致结构如下
DataCarrier.png
DataCarrier
属性如下:
//⼀个buffer的⼤⼩
private final int bufferSize;
//channel的⼤⼩
private final int channelSize;
private Channels<T> channels;
//消费者线程池封装
private ConsumerPool<T> consumerPool;
private String name;
⽅法#produce⽣产数据
public boolean produce(T data) {
if (consumerPool != null) {
if (!consumerPool.isRunning()) {
return fal;
}
}
满意的反义词
return this.channels.save(data);
}
channel的save⽅法
public boolean save(T data) {
//计算放在channel哪个位置
int index = dataPartitioner.partition(bufferChannels.length, data);
//重试次数
微信头像动物int retryCountDown = 1;
if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
int maxRetryCount = dataPartitioner.maxRetryCount();
if (maxRetryCount > 1) {
retryCountDown = maxRetryCount;
}
}
for (; retryCountDown > 0; retryCountDown--) {
//保存成功返回true
if (bufferChannels[index].save(data)) {
return true;
}
}
return fal;
}
进⼊到Buffer的save⽅法,TraceSegmentServiceClient⽤的策略是IF_POSSIBLE,缓存位置还有值直接返回,所以消费不过来会丢失部分数据
boolean save(T data) {
//数组位置⾃增
int i = AndIncrement();
//不为空的处理
if (buffer[i] != null) {
switch (strategy) {
ca BLOCKING:
boolean isFirstTimeBlocking = true;
while (buffer[i] != null) {
if (isFirstTimeBlocking) {
isFirstTimeBlocking = fal;保密英文
for (QueueBlockingCallback<T> callback : callbacks) {
游园不值意思
}
}
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
}
}
break;
ca IF_POSSIBLE:
return fal;
ca OVERRIDE:
default:
}
}
//写⼊缓存
buffer[i] = data;
return true;
}
DataCarrier的consume⽅法初始化消费者线程池
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.clo();
}
consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
consumerPool.begin();
return this;
}
参数consumerClass就是TraceSegmentServiceClient类⾃⼰,实现具体的消费⽅法,初始化以后线程
池就启动了consumerPool⽅法begin
public void begin() {
if (running) {
return;
}
try {
lock.lock();
//把channel分给不同的thread
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
苏东坡传
}
running = true;
} finally {
lock.unlock();
}
}
cusumerThread的run⽅法
@Override
public void run() {
running = true;
while (running) {
boolean hasData = consume();
if (!hasData) {
try {
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
}
}
}
consume();
}
最终会调到的TraceSegmentServiceClient这个消费者的consume⽅法,将TraceSegment转换成grpc对象发送给collector
@Override
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(fal);
StreamObrver<UpstreamSegment> upstreamSegmentStreamObrver = llect(new StreamObrver<Downstream>() { @Override飞向苗乡侗寨
public void onNext(Downstream downstream) {
}
@Override
public void onError(Throwable throwable) {
status.finished();
if (logger.isErrorEnable()) {
<(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
});
北京遇上西雅图
for (TraceSegment gment : data) {
try {
UpstreamSegment upstreamSegment = ansform();
} catch (Throwable t) {
<(t, "Transform and nd UpstreamSegment to collector fail.");
}
}
status.wait4Finish();
gmentUplinkedCounter += data.size();陈老师泄油瘦身汤
} el {
gmentAbandonedCounter += data.size();
}
printUplinkStatus();
}
channel
channel中包含⼀个Buffer数组:
//Buffer数组
private final Buffer<T>[] bufferChannels;
//数据分区策略
private IDataPartitioner<T> dataPartitioner;
/
/Buffer策略
private BufferStrategy strategy;
Buffer对象