针对同一个客户端,MINA怎么可以同时开启多个线程以不同的频率发送多种消息呢?

颖辉小居 发布于 2016/01/12 20:58
阅读 2K+
收藏 0

我的项目要求在客户端连接到服务器的时候,就会时时的给他每秒一次的推送数据,还有其他的数据有的是5秒一次,有的是1分一次。我试着自己建了多个线程传入ISsession 发送数据 总是报错。如果只开启一个线程就正常。

下面是hander中的代码:

@Override
	public void sessionOpened(IoSession session) throws Exception {
		System.out.println("【server】sessionOpened ID:" + session.getId());
		if (allIoSessions == null) {
			allIoSessions = session.getService().getManagedSessions();
		}
		System.out.println("有人连接,当前客户数:" + allIoSessions.size());
		new Thread(new AllPoliceCoordsThread(session, service)).start();
		new Thread(new HotPoliceCoordsThread(session, service)).start();
		new Thread(new UserCoordsThread(session, service)).start();

	}

注释掉两个线程的话,只留一个(1秒发一次)就正常了。可是如果把频率改成1毫秒一次。就算一个线程也要不停地报错

加载中
1
x未央
x未央
增加一个队列,多线程产生的消息都先入队
0
颖辉小居
颖辉小居
该评论暂时无法显示,详情咨询 QQ 群:点此入群
0
颖辉小居
颖辉小居
package com.tcp.mina.main;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.mina.core.IoUtil;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

import com.safe.model.Police;
import com.safe.model.User;
import com.safe.service.CenterService;
import com.safe.util.ConstantUtil;
import com.safe.util.DateUtil;
import com.tcp.mina.frame.Frame;
import com.tcp.mina.model.M_p_HandingAlarm;
import com.tcp.mina.model.M_p_Pant;
import com.tcp.mina.model.M_p_PeopleInfo;
import com.tcp.mina.model.M_q_GetPeopleInfo;
import com.tcp.mina.model.M_q_HandingAlarm;
import com.tcp.mina.model.M_q_Pant;
import com.tcp.mina.model.PoliceTask;
import com.tcp.mina.msgcoder.MsgCoder;
import com.tcp.mina.thread.AllPoliceCoordsThread;
import com.tcp.mina.thread.HotPoliceCoordsThread;
import com.tcp.mina.thread.UserCoordsThread;
import com.tcp.mina.util.TCPConstant;

public class MyServerHandler extends IoHandlerAdapter {

	public static Map<Long, IoSession> allIoSessions;

	CenterService service;

