目录

生产环境常见问题分析

消息零丢失方案

1、生产者发消息到Broker不丢失

2、Broker端保存消息不丢失

3、消费者端防止异步处理丢失消息

消息积压如何处理

如何保证消息顺序

​问题一、如何保证Producer发到Partition上的消息是有序的

问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的


生产环境常见问题分析

消息零丢失方案

1、生产者发消息到Broker不丢失

​ Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG。

acks配置为0 : 生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端有没有收到消息。性能高,但是数据会有丢消息的可能。

acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。

acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。

​       对于KafkaProducer,只要将acks设置成1或-1,那么Producer发送消息后都可以拿到Broker的反馈RecordMetadata,里面包含了消息在Broker端的partition,offset等信息。通过这这些信息可以判断消息是否发送成功。如果没有发送成功,Producer就可以根据情况选择重新进行发送。

2、Broker端保存消息不丢失

首先,合理优化刷盘频率,防止服务异常崩溃造成消息未刷盘。Kafka的消息都是先写入操作系统的PageCache缓存,然后再刷盘写入到硬盘。PageCache缓存中的消息是断电即丢失的。如果消息只在PageCache中,而没有写入硬盘,此时如果服务异常崩溃,这些未写入硬盘的消息就会丢失。Kafka并不支持写一条消息就刷一次盘的同步刷盘机制,只能通过调整刷盘的执行频率,提升消息安全。主要涉及几个参数:

flush.ms : 多长时间进行一次强制刷盘。

log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。

log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。

​       然后,配置多备份因子,防止单点消息丢失。在Kafka中,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。

3、消费者端防止异步处理丢失消息

消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。这时,消费者端采用手动提交Offset的方式,相比自动提交会更容易控制提交Offset的时机。

消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。


消息积压如何处理

通常情况下,Kafka本身是能够存储海量消息的,他的消息积压能力是很强的。但是,如果发现消息积压问题已经影响了业务处理进度,这时就需要进行一定的优化。

1、如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。那么可以增加Topic的Partition分区数,将消息拆分到更到的Partition。然后增加消费者个数,最多让消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。

​        另外,在发送消息时,还是要尽量保证消息在各个Partition中的分布比较均匀。比如,在原有Topic下,可以调整Producer的分区策略,让Producer将后续的消息更多的发送到新增的Partition里,这样可以让各个Partition上的消息能够趋于平衡。如果你觉得这样太麻烦,那就新增一个Topic,配置更多的Partition以及对应的消费者实例。然后启动一批Consumer,将消息从旧的Topic搬运到新的Topic。这些Consumer不处理业务逻辑,只是做消息搬运,所以他们的性能是很高的。这样就能让新的Topic下的各个Partition数量趋于平衡。

2、如果是消费者的业务问题导致消息阻塞了,从而积压大量消息,并影响了系统正常运行。比如消费者序列化失败,或者业务处理全部异常。这时可以采用一种降级的方案,先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。


如何保证消息顺序

问题要分两个方面来考虑:

1、因为kafka中各个Partition的消息是并发处理的,所以要保证消息顺序,对于Producer,要保证将一组有序的消息发到同一个Partition里。因为Partition的数据是顺序写的,所以自然就能保证消息是按顺序保存的。

​ 2、对于消费者,需要能够按照1,2,3的顺序处理消息。

问题一、如何保证Producer发到Partition上的消息是有序的

首先,要保证Producer将消息都发送到一个Partition上,其实有两种方法。一种简答粗暴的想法就是给Topic只配一个Partition,没有其他Partition可选了,自然所有消息都到同一个Partition上了。表示从创建Topic时就放弃了多Partition带来的吞吐量便利,是不现实的。另一种是Topic依然配置多个Partition,但是通过定制Producer的Partition分区器,将消息分配到同一个Partition上。这样对于某一些要求局部有序的场景是至少是可行的。例如在电商场景,我可能只是需要保证同一个订单相关的多条消息有序,但是并不要求所有消息有序。这样就可以通过自定义分区路由器,将订单相同的多条消息发送到同一个Partition。

