1)分布式的运算程序往往需要分成至少2个阶段
2)第一个阶段的maptask并发实例,完全并行运行,互不相干
3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出
4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行
MR程序具体运行步骤如下:
1)在MapReduce程序读取文件的输入目录上存放相应的文件。
2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。
6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算。
7)map()运算完毕后将KV对收集到maptask缓存。
8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件。
9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。
10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。
新建Maven工程,在pom.xml文件中添加依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
</dependencies>
在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
需求:在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:
hello world
hadoop
spark
hello world
hadoop
spark
hello world
hadoop
spark
代码:
1)定义一个mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* (1)用户自定义的Mapper要继承自己的父类
* (2)Mapper的输入数据是K-V对的形式(K-V的类型可自定义)
* (3)Mapper中的业务逻辑写在map()方法中
* (4)Mapper的输出数据是K-V对的形式(K-V的类型可自定义)
* (5)map()方法(maptask进程)对每一个<K,V>调用一次
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map()方法(maptask进程)对每一个<K,V>调用一次
*
* @param key : 数据的offset
* @param value : 要处理的一行数据
* @param context : 上下文
* @throws IOException
* @throws InterruptedException
*/
IntWritable v = new IntWritable(1);
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、获取一行数据,将一行数据转化为String类型
String line = value.toString();
//2、切割
String[] words = line.split(" ");
//3、循环写出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
2)定义一个reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* (1)用户自定义的Reducer要继承自己的父类
* (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
* (3)Reducer的业务逻辑写在reduce()方法中
* (4)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
*/
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
*
* @param key : 单词
* @param values : 单词个数(1)的集合
* @param context : 上下文
* @throws IOException
* @throws InterruptedException
*/
IntWritable v = new IntWritable();
int sum;
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1、累加
sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
//2、写出
v.set(sum);
context.write(key, v);
}
}
3)定义一个主类,用来描述job并提交job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 相当于yarn客户端,负责提交MapReduce程序
*/
public class WordcountDriver {
public static void main(String[] args) throws Exception {
// 1 获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置jar加载路径
job.setJarByClass(WordcountDriver.class);
// 3 设置map和reduce类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReduce.class);
// 4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置Reduce输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置job数据输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
第一种情况:
设置好job数据输入和输出路径,直接在IDEA上运行代码,得到输出结果即可。
第二种情况:
在IDEA上打包成jar,上传到集群上运行
运行命令:
hadoop jar mapreduce-1.0-SNAPSHOT.jar com.atguigu.wordcount.WordcountDriver /user/atguigu/wc/input/hello.txt /user/atguigu/wc/output
hadoop jar jar包名称 Driver类 数据输入路径 数据输出路径
得到输出结果即可。
需求:去除日志中字段长度小于等于11的日志。
输入数据:
数据源
期望输出数据:
每行字段长度都大于11。(实际通过计数器计数:符合要求的为true,不符合的为false)
需求分析:
需要在Map阶段对输入的数据根据规则进行过滤清洗。
代码:
1)编写LogParseMapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogParseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
boolean result = LogParse(line, context);
if (!result) {
return;
}
k.set(line);
context.write(k, NullWritable.get());
}
private boolean LogParse(String line, Context context) {
String[] fields = line.split(" ");
if (fields.length > 11) {
context.getCounter("map", "true").increment(1);
return true;
}
context.getCounter("map", "false").increment(1);
return false;
}
}
2)编写LogParseDriver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogParseDriver {
public static void main(String[] args) throws Exception {
//输入输出路径需要根据自己