Netty实例-多个Netty实战⼩实例
Netty实例(多个实战⼩实例)
疯狂创客圈 Java 分布式聊天室【亿级流量】实战系列之18 【】 QQ群:104131248】
⽂章⽬录
源码⼯程
源码IDEA⼯程获取链接:
Netty是基于JDK NIO的⽹络框架
简化了NIO编程, 不⽤程序⾃⼰维护lector, 将⽹络通信和数据处理的部分做了分离
多⽤于做底层的数据通信, ⼼跳检测(keepalived)
1. 数据通信
1.1 Hello World
public class Server {
public static void main(String[] args) throws Exception {
// 1 创建线两个事件循环组
// ⼀个是⽤于处理服务器端接收客户端连接的
// ⼀个是进⾏⽹络通信的(⽹络读写的)
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
// 2 创建辅助⼯具类ServerBootstrap,⽤于服务器通道的⼀系列配置
monica什么意思
ServerBootstrap b = new ServerBootstrap();
.channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel对应TCP, NioD
atagramChannel对应UDP .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区
.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲⼤⼩
.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲⼤⼩
beenle.option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel建⽴连接后的管道
// 3 在这⾥配置通信数据的处理逻辑, 可以addLast多个...
曲轴磨床sc.pipeline().addLast(new ServerHandler());
}
});
// 4 绑定端⼝, bind返回future(异步), 加上sync阻塞在获取连接处
1亿人重返极端贫困ChannelFuture cf1 = b.bind(8765).sync();
//ChannelFuture cf2 = b.bind(8764).sync(); //可以绑定多个端⼝
// 5 等待关闭, 加上sync阻塞在关闭请求处
cf1.channel().cloFuture().sync();
//cf2.channel().cloFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
plant extractSO_BACKLOG详解:
服务器的TCP内核维护两个队列A和B
客户端向服务端请求connect时, 发送SYN(第⼀次握⼿)
服务端收到SYN后, 向客户端发送SYN ACK(第⼆次握⼿), TCP内核将连接放⼊队列A
客户端收到后向服务端发送ACK(第三次握⼿), TCP内核将连接从A->B, accept返回, 连接完成
A/B队列的长度和即为BACKLOG, 当accept速度跟不上, A/B队列使得BACKLOG满了, 客户端连接就会被TCP内核拒绝
可以调⼤backlog缓解这⼀现象, 经验值~100
中括号public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("rver ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
批评的艺术byte[] req = new adableBytes()];
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
banner是什么意思String respon = "返回给客户端的响应:" + body ;
ctx.Bytes()));
// future完成后触发监听器, 此处是写完即关闭(短连接). 因此需要关闭连接时, 要通过rver端关闭. 直接关闭⽤⽅法ctx[.channel()].clo() //.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("读完了");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
throws Exception {
ctx.clo();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //可以使⽤多个端⼝
//发送消息, Buffer类型. write需要flush才发送, 可⽤writeFlush代替
cf1.channel().piedBuffer("777".getBytes()));
cf1.channel().piedBuffer("666".getBytes()));
Thread.sleep(2000);
cf1.channel().piedBuffer("888".getBytes()));
//cf2.channel().piedBuffer("999".getBytes()));
cf1.channel().cloFuture().sync();
/
/cf2.channel().cloFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new adableBytes()];
String body = new String(req, "utf-8");
System.out.println("Client :" + body );
} finally {
// 记得释放xxxHandler⾥⾯的⽅法的msg参数: 写(write)数据, msg引⽤将被⾃动释放不⽤⼿动处理; 但只读数据时,!必须⼿动释放引⽤数 lea(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau)
throws Exception {
ctx.clo();
}
}
1.2 拆包粘包问题
TCP/IP确保了包的传送, 包的顺序等, 但编程中还需要解决拆包粘包问题
-> 接收的⼀连串包中的数据, 处理的分隔在哪⾥? 基本解决⽅案:
1)特殊字符作为结束分隔符
2)消息定长. 固定包的长度, 长度不够⽤空格补全. 接收⽅需要trim, 效率不⾼不推荐
3)⾃定义协议. 在消息头中包含消息总长度的字段. 需要安全性时可以考虑.
特殊字符
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.
option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
bronyaprotected void initChannel(SocketChannel sc) throws Exception {
// 使⽤DelimiterBadFrameDecoder设置结尾分隔符$_
ByteBuf buf = piedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBadFrameDecoder(1024, buf));
// 设置字符串形式的解码. 经过StringDecoder, Handler回调⽅法中接收的msg的具体类型就是String了(不再是ByteBuffer). 但写时仍需要传⼊ByteBuf fer
sc.pipeline().addLast(new StringDecoder());
// 通信数据的处理逻辑
sc.pipeline().addLast(new ServerHandler());
}
});
//4 绑定连接
ChannelFuture cf = b.bind(8765).sync();
//等待服务器监听端⼝关闭
cf.channel().cloFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" rver ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server :" + msg);
String respon = "服务器响应: " + msg + "$_";
ctx.Bytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
考林}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.clo();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ByteBuf buf = piedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBadFrameDecoder(1024, buf));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
cf.channel().cloFuture().sync();
group.shutdownGracefully();
}
}