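[Original] Big Data Basics: Hive (1) How Hive SQL Is Executed: Code Flow (hive 2.1)
Hive executes SQL in two ways:
Run the hive command, which comes in three forms: hive -e, hive -f, and the interactive hive shell;
Run the beeline command; beeline connects to a remote thrift server;
The sections below look at how SQL is executed in each of these scenarios: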
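1 hive command
Start command
Command to start the hive client
$HIVE_HOME/bin/hive
which is equivalent to
$HIVE_HOME/bin/hive --service cli
which invokes
$HIVE_HOME/bin/ext/cli.sh
The class actually launched is: org.apache.hadoop.hive.cli.CliDriver
Code walkthrough
org.apache.hadoop.hive.cli.CliDriver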
public static void main(String[] args) throws Exception {
  int ret = new CliDriver().run(args);
}
public int run(String[] args) throws Exception {
  ...
  // execute cli driver work
  try {
    return executeDriver(ss, conf, oproc);
  } finally {
    ss.close();
  }
...
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
    throws Exception {
  ...
  if (ss.execString != null) {
    int cmdProcessStatus = cli.processLine(ss.execString);
    return cmdProcessStatus;
  }
  ...
  try {
    if (ss.fileName != null) {
      return cli.processFile(ss.fileName);
    }
  } catch (FileNotFoundException e) {
    return 3;
  }
  ...
  while ((line = reader.readLine(curPrompt + "> ")) != null) {
    if (!prefix.equals("")) {
      prefix += '\n';
    }
    if (line.trim().startsWith("--")) {
      continue;
    }
    if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
      line = prefix + line;
      ret = cli.processLine(line, true);
      ...
public int processFile(String fileName) throws IOException {
  ...
  rc = processReader(bufferReader);
  ...
public int processReader(BufferedReader r) throws IOException {
  String line;
  StringBuilder qsb = new StringBuilder();
  while ((line = r.readLine()) != null) {
    // Skipping through comments
    if (! line.startsWith("--")) {
      qsb.append(line + "\n");
    }
  }
  return (processLine(qsb.toString()));
}
public int processLine(String line, boolean allowInterrupting) {
  ...
  ret = processCmd(command);
  ...
public int processCmd(String cmd) {
  ...
  CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
  ret = processLocalCmd(cmd, proc, ss);
  ...
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
  int tryCount = 0;
  boolean needRetry;
  int ret = 0;
  do {
    try {
      needRetry = false;
      if (proc != null) {
        if (proc instanceof Driver) {
          Driver qp = (Driver) proc;
          PrintStream out = ss.out;
          long start = System.currentTimeMillis();
          if (ss.getIsVerbose()) {
            out.println(cmd);
          }
          qp.setTryCount(tryCount);
          ret = qp.run(cmd).getResponseCode();
          ...
          while (qp.getResults(res)) {
            for (String r : res) {
              out.println(r);
            }
            ...
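CliDriver.main calls run, run calls executeDriver, and executeDriver handles the three cases mentioned above:
hive -e runs a SQL string: ss.execString is non-null, and the process exits once execution finishes;
hive -f runs a SQL file: ss.fileName is non-null, and the process exits once execution finishes;
the interactive hive shell: it keeps reading input with reader.readLine, executes each statement, and prints the results;
All three cases end up calling processLine, and processLine reaches processLocalCmd through processCmd. processLocalCmd first calls Driver.run to execute the SQL and then calls Driver.getResults to print the results. These are Driver's two most important interfaces; the Driver implementation is covered later;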
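The way the interactive loop assembles multi-line input into statements can be modelled in isolation. The sketch below is a toy model and not Hive code: the class name StatementAccumulator and the method collect are made up for illustration, and the else branch that keeps accumulating an unfinished statement is an assumption, because the excerpt above elides that part of the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the interactive loop in executeDriver; hypothetical, not Hive code. */
class StatementAccumulator {
    /**
     * Applies the same checks as the loop above to each input line and
     * returns the complete statements that would be handed to processLine.
     */
    static List<String> collect(List<String> inputLines) {
        List<String> statements = new ArrayList<>();
        String prefix = "";
        for (String line : inputLines) {
            // Separate the already buffered text from the new line.
            if (!prefix.equals("")) {
                prefix += '\n';
            }
            // A line starting with "--" is a comment and is dropped.
            if (line.trim().startsWith("--")) {
                continue;
            }
            // A trailing ';' that is not escaped as "\;" ends the statement.
            if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
                statements.add(prefix + line);
                prefix = "";
            } else {
                // Assumption: an unfinished statement keeps accumulating.
                prefix = prefix + line;
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("select 1", "from t;", "-- note", "show tables;");
        for (String statement : collect(input)) {
            System.out.println("[" + statement + "]");
        }
    }
}
```

Buffering in a prefix string keeps the loop stateless apart from one variable, which is why a statement may span any number of input lines.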
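2 beeline command
beeline needs to connect to the hive thrift server, so first look at how the hive thrift server starts:
hive thrift server
Start command
Command to start the hive thrift server
$HIVE_HOME/bin/hiveserver2
which is equivalent to
$HIVE_HOME/bin/hive --service hiveserver2
which invokes
$HIVE_HOME/bin/ext/hiveserver2.sh
The class actually launched is: org.apache.hive.service.server.HiveServer2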
Startup process
HiveServer2.main
  startHiveServer2
    init
      addService-CLIService,ThriftBinaryCLIService
    start
      Service.start
        CLIService.start
        ThriftBinaryCLIService.start
          TThreadPoolServer.serve
Class structure: [interface or parent class -> subclass]
TServer->TThreadPoolServer
TProcessorFactory->SQLPlainProcessorFactory
TProcessor->TSetIpAddressProcessor
ThriftCLIService->ThriftBinaryCLIService
CLIService
HiveSession
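The startup sequence above follows a composite pattern: the server registers child services during init and starts them in registration order. The following is a minimal sketch of that idea under stated assumptions. The types Service, NamedService and CompositeService here are simplified stand-ins written for this example and do not reproduce Hive's own classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy composite-service lifecycle; hypothetical types, not Hive's classes. */
class ServiceLifecycleSketch {
    interface Service {
        void start(List<String> log);
    }

    /** A leaf service that only records that it was started. */
    static class NamedService implements Service {
        private final String name;

        NamedService(String name) {
            this.name = name;
        }

        public void start(List<String> log) {
            log.add(name + ".start");
        }
    }

    /** A service that starts itself and then its children in registration order. */
    static class CompositeService implements Service {
        private final String name;
        private final List<Service> children = new ArrayList<>();

        CompositeService(String name) {
            this.name = name;
        }

        void addService(Service service) {
            children.add(service);
        }

        public void start(List<String> log) {
            log.add(name + ".start");
            for (Service child : children) {
                child.start(log);
            }
        }
    }

    /** Mirrors the order listed above: init registers, start walks the children. */
    static List<String> startServer() {
        CompositeService server = new CompositeService("HiveServer2");
        server.addService(new NamedService("CLIService"));
        server.addService(new NamedService("ThriftBinaryCLIService"));
        List<String> log = new ArrayList<>();
        server.start(log);
        return log;
    }

    public static void main(String[] args) {
        for (String entry : startServer()) {
            System.out.println(entry);
        }
    }
}
```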
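Code walkthrough
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) {
  super(cliService, ThriftBinaryCLIService.class.getSimpleName());
}
ThriftBinaryCLIService is a core class: it actually starts the thrift server and wraps a CLIService, and every request ends up being handled by the underlying CLIService. The CLIService code follows: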
org.apache.hive.service.cli.CLIService
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay) throws HiveSQLException {
  OperationHandle opHandle =
      sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay);
  LOG.debug(sessionHandle + ": executeStatement()");
  return opHandle;
}
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
    long maxRows, FetchType fetchType) throws HiveSQLException {
  RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
      .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
  LOG.debug(opHandle + ": fetchResults()");
  return rowSet;
}
CLIService has two key interfaces, executeStatement and fetchResults. Both are forwarded to HiveSession. The code of the HiveSession implementation class follows:
org.apache.hive.service.cli.session.HiveSessionImpl
@Override
public OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException {
  return executeStatementInternal(statement, confOverlay, false, 0);
}
private OperationHandle executeStatementInternal(String statement,
    Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
  acquire(true, true);
  ExecuteStatementOperation operation = null;
  OperationHandle opHandle = null;
  try {
    operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
        confOverlay, runAsync, queryTimeout);
    opHandle = operation.getHandle();
    operation.run();
    ...
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
    long maxRows, FetchType fetchType) throws HiveSQLException {
  acquire(true, false);
  try {
    if (fetchType == FetchType.QUERY_OUTPUT) {
      return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
    }
    return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
  } finally {
    release(true, false);
  }
}
where getOperationNextRowSet (in OperationManager) is:
public RowSet getOperationNextRowSet(OperationHandle opHandle,
    FetchOrientation orientation, long maxRows)
    throws HiveSQLException {
  return getOperation(opHandle).getNextRowSet(orientation, maxRows);
}
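The fetch path above boils down to looking an operation up by its handle and delegating to it. Below is a minimal sketch of that lookup-and-delegate step. It assumes a plain string handle and an in-memory map. The class OperationRegistrySketch and its nested Operation interface are invented for this example and are much simpler than Hive's OperationManager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy handle-to-operation registry; names are illustrative, not Hive's. */
class OperationRegistrySketch {
    /** Minimal stand-in for an operation that can hand back rows. */
    interface Operation {
        List<String> getNextRowSet(long maxRows);
    }

    private final Map<String, Operation> handleToOperation = new HashMap<>();

    /** Registers an operation and returns the handle the client keeps. */
    String register(Operation operation) {
        String handle = "op-" + (handleToOperation.size() + 1);
        handleToOperation.put(handle, operation);
        return handle;
    }

    /** Looks the operation up by handle and delegates the fetch to it. */
    List<String> getOperationNextRowSet(String handle, long maxRows) {
        Operation operation = handleToOperation.get(handle);
        if (operation == null) {
            throw new IllegalArgumentException("Invalid handle: " + handle);
        }
        return operation.getNextRowSet(maxRows);
    }

    /** Demo: one operation over fixed rows, fetched through its handle. */
    static List<String> demo() {
        List<String> rows = Arrays.asList("r1", "r2", "r3");
        OperationRegistrySketch registry = new OperationRegistrySketch();
        String handle = registry.register(
            maxRows -> new ArrayList<>(rows.subList(0, (int) Math.min(maxRows, rows.size()))));
        return registry.getOperationNextRowSet(handle, 2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Handing the client an opaque handle rather than the operation object is what lets execute and fetch arrive as separate remote calls.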
Next, look at Operation's run and getNextRowSet in detail:
org.apache.hive.service.cli.operation.Operation
public void run() throws HiveSQLException {
  beforeRun();
  try {
    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      try {
        metrics.incrementCounter(MetricsConstant.OPEN_OPERATIONS);
      } catch (Exception e) {
        LOG.warn("Error Reporting open operation to Metrics system", e);
      }
    }
    runInternal();
  } finally {
    afterRun();
  }
}
public RowSet getNextRowSet() throws HiveSQLException {
  return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
}
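Operation is an abstract class:
run calls the abstract method runInternal
getNextRowSet calls the abstract method getNextRowSet
The subclass implementations of these two abstract methods are shown below; they ultimately depend on Driver's run and getResults;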
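Operation.run is an instance of the template-method pattern: the base class fixes the beforeRun / runInternal / afterRun order, and subclasses supply only runInternal. The sketch below illustrates that structure with hypothetical classes (OperationTemplateSketch, OkOperation, FailingOperation). It leaves out metrics and Hive's exception types, and its main point is that afterRun still executes when runInternal throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy template-method sketch modelled on Operation.run; not Hive code. */
class OperationTemplateSketch {
    abstract static class Operation {
        final List<String> trace = new ArrayList<>();

        protected void beforeRun() {
            trace.add("beforeRun");
        }

        protected void afterRun() {
            trace.add("afterRun");
        }

        /** The only step a subclass has to provide. */
        protected abstract void runInternal();

        /** Fixed skeleton: afterRun runs even when runInternal throws. */
        public final void run() {
            beforeRun();
            try {
                runInternal();
            } finally {
                afterRun();
            }
        }
    }

    static class OkOperation extends Operation {
        protected void runInternal() {
            trace.add("runInternal");
        }
    }

    static class FailingOperation extends Operation {
        protected void runInternal() {
            throw new IllegalStateException("query failed");
        }
    }

    /** Runs the operation and returns the recorded call order. */
    static List<String> traceOf(Operation operation) {
        try {
            operation.run();
        } catch (IllegalStateException e) {
            operation.trace.add("caught");
        }
        return operation.trace;
    }

    public static void main(String[] args) {
        System.out.println(traceOf(new OkOperation()));
        System.out.println(traceOf(new FailingOperation()));
    }
}
```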
1) First, runInternal as implemented in the subclass HiveCommandOperation:
org.apache.hive.service.cli.operation.HiveCommandOperation
@Override
public void runInternal() throws HiveSQLException {
  setState(OperationState.RUNNING);
  try {
    String command = getStatement().trim();
    String[] tokens = statement.split("\\s");
    String commandArgs = command.substring(tokens[0].length()).trim();
    CommandProcessorResponse response = commandProcessor.run(commandArgs);
    ...
This calls CommandProcessor.run. HiveCommandOperation serves non-SQL commands (for example set or dfs). Driver is also a CommandProcessor implementation; for SQL statements the corresponding subclass is SQLOperation, whose runInternal ends up in Driver.run;
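The argument extraction in runInternal is easy to check in isolation: the first whitespace-delimited token is the command keyword, and everything after it is passed to the processor. The helper below is a hypothetical standalone version (CommandArgsSketch.commandArgs is not a Hive method). Unlike the excerpt, it splits the trimmed command instead of the raw statement, which is an assumption made here so that leading whitespace is handled.

```java
/** Standalone version of the token splitting shown above; hypothetical helper. */
class CommandArgsSketch {
    /** Returns everything after the first whitespace-delimited token. */
    static String commandArgs(String statement) {
        String command = statement.trim();
        // Assumption: split the trimmed text so tokens[0] is never empty
        // for non-blank input.
        String[] tokens = command.split("\\s");
        return command.substring(tokens[0].length()).trim();
    }

    public static void main(String[] args) {
        System.out.println(commandArgs("set hive.exec.parallel=true"));
        System.out.println(commandArgs("  dfs   -ls /tmp "));
    }
}
```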
2) Next, getNextRowSet as implemented in the subclass SQLOperation:
org.apache.hive.service.cli.operation.SQLOperation
public RowSet getNextRowSet(FetchOrientation orientation, long maxRows)
    throws HiveSQLException {
  ...
  driver.setMaxRows((int) maxRows);
  if (driver.getResults(convey)) {
    return decode(convey, rowSet);
  }
  ...
This calls Driver.getResults;
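3 Driver
The code analysis above shows that both paths, running the hive command line and connecting to the thrift server through beeline, ultimately depend on Driver. Driver's two core interfaces are: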
run
getResults
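Both callers use these two interfaces in the same way: call run once, then call getResults repeatedly until it reports that no rows are left. The sketch below shows that calling pattern against a toy in-memory driver. The QueryDriver interface and InMemoryDriver class are assumptions made for illustration, and their signatures are simplified compared with org.apache.hadoop.hive.ql.Driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy run-then-drain loop; QueryDriver is a simplified stand-in for Driver. */
class DriverLoopSketch {
    interface QueryDriver {
        /** Executes the command; 0 means success. */
        int run(String command);

        /** Appends the next batch to res; returns false when nothing is left. */
        boolean getResults(List<String> res);
    }

    /** Serves a fixed list of rows in batches of batchSize. */
    static class InMemoryDriver implements QueryDriver {
        private final List<String> rows;
        private final int batchSize;
        private int next = 0;

        InMemoryDriver(List<String> rows, int batchSize) {
            this.rows = rows;
            this.batchSize = batchSize;
        }

        public int run(String command) {
            next = 0;
            return 0;
        }

        public boolean getResults(List<String> res) {
            if (next >= rows.size()) {
                return false;
            }
            int end = Math.min(next + batchSize, rows.size());
            res.addAll(rows.subList(next, end));
            next = end;
            return true;
        }
    }

    /** Runs the command, then drains every batch, clearing the buffer between calls. */
    static List<String> execute(QueryDriver driver, String command) {
        List<String> output = new ArrayList<>();
        if (driver.run(command) != 0) {
            return output;
        }
        List<String> res = new ArrayList<>();
        while (driver.getResults(res)) {
            output.addAll(res);
            res.clear();
        }
        return output;
    }

    public static void main(String[] args) {
        QueryDriver driver = new InMemoryDriver(Arrays.asList("a", "b", "c"), 2);
        System.out.println(execute(driver, "select * from t"));
    }
}
```

Fetching in batches keeps the caller's memory bounded regardless of the result size, which is presumably why the fetch side carries a maximum row count.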
Code walkthrough
org.apache.hadoop.hive.ql.Driver
@Override
public CommandProcessorResponse run(String command)
    throws CommandNeedRetryException {
  return run(command, false);
}
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
  ...
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  ...
  ret = compileInternal(command, true);
  ...
  ret = execute(true);
  ...
private int compileInternal(String command, boolean deferClose) {
  ...
  ret = compile(command, true, deferClose);
  ...
public int compile(String command, boolean resetTaskIds, boolean deferClose) {
  ...
  plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,