java socket 客户端 这样 写 合不合理?求大家帮忙看一下 在 android 上运行

每周精粹 发布于 2014/09/25 17:09
阅读 215
收藏 0

公司要实现一个简单的聊天功能(在 android 上) ,用socket 做。本人水平有点低,今年刚刚毕业, 所以贴出代码给大家看看,哪里还有改进,谢谢各位的指导。由三个类 组成:  IbbSocketClient : 用于 连接及收发消息,

IbbSocketMessageListener 接口:用于 消息回调。

IbbSocketStateListener 接口 :状态回调

IbbSocketClient:

package com.kunyuanzhihui.ibb.socket;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import android.os.Handler;
import android.text.TextUtils;
import android.util.Log;

import com.kunyuanzhihui.ibb.IbbApplication;

/**
 *
 * SocketClient
 *
 * @author ving
 *
 */

public class IbbSocketClient {
    public final static String HOST = "112.124.97.xxx";
    public final static int PORT = xxxx;
    // 心跳
    public final static short CMD_HEART = 1024;
    // 发送消息
    public final static short CMD_SEND_MESSAGE = 1026;
    // 获取历史消息
    public final static short CMD_GET_HISTORY = 1027;
    // 获取历史消息条数
    public final static short CMD_GET_UNREAD = 1028;
    // 消息设置为已读
    public final static short CMD_SET_READ = 1029;
    // 退出聊天
    public final static short CMD_EXIT = 1030;
    // 收到服务器推送过来的消息
    public final static short CMD_RECEIVE = 2048;

    private Socket socketClient = null;
    // 输出流
    private DataOutputStream socketOutputStrem = null;
    // 输入流
    private DataInputStream socketInputStrem = null;
    private ByteArrayOutputStream tmp_allbyte = null;
    // 单列
    private static IbbSocketClient ibbSocketClient = null;
    // 线程池
    private static ExecutorService mTaskPool = null;
    ExecutorService exec = Executors.newCachedThreadPool();
    private Thread getMessageThread, connectThread;

    // 消息监听
    HashSet<IbbSocketMessageListener> msgListener = new HashSet<IbbSocketMessageListener>();
    // socket监听
    HashSet<IbbSocketStateListener> stateListener = new HashSet<IbbSocketStateListener>();

    private IbbSocketClient() {
        super();
        mTaskPool = Executors.newFixedThreadPool(5); // 创建一个最大维护线程数为5的线程池
        ConnectToServer();

    }

    public static synchronized IbbSocketClient getInstance() {
        if (null == ibbSocketClient) {
            ibbSocketClient = new IbbSocketClient();
        }
        return ibbSocketClient;
    }

    // 建立socket连接
    private synchronized void ConnectToServer() {
        connectThread = new Thread(new Runnable() {

            @Override
            public void run() {
                if (null == socketClient) {
                    try {
                        // 建立 Socket 连接
                        socketClient = new Socket(HOST, PORT);
                        System.out.println("建立  Socket 连接");
                        // 获取输出流
                        socketOutputStrem = new DataOutputStream(
                                socketClient.getOutputStream());
                        // 获取输入流
                        socketInputStrem = new DataInputStream(
                                socketClient.getInputStream());
                        getMessageFromServer();
                        connectionCallback();
                        handler.post(myRunnable);
                    } catch (UnknownHostException e) {
                        try {
                            Thread.sleep(1000 * 30);
                        } catch (InterruptedException e1) {
                        }
                        System.out.println("建立 错误 " + e.getMessage());
                        ConnectToServer();
                    } catch (IOException e) {
                        System.out.println("建立 错误 " + e.getMessage());
                        try {
                            Thread.sleep(1000 * 30);
                        } catch (InterruptedException e1) {
                        }
                        ConnectToServer();
                    }
                }
            }
        });

        connectThread.start();
        connectThread = null;
        System.gc();
    }

    /**
     * 异步上传数据
     *
     * @param cmd
     * @param msg
     */
    public void asyncSendMessage(short cmd, String msg) {

        SendMessageTask mTask = new SendMessageTask(cmd, msg);
        mTaskPool.execute(mTask);

    }

    // 发送消息线程
    private class SendMessageTask implements Runnable {
        private short cmd;
        private String msg;

