Data_Skew

### Table of Contents ###

* 1 Principle
* 2 Example
    * MockData
    * 2.1 Redesigning the key
        * 2.1.1 Mapper
        * 2.1.2 Mapper2
        * 2.1.3 Reducer
    * 2.2 Random partitioning
* 3 Joining large and small tables
    * 3.1 How a join is implemented
    * 3.2 Joining a large table with a small table
        * Implementing the join in MR

# 1 Principle #

![image][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center]

During the shuffle, records are distributed by key, so every record with a given key is sent to the same reduce task. When one key carries far more records than the others, two symptoms appear:

* a few tasks run much more slowly than the rest
* OOM (JVM Out Of Memory): a single JVM is put under too much pressure and the task fails

# 2 Example #

### MockData ###

```java
package DataSkew;

import java.io.*;
import java.util.Random;

public class MockData {
    public static void main(String[] args) throws IOException {
        String[] words = new String[]{"ifeng", "aaa", "bbb", "ccc", "ddd"};
        Random random = new Random();
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(
                        new FileOutputStream(new File("data/Skew.txt"))));
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j <= 10; j++) {
                // "ifeng" is written twice for every random word, so it dominates the data
                writer.write(words[0]);
                writer.write(",");
                writer.write(words[0]);
                writer.write(",");
                writer.write(words[random.nextInt(words.length)]);
                writer.write(",");
            }
            writer.newLine();
        }
        writer.flush();
        writer.close();
    }
}
```

![image][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 1]

The generated data looks roughly like this; "ifeng" appears far more often than the other words, so it is bound to cause data skew.

## 2.1 Redesigning the key ##

The idea is to "salt" each key with a random suffix so that the hot key is spread across all reducers, count per salted key in a first job, then strip the suffix and merge the partial counts in a second job. The first job uses WCMapper + WCReducer, the second uses WCMapper2 + WCReducer (a driver sketch that chains the two jobs follows the reducer below).

### 2.1.1 Mapper ###

```java
package DataSkew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Random;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable ONE = new IntWritable(1);
    // number of reduce tasks (this is also the salt range)
    int reduces;
    // random number generator for the salt
    Random random;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // context.getNumReduceTasks() returns the number of reduce tasks configured for the job
        reduces = context.getNumReduceTasks();
        random = new Random();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // take one line and split it on the delimiter
        String[] splits = value.toString().split(",");
        for (String word : splits) {
            // pick a random salt in [0, reduces)
            int randVal = random.nextInt(reduces);
            // build the new, salted key: word_salt
            String newWord = word + "_" + randVal;
            context.write(new Text(newWord), ONE);
        }
    }
}
```

### 2.1.2 Mapper2 ###

```java
package DataSkew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WCMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {

    // input lines are the first job's output, e.g. "ifeng_1<TAB>677"
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        // new key: strip the random salt
        String newKey = arr[0].split("_")[0];
        // new value: the partial count produced by the first job
        int newVal = Integer.parseInt(arr[1]);
        context.write(new Text(newKey), new IntWritable(newVal));
    }
}
```

### 2.1.3 Reducer ###

```java
package DataSkew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
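The post shows the two mappers and the shared reducer but not the driver that chains the two jobs. Below is a minimal sketch of such a driver, running locally in the same style as the Dirver2 example later in the post; the class name SkewDriver, the output paths out_stage1/out_stage2, and the choice of 3 reduce tasks for stage 1 are assumptions, not part of the original post.

```java
package DataSkew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SkewDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        Path input = new Path("data/Skew.txt");
        Path stage1Out = new Path("out_stage1");
        Path stage2Out = new Path("out_stage2");
        for (Path p : new Path[]{stage1Out, stage2Out}) {
            if (fs.exists(p)) {
                fs.delete(p, true);
            }
        }

        // Stage 1: salt every key with a random suffix so the hot key ("ifeng")
        // is spread over several reducers, then count per salted key.
        Job stage1 = Job.getInstance(conf, "wordcount-salted");
        stage1.setJarByClass(SkewDriver.class);
        stage1.setMapperClass(WCMapper.class);
        stage1.setReducerClass(WCReducer.class);
        stage1.setOutputKeyClass(Text.class);
        stage1.setOutputValueClass(IntWritable.class);
        stage1.setNumReduceTasks(3);                  // this is the salt range seen by WCMapper
        FileInputFormat.addInputPath(stage1, input);
        FileOutputFormat.setOutputPath(stage1, stage1Out);
        if (!stage1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Stage 2: strip the salt ("ifeng_2" -> "ifeng") and merge the partial counts.
        Job stage2 = Job.getInstance(conf, "wordcount-merge");
        stage2.setJarByClass(SkewDriver.class);
        stage2.setMapperClass(WCMapper2.class);
        stage2.setReducerClass(WCReducer.class);
        stage2.setOutputKeyClass(Text.class);
        stage2.setOutputValueClass(IntWritable.class);
        stage2.setNumReduceTasks(1);                  // the skew is gone, one reducer is enough
        FileInputFormat.addInputPath(stage2, stage1Out);
        FileOutputFormat.setOutputPath(stage2, stage2Out);
        System.exit(stage2.waitForCompletion(true) ? 0 : 1);
    }
}
```

Stage 2 reads the plain-text output of stage 1 (key, tab, count), which is exactly the format WCMapper2 expects.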
## 2.2 Random partitioning ##

Another option is to override the Partitioner and assign each record to a random partition.

![image][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 2]

```java
package DataSkew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

public class Partition extends Partitioner<Text, IntWritable> {

    private final Random random = new Random();

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // ignore the key and pick a reducer at random
        return random.nextInt(numPartitions);
    }
}
```

```java
package DataSkew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Dirver2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJobName("Wordcount");
        job.setJarByClass(Dirver2.class);
        // WCMapper3 (not shown in this post) is assumed to be a plain word-count
        // mapper without salting; the random Partitioner does the load spreading
        job.setMapperClass(WCMapper3.class);
        job.setReducerClass(WCReducer.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path p = new Path("out3");
        if (fs.exists(p)) {
            fs.delete(p, true);
        }
        FileInputFormat.addInputPath(job, new Path("data/Skew.txt"));
        FileOutputFormat.setOutputPath(job, p);
        job.setNumReduceTasks(2);
        job.waitForCompletion(true);
    }
}
```

Note that with a purely random partitioner the same word can land in several partitions, so the per-partition counts still have to be merged in a follow-up step.

# 3 Joining large and small tables #

## 3.1 How a join is implemented ##

```sql
select u.name, o.orderid from order o join user u on o.uid = u.uid;
```

In the map output, the value of each row is tagged with the table it came from; in the reduce stage the tag tells the join which side each row belongs to. The MapReduce flow is shown below (this only illustrates the most basic join implementation; there are other ways to do it).

![image][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 3]

## 3.2 Joining a large table with a small table ##

At join time rows are distributed by the join key, and the data on the left side of the join is read into memory first:

1) If the keys of the left table are well spread out, little data is held in memory and the join runs quickly.
2) If the keys of the left table are concentrated, the join skews.

So the small table should be placed on the left of the join, so that it is the one loaded into memory.

**Solution**: put the small table on the left of the join so it is loaded into memory first, and finish the work on the map side (a map join).

Before Hive 0.7.0 this needs an explicit hint:

```sql
select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key
```

From 0.7.0 on it can be done with configuration:

* `hive.auto.convert.join` — default: false in 0.7.0-0.10.0, true from 0.11.0 on (the hive-default.xml template incorrectly lists the default as false in Hive 0.11.0 through 0.13.1)
* `hive.smalltable.filesize` (0.7.0) or `hive.mapjoin.smalltable.filesize` (0.8.1) — default: 25000000 (25 MB). This property sets the table-size threshold for the optimization; a table smaller than this value is loaded into memory.

---

```sql
create table Users(cid int, city String, name String)
row format delimited fields terminated by ',';
load data local inpath '/data/Data_Skew/s1.txt' into table Users;

create table city(cid int, Geo String)
row format delimited fields terminated by ',';
load data local inpath '/data/Data_Skew/c1.txt' into table city;
```

![image][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 4]

```sql
select /*+ MAPJOIN(a) */ a.cid, b.city, b.name
from `default`.city a
left join `default`.users b on a.cid = b.cid;
```

**There are two kinds of join**

1. Map-side join: efficient, highly parallel, not prone to data skew, and well suited to joining a big table with a small one (a minimal MR sketch of this pattern follows right after this list).
   * enable or disable the automatic map join: `set hive.auto.convert.join = true`
   * small-table size threshold for the map join: `set hive.mapjoin.smalltable.filesize = 25000000`

   When the join runs, Hive inspects the input sizes of the two tables (much as `FileInputFormat.addInputPath()` does in an MR job) to decide which one is the big table and which one is the small table. If even the smallest table is larger than 25000000 bytes, the join falls back to a reduce-side join.
2. Reduce-side join: used for joining two large tables. To handle data skew, split one of the large tables; in a one-to-many relationship, split the "many" side (for example, a 26 MB table joined against a 1 GB table).
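For reference, here is a minimal sketch of what a map-side join could look like in plain MapReduce, under the same assumptions as the Hive example above: the small city table c1.txt (lines of the form `cid,Geo`) is shipped with `job.addCacheFile(...)` and loaded into a HashMap in `setup()`, while the big users table s1.txt (lines `cid,city,name`) streams through `map()`. The class name `MapJoinMapper` and the comma-separated output format are illustrative assumptions, not part of the original post.

```java
package Join;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // cid -> Geo, loaded from the cached small table
    private final Map<String, String> geoByCid = new HashMap<>();
    private final Text out = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // read the small table (c1.txt) that the driver registered with job.addCacheFile()
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFiles[0]))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");   // cid,Geo
                geoByCid.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // the main input is the big users table: cid,city,name
        String[] fields = value.toString().split(",");
        String geo = geoByCid.get(fields[0]);
        if (geo != null) {                            // inner-join semantics
            out.set(fields[0] + "," + fields[2] + "," + geo);
            context.write(out, NullWritable.get());
        }
    }
}
```

A job using this mapper would call `job.setNumReduceTasks(0)`, so the join finishes entirely on the map side.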
##### Implementing the join in MR #####

The code below is a reduce-side join of the users table (s1.txt) with the city table (c1.txt) on cid; a sketch of the TableBean class it relies on follows the reducer.

```java
package Join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // String inputDir = args[0];
        // String outputDir = args[1];
        String inputDir = "D:\\workplace\\com.MR\\data\\s1.txt";   // users table
        String cacheDir = "D:\\workplace\\com.MR\\data\\c1.txt";   // city table
        String outputDir = "out";

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // ship the small city table to every node as a cache file
        // (only needed for a map-side join; this reduce-side join reads it as a normal input)
        job.addCacheFile(new Path(cacheDir).toUri());

        // both tables are regular inputs; the mapper tells them apart by file name
        FileInputFormat.setInputPaths(job, new Path(inputDir), new Path(cacheDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

```java
package Join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    // name of the file the current split comes from
    private String fName;
    // TableBean is not shown in the original post; it is assumed to carry
    // cid/city/name/geo plus a flag marking the source table (see the sketch below)
    private TableBean tableBean = new TableBean();
    private Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // work out which table this split belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        fName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] args = value.toString().split(",");

        if (fName.startsWith("s1")) {
            // users table: cid,city,name
            tableBean.setCid(args[0]);
            tableBean.setCity(args[1]);
            tableBean.setName(args[2]);
            tableBean.setGeo("");
            tableBean.setFlag("user");
        } else {
            // city table: cid,Geo
            tableBean.setCid(args[0]);
            tableBean.setCity("");
            tableBean.setName("");
            tableBean.setGeo(args[1]);
            tableBean.setFlag("city");
        }

        // group by the join key (cid)
        k.set(args[0]);
        context.write(k, tableBean);
    }
}
```

```java
package Join;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {

        // all rows for one cid: many user rows, (at most) one city row
        List<TableBean> userBeans = new ArrayList<>();
        TableBean cityBean = new TableBean();

        for (TableBean value : values) {
            if ("user".equals(value.getFlag())) {
                // Hadoop reuses the value object, so copy its properties before keeping it
                TableBean tmp = new TableBean();
                try {
                    BeanUtils.copyProperties(tmp, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                userBeans.add(tmp);
            } else {
                // city row: keep its Geo value
                try {
                    BeanUtils.copyProperties(cityBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // enrich every user row with the Geo of its city and emit it
        for (TableBean bean : userBeans) {
            bean.setGeo(cityBean.getGeo());
            context.write(bean, NullWritable.get());
        }
    }
}
```
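The post never includes the TableBean class that the driver, mapper, and reducer above depend on. Below is a minimal sketch of what it could look like: the cid/city/name fields follow the setters used in the mapper, while the geo and flag fields and the comma-separated `toString()` output are assumptions, not the author's original class.

```java
package Join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Minimal Writable carrying one row of either table plus a source flag.
public class TableBean implements Writable {

    private String cid = "";
    private String city = "";
    private String name = "";
    private String geo = "";
    private String flag = "";   // "user" for s1.txt rows, "city" for c1.txt rows

    public TableBean() { }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(cid);
        out.writeUTF(city);
        out.writeUTF(name);
        out.writeUTF(geo);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        cid = in.readUTF();
        city = in.readUTF();
        name = in.readUTF();
        geo = in.readUTF();
        flag = in.readUTF();
    }

    // getters/setters (BeanUtils.copyProperties relies on these)
    public String getCid() { return cid; }
    public void setCid(String cid) { this.cid = cid; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGeo() { return geo; }
    public void setGeo(String geo) { this.geo = geo; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    // what TextOutputFormat writes for each joined row
    @Override
    public String toString() {
        return cid + "," + city + "," + name + "," + geo;
    }
}
```

With this bean, each joined output line has the form `cid,city,name,geo`.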
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center]: /images/20221123/1bf84d1d0f3848c5bafa925851f64f3e.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 1]: /images/20221123/ff4dc2a1a8d7418bbce992a02531a05b.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 2]: /images/20221123/1af732f2ca55484c94270c3dfbd459ba.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 3]: /images/20221123/3c3d55efb8024ac5bcb09dffdd289275.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTM4MTgzMw_size_16_color_FFFFFF_t_70_pic_center 4]: /images/20221123/e1197412ea824a188144e0868a7f7c15.png