The Map-Reduce model has become a popular way for programmers to describe and implement parallel programs. These custom map-reduce programs are often used to process a large data set in parallel. This post shows how to implement Map-Reduce Programs within the Oracle database using Parallel Pipelined Table Functions and parallel operations.
Pipelined Table Functions were introduced in Oracle 9i as a way of embedding procedural logic within a data flow. At a logical level, a Table Function is a function that can appear in the FROM clause and thus functions as a table returning a stream of rows. Table Functions can also take a stream of rows as an input. Since Pipelined Table Functions are embedded in the data flow they allow data to be 'streamed' to a SQL statement avoiding intermediate materialization in most cases. Additionally, Pipelined Table Functions can be parallelized.
(Translator's note: TABLE() is an Oracle operator that lets you query, directly in SQL, the rows returned by a function declared as PIPELINED.)
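As a minimal sketch of the mechanics (separate from the word-count example that follows; the names gen_numbers and num_tab are made up purely for illustration), a pipelined table function pipes rows back one at a time and is queried through the TABLE() operator:

create or replace type num_tab as table of number;
/
create or replace function gen_numbers(n in number) return num_tab
  pipelined
is
begin
  for i in 1 .. n loop
    pipe row (i);  -- each row is streamed back to the consumer as it is produced
  end loop;
  return;
end;
/
-- the function appears in the FROM clause like a table
select * from table(gen_numbers(3));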
To parallelize a Table Function the programmer specifies a key to repartition the input data. Table Functions can be implemented natively in PL/SQL, Java, and C. You can find more information and examples about Table Functions and the functionality mentioned above at the following URL:
http://download.oracle.com/docs/cd/B10501_01/appdev.920/a96624/08_subs.htm#19677
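In PL/SQL the repartitioning key is expressed with a PARALLEL_ENABLE (PARTITION ... BY ...) clause in the function declaration. For example, the reducer used later in this post (word_cur_t and wordcnts_t are types declared in the package shown below) repartitions its input cursor by a hash of the word column and asks for the rows to be clustered by word:

function reducer(in_cur in word_cur_t) return wordcnts_t
  pipelined parallel_enable (partition in_cur by hash(word))
  cluster in_cur by (word);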
Pipelined Table Functions have been used by customers for several releases and are a core part of Oracle's extensibility infrastructure. Both external users and Oracle Development have used Table Functions as an efficient and easy way of extending the database kernel.
Examples of table functions being used within Oracle are the implementation of a number of features in Oracle Spatial and Oracle Warehouse Builder. Oracle Spatial usages include spatial joins and several spatial data mining operations. Oracle Warehouse Builder allows end users to leverage Table Functions to parallelize procedural logic in data flows such as the Match-Merge algorithm and other row-by-row processing algorithms.
All examples are available in plain text in this file: omr.sql.
To illustrate the usage of parallelism, and Pipelined Table Functions to write a Map-Reduce algorithm inside the Oracle database, we describe how to implement the canonical map-reduce example: a word count. For those unfamiliar with the example, the goal of word count is to return all distinct words within a set of documents as well as a count of how often this word occurs within this set of documents.
The procedural code in this word count example is implemented in PL/SQL but, as said before, Oracle allows you to pick your language of choice to implement said procedural logic.
1. Setting Up the Environment
We will be looking at a set of documents. These documents can either be files outside of the database, or they can be stored in Secure Files/CLOB columns within the database. In this post the documents are stored in a table, which effectively reflects a file system.
In this case we are going to create a table within the database using the following definition:
CREATE TABLE documents (a CLOB)
LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);
Each row in this table corresponds to a single document. We populate this table with a very simple corpus resulting in 3 documents with the text shown here:
INSERT INTO documents VALUES ('abc def');
INSERT INTO documents VALUES ('def ghi');
INSERT INTO documents VALUES ('ghi jkl');
commit;
Both the map function and the reduce table function are going to live in a package, keeping the code nice and tidy. To show the steps to be taken we will take snippets from the overall package and show those in the sections that follow. The actual package will contain a set of types, which are required for the code to work. All code was tested on Oracle Database 11g (11.1.0.6).
Download the full code here.
The following shows the package being deployed.
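For reference, this is the package specification from omr.sql; it declares the record and collection types the code relies on, as well as the mapper and the reducer:

create or replace package oracle_map_reduce is
  -- output row/collection of the mapper, and a strongly typed cursor over it
  type word_t     is record (word varchar2(4000));
  type words_t    is table of word_t;
  type word_cur_t is ref cursor return word_t;
  -- output row/collection of the reducer
  type wordcnt_t  is record (word varchar2(4000), count number);
  type wordcnts_t is table of wordcnt_t;

  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
    pipelined parallel_enable (partition doc by any);

  function reducer(in_cur in word_cur_t) return wordcnts_t
    pipelined parallel_enable (partition in_cur by hash(word))
    cluster in_cur by (word);
end;
/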
2. Creating the Mapper and the Reducer
First we need to create a generic function to "map" (as in map-reduce) or tokenize a document. Note that the goal is not to show the best map function, but how this will work in principle in the database. This specific map function is very basic and better implementations may be found elsewhere.
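Here is the mapper from omr.sql: it loops over the documents delivered by its input ref cursor and, for each document, pipes out one row per token found between occurrences of the separator:

function mapper(doc in sys_refcursor, sep in varchar2) return words_t
  pipelined parallel_enable (partition doc by any)
is
  document clob;
  istart   number;
  pos      number;
  len      number;
  word_rec word_t;
begin
  -- for every document
  loop
    fetch doc into document;
    exit when doc%notfound;
    istart := 1;
    len    := length(document);
    -- For every word within a document
    while (istart <= len) loop
      pos := instr(document, sep, istart);
      if (pos = 0) then
        word_rec.word := substr(document, istart);
        pipe row (word_rec);
        istart := len + 1;
      else
        word_rec.word := substr(document, istart, pos - istart);
        pipe row (word_rec);
        istart := pos + 1;
      end if;
    end loop; -- end loop for a single document
  end loop;   -- end loop for all documents
  return;
end mapper;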
You can use the aggregation engine to get the results and only use the mapper. A query and a result would look like this:
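This is the mapper-only query from omr.sql; on the three sample documents above, the expected result is shown below the query (row order may vary):

select word, count(*)
from (select value(map_result).word word
      from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result)
group by (word);

-- Expected result for the sample corpus:
-- WORD     COUNT(*)
-- abc             1
-- def             2
-- ghi             2
-- jkl             1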
The aggregation is done in SQL, no reducer required.
Of course, you could write your own aggregation Table Function to count the occurrences of words in a document. That is what you would do if you were writing the map-reduce program without leveraging the Oracle aggregation engine as we did before. This aggregation Table Function is the reducer of the map-reduce program unit.
The Table Function specifies that its input be partitioned by word and could (to use the Oracle execution engine's sort) ask for the data to be ordered or clustered by word. We show a sample count program in this post to complete the example.
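Here is the reducer from omr.sql. Because the CLUSTER clause guarantees that all rows for a given word arrive together, the function only needs to compare each fetched word with the previous one and pipe out a (word, count) row whenever the word changes:

function reducer(in_cur in word_cur_t) return wordcnts_t
  pipelined parallel_enable (partition in_cur by hash(word))
  cluster in_cur by (word)
is
  word_count wordcnt_t;
  next       varchar2(4000);
begin
  word_count.count := 0;
  loop
    fetch in_cur into next;
    exit when in_cur%notfound;
    if (word_count.word is null) then
      word_count.word  := next;
      word_count.count := word_count.count + 1;
    elsif (next <> word_count.word) then
      pipe row (word_count);
      word_count.word  := next;
      word_count.count := 1;
    else
      word_count.count := word_count.count + 1;
    end if;
  end loop;
  if word_count.count <> 0 then
    pipe row (word_count);
  end if;
  return;
end reducer;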
3. Map-Reduce in the Database
After you have completed both the mapper and the reducer you are ready to do the full map-reduce in the database. Running a query using this Table Function will give us a parallel workload on external documents, doing what the typical map-reduce programs do.
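This is the combined query from omr.sql; it feeds the mapper's output into the reducer, and on the sample corpus it returns the same four (word, count) rows as the aggregation query shown earlier:

select *
from table(oracle_map_reduce.reducer(
       cursor(select value(map_result).word word
              from table(oracle_map_reduce.mapper(
                     cursor(select a from documents), ' ')) map_result)));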
Oracle Table Functions are a proven technology, used by many internal and external parties to extend Oracle Database 11g.
Oracle Table Functions are a robust scalable way to implement Map-Reduce within the Oracle database and leverage the scalability of the Oracle Parallel Execution framework. Using this in combination with SQL provides an efficient and simple mechanism for database developers to develop Map-Reduce functionality within the environment they understand and with the languages they know.
Download the code here: omr.sql. For the example, I ran this in OE (as you can see on the SQL screens). No special privileges required.
Appendix: omr.sql code
CREATE TABLE documents (a CLOB)
  LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

INSERT INTO documents VALUES ('abc def');
INSERT INTO documents VALUES ('def ghi');
INSERT INTO documents VALUES ('ghi jkl');
commit;

create or replace package oracle_map_reduce is
  type word_t     is record (word varchar2(4000));
  type words_t    is table of word_t;
  type word_cur_t is ref cursor return word_t;
  type wordcnt_t  is record (word varchar2(4000), count number);
  type wordcnts_t is table of wordcnt_t;

  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
    pipelined parallel_enable (partition doc by any);

  function reducer(in_cur in word_cur_t) return wordcnts_t
    pipelined parallel_enable (partition in_cur by hash(word))
    cluster in_cur by (word);
end;
/

create or replace package body oracle_map_reduce is

  --
  -- The mapper is a simple tokenizer that tokenizes the input documents
  -- and emits individual words
  --
  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
    pipelined parallel_enable (partition doc by any)
  is
    document clob;
    istart   number;
    pos      number;
    len      number;
    word_rec word_t;
  begin
    -- for every document
    loop
      fetch doc into document;
      exit when doc%notfound;
      istart := 1;
      len    := length(document);
      -- For every word within a document
      while (istart <= len) loop
        pos := instr(document, sep, istart);
        if (pos = 0) then
          word_rec.word := substr(document, istart);
          pipe row (word_rec);
          istart := len + 1;
        else
          word_rec.word := substr(document, istart, pos - istart);
          pipe row (word_rec);
          istart := pos + 1;
        end if;
      end loop; -- end loop for a single document
    end loop;   -- end loop for all documents
    return;
  end mapper;

  --
  -- The reducer emits words and the number of times they're seen
  --
  function reducer(in_cur in word_cur_t) return wordcnts_t
    pipelined parallel_enable (partition in_cur by hash(word))
    cluster in_cur by (word)
  is
    word_count wordcnt_t;
    next       varchar2(4000);
  begin
    word_count.count := 0;
    loop
      fetch in_cur into next;
      exit when in_cur%notfound;
      if (word_count.word is null) then
        word_count.word  := next;
        word_count.count := word_count.count + 1;
      elsif (next <> word_count.word) then
        pipe row (word_count);
        word_count.word  := next;
        word_count.count := 1;
      else
        word_count.count := word_count.count + 1;
      end if;
    end loop;
    if word_count.count <> 0 then
      pipe row (word_count);
    end if;
    return;
  end reducer;

end;
/

-- Select statements

select word, count(*)
from (
  select value(map_result).word word
  from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result)
group by (word);

select *
from table(oracle_map_reduce.reducer(
       cursor(select value(map_result).word word
              from table(oracle_map_reduce.mapper(
                     cursor(select a from documents), ' ')) map_result)));