add: more brief launching steps

This commit is contained in:
huanghongxun
2020-02-19 13:34:40 +08:00
parent a144a5eee0
commit 1ea9170b2e
14 changed files with 310 additions and 210 deletions

View File

@@ -24,7 +24,10 @@ import org.jackhuang.hmcl.util.function.ExceptionalRunnable;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
/**
@@ -42,7 +45,7 @@ public final class AsyncTaskExecutor extends TaskExecutor {
@Override
public TaskExecutor start() {
taskListeners.forEach(TaskListener::onStart);
future = executeTasks(Collections.singleton(firstTask))
future = executeTasks(null, Collections.singleton(firstTask))
.thenApplyAsync(exception -> {
boolean success = exception == null;
@@ -95,7 +98,7 @@ public final class AsyncTaskExecutor extends TaskExecutor {
future.cancel(true);
}
private CompletableFuture<Exception> executeTasks(Collection<Task<?>> tasks) {
private CompletableFuture<Exception> executeTasks(Task<?> parentTask, Collection<Task<?>> tasks) {
if (tasks == null || tasks.isEmpty())
return CompletableFuture.completedFuture(null);
@@ -105,7 +108,7 @@ public final class AsyncTaskExecutor extends TaskExecutor {
return CompletableFuture.allOf(tasks.stream()
.map(task -> CompletableFuture.completedFuture(null)
.thenComposeAsync(unused2 -> executeTask(task))
.thenComposeAsync(unused2 -> executeTask(parentTask, task))
).toArray(CompletableFuture<?>[]::new));
})
.thenApplyAsync(unused -> (Exception) null)
@@ -120,11 +123,13 @@ public final class AsyncTaskExecutor extends TaskExecutor {
});
}
private CompletableFuture<?> executeTask(Task<?> task) {
private CompletableFuture<?> executeTask(Task<?> parentTask, Task<?> task) {
return CompletableFuture.completedFuture(null)
.thenComposeAsync(unused -> {
task.setCancelled(this::isCancelled);
task.setState(Task.TaskState.READY);
if (parentTask != null && task.getStage() == null)
task.setStage(parentTask.getStage());
if (task.getSignificance().shouldLog())
Logging.LOG.log(Level.FINE, "Executing task: " + task.getName());
@@ -137,7 +142,7 @@ public final class AsyncTaskExecutor extends TaskExecutor {
return CompletableFuture.completedFuture(null);
}
})
.thenComposeAsync(unused -> executeTasks(task.getDependents()))
.thenComposeAsync(unused -> executeTasks(task, task.getDependents()))
.thenComposeAsync(dependentsException -> {
boolean isDependentsSucceeded = dependentsException == null;
@@ -158,7 +163,7 @@ public final class AsyncTaskExecutor extends TaskExecutor {
rethrow(throwable);
});
})
.thenComposeAsync(unused -> executeTasks(task.getDependencies()))
.thenComposeAsync(unused -> executeTasks(task, task.getDependencies()))
.thenComposeAsync(dependenciesException -> {
boolean isDependenciesSucceeded = dependenciesException == null;

View File

@@ -23,14 +23,7 @@ import org.jackhuang.hmcl.util.function.ExceptionalRunnable;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -47,7 +40,7 @@ public class CancellableTaskExecutor extends TaskExecutor {
public TaskExecutor start() {
taskListeners.forEach(TaskListener::onStart);
workerQueue.add(Schedulers.schedule(scheduler, wrap(() -> {
boolean flag = executeTasks(Collections.singleton(firstTask));
boolean flag = executeTasks(null, Collections.singleton(firstTask));
taskListeners.forEach(it -> it.onStop(flag, this));
})));
return this;
@@ -58,7 +51,7 @@ public class CancellableTaskExecutor extends TaskExecutor {
taskListeners.forEach(TaskListener::onStart);
AtomicBoolean flag = new AtomicBoolean(true);
Future<?> future = Schedulers.schedule(scheduler, wrap(() -> {
flag.set(executeTasks(Collections.singleton(firstTask)));
flag.set(executeTasks(null, Collections.singleton(firstTask)));
taskListeners.forEach(it -> it.onStop(flag.get(), this));
}));
workerQueue.add(future);
@@ -82,7 +75,7 @@ public class CancellableTaskExecutor extends TaskExecutor {
}
}
private boolean executeTasks(Collection<? extends Task<?>> tasks) throws InterruptedException {
private boolean executeTasks(Task<?> parentTask, Collection<? extends Task<?>> tasks) throws InterruptedException {
if (tasks.isEmpty())
return true;
@@ -92,7 +85,7 @@ public class CancellableTaskExecutor extends TaskExecutor {
for (Task<?> task : tasks) {
if (cancelled.get())
return false;
Invoker invoker = new Invoker(task, latch, success);
Invoker invoker = new Invoker(parentTask, task, latch, success);
try {
Future<?> future = Schedulers.schedule(scheduler, invoker);
workerQueue.add(future);
@@ -112,7 +105,7 @@ public class CancellableTaskExecutor extends TaskExecutor {
return success.get() && !cancelled.get();
}
private boolean executeTask(Task<?> task) {
private boolean executeTask(Task<?> parentTask, Task<?> task) {
task.setCancelled(this::isCancelled);
if (cancelled.get()) {
@@ -122,6 +115,8 @@ public class CancellableTaskExecutor extends TaskExecutor {
}
task.setState(Task.TaskState.READY);
if (parentTask != null && task.getStage() == null)
task.setStage(parentTask.getStage());
if (task.getSignificance().shouldLog())
Logging.LOG.log(Level.FINE, "Executing task: " + task.getName());
@@ -140,8 +135,12 @@ public class CancellableTaskExecutor extends TaskExecutor {
}
Collection<? extends Task<?>> dependents = task.getDependents();
boolean doDependentsSucceeded = executeTasks(dependents);
Exception dependentsException = dependents.stream().map(Task::getException).filter(Objects::nonNull).findAny().orElse(null);
boolean doDependentsSucceeded = executeTasks(task, dependents);
Exception dependentsException = dependents.stream().map(Task::getException)
.filter(Objects::nonNull)
.filter(x -> !(x instanceof CancellationException))
.filter(x -> !(x instanceof InterruptedException))
.findAny().orElse(null);
if (!doDependentsSucceeded && task.isRelyingOnDependents() || cancelled.get()) {
task.setException(dependentsException);
throw new CancellationException();
@@ -163,8 +162,12 @@ public class CancellableTaskExecutor extends TaskExecutor {
}
Collection<? extends Task<?>> dependencies = task.getDependencies();
boolean doDependenciesSucceeded = executeTasks(dependencies);
Exception dependenciesException = dependencies.stream().map(Task::getException).filter(Objects::nonNull).findAny().orElse(null);
boolean doDependenciesSucceeded = executeTasks(task, dependencies);
Exception dependenciesException = dependencies.stream().map(Task::getException)
.filter(Objects::nonNull)
.filter(x -> !(x instanceof CancellationException))
.filter(x -> !(x instanceof InterruptedException))
.findAny().orElse(null);
if (doDependenciesSucceeded)
task.setDependenciesSucceeded();
@@ -250,11 +253,13 @@ public class CancellableTaskExecutor extends TaskExecutor {
private class Invoker implements Runnable {
private final Task<?> parentTask;
private final Task<?> task;
private final CountDownLatch latch;
private final AtomicBoolean success;
public Invoker(Task<?> task, CountDownLatch latch, AtomicBoolean success) {
public Invoker(Task<?> parentTask, Task<?> task, CountDownLatch latch, AtomicBoolean success) {
this.parentTask = parentTask;
this.task = task;
this.latch = latch;
this.success = success;
@@ -264,7 +269,7 @@ public class CancellableTaskExecutor extends TaskExecutor {
public void run() {
try {
Thread.currentThread().setName(task.getName());
if (!executeTask(task))
if (!executeTask(parentTask, task))
success.set(false);
} finally {
latch.countDown();

View File

@@ -17,13 +17,11 @@
*/
package org.jackhuang.hmcl.task;
import org.jackhuang.hmcl.event.Event;
import org.jackhuang.hmcl.event.EventBus;
import org.jackhuang.hmcl.util.CacheRepository;
import org.jackhuang.hmcl.util.Logging;
import org.jackhuang.hmcl.util.io.ChecksumMismatchException;
import org.jackhuang.hmcl.util.io.FileUtils;
import org.jackhuang.hmcl.util.io.IOUtils;
import org.jackhuang.hmcl.util.io.NetworkUtils;
import org.jackhuang.hmcl.util.io.ResponseCodeException;
import org.jackhuang.hmcl.util.io.*;
import java.io.File;
import java.io.IOException;
@@ -35,10 +33,8 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import static java.util.Objects.requireNonNull;
@@ -279,7 +275,7 @@ public class FileDownloadTask extends Task<Void> {
updateProgress(downloaded, contentLength);
long now = System.currentTimeMillis();
if (now - lastTime >= 1000) {
updateMessage((downloaded - lastDownloaded) / 1024 + "KB/s");
updateDownloadSpeed(downloaded - lastDownloaded);
lastDownloaded = downloaded;
lastTime = now;
}
@@ -337,4 +333,39 @@ public class FileDownloadTask extends Task<Void> {
throw new DownloadException(urls.get(0), exception);
}
private static final Timer timer = new Timer("DownloadSpeedRecorder", true);
private static final AtomicInteger downloadSpeed = new AtomicInteger(0);
public static final EventBus speedEvent = new EventBus();
static {
timer.schedule(new TimerTask() {
@Override
public void run() {
speedEvent.fireEvent(new SpeedEvent(speedEvent, downloadSpeed.getAndSet(0)));
}
}, 0, 1000);
}
private static void updateDownloadSpeed(int speed) {
downloadSpeed.addAndGet(speed);
}
public static class SpeedEvent extends Event {
private final int speed;
public SpeedEvent(Object source, int speed) {
super(source);
this.speed = speed;
}
/**
* Download speed in byte/sec.
* @return
*/
public int getSpeed() {
return speed;
}
}
}

