Hadoop & Примери за Mapreduce: Създайте първа програма в Java

Съдържание:

Anonim

В този урок ще се научите да използвате Hadoop с примери MapReduce. Използваните входни данни са SalesJan2009.csv. Той съдържа информация, свързана с продажбите, като наименование на продукта, цена, начин на плащане, град, държава на клиента и т.н. Целта е да разберете броя на продуктите, продадени във всяка държава.

В този урок ще научите -

  • Първа програма Hadoop MapReduce
  • Обяснение на клас SalesMapper
  • Обяснение на класа SalesCountryReducer
  • Обяснение на класа SalesCountryDriver

Първа програма Hadoop MapReduce

Сега в този урок MapReduce ще създадем първата си програма Java MapReduce:

Данни от SalesJan2009

Уверете се, че имате инсталиран Hadoop. Преди да започнете с действителния процес, сменете потребителя на „hduser“ (идентификатор, използван по време на конфигурирането на Hadoop, можете да превключите към идентификатора на потребителя, използван по време на вашата конфигурация на Hadoop за програмиране).

su - hduser_

Етап 1)

Създайте нова директория с име MapReduceTutorial като shwon в долния пример MapReduce

sudo mkdir MapReduceTutorial

Дайте разрешения

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Изтеглете файлове тук

Проверете разрешенията за файлове на всички тези файлове

и ако липсват разрешения за четене, дайте същото-

Стъпка 2)

Експортирайте пътя на класа, както е показано в примера по-долу Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Стъпка 3)

Компилирайте Java файлове (тези файлове присъстват в директорията Final-MapReduceHandsOn ). Неговите класови файлове ще бъдат поставени в директорията на пакета

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Това предупреждение може безопасно да бъде игнорирано.

Тази компилация ще създаде директория в текуща директория, наречена с име на пакет, посочено в изходния файл на java (т.е. SalesCountry в нашия случай) и ще постави всички компилирани файлове на класа в него.

Стъпка 4)

Създайте нов файл Manifest.txt

sudo gedit Manifest.txt

добавете следните редове към него,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver е името на основния клас. Моля, обърнете внимание, че трябва да натиснете клавиша Enter в края на този ред.

Стъпка 5)

Създайте Jar файл

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Проверете дали файлът на jar е създаден

Стъпка 6)

Стартирайте Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Стъпка 7)

Копирайте файла SalesJan2009.csv в ~ / inputMapReduce

Сега използвайте командата по-долу, за да копирате ~ / inputMapReduce в HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Можем спокойно да игнорираме това предупреждение.

Проверете дали файлът всъщност е копиран или не.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Стъпка 8)

Изпълнете заданието MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Това ще създаде изходна директория с име mapreduce_output_sales на HDFS. Съдържанието на тази директория ще бъде файл, съдържащ продажбите на продукти за всяка държава.

Стъпка 9)

Резултатът може да се види чрез командния интерфейс като,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Резултатите могат да се видят и чрез уеб интерфейс като-

Отворете r в уеб браузър.

Сега изберете „Преглед на файловата система“ и отидете до / mapreduce_output_sales

Отворете част-r-00000

Обяснение на клас SalesMapper

В този раздел ще разберем изпълнението на класа SalesMapper .

1. Започваме с посочване на име на пакет за нашия клас. SalesCountry е име на нашия пакет. Моля, обърнете внимание, че изходът от компилацията, SalesMapper.class ще влезе в директория, наречена от това име на пакета: SalesCountry .

Следвайки това, ние импортираме библиотечни пакети.

По-долу моментна снимка показва изпълнение на клас SalesMapper-

Обяснение на примерен код:

1. Определение на класа SalesMapper-

