MaxCompute MapReduce

客户端下载地址:https://help.aliyun.com/document\_detail/27971.html

比如有一张很大的表。表里有个String字段记录的是用空格分割开单词。最后需要统计所有记录中,每个单词出现的次数是多少。那整体的计算流程是

解决方案:

生产及周期调度

第一步:大于10M的resources通过MaxCompute CLI客户端上传,

如果Reduce后面还需要做进一步的Reduce计算,可以用拓展MapReduce模型(简称MRR)。MRR其实就是Reduce阶段结束后,不直接输出结果,而是再次经过Shuffle后接另外一个Reduce。

第三步:瘦身Jar,因为Dataworks执行MR作业的时候,一定要本地执行,所以保留个main就可以;

jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out

​本文为云栖社区原创内容,未经允许不得转载。返回搜狐,查看更多

Shuffle-分配Reduce

图片 1

setPartitionColumns(String[]
cols)设置作业的分区列,定义了数据分配到Reducer的分配策略。

客户端配置AK、EndPoint:https://help.aliyun.com/document\_detail/27804.html

对比前面的快速开始,可以看到除去数据准备阶段,和MR相关的,有资源的上传(add
jar步骤)和jar命令启动MR作业两步。

作者:隐林

Reduce阶段

第二步:目前通过MaxCompute
CLI上传的资源,在Dataworks左侧资源列表是找不到的,只能通过list
resources查看确认资源;

客户端先解析-classpath参数,找到main方法相关的jar包的位置

责任编辑:

不支持反射/自定义类加载器(所以不支持一些第三方包)

原标题:通过简单瘦身,解决Dataworks 10M文件限制问题

在odpscmd里执行add jar命令:

add jar C:\test_mr\test_mr.jar -f;//添加资源

具体的插件的安装方法步骤可以参考文档,本文不在赘言。

摘要:
用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。
解决方案: jar -resources test_mr.

setNumReduceTasks(int n)设置 Reducer 任务数,默认为 Mapper 任务数的
1/4。如果是Map
only的任务,需要设置成0。可以参考这里

list resources;//查看资源

客户端发起add jar/add
file等资源操作,把在客户端的机器(比如我测试的时候是从我的笔记本)上,运行任务涉及的资源文件传到服务器上。这样后面运行任务的时候,服务器上才能有对应的代码和文件可以用。如果以前已经传过了,这一步可以省略。

用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。

-resources告诉服务器,在运行任务的时候,需要用到的资源有哪些。

通过上述方法,我们可以在Dataworks上跑大于10M的MR作业。

Map阶段:每个Mapper针对每条数据,解析里面的字符串,用空格切开字符串,得到一组单词。针对其中每个单词,写一条记录

setCombinerOptimizeEnable(boolean
isCombineOpt)设置是否对Combiner进行优化。

将代码拷贝到IDE里,编译打包成mapreduce-examples.jar

拓展MapReduce

wc_in wc_out是传给main方法的参数,通过解析main方法传入参数String[]
args获得这个参数

JobConf定义了这个任务的细节,还是这个图,解释一下JobConf的其他设置项的用法。

线上运行

图片 2

setMapOutputValueSchema(Column[] schema)设置 Mapper 输出到 Reducer 的
Value 行属性。和上个设置一起定义了Mapper到Reducer的数据格式。

数据输出

这个命令发起作业。MapReduce的任务是运行在MaxCompute集群上的,客户端需要通过这个命令把任务运行相关的信息告诉集群。

Reduce阶段:Reducer拿前面已经排序好的输入,相同的单词的所有输入进入同一个Redue循环,在循环里,做个数的累加。

OutputUtils.addTable(TableInfo table, JobConf
conf)设置了输出的表。多路输入输出可以参考这里

任务提交

Shuffle-合并排序

com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out`

odpscmd  -u accessId  -p  accessKey  –project=testproject
–endpoint=http://service.odps.aliyun.com/api  -e “jar -resources
aaa.jar -classpath ./aaa.jar com.XXX.A”

如果在odpscmd的配置文件里已经配置好了,那只需要写-e的部分。

资源表和文件可以让一些小表/小文件可以方便被读取。鉴于读取数据的限制需要小于64次,一般是在setup里读取后缓存起来,具体的例子可以参考这里

前言

安全沙箱

最后通过JobClient.runJob(job);客户端往服务器发起了这个MapReduce作业。

运行环境

Q:如何实现M->R->M->R这种逻辑呢

reduce(){

输出阶段:输出Reduce的计算结果,写入到表里或者返回给客户端。

大数据开发套件可以配置Shell作业。可以在Shell作业里参考上面的方法用odpscmd
-e/-f来调度MapReduce作业。

在odpscmd里执行

setOutputOverwrite(boolean
isOverwrite)设置对输出表是否进行覆盖。类似SQL里的Insert into/overwrite
Talbe的区别。

产品限制

MapReduce

A:在Reduce代码里直接嵌套上Map的逻辑就可以了,把第二个M的工作在前一个R里完成,而不是作为计算引擎调度层面上的一个单独步骤,比如

setOutputGroupingColumns(String[]
cols)数据在Reducer里排序好了后,是哪些数据进入到同一个reduce方法的,就是看这里的设置。一般来说,设置的和setPartitionColumns(String[]
cols)一样。可以看到二次排序的用法。

根据com.aliyun.odps.mapred.open.example.WordCount,找到main方法所在类的路径和名字

Shuffle阶段-分配Reducer:把Mapper输出的单词分发给Reducer。Reducer拿到数据后,再做一次排序。因为Reducer拿到的数据已经在Mapper里已经是排序过的了,所以这里的排序只是针对排序过的数据做合并排序。

setReducerClass(Class theClass)设置Reducer使用的Java类。

任务提交

-f和-e一样,只是把命令写到文件里,然后用odpscmd -f
xxx.sql引用这个文件,那这个文件里的多个指令都会被执行。

定时调度