使用Netty实现心跳机制
代码环境:JDK1.8和Netty4.x
具体思路如下:
使用Netty提供的IdleStateHandler来检测读写操作的空闲时间
使用Protocol Buffer序列化
客户端write空闲5s后向服务端发送一个心跳包
服务端read空闲6s后,将心跳丢失计数器(记录已丢失的心跳包数量)+1
当丢失的心跳包数量超过3个时,主动断开该客户端的channel
断开连接后,客户端10s之后重新连接
代码已上传至GitHub:
代码实现:
数据包结构(proto文件)
// Wire format exchanged between the heartbeat client and server.
// The schema uses `required`/`optional` field labels, so it is proto2.
syntax = "proto2";

option java_outer_classname = "PacketProto";

// A single frame sent over the connection.
message Packet {
    // Kind of packet.
    enum PacketType {
        // Heartbeat packet.
        HEARTBEAT = 1;
        // Non-heartbeat (data) packet.
        DATA = 2;
    }

    // Packet type.
    required PacketType packetType = 1;

    // Payload (optional; heartbeat packets carry no payload).
    optional string data = 2;
}
ClientHeartbeatHandler类
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Server is active ---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Server is inactive ---"); // 10s 之后尝试重新连接服务器 System.out.println("10s 之后尝试重新连接服务器..."); Thread.sleep(10 * 1000); Client.doConnect(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 不管是读事件空闲还是写事件空闲都向服务器发送心跳包 sendHeartbeatPacket(ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("连接出现异常"); } /** * 发送心跳包 * * @param ctx */ private void sendHeartbeatPacket(ChannelHandlerContext ctx) { Packet.Builder builder = newBuilder(); builder.setPacketType(Packet.PacketType.HEARTBEAT); Packet packet = builder.build(); ctx.writeAndFlush(packet); }}
Client类
public class Client { private static Channel ch; private static Bootstrap bootstrap; public static void main(String[] args) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { bootstrap = new Bootstrap(); bootstrap .group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new IdleStateHandler(0, 5, 0)); pipeline.addLast(new ClientHeartbeatHandler()); } }); // 连接服务器 doConnect(); // 模拟不定时发送向服务器发送数据的过程 Random random = new Random(); while (true) { int num = random.nextInt(21); Thread.sleep(num * 1000); PacketProto.Packet.Builder builder = newBuilder(); builder.setPacketType(PacketProto.Packet.PacketType.DATA); builder.setData("我是数据包(非心跳包) " + num); PacketProto.Packet packet = builder.build(); ch.writeAndFlush(packet); } } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } /** * 抽取出该方法 (断线重连时使用) * * @throws InterruptedException */ public static void doConnect() throws InterruptedException { ch = bootstrap.connect("127.0.0.1", 20000).sync().channel(); }}
ServerHeartbeatHandler类
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter { // 心跳丢失计数器 private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Client is active ---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Client is inactive ---"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 判断接收到的包类型 if (msg instanceof Packet) { Packet packet = (Packet) msg; switch (packet.getPacketType()) { case HEARTBEAT: handleHeartbreat(ctx, packet); break; case DATA: handleData(ctx, packet); break; default: break; } } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 空闲6s之后触发 (心跳包丢失) if (counter >= 3) { // 连续丢失3个心跳包 (断开连接) ctx.channel().close().sync(); System.out.println("已与Client断开连接"); } else { counter++; System.out.println("丢失了第 " + counter + " 个心跳包"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("连接出现异常"); } /** * 处理心跳包 * * @param ctx * @param packet */ private void handleHeartbreat(ChannelHandlerContext ctx, Packet packet) { // 将心跳丢失计数器置为0 counter = 0; System.out.println("收到心跳包"); ReferenceCountUtil.release(packet); } /** * 处理数据包 * * @param ctx * @param packet */ private void handleData(ChannelHandlerContext ctx, Packet packet) { // 将心跳丢失计数器置为0 counter = 0; String data = packet.getData(); System.out.println(data); ReferenceCountUtil.release(packet); }}
Server类
public class Server { public static void main(String[] args) { NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(acceptorGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(PacketProto.Packet.getDefaultInstance())); pipeline.addLast(new IdleStateHandler(6, 0, 0)); pipeline.addLast(new ServerHeartbeatHandler()); } }); Channel ch = bootstrap.bind(20000).sync().channel(); System.out.println("Server has started..."); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}