使用 Netty 自定义解码器处理粘包和拆包问题详解

使用 Netty 自定义解码器处理粘包和拆包问题详解

在网络编程中,粘包和拆包问题是常见的挑战。粘包是指多个数据包在传输过程中粘在一起,而拆包是指一个数据包在传输过程中被拆分成多个部分。Netty 是一个高性能、事件驱动的网络应用框架(学习netty请参考:深入浅出Netty:高性能网络应用框架的原理与实践),提供了强大的工具来解决这些问题。

本文将详细介绍如何使用 Netty 创建自定义解码器和编码器来处理粘包和拆包问题。通过实现一个基于固定长度的解码器和编码器,我们可以确保数据包在传输过程中被正确解析和处理。本文将涵盖以下内容:

粘包与拆包问题

  • 粘包:指的是多个数据包粘在一起,接收端一次性接收多个数据包的情况。

  • 拆包:指的是一个数据包被拆分成多个部分,接收端多次接收到部分数据包的情况。

实现步骤

1. 创建 Netty 项目

首先,创建一个 Maven 项目,并添加 Netty 依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.68.Final</version>
</dependency>

2. 自定义解码器

我们需要实现一个自定义解码器来处理粘包和拆包问题。这里使用的是基于固定长度的解码器。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class CustomDecoder extends ByteToMessageDecoder {

    private static final int HEADER_SIZE = 4; // 包头的长度,假设包头是一个int表示整个包的长度

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= HEADER_SIZE) {
            in.markReaderIndex(); // 标记当前读指针位置

            int dataLength = in.readInt(); // 读取包头,获取数据包长度

            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex(); // 重置读指针到标记位置
                return; // 等待更多的数据到达
            }

            byte[] data = new byte[dataLength];
            in.readBytes(data);

            out.add(data); // 将解码后的数据添加到输出列表中
        }
    }
}

3. 自定义编码器

为了与解码器配合,我们还需要自定义编码器。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class CustomEncoder extends MessageToByteEncoder<byte[]> {

    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
        out.writeInt(msg.length); // 写入包头,数据包长度
        out.writeBytes(msg); // 写入实际数据
    }
}

4. 配置 Netty 服务端

配置 Netty 服务端,使用自定义解码器和编码器。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    private final int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // 接受进来的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理已经被接受的连接

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // 使用 NIO 传输
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     // 配置解码器和编码器
                     ch.pipeline().addLast(new CustomDecoder());
                     ch.pipeline().addLast(new CustomEncoder());
                     // 配置业务逻辑处理器
                     ch.pipeline().addLast(new ServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128) // 配置 TCP 参数
             .childOption(ChannelOption.SO_KEEPALIVE, true); // 保持连接

            // 绑定端口,开始接受进来的连接
            ChannelFuture f = b.bind(port).sync();
            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } finally {
            // 关闭 EventLoopGroup,释放所有资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyServer(8080).start();
    }
}

5. 配置 Netty 客户端

配置 Netty 客户端,同样使用自定义解码器和编码器。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    private final String host;
    private final int port;

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class) // 使用 NIO 传输
             .option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     // 配置解码器和编码器
                     ch.pipeline().addLast(new CustomDecoder());
                     ch.pipeline().addLast(new CustomEncoder());
                     // 配置业务逻辑处理器
                     ch.pipeline().addLast(new ClientHandler());
                 }
             });

            // 连接服务器
            ChannelFuture f = b.connect(host, port).sync();
            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            // 关闭 EventLoopGroup,释放所有资源
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyClient("localhost", 8080).start();
    }
}

6. 实现服务端和客户端处理器

服务端和客户端处理器分别处理接收到的数据。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] data = (byte[]) msg;
        System.out.println("Server received: " + new String(data));
        // 处理数据逻辑,可以根据业务需求进行数据处理
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close(); // 关闭连接
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] data = "Hello, Netty!".getBytes();
        ctx.writeAndFlush(data); // 发送数据
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] data = (byte[]) msg;
        System.out.println("Client received: " + new String(data));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close(); // 关闭连接
    }
}

总结

