From 8183e8ceed2a786f2173d0086a571be817232af0 Mon Sep 17 00:00:00 2001 From: leca Date: Sat, 10 May 2025 13:41:22 +0300 Subject: [PATCH] experiments --- main.py | 123 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 78 insertions(+), 45 deletions(-) diff --git a/main.py b/main.py index 1609a12..3d9d136 100644 --- a/main.py +++ b/main.py @@ -4,33 +4,42 @@ from dotenv import load_dotenv from base64 import b64decode import re import requests -import tf2onnx import cv2 import keras import numpy as np +from keras.callbacks import EarlyStopping, ModelCheckpoint load_dotenv() -DOWNLOAD_PATH=environ.get("DOWNLOAD_PATH") -TESTING_PATH=environ.get("TESTING_PATH") -TRAINING_PATH=environ.get("TRAINING_PATH") +# Constants +IMAGE_HEIGHT = 70 +IMAGE_WIDTH = 200 +DOWNLOAD_PATH = environ.get("DOWNLOAD_PATH") +TESTING_PATH = environ.get("TESTING_PATH") +TRAINING_PATH = environ.get("TRAINING_PATH") +PERCENT_OF_TESTING = int(environ.get("PERCENT_OF_TESTING")) def prepare_dirs(): + """Create necessary directories for downloading and storing images.""" makedirs(DOWNLOAD_PATH, exist_ok=True) makedirs(TESTING_PATH, exist_ok=True) makedirs(TRAINING_PATH, exist_ok=True) def fetch_captcha(id): - # print(f"Fetching captcha with id {id}") - captcha = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/{id}").json()["captcha"] - - with open(f"{DOWNLOAD_PATH}/{captcha['hash']}_{captcha['solution']}.jpeg", 'wb') as captcha_file: - captcha_file.write(b64decode(captcha['image'])) + """Fetch a captcha image by its ID and save it to the download path.""" + try: + response = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/{id}") + response.raise_for_status() + captcha = response.json()["captcha"] + captcha_file_path = path.join(DOWNLOAD_PATH, f"{captcha['hash']}_{captcha['solution']}.jpeg") + with open(captcha_file_path, 'wb') as captcha_file: + captcha_file.write(b64decode(captcha['image'])) + except requests.RequestException as e: + print(f"Error fetching captcha {id}: {e}") def search_saved_captcha(hash, path): - # print(f"searching captcha with hash {hash} in {path}") - regex = re.compile(hash + '_\\w{6}\\.jpeg') - + """Check if a captcha with the given hash exists in the specified path.""" + regex = re.compile(f"{hash}_\\w{{6}}\\.jpeg") for _, _, files in walk(path): for file in files: if regex.match(file): @@ -38,41 +47,45 @@ def search_saved_captcha(hash, path): return False def search_and_download_new(captchas): - # print(f"Searching and downloading new captchas") + """Search for new captchas and download them if they don't already exist.""" for captcha in captchas: id = captcha["id"] hash = captcha["hash"] - training_exists = search_saved_captcha(hash, TRAINING_PATH) - testing_exists = search_saved_captcha(hash, TESTING_PATH) - new_exists = search_saved_captcha(hash, DOWNLOAD_PATH) - if not training_exists and not testing_exists and not new_exists: + if not (search_saved_captcha(hash, TRAINING_PATH) or + search_saved_captcha(hash, TESTING_PATH) or + search_saved_captcha(hash, DOWNLOAD_PATH)): fetch_captcha(id) def sort_datasets(): - # print(f"Sorting datasets") - percent_of_testing = int(environ.get("PERCENT_OF_TESTING")) - amount_of_new_data = len([file for file in listdir(DOWNLOAD_PATH) if path.isfile(f'{DOWNLOAD_PATH}/{file}')]) - amount_to_send_to_test = round(amount_of_new_data * (percent_of_testing / 100)) - for _, _, files in walk(DOWNLOAD_PATH): - for index, file in enumerate(files): - if index < amount_to_send_to_test: - move(f"{DOWNLOAD_PATH}/{file}", TESTING_PATH) - else: - move(f"{DOWNLOAD_PATH}/{file}", TRAINING_PATH) + """Sort downloaded captchas into training and testing datasets.""" + amount_of_new_data = len([file for file in listdir(DOWNLOAD_PATH) if path.isfile(path.join(DOWNLOAD_PATH, file))]) + amount_to_send_to_test = round(amount_of_new_data * (PERCENT_OF_TESTING / 100)) + + files = listdir(DOWNLOAD_PATH) + for index, file in enumerate(files): + if index < amount_to_send_to_test: + move(path.join(DOWNLOAD_PATH, file), TESTING_PATH) + else: + move(path.join(DOWNLOAD_PATH, file), TRAINING_PATH) def download_dataset(): + """Download the dataset of captchas and sort them into training and testing sets.""" prepare_dirs() - - captchas = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/all").json()["captchas"] - - search_and_download_new(captchas) - sort_datasets() + try: + response = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/all") + response.raise_for_status() + captchas = response.json()["captchas"] + search_and_download_new(captchas) + sort_datasets() + except requests.RequestException as e: + print(f"Error downloading dataset: {e}") def load_dataset(dataset_path): + """Load images and their corresponding solutions from the specified dataset path.""" images = [] solutions = [] for filename in listdir(dataset_path): - img = cv2.imread(f"{dataset_path}/{filename}") + img = cv2.imread(path.join(dataset_path, filename)) img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) img = img / 255.0 images.append(img) @@ -83,39 +96,59 @@ def load_dataset(dataset_path): solution_to_label = {solution: i for i, solution in enumerate(unique_solutions)} labels = [solution_to_label[solution] for solution in solutions] - return images, labels, unique_solutions + return np.array(images), np.array(labels), unique_solutions def load_training_dataset(): + """Load the training dataset.""" return load_dataset(TRAINING_PATH) def load_testing_dataset(): + """Load the testing dataset.""" return load_dataset(TESTING_PATH) + def train_nn(): + """Train the neural network on the training dataset.""" training_images, training_labels, unique_solutions = load_training_dataset() - if int(environ.get("PERCENT_OF_TESTING")) > 0: + testing_images, testing_labels = (None, None) + + if PERCENT_OF_TESTING > 0: testing_images, testing_labels, _ = load_testing_dataset() model = keras.Sequential([ - keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(70, 200, 1)), + keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 1)), keras.layers.MaxPooling2D((2, 2)), - keras.layers.Conv2D(64, (3, 3), activation='relu'), + keras.layers.Conv2D(256, (3, 3), activation='relu'), keras.layers.MaxPooling2D((2, 2)), - keras.layers.Conv2D(64, (3, 3), activation='relu'), + keras.layers.Conv2D(256, (3, 3), activation='relu'), keras.layers.Flatten(), - keras.layers.Dense(64, activation='relu'), - keras.layers.Dense(len(unique_solutions), activation='softmax') + keras.layers.Dense(128, activation='relu'), + keras.layers.Dropout(0.5), # Dropout for regularization + keras.layers.Dense(len(unique_solutions), activation='softmax') # Output layer ]) + model.summary() model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) - if int(environ.get("PERCENT_OF_TESTING")) > 0: - model.fit(np.array(training_images), np.array(training_labels), epochs=10, batch_size=128, validation_data=(np.array(testing_images), np.array(testing_labels))) + + callbacks = [ + EarlyStopping(monitor='accuracy', patience=3), + ModelCheckpoint('best_model.keras', save_best_only=True) + ] + + EPOCHS = 100 + BATCH_SIZE = 8 + + if PERCENT_OF_TESTING > 0: + model.fit(np.array(training_images), np.array(training_labels), + epochs=EPOCHS, batch_size=BATCH_SIZE, callbacks=callbacks, + validation_data=(np.array(testing_images), np.array(testing_labels)), + ) else: - model.fit(np.array(training_images), np.array(training_labels), epochs=10, batch_size=128) + model.fit(np.array(training_images), np.array(training_labels), + epochs=EPOCHS, batch_size=BATCH_SIZE, callbacks=callbacks + ) keras.saving.save_model(model, 'captcha_solver.keras') - # model.save('model.h5') - # tf2onnx.convert.from_keras(model, opset=13, output_path='model_onnx') if __name__ == "__main__":