diff --git a/db_schema.psql b/db_schema.psql index 0c9b859..a9ae2ff 100644 --- a/db_schema.psql +++ b/db_schema.psql @@ -63,3 +63,11 @@ CREATE UNIQUE INDEX IF NOT EXISTS hwids_publickey_idx ON hwids (publickey); -- Add the foreign key constraint to the users table if it doesn't exist -- ALTER TABLE public.users ADD CONSTRAINT IF NOT EXISTS users_hwids_fk FOREIGN KEY (hwidid) REFERENCES public.hwids(id); + + +CREATE TABLE IF NOT EXISTS chat_messages ( + id SERIAL PRIMARY KEY, + author VARCHAR(32) REFERENCES users(username), + datetime TIMESTAMP, + content TEXT +); \ No newline at end of file diff --git a/public/js/chat.js b/public/js/chat.js index 3296b9d..987f5fc 100644 --- a/public/js/chat.js +++ b/public/js/chat.js @@ -1,6 +1,29 @@ +function getCookie(name) { + const value = `; ${document.cookie}`; + const parts = value.split(`; ${name}=`); + if (parts.length === 2) return parts.pop().split(';').shift(); +} + $(document).ready(() => { + //prod + // const socket = new WebSocket('wss://auth.foxarmy.org'); + //dev + const socket = new WebSocket('ws://localhost:3000'); + + socket.onmessage = message => { + console.log(message) + } + const sendData = async () => { - alert(1); + const jwt = getCookie("jwt"); + const author = await (await fetch(`/api/getUsername`)).json() + const content = $("#chat-input").val(); + + const message = { + jwt, author, content + }; + + socket.send(JSON.stringify(message)); } $("#send-message-button").click(sendData); diff --git a/src/index.js b/src/index.js index 16fc699..9fb23f4 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ import path from 'path'; import ApiRouter from './routers/api.js'; import UserRouter from './routers/user.js'; +import startChat from './messages.js'; const app = express(); @@ -29,6 +30,7 @@ app.use('/', UserRouter); const server = app.listen(process.env.PORT, () => { console.log("App has been started!"); + startChat(); }); export default server; diff --git a/src/messages.js b/src/messages.js index 421fba8..02934bc 100644 --- a/src/messages.js +++ b/src/messages.js @@ -1,68 +1,102 @@ import { Kafka } from "kafkajs"; -import ws from 'ws'; -import jwt from 'jwt'; +import { WebSocketServer } from 'ws'; +import jwt from 'jsonwebtoken'; import server from './index.js'; +import db from './db.js'; -const kafka = new Kafka({ - clientId: 'backend', - brokers: ['kafka:9092'] -}); +const startChat = async () => { -const wsClients = []; + const kafka = new Kafka({ + clientId: 'backend', + brokers: ['localhost:9092'] + // brokers: ['kafka:9092'] + }); -const producer = kafka.producer(); -const consumer = kafka.consumer(); + let wsClients = []; -await producer.connect(); -await consumer.connect(); -await consumer.subscribe({ - topic: "chatMessage", - fromBeginning: true -}); + const producer = kafka.producer(); + const consumer = kafka.consumer({ groupId: 'chat-group' }); -const onMessageFromServer = async ({ topic, partition, message }) => { - wsClients.forEach(client => { - client.send({ - message - }) - }) -}; + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ + topic: "chatMessage", + fromBeginning: true + }); -await consumer.run({ - eachMessage: onMessageFromServer -}); - -const wsServer = new ws.Server({ noServer: true }); -wsServer.on('connection', socket => { - wsClients.push(socket); - socket.on('message', async (message) => { - - const token = message.jwt; - if (!jwt.verify(token, process.env.secret)) { - socket.send("JWT is not valid.") - return; + await consumer.run({ + eachMessage: async ({topic, partition, message}) => { + const jsonMessage = JSON.parse(message.value.toString()) + if (jsonMessage.origin == "website") return; + console.log(wsClients.length) + wsClients.forEach(client => { + console.log("sending message to a client") + client.send(JSON.stringify({ + author: jsonMessage.author, + content: jsonMessage.content, + timestamp: jsonMessage.timestamp + })); + }); + console.log(message.value.toString()); } - await producer.send({ - topic: 'chatMessage', - messages: [{ - author: message.author, - content: message.content, - date: message.date - }] + }); + + const wsServer = new WebSocketServer({ noServer: true }); + wsServer.on('connection', socket => { + console.log("Hooray! New socket connection") + wsClients.push(socket); + socket.on('message', async (message) => { + message = JSON.parse(message.toString()); + if (!message || !message.author || !message.content || !message.jwt) { + socket.send("Malformed package."); + return; + } + console.log(message); + const token = message.jwt; + if (!jwt.verify(token, process.env.SECRET)) { + socket.send("JWT is not valid."); + return; + } + + const datetime = Math.round(Date.now() / 1000); + + await db.query( + "INSERT INTO chat_messages(author, datetime, content) VALUES ($1, to_timestamp($2)::timestamp, $3)", + [message.author, datetime, message.content]) + .catch(e => { + console.log("Error on inserting data into the DB: ", e); + }); + + + await producer.send({ + topic: 'chatMessage', + messages: [{ + value: JSON.stringify({ + author: message.author, + content: message.content, + timestamp: datetime, + origin: "website" + }) + }] + }); + // we are not sending this message to all websockets right now, because plugin will emit + // a chat event once it'll catch this message from kafka, triggering a new message + // that will return here as if it was send from the minecraft. + }); + + socket.on('close', async () => { + wsClients = wsClients.filter(s => s !== socket); + await producer.disconnect(); }); }); - socket.on('close', async () => { - wsClients = wsClients.filter(s => s !== socket); - await producer.disconnect(); - }); -}); - -server.on('upgrade', (request, socket, head) => { - wsServer.handleUpgrade(request, socket, head, socket => { - wsServer.emit('connection', socket, request); + server.on('upgrade', (request, socket, head) => { + wsServer.handleUpgrade(request, socket, head, socket => { + wsServer.emit('connection', socket, request); + }) }) -}) +} +export default startChat; \ No newline at end of file