Java 多线程 你怎么看、

Ken5233 发布于 2013/04/17 16:45
阅读 1K+
收藏 2

最近在学习多线程,之前这块一直都是灰色地带,工作上用得少,自己也有畏惧心理。

我想实现的是,消息发送队列机制:一个多线程用于获取待发送队列,一个多线程用于发送。待发送队列大小是不确定的,待发送信息是有优先级的。

主要设计了三个类:MsgClient 用于获取发送队列  MsgSender 用于发送, MsgQueue 发送队列

消息对象

package run.thread.msg;

import java.util.Date;

/**
 * @author caomm
 * 
 */
public class Msg {

	/***
	 * 发送状态
	 * @author caomm
	 */
	public enum MsgStatus {
		ready, complate, success, error
	}

	private String id;

	private Date createDate;

	private Date sendDate;

	private String content;

	private Integer priority;

	private MsgStatus status;

	public Msg() {
		this.priority = 5;
		this.status = MsgStatus.ready;
		this.createDate = new Date();
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}

	public Integer getPriority() {
		return priority;
	}

	public void setPriority(Integer priority) {
		this.priority = priority;
	}

	public MsgStatus getStatus() {
		return status;
	}

	public void setStatus(MsgStatus status) {
		this.status = status;
	}

	public Date getCreateDate() {
		return createDate;
	}

	public void setCreateDate(Date createDate) {
		this.createDate = createDate;
	}

	public Date getSendDate() {
		return sendDate;
	}

	public void setSendDate(Date sendDate) {
		this.sendDate = sendDate;
	}

	@Override
	public String toString() {
		return createDate.toString();
	}

}
package run.thread.msg;

/***
 * 客户端,负责获取未发送的短信
 * 
 * @author caomm
 * 
 */
public class MsgClient implements Runnable {

	private Msg msg;

	private MsgQueue queue;

	public MsgClient(MsgQueue queue) {
		this.queue = queue;
	}

	public MsgClient(MsgQueue queue, Msg msg) {
		this.queue = queue;
		this.msg = msg;
	}

	@Override
	public void run() {
		synchronized (queue) {
			this.msg = new Msg();
			queue.add(msg);
			queue.notify();
		}
	}

}
package run.thread.msg;

import java.util.Date;

import run.thread.msg.Msg.MsgStatus;

/***
 * 发送端,负责进行发送
 * 
 * @author caomm
 * 
 */
public class MsgSender extends Thread {

	public MsgQueue queue;

	public MsgSender() {
	}