但是Producer都将消息往同一个Partition,也不能保证消息顺序。因为消息可能发送失败。比如Producer依次发送1,2,3,三条消息。如果消息1因为网络原因发送失败了,2 和3 发送成功了,这样消息顺序就乱了。如果把producer的acks参数设置成1或-1,这样每次发送消息后,可以根据Broker的反馈判断消息是否成功。思路是可行的,但是重试的次数,发送消息的数量等都是需要考虑的问题。

回顾一下之前对于生产者消息幂等性的设计:

Kafka的这个sequenceNumber是单调递增的。如果只是为了消息幂等性考虑,那么只要保证sequenceNumber唯一就行了,为什么要设计成单调递增呢?其实Kafka这样设计的原因就是可以通过sequenceNumber来判断消息的顺序。也就是说,在Producer发送消息之前就可以通过sequenceNumber定制好消息的顺序,然后Broker端就可以按照顺序来保存消息。与此同时, SequenceNumber单调递增的特性不光保证了消息是有顺序的,同时还保证了每一条消息不会丢失。一旦Kafka发现Producer传过来的SequenceNumber出现了跨越,那么就意味着中间有可能消息出现了丢失,就会往Producer抛出一个OutOfOrderSequenceException异常。

在生产者的配置类ProducerConfig中很快能找到很多和消息顺序ordering的描述:

public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
                                                                            + " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
                                                                            + " message reordering after a failed send due to retries (i.e., if retries are enabled); "
                                                                            + " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."
                                                                            + " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
                                                                            + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的
    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
    private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
            "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than " +
            "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
            "The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
            "<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
    public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;

这里明确提到Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。也就是说Consumer拉取过来的多批消息并不是串行消费的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。其实这也比较好理解,因为Kafka设计的重点是高吞吐量,所以他的设计是让Consumer尽最大的能力去消费消息。而只要对消费的顺序做处理,就必然会影响Consumer拉取消息的性能。

​        所以这时候,我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。

相关文章

使用Go Validator在Go应用中有效验证数据

Go Validator是一个开源的包,为Go结构体提供强大且易于使用的数据验证功能。该库允许开发者为其数据结构定义自定义验证规则,并确保传入的数据满足指定的条件。Go Validator支持内置验证器、自定义验证器,甚至允许您链式多个验证规则以满足更复杂的数据验证需求。如果内置验证器无法满足您的需求,您可以通过定义自己的验证函数来创建自定义验证器。这个功能允许您实现特定于应用程序需求的验证逻辑。

Redis高并发分布锁实战

Redis分布式锁自己去实现可能会出现几个问题没有在finally显示释放锁,当客户端挂掉了,锁没有被及时删除,这样会导致死锁问题,它这个是需要我们显示的释放锁假如此时我们设置过期时间,但是我们用的是同一个key,就可能出现下一个线程删除上一个线程的锁,但是上一个线程还没有执行完,它这个需要key是不能重复的假如我们既设置了过期时间也指定了不同的key,此时可能因为网络延迟出现上一个线程删除下一个线程的锁,也就是说业务执行的时间超过了锁过期的时间,它这个需要一个锁续命的功能。

【Vue3】使用ref与reactive创建响应式对象

先来简单介绍一下ref,它可以定义响应式的变量let xxx = ref(初始值)。**返回值:**一个RefImpl的实例对象,简称ref对象或refref对象的value属性是响应式的。JSxxx.value,但模板中不需要.value,直接使用即可。对于let name = ref('张三')来说,name不是响应式的,name.value是响应式的。下面我们看一看上图红框中代表的意思是,我们哪里需要响应就在哪个里面导入上述代码即可。

如何设置页面恢复运行事件触发回调

由于 Android 原生的 resume 和 pause 事件不能区分是压后台导致还是页面切换导致,所以 pageResume 和 pagePause 事件是通过 JSAPI 调用记录回调的,仅适用于同一个 session 内 Window 之间的互相切换。当一个 WebView 界面重新回到栈顶时,例如从后台被唤起、锁屏界面恢复、从下个页面回退,会触发页面恢复运行(resume)事件。如果这个界面是通过 popWindow 或 popTo 到达,且传递了 data 参数,则此页可以获取到这些参数。

日常遇到Maven出现依赖版本/缓存问题通用思路。

如果怀疑是本地仓库中缓存的依赖有问题,可以手动删除本地仓库(默认位置在用户的.m2/repository目录下),但这是一个较为极端的做法,因为这会删除所有项目的所有本地依赖,之后Maven将不得不重新下载这些依赖。针对于这样的问题 首先我们的第一思路 就是怀疑到是缓存的问题,那么我在这里去描述一下 我们遇到这类通用类的问题如何解决。检查项目的pom.xml文件,确认依赖声明正确无误,没有冲突的版本号或不正确的依赖范围。版本问题导致的,但是我确认过了一下的一些操作 依然没有解决我的问题。

什么是tomcat?tomcat是干什么用的?

Tomcat是一个开源的、轻量级的应用服务器,是Apache软件基金会的一个项目。它实现了Java Servlet、JavaServer Pages(JSP)和Java Expression Language(EL)等Java技术,用于支持在Java平台上运行的动态Web应用程序。AJP是用于Apache服务器与Tomcat之间进行通信的协议,通常用于将动态生成的内容传递给Apache服务器进行处理。它能够运行Servlet和JSP,提供了一个环境,使得开发者能够构建和运行基于Java的Web应用。

C# winfrom中excel文件导入导出

在C#交流群里,看到很多小伙伴在excel数据导入导出到C#界面上存在疑惑,所以今天专门做了这个主题,希望大家有所收获!环境:win10+vs2017界面:主要以演示为主,所以没有做优化,然后主界面上添加两个按钮,分别命名为ExportExcel和ImportExcel,添加两个dataGridView,分别是dataGridView1和dataGridView2然后在窗体加载程序中给dataGr...

Java 与 JavaScript 的区别与联系

Java 和 JavaScript 两种编程语言在软件开发中扮演着重要的角色。尽管它们都以“Java”命名,但实际上它们是完全不同的语言,各有其独特的特点和用途。本文将深入探讨 Java 和 JavaScript 的区别与联系,帮助大家更好地理解它们在编程世界中的作用。

C语言中的作用域与生命周期

但是全局变量被 static 修饰之后,外部链接属性就变成了内部链接属性,只能在自己所在的源文件内部使用了,其他源文件,即使声明了,也是无法正常使用的。结论:static修饰局部变量改变了变量的生命周期,生命周期改变的本质是改变了变量的存储类型,本来一个局部变量是存储在内存的栈区的,但是被 static 修饰后存储到了静态区。extern 是用来声明外部符号的,如果一个全局的符号在A文件中定义的,在B文件中想使用,就可以使用extern进行声明,然后使用。全局变量的生命周期是:整个程序的生命周期。

Python和Java的区别(不断更新)

运行效率:一般来说,Java的运行效率要高于Python,这主要是因为Java是编译型语言,其代码在执行前会进行预编译,而Python是解释型语言,边解释边执行。而Python没有类似的强大虚拟机,但它的核心是可以很方便地使用C语言函数或C++库,这使得Python可以轻松地与底层硬件进行交互。**类型系统:**Java是一种静态类型语言,所有变量需要先声明(类型)才能使用,且类型在编译时就已经确定。总的来说,Python和Java各有其优势和特点,选择哪种语言取决于具体的项目需求、开发环境以及个人偏好。

服务器与电脑的区别?

服务器是指一种专门提供计算和存储资源、运行特定软件服务的物理或虚拟计算机。服务器主要用于接受和处理来自客户端(如个人电脑、手机等)的请求,并向客户端提供所需的服务或数据。服务器在网络环境中扮演着中心节点的角色,负责存储和管理数据、提供网络服务、处理计算任务等。

windows下ngnix自启动(借助工具winSw)

在windows下安装nginx后,不想每次都手动启动。本文记录下windows下ngnix自启动(借助工具winSw)的操作流程提示:以下是本篇文章正文内容,下面案例可供参考本文记录下windows下ngnix自启动(借助工具winSw)的操作流程。

synchronized 和 Lock 有什么区别?synchronized 和 ReentrantLock 区别是什么?说一下 atomic 的原理?

