add message protocol

This commit is contained in:
SikongJueluo 2025-03-26 20:52:23 +08:00
parent 4fb4c08044
commit 67439780ed
No known key found for this signature in database
11 changed files with 159 additions and 106 deletions

View File

@ -10,6 +10,7 @@
"lodash": "^4.17.21", "lodash": "^4.17.21",
"log-symbols": "^7.0.0", "log-symbols": "^7.0.0",
"pinia": "^3.0.1", "pinia": "^3.0.1",
"tinypool": "^1.0.2",
"trpc-bun-adapter": "^1.2.2", "trpc-bun-adapter": "^1.2.2",
"ts-log": "^2.2.7", "ts-log": "^2.2.7",
"ts-results-es": "^5.0.1", "ts-results-es": "^5.0.1",
@ -525,6 +526,8 @@
"tapable": ["tapable@2.2.1", "", {}, "sha512-GNzQvQTOIP6RyTfE2Qxb8ZVlNmw0n88vp1szwWRimP02mnTsx3Wtn5qRdqY9w2XduFNUgvOwhNnQsjwCp+kqaQ=="], "tapable": ["tapable@2.2.1", "", {}, "sha512-GNzQvQTOIP6RyTfE2Qxb8ZVlNmw0n88vp1szwWRimP02mnTsx3Wtn5qRdqY9w2XduFNUgvOwhNnQsjwCp+kqaQ=="],
"tinypool": ["tinypool@1.0.2", "", {}, "sha512-al6n+QEANGFOMf/dmUMsuS5/r9B06uwlyNjZZql/zv8J7ybHCgoihBNORZCY2mzUuAnomQa2JdhyHKzZxPCrFA=="],
"totalist": ["totalist@3.0.1", "", {}, "sha512-sf4i37nQ2LBx4m3wB74y+ubopq6W/dIzXg0FDGjsYnZHVa1Da8FH853wlL2gtUhg+xJXjfk3kUZS3BRoQeoQBQ=="], "totalist": ["totalist@3.0.1", "", {}, "sha512-sf4i37nQ2LBx4m3wB74y+ubopq6W/dIzXg0FDGjsYnZHVa1Da8FH853wlL2gtUhg+xJXjfk3kUZS3BRoQeoQBQ=="],
"trpc-bun-adapter": ["trpc-bun-adapter@1.2.2", "", { "peerDependencies": { "@trpc/server": "^11.0.0-rc.566" } }, "sha512-TVhZEDXZvhIM2lfNTVCx9u5fu7/86b7RuhQSM0CUs4vlIan64Sfko5m0stbjqSHNgvLBsvXKtUD8FeQOQGLfpg=="], "trpc-bun-adapter": ["trpc-bun-adapter@1.2.2", "", { "peerDependencies": { "@trpc/server": "^11.0.0-rc.566" } }, "sha512-TVhZEDXZvhIM2lfNTVCx9u5fu7/86b7RuhQSM0CUs4vlIan64Sfko5m0stbjqSHNgvLBsvXKtUD8FeQOQGLfpg=="],

View File

@ -19,6 +19,7 @@
"lodash": "^4.17.21", "lodash": "^4.17.21",
"log-symbols": "^7.0.0", "log-symbols": "^7.0.0",
"pinia": "^3.0.1", "pinia": "^3.0.1",
"tinypool": "^1.0.2",
"trpc-bun-adapter": "^1.2.2", "trpc-bun-adapter": "^1.2.2",
"ts-log": "^2.2.7", "ts-log": "^2.2.7",
"ts-results-es": "^5.0.1", "ts-results-es": "^5.0.1",

View File

