当前访客身份:游客 [ 登录 | 加入开源中国 ]

代码分享

当前位置:
代码分享 » Java  » 编程基础
分享到: 
收藏 +0
2
Node.Js的基于事件驱动的单线程异步编程模型让我印象深刻,其实任何一种语言都能实现这个模型。下面我就试着用Java语言来实现一个简单的事件驱动的单线程异步框架。

设计思想很简单。只要设置一个存放任务的阻塞队列,让一个工作线程每次从该队列中取出一个任务并执行,完毕再取下一个任务。由于单线程多任务,因此每个任务的粒度要充分小,否则一个大任务会阻塞其他任务的执行。解决的办法是将一个大任务切分成很多小任务,每个小任务能快速执行。(其实和CPU时间分片的原理一样)由于在运行过程中会创建大量瞬时(instant)的任务对象,可以预见在内存空间不充裕的情况下GC会比较频繁。 (当然这和并发量也有关系,但即使只实现一个普通的计算需求,比如计算一个级数的前100万项,也会产生至少100万个计算任务对象)

代码中给出了三个例子,一个是利用级数求PI的近似值,一个是读一个文本文件并统计每行文本的长度和单词数,最后一个例子是将前两个例子合并起来,观察单线程异步的并发效果. 


标签: <无>

代码片段(13) [全屏查看所有代码]

1. [文件] EventEmitter.java ~ 760B     下载(82)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-2
 * Time: 下午10:53
 * To change this template use File | Settings | File Templates.
 */

/**
 * 一个EventEmitter能为某个事件注册监听器,或发出某个事件的通知
 */
public interface EventEmitter {
    /**
     * 为事件注册监听器
     * @param eventName 事件名
     * @param handler
     */
    public void on(String eventName, EventHandler handler);

    /**
     * 发出某个事件的通知
     * @param eventName 事件名
     * @param args
     */
    public void emit(String eventName, Object ... args);

    /**
     * 移除该事件的所有监听器
     * @param eventName
     */
    public void remove(String eventName);
}

2. [文件] EventHandler.java ~ 236B     下载(63)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-2
 * Time: 下午9:30
 * To change this template use File | Settings | File Templates.
 */
public interface EventHandler {
     public void handle(EventObject event);
}

3. [文件] EventObject.java ~ 856B     下载(64)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-2
 * Time: 下午9:32
 * To change this template use File | Settings | File Templates.
 */

/**
 * 表示一个事件
 */
public class EventObject {
    final private String eventName; // 事件名
    final private Object source;  // 事件源
    final private Object args[];  // 可选参数

    public EventObject(String eventName, Object source) {
        this(eventName, source, null);
    }

    public EventObject(String eventName, Object source, Object[] args) {
        this.eventName = eventName;
        this.source = source;
        this.args = args;
    }

    public String getEventName() {
        return eventName;
    }

    public Object getSource() {
        return source;
    }

    public Object[] getArgs() {
        return args;
    }
}

4. [文件] Task.java ~ 295B     下载(65)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-2
 * Time: 下午9:30
 * To change this template use File | Settings | File Templates.
 */

/**
 * 表示一个任务
 */
public interface Task {
    /**
     * 执行一个具体任务
     */
    public void execute();
}

5. [文件] TaskController.java ~ 851B     下载(65)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午1:48
 * To change this template use File | Settings | File Templates.
 */

/**
 * 任务控制器
 */
public interface TaskController {
    /**
     * 启动任务管理器
     */
    public void start();

    /**
     * 停止任务管理器。当调用这个方法后,任务管理器不再接收新提交的任务,但会继续执行已提交的任务
     */
    public void stop();

    /**
     * 立即关闭任务管理,已提交且未开始执行的任务将会被丢弃
     */
    public void shutdown();

    /**
     * 任务管理器是否停止
     * @return
     */
    public boolean isStop();

    /**
     * 任务管理器
     * @return
     */
    public boolean isShutdown();
    public boolean isRunning();
}

6. [文件] TaskEventEmitter.java ~ 3KB     下载(61)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * User: 刘永健
 * Date: 12-10-2
 * Time: 下午11:34
 * To change this template use File | Settings | File Templates.
 */
abstract public class TaskEventEmitter implements Task, EventEmitter {
    private Map<String, List<EventHandler>> eventHandlerMap = new HashMap<String, List<EventHandler>>();
    final private TaskExecutor executor;

    public TaskEventEmitter(TaskExecutor executor) {
        this.executor = executor;
        addDefaultExceptionHandlers();
    }

    protected void addDefaultExceptionHandlers() {
        EventHandler eh = new EventHandler() {
            @Override
            public void handle(EventObject event) {
                ((Exception) event.getSource()).printStackTrace();
            }
        };
        on(Exception.class.getName(), eh);
        on(IOException.class.getName(), eh);
        on(RuntimeException.class.getName(), eh);
        on(NullPointerException.class.getName(), eh);
        on(IndexOutOfBoundsException.class.getName(), eh);
    }

