diff --git a/bun.lock b/bun.lock index ba78f2b..a246c5b 100644 --- a/bun.lock +++ b/bun.lock @@ -10,6 +10,7 @@ "lodash": "^4.17.21", "log-symbols": "^7.0.0", "pinia": "^3.0.1", + "tinypool": "^1.0.2", "trpc-bun-adapter": "^1.2.2", "ts-log": "^2.2.7", "ts-results-es": "^5.0.1", @@ -525,6 +526,8 @@ "tapable": ["tapable@2.2.1", "", {}, "sha512-GNzQvQTOIP6RyTfE2Qxb8ZVlNmw0n88vp1szwWRimP02mnTsx3Wtn5qRdqY9w2XduFNUgvOwhNnQsjwCp+kqaQ=="], + "tinypool": ["tinypool@1.0.2", "", {}, "sha512-al6n+QEANGFOMf/dmUMsuS5/r9B06uwlyNjZZql/zv8J7ybHCgoihBNORZCY2mzUuAnomQa2JdhyHKzZxPCrFA=="], + "totalist": ["totalist@3.0.1", "", {}, "sha512-sf4i37nQ2LBx4m3wB74y+ubopq6W/dIzXg0FDGjsYnZHVa1Da8FH853wlL2gtUhg+xJXjfk3kUZS3BRoQeoQBQ=="], "trpc-bun-adapter": ["trpc-bun-adapter@1.2.2", "", { "peerDependencies": { "@trpc/server": "^11.0.0-rc.566" } }, "sha512-TVhZEDXZvhIM2lfNTVCx9u5fu7/86b7RuhQSM0CUs4vlIan64Sfko5m0stbjqSHNgvLBsvXKtUD8FeQOQGLfpg=="], diff --git a/package.json b/package.json index 66deb12..b278ada 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "lodash": "^4.17.21", "log-symbols": "^7.0.0", "pinia": "^3.0.1", + "tinypool": "^1.0.2", "trpc-bun-adapter": "^1.2.2", "ts-log": "^2.2.7", "ts-results-es": "^5.0.1", diff --git a/server/common.ts b/server/common.ts index e9507b5..63effc7 100644 --- a/server/common.ts +++ b/server/common.ts @@ -10,6 +10,8 @@ export const int8 = z.number().lt(Math.pow(2, 7)).gte(-Math.pow(2, 8)) export const int16 = z.number().lt(Math.pow(2, 15)).gte(-Math.pow(2, 16)) export const int32 = z.number().lt(Math.pow(2, 31)).gte(-Math.pow(2, 32)) +export function UNUSED(_: unknown): void { } + export namespace fun { export function numberToBytes(num: number, bytesLength: number): Uint8Array { var array = new Uint8Array(bytesLength) diff --git a/server/database.ts b/server/database.ts index d6b1134..bc6a101 100644 --- a/server/database.ts +++ b/server/database.ts @@ -394,15 +394,15 @@ export namespace BoardTable { } let condition: string - if (_.isString(name)) { - const retCond = fun.sqlConditionFromString("name", name, "OR") + if (_.isString(val)) { + const retCond = fun.sqlConditionFromString(columnName, val, "OR") if (retCond.isSome()) { condition = retCond.value } else { return new Err(BOARD_ERR_WRONG_TYPE) } - } else if (fun.isStringArray(name)) { - const retCond = fun.sqlConditionFromArray("name", name, "OR") + } else if (fun.isStringArray(val)) { + const retCond = fun.sqlConditionFromArray(columnName, val, "OR") if (retCond.isSome()) { condition = retCond.value } else { diff --git a/server/msgBus.ts b/server/msgBus.ts new file mode 100644 index 0000000..c413269 --- /dev/null +++ b/server/msgBus.ts @@ -0,0 +1 @@ +import { MsgProtocol } from "./msgProtocol"; diff --git a/server/msgProtocol.ts b/server/msgProtocol.ts new file mode 100644 index 0000000..5c8fdab --- /dev/null +++ b/server/msgProtocol.ts @@ -0,0 +1,79 @@ +import { type TransferListItem } from "worker_threads"; +import { z } from "zod"; +import { UNUSED } from "./common"; + +export namespace MsgProtocol { + const QueryDataSchema = z.object({ type: z.literal("Query"), args: z.any() }) + const ResultDataSchema = z.object({ type: z.literal("Result"), result: z.any() }) + const ErrorDataSchema = z.object({ type: z.literal("Error"), error: z.string() }) + + const MessageSchema = z.object({ + command: z.string(), + data: z.discriminatedUnion("type", [QueryDataSchema, ResultDataSchema, ErrorDataSchema]), + dest: z.string(), + src: z.string(), + }) + const MessageQuerySchema = z.object({ + command: z.string(), + data: QueryDataSchema, + dest: z.string(), + src: z.string(), + }) + const MessageResultSchema = z.object({ + command: z.string(), + data: ResultDataSchema, + dest: z.string(), + src: z.string(), + }) + const MessageErrorSchema = z.object({ + command: z.string(), + data: ErrorDataSchema, + dest: z.string(), + src: z.string(), + }) + const MessageHandlerSchema = z.function() + .args(z.union([MessageResultSchema, MessageErrorSchema])) + .returns(z.void()) + + export type Message = z.infer + export type MessageQuery = z.infer + export type MessageResult = z.infer + export type MessageError = z.infer + export type MessageHandler = z.infer + + export function isMessage(obj: any): obj is Message { + return MessageSchema.safeParse(obj).success + } + export function isMessageQuery(obj: any): obj is MessageQuery { + return MessageQuerySchema.safeParse(obj).success + } + export function isMessageResult(obj: any): obj is MessageResult { + return MessageResultSchema.safeParse(obj).success + } + export function isMessageError(obj: any): obj is MessageError { + return MessageErrorSchema.safeParse(obj).success + } + + export function genMessageResult(result: any, srcMsg: MessageQuery): MessageResult { + return { + command: srcMsg.command, + dest: srcMsg.src, + src: srcMsg.dest, + data: { + type: "Result", + result: result + } + } as MessageResult + } + export function genMessageError(error: string, srcMsg: MessageQuery): MessageError { + return { + command: srcMsg.command, + dest: srcMsg.src, + src: srcMsg.dest, + data: { + type: "Error", + error: error + } + } as MessageError + } +} diff --git a/server/router.ts b/server/router.ts index 4c1e36a..19690b6 100644 --- a/server/router.ts +++ b/server/router.ts @@ -1,5 +1,4 @@ import { router, publicProcedure } from "./trpc.ts" -// import { addUser } from "./database.ts"; export const appRouter = router({ api: router({ diff --git a/server/udp.ts b/server/udp.ts index 4c6d5f1..72ca459 100644 --- a/server/udp.ts +++ b/server/udp.ts @@ -1,58 +1,6 @@ -import type { udp } from "bun" +import { resolve } from "path"; +import Tinypool from "tinypool"; -const udpServer = await Bun.udpSocket({ - port: 33000, - socket: { - data(_socket, _buf, _port, _addr) { - // todo : Handle Recieved Data - } - } +const udpClientsPool = new Tinypool({ + filename: resolve(__dirname, "./udpClient.ts") }) - -export const udpSocketPool = await createUDPSocketPool(5) - -type bunUDPSocket = udp.Socket<"buffer"> - -export function getUDPServerPort() { - return udpServer.port -} - -export class UDPSocketPool { - freeSockets: Set = new Set() - busySockets: Set = new Set() - - getFreeSocket(): bunUDPSocket { - const socket = this.freeSockets.values().next().value - if (socket !== undefined) { - this.busySockets.add(socket) - this.freeSockets.delete(socket) - } else { - throw Error("Failure: Create udp socket failed") - } - return socket - } - - releaseSocket(socket: any) { - this.freeSockets.add(socket) - this.busySockets.delete(socket) - } - - send(data: udp.Data, port: number, hostname: string) { - const socket = this.getFreeSocket() - socket.send(data, port, hostname) - this.releaseSocket(socket) - } - -} - -export async function createUDPSocketPool(depth: number): Promise { - const pool = new UDPSocketPool() - for (var i = 0; i < depth; i++) { - const socket = await Bun.udpSocket({}) - pool.freeSockets.add(socket) - } - - return pool -} - - diff --git a/server/udpClient.ts b/server/udpClient.ts new file mode 100644 index 0000000..2bb7828 --- /dev/null +++ b/server/udpClient.ts @@ -0,0 +1,30 @@ +import { type udp } from "bun"; +import { MsgProtocol } from "./msgProtocol"; +import { parentPort } from "worker_threads"; + +declare var self: Worker + +const udpClient = await Bun.udpSocket({}) + +parentPort?.on("message", (msg: MsgProtocol.MessageQuery) => { + if (MsgProtocol.isMessageQuery(msg)) { + switch (msg.command) { + case "send": { + const args = msg.data.args as { + data: udp.Data, + port: number, + address: string, + } + udpClient.send(args.data, args.port, args.address) + postMessage(MsgProtocol.genMessageResult("Send Successfully", msg)) + break + } + default: { + postMessage(MsgProtocol.genMessageError("No Such Command", msg)) + break + } + } + } else { + postMessage(MsgProtocol.genMessageError("No Currenct Destination", msg)) + } +}) diff --git a/server/equipment.ts b/server/udpPackage.ts similarity index 73% rename from server/equipment.ts rename to server/udpPackage.ts index 79ab187..a914c9a 100644 --- a/server/equipment.ts +++ b/server/udpPackage.ts @@ -1,9 +1,6 @@ -import type { Board } from "./database"; -import { findBoard, isBoard } from "./database"; -import { numberToBytes } from "./common"; +import { fun } from "./common"; import { z } from "zod"; import _ from "lodash"; -import { udpSocketPool } from "./udp"; export namespace EquipmentPackage { const HEADER_LENGTH = 8 @@ -75,7 +72,7 @@ export namespace EquipmentPackage { array[3] = this._reserved array[4] = this._reserved - let addressBytes = numberToBytes(this.address, 3) + let addressBytes = fun.numberToBytes(this.address, 3) array[5] = addressBytes[0] array[6] = addressBytes[1] array[7] = addressBytes[2] @@ -117,43 +114,3 @@ export namespace EquipmentPackage { } } - -export class Equipment { - board: Board - - constructor(name?: string) - constructor(board?: Board) - constructor(arg1: any) { - if (isBoard(arg1)) { - this.board = arg1 - - } else if (typeof arg1 === "string") { - try { - const board = findBoard(arg1) - this.board = board - } catch (error) { - throw new Error("Equipment Construction Failure") - } - - } else { - throw new Error("Equipment Construction Failure") - } - } - - send(header: EquipmentPackage.HeaderOptions) { - const pack = new EquipmentPackage.Package(header) - - return udpSocketPool.send(pack.toUint8Array(), this.board.port, this.board.ipv4) - } - - // TODO: add params file - uploadBitStream() { - const header: EquipmentPackage.HeaderOptions = { - returnAck: true, - transformType: "Extend", - readWriteType: "w" - } - - return this.send(header) - } -} diff --git a/server/udpServer.ts b/server/udpServer.ts new file mode 100644 index 0000000..7a54a76 --- /dev/null +++ b/server/udpServer.ts @@ -0,0 +1,33 @@ +import { parentPort } from "worker_threads" +import { MsgProtocol } from "./msgProtocol" +import _ from "lodash" + +declare var self: Worker +export type UDPServerMsgType = "port" + +const udpServer = await Bun.udpSocket({ + port: 33000, + socket: { + data(_socket, _buf, _port, _addr) { + // todo : Handle Recieved Data + } + } +}) + +parentPort?.on("message", (msg: MsgProtocol.MessageQuery) => { + if (MsgProtocol.isMessageQuery(msg)) { + switch (msg.command) { + case "port": { + postMessage(MsgProtocol.genMessageResult(udpServer.port, msg)) + break + } + default: { + break + } + } + } else { + return + } +}) + +postMessage("UDP Server Start Successfully!")