-- Demo table: one row per individual sale, keyed only by the salesman's name.
-- The statement terminator is required; without it the first INSERT below is
-- parsed as a continuation of the CREATE TABLE statement.
CREATE TABLE sls (salesman VARCHAR2(30), quantity NUMBER);

-- Sample rows. Column lists are spelled out so the inserts do not depend on
-- the physical column order of the table.
INSERT INTO sls (salesman, quantity) VALUES ('Tom', 100);
INSERT INTO sls (salesman, quantity) VALUES ('Chu', 200);
INSERT INTO sls (salesman, quantity) VALUES ('Tom', 300);
INSERT INTO sls (salesman, quantity) VALUES ('Mike', 100);
INSERT INTO sls (salesman, quantity) VALUES ('Scott', 300);
INSERT INTO sls (salesman, quantity) VALUES ('Tom', 250);
INSERT INTO sls (salesman, quantity) VALUES ('Scott', 100);
create or replace package oracle_map_reduce
as
  -- Package specification: declares the types and the two table functions
  -- (mapper and reducer) that are visible to callers.

  -- The types defined here play the role of the input and output files used
  -- in MR code: they hold the data while the package runs. The big advantage
  -- is that intermediate results do not need to be written to disk.
  type sales_t is table of sls%rowtype;                          -- mapper output rows
  type sale_cur_t is ref cursor return sls%rowtype;              -- strongly typed reducer input
  type sale_rec_t is record (name varchar2(30), total number);   -- one aggregated result
  type total_sales_t is table of sale_rec_t;                     -- reducer output rows

  -- Next come the functions that do the work. Both mapper and reducer are
  -- table functions, both are pipelined, and both are declared parallel_enable.
  -- The degree of parallelism is driven from the database side and is not
  -- scheduled by this program.

  -- mapper: streams every row of inp_cur back to the caller.
  --   pipelined       : the function acts as a row source.
  --   parallel_enable : the function may be executed in parallel by the
  --                     parallel query framework; "partition ... by any"
  --                     places no constraint on how input rows are distributed.
  function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any);

  -- reducer: emits one (name, total) record per salesman found in in_cur.
  --   partition by hash(salesman) : all rows of a given salesman are sent to
  --                                 the same instance of the function.
  --   cluster by (salesman)       : rows with the same salesman arrive
  --                                 together. This does not drive the
  --                                 distribution over the parallel slaves;
  --                                 that is done by the partition clause.
  function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))
    cluster in_cur by (salesman);
end oracle_map_reduce;
/
-- The body of the package has the mapper and the reducer code
-- The header as is shown here by itself defines the signature of
-- the package and declares types and variables to be used in the
-- package.
-- The upper part is the header; the following part is the body.
-- Note the difference in the create statement below as compared
-- to the header
create or replace package body oracle_map_reduce
as
  -- mapper: pass-through table function.
  -- Reads every row from inp_cur and pipes it out unchanged. In effect it does
  -- nothing other than stream the data on to the next step; in MR the stream
  -- would be written to a file and then redistributed to the reducers.
  --   inp_cur : cursor whose rows must be fetchable into an sls%rowtype record.
  --   returns : (pipelined) the same rows, as sales_t.
  function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any)
  is
    -- Record that holds one entire row from the SLS table.
    sales_rec sls%rowtype;
  begin
    -- Loop over all records delivered by the cursor.
    loop
      fetch inp_cur into sales_rec;
      -- End the loop when there are no more rows to fetch.
      exit when inp_cur%notfound;
      -- pipe row, in combination with pipelined in the declaration, hands
      -- rows back in streaming fashion as soon as they are available.
      pipe row (sales_rec);
    end loop;
    -- A pipelined function ends with a bare return: it tells the consumer
    -- (the reducer in this example) that the row source is exhausted.
    return;
  end mapper;

  -- reducer: computes and emits the total quantity per salesman.
  -- Relies on the input being clustered by salesman, so that a change of
  -- salesman means the previous salesman is complete.
  --   in_cur  : cursor of sls rows.
  --   returns : (pipelined) one sale_rec_t (name, total) per salesman.
  -- NOTE(review): rows whose salesman is NULL are not recognised as a group
  -- of their own by the comparisons below - confirm that cannot occur.
  function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))
    -- The partition clause sends all rows of a particular salesman to one
    -- instance of the reducer; the cluster clause keeps them together.
    cluster in_cur by (salesman)
  is
    -- Two containers: one for the incoming row, one for the running result.
    sale_rec       sls%rowtype;
    total_sale_rec sale_rec_t;
  begin
    -- Start with an empty accumulator; a NULL name means "no group started".
    total_sale_rec.total := 0;
    total_sale_rec.name  := null;

    loop
      fetch in_cur into sale_rec;
      exit when in_cur%notfound;

      if (total_sale_rec.name is null) then
        -- First row: adopt its salesman and add its quantity to the
        -- initial total of 0.
        total_sale_rec.name  := sale_rec.salesman;
        total_sale_rec.total := total_sale_rec.total + sale_rec.quantity;
      elsif (total_sale_rec.name <> sale_rec.salesman) then
        -- Salesman changed: the previous one is complete, so pipe out its
        -- result and restart the accumulator with the current row.
        pipe row (total_sale_rec);
        total_sale_rec.name  := sale_rec.salesman;
        total_sale_rec.total := sale_rec.quantity;
      else
        -- Same salesman as the previous row: just add to the running total.
        total_sale_rec.total := total_sale_rec.total + sale_rec.quantity;
      end if;
    end loop;

    -- Results are only piped on a change of salesman, so the last (or only)
    -- salesman is still pending here. Test whether a group was started rather
    -- than whether the total is non-zero; otherwise a final salesman whose
    -- quantities sum to 0 would be silently dropped.
    if total_sale_rec.name is not null then
      pipe row (total_sale_rec);
    end if;

    -- All rows have been piped to the consumer.
    return;
  end reducer;
end oracle_map_reduce;
/
-- Run the pipeline: the rows of sls are passed as a cursor to mapper, and the
-- rows mapper returns (aliased map_result) are passed as a cursor to reducer.
SELECT *
  FROM TABLE(
         oracle_map_reduce.reducer(
           CURSOR(
             SELECT *
               FROM TABLE(
                      oracle_map_reduce.mapper(
                        CURSOR(SELECT * FROM sls)
                      )
                    ) map_result
           )
         )
       );