import { Kafka } from "kafkajs";
import { WebSocketServer } from 'ws';
import jwt from 'jsonwebtoken';

import server from './index.js';
import db from './db.js';

const startChat = async () => {

    const kafka = new Kafka({
        clientId: 'backend',
        brokers: ['localhost:9092']
        // brokers: ['kafka:9092']
    });

    let wsClients = [];

    const producer = kafka.producer();
    const consumer = kafka.consumer({ groupId: 'chat-group' });

    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({
        topic: "chatMessage",
        fromBeginning: true
    });

    await consumer.run({
        eachMessage: async ({topic, partition, message}) => {
            const jsonMessage = JSON.parse(message.value.toString())
            if (jsonMessage.origin == "website") return;
            console.log(wsClients.length)
            wsClients.forEach(client => {
                console.log("sending message to a client")
                client.send(JSON.stringify({
                    author: jsonMessage.author,
                    content: jsonMessage.content,
                    timestamp: jsonMessage.timestamp 
                }));
            });
            console.log(message.value.toString());
        }

    });

    const wsServer = new WebSocketServer({ noServer: true });
    wsServer.on('connection', socket => {
        console.log("Hooray! New socket connection")
        wsClients.push(socket);
        socket.on('message', async (message) => {
            message = JSON.parse(message.toString());
            if (!message || !message.author || !message.content || !message.jwt) {
                socket.send("Malformed package.");
                return;
            }
            console.log(message);
            const token = message.jwt;
            if (!jwt.verify(token, process.env.SECRET)) {
                socket.send("JWT is not valid.");
                return;
            }

            const datetime = Math.round(Date.now() / 1000);

            await db.query(
                "INSERT INTO chat_messages(author, datetime, content) VALUES ($1, to_timestamp($2)::timestamp, $3)",
                [message.author, datetime, message.content])
            .catch(e => {
                console.log("Error on inserting data into the DB: ", e);
            });


            await producer.send({
                topic: 'chatMessage',
                messages: [{
                    value: JSON.stringify({
                        author: message.author,
                        content: message.content,
                        timestamp: datetime,
                        origin: "website"
                    })
                }]
            });
            // we are not sending this message to all websockets right now, because plugin will emit
            // a chat event once it'll catch this message from kafka, triggering a new message
            // that will return here as if it was send from the minecraft.
        });

        socket.on('close', async () => {
            wsClients = wsClients.filter(s => s !== socket);
            await producer.disconnect();
        });
    });

    server.on('upgrade', (request, socket, head) => {
        wsServer.handleUpgrade(request, socket, head, socket => {
            wsServer.emit('connection', socket, request);
        })
    })
}

export default startChat;