例如,AtomicInteger 的 incrementAndGet() 方法就是通过 CAS 操作实现的,它首先尝试原子地将共享变量加 1,如果操作成功,则返回新的值,否则重试直到操作成功为止。CAS 操作的原理是,当 V 的值等于 A 时,将 V 的值更新为 B,否则什么也不做。synchronized 和 Lock 都是 Java 中用于实现线程同步的关键字/类库,它们都能够提供对共享资源的安全访问和防止数据竞争的功能,但是在实现方式、特性、适用场景等方面存在一些差异。

SpringBoot security 安全认证(二)——登录拦截器

本节内容:实现登录拦截器,除了登录接口之外所有接口访问都要携带Token,并且对Token合法性进行验证,实现登录状态的保持。核心内容:1、要实现登录拦截器,从Request请求中获取token,从缓存中获取Token并验证登录是否过期,若验证通过则放行;2、实现对拦截器配置,SpringBoot 安全模块使用HttpSecurity 来完成请求安全管理。

深入理解 Java 变量类型、声明及应用

Java 变量 变量是用于存储数据值的容器。在 Java 中,有不同类型的变量,例如: String - 存储文本,例如 &amp;quot;你好&amp;quot;。字符串值用双引号引起来。 int - 存储整数(全数字),没有小数,例如 123 或 -123。 float - 存储浮点数,带有小数,例如 19.

Zookeeper分布式队列实战

ZooKeeper实现队列步骤1.创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下。2.实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息。3.实现出队操作:当需要从队列中取出一个元素时,先获取根节点下的所有子节点。再找到具有最小序号的子节点,获取该节点的数据,删除该节点,然后返回节点的数据。

SpringMVC校验注解不生效

用来实现参数校验功能。Spring使用hibernate-validator作为它的默认实现,我们只需要进行一些简单的注解声明,就可以达到参数校验的功能。但是在实际使用场景中,经常会出现校验没生效的问题。

Web 安全之点击劫持(Clickjacking)攻击详解

点击劫持(Clickjacking)攻击,又称为界面伪装攻击,是一种利用视觉欺骗手段进行攻击的方式。攻击者通过技术手段欺骗用户点击本没有打算点击的位置,当用户在被攻击者攻击的页面上进行操作时,实际点击结果被劫持,从而被攻击者利用。这种攻击方式利用了用户对网站的信任,通过覆盖层(通常是透明的iframe)覆盖在另一个网页之上,使受害者无法察觉。

为什么Java中的String类被设计为final类?

String类作为Java中不可或缺的类之一,被设计成final类带来了不可变性、安全性、可靠性和性能优势。不可变的特性使得String对象在多线程环境下安全共享,提高了应用程序的并发性和性能。此外,String类的设计还符合Java类库的一致性和规范,确保了整个语言的稳定性和可靠性。因此,String类被设计成final类是出于多方面的考虑,以提供最佳的使用体验和编程效率。

bat脚本打开多个黑窗口并执行不同的命令

在使用java -jar运行jar包之前,需要先启动redis,而redis的安装目录与jar包不在同一目录下,所以每次启动项目的时候都需要来回的切换目录。现写了一个bat脚本,用来一键启动redis和jar包。start cmd /k &quot;cd /d redis安装目录 &amp;&amp; redis-server redis.windows.conf&quot;其中,cmd /k命令是不关闭黑窗口的命令,timeout /T 3表示等待3秒,/NOBREAK表示键盘输入不会中断等待。

鸿蒙(ArkUI)开发:实现二级联动

列表的二级联动(Cascading List)是指根据一个列表(一级列表)的选择结果,来更新另一个列表(二级列表)的选项。这种联动可以使用户根据实际需求,快速定位到想要的选项,提高交互体验。例如,短视频中拍摄风格的选择、照片编辑时的场景的选择,本文即为大家介绍如何开发二级联动。

Vue和React的区别 | | React函数式写法和类写法的区别

React 更多的是一个库而不是框架,它更专注于视图层的管理,通过社区和第三方库来进行补充和扩展。类式组件: 类式组件是 ES6 中引入的 class 类的一种用法,它继承自 React.Component,拥有完整的生命周期和内部状态管理能力。它是无状态的,没有生命周期和内部状态。而在 React 中,我们使用 JSX 语法,它是一种 JavaScript 的扩展语法,可以在 JavaScript 中直接编写类似 HTML 的结构。在 React 中,我们可以定义组件的两种方式,即函数式组件和类式组件。
返回
顶部