Let’s get something out of the way quickly: Hadoop is NOT a database. It is NOT a library. In reality, there is NO single product called Hadoop. Hadoop is made up of stand-alone modules such as a distributed file system called HDFS, a distributed database named HBase, a library for parallel processing of large distributed datasets called MapReduce, and the list goes on. An analogy would be Microsoft Office: there is no application called Microsoft Office. It’s the name given to a suite of desktop applications like Word, Excel, etc.
In this post we will focus on the Hadoop Distributed File System (HDFS) and MapReduce. These two are core Hadoop modules and are widely used. Together, HDFS and MapReduce form a very powerful framework for distributed batch processing – I want you to commit this statement to memory.
HDFS and MapReduce form a framework for distributed batch processing.
Hadoop’s true power lies in its ability to scale to hundreds or thousands of nodes, distributing large amounts of work across a set of machines so it can be performed in parallel.
Hadoop modules are built upon a distributed file system appropriately named the Hadoop Distributed File System (HDFS). The most famous distributed file system in existence today is the Network File System, or NFS. HDFS differs from NFS on many levels, especially with regard to scalability.
Note: The design of HDFS is based on the Google File System (GFS), described in this paper.
HDFS is based on a master/slave architecture, and the design requires that a single master node keep track of all the files in the file system. This node is called the NameNode. A NameNode stores only the metadata about the files present on the file system; it doesn’t store the actual data. The data is stored on DataNodes. Files are stored on HDFS in blocks, which are typically 64 MB in size.
NameNode versus DataNode: A NameNode manages the HDFS namespace and regulates access to files and directories. It decides which DataNodes each data block is placed on and stores this block-to-DataNode mapping. DataNodes are responsible for storing the data and serving read and write requests from clients.
Let us consider an example. Suppose we are storing a 131 MB file on HDFS. The file will be stored in three blocks (64 MB + 64 MB + 3 MB). The NameNode will distribute the three blocks to DataNodes and keep track of the mapping. To read a file stored on HDFS, the client must have the HDFS client software installed. The HDFS client obtains the file’s metadata from the NameNode, such as the number of blocks and their locations, and then downloads those blocks directly from the DataNodes.
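To make this concrete, here is a minimal sketch of reading a file through the HDFS Java client API (`org.apache.hadoop.fs.FileSystem`). The file path below is made up; the point is that `open()` asks the NameNode for the block locations, and the returned stream then reads the blocks directly from the DataNodes:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        // Picks up the NameNode address from the cluster configuration (core-site.xml).
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical path; any file on the cluster works.
        Path file = new Path("/data/seismic/sensor-001.log");

        // open() contacts the NameNode for the block locations; the stream then
        // reads each block directly from the DataNodes that hold it.
        try (FSDataInputStream in = fs.open(file);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```

Writing works the same way in reverse: the client asks the NameNode where each new block should go and then streams the data to the DataNodes.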
For more information on HDFS, I recommend the following link: http://hadoop.apache.org/docs/hdfs/current/hdfs_design.html
MapReduce (MR) is a framework, or a library, for writing applications that process large amounts of distributed data in parallel. Like HDFS, its architecture is also based on the master/slave model. The master is a special node which coordinates activity among several worker nodes.
Here’s how it works: the master receives the input data which is to be processed. The input data is split into smaller chunks, and all these chunks are distributed to and processed in parallel on multiple worker nodes. This is called the Map phase. The workers send their results back to the master node, which aggregates them to produce the final result. This is called the Reduce phase.
Note: I’ve oversimplified the inner workings of MapReduce. The Map phase output is written to the local disk of the workers, partitioned into as many regions as there are Reduce workers available. These locations are then passed to the master, which passes them on to the Reduce workers. I recommend this paper on MapReduce by Google, which actually introduced it.
MR applications must provide at least the following three inputs: the location of the input data, the Map and Reduce functions to apply to it, and the location where the output should be written.
You must remember this always: all input and output in MR is based on <key, value> pairs. They are everywhere: the input to the Map function, its output, the input to the Reduce function, and its output are all <key, value> pairs.
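Before we get to actual Hadoop code, here is a toy, single-process sketch in plain Java of the Map → group → Reduce flow described above, using word counting as the example. There is no Hadoop involved; it is purely an illustration of how everything moves around as <key, value> pairs:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyMapReduce {
    public static void main(String[] args) {
        // Pretend each string is a chunk of input handed to a separate Map worker.
        List<String> inputChunks = Arrays.asList("the quick brown fox", "the lazy dog");

        // Map phase: each chunk is turned into <word, 1> pairs.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String chunk : inputChunks) {
            for (String word : chunk.split(" ")) {
                mapped.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // Shuffle: group all values that share the same key.
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (Map.Entry<String, Integer> pair : mapped) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce phase: aggregate the list of values for each key.
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int count : entry.getValue()) {
                sum += count;
            }
            System.out.println(entry.getKey() + "\t" + sum);
        }
    }
}
```

In real Hadoop, the grouping step (the shuffle) and the distribution of chunks across machines are handled by the framework; you only write the Map and Reduce parts.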
To wrap our minds around MapReduce, let us consider an example. Suppose you have just become the Development Lead for a company which specializes in reading seismic data that measures earthquake magnitudes around the world. There are thousands of such sensors deployed around the world, recording earthquake data in log files. Each log entry is a comma-separated line with a dozen fields, among them the magnitude of the earthquake and the name of the region where the reading was taken.
There are millions of such log files available. In addition, the logs also contain erroneous entries, such as when a sensor became faulty and went into an infinite loop, dumping thousands of lines a second. The input data is stored on 50 machines, and all the log files combined are about 10 terabytes in size. Your Director of Software asks you to perform a simple task: for every region where sensors were deployed, find the highest earthquake magnitude recorded.
Now, let’s think about this for a moment. This task sounds rather simple. You could use your trusty Linux tools like `grep`, `sort`, or even `awk` to accomplish this if the log files were available on a single computer. But they are not – they are scattered across 50 computers. Processing the data on each computer manually and combining the results would be far too inefficient (for a Lead Developer, that is).
This is the kind of problem where you can use Hadoop. Let us see how you would do it:
```java
/**
 * The `Mapper` function. It receives a line of input from the file,
 * extracts `region name` and `earthquake magnitude` from it, and outputs
 * the `region name` and `magnitude` in <key, value> manner.
 *
 * @param key     - The line offset in the file - ignored.
 * @param value   - This is the line itself.
 * @param context - Provides access to the OutputCollector and Reporter.
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

    String[] line = value.toString().split(",", 12);

    // Ignore invalid lines
    if (line.length != 12) {
        System.out.println("- " + line.length);
        return;
    }

    // The output `key` is the name of the region
    String outputKey = line[11];

    // The output `value` is the magnitude of the earthquake
    double outputValue = Double.parseDouble(line[8]);

    // Record the output in the Context object
    context.write(new Text(outputKey), new DoubleWritable(outputValue));
}
```
```java
/**
 * The `Reducer` function. Iterates through all earthquake magnitudes for a
 * region to find the maximum value. The output is the `region name` and the
 * `maximum value of the magnitude`.
 *
 * @param key     - The name of the region
 * @param values  - Iterator over earthquake magnitudes in the region
 * @param context - Used for collecting output
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException {

    // Standard algorithm for finding the max value
    double maxMagnitude = Double.MIN_VALUE;
    for (DoubleWritable value : values) {
        maxMagnitude = Math.max(maxMagnitude, value.get());
    }

    context.write(key, new DoubleWritable(maxMagnitude));
}
```
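For completeness, an MR job also needs a small driver that wires the Mapper and Reducer together and tells Hadoop where the input and output live. The sketch below assumes the Hadoop 2+ `org.apache.hadoop.mapreduce` API, and the class names (`MaxMagnitudeJob`, `MaxMagnitudeMapper`, `MaxMagnitudeReducer`) are placeholders rather than the actual names from the repository:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxMagnitudeJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max earthquake magnitude per region");

        job.setJarByClass(MaxMagnitudeJob.class);
        job.setMapperClass(MaxMagnitudeMapper.class);     // placeholder class name
        job.setReducerClass(MaxMagnitudeReducer.class);   // placeholder class name

        // Both the Mapper and the Reducer emit <Text, DoubleWritable> pairs.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Input and output locations on HDFS, passed on the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

You would package all three classes into a jar and launch it with the `hadoop jar` command, passing the input and output HDFS paths as arguments.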
You can find the complete source code for this application in the GitHub repository I have created.