从 synchronized 切换至 ReentrantLock (#4564)

This commit is contained in:
Glavo
2025-10-02 15:50:56 +08:00
committed by GitHub
parent e0f5c4d91e
commit 40b4ecd42a
6 changed files with 102 additions and 36 deletions

View File

@@ -58,6 +58,7 @@ import java.nio.file.Path;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
@@ -706,6 +707,7 @@ public final class LauncherHelper {
*/ */
private final class HMCLProcessListener implements ProcessListener { private final class HMCLProcessListener implements ProcessListener {
private final ReentrantLock lock = new ReentrantLock();
private final HMCLGameRepository repository; private final HMCLGameRepository repository;
private final Version version; private final Version version;
private final LaunchOptions launchOptions; private final LaunchOptions launchOptions;
@@ -849,21 +851,27 @@ public final class LauncherHelper {
level = Lang.requireNonNullElse(Log4jLevel.guessLevel(log), Log4jLevel.INFO); level = Lang.requireNonNullElse(Log4jLevel.guessLevel(log), Log4jLevel.INFO);
logBuffer.add(new Log(log, level)); logBuffer.add(new Log(log, level));
} else { } else {
synchronized (this) { lock.lock();
try {
logs.addLast(new Log(log, level)); logs.addLast(new Log(log, level));
if (logs.size() > Log.getLogLines()) if (logs.size() > Log.getLogLines())
logs.removeFirst(); logs.removeFirst();
} finally {
lock.unlock();
} }
} }
if (!lwjgl) { if (!lwjgl) {
String lowerCaseLog = log.toLowerCase(Locale.ROOT); String lowerCaseLog = log.toLowerCase(Locale.ROOT);
if (!detectWindow || lowerCaseLog.contains("lwjgl version") || lowerCaseLog.contains("lwjgl openal")) { if (!detectWindow || lowerCaseLog.contains("lwjgl version") || lowerCaseLog.contains("lwjgl openal")) {
synchronized (this) { lock.lock();
try {
if (!lwjgl) { if (!lwjgl) {
lwjgl = true; lwjgl = true;
finishLaunch(); finishLaunch();
} }
} finally {
lock.unlock();
} }
} }
} }
@@ -888,9 +896,12 @@ public final class LauncherHelper {
// Game crashed before opening the game window. // Game crashed before opening the game window.
if (!lwjgl) { if (!lwjgl) {
synchronized (this) { lock.lock();
try {
if (!lwjgl) if (!lwjgl)
finishLaunch(); finishLaunch();
} finally {
lock.unlock();
} }
} }

View File

@@ -287,7 +287,7 @@ public class PartialInflaterInputStream extends FilterInputStream {
* the mark position becomes invalid. * the mark position becomes invalid.
* @see InputStream#reset() * @see InputStream#reset()
*/ */
public synchronized void mark(int readlimit) { public void mark(int readlimit) {
} }
/** /**
@@ -302,7 +302,7 @@ public class PartialInflaterInputStream extends FilterInputStream {
* @see InputStream#mark(int) * @see InputStream#mark(int)
* @see IOException * @see IOException
*/ */
public synchronized void reset() throws IOException { public void reset() throws IOException {
throw new IOException("mark/reset not supported"); throw new IOException("mark/reset not supported");
} }
} }

View File

@@ -97,7 +97,7 @@ public final class Schedulers {
return ForkJoinPool.commonPool(); return ForkJoinPool.commonPool();
} }
public static synchronized void shutdown() { public static void shutdown() {
LOG.info("Shutting down executor services."); LOG.info("Shutting down executor services.");
// shutdownNow will interrupt all threads. // shutdownNow will interrupt all threads.

View File

@@ -25,6 +25,7 @@ import javafx.beans.value.ObservableValue;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@@ -144,6 +145,7 @@ public abstract class BindingMapping<T, U> extends ObjectBinding<U> {
private static final class AsyncMappedBinding<T, U> extends BindingMapping<T, U> { private static final class AsyncMappedBinding<T, U> extends BindingMapping<T, U> {
private final ReentrantLock lock = new ReentrantLock();
private boolean initialized = false; private boolean initialized = false;
private T prev; private T prev;
private U value; private U value;
@@ -159,13 +161,16 @@ public abstract class BindingMapping<T, U> extends ObjectBinding<U> {
} }
private void tryUpdateValue(T currentPrev) { private void tryUpdateValue(T currentPrev) {
synchronized (this) { lock.lock();
try {
if ((initialized && Objects.equals(prev, currentPrev)) if ((initialized && Objects.equals(prev, currentPrev))
|| isComputing(currentPrev)) { || isComputing(currentPrev)) {
return; return;
} }
computing = true; computing = true;
computingPrev = currentPrev; computingPrev = currentPrev;
} finally {
lock.unlock();
} }
CompletableFuture<? extends U> task; CompletableFuture<? extends U> task;
@@ -189,7 +194,8 @@ public abstract class BindingMapping<T, U> extends ObjectBinding<U> {
} }
private void valueUpdate(T currentPrev, U computed) { private void valueUpdate(T currentPrev, U computed) {
synchronized (this) { lock.lock();
try {
if (isComputing(currentPrev)) { if (isComputing(currentPrev)) {
computing = false; computing = false;
computingPrev = null; computingPrev = null;
@@ -197,15 +203,20 @@ public abstract class BindingMapping<T, U> extends ObjectBinding<U> {
value = computed; value = computed;
initialized = true; initialized = true;
} }
} finally {
lock.unlock();
} }
} }
private void valueUpdateFailed(T currentPrev) { private void valueUpdateFailed(T currentPrev) {
synchronized (this) { lock.lock();
try {
if (isComputing(currentPrev)) { if (isComputing(currentPrev)) {
computing = false; computing = false;
computingPrev = null; computingPrev = null;
} }
} finally {
lock.unlock();
} }
} }

View File

@@ -24,6 +24,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.jackhuang.hmcl.util.function.ExceptionalFunction; import org.jackhuang.hmcl.util.function.ExceptionalFunction;
@@ -37,6 +38,7 @@ import javafx.beans.binding.ObjectBinding;
*/ */
public final class ObservableCache<K, V, E extends Exception> { public final class ObservableCache<K, V, E extends Exception> {
private final ReentrantLock lock = new ReentrantLock();
private final ExceptionalFunction<K, V, E> source; private final ExceptionalFunction<K, V, E> source;
private final BiConsumer<K, Throwable> exceptionHandler; private final BiConsumer<K, Throwable> exceptionHandler;
private final V fallbackValue; private final V fallbackValue;
@@ -54,22 +56,29 @@ public final class ObservableCache<K, V, E extends Exception> {
} }
public Optional<V> getImmediately(K key) { public Optional<V> getImmediately(K key) {
synchronized (this) { lock.lock();
try {
return Optional.ofNullable(cache.get(key)); return Optional.ofNullable(cache.get(key));
} finally {
lock.unlock();
} }
} }
public void put(K key, V value) { public void put(K key, V value) {
synchronized (this) { lock.lock();
try {
cache.put(key, value); cache.put(key, value);
invalidated.remove(key); invalidated.remove(key);
} finally {
lock.unlock();
} }
Platform.runLater(observable::invalidate); Platform.runLater(observable::invalidate);
} }
private CompletableFuture<V> query(K key, Executor executor) { private CompletableFuture<V> query(K key, Executor executor) {
CompletableFuture<V> future; CompletableFuture<V> future;
synchronized (this) { lock.lock();
try {
CompletableFuture<V> prev = pendings.get(key); CompletableFuture<V> prev = pendings.get(key);
if (prev != null) { if (prev != null) {
return prev; return prev;
@@ -77,6 +86,8 @@ public final class ObservableCache<K, V, E extends Exception> {
future = new CompletableFuture<>(); future = new CompletableFuture<>();
pendings.put(key, future); pendings.put(key, future);
} }
} finally {
lock.unlock();
} }
executor.execute(() -> { executor.execute(() -> {
@@ -84,18 +95,24 @@ public final class ObservableCache<K, V, E extends Exception> {
try { try {
result = source.apply(key); result = source.apply(key);
} catch (Throwable ex) { } catch (Throwable ex) {
synchronized (this) { lock.lock();
try {
pendings.remove(key); pendings.remove(key);
} finally {
lock.unlock();
} }
exceptionHandler.accept(key, ex); exceptionHandler.accept(key, ex);
future.completeExceptionally(ex); future.completeExceptionally(ex);
return; return;
} }
synchronized (this) { lock.lock();
try {
cache.put(key, result); cache.put(key, result);
invalidated.remove(key); invalidated.remove(key);
pendings.remove(key, future); pendings.remove(key, future);
} finally {
lock.unlock();
} }
future.complete(result); future.complete(result);
Platform.runLater(observable::invalidate); Platform.runLater(observable::invalidate);
@@ -106,11 +123,14 @@ public final class ObservableCache<K, V, E extends Exception> {
public V get(K key) { public V get(K key) {
V cached; V cached;
synchronized (this) { lock.lock();
try {
cached = cache.get(key); cached = cache.get(key);
if (cached != null && !invalidated.containsKey(key)) { if (cached != null && !invalidated.containsKey(key)) {
return cached; return cached;
} }
} finally {
lock.unlock();
} }
try { try {
@@ -143,7 +163,9 @@ public final class ObservableCache<K, V, E extends Exception> {
return Bindings.createObjectBinding(() -> { return Bindings.createObjectBinding(() -> {
V result; V result;
boolean refresh; boolean refresh;
synchronized (this) {
lock.lock();
try {
result = cache.get(key); result = cache.get(key);
if (result == null) { if (result == null) {
result = fallbackValue; result = fallbackValue;
@@ -151,6 +173,8 @@ public final class ObservableCache<K, V, E extends Exception> {
} else { } else {
refresh = invalidated.containsKey(key); refresh = invalidated.containsKey(key);
} }
} finally {
lock.unlock();
} }
if (!quiet && refresh) { if (!quiet && refresh) {
query(key, executor); query(key, executor);
@@ -160,10 +184,13 @@ public final class ObservableCache<K, V, E extends Exception> {
} }
public void invalidate(K key) { public void invalidate(K key) {
synchronized (this) { lock.lock();
try {
if (cache.containsKey(key)) { if (cache.containsKey(key)) {
invalidated.put(key, Boolean.TRUE); invalidated.put(key, Boolean.TRUE);
} }
} finally {
lock.unlock();
} }
Platform.runLater(observable::invalidate); Platform.runLater(observable::invalidate);
} }

View File

@@ -22,17 +22,17 @@ import org.jackhuang.hmcl.util.Lang;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
/** /// The managed process.
* The managed process. ///
* /// @author huangyuhui
* @author huangyuhui /// <!-- @see org.jackhuang.hmcl.launch.ExitWaiter -->
* @see org.jackhuang.hmcl.launch.ExitWaiter /// @see org.jackhuang.hmcl.launch.StreamPump
* @see org.jackhuang.hmcl.launch.StreamPump
*/
public final class ManagedProcess { public final class ManagedProcess {
private final ReentrantLock lock = new ReentrantLock();
private final Process process; private final Process process;
private final List<String> commands; private final List<String> commands;
private final String classpath; private final String classpath;
@@ -54,7 +54,7 @@ public final class ManagedProcess {
*/ */
public ManagedProcess(Process process, List<String> commands) { public ManagedProcess(Process process, List<String> commands) {
this.process = process; this.process = process;
this.commands = Collections.unmodifiableList(new ArrayList<>(commands)); this.commands = List.copyOf(commands);
this.classpath = null; this.classpath = null;
} }
@@ -67,7 +67,7 @@ public final class ManagedProcess {
*/ */
public ManagedProcess(Process process, List<String> commands, String classpath) { public ManagedProcess(Process process, List<String> commands, String classpath) {
this.process = process; this.process = process;
this.commands = Collections.unmodifiableList(new ArrayList<>(commands)); this.commands = List.copyOf(commands);
this.classpath = classpath; this.classpath = classpath;
} }
@@ -111,9 +111,11 @@ public final class ManagedProcess {
* *
* @see #addLine * @see #addLine
*/ */
public synchronized List<String> getLines(Predicate<String> lineFilter) { public List<String> getLines(Predicate<String> lineFilter) {
lock.lock();
if (lineFilter == null) if (lineFilter == null)
return Collections.unmodifiableList(Arrays.asList(lines.toArray(new String[0]))); return List.copyOf(lines);
ArrayList<String> res = new ArrayList<>(); ArrayList<String> res = new ArrayList<>();
for (String line : this.lines) { for (String line : this.lines) {
@@ -123,8 +125,13 @@ public final class ManagedProcess {
return Collections.unmodifiableList(res); return Collections.unmodifiableList(res);
} }
public synchronized void addLine(String line) { public void addLine(String line) {
lines.add(line); lock.lock();
try {
lines.add(line);
} finally {
lock.unlock();
}
} }
/** /**
@@ -133,15 +140,20 @@ public final class ManagedProcess {
* If a thread is monitoring this raw process, * If a thread is monitoring this raw process,
* you are required to add the instance by this method. * you are required to add the instance by this method.
*/ */
public synchronized void addRelatedThread(Thread thread) { public void addRelatedThread(Thread thread) {
relatedThreads.add(thread); lock.lock();
try {
relatedThreads.add(thread);
} finally {
lock.unlock();
}
} }
public synchronized void pumpInputStream(Consumer<String> onLogLine) { public void pumpInputStream(Consumer<String> onLogLine) {
addRelatedThread(Lang.thread(new StreamPump(process.getInputStream(), onLogLine, OperatingSystem.NATIVE_CHARSET), "ProcessInputStreamPump", true)); addRelatedThread(Lang.thread(new StreamPump(process.getInputStream(), onLogLine, OperatingSystem.NATIVE_CHARSET), "ProcessInputStreamPump", true));
} }
public synchronized void pumpErrorStream(Consumer<String> onLogLine) { public void pumpErrorStream(Consumer<String> onLogLine) {
addRelatedThread(Lang.thread(new StreamPump(process.getErrorStream(), onLogLine, OperatingSystem.NATIVE_CHARSET), "ProcessErrorStreamPump", true)); addRelatedThread(Lang.thread(new StreamPump(process.getErrorStream(), onLogLine, OperatingSystem.NATIVE_CHARSET), "ProcessErrorStreamPump", true));
} }
@@ -172,8 +184,13 @@ public final class ManagedProcess {
destroyRelatedThreads(); destroyRelatedThreads();
} }
public synchronized void destroyRelatedThreads() { public void destroyRelatedThreads() {
relatedThreads.forEach(Thread::interrupt); lock.lock();
try {
relatedThreads.forEach(Thread::interrupt);
} finally {
lock.unlock();
}
} }
@Override @Override