@ -10,6 +10,8 @@ export const int8 = z.number().lt(Math.pow(2, 7)).gte(-Math.pow(2, 8))
export const int16 = z.number().lt(Math.pow(2, 15)).gte(-Math.pow(2, 16)) export const int16 = z.number().lt(Math.pow(2, 15)).gte(-Math.pow(2, 16))
export const int32 = z.number().lt(Math.pow(2, 31)).gte(-Math.pow(2, 32)) export const int32 = z.number().lt(Math.pow(2, 31)).gte(-Math.pow(2, 32))
export function UNUSED(_: unknown): void { }
export namespace fun { export namespace fun {
export function numberToBytes(num: number, bytesLength: number): Uint8Array { export function numberToBytes(num: number, bytesLength: number): Uint8Array {
var array = new Uint8Array(bytesLength) var array = new Uint8Array(bytesLength)

View File

@ -394,15 +394,15 @@ export namespace BoardTable {
} }
let condition: string let condition: string
if (_.isString(name)) { if (_.isString(val)) {
const retCond = fun.sqlConditionFromString("name", name, "OR") const retCond = fun.sqlConditionFromString(columnName, val, "OR")
if (retCond.isSome()) { if (retCond.isSome()) {
condition = retCond.value condition = retCond.value
} else { } else {
return new Err(BOARD_ERR_WRONG_TYPE) return new Err(BOARD_ERR_WRONG_TYPE)
} }
} else if (fun.isStringArray(name)) { } else if (fun.isStringArray(val)) {
const retCond = fun.sqlConditionFromArray("name", name, "OR") const retCond = fun.sqlConditionFromArray(columnName, val, "OR")
if (retCond.isSome()) { if (retCond.isSome()) {
condition = retCond.value condition = retCond.value
} else { } else {

1
server/msgBus.ts Normal file
View File

@ -0,0 +1 @@
import { MsgProtocol } from "./msgProtocol";

79
server/msgProtocol.ts Normal file
View File

@ -0,0 +1,79 @@
import { type TransferListItem } from "worker_threads";
import { z } from "zod";
import { UNUSED } from "./common";
export namespace MsgProtocol {
const QueryDataSchema = z.object({ type: z.literal("Query"), args: z.any() })
const ResultDataSchema = z.object({ type: z.literal("Result"), result: z.any() })
const ErrorDataSchema = z.object({ type: z.literal("Error"), error: z.string() })
const MessageSchema = z.object({
command: z.string(),
data: z.discriminatedUnion("type", [QueryDataSchema, ResultDataSchema, ErrorDataSchema]),
dest: z.string(),
src: z.string(),
})
const MessageQuerySchema = z.object({
command: z.string(),
data: QueryDataSchema,
dest: z.string(),
src: z.string(),
})
const MessageResultSchema = z.object({
command: z.string(),
data: ResultDataSchema,
dest: z.string(),
src: z.string(),
})
const MessageErrorSchema = z.object({
command: z.string(),
data: ErrorDataSchema,
dest: z.string(),
src: z.string(),
})
const MessageHandlerSchema = z.function()
.args(z.union([MessageResultSchema, MessageErrorSchema]))
.returns(z.void())
export type Message = z.infer<typeof MessageSchema>
export type MessageQuery = z.infer<typeof MessageQuerySchema>
export type MessageResult = z.infer<typeof MessageResultSchema>
export type MessageError = z.infer<typeof MessageErrorSchema>
export type MessageHandler = z.infer<typeof MessageHandlerSchema>
export function isMessage(obj: any): obj is Message {
return MessageSchema.safeParse(obj).success
}
export function isMessageQuery(obj: any): obj is MessageQuery {
return MessageQuerySchema.safeParse(obj).success
}
export function isMessageResult(obj: any): obj is MessageResult {
return MessageResultSchema.safeParse(obj).success
}
export function isMessageError(obj: any): obj is MessageError {
return MessageErrorSchema.safeParse(obj).success
}
export function genMessageResult(result: any, srcMsg: MessageQuery): MessageResult {
return {
command: srcMsg.command,
dest: srcMsg.src,
src: srcMsg.dest,
data: {
type: "Result",
result: result
}
} as MessageResult
}
export function genMessageError(error: string, srcMsg: MessageQuery): MessageError {
return {
command: srcMsg.command,
dest: srcMsg.src,
src: srcMsg.dest,
data: {
type: "Error",
error: error
}
} as MessageError
}
}

View File

@ -1,5 +1,4 @@
import { router, publicProcedure } from "./trpc.ts" import { router, publicProcedure } from "./trpc.ts"
// import { addUser } from "./database.ts";
export const appRouter = router({ export const appRouter = router({
api: router({ api: router({

View File

@ -1,58 +1,6 @@
import type { udp } from "bun" import { resolve } from "path";
import Tinypool from "tinypool";
const udpServer = await Bun.udpSocket({ const udpClientsPool = new Tinypool({
port: 33000, filename: resolve(__dirname, "./udpClient.ts")
socket: {
data(_socket, _buf, _port, _addr) {
// todo : Handle Recieved Data
}
}
}) })
export const udpSocketPool = await createUDPSocketPool(5)
type bunUDPSocket = udp.Socket<"buffer">
export function getUDPServerPort() {
return udpServer.port
}
export class UDPSocketPool {
freeSockets: Set<bunUDPSocket> = new Set()
busySockets: Set<bunUDPSocket> = new Set()
getFreeSocket(): bunUDPSocket {
const socket = this.freeSockets.values().next().value
if (socket !== undefined) {
this.busySockets.add(socket)
this.freeSockets.delete(socket)
} else {
throw Error("Failure: Create udp socket failed")
}
return socket
}
releaseSocket(socket: any) {
this.freeSockets.add(socket)
this.busySockets.delete(socket)
}
send(data: udp.Data, port: number, hostname: string) {
const socket = this.getFreeSocket()
socket.send(data, port, hostname)
this.releaseSocket(socket)
}
}
export async function createUDPSocketPool(depth: number): Promise<UDPSocketPool> {
const pool = new UDPSocketPool()
for (var i = 0; i < depth; i++) {
const socket = await Bun.udpSocket({})
pool.freeSockets.add(socket)
}
return pool
}

30
server/udpClient.ts Normal file
View File

@ -0,0 +1,30 @@
import { type udp } from "bun";
import { MsgProtocol } from "./msgProtocol";
import { parentPort } from "worker_threads";
declare var self: Worker
const udpClient = await Bun.udpSocket({})
parentPort?.on("message", (msg: MsgProtocol.MessageQuery) => {
if (MsgProtocol.isMessageQuery(msg)) {
switch (msg.command) {
case "send": {
const args = msg.data.args as {
data: udp.Data,
port: number,
address: string,
}
udpClient.send(args.data, args.port, args.address)
postMessage(MsgProtocol.genMessageResult("Send Successfully", msg))
break
}
default: {
postMessage(MsgProtocol.genMessageError("No Such Command", msg))
break
}
}
} else {
postMessage(MsgProtocol.genMessageError("No Currenct Destination", msg))
}
})

View File

@ -1,9 +1,6 @@
import type { Board } from "./database"; import { fun } from "./common";
import { findBoard, isBoard } from "./database";
import { numberToBytes } from "./common";
import { z } from "zod"; import { z } from "zod";
import _ from "lodash"; import _ from "lodash";
import { udpSocketPool } from "./udp";
export namespace EquipmentPackage { export namespace EquipmentPackage {
const HEADER_LENGTH = 8 const HEADER_LENGTH = 8
@ -75,7 +72,7 @@ export namespace EquipmentPackage {
array[3] = this._reserved array[3] = this._reserved
array[4] = this._reserved array[4] = this._reserved
let addressBytes = numberToBytes(this.address, 3) let addressBytes = fun.numberToBytes(this.address, 3)
array[5] = addressBytes[0] array[5] = addressBytes[0]
array[6] = addressBytes[1] array[6] = addressBytes[1]
array[7] = addressBytes[2] array[7] = addressBytes[2]
@ -117,43 +114,3 @@ export namespace EquipmentPackage {
} }
} }
export class Equipment {
board: Board
constructor(name?: string)
constructor(board?: Board)
constructor(arg1: any) {
if (isBoard(arg1)) {
this.board = arg1
} else if (typeof arg1 === "string") {
try {
const board = findBoard(arg1)
this.board = board
} catch (error) {
throw new Error("Equipment Construction Failure")
}
} else {
throw new Error("Equipment Construction Failure")
}
}
send(header: EquipmentPackage.HeaderOptions) {
const pack = new EquipmentPackage.Package(header)
return udpSocketPool.send(pack.toUint8Array(), this.board.port, this.board.ipv4)
}
// TODO: add params file
uploadBitStream() {
const header: EquipmentPackage.HeaderOptions = {
returnAck: true,
transformType: "Extend",
readWriteType: "w"
}
return this.send(header)
}
}

33
server/udpServer.ts Normal file
View File

@ -0,0 +1,33 @@
import { parentPort } from "worker_threads"
import { MsgProtocol } from "./msgProtocol"
import _ from "lodash"
declare var self: Worker
export type UDPServerMsgType = "port"
const udpServer = await Bun.udpSocket({
port: 33000,
socket: {
data(_socket, _buf, _port, _addr) {
// todo : Handle Recieved Data
}
}
})
parentPort?.on("message", (msg: MsgProtocol.MessageQuery) => {
if (MsgProtocol.isMessageQuery(msg)) {
switch (msg.command) {
case "port": {
postMessage(MsgProtocol.genMessageResult(udpServer.port, msg))
break
}
default: {
break
}
}
} else {
return
}
})
postMessage("UDP Server Start Successfully!")