<small id='nQmWDb'></small> <noframes id='q8XuWorOsC'>

  • <tfoot id='2iUyBkzI'></tfoot>

      <legend id='eH0mrxa'><style id='Gm8vJrc'><dir id='OKe1cC'><q id='CK8p'></q></dir></style></legend>
      <i id='zvuxSM7XfB'><tr id='CpcT9y'><dt id='zcoRSQ'><q id='de3Qq'><span id='gDBLJINK'><b id='sYkIt'><form id='2xkVaG3H'><ins id='qgdnE0'></ins><ul id='nozKe'></ul><sub id='Ay7h'></sub></form><legend id='CUukcBVWMv'></legend><bdo id='3BlLoDC'><pre id='CwKnbQp'><center id='bvDVHw4'></center></pre></bdo></b><th id='dOSFZic'></th></span></q></dt></tr></i><div id='b8KN3SMOC'><tfoot id='QyfS4KB'></tfoot><dl id='UrVBQzPL'><fieldset id='OEXou4'></fieldset></dl></div>

          <bdo id='1YzEpHAIST'></bdo><ul id='VeujTNi'></ul>

          1. <li id='usAQvgOay'></li>
            登陆

            章鱼网竟彩足球推荐-Disruptor结构源码阅览-怎么不重复消费

            admin 2019-12-13 269人围观 ,发现0个评论

            RingBuffer 怎样确保数据不丢掉

            由于ringbuffer是一个环形的行列,那么出产者和顾客在遍历这个行列的时分,怎样制衡呢?

            1、出产快,消费慢,数据丢掉?

            出产者速度过快,导致一个目标还没消费完,就循环出产了一个新的目标要参加ringbuffer,导致消费不完整,构成数据丢掉?

            咱们留意到,在咱们获取出产者下一个方位的时分,是经过ringbuffer的next办法,而这个next办法是调用了sequencer的next办法

            这个目标,在咱们创立disruptor目标的时分,创立的

            所以这个ringbuffer便是disruptor中的sequencer目标,那么在进行获取next的时分,这里是怎样获取下一个的呢?是否会对这个出产获取下一个序列进行相应的等候战略,防止发作相应的搅扰!!!

            这个各位看官还需多看看里边的代码以及封装(特别是封装,真是九转十八弯),多了解,我这绕着绕着很简略就绕晕了,刚开始也是云里雾里。

            EventProcessor接口概览

            EventProcessor望文生义,便是事情处理器(handle和process都能够翻译为“处理”,可是process侧重于机器的处理,而handle侧重于有人工的处理,所以运用handl章鱼网竟彩足球推荐-Disruptor结构源码阅览-怎么不重复消费e表明用户逻辑的处理,运用process表明机器的处理),这个接口有两个完成类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理顾客分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口界说。



            /**
            * Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
            *
            * @param event implementation storing the data for sharing during exchange or parallel coordination of an event.
            * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
            * 处理事情的回调接口
            */
            public interface EventHandler
            {
            /**
            * Called when a publisher has published an event to the {@link RingBuffer}
            *
            * @param event published to the {@link RingBuffer}
            * @param sequence of the event being processed
            * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
            * @throws Exception if the EventHandler would like the exception handled further up the chain.
            */
            void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
            }
            /**
            * EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
            *


            * An EventProcessor will generally be associated with a Thread for execution.
            * 事情履行器,等候RingBuffer有可用消费事情。一个事情处理器相关一个履行线程
            */
            public interface EventProcessor extends Runnable
            {
            /**
            * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
            *
            * @return reference to the {@link Sequence} for this {@link EventProcessor}
            */
            Sequence getSequence();
            /**
            * Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
            * It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
            */
            void halt();
            boolean isRunning();
            }

            EventProcessor接口承继了Runnable接口,首要有两种完成:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor

            在运用Disruptor协助类构建顾客时,运用handleEventsWith办法传入多个EventHandler,内部运用多个BatchEventProcessor相关多个线程履行。这种状况相似JMS中的发布订阅方式,同一事情会被多个顾客并行消费。适用于同一事情触发多种操作。

            而运用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部运用多个WorkProcessor相关多个线程履行。这种状况相似JMS的点对点方式,同一事情会被一组顾客其中之一消费。适用于提高顾客并行处理才干。

            消费技能完成

            咱们先回忆下Disruptor顾客的两个特色:顾客依靠图(即下文所谓的“消费链”)和事情多播。

            假定现在有A,B,C,D四个顾客,它们都能组成什么样的方式呢?从很多的排列组合中,我挑了4组比较有代表性的消费链方式。



            image.png

            • 第1组中,顾客A消费按成后,B、C、D可一起消费;
            • 第2组中,顾客A、B、C、D次序消费;
            • 第3组中,顾客A、B次序消费后,C、D一起消费;
            • 第4组中,顾客A在消费完成后,B和C能够一起消费,可是有必要在都消费完成后,D才干消费。

            标号为1、3、4的消费链都运用了事情多播,可见事情多播归于消费链的一种组合方式。留意,在上面4种组合中,每个组合的每一水平行,都归于一个顾客组。

            这些还仅仅较为简略的消费链组成,实践中消费链或许会更杂乱。

            那么在Disruptor内部是怎样完成消费链的呢?

            咱们能够先考虑下。假如想把独立的顾客组成消费链,那么后方的顾客(组)必定要知道在它前方的顾客(组)的处理状况,不然就做不到次序消费。一起,顾客也要了解出产者的方位,来判别是否有可用事情。之前咱们剖析出产者代码的时分,现已讲过,出产者为了不掩盖没有消费彻底的事情,有必要知道最慢顾客的处理状况

            做到了这些才会有才干去操控顾客组成消费链。下面让咱们详细看Disruptor中的完成。

            单出产者,多顾客方式。多顾客关于音讯不重复消费。

            package liuqiang.complex.multi;
            import com.lmax.disruptor.EventFactory;
            import com.lmax.disruptor.RingBuffer;
            import com.lmax.disruptor.YieldingWaitStrategy;
            import com.lmax.disruptor.dsl.Disruptor;
            import com.lmax.disruptor.dsl.EventHandlerGroup;
            import com.lmax.disruptor.dsl.ProducerType;
            import liuqiang.complex.common.*;
            import java.util.concurrent.Executors;
            public class Main3 {
            //单出产者,多顾客方式。多顾客关于音讯不重复消费。例如:1线程消费了音讯0,则2线程只能从0后边的音讯消费,不能对音讯0进行消费。
            public static void main(String[] args) throws Exception {
            EventFactory factory = new OrderFactory();
            int ringBufferSize = 1024 * 1024;
            Disruptor disruptor =
            new Disruptor(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
            /*
            * 该办法传入的顾客需求完成WorkHandler接口,办法的内部完成是:先创立WorkPool,然后封装WorkPool为EventHandlerPool回来。
            * 顾客1、2关于音讯的消费有时有竞赛,确保同一音讯只能有一个顾客消费
            */
            disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2"));
            disruptor.start();
            RingBuffer ringBuffer = disruptor.getRingBuffer();
            Producer producer = new Producer(ringBuffer);
            //单出产者,出产3条数据
            for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
            }
            //为了确保顾客线程现已发动,留足满足的时刻。详细原因详见另一篇博客:disruptor的shutdown失效问题
            Thread.sleep(1000);
            disruptor.shutdown();
            }
            }

            调用handleEventsWithWorkerPool构成WorkerPool,并进一步章鱼网竟彩足球推荐-Disruptor结构源码阅览-怎么不重复消费封装成EventHandlerGroup。关于同一条音讯,两顾客不重复消费。

            或许输出成果如下:

            OrderHandler1 1,消费信息:0

            OrderHandler1 2,消费信息:1

            OrderHandler1 1,消费信息:2

            顾客可用序列屏障-SequenceBarrier

            咱们要点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的首要作用是和谐获取顾客可处理到的最大序号,内部持有着出产者和其依靠的顾客序列。它的接口界说如下。

            public interface SequenceBarrier
            {
            /**
            * Wait for the given sequence to be available for consumption.

            * 等候指定序列可用
            * @param sequence to wait for
            * @return the sequence up to which is available
            * @throws AlertException if a status change has occurred for the Disruptor
            * @throws InterruptedException if the thread needs awaking on a condition variable.
            * @throws TimeoutException
            *
            */
            long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
            /**
            * Get the current cursor value that can be read.

            * 获取当时可读游标值
            *
            * @return value of the cursor for entries that have been published.
            *
            */
            long getCursor();
            /**
            * The current alert status for the barrier.

            * 当时的alert状况
            *
            * @return true if in alert otherwise false.
            */
            boolean isAlerted();
            /**
            * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.

            *
            * 告诉顾客状况改变。当调用EventProcessor#halt()将调用此办法。
            */
            void alert();
            /**
            * Clear the current alert status.

            * 清楚alert状况
            章鱼网竟彩足球推荐-Disruptor结构源码阅览-怎么不重复消费*/
            void clearAlert();
            /**
            * Check if an alert has been raised and throw an {@link AlertException} if it has.
            * 查看是否发作alert,发作将抛出反常
            * @throws AlertException if alert has been raised.
            */
            void checkAlert() throws AlertException;
            }
            SequenceBarrier实例引证被EventProcessor持有,用于等候并获取可用的消费事情,首要体现在waitFor这个办法。

            要完成这个功用,需求3点条件:

            1. 知道出产者的方位。
            2. 由于Disruptor支撑顾客链,在不同的顾客组之间,要确保后边的消 费者组只要在前顾客组中的顾客都处理完毕后,才干进行处理。
            3. 暂时没有事情可消费,在等候可用消费时,还需求运用某种等候战略进行等候。

            看下SequenceBarrier完成类ProcessingSequenceBarrier的代码是怎样完成waitFor办法。

            final class ProcessingSequenceBa章鱼网竟彩足球推荐-Disruptor结构源码阅览-怎么不重复消费rrier implements SequenceBarrier
            {
            private final WaitStrategy waitStrategy; // 等候可用消费时,指定的等候战略
            private final Sequence dependentSequence; // 依靠的上组顾客的序号,假如当时为第一组则为cursorSequence(即出产者发布游标序列),不然运用FixedSequenceGroup封装上组顾客序列
            private volatile boolean alerted = false; // 当触发halt时,将符号alerted为true
            private final Sequence cursorSequence; // 章鱼网竟彩足球推荐-Disruptor结构源码阅览-怎么不重复消费AbstractSequencer中的cursor引证,记载当时发布者发布的最新方位
            private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
            public ProcessingSequenceBarrier(
            final Sequencer sequencer,
            final WaitStrategy waitStrategy,
            final Sequence cursorSequence,
            final Sequence[] dependentSequences)
            {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) // 依靠的上一组序列长度,第一次是0
            {
            dependentSequence = cursorSequence;
            }
            else // 将上一组序列数组复制成新数组保存,引证不变
            {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
            }
            @Override
            public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
            {
            // 查看是否中止服务
            checkAlert();
            // 获取最大可用序号 sequence为给定序号,一般为当时序号+1,cursorSequence记载出产者最新方位,
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
            if (availableSequence < sequence)
            {
            return availableSequence;
            }
            // 回来已发布最高的序列值,将对每个序号廉江进行校验
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
            }
            // ...
            }
            请关注微信公众号
            微信二维码
            不容错过
            Powered By Z-BlogPHP