View File

@@ -79,6 +79,25 @@ public abstract class Task<T> {
return cancelled != null ? cancelled.get() : false;
}
// stage
private String stage = null;
/**
* Stage of task implies the goal of this task, for grouping tasks.
* Stage will inherit from the parent task.
*/
public String getStage() {
return stage;
}
/**
* You must initialize stage in preExecute.
* @param stage the stage
*/
public void setStage(String stage) {
this.stage = stage;
}
// state
private TaskState state = TaskState.READY;
@@ -344,18 +363,18 @@ public abstract class Task<T> {
}
public final TaskExecutor cancellableExecutor() {
return new CancellableTaskExecutor(this);
return new AsyncTaskExecutor(this);
}
public final TaskExecutor cancellableExecutor(boolean start) {
TaskExecutor executor = new CancellableTaskExecutor(this);
TaskExecutor executor = new AsyncTaskExecutor(this);
if (start)
executor.start();
return executor;
}
public final TaskExecutor cancellableExecutor(TaskListener taskListener) {
TaskExecutor executor = new CancellableTaskExecutor(this);
TaskExecutor executor = new AsyncTaskExecutor(this);
executor.addTaskListener(taskListener);
return executor;
}
@@ -743,6 +762,12 @@ public abstract class Task<T> {
return whenComplete(executor, () -> success.accept(getResult()), failure);
}
public Task<T> withStage(String stage) {
StageTask<T> task = new StageTask<>(this);
task.setStage(stage);
return task;
}
public static Task<Void> runAsync(ExceptionalRunnable<?> closure) {
return runAsync(Schedulers.defaultScheduler(), closure);
}
@@ -760,6 +785,10 @@ public abstract class Task<T> {
}
public static <T> Task<T> composeAsync(ExceptionalSupplier<Task<T>, ?> fn) {
return composeAsync(getCaller(), fn);
}
public static <T> Task<T> composeAsync(String name, ExceptionalSupplier<Task<T>, ?> fn) {
return new Task<T>() {
Task<T> then;
@@ -774,7 +803,7 @@ public abstract class Task<T> {
public Collection<Task<?>> getDependencies() {
return then == null ? Collections.emptySet() : Collections.singleton(then);
}
};
}.setName(name);
}
public static <V> Task<V> supplyAsync(Callable<V> callable) {
@@ -960,4 +989,21 @@ public abstract class Task<T> {
return relyingOnDependents;
}
}
public static class StageTask<T> extends Task<T> {
private final Task<T> task;
StageTask(Task<T> task) {
this.task = task;
}
@Override
public Collection<Task<?>> getDependents() {
return Collections.singleton(task);
}
@Override
public void execute() throws Exception {
setResult(task.getResult());
}
}
}