	public MsgSender(MsgQueue queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		synchronized (queue) {
			Msg msg = queue.get();
			if (msg == null) {
				System.out.println(Thread.currentThread().getName() + " 线程等待!");
				try {
					queue.wait();
					this.join();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

			// 重新获得对象锁时 再次判断对象是否为空
			while (msg == null) {
				msg = queue.get();
			}
			msg.setStatus(MsgStatus.complate);
			msg.setSendDate(new Date());
			System.out.println(Thread.currentThread().getName() + " sender : " + msg);

		}
	}

}
package run.thread.msg;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/***
 * 发送队列
 * 
 * @author caomm
 * 
 */
public class MsgQueue {

	private Queue<Msg> queue = new ConcurrentLinkedQueue<Msg>(); // 正常发送队列

	private Queue<Msg> highPriority = new ConcurrentLinkedQueue<Msg>();// 插队高级队列

	public void add(Msg msg) {
		// 新添加的消息优先级别若大于 8 则添加到插队队列
		if (msg.getPriority() > 8)
			this.highPriority.offer(msg);
		else
			this.queue.offer(msg);
	}

	/***
	 * 获取一条待发送的短信
	 * 
	 * @return
	 */
	public Msg get() {
		// 优于考虑插队队列
		if (!highPriority.isEmpty())
			return highPriority.poll();
		else if (!queue.isEmpty())
			return queue.poll();
		return null;
	}

	public void run() {
		// none
	}

}
package run.thread.msg;



public class MsgCase {

	/***
	 * 发送
	 */
	public void work() {

	}

	public static void main(String[] args) {
		try {
			int threadSize = 20;

			MsgQueue queue = new MsgQueue();

			MsgSender ms1 = new MsgSender(queue);
			for (int i = 0; i < threadSize; i++) {
				new Thread(ms1, "发送线程 [2-" + i + "]: ").start();
			}

			MsgClient mc1 = new MsgClient(queue);
			for (int i = 0; i < threadSize; i++) {
				new Thread(mc1, "查询线程 [1-" + i + "]: ").start();
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}


	/***
	 * 收回执
	 */
	public void receipt() {

	}

}

 上述代码,若是在 MsgSender 中, wait 过后不添加 while 则会出现null 现象,不知道 while 会不会有性能影响

另, 在 MsgClient 中, notify 与 notifyAll 有何区别,jdk 中的解释很抽象,请大家给个形象的解释 呵呵。

欢迎大家拍砖与指导给出不同意见或实现 谢谢!!!

加载中
0
Ken5233
Ken5233
没人坐沙发 
0
stevenliu
stevenliu

元方 就这么看!

0
stevenliu
stevenliu
你发错位置了吧!
Ken5233
Ken5233
?难道我不是发的java 技术问答?
0
李国刚
李国刚

用LinkedBlockingQueue 肯定比你这个高效


Ken5233
Ken5233
谢谢提醒,我暂时还未考虑此性能问题,只是想我这个是否有错,希望得到大家的指点
0
huan
huan
notify 只会通知一个等待锁的线程,notifyall 会通知所有等待锁的线程,一般使用notifyall, 以免等待不同条件的线程丢失通知。你这里最好使用信号量,不要自己手动的写wait和notify。java1.5之后的多线程已经得到极大加强。你不会还在用1.4吧。
Ken5233
Ken5233
谢谢提醒,我去看看关于信号量的使用api 不过这个地方,我觉得应该是使用notify为好
0
Ken5233
Ken5233

经过多次测试,发现会有死循环.

主要是当线程wait 之后,怎样回到原来的线程。

notify不会准确的通知某一线程去启动,而是由jvm通知任意线程。

其实这个应该就是一个生产者消费者 只是会有多个消费者和生产者而已


0
小孙学无止境
小孙学无止境
有现成的mq不用,何必自造轮子
Ken5233
Ken5233
能不能给个具体的链接
Ken5233
Ken5233
谢谢提醒,我不知道。问度娘去
0
Ken5233
Ken5233
菜鸟了,上述代码中 queue 就是线程安全的 所以不需要同步。
0
震秦
震秦

引用来自“huan”的答案

notify 只会通知一个等待锁的线程,notifyall 会通知所有等待锁的线程,一般使用notifyall, 以免等待不同条件的线程丢失通知。你这里最好使用信号量,不要自己手动的写wait和notify。java1.5之后的多线程已经得到极大加强。你不会还在用1.4吧。

一个活动的线程调用了它的wait()方法该线程会进入休眠。

notify()是只让当前休眠的线程唤醒。

notifyall()会让程序中所有休眠的线程唤醒。

一般的要用notify(),而不是notifyAll(),notifyAll()唤醒所有线程可能会破坏其他框架或者类库中程序的线程逻辑(当然高质量的框架都会自动纠错这种情况)。

0
huan
huan

引用来自“震秦”的答案

引用来自“huan”的答案

notify 只会通知一个等待锁的线程,notifyall 会通知所有等待锁的线程,一般使用notifyall, 以免等待不同条件的线程丢失通知。你这里最好使用信号量,不要自己手动的写wait和notify。java1.5之后的多线程已经得到极大加强。你不会还在用1.4吧。

一个活动的线程调用了它的wait()方法该线程会进入休眠。

notify()是只让当前休眠的线程唤醒。

notifyall()会让程序中所有休眠的线程唤醒。

一般的要用notify(),而不是notifyAll(),notifyAll()唤醒所有线程可能会破坏其他框架或者类库中程序的线程逻辑(当然高质量的框架都会自动纠错这种情况)。

错误,请仔细看notifyAll的定义。而且在多线程使用中推荐使用notifyAll,不容易出现错误。
Ken5233
Ken5233
notifyall 通知所有等待的线程进入cpu排队
返回顶部
顶部