/**
 * WebSocket <-> Kafka chat bridge.
 *
 * - Every authenticated chat message received over a WebSocket is published
 *   to the Kafka topic `chatMessage`.
 * - Every record consumed from `chatMessage` is broadcast to all currently
 *   connected WebSocket clients as a JSON string of the form
 *   `{"message": <chat payload>}`.
 *
 * The WebSocket server runs in `noServer` mode and is attached to the HTTP
 * server exported by `./index.js` through its `upgrade` event.
 */
import { Kafka } from "kafkajs";
import ws from 'ws';
import jwt from 'jwt';
import server from './index.js';

const CHAT_TOPIC = 'chatMessage';

const kafka = new Kafka({ clientId: 'backend', brokers: ['kafka:9092'] });

// A Set gives O(1) add/remove and never needs to be reassigned on disconnect.
const wsClients = new Set();

const producer = kafka.producer();
const consumer = kafka.consumer();

await producer.connect();
await consumer.connect();
await consumer.subscribe({ topic: CHAT_TOPIC, fromBeginning: true });

/**
 * Decodes the value of a consumed Kafka record.
 *
 * @param {{ value: Buffer | null }} message - Record handed over by the consumer.
 * @returns {unknown} The parsed JSON payload, the raw text when the value is
 *   not valid JSON, or `null` when the record carries no value.
 */
const decodeChatMessage = (message) => {
  if (message.value == null) {
    return null;
  }
  const text = message.value.toString('utf8');
  try {
    return JSON.parse(text);
  } catch {
    // Records written by other producers may not be JSON; forward them as text.
    return text;
  }
};

/**
 * Kafka `eachMessage` handler: broadcasts one consumed record to every open
 * WebSocket client.
 *
 * @param {{ message: { value: Buffer | null } }} payload - KafkaJS payload.
 * @returns {Promise<void>}
 */
const onMessageFromServer = async ({ message }) => {
  const chatMessage = decodeChatMessage(message);
  if (chatMessage === null) {
    return;
  }

  // WebSocket frames must be strings or binary data, so serialize once up front.
  const frame = JSON.stringify({ message: chatMessage });

  for (const client of wsClients) {
    // Skip sockets that are still connecting or already closing.
    if (client.readyState !== client.OPEN) {
      continue;
    }
    client.send(frame, (err) => {
      if (err) {
        console.error('Failed to forward chat message to a client:', err);
      }
    });
  }
};

await consumer.run({ eachMessage: onMessageFromServer });

/**
 * Checks a client-supplied JWT against `process.env.secret`.
 *
 * NOTE(review): the module is imported from 'jwt', but the
 * `verify(token, secret)` call shape matches the 'jsonwebtoken' package,
 * whose `verify` throws on invalid tokens - confirm which package is installed.
 * Both a thrown error and a falsy return value are treated as "invalid".
 *
 * @param {unknown} token - Token taken from the incoming message.
 * @returns {boolean} `true` when verification succeeded.
 */
const isTokenValid = (token) => {
  try {
    return Boolean(jwt.verify(token, process.env.secret));
  } catch {
    return false;
  }
};

const wsServer = new ws.Server({ noServer: true });

wsServer.on('connection', (socket) => {
  wsClients.add(socket);

  socket.on('message', async (rawMessage) => {
    // Incoming frames are raw text/bytes; the chat protocol is JSON.
    let message;
    try {
      message = JSON.parse(rawMessage.toString());
    } catch {
      socket.send("Message is not valid JSON.");
      return;
    }

    if (!isTokenValid(message?.jwt)) {
      socket.send("JWT is not valid.");
      return;
    }

    try {
      await producer.send({
        topic: CHAT_TOPIC,
        messages: [
          {
            // Kafka records carry their payload in `value`.
            value: JSON.stringify({
              author: message.author,
              content: message.content,
              date: message.date,
            }),
          },
        ],
      });
    } catch (err) {
      // Never let a rejected send escape the event handler as an unhandled rejection.
      console.error('Failed to publish chat message to Kafka:', err);
    }
  });

  socket.on('close', () => {
    // The producer is shared by all clients, so it must stay connected here.
    wsClients.delete(socket);
  });
});

server.on('upgrade', (request, socket, head) => {
  wsServer.handleUpgrade(request, socket, head, (client) => {
    wsServer.emit('connection', client, request);
  });
});