通过自定义解码器和编码器,Netty 可以有效处理粘包和拆包问题。本文介绍了如何实现一个基于固定长度的数据包解码器和编码器,并展示了在 Netty 客户端和服务端中使用自定义解码器和编码器的完整代码示例。通过这种方式,可以确保数据包在传输过程中被正确解析和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/740209.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Day 32:503. 下一个更大的元素Ⅱ

Leetcode 503. 下一个更大的元素Ⅱ 给定一个循环数组 nums &#xff08; nums[nums.length - 1] 的下一个元素是 nums[0] &#xff09;&#xff0c;返回 nums 中每个元素的 下一个更大元素 。 数字 x 的 下一个更大的元素 是按数组遍历顺序&#xff0c;这个数字之后的第一个比它…

嵌入式实验---实验七 SPI通信实验

一、实验目的 1、掌握STM32F103SPI通信程序设计流程&#xff1b; 2、熟悉STM32固件库的基本使用。 二、实验原理 1、使用STM32F103R6通过74HC595控制一位LID数码管&#xff0c;实现以下两个要求&#xff1a; &#xff08;1&#xff09;数码管从0到9循环显示&#xff1b; …

[leetcode]add-strings 字符串相加

. - 力扣&#xff08;LeetCode&#xff09; class Solution { public:string addStrings(string num1, string num2) {int i num1.length() - 1, j num2.length() - 1, add 0;string ans "";while (i > 0 || j > 0 || add ! 0) {int x i > 0 ? num1[i…

[word] word 如何在文档中进行分栏排版? #媒体#其他#媒体

word 如何在文档中进行分栏排版&#xff1f; 目标效果 将唐代诗人李白的组诗作品《清平调词》进行分栏排版&#xff0c;共分三栏&#xff0c;每一首诗作为一栏&#xff0c;参考效果如下图。

基于STM32的智能健康监测手表

目录 引言环境准备智能健康监测手表系统基础代码实现&#xff1a;实现智能健康监测手表系统 4.1 数据采集模块4.2 数据处理与分析4.3 通信模块实现4.4 用户界面与数据可视化应用场景&#xff1a;健康监测与管理问题解决方案与优化收尾与总结 1. 引言 智能健康监测手表通过使…

ONLYOFFICE 8.1版本桌面编辑器深度体验:创新功能与卓越性能的结合

ONLYOFFICE 8.1版本桌面编辑器深度体验&#xff1a;创新功能与卓越性能的结合 随着数字化办公的日益普及&#xff0c;一款高效、功能丰富的办公软件成为了职场人士的必备工具。ONLYOFFICE团队一直致力于为用户提供全面而先进的办公解决方案。最新推出的ONLYOFFICE 8.1版本桌面编…

【Mysql】数据库事务-手动提交

数据库事务 ** 什么是事务** 事务是一个整体,由一条或者多条SQL 语句组成,这些SQL语句要么都执行成功,要么都执行失败, 只要有一条SQL出现异常,整个操作就会回滚,整个业务执行失败。 比如: 银行的转账业务,张三给李四转账500元 , 至少要操作两次数据库, 张三 -500, 李四 50…

国产的浏览器我就喜爱这一款,它比微软的edge更让人喜爱

小编最近在用Yandex搜索引擎&#xff0c;这个基本上追剧找资料&#xff0c;看漫画什么的都是用到它&#xff08;dddd&#xff09; 有小伙伴就说了&#xff0c;这搜索引擎确实好用&#xff0c;但是不够方便呀&#xff0c;就很多浏览器都不能将它设置为默认引擎进行使用&#xf…

【ONLYOFFICE深度探索】:ONLYOFFICE桌面编辑器8.1震撼发布,打造高效办公新境界

文章目录 一、功能完善的PDF编辑器&#xff1a;解锁文档处理新维度二、幻灯片版式设计&#xff1a;释放创意&#xff0c;打造专业演示三、改进从右至左显示&#xff1a;尊重多元文化&#xff0c;优化阅读体验四、新增本地化选项&#xff1a;连接全球用户&#xff0c;跨越语言障…

详解Spring AOP(一)

目录 1. AOP概述 2.Spring AOP快速入门 2.1引入AOP依赖 2.2编写AOP程序 3.Spring AOP核心概念 3.1切点&#xff08;PointCut&#xff09; 3.2连接点&#xff08;Join Point&#xff09; 3.3通知&#xff08;Advice&#xff09; 3.4切面&#xff08;Aspect&#xff09; …

JDBC的概念 ,核心API的介绍 , 注册驱动介绍

第一章 JDBC 1、JDBC的概念 目标 能够掌握JDBC的概念能够理解JDBC的作用 讲解 客户端操作MySQL数据库的方式 使用第三方客户端来访问MySQL&#xff1a;SQLyog、Navicat 使用MySQL自带的命令行方式 通过Java来访问MySQL数据库&#xff0c;今天要学习的内容 如何通过Java代…

时间?空间?复杂度??

1.什么是时间复杂度和空间复杂度&#xff1f; 1.1算法效率 算法效率分析分为两种&#xff1a;第一种是时间效率&#xff0c;第二种是空间效率。时间效率被称为时间复杂度&#xff0c;而空间效率被称为空间复杂度。 时间复杂度主要衡量的是一个算法的运行速度&#xff0c;而空…

会声会影视频剪辑软件教程之剪辑软件波纹在哪 剪辑软件波纹怎么去掉 波纹剪辑是什么意思

波纹效果做不好&#xff0c;那一定是剪辑软件没选对。一款好用的视频剪辑软件&#xff0c;一定拥有多个制作波纹效果的方法。用户可以根据剪辑创作的需要&#xff0c;挑选最适合作品的波纹效果来使用。有关剪辑软件波纹在哪&#xff0c;剪辑软件波纹怎么去掉的问题&#xff0c;…

使用Fiddler如何创造大量数据

在调试和分析网络流量时&#xff0c;您是否曾为无法深入了解请求和响应的数据而感到困惑&#xff1f;如果有一种工具可以帮助您轻松抓取和分析网络流量&#xff0c;您的工作效率将大大提升。Fiddler正是这样一款功能强大的抓包工具&#xff0c;广受开发者和测试人员的青睐。 Fi…

【日常开发之Windows共享文件】Java实现Windows共享文件上传下载

文章目录 Windows 配置代码部分Maven代码 Windows 配置 首先开启服务&#xff0c;打开控制面板点击程序 点击启用或关闭Windows功能 SMB1.0选中红框内的 我这边是专门创建了一个用户 创建一个文件夹然后点击属性界面&#xff0c;点击共享 下拉框选择你选择的用户点击添加…

CSS规则——font-face

font-face 什么是font-face&#xff1f; 想要让网页文字千变万化&#xff0c;仅靠font-family还不够&#xff0c;还要借助font-face&#xff08;是一个 CSS 规则&#xff0c;它允许你在网页上使用自定义字体&#xff0c;而不仅仅是用户系统中预装的字体。这意味着你可以通过提…

Vue父组件mounted执行完后再执行子组件mounted

// 创建地图实例 this.map new BMap.Map(‘map’) } } ... 现在这样可能会报错&#xff0c;因为父组件中的 map 还没创建成功。必须确保父组件的 map 创建完成&#xff0c;才能使用 this.$parent.map 的方法。 那么&#xff0c;现在的问题是&#xff1a;如何保证父组件 mo…

全空间数据处理

高精度三维数据往往因为体量巨大、数据标准不一、高保密性要求等&#xff0c;给数据的后期储存、处理、分析及展示造成巨大困扰。多源异构数据的客观存在性与数据无缝融合的困难性&#xff0c;为空间信息数据和业务过程中其他文件的有效管理与共享制造了诸多障碍。 随着数字孪…

数据库断言-数据库更新

数据库更新的步骤和查询sql的步骤一致 1、连接数据库 驱动管理器调用连接数据库方法&#xff08;传入url&#xff0c;user&#xff0c;password&#xff09;&#xff0c;赋值给变量 2、操作数据库 connection调用参数化方法&#xff0c;对sql语法进行检查&#xff0c;存储s…

Elasticsearch:倒数排序融合 - Reciprocal rank fusion - 8.14

警告&#xff1a;此功能处于技术预览阶段&#xff0c;可能会在未来版本中更改或删除。语法可能会在正式发布之前发生变化。Elastic 将努力修复任何问题&#xff0c;但技术预览中的功能不受官方正式发布功能的支持 SLA 约束。 倒数排序融合 (reciprocal rank fusion - RRF) 是一…