From b864ea89d7ad7fac1f568550340177fecd9327f1 Mon Sep 17 00:00:00 2001 From: huanghongxun Date: Wed, 5 Feb 2020 11:58:50 +0800 Subject: [PATCH] add: restore old sync task executor --- .../jackhuang/hmcl/game/LauncherHelper.java | 2 +- .../TaskExecutorDialogWizardDisplayer.java | 2 +- .../hmcl/task/CancellableTaskExecutor.java | 254 ++++++++++++++++++ .../org/jackhuang/hmcl/task/Schedulers.java | 70 ++++- .../java/org/jackhuang/hmcl/task/Task.java | 17 ++ 5 files changed, 342 insertions(+), 3 deletions(-) create mode 100644 HMCLCore/src/main/java/org/jackhuang/hmcl/task/CancellableTaskExecutor.java diff --git a/HMCL/src/main/java/org/jackhuang/hmcl/game/LauncherHelper.java b/HMCL/src/main/java/org/jackhuang/hmcl/game/LauncherHelper.java index b8083d003..52a1ff581 100644 --- a/HMCL/src/main/java/org/jackhuang/hmcl/game/LauncherHelper.java +++ b/HMCL/src/main/java/org/jackhuang/hmcl/game/LauncherHelper.java @@ -213,7 +213,7 @@ public final class LauncherHelper { }); } }) - .executor(); + .cancellableExecutor(); launchingStepsPane.setExecutor(executor, false); executor.addTaskListener(new TaskListener() { diff --git a/HMCL/src/main/java/org/jackhuang/hmcl/ui/wizard/TaskExecutorDialogWizardDisplayer.java b/HMCL/src/main/java/org/jackhuang/hmcl/ui/wizard/TaskExecutorDialogWizardDisplayer.java index 6e6797405..53ddab7a8 100644 --- a/HMCL/src/main/java/org/jackhuang/hmcl/ui/wizard/TaskExecutorDialogWizardDisplayer.java +++ b/HMCL/src/main/java/org/jackhuang/hmcl/ui/wizard/TaskExecutorDialogWizardDisplayer.java @@ -60,7 +60,7 @@ public interface TaskExecutorDialogWizardDisplayer extends AbstractWizardDisplay } runInFX(() -> { - TaskExecutor executor = task.executor(new TaskListener() { + TaskExecutor executor = task.cancellableExecutor(new TaskListener() { @Override public void onStop(boolean success, TaskExecutor executor) { runInFX(() -> { diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/CancellableTaskExecutor.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/CancellableTaskExecutor.java new file mode 100644 index 000000000..d47dac599 --- /dev/null +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/CancellableTaskExecutor.java @@ -0,0 +1,254 @@ +package org.jackhuang.hmcl.task; + +import org.jackhuang.hmcl.util.Logging; +import org.jackhuang.hmcl.util.function.ExceptionalRunnable; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; + +public class CancellableTaskExecutor extends TaskExecutor { + + private final ConcurrentLinkedQueue> workerQueue = new ConcurrentLinkedQueue<>(); + private Executor scheduler = Schedulers.newThread(); + + public CancellableTaskExecutor(Task task) { + super(task); + } + + @Override + public TaskExecutor start() { + taskListeners.forEach(TaskListener::onStart); + workerQueue.add(Schedulers.schedule(scheduler, wrap(() -> { + boolean flag = executeTasks(Collections.singleton(firstTask)); + taskListeners.forEach(it -> it.onStop(flag, this)); + }))); + return this; + } + + @Override + public boolean test() { + taskListeners.forEach(TaskListener::onStart); + AtomicBoolean flag = new AtomicBoolean(true); + Future future = Schedulers.schedule(scheduler, wrap(() -> { + flag.set(executeTasks(Collections.singleton(firstTask))); + taskListeners.forEach(it -> it.onStop(flag.get(), this)); + })); + workerQueue.add(future); + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException | CancellationException ignored) { + } + return flag.get(); + } + + @Override + public synchronized void cancel() { + cancelled.set(true); + + while (!workerQueue.isEmpty()) { + Future future = workerQueue.poll(); + if (future != null) + future.cancel(true); + } + } + + private boolean executeTasks(Collection> tasks) throws InterruptedException { + if (tasks.isEmpty()) + return true; + + totTask.addAndGet(tasks.size()); + AtomicBoolean success = new AtomicBoolean(true); + CountDownLatch latch = new CountDownLatch(tasks.size()); + for (Task task : tasks) { + if (cancelled.get()) + return false; + Invoker invoker = new Invoker(task, latch, success); + try { + Future future = Schedulers.schedule(scheduler, invoker); + workerQueue.add(future); + } catch (RejectedExecutionException e) { + throw new InterruptedException(); + } + } + + if (cancelled.get()) + return false; + + try { + latch.await(); + } catch (InterruptedException e) { + return false; + } + return success.get() && !cancelled.get(); + } + + private boolean executeTask(Task task) { + task.setCancelled(this::isCancelled); + + if (cancelled.get()) { + task.setState(Task.TaskState.FAILED); + task.setException(new CancellationException()); + return false; + } + + task.setState(Task.TaskState.READY); + + if (task.getSignificance().shouldLog()) + Logging.LOG.log(Level.FINE, "Executing task: " + task.getName()); + + taskListeners.forEach(it -> it.onReady(task)); + + boolean flag = false; + + try { + if (task.doPreExecute()) { + try { + Schedulers.schedule(task.getExecutor(), wrap(task::preExecute)).get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof Exception) + throw (Exception) e.getCause(); + else + throw e; + } + } + + Collection> dependents = task.getDependents(); + boolean doDependentsSucceeded = executeTasks(dependents); + Exception dependentsException = dependents.stream().map(Task::getException).filter(Objects::nonNull).findAny().orElse(null); + if (!doDependentsSucceeded && task.isRelyingOnDependents() || cancelled.get()) { + task.setException(dependentsException); + throw new CancellationException(); + } + + if (doDependentsSucceeded) + task.setDependentsSucceeded(); + + try { + Schedulers.schedule(task.getExecutor(), wrap(() -> { + task.setState(Task.TaskState.RUNNING); + taskListeners.forEach(it -> it.onRunning(task)); + task.execute(); + })).get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof Exception) + throw (Exception) e.getCause(); + else + throw e; + } finally { + task.setState(Task.TaskState.EXECUTED); + } + + Collection> dependencies = task.getDependencies(); + boolean doDependenciesSucceeded = executeTasks(dependencies); + Exception dependenciesException = dependencies.stream().map(Task::getException).filter(Objects::nonNull).findAny().orElse(null); + + if (doDependenciesSucceeded) + task.setDependenciesSucceeded(); + + if (task.doPostExecute()) { + try { + Schedulers.schedule(task.getExecutor(), wrap(task::postExecute)).get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof Exception) + throw (Exception) e.getCause(); + else + throw e; + } + } + + if (!doDependenciesSucceeded && task.isRelyingOnDependencies()) { + Logging.LOG.severe("Subtasks failed for " + task.getName()); + task.setException(dependenciesException); + throw new CancellationException(); + } + + flag = true; + if (task.getSignificance().shouldLog()) { + Logging.LOG.log(Level.FINER, "Task finished: " + task.getName()); + } + + task.onDone().fireEvent(new TaskEvent(this, task, false)); + taskListeners.forEach(it -> it.onFinished(task)); + } catch (InterruptedException e) { + task.setException(e); + if (task.getSignificance().shouldLog()) { + Logging.LOG.log(Level.FINE, "Task aborted: " + task.getName()); + } + task.onDone().fireEvent(new TaskEvent(this, task, true)); + taskListeners.forEach(it -> it.onFailed(task, e)); + } catch (CancellationException | RejectedExecutionException e) { + if (task.getException() == null) + task.setException(e); + } catch (Exception e) { + task.setException(e); + exception = e; + if (task.getSignificance().shouldLog()) { + Logging.LOG.log(Level.FINE, "Task failed: " + task.getName(), e); + } + task.onDone().fireEvent(new TaskEvent(this, task, true)); + taskListeners.forEach(it -> it.onFailed(task, e)); + } + task.setState(flag ? Task.TaskState.SUCCEEDED : Task.TaskState.FAILED); + return flag; + } + + private static void rethrow(Throwable e) { + if (e == null) + return; + if (e instanceof ExecutionException || e instanceof CompletionException) { // including UncheckedException and UncheckedThrowable + rethrow(e.getCause()); + } else if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new CompletionException(e); + } + } + + private static Runnable wrap(ExceptionalRunnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Exception e) { + rethrow(e); + } + }; + } + + private class Invoker implements Runnable { + + private final Task task; + private final CountDownLatch latch; + private final AtomicBoolean success; + + public Invoker(Task task, CountDownLatch latch, AtomicBoolean success) { + this.task = task; + this.latch = latch; + this.success = success; + } + + @Override + public void run() { + try { + Thread.currentThread().setName(task.getName()); + if (!executeTask(task)) + success.set(false); + } finally { + latch.countDown(); + } + } + + } +} diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Schedulers.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Schedulers.java index 4fd3317e7..d72214c30 100644 --- a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Schedulers.java +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Schedulers.java @@ -19,9 +19,20 @@ package org.jackhuang.hmcl.task; import javafx.application.Platform; import org.jackhuang.hmcl.util.Logging; +import org.jetbrains.annotations.NotNull; import javax.swing.*; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * @@ -92,4 +103,61 @@ public final class Schedulers { if (SINGLE_EXECUTOR != null) SINGLE_EXECUTOR.shutdownNow(); } + + public static Future schedule(Executor executor, Runnable command) { + if (executor instanceof ExecutorService) { + return ((ExecutorService) executor).submit(command); + } + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference wrapper = new AtomicReference<>(); + + executor.execute(() -> { + try { + command.run(); + } catch (Exception e) { + wrapper.set(e); + } finally { + latch.countDown(); + } + Thread.interrupted(); // clear the `interrupted` flag to prevent from interrupting EventDispatch thread. + }); + + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return latch.getCount() == 0; + } + + private Void getImpl() throws ExecutionException { + Exception e = wrapper.get(); + if (e != null) + throw new ExecutionException(e); + return null; + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + latch.await(); + return getImpl(); + } + + @Override + public Void get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (!latch.await(timeout, unit)) + throw new TimeoutException(); + return getImpl(); + } + }; + } } diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Task.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Task.java index 432b7ad50..ad9e5e8d7 100644 --- a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Task.java +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/Task.java @@ -343,6 +343,23 @@ public abstract class Task { return executor; } + public final TaskExecutor cancellableExecutor() { + return new CancellableTaskExecutor(this); + } + + public final TaskExecutor cancellableExecutor(boolean start) { + TaskExecutor executor = new CancellableTaskExecutor(this); + if (start) + executor.start(); + return executor; + } + + public final TaskExecutor cancellableExecutor(TaskListener taskListener) { + TaskExecutor executor = new CancellableTaskExecutor(this); + executor.addTaskListener(taskListener); + return executor; + } + public final void start() { executor().start(); }