import { Kafka } from "kafkajs"; import { WebSocketServer } from 'ws'; import jwt from 'jsonwebtoken'; import server from './index.js'; import db from './db.js'; const startChat = async () => { const kafka = new Kafka({ clientId: 'backend', brokers: ['localhost:9092'] // brokers: ['kafka:9092'] }); let wsClients = []; const producer = kafka.producer(); const consumer = kafka.consumer({ groupId: 'chat-group' }); await producer.connect(); await consumer.connect(); await consumer.subscribe({ topic: "chatMessage", fromBeginning: true }); await consumer.run({ eachMessage: async ({topic, partition, message}) => { let jsonMessage try { jsonMessage = JSON.parse(message.value.toString()) } catch (e) { console.log(e) return } if (jsonMessage.origin == "website") return; wsClients.forEach(client => { console.log("sending message to a client") client.send(JSON.stringify({ author: jsonMessage.author, content: jsonMessage.content, timestamp: jsonMessage.timestamp })); }); const datetime = Math.round(Date.now() / 1000); await db.query( "INSERT INTO chat_messages(author, datetime, content) VALUES ($1, to_timestamp($2)::timestamp, $3)", [jsonMessage.author, datetime, jsonMessage.content]) .catch(e => { console.log("Error on inserting data into the DB: ", e); }); } }); const wsServer = new WebSocketServer({ noServer: true }); wsServer.on('connection', socket => { wsClients.push(socket); socket.on('message', async (message) => { message = JSON.parse(message.toString()); if (!message || !message.author || !message.content || !message.jwt) { socket.send("Malformed package."); return; } const token = message.jwt; if (!jwt.verify(token, process.env.SECRET)) { socket.send("JWT is not valid."); return; } const datetime = Math.round(Date.now() / 1000); await db.query( "INSERT INTO chat_messages(author, datetime, content) VALUES ($1, to_timestamp($2)::timestamp, $3)", [message.author, datetime, message.content]) .catch(e => { console.log("Error on inserting data into the DB: ", e); }); await producer.send({ topic: 'chatMessage', messages: [{ value: JSON.stringify({ author: message.author, content: message.content, timestamp: datetime, origin: "website" }) }] }); // we are not sending this message to all websockets right now, because plugin will emit // a chat event once it'll catch this message from kafka, triggering a new message // that will return here as if it was send from the minecraft. }); socket.on('close', async () => { wsClients = wsClients.filter(s => s !== socket); await producer.disconnect(); }); }); server.on('upgrade', (request, socket, head) => { wsServer.handleUpgrade(request, socket, head, socket => { wsServer.emit('connection', socket, request); }) }) } export default startChat;