diff --git a/index.ts b/index.ts index 62b1068..ea6f2ce 100644 --- a/index.ts +++ b/index.ts @@ -1,16 +1,12 @@ import { PubSubClient, PubSubRedemptionMessage } from "twitch-pubsub-client"; import { getApiClient, getAuthProvider } from "./src/backend/helpers/twitch"; +import { listen, sockets } from "./src/backend/webServer"; -import { AddressInfo } from "net"; import { ApiClient } from "twitch"; import { ChatClient } from "twitch-chat-client"; -import WebSocket from "ws"; -import express from "express"; import { promises as fs } from "fs"; -import path from "path"; const SCHEDULED_FILE = "./scheduled.json"; -const DEV_MODE = process.env.NODE_ENV === "development"; const scheduledActions: Array = []; let saInterval: NodeJS.Timeout; @@ -20,6 +16,12 @@ const channel = "alexbcberio"; let apiClient: ApiClient; let chatClient: ChatClient; +export { + handleClientAction, + scheduledActions, + saveScheduledActions +} + //! Important: store users & channels by id, not by username async function init() { @@ -46,6 +48,8 @@ async function init() { }); chatClient.connect(); + + listen(); } init(); @@ -100,58 +104,12 @@ async function onRedemption(message: PubSubRedemptionMessage) { } if (msg) { + console.log(msg); + broadcast(msg); } } -const app = express(); -const wsServer = new WebSocket.Server({ - noServer: true -}); - -let sockets: Array = []; -wsServer.on("connection", (socket, req) => { - console.log(`[WS] ${req.socket.remoteAddress} New connection established`); - sockets.push(socket); - socket.send( - JSON.stringify({ - env: DEV_MODE ? "dev" : "prod" - }) - ); - - socket.on("message", async (msg: string) => { - const data = JSON.parse(msg); - - // broadcast message - if (!data.actions || data.actions.length === 0) { - sockets - .filter(s => s !== socket) - .forEach(s => s.send(msg)); - return; - } - - for (const action of data.actions) { - if (!action.scheduledAt) { - await handleClientAction(action); - } else { - scheduledActions.push(action); - scheduledActions.sort((a, b) => a.scheduledAt - b.scheduledAt); - saveScheduledActions(); - } - } - - console.log( - `[WS] Received message with ${data.actions.length} actions:`, - data - ); - }); - - socket.on("close", () => { - sockets = sockets.filter(s => s !== socket); - console.log("[WS] Connection closed"); - }); -}); - async function handleClientAction(action: any) { if (action.channel && !isNaN(action.channel)) { action.channel = await getUsernameFromId(parseInt(action.channel)); @@ -324,22 +282,4 @@ async function stealVip(msg: { } return null; -} - -/* - Webserver - */ -app.use(express.static(path.join(__dirname, "client"))); -const server = app.listen(!DEV_MODE ? 8080 : 8081, "0.0.0.0"); - -server.on("listening", () => { - console.log( - `[Webserver] Listening on port ${(server.address() as AddressInfo).port}` - ); -}); - -server.on("upgrade", (req, socket, head) => { - wsServer.handleUpgrade(req, socket, head, socket => { - wsServer.emit("connection", socket, req); - }); -}); +} \ No newline at end of file diff --git a/src/backend/helpers/util.ts b/src/backend/helpers/util.ts new file mode 100644 index 0000000..b7c721b --- /dev/null +++ b/src/backend/helpers/util.ts @@ -0,0 +1,9 @@ +const environment = process.env.NODE_ENV; +const isDevelopment = environment === "development"; +const isProduction = environment === "production"; + +export { + environment, + isDevelopment, + isProduction +}; \ No newline at end of file diff --git a/src/backend/webServer/index.ts b/src/backend/webServer/index.ts new file mode 100644 index 0000000..c70baf9 --- /dev/null +++ b/src/backend/webServer/index.ts @@ -0,0 +1,93 @@ +import { IncomingMessage, Server } from "http"; +import { handleClientAction, saveScheduledActions, scheduledActions } from "../../.."; + +import { AddressInfo } from "net"; +import { Socket } from "net"; +import WebSocket from "ws"; +import express from "express"; +import { isDevelopment } from "../helpers/util"; +import { join } from "path"; + +const app = express(); +const sockets: Array = []; + +const wsServer = new WebSocket.Server({ + noServer: true +}); + +let server: Server; + +export { + listen, + sockets +} + +wsServer.on("connection", onConnection); + +app.use(express.static(join(process.cwd(), "client"))); + +function listen() { + if (server) { + console.log("[Webserver] Server is already running"); + return; + } + + server = app.listen(!isDevelopment ? 8080 : 8081, "0.0.0.0"); + + server.on("listening", onListening); + server.on("upgrade", onUpgrade); +} + +function onListening() { + console.log( + `[Webserver] Listening on port ${(server.address() as AddressInfo).port}` + ); +} + +function onUpgrade(req: IncomingMessage, socket: Socket, head: Buffer) { + wsServer.handleUpgrade(req, socket, head, socket => { + wsServer.emit("connection", socket, req); + }); +} + +function onConnection(socket: WebSocket, req: IncomingMessage) { + console.log(`[WS] ${req.socket.remoteAddress} New connection established`); + sockets.push(socket); + socket.send( + JSON.stringify({ + env: isDevelopment ? "dev" : "prod" + }) + ); + + socket.on("message", (msg: string) => onMessage(msg, socket)); + + socket.on("close", () => onClose(socket)); +} + +async function onMessage(msg: string, socket: WebSocket) { + const data = JSON.parse(msg); + + // broadcast message + if (!data.actions || data.actions.length === 0) { + sockets.filter(s => s !== socket).forEach(s => s.send(msg)); + return; + } + + for (const action of data.actions) { + if (!action.scheduledAt) { + await handleClientAction(action); + } else { + scheduledActions.push(action); + scheduledActions.sort((a, b) => a.scheduledAt - b.scheduledAt); + saveScheduledActions(); + } + } + + console.log(`[WS] Received message with ${data.actions.length} actions:`, data); +} + +function onClose(socket: WebSocket) { + const socketIdx = sockets.indexOf(socket); + sockets.splice(socketIdx, 1); + console.log("[WS] Connection closed"); +} \ No newline at end of file