
IoService:IoService相当于是Mina的Socket层,负责所有SocketIO事件的注册,select,分发等。它位于org.apache.mina.core.service包内,它有两个子接口,表示Server端接收方的IoAcceptor和Client发起方的IoConnector,以及所有的实现类:
NioDatagramAcceptor/NioDatagramConnector:基于UDP的实现
NioSocketAcceptor/NioSocketConnector:基于TCP的实现
VmPipeAcceptor/VmPipeConnector:基于Pipe的实现
服务端初始化的方式一般是:
- SocketAcceptor acceptor = new NioSocketAcceptor();
- acceptor.setReuseAddress( true );
- acceptor.setHandler(new EchoProtocolHandler());
- acceptor.bind(new InetSocketAddress(PORT));
SocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.setReuseAddress( true );
acceptor.setHandler(new EchoProtocolHandler());
acceptor.bind(new InetSocketAddress(PORT));
初始化NioSocketAcceptor的时候会做主要事情:
1.需要通过具体的实现提供的transportMetaData,来判断其中规定的session配置类(eg:DefaultSocketSessionConfig)是否来自接口SessionConfig.
- public NioSocketAcceptor() {
- super(new DefaultSocketSessionConfig(), NioProcessor.class);
- ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
- }
public NioSocketAcceptor() {
super(new DefaultSocketSessionConfig(), NioProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
2.默认启动了一个SimpleIoProcessorPool来包装NioProcessor.
- protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
- Class<? extends IoProcessor<T>> processorClass) {
- this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
- true);
- }
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
Class<? extends IoProcessor<T>> processorClass) {
this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
true);
}
而SimpleIoProcessorPool默认是启动CPU个数 +1个 NioProcess,并以数组形式管理。
- private static final int DEFAULT_SIZE = Runtime.getRuntime()
- .availableProcessors() + 1;
- private final IoProcessor<T>[] pool;
- public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
- this(processorType, null, DEFAULT_SIZE);
- }
- public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
- Executor executor, int size) {
- if (processorType == null) {
- throw new NullPointerException("processorType");
- }
- if (size <= 0) {
- throw new IllegalArgumentException("size: " + size
- + " (expected: positive integer)");
- }
- if (executor == null) {
- this.executor = executor = Executors.newCachedThreadPool();
- this.createdExecutor = true;
- } else {
- this.executor = executor;
- this.createdExecutor = false;
- }
- pool = new IoProcessor[size];
- boolean success = false;
- Constructor<? extends IoProcessor<T>> processorConstructor = null;
- boolean usesExecutorArg = true;
- ....
- }
private static final int DEFAULT_SIZE = Runtime.getRuntime()
.availableProcessors() + 1;
private final IoProcessor<T>[] pool;
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
this(processorType, null, DEFAULT_SIZE);
}
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
Executor executor, int size) {
if (processorType == null) {
throw new NullPointerException("processorType");
}
if (size <= 0) {
throw new IllegalArgumentException("size: " + size
+ " (expected: positive integer)");
}
if (executor == null) {
this.executor = executor = Executors.newCachedThreadPool();
this.createdExecutor = true;
} else {
this.executor = executor;
this.createdExecutor = false;
}
pool = new IoProcessor[size];
boolean success = false;
Constructor<? extends IoProcessor<T>> processorConstructor = null;
boolean usesExecutorArg = true;
....
}
在这里并没有指定Executor,因此用默认的Executors.newCachedThreadPool().这个ThreadPool管理着NioSocketAcceptor和所有的IoProcessor处理线程.
在这里,NioSocketAcceptor可以理解为一个Server,而NioProcessor就是其中的并行的处理程序。NioProcessor默认个数是CPU个数+1,这正好是属于Processor的selector的个数,它们专门处理OP_READ/OP_WRITE事件。在NioSocketAcceptor里面也有个Selector,它是专门用作处理OP_ACCEPT事件.这个分离设计使OP_ACCEPT不被读写事件所影响。因此,采用默认设置,MINA会启动 CPU个数+2 Selector.
每个NioSocketAcceptor内部有个Acceptor线程对象,NioSocketAcceptor会使用传入的或者生成的Executor来执行这个Acceptor。
每个NioProcessor内部也有个Processor线程对象,NioProcessor会使用传入的或者生成的Executor来执行这个Processor。
3.IoService初始化时,建立与监听器容器IoServiceListenerSupport的双向关联,注册匿名内部实现serviceActivationListener到监听器容器.
客户端的启动方式:
- NioSocketConnector connector = new NioSocketConnector();
- connector.getFilterChain().addLast( "logger", new LoggingFilter() );
- connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
- connector.setConnectTimeout(30);
- connector.setHandler(new TimeClientHandler());
- ConnectFuture cf = connector.connect(
- new InetSocketAddress("127.0.0.1", 8833));
NioSocketConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast( "logger", new LoggingFilter() );
connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
connector.setConnectTimeout(30);
connector.setHandler(new TimeClientHandler());
ConnectFuture cf = connector.connect(
new InetSocketAddress("127.0.0.1", 8833));
其基本过程跟NioSocketAcceptor差不多.
IoFilter
IoFilter也是一个接口,有点像ServletFilter,在事件被 IoHandler处理之前或之后进行一些特定的操作,比如记录日志(LoggingFilter)、压缩数据(CompressionFilter),SSL加密(SSLFilter)黑名单过滤(BlackListFilter),心跳检测(KeepAliveFilter)等等。这些都是MINA自带的。
实际项目中,有个东西肯定是存在的,就是ProtocolCodecFilter或者跟它类似的filter.我们不可能让IoHandler直接去操作字节流,所以在这一层,一定要把二进制流转成java对象或者文本,这样才能不枉费MINA的良好分层设计。
ProtocolCodecFilter
使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。
ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。在笔者所做的自定义协议环境下,在这里就需要重写ObjectSerializationCodecFactory。
TextLineCodecFactory就是 String跟ByteBuffer的转化,比如HTTP,RTSP这类纯文本协议。
IoFilter的顺序问题:IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。
ExecutorFilter
有一个比较重要的过滤器就是ExecutorFilter。前面已经知道,每个NioProcess对应的是一个Process内部线程对象,在Process的run方法里面会循环的调用process方法。也就是第一个事件没有执行完,第二个事件不会执行,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。ExecutorFilter的的作用就是将同一个类型的消息合并起来按顺序调用
- public final void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) {
- if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
- IoFilterEvent event = new IoFilterEvent(nextFilter,
- IoEventType.MESSAGE_RECEIVED, session, message);
- fireEvent(event);
- } else {
- nextFilter.messageReceived(session, message);
- }
- }
public final void messageReceived(NextFilter nextFilter, IoSession session,
Object message) {
if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
IoFilterEvent event = new IoFilterEvent(nextFilter,
IoEventType.MESSAGE_RECEIVED, session, message);
fireEvent(event);
} else {
nextFilter.messageReceived(session, message);
}
}
从上面的代码可以看到,如果满足事件集合条件,就组装成IoFilterEvent交给Executor异步执行。这样在filter上就不会因为某个事件filter执行时间过长而block后面的事件。
- protected void fireEvent(IoFilterEvent event) {
- executor.execute(event);
- }
protected void fireEvent(IoFilterEvent event) {
executor.execute(event);
}
- public void fire() {
- IoSession session = getSession();
- NextFilter nextFilter = getNextFilter();
- IoEventType type = getType();
- if ( LOGGER.isDebugEnabled()) {
- LOGGER.debug( "Firing a {} event for session {}",type, session.getId() );
- }
- switch (type) {
- case MESSAGE_RECEIVED:
- Object parameter = getParameter();
- nextFilter.messageReceived(session, parameter);
- break;
- case MESSAGE_SENT:
- WriteRequest writeRequest = (WriteRequest)getParameter();
- nextFilter.messageSent(session, writeRequest);
- break;
- case WRITE:
- writeRequest = (WriteRequest)getParameter();
- nextFilter.filterWrite(session, writeRequest);
- break;
- case CLOSE:
- nextFilter.filterClose(session);
- break;
- case EXCEPTION_CAUGHT:
- Throwable throwable = (Throwable)getParameter();
- nextFilter.exceptionCaught(session, throwable);
- break;
- case SESSION_IDLE:
- nextFilter.sessionIdle(session, (IdleStatus) getParameter());
- break;
- case SESSION_OPENED:
- nextFilter.sessionOpened(session);
- break;
- case SESSION_CREATED:
- nextFilter.sessionCreated(session);
- break;
- case SESSION_CLOSED:
- nextFilter.sessionClosed(session);
- break;
- default:
- throw new IllegalArgumentException("Unknown event type: " + type);
- }
- if ( LOGGER.isDebugEnabled()) {
- LOGGER.debug( "Event {} has been fired for session {}", type, session.getId() );
- }
- }
public void fire() {
IoSession session = getSession();
NextFilter nextFilter = getNextFilter();
IoEventType type = getType();
if ( LOGGER.isDebugEnabled()) {
LOGGER.debug( "Firing a {} event for session {}",type, session.getId() );
}
switch (type) {
case MESSAGE_RECEIVED:
Object parameter = getParameter();
nextFilter.messageReceived(session, parameter);
break;
case MESSAGE_SENT:
WriteRequest writeRequest = (WriteRequest)getParameter();
nextFilter.messageSent(session, writeRequest);
break;
case WRITE:
writeRequest = (WriteRequest)getParameter();
nextFilter.filterWrite(session, writeRequest);
break;
case CLOSE:
nextFilter.filterClose(session);
break;
case EXCEPTION_CAUGHT:
Throwable throwable = (Throwable)getParameter();
nextFilter.exceptionCaught(session, throwable);
break;
case SESSION_IDLE:
nextFilter.sessionIdle(session, (IdleStatus) getParameter());
break;
case SESSION_OPENED:
nextFilter.sessionOpened(session);
break;
case SESSION_CREATED:
nextFilter.sessionCreated(session);
break;
case SESSION_CLOSED:
nextFilter.sessionClosed(session);
break;
default:
throw new IllegalArgumentException("Unknown event type: " + type);
}
if ( LOGGER.isDebugEnabled()) {
LOGGER.debug( "Event {} has been fired for session {}", type, session.getId() );
}
}
实际上MINA是用filterChain的方式顺序调用所有注册的filter.默认的DefaultIoFilterChain
- if (readBytes > 0) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageReceived(buf);
if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);
然后就会一个filter接一个的传下去,直到最后的尾调用IoHandler.
IoHandler
IoHanlder则是所有事件最终产生响应的位置,一般用来处理业务比如分析数据,数据库操作等。
- void sessionCreated(IoSession session) throws Exception;
- void sessionOpened(IoSession session) throws Exception;
- void sessionClosed(IoSession session) throws Exception;
- void sessionIdle(IoSession session, IdleStatus status) throws Exception;
- void exceptionCaught(IoSession session, Throwable cause) throws Exception;
- void messageReceived(IoSession session, Object message) throws Exception;
- void messageSent(IoSession session, Object message) throws Exception;
void sessionCreated(IoSession session) throws Exception;
void sessionOpened(IoSession session) throws Exception;
void sessionClosed(IoSession session) throws Exception;
void sessionIdle(IoSession session, IdleStatus status) throws Exception;
void exceptionCaught(IoSession session, Throwable cause) throws Exception;
void messageReceived(IoSession session, Object message) throws Exception;
void messageSent(IoSession session, Object message) throws Exception;
messageReceived是接收客户端消息的事件,我们应该在这里实现业务逻辑。
messageSent是服务器发送消息的事件,一般情况下不会使用它。
sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能放在sessionOpened事件中。
IoSession
IoSession是一个接口,它提供了对当前连接的操作功能,还有用户定义属性的存储功能,就像HttpSession。一个IoSession就代表一个Client与Server的连接。IoSession是线程安全的,第一层子类AbstractIoSession内部用到了大量的lock机制,因此可以放心的使用而不用担心并发问题。常用的方法:
- //
- WriteFuture write(Object message)
- CloseFuture close();
- //属性相关操作
- Object getAttribute(String key);
- Object setAttribute(String key, Object value);
- Object removeAttribute(String key);
- Set getAttributeKeys();
- //连接状态操作
- boolean isConnected();
- boolean isClosing();
- SocketAddress getRemoteAddress();
- boolean isIdle(IdleStatus status);
//
WriteFuture write(Object message)
CloseFuture close();
//属性相关操作
Object getAttribute(String key);
Object setAttribute(String key, Object value);
Object removeAttribute(String key);
Set getAttributeKeys();
//连接状态操作
boolean isConnected();
boolean isClosing();
SocketAddress getRemoteAddress();
boolean isIdle(IdleStatus status);
主要方法可以分为三类:
连接操作
最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,实际传入的类型应该是最后一个filter处理后的结果。最初始的状态下,message是一个IoBuffer.
- IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageReceived(buf);
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);
另外注意的是,write返回的WriteFuture类,这样调用IoSession.write方法是不会阻塞的。调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就看线程的调度处理了。非常典型的Future模式的应用。
如果需要确认消息是否成功的发送出去了,只需要wait一下,然后检查下future的状态。
- WriteFuture future = session.write(xx);
- future.awaitUninterruptibly();
- boolean isCompleted = future.isWritten()
WriteFuture future = session.write(xx);
future.awaitUninterruptibly();
boolean isCompleted = future.isWritten()
当调用write的时候,会通过跟read相反的次序依次传递,直至由IoService负责把数据发送给客户端.
如果在很短的时间里,对同一个IoSession进行了两次write操作,客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销。
属性存储操作
一般用于状态维护。参考HttpSession的作用。跟Attribute相关的4个方法都是跟这个相关的。
连接状态
最后面的4个方法全是连接属性的查询
、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、
基本介绍:
Apache MINA 2是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步API,可以使用TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2可以作为开发网络应用程序的一个良好基础。
Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、
接收的数据以及你的业务逻辑即可。
mina的基本架构:
在图中的模块链中,IoService 便是应用程序的入口,相当于我们前面代码中的 IoAccepter,IoAccepter 便是 IoService 的一个扩展接口。IoService 接口可以用来添加多个 IoFilter,这些 IoFilter 符合责任链模式并由 IoProcessor 线程负责调用。而 IoAccepter 在 ioService 接口的基础上还提供绑定某个通讯端口以及取消绑定的接口。ioHandler则为应用逻辑处理类。
主要类以及接口:
(1.)IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监
听是否有连接被建立。
(2.)IoProcessor:这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是
说它也拥有自己的Selector,这是与我们使用JAVA NIO编码时的一个不同之处,
通常在JAVA NIO编码中,我们都是使用一个Selector,也就是不区分IoService
与IoProcessor两个功能接口。另外,IoProcessor负责调用注册在IoService上
的过滤器,并在过滤器链之后调用IoHandler。
(3.)IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、
数据的编码(write方向)与解码(read方向)等功能,其中数据的encode与decode
是最为重要的、也是你在使用Mina时最主要关注的地方。
(4.)IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。
(5.)IoSession:Session可以理解为服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定。客户端发起请求时,指定服务器地址和端口,客户端也会指定或者根据网络路由信息自动指定一个地址、自动分配一个端口。这个地址、端口对构成一个Session。Session是服务器端对这种连接的抽象,MINA对其进行了封装,定义了IoSession接口,用来代表客户端与服务器的连接,在服务器端来指代客户端,实现对客户端的操作、绑定与客户端有关的信息与对象。通过利用Session的这个概念,编写程序时就可以在服务器端非常方便地区分出是当前处理的是哪个客户端的请求、维持客户端的状态信息、可以实现客户端之间相互通讯。
一图胜千言,MINA的核心类图:
服务端代码大致如下:
Java代码
- //初始化Acceptor—可以不指定线程数量,MINA2里面默认是CPU数量+2
- //是你的工作主线程
- NioSocketAcceptor acceptor = new NioSocketAcceptor(5);
- //建立线程池
- java.util.concurrent.Executor threadPool = newFixedThreadPool(1500);
- //加入过滤器(Filter)到Acceptor
- acceptor.getFilterChain().addLast(“exector”, new ExecutorFilter(threadPool));
- //编码解码器
- acceptor.getFilterChain().addLast(“codec”,new ProtocolCodecFilter(new WebDecoder(),new XmlEncoder()));
- //日志
- LoggingFilter filter = new LoggingFilter();
- filter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
- filter.setMessageReceivedLogLevel(LogLevel.DEBUG);
- filter.setMessageSentLogLevel(LogLevel.DEBUG);
- filter.setSessionClosedLogLevel(LogLevel.DEBUG);
- filter.setSessionCreatedLogLevel(LogLevel.DEBUG);
- filter.setSessionIdleLogLevel(LogLevel.DEBUG);
- filter.setSessionOpenedLogLevel(LogLevel.DEBUG);
- acceptor.getFilterChain().addLast(“logger”, filter);
- //设置的是主服务监听的端口可以重用
- acceptor.setReuseAddress(true);
- //设置每一个非主监听连接的端口可以重用
- acceptor.getSessionConfig().setReuseAddress(true);
- //MINA2中,当启动一个服务端的时候,要设定初始化缓冲区的长度,如果不设置这个值,系统默认为2048,当客户端发过来的消息超过设定值的时候,
- //MINA2的机制是分段接受的,将字符是放入缓冲区中读取,所以在读取消息的时候,需要判断有多少次。这样的好处就是可以节省通讯的流量。
- //设置输入缓冲区的大小
- acceptor.getSessionConfig().setReceiveBufferSize(1024);
- //设置输出缓冲区的大小
- acceptor.getSessionConfig().setSendBufferSize(10240);
- //设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出
- acceptor.getSessionConfig().setTcpNoDelay(true);
- //设置主服务监听端口的监听队列的最大值为100,如果当前已经有100个连接,再新的连接来将被服务器拒绝
- acceptor.setBacklog(100);
- acceptor.setDefaultLocalAddress(new InetSocketAddress(port));
- //加入处理器(Handler)到Acceptor
- acceptor.setHandler(new YourHandler());
- acceptor.bind();
//初始化Acceptor—可以不指定线程数量,MINA2里面默认是CPU数量+2 //是你的工作主线程 NioSocketAcceptor acceptor = new NioSocketAcceptor(5); //建立线程池 java.util.concurrent.Executor threadPool = newFixedThreadPool(1500); //加入过滤器(Filter)到Acceptor acceptor.getFilterChain().addLast("exector", new ExecutorFilter(threadPool)); //编码解码器 acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new WebDecoder(),new XmlEncoder())); //日志 LoggingFilter filter = new LoggingFilter(); filter.setExceptionCaughtLogLevel(LogLevel.DEBUG); filter.setMessageReceivedLogLevel(LogLevel.DEBUG); filter.setMessageSentLogLevel(LogLevel.DEBUG); filter.setSessionClosedLogLevel(LogLevel.DEBUG); filter.setSessionCreatedLogLevel(LogLevel.DEBUG); filter.setSessionIdleLogLevel(LogLevel.DEBUG); filter.setSessionOpenedLogLevel(LogLevel.DEBUG); acceptor.getFilterChain().addLast("logger", filter); //设置的是主服务监听的端口可以重用 acceptor.setReuseAddress(true); //设置每一个非主监听连接的端口可以重用 acceptor.getSessionConfig().setReuseAddress(true); //MINA2中,当启动一个服务端的时候,要设定初始化缓冲区的长度,如果不设置这个值,系统默认为2048,当客户端发过来的消息超过设定值的时候, //MINA2的机制是分段接受的,将字符是放入缓冲区中读取,所以在读取消息的时候,需要判断有多少次。这样的好处就是可以节省通讯的流量。 //设置输入缓冲区的大小 acceptor.getSessionConfig().setReceiveBufferSize(1024); //设置输出缓冲区的大小 acceptor.getSessionConfig().setSendBufferSize(10240); //设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出 acceptor.getSessionConfig().setTcpNoDelay(true); //设置主服务监听端口的监听队列的最大值为100,如果当前已经有100个连接,再新的连接来将被服务器拒绝 acceptor.setBacklog(100); acceptor.setDefaultLocalAddress(new InetSocketAddress(port)); //加入处理器(Handler)到Acceptor acceptor.setHandler(new YourHandler()); acceptor.bind();
客户端代码大致如下:
客户端的初始化和服务器端其实是一样的,就是初始化类不一样,客户端是作为发送者的。
Java代码
- SocketConnector connector = new NioSocketConnector();
- connector.getFilterChain().addLast(“codec”, new ProtocolCodecFilter(new XmlCodecFactory(Charset.forName(charsetName), null, sertType)));
- //指定线程池
- connector.getFilterChain().addLast(“executor”, new ExecutorFilter());
- //指定业务处理类
- connector.setHandler(this);
SocketConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new XmlCodecFactory(Charset.forName(charsetName), null, sertType))); //指定线程池 connector.getFilterChain().addLast("executor", new ExecutorFilter()); //指定业务处理类 connector.setHandler(this);
在IoHandler中定义了一些事件方法,比如messageReceived,sessionOpend,sessionCreated,exceptionCaught等,用户只需要在方法内部实现对应的处理逻辑即可。
心跳机制:
mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。
事件模型:
MINA可以看成是事件驱动的。通常在网络通讯中,可以将整个过程划分为几个基本的阶段,如建立连接、数据通信、关闭连接。
数据通信一般包括数据的发送和接收,由于在通信过程中,可能要多次发送和接收数据,以进行不同的业务交互。
不可能一直都接收和发送数据,因此就有Idle出现,在MINA中,如果在设定的时间内没有数据发送或接收,那么就会触发一个Idle事件。
附录:对与协议的理解,摘自ppt
http协议
对应于应用层
tcp协议
对应于传输层
ip协议
对应于网络层
三者本质上没有可比性。 何况HTTP协议是基于TCP连接的。
TCP/IP是传输层协议,主要解决数据如何在网络中传输;而HTTP是应用层协议,主要解决如何包装数据。
我们在传输数据时,可以只使用传输层(TCP/IP),但是那样的话,由于没有应用层,便无法识别数据内容,如果想要使传输的数据有意义,则必须使用应用层协议,应用层协议很多,有HTTP、FTP、TELNET等等,也可以自己定义应用层协议。WEB使用HTTP作传输层协议,以封装HTTP文本信息,然后使用TCP/IP做传输层协议将它发送到网络上。
Socket是对TCP/IP协议的封装,Socket本身并不是协议,而是一个调用接口(API),通过Socket,我们才能使用TCP/IP协议。
这也就不难理解为什么有些内部的系统调用采用socket,而不是http。
本身web的这种系统,HTTP已经将报文信息封装好了。各种JEE的WEB框架,都能够直接获取报文中的信息,而socket方式,可以双方很方便的自己定义报文的内容,加密方式等等。
URL:应用层
SOCKET
:网络传输层
Socket(套接字)
是一种基于网络传输层的远程进程间通信编程接口,有操作系统提供一个套接字包含,主机名、端口号两个部分。其中端口号是0~65535之间的一个整数。通常小于1024的端口号被统一分配给特定的网络服务,如ftp服务,21;http服务, 80;SMTP服务,25;POP3服务,110;telnet服务,23等等
套接字(socket)是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地主机的IP地址,本地进程的协议端口,远地主机的IP地址,远地进程的协议端口。
应用层通过传输层进行数据通信时,TCP会遇到同时为多个应用程序进程提供并发服务的问题。多个TCP连接或多个应用程序进程可能需要通过同一个
TCP协议端口传输数据。为了区别不同的应用程序进程和连接,许多计算机操作系统为应用程序与TCP/IP协议交互提供了套接字(Socket)接口。
由于通常情况下Socket连接就是TCP连接,因此Socket连接一旦建立,通信双方即可开始相互发送数据内容,直到双方连接断开。但在实际网络应用中,客户端到服务器之间的通信往往需要穿越多个中间节点,例如路由器、网关、防火墙等,大部分防火墙默认会关闭长时间处于非活跃状态的连接而导致
Socket 连接断连,因此需要通过轮询告诉网络,该连接处于活跃状态。
而HTTP连接使用的是“请求—响应”的方式,不仅在请求时需要先建立连接,而且需要客户端向服务器发出请求后,服务器端才能回复数据。
很多情况下,需要服务器端主动向客户端推送数据,保持客户端与服务器数据的实时与同步。此时若双方建立的是Socket连接,服务器就可以直接将数据传送给客户端;若双方建立的是HTTP连接,则服务器需要等到客户端发送一次请求后才能将数据传回给客户端,因此,客户端定时向服务器端发送连接请求,不仅可以保持在线,同时也是在“询问”服务器是否有新的数据,如果有就将数据传给客户端。
、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、、
最近由于项目本身的需要,正在进行Mina框架的学习,并且将其整合到正在开发的系统中。下面将会根据实际的工作情况分享一些心得感受。
一、 项目需求:
我们正在开发的系统,现在主要分为两个部分,正两个部分之间需要使用TCP Socket进行网络通讯。具体开发的难点是发送消息的部分。由于需要考虑到每次创建连接时造成的系统开销,所以使用的连接方式必须是长连接,就是保存连接,不能断开。而且在连接的另一端发生当机的情况下能够及时回复,不会就此丢掉和这一端的通讯。在连接发送消息后,能够判定另一端无法及时收到消息的情况,并且做出正确处理。
综上所述,能够整理出如下三条需求:
l 两端的连接通讯必须是长连接,不能每次重新建立。
l 在连接断开的情况下能够及时处理,并能有效恢复。
l 发送数据需要有超时机制。
我们这一阶段的Mina框架的使用便是围绕着这三条需求展开的。
二、 Mina框架相关知识简介:
在正式开始Mina框架的实际应用前,先简单介绍一些Mina的基本知识,以便于下面的实用场景分析。中间会穿插架构图和示例代码。
在介绍架构之前先认识几个接口:
IoAccepter 相当于网络应用程序中的服务器端
IoConnector 相当于客户端
IoSession 当前客户端到服务器端的一个连接实例
IoHandler 业务处理逻辑
IoFilter 过滤器用于悬接通讯层接口与业务层接口
然后可以看一下Mina的架构图,如图2-1Mina框架图所示。
在图中的模块链中,IoService便是应用程序的入口,相当于基本接口中的IoAccepter,IoAccepter便是IoService的一个扩展接口。IoService接口可以用来添加多个IoFilter,这些IoFilter符合责任链模式并由IoProcessor线程负责调用。而IoAccepter在ioService接口的基础上还提供绑定某个通讯端口以及取消绑定的接口。在日常应用中,我们可以这样使用IoAccepter:
IoAcceptor acceptor = new SocketAcceptor(); |
相当于我们使用了 Socket 通讯方式作为服务的接入,当前版本的 Mina还提供了除SocketAccepter外的基于数据报文通讯的DatagramAccepter以及基于管道通讯的VmPipeAccepter。另外还包括串口通讯接入方式,目前基于串口通讯的接入方式已经在最新测试版的MINA中提供。我们也可以自行实现IoService接口来使用自己的通讯方式。
而在上图中最右端也就是IoHandler,这便是业务处理模块。我们的项目大部分的工作也就是在这个接口的实现类中完成。在业务处理类中不需要去关心实际的通讯细节,只管处理客户端传输过来的信息即可。编写Handler类就是使用Mina开发网络应用程序的重心所在,相当于Mina已经帮你处理了所有的通讯方面的细节问题。为了简化Handler类,MINA提供了IoHandlerAdapter类,此类仅仅是实现了IoHandler接口,但并不做任何处理。
一个IoHandler接口中具有如下一些方法(摘自Mina的API文档):
void exceptionCaught(IoSession session, Throwable cause) |
void messageReceived(IoSession session, Object message) |
void messageSent(IoSession session, Object message) |
void sessionClosed(IoSession session) |
void sessionCreated(IoSession session) |
void sessionIdle(IoSession session, IdleStatus status) |
void sessionOpened(IoSession session) |
前面我们提到IoService是负责底层通讯接入,而IoHandler是负责业务处理的。那么Mina架构图中的IoFilter作何用途呢?答案是我们想作何用途都可以。但是有一个用途却是必须的,那就是作为IoService和IoHandler之间的桥梁。IoHandler接口中最重要的一个方法是messageReceived,这个方法的第二个参数是一个Object型的消息,众所周知,Object是所有Java对象的基础,那到底谁来决定这个消息到底是什么类型呢?答案也就在这个IoFilter中。在我们的应用中,我们添加了一个IoFilter是new ProtocolCodecFilter(new TextLineCodecFactory()),这个过滤器的作用是将来自客户端输入的信息转换成一行行的文本后传递给IoHandler,因此我们可以在messageReceived中直接将msg对象强制转换成String对象。
而如果我们不提供任何过滤器的话,那么在messageReceived方法中的第二个参数类型就是一个byte的缓冲区,对应的类是org.apache.mina.common.ByteBuffer。虽然你也可以将解析客户端信息放在IoHandler中来做,但这并不是推荐的做法,使原来清晰的模型又模糊起来,变得IoHandler不只是业务处理,还得充当协议解析的任务。
Mina自身带有一些常用的过滤器,例如LoggingFilter(日志记录)、BlackListFilter(黑名单过滤)、CompressionFilter(压缩)、SSLFilter(SSL加密)等。
在我们的项目中,主要的工作是在发送消息的部分,所以Mina框架的实现主要是围绕着IoHandler和IoSession进行展开。根据上面的讲解,在实际使用中,可以用下面的代码创建一个简单的用户发送消息的客户端。
SocketConnector connector =new SocketConnector();
IoFilter filter =new ProtocolCodecFilter(new TextLineCodecFactory());
connector.getFilterChain().addLast("audit", filter);
SocketAddress address =new InetSocketAddress(ip, port);
ConnectFuture future = connector.connect(address,new ClientHandler());
future.join();
if (!future.isConnected()) {
logger.error("不能建立网络连接。" + address);
returnnull;
}
session = future.getSession();
这样便可以使用session进行消息发送,方法是使用write方法,创建一个WriteFuture就可以将信息发送出去了。下面将结合我们的三个网络通讯的需求,在实际项目中分析Mina框架的使用。
三、 实用场景分析:
首先是长连接的问题,为了避免每次重复创建连接,就要对连接进行管理。每一个连接在Mina中的就是通过session进行体现的,换言之,就是将session管理起来。所以,我们索性就把网络通讯的部分,与项目的其他部分进行隔离,实现一个网络通讯层,在其中统一管理session。然后在通讯层中实现一个静态Map,Key是IP+Port,Value就是对应的session。然后在每次需要进行连接的时候,从这个Map中通过IP和Port获取对应的session。然后判断这个session是否被创建,或者是session是否被关闭。如果这个session有效,便直接进行使用。如果session无效,再重新创建这个session,然后放到Session Map中。如图3-1通讯层结构类图所示。
然后是连接断开和恢复的处理,在我们实现的Client Handler中,有一个可以被覆盖的方法,void exceptionCaught(IoSession session, Throwable cause)。这个方法在服务器端非正常当机的情况下可以捕获到异常,而且服务器端在线上环境下是不会进行主动连接断开的。所以异常情况便可以包括现有的连接断开情况。如果有这样的情况发生,就将这个session关闭到,然后再需要重新获取这个session的时候,便会判定这个session已经断开,这时会重新创建一个新的session,将Session Map中的元素覆盖掉。
最后是发送信息超时,这个是在每个session的写操作的时候处理,每个写操作在Mina框架中都是一个异步操作,本身程序是不会等待整个操作的结束的,因为这是根据性能上的考虑。但是如果我们需要知道发送消息是否超时,便可以在前期的简单实现过程中使用这种方式。首先为session设定一个写操作的超时时间,我们设置5秒钟,然后在每个写操作之后都使用join方法,等待异步操作结束。最后便可以判断写操作是否成功进行,这样就可以处理发送消息超时的问题。可以用下面的代码表示。
publicvoid sessionCreated(IoSession session)throws Exception {
super.sessionCreated(session);
session.setWriteTimeout(5);
if (session.getTransportType() == TransportType.SOCKET) {
((SocketSessionConfig) session.getConfig()).setKeepAlive(true);
}
}
这个是在session被创建的时候,同时还有一项设置是,将Session的连接模式设置成长连接。这样连接就不会有超时中断的现象。
publicstaticboolean setMessage(String ip,int port, String message) {
IoSession session =getSession(ip, port);
if (session !=null) {
WriteFuture wf = session.write(message);
wf.join();
if (wf.isWritten()) {
returntrue;
}
}
returnfalse;
}
这个是发送消息的部分,在发送消息后,判断消息是否成功发送,这样就可以做到处理发送消息超时的问题。
四、 后期优化:
在项目中,现在的Mina框架实现形式还是有许多需要改进,并优化的部分。比如说超时的处理和多线程的问题。
在发送超时的问题上,如果每次发送消息后,都进行异步操作的等待,那么在数据量十分庞大的情况下便会产生效率问题。根据Mina框架中存在的Future模式,可以使用listener来处理是否发送消息超时。可以在Session中添加专门用来处理消息发送超时的listener,然后在需要发送的消息上标注是否超时,如果超时进行重发,或者是其他操作。这样可以大大加快发送消息的速度,但是对于程序的复杂性,便会有很大的提升。由于现在正处在项目的初期是现阶段,可以不需要考虑这种复杂的模式。但是可以最为后期优化的内容。
至于多线程的问题,由于Mina框架本身就拥有线程池的功能,所以它是可以做到多线程的消息发送的。可是这需要消息缓冲区的配合,而且会造成现有系统整合上的冲突,所以,也不在目前情况的考虑范围内。但是,后期可以考虑实现这方面的功能。
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
Mina NIO Socket个人总结,其中包括重连机制,自定义解码器,需要加入的jar包log4j.jar,mina-core-2.0.1.jar,slf4j-api-1.4.2.jar,slf4j-log4j12-1.4.2.jar,也希望给接触者一些帮助。解码器感觉有点麻烦,各位指教。我的解码器"]"为一条消息的结束标记。后面附源码,如有更好方法请留言。
Server
- package com.joe.server;
- import java.net.InetSocketAddress;
- import org.apache.mina.filter.codec.ProtocolCodecFilter;
- import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
- import org.apache.mina.filter.executor.ExecutorFilter;
- import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
- import org.apache.mina.filter.logging.LoggingFilter;
- import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
- import com.joe.codec.decoder.MyMessageDecoder;
- import com.joe.codec.encoder.MyMessageEncoder;
- import com.joe.handler.ServerIoHandler;
- /**
- * @author joe
- */
- public class MyServer {
- private NioSocketAcceptor acceptor;
- /**
- * Constructor
- */
- public MyServer() {
- try {
- acceptor = new NioSocketAcceptor();
- acceptor.getFilterChain().addLast("threadPool",
- new ExecutorFilter(new OrderedThreadPoolExecutor()));// 设置线程池,以支持多线程
- acceptor.getFilterChain().addLast("logger", new LoggingFilter());
- /**
- * 默认编码器,解码器,遇到\n默认消息结束
- * 当然可以加参数指定解码字符,但解码字符会被截掉
- * 例如:new TextLineCodecFactory(Charset.forName("UTF-8"),"]","]");
- * 则会认为"]"为一条消息结束,遇到"]"则截取
- * 比如服务器给你发送的消息是aaaa]aaaa]
- * 会收到两条消息:
- * 1、aaaa
- * 2、aaaa
- * 后面的"]"则去掉了
- */
- // acceptor.getFilterChain().addLast(
- // "codec",
- // new ProtocolCodecFilter(new TextLineCodecFactory(Charset
- // .forName("UTF-8"))));// 指定编码过滤器
- DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory();
- //自定义编码器
- pcf.addMessageEncoder(String.class, new MyMessageEncoder());
- //自定义解码器
- pcf.addMessageDecoder(new MyMessageDecoder());
- ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf);
- acceptor.getFilterChain().addLast("codec",codec);// 指定编码过滤器
- acceptor.setReuseAddress(true);
- acceptor.setHandler(new ServerIoHandler());// 指定业务逻辑处理器
- acceptor.setDefaultLocalAddress(new InetSocketAddress(8888));// 设置端口号
- acceptor.bind();// 启动监听
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- new MyServer();
- }
- }
package com.joe.server;
import java.net.InetSocketAddress;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.joe.codec.decoder.MyMessageDecoder;
import com.joe.codec.encoder.MyMessageEncoder;
import com.joe.handler.ServerIoHandler;
/**
* @author joe
*/
public class MyServer {
private NioSocketAcceptor acceptor;
/**
* Constructor
*/
public MyServer() {
try {
acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("threadPool",
new ExecutorFilter(new OrderedThreadPoolExecutor()));// 设置线程池,以支持多线程
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
/**
* 默认编码器,解码器,遇到\n默认消息结束
* 当然可以加参数指定解码字符,但解码字符会被截掉
* 例如:new TextLineCodecFactory(Charset.forName("UTF-8"),"]","]");
* 则会认为"]"为一条消息结束,遇到"]"则截取
* 比如服务器给你发送的消息是aaaa]aaaa]
* 会收到两条消息:
* 1、aaaa
* 2、aaaa
* 后面的"]"则去掉了
*/
// acceptor.getFilterChain().addLast(
// "codec",
// new ProtocolCodecFilter(new TextLineCodecFactory(Charset
// .forName("UTF-8"))));// 指定编码过滤器
DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory();
//自定义编码器
pcf.addMessageEncoder(String.class, new MyMessageEncoder());
//自定义解码器
pcf.addMessageDecoder(new MyMessageDecoder());
ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf);
acceptor.getFilterChain().addLast("codec",codec);// 指定编码过滤器
acceptor.setReuseAddress(true);
acceptor.setHandler(new ServerIoHandler());// 指定业务逻辑处理器
acceptor.setDefaultLocalAddress(new InetSocketAddress(8888));// 设置端口号
acceptor.bind();// 启动监听
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @param args
*/
public static void main(String[] args) {
new MyServer();
}
}
client
- package com.joe.client;
- import java.net.InetSocketAddress;
- import java.nio.charset.Charset;
- import java.util.Date;
- import java.util.Timer;
- import java.util.TimerTask;
- import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
- import org.apache.mina.core.future.ConnectFuture;
- import org.apache.mina.filter.codec.ProtocolCodecFilter;
- import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
- import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
- import org.apache.mina.filter.logging.LoggingFilter;
- import org.apache.mina.transport.socket.nio.NioSocketConnector;
- import com.joe.codec.decoder.MyMessageDecoder;
- import com.joe.codec.encoder.MyMessageEncoder;
- import com.joe.handler.ClientIoHandler;
- /**
- * @author joe
- */
- public class MyClient {
- private NioSocketConnector connector;
- /**
- * Constructor
- */
- public MyClient() {
- connector = new NioSocketConnector();
- /**
- * 设置信息交换的IoHandler,负责接收和发送信息的处理
- */
- connector.setHandler(new ClientIoHandler());
- //配置过滤器
- DefaultIoFilterChainBuilder chain = connector.getFilterChain();
- //增加日志过滤器
- chain.addLast("logger", new LoggingFilter());
- //增加字符编码过滤器以及设置编码器和解码器
- //chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
- /**
- * 默认编码器,解码器,遇到\n默认消息结束
- * 当然可以加参数指定解码字符,但解码字符会被截掉
- * 例如:new TextLineCodecFactory(Charset.forName("UTF-8"),"]","]");
- * 则会认为"]"为一条消息结束,遇到"]"则截取
- * 比如服务器给你发送的消息是aaaa]aaaa]
- * 会收到两条消息:
- * 1、aaaa
- * 2、aaaa
- * 后面的"]"则去掉了
- */
- // acceptor.getFilterChain().addLast(
- // "codec",
- // new ProtocolCodecFilter(new TextLineCodecFactory(Charset
- // .forName("UTF-8"))));// 指定编码过滤器
- DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory();
- //自定义编码器
- pcf.addMessageEncoder(String.class, new MyMessageEncoder());
- //自定义解码器
- pcf.addMessageDecoder(new MyMessageDecoder());
- ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf);
- chain.addLast("codec",codec);// 指定编码过滤器
- //设置默认连接的地址和端口
- connector.setDefaultRemoteAddress(new InetSocketAddress("localhost", 8888));
- /**
- * 重连机制
- * 如果没有连接,则过30 * 1000毫秒客户端会尝试重新连接服务器
- * 如果连接,则下面的代码不会执行
- */
- new Timer().schedule(new TimerTask() {
- @Override
- public void run() {
- if (null != connector && !connector.isActive()) {
- try {
- //尝试连接默认的地址和端口
- ConnectFuture connFuture = connector.connect();
- connFuture.awaitUninterruptibly();
- } catch (Exception e) {
- // TODO: handle exception
- e.printStackTrace();
- }
- }
- }
- }, new Date(), 30 * 1000);
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- new MyClient();
- }
- }
package com.joe.client;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import com.joe.codec.decoder.MyMessageDecoder;
import com.joe.codec.encoder.MyMessageEncoder;
import com.joe.handler.ClientIoHandler;
/**
* @author joe
*/
public class MyClient {
private NioSocketConnector connector;
/**
* Constructor
*/
public MyClient() {
connector = new NioSocketConnector();
/**
* 设置信息交换的IoHandler,负责接收和发送信息的处理
*/
connector.setHandler(new ClientIoHandler());
//配置过滤器
DefaultIoFilterChainBuilder chain = connector.getFilterChain();
//增加日志过滤器
chain.addLast("logger", new LoggingFilter());
//增加字符编码过滤器以及设置编码器和解码器
//chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
/**
* 默认编码器,解码器,遇到\n默认消息结束
* 当然可以加参数指定解码字符,但解码字符会被截掉
* 例如:new TextLineCodecFactory(Charset.forName("UTF-8"),"]","]");
* 则会认为"]"为一条消息结束,遇到"]"则截取
* 比如服务器给你发送的消息是aaaa]aaaa]
* 会收到两条消息:
* 1、aaaa
* 2、aaaa
* 后面的"]"则去掉了
*/
// acceptor.getFilterChain().addLast(
// "codec",
// new ProtocolCodecFilter(new TextLineCodecFactory(Charset
// .forName("UTF-8"))));// 指定编码过滤器
DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory();
//自定义编码器
pcf.addMessageEncoder(String.class, new MyMessageEncoder());
//自定义解码器
pcf.addMessageDecoder(new MyMessageDecoder());
ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf);
chain.addLast("codec",codec);// 指定编码过滤器
//设置默认连接的地址和端口
connector.setDefaultRemoteAddress(new InetSocketAddress("localhost", 8888));
/**
* 重连机制
* 如果没有连接,则过30 * 1000毫秒客户端会尝试重新连接服务器
* 如果连接,则下面的代码不会执行
*/
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (null != connector && !connector.isActive()) {
try {
//尝试连接默认的地址和端口
ConnectFuture connFuture = connector.connect();
connFuture.awaitUninterruptibly();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
}, new Date(), 30 * 1000);
}
/**
* @param args
*/
public static void main(String[] args) {
new MyClient();
}
}
自定义编码器(MessageEncoder)
- package com.joe.codec.encoder;
- import org.apache.mina.core.buffer.IoBuffer;
- import org.apache.mina.core.session.IoSession;
- import org.apache.mina.filter.codec.ProtocolEncoderOutput;
- import org.apache.mina.filter.codec.demux.MessageEncoder;
- /**
- * @author joe
- * @param <T>
- */
- public class MyMessageEncoder implements MessageEncoder<String> {
- /**
- * 编码器未做任何处理
- * @param session
- * @param msg
- * @param out
- * @throws Exception
- */
- public void encode(IoSession session, String msg,
- ProtocolEncoderOutput out) throws Exception {
- IoBuffer buf = IoBuffer.allocate(msg.getBytes().length);
- buf.put(msg.getBytes());
- buf.flip();
- out.write(buf);
- }
- }
package com.joe.codec.encoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;
/**
* @author joe
* @param <T>
*/
public class MyMessageEncoder implements MessageEncoder<String> {
/**
* 编码器未做任何处理
* @param session
* @param msg
* @param out
* @throws Exception
*/
public void encode(IoSession session, String msg,
ProtocolEncoderOutput out) throws Exception {
IoBuffer buf = IoBuffer.allocate(msg.getBytes().length);
buf.put(msg.getBytes());
buf.flip();
out.write(buf);
}
}
自定义解码器(MessageDecoder)
- package com.joe.codec.decoder;
- import org.apache.mina.core.buffer.IoBuffer;
- import org.apache.mina.core.session.IoSession;
- import org.apache.mina.filter.codec.ProtocolDecoderOutput;
- import org.apache.mina.filter.codec.demux.MessageDecoder;
- import org.apache.mina.filter.codec.demux.MessageDecoderResult;
- /**
- * @author joe
- */
- public class MyMessageDecoder implements MessageDecoder {
- //消息的开始
- private int flag = 0;
- //消息的长度
- private int length = 0;
- //消息的结尾
- private int flaglast = 0;
- //不是第一条消息
- private boolean notfirstmessage = false;
- public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
- int rem = in.remaining();
- int fornumber;
- byte aa;
- if (notfirstmessage) {
- flag++;
- fornumber = rem + flag;
- } else {
- flag = 0;
- fornumber = rem + flag;
- }
- try {
- for (int i = flag; i < fornumber; i++) {
- aa = in.get(i);
- if (']' == aa) {
- flaglast = flag;
- flag = i;
- length = flag - flaglast;
- notfirstmessage = true;
- return MessageDecoderResult.OK;
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- notfirstmessage = false;
- return MessageDecoderResult.NEED_DATA;
- }
- public MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- try {
- if (length == 0 || length == 1) {
- in.get();
- out.write("");
- return MessageDecoderResult.OK;
- }
- length++;
- byte[] result = new byte[length];
- for (int i = 0; i < length; i++) {
- result[i] = in.get();
- }
- if (0 == in.remaining()) {
- notfirstmessage = false;
- }
- String cont = new String(result, "us-ascii");
- out.write(cont.trim());
- return MessageDecoderResult.OK;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return MessageDecoderResult.OK;
- }
- public void finishDecode(IoSession session, ProtocolDecoderOutput out)
- throws Exception {
- }
- }