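Presto Source Code Analysis: From SQL to the AST (Abstract Syntax Tree)
The previous article covered how SQL is submitted to the Coordinator, either from the CLI or through JDBC. This article looks at how the Coordinator preprocesses a submitted SQL statement and how the SQL text is turned into an AST (abstract syntax tree).
The sequence diagram of the relevant source code is shown below:
Next, let's go through the most important classes and methods in this flow in detail (some details are skipped for now):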
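QueuedStatementResource: handles the client's RESTful requests, including accepting queries and reporting query execution status. Its key endpoints are:
URL | HTTP method | Purpose
/v1/statement | POST | Submit a query
/v1/statement/queued/{queryId}/{slug}/{token} | GET | Get query execution status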
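Taken together, the two endpoints define a simple polling protocol: the client POSTs the SQL text, then keeps issuing GET requests against whatever nextUri the previous response carried, until a response arrives without one. The sketch below shows only that client-side loop and is an illustration under assumptions: the HTTP call is replaced by a hypothetical in-memory lookup, and `StatementResponse` is an invented stand-in for the JSON body, not a Presto class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for one coordinator response: a state label plus an optional nextUri.
final class StatementResponse {
    final String state;
    final Optional<String> nextUri;

    StatementResponse(String state, Optional<String> nextUri) {
        this.state = state;
        this.nextUri = nextUri;
    }
}

final class NextUriPolling {
    // Follows nextUri links until a response has none; `get` stands in for an HTTP GET.
    static List<String> poll(StatementResponse postResponse, Function<String, StatementResponse> get) {
        List<String> states = new ArrayList<>();
        StatementResponse current = postResponse;
        states.add(current.state);
        while (current.nextUri.isPresent()) {
            current = get.apply(current.nextUri.get());
            states.add(current.state);
        }
        return states;
    }

    public static void main(String[] args) {
        // Simulated coordinator: POST -> queued URI -> executing URI -> no nextUri (done).
        Map<String, StatementResponse> server = Map.of(
                "/v1/statement/queued/q1/x/1",
                new StatementResponse("QUEUED", Optional.of("/v1/statement/executing/q1/x/0")),
                "/v1/statement/executing/q1/x/0",
                new StatementResponse("FINISHED", Optional.empty()));
        StatementResponse post = new StatementResponse("QUEUED", Optional.of("/v1/statement/queued/q1/x/1"));
        System.out.println(poll(post, server::get)); // [QUEUED, QUEUED, FINISHED]
    }
}
```

The loop never needs to know which resource class serves a URI; switching from the queued endpoint to the executing endpoint is entirely driven by the server through nextUri.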
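The first request goes to /v1/statement and is handled by QueuedStatementResource.postStatement. postStatement processes the HTTP parameters of the request, builds a SessionContext (used later to construct the Session object), and returns a nextUri (the URL the client should request next). One notable detail: while the SessionContext is being built, the X-Presto-Prepared-Statement request header is parsed once, that is, run through the same syntax analysis (parser) that is described later in this article. The statements that go through this logic are the ones used by EXECUTE-type SQL.
preparedStatements = parsePreparedStatementsHeaders(headers);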
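To make the header concrete, here is a minimal JDK-only sketch of decoding it. It assumes the header value is a comma-separated list of `name=sql` pairs with both parts URL-encoded (commas inside the SQL are therefore encoded and safe to split on). `PreparedStatementHeader` is a hypothetical class; the real parsePreparedStatementsHeaders also validates each SQL string with the SQL parser, which is only marked by a comment here.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class PreparedStatementHeader {
    // Splits "name1=sql1,name2=sql2" (URL-encoded parts) into an ordered name -> SQL map.
    static Map<String, String> parse(String headerValue) {
        Map<String, String> statements = new LinkedHashMap<>();
        for (String entry : headerValue.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int eq = trimmed.indexOf('='); // split on the first '=' only
            if (eq < 0) {
                throw new IllegalArgumentException("Invalid X-Presto-Prepared-Statement header: " + trimmed);
            }
            String name = URLDecoder.decode(trimmed.substring(0, eq).trim(), StandardCharsets.UTF_8);
            String sql = URLDecoder.decode(trimmed.substring(eq + 1).trim(), StandardCharsets.UTF_8);
            // Presto would run `sql` through its parser here to reject invalid statements early.
            statements.put(name, sql);
        }
        return statements;
    }

    public static void main(String[] args) {
        Map<String, String> statements = parse("q1=SELECT+*+FROM+orders+WHERE+id+%3D+%3F,q2=SELECT+1");
        System.out.println(statements); // {q1=SELECT * FROM orders WHERE id = ?, q2=SELECT 1}
    }
}
```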
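The first /v1/statement request does not submit the query right away. The query is actually submitted when the client makes its second request, using the nextUri returned by the first one; the logic lives in Query.waitForDispatched, which calls dispatchManager.createQuery to create the query.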
private ListenableFuture<?> waitForDispatched()
{
    // if query submission has not finished, wait for it to finish
    synchronized (this) {
        if (querySubmissionFuture == null) {
            querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query);
        }
        if (!querySubmissionFuture.isDone()) {
            return querySubmissionFuture;
        }
    }
    // otherwise, wait for the query to finish
    return dispatchManager.waitForDispatched(queryId);
}
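The pattern in waitForDispatched is "submit on first poll, then reuse": the first caller creates the submission future under a lock, and every later caller gets the same future until it completes, after which the method switches to waiting for dispatch. A minimal sketch with CompletableFuture, where `submitter` and `dispatchWaiter` are hypothetical hooks standing in for dispatchManager.createQuery and dispatchManager.waitForDispatched:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazySubmission {
    private final Supplier<CompletableFuture<Void>> submitter;      // stands in for createQuery(...)
    private final Supplier<CompletableFuture<Void>> dispatchWaiter; // stands in for waitForDispatched(queryId)
    private CompletableFuture<Void> submissionFuture;               // guarded by this

    LazySubmission(Supplier<CompletableFuture<Void>> submitter, Supplier<CompletableFuture<Void>> dispatchWaiter) {
        this.submitter = submitter;
        this.dispatchWaiter = dispatchWaiter;
    }

    CompletableFuture<Void> waitForDispatched() {
        synchronized (this) {
            if (submissionFuture == null) {
                // Only the first poll triggers submission; later polls reuse the same future.
                submissionFuture = submitter.get();
            }
            if (!submissionFuture.isDone()) {
                return submissionFuture;
            }
        }
        // Submission has finished: from now on, wait for the query to be dispatched.
        return dispatchWaiter.get();
    }

    public static void main(String[] args) {
        AtomicInteger submissions = new AtomicInteger();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        LazySubmission query = new LazySubmission(
                () -> { submissions.incrementAndGet(); return pending; },
                () -> CompletableFuture.completedFuture(null));
        query.waitForDispatched();
        query.waitForDispatched();
        System.out.println(submissions.get()); // 1
        pending.complete(null);
        System.out.println(query.waitForDispatched().isDone()); // true
    }
}
```

Keeping the slow second stage outside the synchronized block mirrors the original: the lock only protects the one-time creation of the submission future.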
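Once the SQL has finished executing, the results are fetched through the /v1/statement/executing/{queryId}/{slug}/{token} endpoint of ExecutingStatementResource. ExecutingStatementResource only interacts with the client after the query has actually been submitted.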
// ExecutingStatementResource
@GET
@Path("{queryId}/{slug}/{token}")
@Produces(MediaType.APPLICATION_JSON)
public void getQueryResults(
        @PathParam("queryId") QueryId queryId,
        @PathParam("slug") String slug,
        @PathParam("token") long token,
        @QueryParam("maxWait") Duration maxWait,
        @QueryParam("targetResultSize") DataSize targetResultSize,
        @HeaderParam(X_FORWARDED_PROTO) String proto,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    Query query = getQuery(queryId, slug, token);
    if (isNullOrEmpty(proto)) {
        proto = uriInfo.getRequestUri().getScheme();
    }
    DataSize targetResultSizeToUse = Optional.ofNullable(targetResultSize)
            .map(size -> Ordering.natural().min(size, MAX_TARGET_RESULT_SIZE))
            .orElse(defaultTargetResultSize);
    asyncQueryResults(query, token, maxWait, targetResultSizeToUse, uriInfo, proto, asyncResponse);
}
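The targetResultSizeToUse line is a compact "clamp with default": if the client sent targetResultSize, cap it at MAX_TARGET_RESULT_SIZE, otherwise fall back to defaultTargetResultSize. The same Optional chain over plain byte counts is sketched below; the two limits are assumed values for illustration only, whereas Presto works with DataSize objects and its own configured values.

```java
import java.util.Optional;

final class TargetResultSize {
    // Assumed limits in bytes, for illustration only.
    static final long MAX_TARGET_RESULT_SIZE = 128L * 1024 * 1024;
    static final long DEFAULT_TARGET_RESULT_SIZE = 1024L * 1024;

    // null means "the client did not send targetResultSize".
    static long resolve(Long requested) {
        return Optional.ofNullable(requested)
                .map(size -> Math.min(size, MAX_TARGET_RESULT_SIZE)) // cap what the client asked for
                .orElse(DEFAULT_TARGET_RESULT_SIZE);                 // or use the server default
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));     // 1048576
        System.out.println(resolve(4096L));    // 4096
        System.out.println(resolve(1L << 40)); // 134217728
    }
}
```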
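From Query.waitForDispatched above, control enters DispatchManager.createQuery, which hands the actual work to createQueryInternal on an asynchronous thread. createQueryInternal strings together lexical analysis, syntax analysis, semantic analysis, resource group selection, execution plan generation and so on (the concrete implementations still live in their own classes; this method just chains them together, much like a procedural program).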
/**
 * Creates and registers a dispatch query with the query tracker. This method will never fail to register a query with the query
 * tracker. If an error occurs while creating a dispatch query, a failed dispatch will be created and registered.
 */
private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
{
    Session session = null;
    PreparedQuery preparedQuery = null;
    try {
        if (query.length() > maxQueryLength) {
            int queryLength = query.length();
            query = query.substring(0, maxQueryLength);
            throw new PrestoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
        }

        // decode session
        session = sessionSupplier.createSession(queryId, sessionContext);

        // prepare query
        preparedQuery = queryPreparer.prepareQuery(session, query);

        // select resource group
        Optional<String> queryType = getQueryType(preparedQuery.getStatement().getClass()).map(Enum::name);
        SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                // ... (other criteria, such as user identity and client tags, are elided in this excerpt)
                Optional.ofNullable(sessionContext.getSource()),
                queryType));

        // apply system default session properties (does not override user set properties)
        session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

        // mark existing transaction as active
        transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);

        DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                session,
                query,
                preparedQuery,
                slug,
                selectionContext.getResourceGroupId());

        boolean queryAdded = queryCreated(dispatchQuery);
        if (queryAdded && !dispatchQuery.isDone()) {
            submitQuerySync(dispatchQuery, selectionContext);
        }
    }
    catch (Throwable throwable) {
        // creation must never fail, so register a failed query in this case
        if (session == null) {
            session = Session.builder(new SessionPropertyManager())
                    .setQueryId(queryId)
                    .setIdentity(sessionContext.getIdentity())
                    .setSource(sessionContext.getSource())
                    .build();
        }
        Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
        DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
        queryCreated(failedDispatchQuery);
    }
}
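Two design points stand out: createQuery returns immediately and runs createQueryInternal on another thread, and createQueryInternal catches every Throwable so that a query is always registered, either as created or as failed (the length check is simply one way to fail). A hypothetical miniature of that shape, assuming a single-threaded executor and a plain map as the "query tracker"; none of these names are Presto classes:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DispatchSketch {
    private final int maxQueryLength;
    private final ExecutorService queryExecutor = Executors.newSingleThreadExecutor();
    // queryId -> outcome; stands in for the query tracker
    final Map<String, String> registered = new ConcurrentHashMap<>();

    DispatchSketch(int maxQueryLength) {
        this.maxQueryLength = maxQueryLength;
    }

    // Like createQuery: hand the work to another thread and return a future right away.
    CompletableFuture<Void> createQuery(String queryId, String sql) {
        CompletableFuture<Void> creation = new CompletableFuture<>();
        queryExecutor.execute(() -> {
            try {
                createQueryInternal(queryId, sql);
            }
            finally {
                creation.complete(null); // "creation finished", whether it succeeded or not
            }
        });
        return creation;
    }

    // Like createQueryInternal: never lets an exception escape; failures are registered instead.
    private void createQueryInternal(String queryId, String sql) {
        try {
            if (sql.length() > maxQueryLength) {
                throw new IllegalArgumentException(String.format(
                        "Query text length (%s) exceeds the maximum length (%s)", sql.length(), maxQueryLength));
            }
            // session creation, parsing and resource group selection would go here
            registered.put(queryId, "CREATED");
        }
        catch (Throwable t) {
            registered.put(queryId, "FAILED: " + t.getMessage());
        }
    }

    void shutdown() {
        queryExecutor.shutdown();
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch(20);
        dispatcher.createQuery("q1", "SELECT 1").join();
        dispatcher.createQuery("q2", "SELECT * FROM a_rather_long_table_name").join();
        dispatcher.shutdown();
        System.out.println(dispatcher.registered.get("q1")); // CREATED
        System.out.println(dispatcher.registered.get("q2"));
    }
}
```

Because failures are recorded rather than thrown, the client's next poll always finds a query to report on, even if it is only an error.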
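As you can see, createQueryInternal starts by enforcing a limit on the SQL length. It then builds the Session object from the SessionContext and QueryId obtained earlier; a Session contains queryId, catalog, schema, systemProperties, connectorProperties, clientTags, and so on.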
public final class Session
{
    private final QueryId queryId;
    private final Optional<TransactionId> transactionId;
    private final boolean clientTransactionSupport;
    private final Identity identity;
    private final Optional<String> source;
    private final Optional<String> catalog;
    private final Optional<String> schema;
    private final SqlPath path;
    private final TimeZoneKey timeZoneKey;
    private final Locale locale;
    private final Optional<String> remoteUserAddress;
    private final Optional<String> userAgent;
    private final Optional<String> clientInfo;
    private final Optional<String> traceToken;
    private final Optional<String> labelInfo;
    private Set<String> clientTags;
    private final Set<String> clientCapabilities;
    private final ResourceEstimates resourceEstimates;
    private final long startTime;
    private Map<String, String> systemProperties;
    private Map<CatalogName, Map<String, String>> connectorProperties;
    private final Map<String, Map<String, String>> unprocessedCatalogProperties;
    private final SessionPropertyManager sessionPropertyManager;
    private final Map<String, String> preparedStatements;
}
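The systemProperties map above holds what the user set; createQueryInternal then calls newSessionWithDefaultProperties, whose comment says defaults "do not override user set properties" (the call also receives queryType and the resource group id). The override rule itself can be sketched over plain maps; this illustrates only that rule, not Presto's implementation, and the property names in main are examples.

```java
import java.util.HashMap;
import java.util.Map;

final class SessionDefaults {
    // Defaults fill in only the keys the user did not set; user-set values always win.
    static Map<String, String> withDefaults(Map<String, String> userSet, Map<String, String> systemDefaults) {
        Map<String, String> merged = new HashMap<>(systemDefaults);
        merged.putAll(userSet); // user-set properties override defaults
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> user = Map.of("query_max_run_time", "1h");
        Map<String, String> defaults = Map.of(
                "query_max_run_time", "30m",
                "join_distribution_type", "BROADCAST");
        Map<String, String> effective = withDefaults(user, defaults);
        System.out.println(effective.get("query_max_run_time"));     // 1h
        System.out.println(effective.get("join_distribution_type")); // BROADCAST
    }
}
```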
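Next comes parsing the SQL into an AST. On the surface this is a single line of code, but it covers several key steps: lexical analysis, syntax analysis and AST construction. The returned PreparedQuery holds a Statement object (the parsed AST). Different kinds of SQL map to different Statement subclasses: a SELECT query becomes a Query, CREATE TABLE becomes a CreateTable, and so on. All subsequent semantic analysis is done by visiting this AST.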
// prepare query
preparedQuery = queryPreparer.prepareQuery(session, query);
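Because each kind of SQL becomes its own Statement subclass, later stages can dispatch on the node type with the visitor pattern instead of inspecting strings. A heavily simplified, hypothetical sketch of that shape follows: the class names echo Presto's Query and CreateTable, but the fields, the AstVisitor interface and the Describer visitor are invented for illustration and are far smaller than Presto's real AST classes.

```java
// Hypothetical miniature AST; Presto's real Node/Statement/AstVisitor types are much richer.
abstract class Node {
    abstract <R> R accept(AstVisitor<R> visitor);
}

abstract class Statement extends Node {}

final class Query extends Statement {
    final String fromTable;
    Query(String fromTable) { this.fromTable = fromTable; }
    @Override <R> R accept(AstVisitor<R> visitor) { return visitor.visitQuery(this); }
}

final class CreateTable extends Statement {
    final String tableName;
    CreateTable(String tableName) { this.tableName = tableName; }
    @Override <R> R accept(AstVisitor<R> visitor) { return visitor.visitCreateTable(this); }
}

interface AstVisitor<R> {
    R visitQuery(Query node);
    R visitCreateTable(CreateTable node);
}

// One visitor that describes statements; an analyzer would be another visitor over the same tree.
final class Describer implements AstVisitor<String> {
    @Override public String visitQuery(Query node) { return "Query reading " + node.fromTable; }
    @Override public String visitCreateTable(CreateTable node) { return "CreateTable " + node.tableName; }

    public static void main(String[] args) {
        Statement[] statements = { new Query("orders"), new CreateTable("t1") };
        for (Statement s : statements) {
            System.out.println(s.accept(new Describer()));
        }
    }
}
```

Adding a new analysis pass means writing a new visitor, while adding a new statement type means adding a class plus one visit method; that trade-off is why AST-based compilers favor this shape.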
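Once the SQL arrives, SqlParser parses it in invokeParser. The grammar rule to start from is passed in as a function (for a full statement, SqlBaseParser::singleStatement):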
private Node invokeParser(String name, String sql, Function<SqlBaseParser, ParserRuleContext> parseFunction, ParsingOptions parsingOptions)
{
    try {
        SqlBaseLexer lexer = new SqlBaseLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
        CommonTokenStream tokenStream = new CommonTokenStream(lexer);
        SqlBaseParser parser = new SqlBaseParser(tokenStream);

        // Override the default error strategy to not attempt inserting or deleting a token.
        // Otherwise, it messes up error reporting
        parser.setErrorHandler(new DefaultErrorStrategy()
        {
            @Override
            public Token recoverInline(Parser recognizer)
                    throws RecognitionException
            {
                if (nextTokensContext == null) {
                    throw new InputMismatchException(recognizer);
                }
                else {
                    throw new InputMismatchException(recognizer, nextTokensState, nextTokensContext);
                }
            }
        });

        parser.addParseListener(new PostProcessor(Arrays.asList(parser.getRuleNames()), parser));

        lexer.removeErrorListeners();
        lexer.addErrorListener(LEXER_ERROR_LISTENER);

        parser.removeErrorListeners();
        if (enhancedErrorHandlerEnabled) {
            parser.addErrorListener(PARSER_ERROR_HANDLER);
        }
        else {
            parser.addErrorListener(LEXER_ERROR_LISTENER);
        }

        ParserRuleContext tree;
        try {
            // first, try parsing with potentially faster SLL mode
            parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
            tree = parseFunction.apply(parser);
        }
        catch (ParseCancellationException ex) {
            // if we fail, parse with LL mode
            tokenStream.seek(0); // rewind input stream
            parser.reset();

            parser.getInterpreter().setPredictionMode(PredictionMode.LL);
            tree = parseFunction.apply(parser);
        }

        return new AstBuilder(parsingOptions).visit(tree);
    }
    catch (StackOverflowError e) {
        throw new ParsingException(name + " is too large (stack overflow while parsing)");
    }
}
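The control flow of invokeParser is a two-stage strategy: try the cheaper SLL prediction mode first, and only if it gives up, rewind and reparse in full LL mode; a StackOverflowError from deeply nested input is converted into an ordinary parsing error. The sketch below isolates that control flow without ANTLR. `BailOut` is a hypothetical exception playing the role of ParseCancellationException, and the "parser" is any function of the mode, so rewinding is implicit because each attempt reads the input from scratch.

```java
import java.util.function.Function;

final class TwoStageParse {
    enum Mode { SLL, LL }

    // Hypothetical "fast mode gave up" signal, standing in for ParseCancellationException.
    static final class BailOut extends RuntimeException {
        BailOut(String message) { super(message); }
    }

    // Try the cheaper mode first; fall back to the full mode only when it bails out.
    static <T> T parse(String name, Function<Mode, T> parseFunction) {
        try {
            try {
                return parseFunction.apply(Mode.SLL);
            }
            catch (BailOut e) {
                // In Presto the token stream is rewound (seek(0)) and the parser reset here.
                return parseFunction.apply(Mode.LL);
            }
        }
        catch (StackOverflowError e) {
            throw new IllegalStateException(name + " is too large (stack overflow while parsing)");
        }
    }

    public static void main(String[] args) {
        // A toy "parser" whose fast mode cannot handle the input.
        Function<Mode, String> picky = mode -> {
            if (mode == Mode.SLL) {
                throw new BailOut("ambiguous");
            }
            return "parsed in " + mode;
        };
        System.out.println(parse("statement", picky));                       // parsed in LL
        System.out.println(parse("statement", mode -> "parsed in " + mode)); // parsed in SLL
    }
}
```

Most real-world SQL parses in the fast mode, so the expensive full-LL pass is paid only for the rare inputs that need it.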
Lexical analysis is done with ANTLR, which converts the input characters one by one into a stream of tokens:
SqlBaseLexer lexer = new SqlBaseLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
CommonTokenStream tokenStream = new CommonTokenStream(lexer);
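To show what "characters into tokens" means, here is a toy hand-written lexer. It is not ANTLR and not Presto's grammar: the token types and the three-keyword set are assumptions for illustration. It imitates one idea behind wrapping the input in CaseInsensitiveStream, as I understand it: keywords are recognized regardless of case, while each token keeps its original text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class ToyLexer {
    enum Type { KEYWORD, IDENTIFIER, NUMBER, SYMBOL }

    static final class Token {
        final Type type;
        final String text;
        Token(Type type, String text) { this.type = type; this.text = text; }
        @Override public String toString() { return type + "(" + text + ")"; }
    }

    // Illustrative keyword set only; Presto's grammar defines many more.
    private static final Set<String> KEYWORDS = Set.of("SELECT", "FROM", "WHERE");

    static List<Token> tokenize(String sql) {
        List<Token> tokens = new ArrayList<>();
        int i = 0;
        while (i < sql.length()) {
            char c = sql.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;
                continue;
            }
            int start = i;
            if (Character.isLetter(c) || c == '_') {
                while (i < sql.length() && (Character.isLetterOrDigit(sql.charAt(i)) || sql.charAt(i) == '_')) {
                    i++;
                }
                String text = sql.substring(start, i);
                // Keyword match ignores case, but the token keeps the original spelling.
                Type type = KEYWORDS.contains(text.toUpperCase(Locale.ROOT)) ? Type.KEYWORD : Type.IDENTIFIER;
                tokens.add(new Token(type, text));
            }
            else if (Character.isDigit(c)) {
                while (i < sql.length() && Character.isDigit(sql.charAt(i))) {
                    i++;
                }
                tokens.add(new Token(Type.NUMBER, sql.substring(start, i)));
            }
            else {
                i++;
                tokens.add(new Token(Type.SYMBOL, String.valueOf(c)));
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("select id FROM t where id = 42"));
        // [KEYWORD(select), IDENTIFIER(id), KEYWORD(FROM), IDENTIFIER(t), KEYWORD(where), IDENTIFIER(id), SYMBOL(=), NUMBER(42)]
    }
}
```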