/**
* COPYRIGHT (C) 2010 LY. ALL RIGHTS RESERVED.
*
* No part of this publication may be reproduced, stored in a retrieval system,
* or transmitted, on any form or by any means, electronic, mechanical, photocopying,
* recording, or otherwise, without the prior written permission of 3KW.
*
* Created By: zzqiang
* Created On: 2013-10-18
*
* Amendment History:
*
* Amended By Amended On Amendment Description
* ------------ ----------- ---------------------------------------------
*
**/
package com.core.sgip.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import com.core.sgip.SGIPMsg;
import com.core.sgip.body.command.BindResp;
import com.core.sgip.body.command.Submit;
import com.core.sgip.body.command.SubmitResp;
import com.core.sgip.constant.ClientConstant;
import com.core.sgip.constant.SGIPConstant;
import com.core.sgip.factory.SGIPFactory;
import com.core.sgip.interf.MessageHandler;
public class SGIPClient {
private static Logger logger = Logger.getLogger(SGIPClient.class);
private static MessageHandler messageHandler = ClientConstant.SGIP_MSG_HANDLER;
public static void sendMsg(List<String> listUserNumber, String content) throws Exception
{
// 开始通信
if (ClientConstant.IS_NIO.equalsIgnoreCase("y"))
{
sendMsg(listUserNumber, content, true);
} else
{
sendMsg(listUserNumber, content, false);
}
}
/**
* 发送消息
*
* @param listUserNumber
* @param content
* @throws Exception
*/
public static void sendMsg(List<String> listUserNumber, String content,boolean isNIO) throws Exception
{
if (null == listUserNumber || 0 == listUserNumber.size()
|| null == content || 0 == content.length())
{
return;
}
// 验证号码是否前面有86
for (int i = 0 ; i < listUserNumber.size(); i++)
{
String un = listUserNumber.get(i);
if(!un.startsWith("86"))
{
listUserNumber.set(i, "86" + un);
}
}
if(isNIO)
{
startNioCommu(listUserNumber, content);
}else
{
startCommunication(listUserNumber, content);
}
}
private static void startNioCommu(List<String>listUserNumber,String content) throws Exception
{
int size = listUserNumber.size();
int multi = 1;
int fromIndex = 0;
int toIndex = size;
if(size > ClientConstant.SGIP_SUBMIT_MAX_USER_NUMBER)
{
multi = size / ClientConstant.SGIP_SUBMIT_MAX_USER_NUMBER + (size % ClientConstant.SGIP_SUBMIT_MAX_USER_NUMBER > 0 ? 1 : 0);
}
for(int i = 0; i < multi; i++)
{
toIndex = fromIndex + ClientConstant.SGIP_SUBMIT_MAX_USER_NUMBER;
toIndex = toIndex > size ? size : toIndex;
List<String> tempUserNumbers = listUserNumber.subList(fromIndex, toIndex);
fromIndex = (i+1) * ClientConstant.SGIP_SUBMIT_MAX_USER_NUMBER;
startNIOCommunication(tempUserNumbers, content);
logger.debug("------------------------------");
}
}
private static boolean headKey(SelectionKey key ,List<String> listUserNumber,String content) throws Exception
{
boolean quit = false;
try
{
if (key.isConnectable())
{
SocketChannel socketChannel = (SocketChannel) key.channel();
// 由于非阻塞模式,connect只管发起连接请求,finishConnect()方法会阻塞到链接结束并返回是否成功
// 另外还有一个isConnectionPending()返回的是是否处于正在连接状态(还在三次握手中)
if (socketChannel.finishConnect())
{
// 链接成功了可以做一些自己的处理,略
logger.debug("********* nio socket connect success **********");
// 处理完后必须吧OP_CONNECT关注去掉,改为关注OP_READ
key.interestOps(SelectionKey.OP_WRITE);
key.attach(SGIPConstant.SGIP_BIND);
}
}
if (key.isReadable())
{
logger.debug("****************nio socket into readable ********");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
try
{
byte[] bytes;
int size = socketChannel.read(buffer);
if (size >= 0)
{
buffer.flip();
bytes = new byte[size];
buffer.get(bytes);
baos.write(bytes);
buffer.clear();
}
bytes = baos.toByteArray();
int result = 0;
long receiveCommandId = (Long) key.attachment();
if (receiveCommandId == SGIPConstant.SGIP_BIND_RESP)
{
SGIPMsg returnMsg = SGIPFactory.constructSGIPMsg(bytes);
logger.debug("*********end receive bindResp*********returnMsg="
+ returnMsg);
if (null != returnMsg.getCommand()
&& ((BindResp) returnMsg.getCommand())
.getResult() == 0)
{
key.attach(SGIPConstant.SGIP_SUBMIT);
} else
{
BindResp br = (BindResp) returnMsg.getCommand();
result = br.getResult();
logger.debug("****************** bindResp's result: "
+ result);
String errorMsg = SGIPConstant.ERROR_CODE
.get(result + "");
logger.error("错误消息:" + errorMsg);
quit = true;
throw new Exception(errorMsg);
}
} else if (receiveCommandId == SGIPConstant.SGIP_SUBMIT_RESP)
{
SGIPMsg returnMsg = SGIPFactory.constructSGIPMsg(bytes);
logger.debug("*********end receive submitResp*********returnMsg="
+ returnMsg);
if (null != returnMsg.getCommand()
&& ((SubmitResp) returnMsg.getCommand())
.getResult() == 0)
{
key.attach(SGIPConstant.SGIP_UNBIND);//判断集合是否处理完成 没有完成继续发送
} else
{
SubmitResp br = (SubmitResp) returnMsg.getCommand();
result = br.getResult();
logger.debug("****************** SubmitResp's result: "
+ result);
String errorMsg = SGIPConstant.ERROR_CODE
.get(result + "");
logger.error("错误消息:" + errorMsg);
quit = true;
throw new Exception(errorMsg);
}
} else if (receiveCommandId == SGIPConstant.SGIP_UNBIND_RESP)
{
SGIPMsg returnMsg = SGIPFactory.constructSGIPMsg(bytes);
logger.debug("*********end receive unbindResp*********returnMsg="
+ returnMsg);
}
if (receiveCommandId != SGIPConstant.SGIP_UNBIND_RESP)
{
key.interestOps(SelectionKey.OP_WRITE);
} else
{
quit = true;
socketChannel.close();
}
logger.debug("********* quit=" + quit);
} catch (Exception e)
{
quit = true;
logger.error("Error", e);
throw e;
} finally
{
baos.close();
if (buffer != null)
{
buffer = null;
}
}
}
if (quit)
{
return quit;
}
if (key.isWritable())
{
logger.debug("****************nio socket into writable ********");
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
long sendCommandId = (Long) key.attachment();
if (sendCommandId == SGIPConstant.SGIP_BIND)
{
SGIPMsg sgipMsg = SGIPFactory
.getSGIPMsg(SGIPConstant.SGIP_BIND);
ByteBuffer block = ByteBuffer.wrap(sgipMsg.getByteData());
logger.debug("*********send bind *********sgipMsg="
+ sgipMsg);
socketChannel.write(block);
key.attach(SGIPConstant.SGIP_BIND_RESP);
} else if (sendCom