当前位置: 首页 > news >正文

MaxCompute实践之路(三) -- Java对接MaxCompute

一. jdbc方式对接maxcompute

  1. 创建完成项目后,创建以下类,根据注释,调好对应参数即可
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;public class Main {private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";public static void main(String[] args) throws SQLException {try {Class.forName(DRIVER_NAME);} catch (ClassNotFoundException e) {e.printStackTrace();System.exit(1);}Connection conn = DriverManager.getConnection("jdbc:odps:<maxcompute endpoint>?project=<maxcompute project>","aliyun accessId", "aliyun accessKey");ResultSet rs;Statement stmt = conn.createStatement();String sql = "SELECT * FROM JDBC_TEST";stmt.executeQuery(sql);ResultSet rset = stmt.getResultSet();while (rset.next()) {System.out.println(String.valueOf(rset.getInt(1)) + "\t" + rset.getString(2));}}
}

项目结构如下
在这里插入图片描述

  1. 查看官方文档中的 生态对接-jdbc参考-使用说明。
    在上面创建的module项目的java包中创建测试类,参考官网例子即可。
    官网链接:https://help.aliyun.com/document_detail/177015.html
    在这里插入图片描述

二. java jdk中的SQLTask方式对接maxcompute

  1. 过程与jdbc的相同,只需重新创建以下实体类即可
import java.util.List;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;
public class testSql {private static final String accessId = "";private static final String accessKey = "";       private static final String endPoint = "http://service.odps.aliyun.com/api";private static final String project = "";private static final String sql = "select category from iris;";public static voidmain(String[] args) {Account account = new AliyunAccount(accessId, accessKey);Odps odps = new Odps(account);odps.setEndpoint(endPoint);odps.setDefaultProject(project);Instance i;try {i = SQLTask.run(odps, sql);i.waitForSuccess();List<Record> records = SQLTask.getResult(i);for(Record r:records){System.out.println(r.get(0).toString());}} catch (OdpsException e) {e.printStackTrace();}}
}
  1. 官网链接:
    https://help.aliyun.com/document_detail/34614.html?spm=a2c4g.11186623.6.670.3b0c4849C06sIW

三. 流式数据通道SDK(Streaming Tunnel)方式对接maxcompute

MaxCompute流式数据通道服务提供了以流式的方式把数据写入MaxCompute的能力,使用与原批量数据通道服务不同的一套全新的API及后端服务。流式服务在API上极大简化了分布式服务的开发成本,同时解决了批量数据通道在高并发、高QPS(Queries-per-second)场景下的性能瓶颈。

  1. 对应该方式实体类。
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
public class StreamUploadSample {// 阿里云账号AccessKey ID。private static String accessId = "<your_access_id>";// 阿里云账号AccessKey Secret。private static String accessKey = "<your_access_key>";// MaxCompute项目的Endpoint信息,详情请参见Endpoint。private static String odpsEndpoint = "<endpoint>";// MaxCompute项目的Tunnel Endpoint信息,详情请参见Endpoint。private static String tunnelEndpoint = "<tunnel_endpoint>";// MaxCompute项目的名称。private static String project = "<your_project>";// MaxCompute项目中的表名称。private static String table = "<your_table_name>";// MaxCompute项目中的表的分区信息。private static String partition = "<your_partition_spec>";public static void main(String args[]) {Account account = new AliyunAccount(accessId, accessKey);Odps odps = new Odps(account);odps.setEndpoint(odpsEndpoint);odps.setDefaultProject(project);try {TableTunnel tunnel = new TableTunnel(odps);// 默认情况下,不需要设置Tunnel Endpoint,可以通过Endpoint自动路由。// 只有少数场景(比如路由的Endpoint网络不通)需要设置Tunnel Endpoint。// 可以通过以下接口设置Tunnel Endpoint。// tunnel.setEndpoint(tunnelEndpoint);PartitionSpec partitionSpec = new PartitionSpec(partition);TableTunnel.StreamUploadSession uploadSession = tunnel.createStreamUploadSession(project,table, partitionSpec);TableSchema schema = uploadSession.getSchema();TableTunnel.StreamRecordPack pack = uploadSession.newRecordPack();Record record = uploadSession.newRecord();for (int i = 0; i < schema.getColumns().size(); i++) {Column column = schema.getColumn(i);switch (column.getType()) {case BIGINT:record.setBigint(i, 1L);break;case BOOLEAN:record.setBoolean(i, true);break;case DATETIME:record.setDatetime(i, new Date());break;case DOUBLE:record.setDouble(i, 0.0);break;case STRING:record.setString(i, "sample");break;default:throw new RuntimeException("Unknown column type: "+ column.getType());}}for (int i = 0; i < 10; i++) {pack.append(record);}int retry = 0;while (retry < 3) {try {// flush成功表示数据写入成功,写入成功后数据立即可见。// flush成功后pack对象可以复用,避免频繁申请内存导致内存回收。// flush失败可以直接重试。// flush失败后pack对象不可重用,需要重新创建新的StreamRecordPack对象。String traceId = pack.flush();System.out.println("flush success:" + traceId);break;} catch (IOException e) {retry++;e.printStackTrace();Thread.sleep(500);}}System.out.println("upload success!");} catch (TunnelException e) {e.printStackTrace();} catch (IOException | InterruptedException e) {e.printStackTrace();}}
}
  1. 使用经验:
    该方式可以在创建的pick中插入多个Record进行提交,例如插入10000个。经过测试pick中1条数据与10000条数据上传到库里的时间基本没差距。
    应对高并发情况,可将创建链接的代码抽离成工具类,使其不用每次提交都创建链接,可提高上传速度。
    创建表时,加入分区字段,可提高查询速度。
  2. 官网链接:
    https://help.aliyun.com/document_detail/198167.html

http://www.taodudu.cc/news/show-6310052.html

相关文章:

  • 【大数据】MaxCompute概述
  • maxcompute-入门-环境安装
  • 阿里云大数据之MaxComputer简介
  • Math.max
  • MaxCompute SQL示例解析
  • 什么是 MaxCompute
  • MaxCompute快速入门
  • MaxCompute_概念/使用
  • 阿里云 - MaxCompute研究
  • Maxcompute 小记1
  • maxcompute-入门-数据下载
  • 关于MaxCompute的基本了解
  • 大数据-玩转数据-MaxCompute窗口函数
  • 阿里云产品 系列(一)MaxCompute简介与使用--上
  • 阿里云短信api发送异常
  • 对接阿里云短信平台报错
  • 发送短信验证码到手机(阿里大于平台) java
  • 阿里大鱼短信发送接口开发
  • 关于调用阿里大鱼发送手机验证码短信同一账号发送多次后失败
  • php 阿里云短信服务及阿里大鱼实现短信验证码的发送
  • linux内存与扇区,磁盘的基础知识——扇区、柱面、磁道、族
  • C++ 文件描述符
  • 应用进程和内核的关系
  • 离散数学__第2章命题逻辑的推理理论__析取范式和合取范式
  • 离散数学 求命题公式的主析取范式和主合取范式
  • [离散数学]命题逻辑P_5:命题公式分类和等价
  • 【二】头歌平台实验-离散数学逻辑与推理
  • 离散数学笔记_第一章:逻辑和证明(1)
  • 离散数学笔记_第一章:逻辑和证明(2 )
  • [离散数学]命题逻辑P_7:范式