从实战中了解数据开发全流程——DataWorksOpenAPI实战简介:DataWorks作为飞天⼤数据平台操作系统,历经11年发展,形成了涵盖数据集成、数据开发、数据治理、数据服务的⼀站式 简介:
⼤数据开发治理平台。很多企业⽤户在使⽤产品的过程中希望他们的本地服务能够和阿⾥云上的DataWorks服务进⾏交互,从⽽提升企业⼤数据处理的效率,减少⼈⼯操作和运维⼯作,降低数据风险和企业成本,现在DataWorks开放OpenAPI能⼒满⾜企业的定制化需求。
DataWorks作为飞天⼤数据平台操作系统,历经11年发展,形成了涵盖数据集成、数据开发、数据治理、数据服务的⼀站式⼤数据开发治理平台。很多企业⽤户在使⽤产品的过程中希望他们的本地服务能够和阿⾥云上的DataWorks服务进⾏交互,从⽽提升企业⼤数据处理的效率,减少⼈⼯操作和运维⼯作,降低数据风险和企业成本,现在DataWorks开放OpenAPI能⼒满⾜企业的定制化需求。
DataWorks OpenAPI涵盖租户、元数据、数据开发、运维中⼼、数据质量、数据服务等DataWorks核⼼能⼒,企业版和旗舰版分别赠送100万次/⽉、1000万次/⽉的免费调⽤额度。
关于Dataworks OpenAPI开通要求和开放地域可查阅
限DataWorks企业版及以上使⽤
开通7天试⽤请使⽤钉钉扫码联系
实战简介
我们假设这样⼀个简单的场景,开发⼈员想把RDS库⾥⾯的数据同步到⼀张MaxCompute分区表中,然后在⾃建系统的页⾯上展⽰经过数据分析后的报表数据,那么如何通过DataWorks OpenAPI去完成整个链路的实现呢?
实战准备
⼀、引⼊D ataWor ks Op e nA PI SD K
这⼀部分可参考 ,除了java语⾔,我们还⽀持Python,PHP,C#,Go 等语⾔⽀持。默认情况下我们不需要显式去指定DataWorks OpenAPI 的EndPoint,但是如果aliyun-java-sdk-core版本偏低的情况下可能会找不到DataWorks OpenAPI的Endpoint,这时候可在不升级版本的情况下通过使⽤如下代码进⾏请求。
IClientProfile profile = Profile("cn-shanghai", "XXX", "xxx");
DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "");
IAcsClient client = new DefaultAcsClient(profile);客家酿豆腐
如上代码是显式地指定了DataWorks OpenAPI的EndPoint,dataworks.${regionId}.这样的域名格式在公⽹环境下可访问,但是有些⽤户需要在VPC环境下调⽤OpenAPI,那么则需要把域名dataworks.${regionId}. 变更成 dataworks-
vpc.${regionId}.,这样在VPC⽹络环境下即使不能访问公⽹也能请求到DataWorks OpenAPI。
如果您不清楚regionId(地域ID)的概念,可参考。
⼆、了解D ataWor ks Op e nA PI⽂档
详细阅读DataWorks OpenAPI⽂档对开发⾮常有帮助,做API开发时如果对参数的约束不太理解时可参考DataWorks OpenAPI⽂档,⾥⾯对每个出⼊参、参数⽰例、错误码描述都有详细的解释。
实战步骤
步骤⼀:创建RD S数据源
集成租户API可创建引擎、创建数据源、查看项⽬空间等信息。在我们这个业务场景中,MaxCompute分区表存在于MaxCompute引擎中,我们在DataWorks管控台创建完MaxCompute⼯作
空间后会⾃动创建好MaxCompute引擎的数据源,所以我们只需要使⽤【CreateConnection】创建好RDS数据源即可:
CreateConnectionRequest createRequest = new CreateConnectionRequest();
createRequest.tProjectId(-1L);
createRequest.tName("TEST_CONNECTION");
createRequest.tConnectionType("MYSQL");
createRequest.tEnvType(1);
createRequest.tContent("{\"password\":\"12345\"}");
Long connectionId;
try {
CreateConnectionRespon createRespon = AcsRespon(createRequest);
Asrt.Data());
connectionId = Data();
UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
updateRequest.tConnectionId(connectionId);
updateRequest.tDescription("1");
UpdateConnectionRespon acsRespon = AcsRespon(updateRequest);
Asrt.Data());
DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
deleteRequest.tConnectionId(connectionId);
DeleteConnectionRespon deleteRespon = AcsRespon(deleteRequest);
Asrt.Data());
} catch (ClientException e) {
e.printStackTrace();
Asrt.fail();
}
UpdateConnection和DeleteConnection可分别修改和删除数据源信息。另外对项⽬空间的成员进⾏管理的API集是CreateProjectMember、DeleteProjectMember、RemoveProjectMemberFromRole、ListProjectMembers。
步骤⼆:表的创建
集成DataWorks元数据OpenAPI我们能管理引擎侧的表信息,通过DataWorks管控台和租户API我们完成了MaxCompute引擎和RDS数据源的创建⼯作,下⼀步需要完成表的创建,可通过元数据的【CreateTable】完成:
IClientProfile profile = Profile("cn-shanghai", "XXX", "xxx");
IAcsClient client = new DefaultAcsClient(profile);
CreateTableRequest request = new CreateTableRequest();
request.tTableName("table_test");
request.tColumnss(new ArrayList<>());
request.tEndpoint("endpoint");
CreateTableRespon respon = AcsRespon(request);
String nextTaskId = TaskInfo().getNextTaskId();
System.out.println(nextTaskId);
关于表管理的API集是CreateTable、UpdateTable、DeleteTable、GetMetaDBTableList、CheckMetaTable等,除了可对表进⾏管理,元数据API还能对表元数据、表主题进⾏管理,更多详情可参考DataWorks OpenAPI⽂档。
步骤三:任务开发和发布调度
集成数据开发API可管理⽂件,并对⽂件进⾏提交和发布后⽣成周期任务,周期任务会定时调度运⾏,创建不同类型的⽂件是根据FileType 这个字段决定的,⽬前我们已⽀持⾮常多的FileType,通过运维中⼼的API【ListProgramTypeCount】可获取所有已⽀持的系统节点以及⾃定义节点。
IClientProfile profile = Profile("cn-shanghai", "XXX", "xxx");
IAcsClient client = new DefaultAcsClient(profile);
CreateFileRequest createFileRequest = new CreateFileRequest();
createFileRequest.tFileType(DefaultNodeType.Code());
createFileRequest.tInputList(projectIdentifier+"_root");
createFileRequest.tContent(content);
createFileRequest.tFileName("create_file_" + caId);
createFileRequest.tFileFolderPath("业务流程/POP接⼝测试/MaxCompute/test_folder_3");
万圣节糖createFileRequest.tFileDescription("create file " + caId);
createFileRequest.tRerunMode("ALL_ALLOWED");
CreateFileRespon createFileRespon = getAcsRespon(createFileRequest);
content字段存储SQL脚本、Shell脚本、数据集成的脚本代码,数据集成的脚本格式可参考。
使⽤【CreateFile】创建完脚本后,如需修改可使⽤UpdateFile、DeleteFile进⾏管理。和页⾯上的操作流程⼀致的是完成⽂件开发后得提交和发布⽂件才会⽣成周期实例,这⾥要注意的是需要轮询SubmitFile返回的 DeploymentId,只有当GetDeployment返回的状态是完成时(status.finished())才表⽰部署成功。
IClientProfile profile = Profile("cn-shanghai", "XXX", "xxx");
IAcsClient client = new DefaultAcsClient(profile);
SubmitFileRequest request = new SubmitFileRequest();
request.tFileId(fileId);
request.tComment("submit file");
SubmitFileRespon submitFileRespon = getAcsRespon(submitFileRequest);
//检查提交结果
DeploymentStatus status = null;
GetDeploymentRespon.Data.Deployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
GetDeploymentRequest getDeploymentRequest = Data());
GetDeploymentRespon getDeploymentRespon = getAcsRespon(getDeploymentRequest);
LOGGER.info("Deployment status got - RequestId[{}]", RequestId());
Asrt.Data());
deployment = Data().getDeployment();
Asrt.asrtNotNull(deployment);
Asrt.Name().equalsIgnoreCa(baId));
Asrt.CreatorId().equalsIgnoreCa(baId));
Asrt.HandlerId().equalsIgnoreCa(baId));
Asrt.asrtEquals((int) FromEnvironment(), DatastudioEnvironment.LOCAL.value());
Asrt.asrtEquals((int) ToEnvironment(), DatastudioEnvironment.DEV.value());
Asrt.asrtTrue(StringUtils.ErrorMessage()));
status = Enums.find(DeploymentStatus.class, Status());
Asrt.asrtNotNull(status);
if (status.finished()) {
LOGGER.info("Deployment finished - FinalStatus[{}]", status);
break;
}
LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
retryTimes++;
}
如果是在标准模式的项⽬下开发,提交完成后,还需要发布⽂件才能最终提交到调度成为周期任务。发布⽂件使⽤DeployFile,和提交⽂件⼀样,也需要使⽤GetDeployment轮询部署状态。
DeployFileRequest request = new DeployFileRequest();
request.tFileId(fileId);
request.tComment("deploy file");
DeployFileRespon deployFileRespon = getAcsRespon(deployFileRequest);
//检查发布部署结果
DeploymentStatus status = null;
GetDeploymentRespon.Data.Deployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);
GetDeploymentRespon getDeploymentRespon = getAcsRespon(getDeploymentRequest);
LOGGER.info("Deployment status got - RequestId[{}]", RequestId());
Asrt.Data());
deployment = Data().getDeployment();
Asrt.asrtNotNull(deployment);
LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
deploymentId, new Gson().toJson(deployment));
Asrt.CreatorId().equalsIgnoreCa(baId));
Asrt.asrtTrue(StringUtils.ErrorMessage()));
status = Enums.find(DeploymentStatus.class, Status());
Asrt.asrtNotNull(status);
if (status.finished()) {
LOGGER.info("Deployment finished - FinalStatus[{}]", status);
break;
}
LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
retryTimes++;
立风念什么
}张万国
数据开发API除了可对⽂件管理外,还能管理⽂件夹、资源、函数,更多详情可参考DataWorks OpenAPI⽂档。
步骤四:配置运维监控
通过API完成周期任务的⽣产之后,会在DataWorks平台每天⽣成调度实例被定时调度运⾏,使⽤运维中⼼API可对周期任务和周期实例进⾏运维操作,可通过GetNode、GetInstance、ListInstances等API查看周期任务和周期实例,监控实例运⾏情况。
GetInstanceRequest request = new GetInstanceRequest();
request.tInstanceId(INSTANCE_ID);
request.tProjectEnv(PROJECT_ENV);
try {
GetInstanceRespon respon = AcsRespon(request);
穷人续写400字
Object data = ReturnModelParr.par("getInstanceSuccess", Json(respon));
BizInstanceDto bizInstanceDto = GsonUtils.String(), BizInstanceDto.class);
Asrt.asrtEquals("NOT_RUN", Status().toString());
Asrt.asrtEquals(1590416703313L, ModifyTime().getTime());
Asrt.asrtEquals(INSTANCE_ID, InstanceId());
Asrt.asrtEquals("DAILY", DagType().toString());
Asrt.asrtEquals("kzh", NodeName());
Asrt.asrtEquals("", ParamValues());
Asrt.asrtEquals(1590416703313L, CreateTime().getTime());
Asrt.asrtEquals(1590422400000L, CycTime().getTime());
Asrt.asrtEquals(338450167L, DagId().longValue());
Asrt.asrtEquals(1590336000000L, Bizdate().getTime());
Asrt.asrtEquals(33115L, NodeId().longValue());
} catch (Exception e) {
e.printStackTrace();
Asrt.fail();
}
如果实例运⾏异常可通过RestartInstance、SetSuccessInstance、SuspendInstance、ResumeInstance处理。
使⽤CreateRemind、UpdateRemind等API可创建⾃定义报警规则,确保每天基线顺利产出,⼀旦异常可告警通知到⼈⼯,然后介⼊。
CreateRemindRequest createRemindRequest = new CreateRemindRequest();
createRemindRequest.tRemindName("REMIND_CREATE_TEST");
createRemindRequest.tRemindUnit(PopRemindUnit.NODE.name());
createRemindRequest.tRemindType(RemindType.ERROR.name());
createRemindRequest.tAlertUnit(PopAlertUnit.OTHER.name());
createRemindRequest.tDndEnd("08:00");
createRemindRequest.tNodeIds("-1");
createRemindRequest.tMaxAlertTimes(1);
createRemindRequest.tAlertInterval(1800);
createRemindRequest.tAlertMethods(PopAlertMethod.MAIL.name());
createRemindRequest.tAlertTargets(MosadConstants.POP_UID);
try {
美国条形码
CreateRemindRespon createRespon = AcsRespon(createRemindRequest);
MosadReturnModelParr.par("createRemindTest", Json(createRespon));
Asrt.Data() > 0);
} catch (Exception ex) {
ex.printStackTrace();
return;
}
运维中⼼主要提供周期任务、⼿动业务流程、基线查询、告警配置和查询等相关API,可参考DataWorks OpenAPI⽂档。
步骤五:配置数据质量监控
在这个业务场景中,我们通过前⾯介绍的API已经可以每天定时把数据从RDS同步到MaxCompute的表中了。如果我们担⼼产⽣脏数据或者数据缺失影响到线上业务,那么可通过数据质量API来集成DataWorks数据质量监控能⼒,当表数据产出异常时,可以⽴刻触发给规则订阅⼈。
CreateQualityRuleRequest request = new CreateQualityRuleRequest();
request.tBlockType(0);
request.tComment("test-createTemplateRuleSuccess");
request.tCriticalThreshold("50");
request.tEntityId(entityId);
request.tOperator("abs");
request.tPredictType(0);
request.tProjectName(PROJECT_NAME);小月子餐10天食谱大全
request.tProperty("table_count");
request.tPropertyType("table");
request.tRuleName("createTemplateRuleSuccess");
request.tRuleType(0);
request.tTemplateId(7);
request.tWarningThreshold("10");
try {
CreateQualityRuleRespon respon = AcsRespon(request);
Object data = ReturnModelParr.par("createTemplateRuleSuccess", Json(respon));
Long templateRuleId = Long.String());
Asrt.asrtTrue(templateRuleId > 0);
归田园居
return templateRuleId;
} catch (Exception e) {
e.printStackTrace();
Asrt.asrtFal(true);
return null;
}
CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode等数据质量API集可管理数据质量规则,更多数据质量API可参考DataWorks OpenAPI⽂档。
步骤六:⽣成数据服务A PI
我们通过元数据API完成了表创建,通过数据开发API完成⽂件和周期任务创建,通过数据质量和运维中⼼API配置好了监控规
则,MaxCompute分区表数据亦可顺利产⽣,这时候我们还需要最后⼀个步骤把MaxCompute分区表的数据通过数据服务OpenAPI⽣成⼀个数据服务API向系统提供数据服务。