    @Override
    public void on(String eventName, EventHandler handler) {
        if (!eventHandlerMap.containsKey(eventName)) {
            List<EventHandler> eventHandlerList = new LinkedList<EventHandler>();
            eventHandlerMap.put(eventName, eventHandlerList);
        }
        eventHandlerMap.get(eventName).add(handler);
    }

    @Override
    public void remove(String eventName) {
        eventHandlerMap.remove(eventName);
    }

    @Override
    public void emit(final String eventName, final Object... args) {
        if (eventHandlerMap.containsKey(eventName)) {
            List<EventHandler> eventHandlerList = eventHandlerMap.get(eventName);
            for (final EventHandler handler : eventHandlerList) {
                executor.submit(new Task() {
                    @Override
                    public void execute() {
                        handler.handle(new EventObject(eventName, TaskEventEmitter.this, args));
                    }
                });
            }
        } else {
            System.out.println("No event handler listen this event: " + eventName);
            if (args[0] instanceof Exception) {
                ((Exception) args[0]).printStackTrace();
            }
        }
    }

    @Override
    public void execute() {
        try {
            run();
        } catch (Exception e) {
            emit(e.getClass().getName(), e);
        }
    }

    abstract protected void run() throws Exception;
}

7. [文件] TaskExecutor.java ~ 434B     下载(59)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * User: 刘永健
 * Date: 12-10-2
 * Time: 下午9:37
 * To change this template use File | Settings | File Templates.
 */

/**
 * 任务执行器
 */
 public interface TaskExecutor extends Task {

    /**
     * 提交一个任务
     * @param task
     */
    public void submit(Task task);
}

8. [文件] TaskManager.java ~ 4KB     下载(60)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午1:47
 * To change this template use File | Settings | File Templates.
 */

/**
 * 表示一个任务管理器
 */
public class TaskManager implements TaskController {
    private TaskExecutor executor;  // 任务执行器
    private Thread executorThread;  // 用于执行任务的工作线程
    private int lenOfTaskQueue = Integer.MAX_VALUE;   // 任务队列的长度,默认为无限大

    public TaskManager() {
    }

    public TaskManager(int lenOfTaskQueue) {
        this.lenOfTaskQueue = lenOfTaskQueue;
    }

    public TaskManager(TaskExecutor executor) {
        this.executor = executor;
    }

    public TaskExecutor getExecutor() {
        if (executor == null){
            init();
        }
        return executor;
    }

    private class DefaultTaskExecutor implements TaskExecutor {
        final private BlockingQueue<Task> taskQueue; // 任务队列,所有待执行的任务都会被放入这个队列,等待执行

        public DefaultTaskExecutor() {
            this(Integer.MAX_VALUE);
        }

        public DefaultTaskExecutor(int len) {
            taskQueue = new LinkedBlockingQueue<Task>(len);
        }

        @Override
        public void submit(Task task) {
            if (!isStop) {
                try {
                    taskQueue.put(task);
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            } else {
                throw new IllegalStateException("This executor is not alive.");
            }
        }


        @Override
        public void execute() {
            isRunning = true;
            taskQueue.clear();
            System.out.println("Start to process task ... ");
            while (!isShutdown && (!isStop || taskQueue.size() > 0)) {
                try {
                    Task task = taskQueue.take();
//                    System.out.println("Tak a task to execute , now the length of  task queue is " + taskQueue.size());
                    task.execute();
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }
            System.out.println("Stop to process task ... ");
            isRunning = false;
        }

    }

    private boolean isStop = true;  // 标识执行器是否接收新的任务
    private boolean isShutdown = true; // 标识执行器是否立即结束
    private boolean isRunning = false;  // 标识执行器是否正在运行

    public void start() {
        if (executor == null) {
            init();
        }
        if (isStop && !isRunning) {
            isStop = false;
            isShutdown = false;
            executorThread = new Thread() {
                public void run() {
                    executor.execute();
                }
            };
            executorThread.start();
        } else {
            throw new IllegalStateException("This executor is still alive or running, please stop or shutdown immediately and try it again later");
        }
    }

    /**
     * 初始化一个任务执行器
     */
    private void init() {
        executor = new DefaultTaskExecutor(lenOfTaskQueue);
    }

    public void stop() {
        isStop = true;
        executorThread.interrupt();
    }

    public void shutdown() {
        isStop = true;
        isShutdown = true;
        executorThread.interrupt();
    }

    @Override
    public boolean isStop() {
        return this.isStop;
    }

    @Override
    public boolean isShutdown() {
        return this.isShutdown;
    }

    @Override
    public boolean isRunning() {
        return this.isRunning;
    }


}

9. [文件] IODemo.java ~ 1KB     下载(60)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

import java.io.*;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午3:54
 * To change this template use File | Settings | File Templates.
 */
public class IODemo {
    public static void main(String[] args) throws InterruptedException {
        String fileName = "info.txt";
        final TaskManager manager = new TaskManager();
        manager.start();
        TaskExecutor executor = manager.getExecutor();
        Task ioTask = TaskHelper.createIOTask(executor, fileName);
        executor.submit(ioTask);
        Thread.sleep(5000L);
        manager.stop();
    }
}

class IOTask extends TaskEventEmitter {
    final private String fileName;
    final private String encoding;

