应⽤监控CAT之cat-client源码阅读(⼀)
CAT 由⼤众点评开发的,基于 Java 的实时应⽤监控平台,包括实时应⽤监控,业务监控。对于及时发现线上问题⾮常有⽤。(不知道⼤家有没有在⽤)
应⽤⾃然是最初级的,⽤完之后,还想了解下其背后的原理,所以有了源码阅读⼀说。
今天来看看 cat-client 模块,重在调⽤⽅。
打开⽂件,⾸先看⼀下使⽤说明,背景,资料。ok,进⼊正题。
先⼤致看⼀下⽬录结构:
接下来,从样例开始着⼿,在这⾥从单元测试开始⼲活。
public class CatTest {
@Test
public void test() {
Transaction trans = wTransaction("logTransaction", "logTransaction");
Throwable cau = new Throwable();
Cat.logError(cau);
Cat.logError("message", cau);
Cat.logTrace("logTrace", "<trace>");
Cat.logTrace("logTrace", "<trace>", Trace.SUCCESS, "data");
Cat.logMetric("logMetric", "test", "test");
Cat.logMetricForCount("logMetricForCount");
Cat.logMetricForCount("logMetricForCount", 4);
Cat.logMetricForDuration("logMetricForDuration", 100);
Cat.logMetricForSum("logMetricForSum", 100);
Cat.logMetricForSum("logMetricForSum", 100, 100);
Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
Cat.logEvent("EventType", "EventName");
Cat.logHeartbeat("logHeartbeat", "logHeartbeat", Message.SUCCESS, null);
trans.tStatus(Transaction.SUCCESS);
// trans.tStatus(cau);
Asrt.asrtEquals(true, Cat.isInitialized());
}
}
看得出来,cat把其主要功能都列举在了这个单元测试⾥。⼤概功能就是,记录event,trace,error,metrics.
不过,咱们只讨论下其中个别类型的处理就O了。
先来看第⼀个创建事务的⽅法:
2012成都中考
// 进⼊⽅法查看,1. 先获取⽣产者; 2. 创建⼀个事务
public static Transaction newTransaction(String type, String name) {
Producer().newTransaction(type, name);
}
// 查看获取⽣产者的⽅法,检查是否已初始化,如果没有初始化则进⾏初始化,深度咱们就先到这⾥
public static MessageProducer getProducer() {
checkAndInitialize();
return s_instance.m_producer;
}
// 2. 创建⼀个事务,1.先获取上下⽂如果没有则新建; 2. 如果可以记录消息,则⽴马创建⼀个默认事务DefaultTransaction; 3. 开启执⾏,返回事务实例,供下⽂调⽤; @Override
public Transaction newTransaction(String type, String name) {
// this enable CAT client logging cat message without explicit tup
if (!m_manager.hasContext()) {
m_manager.tup();pcd
}
if (m_manager.isMessageEnabled()) {
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
m_manager.start(transaction, fal);
return transaction;
} el {
return NullMessage.TRANSACTION;
}
}
/
/ 2.1. 如何获取当前上下⽂,
@Override
public void tup() {
Context ctx;
if (m_domain != null) {
ctx = new Context(Id(), m_hostName, Ip());
} el {
ctx = new Context("Unknown", m_hostName, "");
}
m_context.t(ctx);
}
/
/ 2.2. 检查是否已初始化上下⽂
@Override
public boolean hasContext() {
return () != null;
}
// 2.3. 上下⽂怎么保证线程安全,使⽤ ThreadLocal 线程变量
private ThreadLocal<Context> m_context = new ThreadLocal<Context>();
// 2.4. 开启⼀个事务,1. 获取上下⽂; 2. 开启上下⽂事务; 3. 如果是tag类型的事务,则将其放⼊ m_taggedTransactions; 配置有误,只提⽰⼀次警告
@Override
apologize是什么意思
public void start(Transaction transaction, boolean forked) {
Context ctx = getContext();
if (ctx != null) {
ctx.start(transaction, forked);
if (transaction instanceof TaggedTransaction) {
TaggedTransaction tt = (TaggedTransaction) transaction;
m_taggedTransactions.Tag(), tt);
}
} el if (m_firstMessage) {
m_firstMessage = fal;
m_logger.warn("CAT client is not enabled becau it's not initialized yet");
}
}
/
/ 2.4.1. 获取上下⽂
private Context getContext() {
if (Cat.isInitialized()) {
Context ctx = ();
if (ctx != null) {
return ctx;
} el {
if (m_domain != null) {
ctx = new Context(Id(), m_hostName, Ip());
} el {
我需要你英文ctx = new Context("Unknown", m_hostName, "");
}
m_context.t(ctx);
return ctx;
}
}
return null;
}
// 2.4.2. 开启事务,1. 如果stack为空就把事务设置到m_tree上,否则处理⼦节点; 2. 把事务压⼊栈中;
public void start(Transaction transaction, boolean forked) {
if (!m_stack.isEmpty()) {
// Do NOT make strong reference from parent transaction to forked transaction.
/
/ Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway()
// By doing so, there is no need for synchronization between parent and child threads.
// Both threads can complete() anytime despite the other thread.
if (!(transaction instanceof ForkedTransaction)) {
Transaction parent = m_stack.peek();
addTransactionChild(transaction, parent);
}
} el {
m_tree.tMessage(transaction);
}
if (!forked) {
m_stack.push(transaction);
}
}
// 2.4.3. 上下⽂结构
public Context(String domain, String hostName, String ipAddress) {
advancement
m_tree = new DefaultMessageTree(); // 创建⼀个消息树
m_stack = new Stack<Transaction>(); // 存放栈信息
Thread thread = Thread.currentThread();
中秋习俗
String groupName = ThreadGroup().getName();
m_tree.tThreadGroupName(groupName);
m_tree.tThreadId(String.Id()));
m_tree.Name());
m_tree.tDomain(domain);
m_tree.tHostName(hostName);
m_tree.tIpAddress(ipAddress);
m_length = 1;
m_knownExceptions = new HashSet<Throwable>();
}
// DefaultModuleInitializer
@Override
public void execute(ModuleContext ctx, modules) {
Set<Module> all = new LinkedHashSet<Module>();
info(ctx, "Initializing top level modules:");
for (Module module : modules) {
info(ctx, " " + Class().getName());
}
try {
expandAll(ctx, modules, all);
for (Module module : all) {
if (!module.isInitialized()) {
executeModule(ctx, module, m_index++);
}
}
} catch (Exception e) {
throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
}
}
// 调⽤executeModule⽅法,初始化数据
private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception { long start = System.currentTimeMillis();
// t flat to avoid re-entrance
module.tInitialized(true);
info(ctx, index + " ------ " + Class().getName());
// execute itlf after its dependencies
module.initialize(ctx);
long end = System.currentTimeMillis();
info(ctx, index + " ------ " + Class().getName() + " DONE in " + (end - start) + " ms.");
}
// cat初始化
// this should be called during application initialization time
public static void initialize(File configFile) {
PlexusContainer container = DefaultContainer();
initialize(container, configFile);
}
public static void initialize(PlexusContainer container, File configFile) {
ModuleContext ctx = new DefaultModuleContext(container);
/
/ 该⽅法会去 l中查找 org.unidal.initialization.Module 的实现类,
Module module = ctx.lookup(Module.class, CatClientModule.ID);
if (!module.isInitialized()) {
ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
ctx.tAttribute("cat-client-config-file", configFile);
欺骗英文
}
}
// l 中配置的 Module, 加载⼊ CatClientModule
<component>
<role>org.unidal.initialization.Module</role>
<role-hint>cat-client</role-hint>
<implementation>com.dianping.cat.CatClientModule</implementation>
</component>
// l 中配置⽇志输出
<plexus>
<components>
<component>
<role&dehaus.plexus.logging.LoggerManager</role>
<implementation>org.unidal.lookup.logger.TimedConsoleLoggerManager</implementation> <configuration>
<dateFormat>MM-dd HH:mm:ss.SSS</dateFormat>
<showClass>true</showClass>
<logFilePattern>cat_{0,date,yyyyMMdd}.log</logFilePattern>
<baDirRef>CAT_HOME</baDirRef>
<defaultBaDir>/data/applogs/cat</defaultBaDir>
</configuration>
</component>
</components>
</plexus>
// logEvent 举个例⼦,event处理过程
Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
// 进⼊⽅法
public static void logEvent(String type, String name, String status, String nameValuePairs) {
}
// DefaultMessageProducer, logEvent
@Override
public void logEvent(String type, String name, String status, String nameValuePairs) {
Event event = newEvent(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
event.addData(nameValuePairs);
}
event.tStatus(status);
}
// DefaultEvent, complete ⽅法
@Override
public void complete() {
tCompleted(true);
if (m_manager != null) {
m_manager.add(this);
}
}
// DefaultMessageManager, add⽅法,添加到上下⽂中
@Override
public void add(Message message) {
Context ctx = getContext();
if (ctx != null) {
ctx.add(message);
}
}
leviev
// DefaultMessageManager, 最终添加⽅法
public void add(Message message) {
if (m_stack.isEmpty()) {
MessageTree tree = py();
tree.tMessage(message);
flush(tree);
} el {
Transaction parent = m_stack.peek();
addTransactionChild(message, parent);
}
}
// DefaultMessageManager, 发送刷写数据
public void flush(MessageTree tree) {
if (MessageId() == null) {
tree.tMessageId(nextMessageId());
}
MessageSender nder = Sender();
if (nder != null && isMessageEnabled()) {
nder.nd(tree);
ret();
} el {
m_throttleTimes++;
if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
}
}
}
/
/ TcpSocketSender, 发送数据
fly的第三人称单数// 先插⼊ BlockingQueue<MessageTree> m_queue 阻塞队列中,如果插⼊失败,则进⾏⽇志队列检查
@Override
public void nd(MessageTree tree) {
if (isAtomicMessage(tree)) {
boolean result = m_atomicTrees.offer(tree, Sample());
if (!result) {
logQueueFullInfo(tree);
}
} el {
boolean result = m_queue.offer(tree, Sample());
if (!result) {
logQueueFullInfo(tree);
}
}
}
// ⽇志队列检查
private void logQueueFullInfo(MessageTree tree) {
if (m_statistics != null) {
Overflowed(tree);
}
int count = m_errors.incrementAndGet();
if (count % 1000 == 0 || count == 1) {
("Message queue is full in tcp socket nder! Count: " + count);
}
tree = null;
}
// 如果队列不为空,则插⼊到上⼀节点之后
private void addTransactionChild(Message message, Transaction transaction) {
long treePeriod = trimToHour(Message().getTimestamp());
long messagePeriod = Timestamp() - 10 * 1000L); // 10 conds extra time allowed if (treePeriod < messagePeriod || m_length >= MaxMessageLength()) {
uncateAndFlush(this, Timestamp());
}
transaction.addChild(message);
m_length++;