        public SendMessageTask(short cmd, String msg) {
            super();
            this.cmd = cmd;
            this.msg = msg;
        }

        @Override
        public void run() {
            writeMessageToServer(cmd, msg);
        }

    }

    /**
     * 向服务器 发送 消息 ** 加同步锁 防止数据混乱
     *
     * @param cmd
     * @param msg
     */
    public synchronized void writeMessageToServer(short cmd, String msg) {

        // 判断输出流是否空
        if (null != socketOutputStrem) {
            byte[] personbyte = new byte[10];
            int personlen = 0;
            // 判断 内容是否为空
            if (null != msg && msg.length() > 0) {
                personbyte = msg.getBytes();
                personlen = personbyte.length + 6;
            } else {
                personlen = 6;
            }

            try {
                socketOutputStrem.writeByte(1);
                socketOutputStrem.writeByte(0);

                socketOutputStrem.writeByte(0);
                socketOutputStrem.writeByte(0);

                socketOutputStrem.writeByte(0);
                socketOutputStrem.writeByte(0);

                socketOutputStrem.writeShort(personlen);
                socketOutputStrem.writeShort(cmd);

                socketOutputStrem.writeByte(0);
                socketOutputStrem.writeByte(0);

                socketOutputStrem.writeByte(0);
                socketOutputStrem.writeByte(0);
                if (personlen > 6)
                    socketOutputStrem.write(personbyte);
                socketOutputStrem.flush();
                Log.i(HOST, "向服务器发送数据: " + new String(personbyte));

            } catch (IOException e) {
                Log.e(HOST, e.getMessage());

            }
        } else {
            Log.e(HOST, "输出流为空");
        }

    }

    private void getMsgFromServer() {
        String msg = "";
        short cmd = 0;
        try {
            if (socketInputStrem != null) {
                int outCnt = socketInputStrem.available();
                if (outCnt > 0) {
                    byte[] allbyte;
                    if (tmp_allbyte != null) {
                        byte[] leftbyte = new byte[outCnt];
                        socketInputStrem.read(leftbyte);
                        tmp_allbyte.write(leftbyte, 0, outCnt);

                        allbyte = tmp_allbyte.toByteArray();

                        outCnt = allbyte.length;
                    } else {
                        allbyte = new byte[outCnt];
                        socketInputStrem.read(allbyte);
                    }

                    if (outCnt >= 14) {
                        byte[] lenByte = subBytes(allbyte, 6, 2);
                        BigInteger tmphlen = new java.math.BigInteger(lenByte);
                        short lenInt = tmphlen.shortValue();

                        if (lenInt > (outCnt - 6)) {
                            tmp_allbyte = new ByteArrayOutputStream(
                                    lenInt + 6 + 10);
                            tmp_allbyte.write(allbyte);

                        } else {
                            tmp_allbyte = null;
                        }

                        byte[] cmdByte = subBytes(allbyte, 8, 2);

                        BigInteger tmph = new java.math.BigInteger(cmdByte);

                        short cmdInt = tmph.shortValue();
                        cmd = cmdInt;
                        byte[] strbyte = subBytes(allbyte, 14, outCnt - 14);
                        msg = new String(strbyte, "UTF-8");
                        Log.i(HOST, "收到 服务器 :cmd =" + cmd + "\n" + msg);
                        // 回调消息
                        for (IbbSocketMessageListener listener : msgListener) {
                            if (listener.supportCommand(cmdInt)) {
                                listener.onRecevice(msg, cmdInt);
                            }
                        }

                    }

                }
            } else {
                Log.e(HOST, "socketInputStrem :  为空");
                for (IbbSocketMessageListener listener : msgListener) {
                    listener.onFail(cmd, "socketInputStrem :  为空");
                }
            }
        } catch (Exception e) {
            Log.e(HOST, "收到服务器数据 : " + e.getMessage());
            for (IbbSocketMessageListener listener : msgListener) {
                listener.onFail(cmd, e.getMessage());
            }
        }
    }

    // 服务器发送 心跳
    private void writeHeart() {
        // String inputStr = "{\"id\": 4}";
        String str = "";
        if (null != IbbApplication.Ibb_User) {
            if (!TextUtils.isEmpty(IbbApplication.Ibb_User.getId())) {
                str = "{\"id\":\"" + IbbApplication.Ibb_User.getId() + "\"}";
                writeMessageToServer(CMD_HEART, str);
                Log.i(HOST, str);

            }
        }

    }