	public MyServerHandler(CenterService service) {
		super();
		this.service = service;
	}

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		System.out.println("exceptionCaught");
		cause.printStackTrace();
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		System.out.println("【server】messageReceived: " + message);
		Frame frame = null;
		if (message instanceof Frame) {
			frame = (Frame) message;
		} else {
			return;
		}
		int msgType = frame.getMsgType();
		switch (msgType) {
		/** 一 收到心跳 */
		case TCPConstant.MSGTYPE_Q_PANT:
			M_q_Pant m_q_Pant = new MsgCoder<M_q_Pant>().readFrame(frame,
					M_q_Pant.class);
			System.out.println("请求消息-心跳请求:" + m_q_Pant);
			// 响应心跳
			M_p_Pant p_Pant = new M_p_Pant();
			session.write(new MsgCoder<M_p_Pant>().readMsg(p_Pant));
			break;
		/** 二 获取人物信息请求 */
		case TCPConstant.MSGTYPE_Q_PEOPLEINFO:
			M_q_GetPeopleInfo q_GetPeopleMsg = new MsgCoder<M_q_GetPeopleInfo>()
					.readFrame(frame, M_q_GetPeopleInfo.class);
			int peopleType = q_GetPeopleMsg.getType();
			if (TCPConstant.PEOPLETYPE_POLICE == peopleType) {// 警员
				Police police = service.getPoliceByPoliceNo(q_GetPeopleMsg
						.getID().trim());
				String birthdayStr = DateUtil.fmtDateToStr(police.getDetails()
						.getBirthday(), "yyyy-MM-dd");

				Frame policeFrame = new MsgCoder<M_p_PeopleInfo>()
						.readMsg(new M_p_PeopleInfo(police.getPoliceNo(),
								peopleType, police.getDetails().getName(),
								birthdayStr, police.getDetails().getTel(),
								police.getDetails().getAddress(), police
										.getDetails().getShenFenId(), police
										.getDetails().getPhoto()));
				session.write(policeFrame);

			} else if (TCPConstant.PEOPLETYPE_USER == peopleType) {// 用户
				User user = service.getUserByLoginName(q_GetPeopleMsg.getID()
						.trim());
				String birthdayStr = DateUtil.fmtDateToStr(user.getDetails()
						.getBirthday(), "yyyy-MM-dd");
				Frame userFrame = new MsgCoder<M_p_PeopleInfo>()
						.readMsg(new M_p_PeopleInfo(user.getLoginName(),
								peopleType, user.getDetails().getName(),
								birthdayStr, user.getDetails().getTel(), user
										.getDetails().getAddress(), user
										.getDetails().getShenFenId(), user
										.getDetails().getPhoto()));
				session.write(userFrame);
			}
			break;
		/** 三 处理报警请求 */
		case TCPConstant.MSGTYPE_Q_HANDINGALARM:
			// 解析请求消息
			M_q_HandingAlarm q_handingMsg = new MsgCoder<M_q_HandingAlarm>()
					.readFrame(frame, M_q_HandingAlarm.class);
			// 操作派警
			int alarmId = q_handingMsg.getEventID();
			int type = q_handingMsg.getType();

			if (type == ConstantUtil.ALARM_TYPE_TRUE) {// 真警则派发任务
				// 派发任务
				List<PoliceTask> policeTasks = q_handingMsg.getTask();
				List<String> policeNos = new ArrayList<String>();
				String taskContent = "";
				for (PoliceTask policeTask : policeTasks) {
					policeNos.add(policeTask.getPoliceID());
					taskContent = policeTask.getTaskInfo();
				}
				service.sendTaskAndNotice(policeNos, taskContent, alarmId);
			} else {// 不是真警 则修改对应类型 1假警 2重复报警
				// 修改报警类别
				service.updateAlarmStatus(alarmId, type);
			}
			/** 响应该报警的最新状态 */
			// 发送处理报警的响应消息 1 这里直接返回给所有客户端,2警员提交任务时 发送给所有客户端
			// AlarmInfo alarmInfo=service.getAlarmById();
			// TODO 假数据
			M_p_HandingAlarm msg = new M_p_HandingAlarm(alarmId, 2);
			Frame pMsgframe = new MsgCoder<M_p_HandingAlarm>().readMsg(msg);
			// 获取所有正在连接的IoSession
			Collection<IoSession> sessions = session.getService()
					.getManagedSessions().values();

			// 将消息写到所有IoSession
			IoUtil.broadcast(pMsgframe, sessions);
			break;
		default:
			System.out.println("TCP:TCPListenRequest 未知的请求!~");
			break;
		}

		// session.write(message);
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		System.out.println("【server】messageSent: " + message);
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		System.out.println("【server】sessionClosed");
		System.out.println("有人关闭,当前客户数:" + allIoSessions.size());
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		System.out.println("【server】sessionCreated");
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status)
			throws Exception {
		System.out.println("【server】sessionIdle");
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		System.out.println("【server】sessionOpened ID:" + session.getId());
		if (allIoSessions == null) {
			allIoSessions = session.getService().getManagedSessions();
		}
		System.out.println("有人连接,当前客户数:" + allIoSessions.size());
		new Thread(new AllPoliceCoordsThread(session, service)).start();
		new Thread(new HotPoliceCoordsThread(session, service)).start();
		new Thread(new UserCoordsThread(session, service)).start();

	}

}

handler的完整代码。结果很惨,同时在多个线程里收发消息就会出现消息错乱了



0
maradona
maradona
该评论暂时无法显示,详情咨询 QQ 群:点此入群
maradona
maradona
回复 @颖辉小居 : 那三个线程难道不是你写的?直接锁住IoSession这个对象不就行了
颖辉小居
颖辉小居
回复 @maradona : 如果是我自己的方法我加个锁没问题,可是发送消息的IOSession 是引用包里的。获得它的地方是通过重写别人的方法,得到的
maradona
maradona
回复 @颖辉小居 : 你都用多线程了...不知道怎么同步?IOSession没说是线程安全的吧..
颖辉小居
颖辉小居
这个同步锁怎么加啊?发出消息都是session直接调用的write
0
颖辉小居
颖辉小居

时隔一年再看这个问题 感触良多

0
颖辉小居
颖辉小居
package yh.net.mina;

import org.apache.mina.core.session.IoSession;

public class IoSender {
	public static void noticeMsg(IoSession session, Object msg) {
		synchronized (session) {
			session.write(msg);
		}
	}
}

这样应该可以吧!

为了解决服务端主动持续的多线程的向同一个Iosession(客户端)发送消息

OSCHINA
登录后可查看更多优质内容
返回顶部
顶部
返回顶部
顶部