    public IOTask(TaskExecutor executor, String fileName, String encoding) {
        super(executor);
        this.fileName = fileName;
        this.encoding = encoding;
    }

    public String getFileName() {
        return fileName;
    }

    public String getEncoding() {
        return encoding;
    }

    @Override
    protected void run() throws Exception {
        InputStream fis = this.getClass().getResourceAsStream("/" + fileName);

        if (fis != null) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(fis, encoding));
            emit("open", getFileName());
            emit("next", reader);
        }else{
            throw new FileNotFoundException(fileName);
        }
    }
}


10. [文件] PICalcDemo.java ~ 956B     下载(59)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午5:01
 * To change this template use File | Settings | File Templates.
 * π/4 = 1 - 1/3 + 1/5 - 1/7 + 1/9 - 1/11 + … + (-1)^(n-1)/(2*n-1)
 */
public class PICalcDemo {
    public static void main(String[] args) {
        final TaskManager manager = new TaskManager();
        manager.start();
        TaskExecutor executor = manager.getExecutor();
         Task piTask = TaskHelper.createPiTask(executor, 10000);
        executor.submit(piTask);
    }

}

class PICalcTask extends TaskEventEmitter {
    private final int N;

    PICalcTask(TaskExecutor executor, int n) {
        super(executor);
        if (n < 1) throw new IllegalArgumentException("n must be larger than 0");
        this.N = n;
    }

    public int getN() {
        return N;
    }

    @Override
    protected void run() throws Exception {
        emit("next", 1);
    }
}

11. [文件] CompoundTaskDemo.java ~ 981B     下载(59)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午5:27
 * To change this template use File | Settings | File Templates.
 */
public class CompoundTaskDemo {
    public static void main(String[] args) {
        TaskManager manager = new TaskManager();
        TaskExecutor executor = manager.getExecutor();
        manager.start();
        TaskEventEmitter ioTask = TaskHelper.createIOTask(executor, "info.txt");
        TaskEventEmitter piTask = TaskHelper.createPiTask(executor, 100);
        final TaskEventEmitter guardTask = new GuardTask(manager, 2);
        EventHandler handler = new EventHandler() {
            @Override
            public void handle(EventObject event) {
                guardTask.emit("end");
            }
        };
        ioTask.on("close",handler);
        piTask.on("finish", handler);
        executor.submit(ioTask);
        executor.submit(piTask);
        executor.submit(guardTask);

    }
}

12. [文件] TaskHelper.java ~ 5KB     下载(59)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午5:28
 * To change this template use File | Settings | File Templates.
 */
public class TaskHelper {

    public static TaskEventEmitter createIOTask(TaskExecutor executor, String fileName){

        final IOTask task = new IOTask(executor, fileName, "UTF-8");

        task.on("open", new EventHandler() {
            @Override
            public void handle(EventObject event) {
                String fileName = (String) event.getArgs()[0];
                System.out.println(Thread.currentThread() + " - " + fileName + " has been opened.");
            }
        });
        task.on("next", new EventHandler() {
            @Override
            public void handle(EventObject event) {
                BufferedReader reader = (BufferedReader) event.getArgs()[0];
                try {
                    String line = reader.readLine();
                    if (line != null) {
                        task.emit("ready", line);
                        task.emit("next", reader);
                    } else {
                        task.emit("close", task.getFileName());
                    }
                } catch (IOException e) {
                    task.emit(e.getClass().getName(), e, task.getFileName());
                    try {
                        reader.close();
                        task.emit("close", task.getFileName());
                    } catch (IOException e1) {
                        e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                    }
                }
            }
        });
        task.on("ready", new EventHandler() {
            @Override
            public void handle(EventObject event) {
                String line = (String) event.getArgs()[0];
                int len = line.length();
                int wordCount = line.split("[\\s+,.]+").length;
                System.out.println(Thread.currentThread()+" - word count: "+wordCount+" length: "+len);
            }
        });
        task.on(IOException.class.getName(), new EventHandler() {
            @Override
            public void handle(EventObject event) {
                Object[] args = event.getArgs();
                IOException e = (IOException) args[0];
                String fileName = (String) args[1];
                System.out.println(Thread.currentThread()+ " - An IOException occurred while reading " + fileName + ", error: " + e.getMessage());
            }
        });
        task.on("close", new EventHandler() {
            @Override
            public void handle(EventObject event) {
                String fileName = (String) event.getArgs()[0];
                System.out.println(Thread.currentThread() + " - " + fileName + " has been closed.");
            }
        });
        task.on(FileNotFoundException.class.getName(), new EventHandler() {
            @Override
            public void handle(EventObject event) {
                FileNotFoundException e = (FileNotFoundException) event.getArgs()[0];
                e.printStackTrace();
                System.exit(1);
            }
        });
        return task;
    }


