正在估算剩余时间(icloud恢复正在估算剩余时间)

  • 时间:
  • 浏览:58
  • 来源:奥一装修网

已请求更新正在估算剩余时间

大数据书写的真实问题-第2章:Spark面试问题第二章目录第二章Spark2。1 Spark原理2。1。1 Shuffle原理2。1。1。1 SortShuffle2。1。1。2和hadoop shuffle 2。1。2作业提交过程之间的区别(重要) 2。1。2。1独立版本2。1。2。2纱线客户摘要:与独立版本的区别在于AM仅用作中间连接,而实际AM是驱动程序的sparkContext2。1。2。3纱线群集摘要:仅与yarn客户相比驱动程序端已从客户端更改为群集中的NodeManager节点。 2。1。3工作操作原理阶段1:我们编写驱动程序来定义RDD的作用和转换。这些依赖关系构成了操作DAG。阶段2:根据已形成的DAG,DAGScheduler将其分为不同阶段。阶段3:每个阶段都有一个TaskSet。 DAGScheduler将TaskSet交给TaskScheduler执行。执行任务后,TaskScheduler将结果返回给DAGSCheduler。第四阶段:TaskScheduler将任务分发到每个Worker节点以执行,然后将结果返回给TaskScheduler。 2。1。4任务重试和本地化级别TaskScheduler遍历taskSet,调用launchTask方法以根据数据“本地化级别”将任务发送到指定的Executor任务。选择执行器时,第一级将具有优先权,如果执行器资源不足,它将等待一段时间(默认3s),然后逐渐降级。
重试机制将被执行。如果任务提交失败,默认情况下它将重试3次。 3次后,DAGScheduler将重新提交TaskSet,然后重试。总共提交了4次。当工作被判定为失败12次后,执行器将被杀死。完成75%的任务后,每100秒计算所有剩余任务执行时间的中位数。超过此数字1。5倍的任务被判定为困难任务。 2。1。5 DAG原理(源级别)2。1。6 SparkContext创建过程(源级别)2。1。7 Spark SQL操作原理2。1。8执行器的Spark内存模型内存分为4 + 1块:执行:计算内存,使用Memory用于在执行各种运算符时存储临时对象的存储:高速缓存存储器,主要存储在内存中捕获的数据,并且广播变量也存在于此。用户存储器:用户存储器,用于存储RDD信息(如RDD依赖项)。保留内存:保留内存,用于存储Spark自己的对象。堆外内存:堆外内存。启动后,计算和高速缓存都可以存储在堆外存储器中。堆外内存不受spark GC的影响。执行和存储使用联合存储机制,该机制可以借用彼此的存储区域,但是执行可以强制收集存储内存,反之亦然。任务共享执行程序的内存区域。 Spark准备一个hashMap来记录每个任务使用的内存。当任务申请新的内存时,如果剩余内存不足,它将阻塞直到有足够的内存。每个任务至少需要启动1 / 2N的内存。 2。1。9运算符的原理2。1。9。1 foreach和foreachPartition之间的区别两种运算符都属于Action运算符,但适用于不同的场景。 Foreach主要基于输出打印,用于数据显示,并且foreachPartition适合于每个连接。创建连接时使用此连接,
提高执行效率并减少资源消耗。 2。1。9。2 map和mapPartitions的区别这两个运算符都是转换运算符和转换运算符,但是适用于不同的场景,map用于处理每条数据,也就是说,执行效率略低,而mapPartition是处理一个分区数据,返回值是一个集合,也就是说,后者在效率方面效率更高,前者稍低,但是在执行安全性方面,map更适合处理大量数据,而mappartition适用于中小型数据量,如果数据量太大,将导致程序崩溃或oom。 2。2 spark版本2。x和1。6之间的区别基础执行内存模型已从以前的静态内存模型更改为动态内存模型。在spark 2。X之后引入了一个称为DataSet的新功能,该功能等效于集成。DF和RDD之间的关系使操作Spark API更加容易。 2。3 Spark概念2。3。1 RDDRDD是一种灵活的分布式数据集,是只读分区记录的集合,只能根据某个数据集或其他RDD进行转换,因此具有高容错和低开销的特点。 2。3。2 Jobjob可以被视为我们在驱动程序中提交的程序中的一项操作,或者通过spark-submit提交,我们的程序中有许多操作,它们对应于许多作业。可以并行执行的任务任务集是在存在依赖关系的各个阶段之间进行序列化,而sparkJob可能会生成多组阶段。 Stage具有两个子类:ResultStage和ShuffleMapStage2。3。3。1 ResultStage在RDD的某些分区上应用函数以计算操作操作的结果,
产生随机数据。它们出现在随机播放之前。执行完成后,将保存结果数据,以便reduce任务可以获取它。 2。3。4 Tasktask是执行Spark作业的逻辑单元。在执行程序2。3。4。1的CPU内核中运行后,发生ShuffleMapTaskshuffle操作,然后Task将被分为两种类型的任务,其中上游数据称为ShuffleMapTask,主要用于上游数据处理以准备下游数据拉取。 2。3。4。2 ResultTask下游数据主要基于上游数据的结果集。实际上,ResultTask会根据元数据索引提取数据文件,然后根据Key汇总所有值。 2。3。5 Spark中对kyro序列化了解多少?实际上,Spark的序列化默认为org。apache。spark。serializer。JavaSerializer,可选的org。apache。spark。serializer。KryoSerializer,只要它是org。apache。spark。serializer的子类即可,但如果是仅应用了,可能您自己不会实现。序列化仍然对spark应用程序的性能有很大影响。对于特定的数据格式,KryoSerializer的性能可以达到JavaSerializer的十倍以上。当然,在整个Spark程序中都会考虑它,而不是考虑比例。这么大,但以Wordcount为例,通常很容易实现30%以上的性能提升。对于某些基本类型的数据,例如Int,性能的提高几乎可以忽略不计。与JavaSerializer相比,KryoSerializer依靠Twitter的Chill库来实现,主要问题是并非所有Java Serializable对象都可以支持。请注意,此处的序列化程序可以匹配的对象是随机播放数据,
Spark Task的序列化是通过spark。closure。serializer配置的,但目前仅支持JavaSerializer。 2。3。6持久火花使用catch和persist方法保持结果。 persist方法共有5个参数,分别对应12个缓存级别。这12个级别分别来自磁盘存储,内存存储,堆外内存存储,是否反转序列化和备份编号的五个角度设置。捕获使用仅保留在内存中的Memory_Only。 2。3。7检查点火花通过检查点方法将RDD状态保存在高可用性存储中。与持久性不同,它是RDD状态的副本,并且是持久性的。执行checkPoint之后,不再保存依赖项链。此外,在程序完成时,永久性存储缓存将自动删除,并且检查点保存的RDD状态只能手动清除。 2。3。8广播变量在正常情况下,spark复制每个任务所需的数据副本。如果大量的Task需要使用相同的数据,则此方法将导致节点Excutor(包括更多Task)从驱动程序侧提取大量重复数据,从而占用网络IO和内存资源。使用广播变量后,Task将延迟加载数据。加载时,首先在本地Excutor的BlockManager中搜索。如果找不到,它将在最近的节点的BlockManager中搜索。在找到数据之前,数据将被传输到本地存储。多个任务可以重用此数据,从而大大减少了内存占用和IO时间。 2。3。9蓄能器火花为整个过程中执行附加MR任务的蓄能器提供了条件。可以将其初始化并发送到驱动程序侧的每个任务,然后在每个任务中向其添加数据,并在reduce还原后最终减少结果。聚合后返回驱动程序。您可以自定义累加器的类型并通过实现聚合方法来创建自定义累加器。另外,spark2还支持特殊的累加器-收集器,它不需要执行reduce,它将原始数据存储在收集中并返回。
需要抓住转换运算符,否则可能导致重复累积。 2。3。10分区2。3。10。1概念分区是RDD中用于并行计算的计算单元。它是RDD数据集的逻辑分片。分区的格式确定并行计算的粒度,而分区的数量确定任务的数量。 2。3。10。2功能通过在同一节点上放置相同的密钥,可以避免不同节点聚合密钥时,随机操作产生的网络IO;另外,预先分区的数据在连接时只能被另一个表混洗。不打乱,这通常用于将大表连接到小表。 2。3。10。3默认分区程序HashPartitioner:对键的哈希值/分区数进行分区可选分区器RangePartitioner:范围分区器,按字典顺序或编号大小/分区号排序到分区2。3。10。4自定义分区器,通过实现get partition total number方法和获取分区号方法,指定自定义规则的键分区方法;使用自定义分区程序创建的RDD来执行复杂的聚合或联接操作会更高效。 2。3。11并行性Spark作业的最大并行性=执行器数*每个执行器的cpu核心数。但是,当前的spark并行性取决于任务数,任务数=分区数。可以通过spark。default。parallelism设置分区数,也可以在使用运算符时明确指定分区程序和分区数。 Spark正式建议将分区数设置为最大并行度的2-3倍,以确保预先计算的线程可立即用于后续任务,并且每个任务处理的数据量将更少。 2。4火花调整总结减少GC:增加计算内存;将经常使用的大型缓存缓存到堆外内存中;使用计数器存储重复数据以增加并行度:在混洗期间指定分区数,通过重新分区(可用线程中的2个)-3倍增加分区,减少分区,可以为本地聚合指定带有混洗的重分区以减少混洗:使用指定的分区程序进行分区,
然后读取大表并使用广播变量进行joinmap端减少:使用combineByKey运算符指定persist:在内存不足时使用persisit指定高速缓存磁盘而不是catch参数调整:设置压缩RDD(保存内存,增加CPU),设置kyro序列化2。延长等待时间,以降低本地化级别,启用预测性执行等。2。5其他2。5。1 SparkSQL和Hive之间的差异示例HQL窗口函数在拥有之后发生,因此其窗口函数可以在组聚集后对数据进行计数,相反,Spark在作业之间没有序列关系,因此其窗口函数只能在聚合之前使用数据。