публичен клас SalesMapper разширява MapReduceBase реализира Mapper {

Всеки клас на mapper трябва да бъде разширен от клас MapReduceBase и той трябва да реализира интерфейс Mapper .

2. Дефиниране на функция „карта“ -

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

Основната част от класа на Mapper е метод 'map ()' , който приема четири аргумента.

При всяко извикване на метод 'map ()' се предава двойка ключ-стойност ( 'ключ' и 'стойност' в този код).

Методът 'map ()' започва с разделяне на входния текст, който се получава като аргумент. Той използва маркера, за да раздели тези редове на думи.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Тук „,“ се използва като разделител.

След това се формира двойка, като се използва запис при 7-ми индекс на масив „SingleCountryData“ и стойност „1“ .

output.collect (нов текст (SingleCountryData [7]), един);

Избираме запис на 7-ми индекс, защото се нуждаем от данни за страната и той се намира на 7-ми индекс в масива „SingleCountryData“ .

Моля, обърнете внимание, че нашите входни данни са във формата по-долу (където Държавата е на 7 -ми индекс, с 0 като начален индекс) -

Дата на транзакция, Продукт, Цена, Тип на плащане, Име, Град, Щат, Държава , Създаден акаунт, Последно_влизане, Географска ширина, Географска дължина

Изходът на mapper отново е двойка ключ-стойност, която се извежда с помощта на метода 'collect ()' на 'OutputCollector' .

Обяснение на класа SalesCountryReducer

В този раздел ще разберем изпълнението на класа SalesCountryReducer .

1. Започваме с посочване на име на пакета за нашия клас. SalesCountry е име на външен пакет. Моля, обърнете внимание, че изходът от компилацията, SalesCountryReducer.class ще влезе в директория, наречена от това име на пакета: SalesCountry .

Следвайки това, ние импортираме библиотечни пакети.

По-долу моментна снимка показва изпълнение на клас SalesCountryReducer-

Обяснение на кода:

1. Определение на класа SalesCountryReducer-

публичен клас SalesCountryReducer разширява MapReduceBase прилага Редуктор {

Тук първите два типа данни „Text“ и „IntWritable“ са тип данни за въвеждане на ключ-стойност към редуктора.

Изходът на mapper е под формата на , . Този изход на mapper става вход за редуктора. Така че, за да се приведе в съответствие с неговия тип данни, тук се използват Text и IntWritable като тип данни.

Последните два типа данни „Text“ и „IntWritable“ са тип данни на изхода, генерирани от редуктор под формата на двойка ключ-стойност.

Всеки клас на редуктор трябва да бъде разширен от клас MapReduceBase и той трябва да реализира интерфейс на редуктор .

2. Дефиниране на функция за намаляване

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Входът за метода намаляване () е ключ със списък от множество стойности.

Например в нашия случай ще бъде-

<Обединени арабски емирства, 1>, <Обединени арабски емирства, 1>, <Обединени арабски емирства, 1>, <обединени арабски емирства, 1>, <обединени арабски емирства, 1>, <обединени арабски емирства, 1>.

Това се дава на редуктора като <Обединени арабски емирства, {1,1,1,1,1,1}>

Така че, за да се приемат аргументи от тази форма, се използват първите два типа данни, а именно Text и Iterator . Текстът е тип данни за ключ, а Iterator е тип данни за списък със стойности за този ключ.

Следващият аргумент е от тип OutputCollector , който събира изхода на фазата на редуктора.

Методът намаление () започва с копиране на стойността на ключа и инициализиране на броя на честотите до 0.

Текстов ключ = t_key; int честотаForCountry = 0;

След това, използвайки цикъл ' while' , ние прелистваме списъка със стойности, свързани с ключа, и изчисляваме крайната честота, като сумираме всички стойности.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Сега ние изтласкваме резултата към изходния колектор под формата на ключ и получено отчитане на честотата .

По-долу кодът прави това-

output.collect(key, new IntWritable(frequencyForCountry));

Обяснение на класа SalesCountryDriver

В този раздел ще разберем изпълнението на класа SalesCountryDriver

1. Започваме с посочване на име на пакет за нашия клас. SalesCountry е име на външен пакет. Моля, обърнете внимание, че изходът от компилацията, SalesCountryDriver.class ще влезе в директория, наречена с това име на пакета: SalesCountry .

Ето ред, указващ името на пакета, последван от код за импортиране на библиотечни пакети.

2. Дефинирайте клас на драйвер, който ще създаде нова клиентска работа, обект на конфигурация и ще рекламира класове Mapper и Reducer.

Класът на водача е отговорен за настройването на нашата работа MapReduce да работи в Hadoop. В този клас ние посочваме име на заданието, тип данни за вход / изход и имена на класове за картографиране и редуктор .

3. В кодовия фрагмент по-долу задаваме входни и изходни директории, които се използват съответно за консумиране на набор от данни за въвеждане и извеждане на изходни данни.

arg [0] и arg [1] са аргументите от командния ред, предадени с команда, дадена в MapReduce практически, т.е.,

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Задействайте нашата работа

Под изпълнението на кода на MapReduce работа-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}