"""Download labelled captcha images and train a CNN classifier on them.

Configuration is read from the environment (optionally via a ``.env`` file):

* ``CAPTCHA_AGGREGATOR_API`` - base URL of the captcha aggregator service.
* ``DOWNLOAD_PATH`` - staging directory for freshly downloaded captchas.
* ``TRAINING_PATH`` / ``TESTING_PATH`` - directories holding the two datasets.
* ``PERCENT_OF_TESTING`` - integer share of new captchas sent to the testing set.

Captcha images are stored as ``<hash>_<solution>.jpeg``.
"""

import re
import warnings
from base64 import b64decode
from os import environ, listdir, makedirs, path, walk
from shutil import move

# Third-party imports are kept in their original relative order.
from dotenv import load_dotenv
import requests
import tf2onnx
import cv2
import keras
import numpy as np

load_dotenv()

DOWNLOAD_PATH = environ.get("DOWNLOAD_PATH")
TESTING_PATH = environ.get("TESTING_PATH")
TRAINING_PATH = environ.get("TRAINING_PATH")

# Seconds to wait for the aggregator API before giving up on a request.
REQUEST_TIMEOUT = 30


def _api_get(endpoint):
    """Return the decoded JSON body of ``GET <CAPTCHA_AGGREGATOR_API>/<endpoint>``.

    Raises ``requests.HTTPError`` on a non-2xx response, so a failing service
    is reported as such instead of surfacing later as a JSON or key error.
    """
    response = requests.get(
        f"{environ.get('CAPTCHA_AGGREGATOR_API')}/{endpoint}",
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def _percent_of_testing():
    """Return ``PERCENT_OF_TESTING`` as an int, failing clearly when it is unset."""
    raw = environ.get("PERCENT_OF_TESTING")
    if raw is None:
        raise RuntimeError("PERCENT_OF_TESTING environment variable is not set")
    return int(raw)


def prepare_dirs():
    """Create the download, testing and training directories if missing.

    Raises ``RuntimeError`` when one of the directory variables is not set.
    """
    directories = (
        ("DOWNLOAD_PATH", DOWNLOAD_PATH),
        ("TESTING_PATH", TESTING_PATH),
        ("TRAINING_PATH", TRAINING_PATH),
    )
    for name, directory in directories:
        if directory is None:
            raise RuntimeError(f"{name} environment variable is not set")
        makedirs(directory, exist_ok=True)


def fetch_captcha(id):
    """Download the captcha with the given id into ``DOWNLOAD_PATH``.

    The image is written as ``<hash>_<solution>.jpeg`` using the values
    returned by the API.
    """
    captcha = _api_get(f"captcha/{id}")["captcha"]
    # NOTE(review): hash and solution come from the API and are used verbatim
    # in the file name - confirm the service is trusted.
    target = f"{DOWNLOAD_PATH}/{captcha['hash']}_{captcha['solution']}.jpeg"
    with open(target, "wb") as captcha_file:
        captcha_file.write(b64decode(captcha["image"]))


def search_saved_captcha(hash, path):
    """Return True if a file named ``<hash>_<6 word chars>.jpeg`` exists under *path*.

    The directory tree rooted at *path* is searched recursively.
    """
    # Escape the hash so that regex metacharacters in it are matched literally.
    regex = re.compile(re.escape(hash) + r"_\w{6}\.jpeg")
    for _, _, files in walk(path):
        if any(regex.match(file) for file in files):
            return True
    return False


def search_and_download_new(captchas):
    """Download every captcha in *captchas* that is not stored locally yet.

    Each item must provide ``"id"`` and ``"hash"``. A captcha counts as stored
    when its hash is found in the training, testing or download directory.
    """
    for captcha in captchas:
        already_saved = any(
            search_saved_captcha(captcha["hash"], directory)
            for directory in (TRAINING_PATH, TESTING_PATH, DOWNLOAD_PATH)
        )
        if not already_saved:
            fetch_captcha(captcha["id"])


def sort_datasets():
    """Move downloaded captchas into the testing and training directories.

    The first ``PERCENT_OF_TESTING`` percent (rounded) of the files found
    directly in ``DOWNLOAD_PATH`` go to ``TESTING_PATH``; the remainder go to
    ``TRAINING_PATH``. Subdirectories of ``DOWNLOAD_PATH`` are left untouched.
    """
    percent_of_testing = _percent_of_testing()
    # Only top-level files are counted, so only those are moved; sorting makes
    # the split independent of the directory listing order.
    new_files = sorted(
        name
        for name in listdir(DOWNLOAD_PATH)
        if path.isfile(f"{DOWNLOAD_PATH}/{name}")
    )
    amount_to_send_to_test = round(len(new_files) * (percent_of_testing / 100))
    for index, name in enumerate(new_files):
        if index < amount_to_send_to_test:
            destination = TESTING_PATH
        else:
            destination = TRAINING_PATH
        move(f"{DOWNLOAD_PATH}/{name}", destination)


def download_dataset():
    """Fetch all captchas that are new and distribute them into the datasets."""
    prepare_dirs()
    captchas = _api_get("captcha/all")["captchas"]
    search_and_download_new(captchas)
    sort_datasets()


def load_dataset(dataset_path, solution_to_label=None):
    """Load the grayscale images and integer labels stored in *dataset_path*.

    Args:
        dataset_path: Directory containing ``<hash>_<solution>.<ext>`` images.
        solution_to_label: Optional mapping from solution to label. When
            given, labels are taken from it and files whose solution is not
            in the mapping are skipped. When omitted, the mapping is built
            from the sorted unique solutions found in the directory.

    Returns:
        A tuple ``(images, labels, unique_solutions)``: a list of 2-D arrays
        scaled by 1/255, the list of integer labels in the same order, and the
        sorted list of distinct solutions that were loaded.
    """
    images = []
    solutions = []
    for filename in listdir(dataset_path):
        # The solution is the part after the first underscore; this assumes
        # the hash itself contains no underscore.
        stem = path.splitext(filename)[0]
        _, separator, solution = stem.partition("_")
        if not separator:
            warnings.warn(f"Skipping {filename}: no '_<solution>' part in name")
            continue
        if solution_to_label is not None and solution not in solution_to_label:
            continue

        img = cv2.imread(f"{dataset_path}/{filename}")
        if img is None:
            # cv2.imread returns None instead of raising when it cannot decode.
            warnings.warn(f"Skipping {filename}: not a readable image")
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        images.append(img / 255.0)
        solutions.append(solution)

    unique_solutions = sorted(set(solutions))
    if solution_to_label is None:
        solution_to_label = {
            solution: i for i, solution in enumerate(unique_solutions)
        }
    labels = [solution_to_label[solution] for solution in solutions]
    return images, labels, unique_solutions


def load_training_dataset():
    """Load the dataset stored in ``TRAINING_PATH``."""
    return load_dataset(TRAINING_PATH)


def load_testing_dataset(solution_to_label=None):
    """Load the dataset stored in ``TESTING_PATH``.

    Pass the training set's *solution_to_label* mapping so that testing labels
    refer to the same classes as the training labels.
    """
    return load_dataset(TESTING_PATH, solution_to_label)


def train_nn():
    """Train the CNN on the training dataset and save it as ``captcha_solver.keras``.

    The testing dataset is used as validation data when ``PERCENT_OF_TESTING``
    is greater than zero and it contains at least one usable sample.

    Raises:
        RuntimeError: If the training dataset is empty.
    """
    training_images, training_labels, unique_solutions = load_training_dataset()
    if not training_images:
        raise RuntimeError(f"No training images found in {TRAINING_PATH}")

    validation_data = None
    if _percent_of_testing() > 0:
        # Reuse the training label mapping: a mapping built from the testing
        # set alone would assign different indices to the same solution.
        solution_to_label = {
            solution: i for i, solution in enumerate(unique_solutions)
        }
        testing_images, testing_labels, _ = load_testing_dataset(solution_to_label)
        # NOTE(review): every distinct solution is its own class, so testing
        # samples whose solution never occurs in training are dropped.
        if testing_images:
            validation_data = (
                np.array(testing_images)[..., np.newaxis],
                np.array(testing_labels),
            )

    model = keras.Sequential([
        keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(70, 200, 1)),
        keras.layers.MaxPooling2D((2, 2)),
        keras.layers.Conv2D(64, (3, 3), activation='relu'),
        keras.layers.MaxPooling2D((2, 2)),
        keras.layers.Conv2D(64, (3, 3), activation='relu'),
        keras.layers.Flatten(),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(len(unique_solutions), activation='softmax'),
    ])
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    # Add the explicit channel axis declared by input_shape=(70, 200, 1).
    model.fit(
        np.array(training_images)[..., np.newaxis],
        np.array(training_labels),
        epochs=10,
        batch_size=128,
        validation_data=validation_data,
    )

    keras.saving.save_model(model, 'captcha_solver.keras')
    # Alternative exports, currently disabled:
    # model.save('model.h5')
    # tf2onnx.convert.from_keras(model, opset=13, output_path='model_onnx')


if __name__ == "__main__":
    download_dataset()
    train_nn()