    private Handler handler = new Handler();

    private Runnable myRunnable = new Runnable() {
        public void run() {
            handler.postDelayed(this, 1000 * 30);
            if (null != socketClient) {
                writeHeart();
                connectionCallback();
            } else {
                // 先关闭 再从新链接
                closeSocketClent();
                ibbSocketClient = new IbbSocketClient();
            }

        }
    };

    private void getMessageFromServer() {

        getMessageThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    if (null != socketClient) {
                        getMsgFromServer();
                    } else {
                        // 先关闭 再从新链接
                        closeSocketClent();
                        ConnectToServer();
                    }
                }
            }
        });

        getMessageThread.start();
    }

    private byte[] subBytes(byte[] src, int begin, int count) {
        byte[] bs = new byte[count];
        for (int i = begin; i < begin + count; i++)
            bs[i - begin] = src[i];
        return bs;
    }

    /**
     *
     * socket 状态连接监听函数
     *
     *
     */
    private void connectionCallback() {
        Log.i(HOST, "连接状态的监听");
        boolean tag = getConnectionStatus();
        for (IbbSocketStateListener c : stateListener) {
            c.connectionStatus(tag);
        }

    }

    /*
     * 获取 socket 连接状态
     *
     * true 连接中 false 断开了连接
     */
    private synchronized Boolean getConnectionStatus() {
        try {
            // 发送1个字节的紧急数据,默认情况下,服务器端没有开启紧急数据处理,不影响正常通信
            if (null != socketClient && socketClient.isConnected()) {
                socketClient.sendUrgentData(0);
            } else {
                closeSocketClent();
                ConnectToServer();
                Log.i(HOST, "ConnectionStatus : " + false);
                return false;
            }
            Log.i(HOST, "ConnectionStatus : " + true);

            return true;
        } catch (Exception se) {
            closeSocketClent();
            ibbSocketClient = new IbbSocketClient();
            Log.i(HOST, "ConnectionStatus : " + false);
            return false;
        }
    }

    /**
     *
     * 添加 消息监听
     *
     * @param listener
     */

    public void addSocketMessageListener(IbbSocketMessageListener listener) {
        this.msgListener.add(listener);
    }

    /**
     *
     * 删除消息 监听
     *
     * @param listener
     */
    public void removeSocketMessageListener(IbbSocketMessageListener listener) {
        this.msgListener.remove(listener);
    }

    /**
     * socket 状态监听
     *
     * @param listener
     */
    public void addSocketStateListener(IbbSocketStateListener listener) {
        this.stateListener.add(listener);
    }

    /**
     * 移除 状态监听
     *
     * @param listener
     */
    public void removeSocketStateListener(IbbSocketStateListener listener) {
        this.stateListener.remove(listener);
    }

    // 关闭 Socket
    public void closeSocketClent() {

        try {
            if (socketOutputStrem != null) {
                socketOutputStrem.close();
                socketOutputStrem = null;
            }
        } catch (Exception e) {
            socketOutputStrem = null;
        }
        try {
            if (socketInputStrem != null) {
                socketInputStrem.close();
                socketInputStrem = null;
            }
        } catch (Exception e) {
            socketInputStrem = null;
        }

        try {
            if (socketClient != null) {
                socketClient.shutdownInput();
                socketClient.shutdownOutput();
                socketClient.close();
                socketClient = null;
            }
        } catch (IOException e) {
            socketClient = null;
        } finally {
            socketClient = null;
        }
    }

}


package com.kunyuanzhihui.ibb.socket;

/**
 *
 * 消息回调
 *
 * @author ving
 *
 */
public interface IbbSocketMessageListener {

    public void onRecevice(Object result, short cmd);

    public boolean supportCommand(short cmd);

    public void onFail(short cmd, String err);

}

package com.kunyuanzhihui.ibb.socket;

/**
 *  状态回调
 *
 * @author ving
 *
 */
public interface IbbSocketStateListener {
    public void connectionStatus(Boolean status);
}




加载中
0
每周精粹
每周精粹
http://www.oschina.net/code/snippet_194890_38941 这里比较清晰。
返回顶部
顶部