Netty实践:ChannelInboundHandlerAdapter与ChannelO。。。st1;
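import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.ScheduledExecutorService;
/**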
* Created by Administrator on 2017/5/17.
*/
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("注册事件");
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("取消注册事件");
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("有新客户端连接接⼊。。。"+ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("失去连接");
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("读客户端传⼊数据="+in.toString(ChartUtil.UTF_8));
ctx.piedBuffer("channelRead Netty rocks!", ChartUtil.UTF_8));
//ctx.fireChannelActive();
}
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("执⾏成功="+future.isSuccess());
}
}
});
ctx.piedBuffer("channelReadComplete Netty rocks!", ChartUtil.UTF_8)).addListener(new GenericFutureListener<Future<? super Void>>() { @Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
}el {
}
}
});
}
public void urEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("urEventTriggered");
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged");
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau) {
cau.printStackTrace();
ctx.clo();
}
}
st1;
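import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;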
/**
* Created by Administrator on 2017/5/17.
*/
@ChannelHandler.Sharable
public class EchoOtherServerHandler extends ChannelInboundHandlerAdapter {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Other注册事件");
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Other取消注册事件");
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Other有新客户端连接接⼊。。。"+ctx.channel().remoteAddress());
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Other失去连接");
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Other读客户端传⼊数据="+in.toString(ChartUtil.UTF_8));
final ByteBuf byteBuf = piedBuffer("Other channelRead Netty rocks!", ChartUtil.UTF_8);
ctx.writeAndFlush(byteBuf);
ctx.fireChannelRead(msg);
//lea(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(new GenericFutureListener<Future<? super Void>>() { @Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("Other执⾏成功="+future.isSuccess());
}
}
});
final ByteBuf byteBuf = piedBuffer("Other channelReadComplete Netty rocks!", ChartUtil.UTF_8); ctx.writeAndFlush(byteBuf).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
}el {
}
}
});
ctx.fireChannelReadComplete();
}
public void urEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("Other urEventTriggered");
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("Other channelWritabilityChanged");
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau) {
cau.printStackTrace();
ctx.clo();
}
st1;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.nio.charset.Charset;
/**
* Created by Administrator on 2017/6/8.
*/
public class EchoServerOutHandler extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromi promi) throws Exception {
System.out.println("EchoServerOutHandler write "+((ByteBuf)msg).toString(Chart.defaultChart()));
ctx.write(msg, promi);
}
}
st1;
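import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;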
/**
* Created by Administrator on 2017/5/17.
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
ByteBufAllocator allocator;
new EchoServer(port).start();
}
public void start() throws InterruptedException {
final EchoServerHandler echoServerHandler = new EchoServerHandler();
final EchoOtherServerHandler echoOtherServerHandler = new EchoOtherServerHandler();
final EchoServerOutHandler echoServerOutHandler = new EchoServerOutHandler();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap rverBootstrap = new ServerBootstrap();
channel(NioServerSocketChannel.class).//指定channel使⽤Nio传输
localAddress(new InetSocketAddress(port)).//执⾏端⼝设置套接字地址
childHandler(new ChannelInitializer<SocketChannel>() {//添加echoServerHandler到Channel的channelpipeline上 @Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addFirst(echoOtherServerHandler);
channelPipeline.addFirst(echoServerOutHandler);
channelPipeline.addLast(echoServerHandler);
}
});
ChannelFuture f = rverBootstrap.bind().sync();//异步绑定服务器,调⽤sync()⽅法阻塞等待直到绑定完成
f.channel().cloFuture().sync();//获得Channel的clofutrue,并且阻塞当前线程直到它完成
} catch (InterruptedException e) {
eventLoopGroup.shutdownGracefully().sync();
}
}
}
st1;
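import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.channel.ChannelFutureListener;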
/**
* Created by Administrator on 2017/5/17.
*/
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.piedBuffer("Netty rocks!", ChartUtil.UTF_8)); }
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
ByteBuf in = msg;
System.out.println("读取服务端channelRead0="+in.toString(ChartUtil.UTF_8)); }
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau) {
cau.printStackTrace();
ctx.clo();
}
}
st1;
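import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;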
/**
* Created by Administrator on 2017/5/17.
*/
public class EchoClient {
private final int port;
public EchoClient(int port) {
this.port = port;
}
public static void main(String[] argsw) {
ByteBuffer byteBuffer;
String host = "127.0.0.1";
int port = 8080;
try {
new EchoClient(8080).start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void start() throws Exception {
final EchoClientHandler clientHandler = new EchoClientHandler();
EventLoopGroup eventLoopGroup = new OioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
channel(OioSocketChannel.class).
remoteAddress(new InetSocketAddress("localhost",port)).
handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(clientHandler);
}
});
/
/异步连接远程服务,连接远程服务成功后,输出"已经连接到服务器!"
final ChannelFuture f = b.connect();
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("已经连接到服务器!");
ByteBuf byteBuf = piedBuffer("创建ByteBuf", Chart.defaultChart()); ChannelFuture channelFuture = f.channel().writeAndFlush(byteBuf);
}el {
Throwable throwable = future.cau();
throwable.printStackTrace();
}
}
});
f.channel().cloFuture().sync();
} catch (InterruptedException e) {
eventLoopGroup.shutdownGracefully().sync();
}
}
}
服务端日志:
Other注册事件
注册事件
Other有新客户端连接接⼊。。。/127.0.0.1:58678
有新客户端连接接⼊。。。/127.0.0.1:58678
Other读客户端传⼊数据=创建ByteBufNetty rocks!
EchoServerOutHandler write Other channelRead Netty rocks!
读客户端传⼊数据=创建ByteBufNetty rocks!
EchoServerOutHandler write channelRead Netty rocks!
EchoServerOutHandler write
Other执⾏成功=true
EchoServerOutHandler write Other channelReadComplete Netty rocks!
EchoServerOutHandler write
执⾏成功=true
EchoServerOutHandler write channelReadComplete Netty rocks!
------------------------------------------------------------------------------------------------
客户端日志:
已经连接到服务器!
读取服务端channelRead0=Other channelRead Netty rocks!
读取服务端channelRead0=channelRead Netty rocks!
读取服务端channelRead0=Other channelReadComplete Netty rocks!
读取服务端channelRead0=channelReadComplete Netty rocks!