博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty实现心跳检测与断线重连
阅读量:6851 次
发布时间:2019-06-26

本文共 7202 字,大约阅读时间需要 24 分钟。

使用Netty实现心跳机制

代码环境:JDK1.8和Netty4.x

具体思路如下:

  • 使用Netty提供的IdleStateHandler来检测读写操作的空闲时间

  • 使用Protocol Buffer序列化

  • 客户端write空闲5s后向服务端发送一个心跳包

  • 服务端read空闲6s后心跳丢失计数器+1(丢失的心跳包数量)

  • 当丢失的心跳包数量超过3个时,主动断开该客户端的channel

  • 断开连接后,客户端10s之后重新连接

代码已上传至GitHub:

代码实现:

数据包结构(proto文件)

option java_outer_classname = "PacketProto";message Packet {    // 包的类型    enum PacketType {        // 心跳包        HEARTBEAT = 1;        // 非心跳包        DATA = 2;    }    // 包类型    required PacketType packetType = 1;        // 数据部分(可选,心跳包不包含数据部分)    optional string data = 2;}

ClientHeartbeatHandler类

public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("--- Server is active ---");    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        System.out.println("--- Server is inactive ---");        // 10s 之后尝试重新连接服务器        System.out.println("10s 之后尝试重新连接服务器...");        Thread.sleep(10 * 1000);        Client.doConnect();    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof IdleStateEvent) {            // 不管是读事件空闲还是写事件空闲都向服务器发送心跳包            sendHeartbeatPacket(ctx);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println("连接出现异常");    }    /**     * 发送心跳包     *     * @param ctx     */    private void sendHeartbeatPacket(ChannelHandlerContext ctx) {        Packet.Builder builder = newBuilder();        builder.setPacketType(Packet.PacketType.HEARTBEAT);        Packet packet = builder.build();        ctx.writeAndFlush(packet);    }}

Client类

public class Client {    private static Channel ch;    private static Bootstrap bootstrap;    public static void main(String[] args) {        NioEventLoopGroup workGroup = new NioEventLoopGroup();        try {            bootstrap = new Bootstrap();            bootstrap                    .group(workGroup)                    .channel(NioSocketChannel.class)                    .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new IdleStateHandler(0, 5, 0)); pipeline.addLast(new ClientHeartbeatHandler()); } }); // 连接服务器 doConnect(); // 模拟不定时发送向服务器发送数据的过程 Random random = new Random(); while (true) { int num = random.nextInt(21); Thread.sleep(num * 1000); PacketProto.Packet.Builder builder = newBuilder(); builder.setPacketType(PacketProto.Packet.PacketType.DATA); builder.setData("我是数据包(非心跳包) " + num); PacketProto.Packet packet = builder.build(); ch.writeAndFlush(packet); } } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } /** * 抽取出该方法 (断线重连时使用) * * @throws InterruptedException */ public static void doConnect() throws InterruptedException { ch = bootstrap.connect("127.0.0.1", 20000).sync().channel(); }}

ServerHeartbeatHandler类

public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {    // 心跳丢失计数器    private int counter;    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("--- Client is active ---");    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        System.out.println("--- Client is inactive ---");    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        // 判断接收到的包类型        if (msg instanceof Packet) {            Packet packet = (Packet) msg;            switch (packet.getPacketType()) {                case HEARTBEAT:                    handleHeartbreat(ctx, packet);                    break;                case DATA:                    handleData(ctx, packet);                    break;                default:                    break;            }        }    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        if (evt instanceof IdleStateEvent) {            // 空闲6s之后触发 (心跳包丢失)            if (counter >= 3) {                // 连续丢失3个心跳包 (断开连接)                ctx.channel().close().sync();                System.out.println("已与Client断开连接");            } else {                counter++;                System.out.println("丢失了第 " + counter + " 个心跳包");            }        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println("连接出现异常");    }    /**     * 处理心跳包     *     * @param ctx     * @param packet     */    private void handleHeartbreat(ChannelHandlerContext ctx, Packet packet) {        // 将心跳丢失计数器置为0        counter = 0;        System.out.println("收到心跳包");        ReferenceCountUtil.release(packet);    }    /**     * 处理数据包     *     * @param ctx     * @param packet     */    private void handleData(ChannelHandlerContext ctx, Packet packet) {        // 将心跳丢失计数器置为0        counter = 0;        String data = packet.getData();        System.out.println(data);        ReferenceCountUtil.release(packet);    }}

Server类

public class Server {    public static void main(String[] args) {        NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1);        NioEventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap                    .group(acceptorGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(PacketProto.Packet.getDefaultInstance())); pipeline.addLast(new IdleStateHandler(6, 0, 0)); pipeline.addLast(new ServerHeartbeatHandler()); } }); Channel ch = bootstrap.bind(20000).sync().channel(); System.out.println("Server has started..."); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

转载地址:http://cdgul.baihongyu.com/

你可能感兴趣的文章
朱晔的互联网架构实践心得S2E1:业务代码究竟难不难写?
查看>>
Parameters转换为Variant数组
查看>>
译言精选-十天内掌握线性代数:惊人的超速学习实验
查看>>
POJ 2545+2591+2247+1338简单水题
查看>>
[翻译]DirectX 状态
查看>>
【C++ Primer】【习题】【1.3】
查看>>
基于C#实现的开源自动更新程序
查看>>
Hadoop 集群启动一直处于safemode解决方法
查看>>
poj 2503:Babelfish(字典树,经典题,字典翻译)
查看>>
【转】eclipse中egit插件使用
查看>>
underscore的封装和扩展
查看>>
Grunt + Bower—前端构建利器(转)
查看>>
xpath属性值的模糊匹配
查看>>
221. Maximal Square
查看>>
python --文本文件的输入输出
查看>>
Tslib的移植【转】
查看>>
iOS开发--音乐文件播放工具类的封装(包含了音效的封装)
查看>>
如何获取一个AlertDialog中的EditText中输入的内容
查看>>
OpenGL帧缓存对象(FBO:Frame Buffer Object) 【转】
查看>>
hihoCoder_二分·归并排序之逆序对
查看>>