    public static TaskEventEmitter createPiTask(TaskExecutor executor, int n) {
        final PICalcTask task = new PICalcTask(executor, n);
        // 计算下一个级数项
        task.on("next", new EventHandler() {
            @Override
            public void handle(EventObject event) {
                int n = ((Integer) event.getArgs()[0]).intValue();
                double xn = Math.pow(-1, n - 1) / (2 * n - 1);
                task.emit("sum", xn);
            }
        });
        // 将每一个级数项加起来
        task.on("sum", new EventHandler() {
            private int i = 0;
            private double sum = 0;

            @Override
            public void handle(EventObject event) {
                double xn = ((Double) event.getArgs()[0]).doubleValue();
                sum += xn;
                i++;
                System.out.println(Thread.currentThread()+" - sum = "+ sum);
                if (i >= task.getN()) {
                    task.emit("finish", sum * 4);
                } else {
                    task.emit("next", i + 1);
                }
            }
        });
        // 完成PI的近似计算
        task.on("finish", new EventHandler() {
            @Override
            public void handle(EventObject event) {
                Double sum = (Double) event.getArgs()[0];
                System.out.println("pi=" + sum);
            }
        });
        return task;
    }
}

13. [文件] GuardTask.java ~ 830B     下载(58)     跳至 [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [全屏预览]

package event;

/**
 * User: 刘永健
 * Date: 12-10-3
 * Time: 下午6:35
 * To change this template use File | Settings | File Templates.
 */

/**
 * 守卫任务类
 * <p></p>
 * 当所有具体任务都执行完毕,通知任务管理器关闭
 */
public class GuardTask extends TaskEventEmitter {
    private final int N;

    public GuardTask(final TaskManager manager, int n) {
        super(manager.getExecutor());
        this.N = n;
        on("end", new EventHandler() {
            private  int i=0;
            @Override
            public void handle(EventObject event) {
                i++;
                if (i >= N){
                    manager.stop();
                }
            }
        });
    }

    @Override
    protected void  run() throws Exception {

    }
}


开源中国-程序员在线工具:Git代码托管 API文档大全(120+) JS在线编辑演示 二维码 更多»

发表评论 回到顶部 网友评论(10)

  • 1楼:calvinlsy 发表于 2012-10-08 15:05 回复此评论

  • 2楼:山药蛋 发表于 2012-10-09 09:18 回复此评论
    当看到你的标题,我就想到了Node.js
  • 3楼:mj4738 发表于 2012-10-12 21:58 回复此评论

    引用来自“calvinklein”的评论


  • 4楼:firefoxmmx 发表于 2012-10-21 19:56 回复此评论
    参考参考~

  • 5楼:大蛋散 发表于 2012-12-03 16:40 回复此评论
    很有参考价值。
    不过nodeJs是通过linux的异步系统调用来实现异步通信的, 你这个框架貌似少了点东西。。。能实现任务的异步调度吗?
  • 6楼:mj4738 发表于 2012-12-25 14:45 回复此评论

    引用来自“大蛋散”的评论

    很有参考价值。
    不过nodeJs是通过linux的异步系统调用来实现异步通信的, 你这个框架貌似少了点东西。。。能实现任务的异步调度吗?
    可以,等我有空
  • 7楼:xiaoxiongying 发表于 2012-12-25 20:49 回复此评论
    顶起,楼主好强大
  • 8楼:keleba2013 发表于 2013-09-12 09:18 回复此评论
    这个有很多问题的。
    1.内存无法回收。因为使用map存放handler。执行完事件没释放资源。导致hangle中引用的资源java也无法回收。如果事件注册多了。容易内存溢出。如果释放了map中得hangle。那么在handle中就无法再使用释放的事件了
  • 9楼:iMouse-Wu 发表于 2016-07-27 16:43 回复此评论
    感觉,把事件处理和任务处理分开实现会降低代码的复杂度(任务处理可以直接借助线程池)
  • 10楼:yayi 发表于 2016-07-27 19:20 回复此评论

    引用来自“iMouse-Wu”的评论

    感觉,把事件处理和任务处理分开实现会降低代码的复杂度(任务处理可以直接借助线程池)
    6666666,先Mark一下
开源从代码分享开始 分享代码