将 Schedulers.io() 切换至虚拟线程 (#4548)

This commit is contained in:
Glavo
2025-09-26 20:12:01 +08:00
committed by GitHub
parent 225dba7448
commit 4b4dd592c3
2 changed files with 78 additions and 28 deletions

View File

@@ -18,48 +18,81 @@
package org.jackhuang.hmcl.task; package org.jackhuang.hmcl.task;
import javafx.application.Platform; import javafx.application.Platform;
import org.jackhuang.hmcl.util.Lang;
import org.jetbrains.annotations.Nullable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Function;
import static org.jackhuang.hmcl.util.Lang.threadPool;
import static org.jackhuang.hmcl.util.logging.Logger.LOG; import static org.jackhuang.hmcl.util.logging.Logger.LOG;
/** /// @author huangyuhui
*
* @author huangyuhui
*/
public final class Schedulers { public final class Schedulers {
private Schedulers() { private Schedulers() {
} }
private static volatile ExecutorService IO_EXECUTOR; private static final @Nullable Function<String, ExecutorService> NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR;
/** static {
* Get singleton instance of the thread pool for I/O operations, if (Runtime.version().feature() >= 21) {
* usually for reading files from disk, or Internet connections. try {
* MethodHandles.Lookup lookup = MethodHandles.publicLookup();
* This thread pool has no more than 4 threads, and number of threads will get
* reduced if concurrency is less than thread number. Class<?> vtBuilderCls = Class.forName("java.lang.Thread$Builder$OfVirtual");
*
* @return Thread pool for I/O operations. MethodHandle ofVirtualHandle = lookup.findStatic(Thread.class, "ofVirtual", MethodType.methodType(vtBuilderCls));
*/ MethodHandle setNameHandle = lookup.findVirtual(vtBuilderCls, "name", MethodType.methodType(vtBuilderCls, String.class, long.class));
MethodHandle toFactoryHandle = lookup.findVirtual(vtBuilderCls, "factory", MethodType.methodType(ThreadFactory.class));
MethodHandle newThreadPerTaskExecutorFactory = lookup.findStatic(Executors.class, "newThreadPerTaskExecutor", MethodType.methodType(ExecutorService.class, ThreadFactory.class));
NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR = name -> {
try {
Object virtualThreadBuilder = ofVirtualHandle.invoke();
setNameHandle.invoke(virtualThreadBuilder, name, 1L);
ThreadFactory threadFactory = (ThreadFactory) toFactoryHandle.invoke(virtualThreadBuilder);
return (ExecutorService) newThreadPerTaskExecutorFactory.invokeExact(threadFactory);
} catch (Throwable e) {
throw new AssertionError("Unreachable", e);
}
};
} catch (Throwable e) {
throw new AssertionError("Unreachable", e);
}
} else {
NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR = null;
}
}
/// @return Returns null if the Java version is below 21, otherwise always returns a non-null value.
public static ExecutorService newVirtualThreadPerTaskExecutor(String name) {
if (NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR == null) {
return null;
}
return NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR.apply(name);
}
/// This thread pool is suitable for network and local I/O operations.
///
/// For Java 21 or later, all tasks will be dispatched to virtual threads.
///
/// @return Thread pool for I/O operations.
public static ExecutorService io() { public static ExecutorService io() {
if (IO_EXECUTOR == null) { return Holder.IO_EXECUTOR;
synchronized (Schedulers.class) {
if (IO_EXECUTOR == null) {
IO_EXECUTOR = threadPool("IO", true, 4, 10, TimeUnit.SECONDS);
}
}
}
return IO_EXECUTOR;
} }
public static Executor javafx() { public static Executor javafx() {
return Platform::runLater; return Platform::runLater;
} }
/// Default thread pool, equivalent to [ForkJoinPool#commonPool()].
///
/// It is recommended to perform computation tasks on this thread pool. For I/O operations, please use [#io()].
public static Executor defaultScheduler() { public static Executor defaultScheduler() {
return ForkJoinPool.commonPool(); return ForkJoinPool.commonPool();
} }
@@ -70,9 +103,18 @@ public final class Schedulers {
// shutdownNow will interrupt all threads. // shutdownNow will interrupt all threads.
// So when we want to close the app, no threads need to be waited for finish. // So when we want to close the app, no threads need to be waited for finish.
// Sometimes it resolves the problem that the app does not exit. // Sometimes it resolves the problem that the app does not exit.
}
if (IO_EXECUTOR != null) private static final class Holder {
IO_EXECUTOR.shutdownNow(); private static final ExecutorService IO_EXECUTOR;
static {
//noinspection resource
ExecutorService vtExecutor = newVirtualThreadPerTaskExecutor("IO");
IO_EXECUTOR = vtExecutor != null
? vtExecutor
: Executors.newCachedThreadPool(Lang.counterThreadFactory("IO", true));
}
} }
} }

View File

@@ -236,14 +236,22 @@ public final class Lang {
} }
public static ThreadPoolExecutor threadPool(String name, boolean daemon, int threads, long timeout, TimeUnit timeunit) { public static ThreadPoolExecutor threadPool(String name, boolean daemon, int threads, long timeout, TimeUnit timeunit) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
threads, threads,
timeout, timeunit,
new LinkedBlockingQueue<>(),
counterThreadFactory(name, daemon));
pool.allowCoreThreadTimeOut(true);
return pool;
}
public static ThreadFactory counterThreadFactory(String name, boolean daemon) {
AtomicInteger counter = new AtomicInteger(1); AtomicInteger counter = new AtomicInteger(1);
ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, timeout, timeunit, new LinkedBlockingQueue<>(), r -> { return r -> {
Thread t = new Thread(r, name + "-" + counter.getAndIncrement()); Thread t = new Thread(r, name + "-" + counter.getAndIncrement());
t.setDaemon(daemon); t.setDaemon(daemon);
return t; return t;
}); };
pool.allowCoreThreadTimeOut(true);
return pool;
} }
public static int parseInt(Object string, int defaultValue) { public static int parseInt(Object string, int defaultValue) {