mirror of
https://github.com/SikongJueluo/cc-utils.git
synced 2025-11-29 12:57:50 +08:00
feature: rwlock; fix: test for semaphore
feature: - add read & write lock and test - sorted array add peek function fix: - fix semaphore test (sleep block in async) reconstruct: - semaphore releaser
This commit is contained in:
191
src/lib/ReadWriteLock.ts
Normal file
191
src/lib/ReadWriteLock.ts
Normal file
@@ -0,0 +1,191 @@
|
||||
import { Semaphore } from "./Semaphore";
|
||||
|
||||
const E_CANCELED = new Error("Read-write lock canceled");
|
||||
|
||||
export interface ReadLockHandle {
|
||||
release(): void;
|
||||
}
|
||||
|
||||
export interface WriteLockHandle {
|
||||
release(): void;
|
||||
}
|
||||
|
||||
export class ReadWriteLock {
|
||||
private _semaphore: Semaphore;
|
||||
private _maxReaders: number;
|
||||
private _writerWeight: number;
|
||||
private _readerPriority: number;
|
||||
private _writerPriority: number;
|
||||
|
||||
constructor(
|
||||
maxReaders = 1000,
|
||||
readerPriority = 10,
|
||||
writerPriority = 0, // Lower number = higher priority
|
||||
cancelError: Error = E_CANCELED,
|
||||
) {
|
||||
if (maxReaders <= 0) {
|
||||
throw new Error("Max readers must be positive");
|
||||
}
|
||||
|
||||
this._maxReaders = maxReaders;
|
||||
this._writerWeight = maxReaders; // Writers need all capacity for exclusivity
|
||||
this._readerPriority = readerPriority;
|
||||
this._writerPriority = writerPriority;
|
||||
this._semaphore = new Semaphore(maxReaders, cancelError);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a read lock. Multiple readers can hold the lock simultaneously.
|
||||
*/
|
||||
async acquireRead(): Promise<ReadLockHandle> {
|
||||
const [, release] = await this._semaphore.acquire(1, this._readerPriority);
|
||||
|
||||
return { release };
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to acquire a read lock immediately. Returns null if not available.
|
||||
*/
|
||||
tryAcquireRead(): ReadLockHandle | null {
|
||||
const release = this._semaphore.tryAcquire(1);
|
||||
|
||||
if (release === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return { release };
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a write lock. Only one writer can hold the lock at a time,
|
||||
* and it has exclusive access (no readers can access simultaneously).
|
||||
*/
|
||||
async acquireWrite(): Promise<WriteLockHandle> {
|
||||
const [, release] = await this._semaphore.acquire(
|
||||
this._writerWeight,
|
||||
this._writerPriority,
|
||||
);
|
||||
|
||||
return { release };
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to acquire a write lock immediately. Returns null if not available.
|
||||
*/
|
||||
tryAcquireWrite(): WriteLockHandle | null {
|
||||
const release = this._semaphore.tryAcquire(this._writerWeight);
|
||||
|
||||
if (release === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return { release };
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a callback with a read lock.
|
||||
*/
|
||||
async runWithReadLock<T>(callback: () => T | Promise<T>): Promise<T> {
|
||||
return this._semaphore.runExclusive(
|
||||
async () => await callback(),
|
||||
1,
|
||||
this._readerPriority,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a callback with a write lock (exclusive access).
|
||||
*/
|
||||
async runWithWriteLock<T>(callback: () => T | Promise<T>): Promise<T> {
|
||||
return this._semaphore.runExclusive(
|
||||
async () => await callback(),
|
||||
this._writerWeight,
|
||||
this._writerPriority,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until a read lock could be acquired (but doesn't acquire it).
|
||||
*/
|
||||
async waitForReadUnlock(): Promise<void> {
|
||||
return this._semaphore.waitForUnlock(1, this._readerPriority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until a write lock could be acquired (but doesn't acquire it).
|
||||
*/
|
||||
async waitForWriteUnlock(): Promise<void> {
|
||||
return this._semaphore.waitForUnlock(
|
||||
this._writerWeight,
|
||||
this._writerPriority,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if any locks are currently held.
|
||||
*/
|
||||
isLocked(): boolean {
|
||||
return this._semaphore.isLocked();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if a write lock is currently held (exclusive access).
|
||||
*/
|
||||
isWriteLocked(): boolean {
|
||||
return this._semaphore.getValue() <= 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if only read locks are held (no write lock).
|
||||
*/
|
||||
isReadLocked(): boolean {
|
||||
const currentValue = this._semaphore.getValue();
|
||||
return currentValue < this._maxReaders && currentValue > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of available read slots.
|
||||
*/
|
||||
getAvailableReads(): number {
|
||||
return Math.max(0, this._semaphore.getValue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current number of active readers (approximate).
|
||||
*/
|
||||
getActiveReaders(): number {
|
||||
const available = this._semaphore.getValue();
|
||||
if (available <= 0) {
|
||||
return 0; // Write lock is held
|
||||
}
|
||||
return this._maxReaders - available;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels all pending lock acquisitions.
|
||||
*/
|
||||
cancel(): void {
|
||||
this._semaphore.cancel();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the maximum number of concurrent readers allowed.
|
||||
*/
|
||||
getMaxReaders(): number {
|
||||
return this._maxReaders;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum number of concurrent readers.
|
||||
* Note: This may affect currently waiting operations.
|
||||
*/
|
||||
setMaxReaders(maxReaders: number): void {
|
||||
if (maxReaders <= 0) {
|
||||
throw new Error("Max readers must be positive");
|
||||
}
|
||||
|
||||
this._maxReaders = maxReaders;
|
||||
this._writerWeight = maxReaders;
|
||||
this._semaphore.setValue(maxReaders);
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,8 @@ interface Waiter {
|
||||
resolve(): void;
|
||||
}
|
||||
|
||||
type Releaser = () => void;
|
||||
|
||||
export class Semaphore {
|
||||
private _value: number;
|
||||
private _cancelError: Error;
|
||||
@@ -27,7 +29,7 @@ export class Semaphore {
|
||||
this._cancelError = cancelError;
|
||||
}
|
||||
|
||||
acquire(weight = 1, priority = 0): Promise<[number, () => void]> {
|
||||
acquire(weight = 1, priority = 0): Promise<[number, Releaser]> {
|
||||
if (weight <= 0) {
|
||||
throw new Error(`invalid weight ${weight}: must be positive`);
|
||||
}
|
||||
@@ -43,7 +45,7 @@ export class Semaphore {
|
||||
});
|
||||
}
|
||||
|
||||
tryAcquire(weight = 1): (() => void) | null {
|
||||
tryAcquire(weight = 1): Releaser | null {
|
||||
if (weight <= 0) {
|
||||
throw new Error(`invalid weight ${weight}: must be positive`);
|
||||
}
|
||||
@@ -139,11 +141,10 @@ export class Semaphore {
|
||||
}
|
||||
|
||||
private _peek(): QueueEntry | undefined {
|
||||
const array = this._queue.toArray();
|
||||
return array.length > 0 ? array[0] : undefined;
|
||||
return this._queue.peek();
|
||||
}
|
||||
|
||||
private _newReleaser(weight: number): () => void {
|
||||
private _newReleaser(weight: number): Releaser {
|
||||
let called = false;
|
||||
return () => {
|
||||
if (called) return;
|
||||
|
||||
@@ -72,6 +72,10 @@ export class SortedArray<T> {
|
||||
return value?.data;
|
||||
}
|
||||
|
||||
public peek(): T | undefined {
|
||||
return this._data[0]?.data;
|
||||
}
|
||||
|
||||
public toArray(): T[] {
|
||||
return this._data.map(({ data }) => data);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,22 @@
|
||||
import { testTimeBasedRotation } from "./testCCLog";
|
||||
import { testSortedArray } from "./testSortedArray";
|
||||
import { testSemaphore } from "./testSemaphore";
|
||||
import { testReadWriteLock } from "./testReadWriteLock";
|
||||
|
||||
testTimeBasedRotation();
|
||||
testSortedArray();
|
||||
testSemaphore();
|
||||
testSemaphore()
|
||||
.then(() => {
|
||||
print("Semaphore test completed");
|
||||
return testReadWriteLock();
|
||||
})
|
||||
.catch((error) => {
|
||||
print(`Semaphore test failed: ${error}`);
|
||||
});
|
||||
testReadWriteLock()
|
||||
.then(() => {
|
||||
print("ReadWriteLock test completed");
|
||||
})
|
||||
.catch((error) => {
|
||||
print(`Test failed: ${error}`);
|
||||
});
|
||||
|
||||
160
src/test/testReadWriteLock.ts
Normal file
160
src/test/testReadWriteLock.ts
Normal file
@@ -0,0 +1,160 @@
|
||||
import { ReadWriteLock } from "../lib/ReadWriteLock";
|
||||
|
||||
function assert(condition: boolean, message: string) {
|
||||
if (!condition) {
|
||||
error(message);
|
||||
}
|
||||
}
|
||||
|
||||
export async function testReadWriteLock() {
|
||||
print("Testing ReadWriteLock...");
|
||||
|
||||
async function testMultipleReaders() {
|
||||
const lock = new ReadWriteLock(3);
|
||||
const reader1 = await lock.acquireRead();
|
||||
const reader2 = await lock.acquireRead();
|
||||
assert(
|
||||
lock.getActiveReaders() === 2,
|
||||
"allows multiple readers: active readers should be 2",
|
||||
);
|
||||
reader1.release();
|
||||
assert(
|
||||
lock.getActiveReaders() === 1,
|
||||
"allows multiple readers: active readers should be 1",
|
||||
);
|
||||
reader2.release();
|
||||
assert(
|
||||
lock.getActiveReaders() === 0,
|
||||
"allows multiple readers: active readers should be 0",
|
||||
);
|
||||
print("testMultipleReaders passed");
|
||||
}
|
||||
|
||||
async function testSingleWriter() {
|
||||
const lock = new ReadWriteLock(3);
|
||||
const writer = await lock.acquireWrite();
|
||||
assert(
|
||||
lock.isWriteLocked() === true,
|
||||
"allows only one writer: isWriteLocked should be true",
|
||||
);
|
||||
writer.release();
|
||||
assert(
|
||||
lock.isWriteLocked() === false,
|
||||
"allows only one writer: isWriteLocked should be false",
|
||||
);
|
||||
print("testSingleWriter passed");
|
||||
}
|
||||
|
||||
async function testWriterBlocksReaders() {
|
||||
const lock = new ReadWriteLock(3);
|
||||
const writer = await lock.acquireWrite();
|
||||
let readerAcquired = false;
|
||||
const _ = lock.acquireRead().then(() => {
|
||||
readerAcquired = true;
|
||||
});
|
||||
assert(
|
||||
!readerAcquired,
|
||||
"blocks readers when a writer has the lock: reader should not be acquired yet",
|
||||
);
|
||||
writer.release();
|
||||
assert(
|
||||
readerAcquired,
|
||||
"blocks readers when a writer has the lock: reader should be acquired now",
|
||||
);
|
||||
print("testWriterBlocksReaders passed");
|
||||
}
|
||||
|
||||
async function testReaderBlocksWriters() {
|
||||
const lock = new ReadWriteLock(3);
|
||||
const reader = await lock.acquireRead();
|
||||
let writerAcquired = false;
|
||||
const _ = lock.acquireWrite().then(() => {
|
||||
writerAcquired = true;
|
||||
});
|
||||
assert(
|
||||
!writerAcquired,
|
||||
"blocks writers when a reader has the lock: writer should not be acquired yet",
|
||||
);
|
||||
reader.release();
|
||||
assert(
|
||||
writerAcquired,
|
||||
"blocks writers when a reader has the lock: writer should be acquired now",
|
||||
);
|
||||
print("testReaderBlocksWriters passed");
|
||||
}
|
||||
|
||||
function testTryAcquireRead() {
|
||||
const lock = new ReadWriteLock(1);
|
||||
const reader1 = lock.tryAcquireRead();
|
||||
assert(
|
||||
reader1 !== null,
|
||||
"tryAcquireRead works: first reader should be acquired",
|
||||
);
|
||||
const reader2 = lock.tryAcquireRead();
|
||||
assert(
|
||||
reader2 === null,
|
||||
"tryAcquireRead works: second reader should not be acquired",
|
||||
);
|
||||
reader1!.release();
|
||||
const reader3 = lock.tryAcquireRead();
|
||||
assert(
|
||||
reader3 !== null,
|
||||
"tryAcquireRead works: third reader should be acquired",
|
||||
);
|
||||
reader3!.release();
|
||||
print("testTryAcquireRead passed");
|
||||
}
|
||||
|
||||
function testTryAcquireWrite() {
|
||||
const lock = new ReadWriteLock();
|
||||
const writer1 = lock.tryAcquireWrite();
|
||||
assert(
|
||||
writer1 !== null,
|
||||
"tryAcquireWrite works: first writer should be acquired",
|
||||
);
|
||||
const writer2 = lock.tryAcquireWrite();
|
||||
assert(
|
||||
writer2 === null,
|
||||
"tryAcquireWrite works: second writer should not be acquired",
|
||||
);
|
||||
writer1!.release();
|
||||
const writer3 = lock.tryAcquireWrite();
|
||||
assert(
|
||||
writer3 !== null,
|
||||
"tryAcquireWrite works: third writer should be acquired",
|
||||
);
|
||||
writer3!.release();
|
||||
print("testTryAcquireWrite passed");
|
||||
}
|
||||
|
||||
async function testRunWithReadLock() {
|
||||
const lock = new ReadWriteLock();
|
||||
let value = 0;
|
||||
await lock.runWithReadLock(() => {
|
||||
value = 1;
|
||||
});
|
||||
assert(value === 1, "runWithReadLock works: value should be 1");
|
||||
print("testRunWithReadLock passed");
|
||||
}
|
||||
|
||||
async function testRunWithWriteLock() {
|
||||
const lock = new ReadWriteLock();
|
||||
let value = 0;
|
||||
await lock.runWithWriteLock(() => {
|
||||
value = 1;
|
||||
});
|
||||
assert(value === 1, "runWithWriteLock works: value should be 1");
|
||||
print("testRunWithWriteLock passed");
|
||||
}
|
||||
|
||||
await testMultipleReaders();
|
||||
await testSingleWriter();
|
||||
await testWriterBlocksReaders();
|
||||
await testReaderBlocksWriters();
|
||||
testTryAcquireRead();
|
||||
testTryAcquireWrite();
|
||||
await testRunWithReadLock();
|
||||
await testRunWithWriteLock();
|
||||
|
||||
print("ReadWriteLock tests passed!");
|
||||
}
|
||||
@@ -62,30 +62,27 @@ async function testQueueing() {
|
||||
const [, release1] = await s.acquire();
|
||||
events.push("acquired1");
|
||||
|
||||
// These two will be queued
|
||||
await s.acquire().then(([, release]) => {
|
||||
// These two will be queued. Store their promises.
|
||||
const p2 = s.acquire().then(([, release]) => {
|
||||
events.push("acquired2");
|
||||
sleep(0.1);
|
||||
release();
|
||||
events.push("released2");
|
||||
});
|
||||
|
||||
await s.acquire().then(([, release]) => {
|
||||
events.push("acquired3");
|
||||
release();
|
||||
events.push("released3");
|
||||
});
|
||||
|
||||
// Give some time for promises to queue
|
||||
sleep(0.1);
|
||||
const p3 = s.acquire().then(([, release]) => {
|
||||
events.push("acquired3");
|
||||
events.push("released3");
|
||||
release();
|
||||
});
|
||||
|
||||
assert(events.length === 1, "Only first acquire should have completed");
|
||||
|
||||
// Release the first lock, allowing the queue to proceed
|
||||
release1();
|
||||
events.push("released1");
|
||||
release1();
|
||||
|
||||
// Wait for all promises to finish
|
||||
sleep(0.5);
|
||||
await Promise.all([p2, p3]);
|
||||
|
||||
const expected = [
|
||||
"acquired1",
|
||||
@@ -111,20 +108,20 @@ async function testPriority() {
|
||||
events.push("acquired_main");
|
||||
|
||||
// Queue with low priority
|
||||
await s.acquire(1, 10).then(([, release]) => {
|
||||
const p1 = s.acquire(1, 10).then(([, release]) => {
|
||||
events.push("acquired_low_prio");
|
||||
release();
|
||||
});
|
||||
|
||||
// Queue with high priority
|
||||
await s.acquire(1, 1).then(([, release]) => {
|
||||
const p2 = s.acquire(1, 1).then(([, release]) => {
|
||||
events.push("acquired_high_prio");
|
||||
release();
|
||||
});
|
||||
|
||||
sleep(0.1);
|
||||
release1();
|
||||
sleep(0.1);
|
||||
|
||||
await Promise.all([p1, p2]);
|
||||
|
||||
const expected = ["acquired_main", "acquired_high_prio", "acquired_low_prio"];
|
||||
assert(
|
||||
@@ -142,16 +139,15 @@ async function testWaitForUnlock() {
|
||||
const [, release] = await s.acquire();
|
||||
assert(s.isLocked(), "Semaphore should be locked");
|
||||
|
||||
await s.waitForUnlock().then(() => {
|
||||
const p1 = s.waitForUnlock().then(() => {
|
||||
waited = true;
|
||||
assert(!s.isLocked(), "Should be unlocked when wait is over");
|
||||
});
|
||||
|
||||
sleep(0.1);
|
||||
assert(!waited, "waitForUnlock should not resolve yet");
|
||||
|
||||
release();
|
||||
sleep(0.1);
|
||||
await Promise.all([p1]);
|
||||
assert(waited, "waitForUnlock should have resolved");
|
||||
print(" Test passed: testWaitForUnlock");
|
||||
}
|
||||
@@ -174,9 +170,7 @@ async function testCancel() {
|
||||
},
|
||||
);
|
||||
|
||||
sleep(0.1);
|
||||
s.cancel();
|
||||
sleep(0.1);
|
||||
|
||||
assert(rejected, "pending acquire should have been rejected");
|
||||
assert(s.getValue() === 0, "cancel should not affect current lock");
|
||||
|
||||
Reference in New Issue
Block a user