用于SparkListener获得jsc。setLocalProperty(“线程”,“客户端”); rdd2。count();这样,作业和作业提交者之间的映射关系被记录在jobInfoMap中。当发现作业延迟时,可以通过SparkContext的Call cancelJob来取消,但这足以在这里吗?往下看,执行者取消了作业,并最终调用:def kill(interruptThread:Boolean){_killed = true if(context!= Null){context。markInterrupted()} if(interruptThread && taskThread!= Null){taskThread。最后调用Thread。interrupt函数为启动任务的线程设置中断标志位,因此在长时间允许的任务中,需要判断Thread中断标志位。设置后,您需要退出并做一些清理,即存在类似的代码段:if(Thread。interrupted()){// 。。。线程被中断,清理资源} //或者当调用sleep时,需要等待功能时将抛出InterruptedException异常。捕获并进行相应的处理除了上述操作之外,您还需要配置操作以对每个作业调用kill,即spark。job。interruptOnCancel属性为true //设置提交的本地属性触发动作提交作业之前的线程,以使SparkListener获得jsc。setLocalProperty(“线程”,“客户端”); //配置作业收到终止请求后的动作,

导致程序崩溃。这种情况不是很常见,但出现时也很头疼。通常,依赖关系冲突表现为NoSuchMethodError,ClassNotFoundException或与Spark作业执行期间与类加载有关的其他JVM异常。解决此问题的主要方法有两种:一种是修改您的应用程序以使用与Spark使用相同版本的依赖库。第二种是通过通常称为“阴影”的方式打包应用程序。 Maven构造工具通过使用shading插件(实际上,shading的功能也是将该插件命名为maven-shade-plugin的原因)来支持这种打包方法,以进行高级配置。阴影使您可以将冲突的程序包保留在另一个名称空间中,并自动重写应用程序代码,以便它们使用重命名的版本。该技术有些简单且粗糙,但是对于解决运行时依赖冲突的问题非常有效。 org。apache。maven。plugins maven-shade-plugin 2。3包阴影2。5。8 Spark可以使用哪些作业提交方法?提交方法有:本地模式(用于常规测试),独立模式(Spark自己的集群模式),Spark-On-Yarn模式(将Spark App提交给Yarn进行执行,这是更常用的),不常用)2。5 。9描述Spark资源的动态调整。为什么我们需要动态资源分配和动态控制率?默认情况下,Spark首先分配资源,然后执行计算,这是粗粒度的资源分配。粗粒度的好处:资源是预先分配的,因此在计算任务时直接使用这些资源。粗粒度的缺点:从Spark Streaming的角度来看,存在高低峰,并且在高峰和低峰期间所需资源不同;如果按照高峰的角度分配它们,将会浪费很多资源。
连续扫描执行器,例如:在一段时间内,执行器未收到任何任务,因此将删除执行器。调整动态资源时,最好不要设置太多内核。设置了太多的核心。如果资源调整得太频繁,那就麻烦了(Core的数量通常设置为奇数:3、5、7)。 Spark Streaming对处理资源的动态调整是Executor的动态调整。 Spark流在批处理期间执行。此批处理持续时间需要大量资源。下一个批处理持续时间不需要那么多资源。当您要调整资源时,您没有时间调整资源。当前批处理持续时间正在运行。如果已经过期,则此时的资源调整是浪费的。资源动态应用程序Spark流基于Spark核心,它还支持动态资源分配。您可以在spark。dynamicAllocation。enabled中配置是否启用动态分配。如果启用,则新的ExecutorAllocationManager。通过配置参数:spark。dynamicAllocation。enabled来查看是否需要启用Executor的动态分配。您可以在程序运行时连续设置spark。dynamicAllocation。enabled参数的值。如果支持动态资源分配,请使用ExecutorAllocationManager类。 2。5。10可以使用spark创建斐波那契数吗? Spark最初批量处理大量数据。如果将Spark用于实现Rabbit序列,则它是伪命题。
比递归实现更有效* @param n * @return * / def m2(n:Int):Int = {var n1 = 0 var n2 = 1 var res = 0 if(n》批处理间隔:过多的流量,群集繁忙,数据积压会导致系统崩溃。流量控制。通过设置spark。streaming。kafka。maxRatePerPartition,可以静态调整每个请求的最大流量,但是需要重新启动集群。背压机制不需要重新启动集群根据当前系统处理速度进行智能化调整流量阈值,将spark。streaming。backpressure。enabled设置为true,打开背压机制后,sparkStreaming将根据处理速率自动估算下一批流量阈值我们可以通过更改几个增益比率来控制其自动估计模型,其底层使用Guava令牌桶算法来实现当前限制:该程序将令牌提取到t进行存储,如果它获得令牌,它将缓存数据而无法获取。封锁并等待。可以通过更改令牌的速度来实现流量控制。其他方案2。6。2。3批量累积updataStateBykeyupdataStateBykey是特殊的reduceByKey,阶段在oldValue + reduceByKey(newValue1,newValue2)时,传入updateFunc以实现批次之间数据的累积。为此,必须设置checkPoint路径,updataStateBykey将自动将每次计算的结果持久保存到磁盘上的批次之间。数据缓存在内存中。缺点:使用了大量内存,生成了大量小文件mapwithStatemapwithState是spark1。6的新累积操作,目前仍在测试中,其原理不可在线使用,只知道它是对它的升级updataStateBykey版本,效率提高了10倍。缺点:信息不完整,社区很小,不建议使用状态流累积操作,建议使用窗口+第三方存储(redis)来达到同样的效果。
UpdateStateByKey:有关全局密钥状态的统计信息,但是即使没有数据输入,它也会返回每批中前一个密钥的状态。这样做的缺点是,如果数据量太大,并且我们需要检查数据,这将占用大量存储空间。如果要使用updateStateByKey,则需要设置一个检查点目录(updateStateByKey无法保存密钥本身的状态),并启用检查点机制。由于密钥的状态保留在内存中,如果该密钥处于关闭状态,则重新启动之前保留的状态将消失,因此,如果要长时间保存,则需要启用检查点以还原数据。实现案例:对象LoadKafkaDataDemo {def main(参数:Array 【String】):单元= {val checkpointDir =“ d:// cp--2” val ssc = StreamingContext。getOrCreate(checkpointDir,()=》 createContext)ssc。 start()ssc。awaitTermination()} / ** *此方法包含主要的计算逻辑,
自动提交偏移量“ enable。auto。commit”-》(true:java。lang。Boolean))val topic = Array(“ test1”)//使用者数据val日志:InputDStream 【ConsumerRecord 【String,String】】 = KafkaUtils 。 createDirectStream(ssc,LocationStrategies。PreferConsistent,ConsumerStrategies。Subscribe(topics,kafkaParam))//计算已消耗数据的字数//不需要关键数据,只保留值,因为value是实际的日志日志数据值行:DStream 【String】 = logs。map(_。Value())val tups:DStream 【(String,Int)】 = lines。flatMap(_。Split(“”))。 Map((_,1))val res = tups。updateStateByKey(func,new HashPartitioner(ssc。sparkContext。defaultParallelism),true)res。print ssc} val func =(it:迭代器【【String,Seq 【Int】,选项【Int】)】)=》 {it。map {case(a,b,c)=》 {(a,b。sum + c。getOrElse(0))}}}} MapWithState:也用于全局统计键状态,但是如果没有数据输入,则不会返回到先前的键状态,会有一点增量的感觉。这样做的好处是我们只关心已更改的密钥,

自动提交偏移量“ enable。auto。commit”-》(true:java。lang。Boolean))//指定主题val topic = Array(“ test1”)//使用者数据val日志:DStream 【String】 = KafkaUtils。createDirectStream (ssc,LocationStrategies。PreferConsistent,ConsumerStrategies。Subscribe(topics,kafkaParam))。map(_。value)val resultWordCount:DStream 【(String,Long)】 =日志。filter(line =》 line。nonEmpty)。flatMap(line =》 line。split(“”)。map((_,1)))。reduceByKey(_ + _)。mapWithState(spec)resultWordCount。print()//开始开始处理ssc。start()ssc。awaitTermination ()}} 2。6。3其他2。6。3。1一批SparkStreaming多长时间?一批中有多少数据?批次间隔需要与业务一起确定。如果实时性要求较高,则需要调低批处理间隔。每批中的数据量与每天生成的数据量直接相关,计算时需要考虑峰值条件。应该注意的是,批次间隔越长,将为每个批次计算出更多的数据。 2。6。3。2 SparkStreaming的消费速度跟不上生产速度怎么办?默认情况下,Spark Streaming使用接收方或直接方以生产方数据速率接收数据。
即,每批数据的处理时间比Spark Streaming批处理间隔长。越来越多的数据被接收,但是数据的处理速度跟不上,导致系统开始累积数据,这可能进一步导致执行器端的OOM问题和故障。 Spark Streaming 1。5之后的架构:●为了自动调整数据传输速率,将一个名为RateController的新组件添加到原始架构。该组件继承自StreamingListener,该侦听器侦听所有作业的onBatchCompleted事件,并基于processingDelay,SchedulingDelay,当前批处理的记录数以及处理完成事件来估计速率。此速率主要用于更新流每秒可以处理的最大记录数。速率估算器可以通过多种方式实现,但是当前的Spark 2。2仅实现基于PID的速率估算器。 ●InputDStreams内部的RateController将存储计算出的最大速率,在处理onBatchCompleted事件之后,它将把计算出的速率推送到ReceiverSupervisorImpl,以便接收器知道接下来应该接收多少数据。 ●如果用户配置spark。streaming。receiver。maxRate或spark。streaming。kafka。maxRatePerPartition,则最终接收多少数据取决于这三个中的最小值。换句话说,每个接收器或每个Kafka分区将每秒处理的数据不超过spark。streaming。receiver。maxRate或spark。streaming。kafka。maxRatePerPartition的值。
只需将spark。streaming。backpressure。enabled设置为true。此参数的默认值为false。背压机制还涉及以下参数,包括未在文档中列出的参数:●spark。streaming。backpressure。initialRate:启用背压机制时,每个接收器都会接收第一批数据的初始最大速率。未设置默认值。 ●spark。streaming。backpressure。rateEstimator:Rate estimator类,默认值为pid,Spark当前仅支持此类型,您可以根据需要实现。 ●spark。streaming。backpressure。pid。proportional:用于响应错误的权重(最后一批与当前批次之间的变化)。默认值为1,只能将其设置为非负值。对“错误”的响应权重(最后一批与该批次之间的变化)●spark。streaming。backpressure。pid。integral:由错误累积的响应权重,具有抑制作用(有效阻尼)。默认值为0。2,并且只能设置为非负值。权重为对误差积累的响应。这具有阻尼作用。 ●spark。streaming。backpressure。pid。derived:对错误趋势的响应权重。这可能会导致批次大小波动,从而有助于快速增加/减少容量。默认值为0,只能将其设置为非负值。
默认值为100,只能将其设置为非负值。上面是Spark背压机制,结合了Spark资源的动态调整(在以下问题中进行了详细说明),是问题2。6。3。4 SparkStreaming批处理间隔的完整解决方案,其中处理后的数据存储在批处理中interval是SparkStreaming处理实时需求的时间间隔,需要根据业务需求确定批处理间隔。实时需求的处理结果通常存储在可快速读取以提高效率的数据库中,例如Redis,MongoDB,HBase。 2。6。3。5 Spark Streaming的窗口大小和每个窗口处理的数据量。必须根据业务需要确定问题。例如,要实现的需求是计算每分钟的第一个小时内的在线人数。上面要求的窗口大小(窗口长度)为1小时,然后计算每个窗口要处理的数据量。窗口处理的数据量=每个批次处理的平均数据量*窗口中的批次数2。6。3。7 Spark Streaming如何使用MySQL数据,如果:在MySQL中的用户名为Zhang San,Spark已经被消耗掉了,但是这个我的名字改成了张小三,我该怎么办?如何同步? Spark Streaming是批处理。每一批的计算方法是使用来自MySQL的数据进行统计。获得结果后,结果将保存到相应的数据库中。如果此时更新了MySQL的字段值,那么更新后的值将不会影响前一批Streaming的结果,只会影响后一批的结果。除非是要覆盖以前的结果。 2。6。3。8 flink和spark流之间的区别在Spark流(独立模式)下运行的角色如下:主服务器:主要负责总体群集资源管理和应用程序调度; Worker:负责单个节点的资源管理,驱动程序和执行程序的启动等;驱动程序:执行用户输入程序的位置,即执行SparkContext的位置,
反馈执行状态和执行结果。 Flink运行时(独立模式)的主要角色是:Jobmanager:协调分布式执行,它们计划任务,协调检查点,协调故障恢复等。至少有一个JobManager。在高可用性下,可以启动多个JobManager,其中一个被选为领导者,另一个被选为待机者。 Taskmanager:负责执行特定任务,缓存和交换数据流,至少一个TaskManager;插槽:每个任务插槽代表一个固定的TaskManager。对于某些资源,插槽数量代表任务管理器可以并行执行的任务数量。运行模型比较Spark Streaming是一个微批处理过程。运行时,需要指定批处理时间。每次运行作业时都要处理一批数据。 Flink是事件驱动的。事件可以理解为消息。事件驱动的应用程序是一种状态应用程序,它可以从一个或多个流中注入事件,通过触发计算来更新状态,或使用外部操作来响应注入的事件。编程模型的比较Spark Streaming Spark Streaming和kafka的组合主要是两个模型:上面两个模型在编程机制上相似,但是API和内部数据获取方面存在一些差异。新版本取消了企业中常用的基于接收器的模型。基于直接Dstream的模式。
您可能对此有疑问。轮询kafka数据时,轮询用于批量获取数据(您可以设置批量大小和超时时间)。这不能称为事件触发。实际上,flink在内部从民意调查中筛选出数据,然后将其一一发出,从而形成了事件触发机制。
实际上,每个批次都是一个Spark Core任务。对于已编码的Spark Core任务,从生成到执行结束,它主要包括以下部分:Flink任务调度对于Flink的流任务客户端,它将首先生成StreamGraph,然后生成JobGraph,然后将jobGraph提交给Jobmanager,最终将jobGraph完成。ExecutionGraph的转换最终由jobManager调度和执行。如上图所示,有一个由数据源,MapFunction和ReduceFunction组成的程序。数据源和MapFunction的并发性为4,ReduceFunction的并发性为3。数据流由Source-Map-Reduce序列组成,并在每个TaskManager具有2个TaskManager和3个Task Slot的群集上运行。可以看出,将flink拓扑生成提交执行后,除非发生故障,否则拓扑组件的执行位置保持不变,并且并行度由每个运算符的并行度决定,类似于Storm。在Spark Streaming中,每个批处理都是根据数据局部性和资源条件进行调度的,并且没有固定的执行拓扑。 Flink是拓扑中的数据流执行,而Spark Streaming是数据高速缓存批处理的并行处理。时间机制比较就流处理程序的时间而言,有三个时间概念:处理时间处理时间是指每台计算机的系统时间。当流式传输程序使用处理时间时,将使用运行每个操作员实例的机器时间。处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能为消息事件提供时序保证,因为它受消息传输的延迟和操作员之间消息流动速度的限制。事件时间事件时间是指事件在其设备上发生的时间,
然后flink可以提取时间。基于事件时间处理的流程序可以保证处理事件时的顺序,但是基于事件时间的应用程序必须包含水印机制。基于事件的处理通常具有一定的滞后性,因为它需要等待后续事件并处理乱序事件,并且在使用对时间敏感的应用程序时应谨慎考虑。注入时间注入时间是将事件注入到flink中的时间。事件在源操作员处获取源的当前时间作为事件注入时间,随后基于时间的处理操作员将使用该时间来处理数据。与事件时间相比,注入时间无法处理乱序事件或滞后事件,但是应用程序乱序指定了如何生成水印。内部注入时间程序的过程与事件时间类似,但是时间戳分配和水印生成都是自动的。 Spark时间机制Spark流仅支持处理时间,结构化流支持处理时间和事件时间,并支持水印机制来处理滞后数据。 Flink时间机制flink支持三种时间机制:事件时间,注入时间,处理时间,并支持水印机制来处理滞后数据。容错机制和处理语义Spark Streaming保证只处理一次Spark Streaming任务,我们可以设置检查点,然后如果发生故障并重新启动,我们可以从最后一个检查点进行恢复,但是此行为只能使数据不丢失,可能是重复处理不能完全执行一次语义处理。对于结合了Spark Streaming和kafka的Direct Stream,您可以自己维护Zookeeper,kafka或任何其他外部系统的偏移量,并在每次提交结果后提交偏移量,以便故障恢复重启可以使用上次提交的偏移量进行恢复,确保数据不会丢失。
这时,我们需要确保处理结果的多次输出不会影响正常业务。由此可以进行分析,假设要确保仅对语义进行一次数据处理,则结果输出和偏移量提交必须在一次事务中完成。以下是三种做法:即,结果数据包含偏移量。这样,提交结果和提交偏移量是一个完成的操作,没有数据丢失或重复处理。故障恢复后,可以使用上次提交结果的偏移量。 Flink和kafka 0。11仅保证一次处理。如果接收器仅支持一种语义,则必须以事务方式将数据写入Kafka,以便在提交事务时,两个检查点之间的所有写操作都作为一个事务提交。这样可以确保在发生故障或崩溃时可以回滚这些写操作。在具有多个并发执行接收器的分布式应用程序中,仅执行一次提交或回滚是不够的,因为所有组件必须就这些提交或回滚达成共识,以确保结果一致。 Flink使用两阶段提交协议和预提交阶段来解决此问题。