`
Donald_Draper
  • 浏览: 945620 次
社区版块
存档分类
最新评论

netty 抽象调度事件执行器

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
引言:
前一篇文章我们看了多线程事件循环组,先来回顾一下:
     事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
     事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
     多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。

今天我们要看的是Nio事件循环,先从NioEventLoopGroup来看,Nio事件循环如何创建:
//NioEventLoopGroup
/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    ...
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}

从Nio事件循环组创建事件循环可以看出,事件循环为NioEventLoop。

/**
 * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
 * {@link Selector} and so does the multi-plexing of these in the event loop.
 *
 */
public final class NioEventLoop extends SingleThreadEventLoop {

/**
 * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {


/**
 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {


我们先从抽象调度事件执行器开始看:
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base class for {@link EventExecutor}s that want to support scheduling.
 */
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//调度任务队列

    protected AbstractScheduledEventExecutor() {
    }

    protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
        super(parent);
    }
}

我们先来看调度任务ScheduledFutureTask

package io.netty.util.concurrent;

import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {


//ScheduledFuture
package io.netty.util.concurrent;

/**
 * The result of an scheduled asynchronous operation.
 异步调度操作,继承了JUC的ScheduledFuture
 */
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> {
}


回到调度任务
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
    private static final AtomicLong nextTaskId = new AtomicLong();//调度任务id生成器
    private static final long START_TIME = System.nanoTime();//调度开始时间
    //当前已经开始的纳秒时间
    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }
    //第一个调度任务延时时间
    static long deadlineNanos(long delay) {
        return nanoTime() + delay;
    }
    private final long id = nextTaskId.getAndIncrement();//当前调度任务id
    private long deadlineNanos;//延时时间
    /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
    //调度任务间隔时间,0,不重复,大于0,周期性调度,小于零,固定延时调度,这个和JUC的调度任务一样
    private final long periodNanos;
    //延时调度非重复任务
    ScheduledFutureTask(
            AbstractScheduledEventExecutor executor,
            Runnable runnable, V result, long nanoTime) {
        this(executor, toCallable(runnable, result), nanoTime);
    }
    //延时调度非重复任务
    ScheduledFutureTask(
            AbstractScheduledEventExecutor executor,
            Callable<V> callable, long nanoTime) {

        super(executor, callable);
        deadlineNanos = nanoTime;
        periodNanos = 0;
    }
    //重复调度任务
    ScheduledFutureTask(
            AbstractScheduledEventExecutor executor,
            Callable<V> callable, long nanoTime, long period) {

        super(executor, callable);
        if (period == 0) {
            throw new IllegalArgumentException("period: 0 (expected: != 0)");
        }
        deadlineNanos = nanoTime;
        periodNanos = period;
    }
}

在一个构造方法中
//延时调度非重复任务
 ScheduledFutureTask(
         AbstractScheduledEventExecutor executor,
         Runnable runnable, V result, long nanoTime) {
     this(executor, toCallable(runnable, result), nanoTime);
 }

有一点我们要关注为:
toCallable(runnable, result)//将任务线程与结果类型包装成Callable任务
由于调度任务继承了PromiseTask类,此方法在PromiseTask中定义,如下:
//PromiseTask
package io.netty.util.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;

class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
   //任务线程与结果类型包装成Callable任务
    static <T> Callable<T> toCallable(Runnable runnable, T result) {
        return new RunnableAdapter<T>(runnable, result);
    }
    //返回值线程包装类
    private static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        @Override
        public T call() {
            task.run();
            return result;
        }

        @Override
        public String toString() {
            return "Callable(task: " + task + ", result: " + result + ')';
        }
    }
    protected final Callable<V> task;//内部任务线程
    PromiseTask(EventExecutor executor, Runnable runnable, V result) {
        this(executor, toCallable(runnable, result));
    }
    PromiseTask(EventExecutor executor, Callable<V> callable) {
        super(executor);
        task = callable;
    }
  ...
}


回到调度任务

再来看调度任务的其他方法:
//获取调度任务时间执行器
 @Override
 protected EventExecutor executor() {
     return super.executor();
 }
//调度任务延时时间
 public long deadlineNanos() {
     return deadlineNanos;
 }
//剩余延时时间
 public long delayNanos() {
     return Math.max(0, deadlineNanos() - nanoTime());
 }
//根据当前时间,计算剩余延时时间
 public long delayNanos(long currentTimeNanos) {
     return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
 }
//转换延时时间为纳秒
 @Override
 public long getDelay(TimeUnit unit) {
     return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
 }
//比较延时任务,先比较延时时间,相等,则比较id
 @Override
 public int compareTo(Delayed o) {
     if (this == o) {
         return 0;
     }

     ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
     long d = deadlineNanos() - that.deadlineNanos();
     if (d < 0) {
         return -1;
     } else if (d > 0) {
         return 1;
     } else if (id < that.id) {
         return -1;
     } else if (id == that.id) {
         throw new Error();
     } else {
         return 1;
     }
 }

这个使我们主要看的方法
 @Override
 public void run() {
     //如果执行器不在当前事件循环中,则断言失败
     assert executor().inEventLoop();
     try {
         if (periodNanos == 0) {
	     //非重复调度任务,设置任务不可取消
             if (setUncancellableInternal()) {
	        //执行任务
                 V result = task.call();
		 //设置执行结果
                 setSuccessInternal(result);
             }
         } else {
             // check if is done as it may was cancelled
	     //若是周期性任务,则检查调度任务是否取消
             if (!isCancelled()) {
	         //没有取消,直接执行调度任务
                 task.call();
                 if (!executor().isShutdown()) {
		     //如果当前事件执行器没有关闭,则重新计算下一次任务调度的延时时间
                     long p = periodNanos;
                     if (p > 0) {
                         deadlineNanos += p;
                     } else {
                         deadlineNanos = nanoTime() - p;
                     }
                     if (!isCancelled()) { 
		         //如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列
                         // scheduledTaskQueue can never be null as we lazy init it before submit the task!
                         Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                                 ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                         assert scheduledTaskQueue != null;
                         scheduledTaskQueue.add(this);
                     }
                 }
             }
         }
     } catch (Throwable cause) {
         //设置失败异常
         setFailureInternal(cause);
     }
 }

 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
     boolean canceled = super.cancel(mayInterruptIfRunning);
     if (canceled) {
         //如果取消成功,从调度事件执行器,调度任务队列移除任务
         ((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
     }
     return canceled;
 }
//取消调度任务,不从调度事件执行器,调度任务队列移除任务
 boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
     return super.cancel(mayInterruptIfRunning);
 }

从上面来看,调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。调度任务的比较先比较延时时间,如果延时时间相等,则比较任务id(延时时间和id越大,任务执行越靠后)。

看完调度任务,我们回到抽象调度事件执行器:
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base class for {@link EventExecutor}s that want to support scheduling.
 */
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//调度任务队列,类型为PriorityQueue(平衡二叉树)

    protected AbstractScheduledEventExecutor() {
    }

    protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
        super(parent);
    }

    protected static long nanoTime() {
        return ScheduledFutureTask.nanoTime();
    }
    //获取调度任务队列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
	   //调度任务队列,以调度任务的延时时间和任务ID为比较因子,两者越大,越靠后调度
	   //优先级队列是一个平衡二叉树,延时时间小的任务在树的左边,大的在右边
            scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
        }
        return scheduledTaskQueue;
    }
    //判断任务队列是否为空
    private static  boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
        return queue == null || queue.isEmpty();
    }

    /**
     * Cancel all scheduled tasks.
     *取消所有调度任务,调用方法的线程必须在当前事件循环中
     * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
     */
    protected void cancelScheduledTasks() {
        assert inEventLoop();
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        if (isNullOrEmpty(scheduledTaskQueue)) {
            return;
        }

        final ScheduledFutureTask<?>[] scheduledTasks =
                scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
        //遍历调度任务队列中的任务,取消任务
        for (ScheduledFutureTask<?> task: scheduledTasks) {
            task.cancelWithoutRemove(false);
        }
       //清空调度任务队列
        scheduledTaskQueue.clear();
    }

    /**
     * @see #pollScheduledTask(long)
     拉取调度任务
     */
    protected final Runnable pollScheduledTask() {
        return pollScheduledTask(nanoTime());
    }

    /**
     * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
     * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
     返回延时时间已到的调度任务,及准备就绪待执行的调度任务
     */
    protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();
        //从调度任务队列peek一个调度任务
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return null;//空,则返回null
        }
        //否则如果队列头部的任务延时时间小于,延时任务已运行的时间,则返回调度任务,待执行
        if (scheduledTask.deadlineNanos() <= nanoTime) {
            scheduledTaskQueue.remove();
            return scheduledTask;
        }
        return null;
    }

    /**
     * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
     返回下一个调度任务,调度需要等待的时间,没有任务返回-1
     */
    protected final long nextScheduledTaskNano() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return -1;
        }
	//0,表示延时时间已到,调度任务
        return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
    }
   //获取调度任务队列头部的任务
    final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        if (scheduledTaskQueue == null) {
            return null;
        }
        return scheduledTaskQueue.peek();
    }

    /**
     * Returns {@code true} if a scheduled task is ready for processing.
     是否有延时时间已到的调度任务
     */
    protected final boolean hasScheduledTasks() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
    }
   //调度Runnable任务
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
       //检查线程,事件单元是否为空
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
	    //如果延时时间小于0,延时时间为0
            delay = 0;
        }
	//将Runnable任务包装成调度任务,委托给schedule(final ScheduledFutureTask<V> task) 
        return schedule(new ScheduledFutureTask<Void>(
                this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }
    //下面几个调度延时任务,周期性任务,间歇任务,都是先将任务包装成调度任务ScheduledFutureTask,
    //然后委托给schedule(final ScheduledFutureTask<V> task)方法
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
        return schedule(new ScheduledFutureTask<V>(
                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");
        if (initialDelay < 0) {
            throw new IllegalArgumentException(
                    String.format("initialDelay: %d (expected: >= 0)", initialDelay));
        }
        if (period <= 0) {
            throw new IllegalArgumentException(
                    String.format("period: %d (expected: > 0)", period));
        }

        return schedule(new ScheduledFutureTask<Void>(
                this, Executors.<Void>callable(command, null),
                ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");
        if (initialDelay < 0) {
            throw new IllegalArgumentException(
                    String.format("initialDelay: %d (expected: >= 0)", initialDelay));
        }
        if (delay <= 0) {
            throw new IllegalArgumentException(
                    String.format("delay: %d (expected: > 0)", delay));
        }

        return schedule(new ScheduledFutureTask<Void>(
                this, Executors.<Void>callable(command, null),
                ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
    }
   //调度任务
    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
	    //如果线程在当前事务循环中,则添加调度任务到调度任务队列
            scheduledTaskQueue().add(task);
        } else {
	    //否则直接创建一个线程,完成添加调度任务到调度任务队列工作
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }
    //移除调度任务队列
    final void removeScheduled(final ScheduledFutureTask<?> task) {
        if (inEventLoop()) {
	    //如果线程在当前事务循环中,则直接从调度任务队列,移除调度任务
            scheduledTaskQueue().remove(task);
        } else {
	   //否则创建一个线程,完成移除工作
            execute(new Runnable() {
                @Override
                public void run() {
                    removeScheduled(task);
                }
            });
        }
    }
}

在调度周期性和间歇性任务,包装任务线程为调度任务时,有这么一句:
Executors.<Void>callable(command, null)

//Executors,JUC执行器工厂,这个JUC篇以说,这里简单列出来
 /**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result.  This
     * can be useful when applying methods requiring a
     * <tt>Callable</tt> to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
	//包装成RunnableAdapter
        return new RunnableAdapter<T>(task, result);
    }

从上面可以看出,抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;
移除调度任务的思想与调度任务相同,只不过执行移除操作。


总结:

      调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。
      抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列
scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。



附:
//PromiseTask,可以异步任务结果重要的部分我们在文中已说,其他的一看就明白,
这里只贴出PromiseTask源码,不在讲解。
package io.netty.util.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;

class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {

    static <T> Callable<T> toCallable(Runnable runnable, T result) {
        return new RunnableAdapter<T>(runnable, result);
    }

    private static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;

        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }

        @Override
        public T call() {
            task.run();
            return result;
        }

        @Override
        public String toString() {
            return "Callable(task: " + task + ", result: " + result + ')';
        }
    }

    protected final Callable<V> task;

    PromiseTask(EventExecutor executor, Runnable runnable, V result) {
        this(executor, toCallable(runnable, result));
    }

    PromiseTask(EventExecutor executor, Callable<V> callable) {
        super(executor);
        task = callable;
    }

    @Override
    public final int hashCode() {
        return System.identityHashCode(this);
    }

    @Override
    public final boolean equals(Object obj) {
        return this == obj;
    }

    @Override
    public void run() {
        try {
            if (setUncancellableInternal()) {
                V result = task.call();
                setSuccessInternal(result);
            }
        } catch (Throwable e) {
            setFailureInternal(e);
        }
    }

    @Override
    public final Promise<V> setFailure(Throwable cause) {
        throw new IllegalStateException();
    }

    protected final Promise<V> setFailureInternal(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

    @Override
    public final boolean tryFailure(Throwable cause) {
        return false;
    }

    protected final boolean tryFailureInternal(Throwable cause) {
        return super.tryFailure(cause);
    }

    @Override
    public final Promise<V> setSuccess(V result) {
        throw new IllegalStateException();
    }

    protected final Promise<V> setSuccessInternal(V result) {
        super.setSuccess(result);
        return this;
    }

    @Override
    public final boolean trySuccess(V result) {
        return false;
    }

    protected final boolean trySuccessInternal(V result) {
        return super.trySuccess(result);
    }

    @Override
    public final boolean setUncancellable() {
        throw new IllegalStateException();
    }
   //设置任务不可取消
    protected final boolean setUncancellableInternal() {
        return super.setUncancellable();
    }

    @Override
    protected StringBuilder toStringBuilder() {
        StringBuilder buf = super.toStringBuilder();
        buf.setCharAt(buf.length() - 1, ',');

        return buf.append(" task: ")
                  .append(task)
                  .append(')');
    }
}

来看设置任务不可取消方法
//DefaultPromise
package io.netty.util.concurrent;

import io.netty.util.Signal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");//任务结果状态
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");//成功
    private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");//不可取消
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));

    private volatile Object result;
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    private Object listeners;
    /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    private short waiters;

    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
    private boolean notifyingListeners;
  //设置任务不可取消
  @Override
    public boolean setUncancellable() {
        if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }
        Object result = this.result;
        return !isDone0(result) || !isCancelled0(result);
    }
}

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics