115 lines
3.7 KiB
JavaScript
115 lines
3.7 KiB
JavaScript
import { Kafka } from "kafkajs";
|
|
import { WebSocketServer } from 'ws';
|
|
import jwt from 'jsonwebtoken';
|
|
|
|
import server from './index.js';
|
|
import db from './db.js';
|
|
|
|
const startChat = async () => {
|
|
|
|
const kafka = new Kafka({
|
|
clientId: 'backend',
|
|
brokers: ['localhost:9092']
|
|
// brokers: ['kafka:9092']
|
|
});
|
|
|
|
let wsClients = [];
|
|
|
|
const producer = kafka.producer();
|
|
const consumer = kafka.consumer({ groupId: 'chat-group' });
|
|
|
|
await producer.connect();
|
|
await consumer.connect();
|
|
await consumer.subscribe({
|
|
topic: "chatMessage",
|
|
fromBeginning: true
|
|
});
|
|
|
|
await consumer.run({
|
|
eachMessage: async ({topic, partition, message}) => {
|
|
let jsonMessage
|
|
try {
|
|
jsonMessage = JSON.parse(message.value.toString())
|
|
} catch (e) {
|
|
console.log(e)
|
|
return
|
|
}
|
|
if (jsonMessage.origin == "website") return;
|
|
wsClients.forEach(client => {
|
|
console.log("sending message to a client")
|
|
client.send(JSON.stringify({
|
|
author: jsonMessage.author,
|
|
content: jsonMessage.content,
|
|
timestamp: jsonMessage.timestamp
|
|
}));
|
|
});
|
|
|
|
const datetime = Math.round(Date.now() / 1000);
|
|
|
|
await db.query(
|
|
"INSERT INTO chat_messages(author, datetime, content) VALUES ($1, to_timestamp($2)::timestamp, $3)",
|
|
[jsonMessage.author, datetime, jsonMessage.content])
|
|
.catch(e => {
|
|
console.log("Error on inserting data into the DB: ", e);
|
|
});
|
|
}
|
|
|
|
});
|
|
|
|
const wsServer = new WebSocketServer({ noServer: true });
|
|
wsServer.on('connection', socket => {
|
|
wsClients.push(socket);
|
|
socket.on('message', async (message) => {
|
|
message = JSON.parse(message.toString());
|
|
if (!message || !message.author || !message.content || !message.jwt) {
|
|
socket.send("Malformed package.");
|
|
return;
|
|
}
|
|
const token = message.jwt;
|
|
if (!jwt.verify(token, process.env.SECRET)) {
|
|
socket.send("JWT is not valid.");
|
|
return;
|
|
}
|
|
|
|
const datetime = Math.round(Date.now() / 1000);
|
|
|
|
await db.query(
|
|
"INSERT INTO chat_messages(author, datetime, content) VALUES ($1, to_timestamp($2)::timestamp, $3)",
|
|
[message.author, datetime, message.content])
|
|
.catch(e => {
|
|
console.log("Error on inserting data into the DB: ", e);
|
|
});
|
|
|
|
|
|
await producer.send({
|
|
topic: 'chatMessage',
|
|
messages: [{
|
|
value: JSON.stringify({
|
|
author: message.author,
|
|
content: message.content,
|
|
timestamp: datetime,
|
|
origin: "website"
|
|
})
|
|
}]
|
|
}).catch(e => {
|
|
console.log(e)
|
|
});
|
|
// we are not sending this message to all websockets right now, because plugin will emit
|
|
// a chat event once it'll catch this message from kafka, triggering a new message
|
|
// that will return here as if it was send from the minecraft.
|
|
});
|
|
|
|
socket.on('close', async () => {
|
|
wsClients = wsClients.filter(s => s !== socket);
|
|
await producer.disconnect();
|
|
});
|
|
});
|
|
|
|
server.on('upgrade', (request, socket, head) => {
|
|
wsServer.handleUpgrade(request, socket, head, socket => {
|
|
wsServer.emit('connection', socket, request);
|
|
})
|
|
})
|
|
}
|
|
|
|
export default startChat; |