鼎鼎大名的 Hadoop 和 MapReduce,看完 MapReduce 讓我更想研讀函數式語言啊!(重點錯XD
大數據走向可以從3個Vs開始談起:
- Velocity(速度) : Batch to Read-time
- Volume(資料大小) : GB to TB to PB
- Variety(資料種類) : structure to unstructure
而現在Hadoop系列的成員大底為MapReduce(Hadoop MapReduce), Storage(HDFS), Query(HBase),下面就從最核心的MapReduce看起吧^^
一、MapReduce 簡介
MapReduce 是由 Google 所引進的軟體框架,目的是對電腦叢集上的大型資料集執行分散式運算。
- MapReduce 採 Divide and Conquer 典範
- MapReduce 靈感來自 lisp 和其他函數式程式語言中的古老的 map 和 reduce 操作
- 使用者只要指定 map 與 reduce,其他交由框架處理
MapReduce 優點
- 可以提供高度的可靠性運算
- 可以提供容錯機制
- 可以降低網路傳輸的頻寬需求
- 可以提供負載平衡
1. Map 函數
Map 函數的輸入是「一個」key/value 序對 ,輸出則為另「一組」intermediate key/value 序對組。例如:
[1,2,3,4] – (*2) -> [2,3,6,8]
map 操作是高度並行的,每個元素都是獨立的,創建一個新的列表來保存新的答案,而原始列表沒有被更改。
2. Reduce 函數
此函數負責針對相同的 intermediate key 合併其所有相關聯的 intermediate values。並產生輸出結果的key/value序對。例如:
Reduce – [1,2,3,4] – (sum) -> 10
Reduce 不如 Map 函數那麼並行,但是因為歸納總是有一個簡單的答案,大規模的運算相對獨立。
二、Apache Hadoop
Apache Hadoop 實作 Google 的 MapReduce,提供開放原始碼 MapReduce 架構
- 以 Java 做為原生語言
- 以 Hadoop 分散式檔案系統 (HDFS) 做為資料儲存系統
[備忘] HDFS Shell
# Browsing Your HDFS Folder hadoop fs -ls # Upload Files or Folder to HDFS hadoop fs -put [${HOME}/hadoop/conf] [toHDFS] # Download HDFS Files or Folder to Local hadoop fs -get [toHDFS] [fromHDFS] # Check diff [${HOME}/hadoop/conf] [fromHDFS/] # Remove Files or Folder hadoop fs -rm toHDFS/masters # Browse Files Directly hadoop fs -cat toHDFS/slaves # More Commands hadoop fs
[例題] 在大量的文件集合中,計算每個word的出現次數
1. Map, Reduce 函式
- Map: (offset, line) ➝ [(word1, 1), (word2, 1), ... ]
- Reduce: (word, [1, 1, ...]) ➝ [(word, n)]
WordCount.java (hadoop範例程式)
import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
2. 執行
hadoop fs -put ${HOME}/hadoop/conf lab11_input hadoop fs -rmr lab11_out2 hadoop jar ${HOME}/hadoop/hadoop-examples-*.jar wordcount lab11_input lab11_out2
hadoop fs -ls lab11_out2 hadoop fs -cat lab11_out2/part-r-00000
References
MapReduce 基本指令操作
http://trac.3du.me/cloud/wiki/NTUOSS160412/Lab5
[ 深入雲計算 ] 初識 MapReduce : 實例之 Inverted Index
http://puremonkey2010.blogspot.tw/2013/11/mapreduce-inverted-index.html