captcha_solver/main.py.old

from os import environ, makedirs, path, walk, listdir
from shutil import move
from dotenv import load_dotenv
from base64 import b64decode
import re
import requests
import cv2
import keras
import numpy as np
from keras.callbacks import EarlyStopping, ModelCheckpoint

load_dotenv()

# Constants
IMAGE_HEIGHT = 70
IMAGE_WIDTH = 200
DOWNLOAD_PATH = environ.get("DOWNLOAD_PATH")
TESTING_PATH = environ.get("TESTING_PATH")
TRAINING_PATH = environ.get("TRAINING_PATH")
PERCENT_OF_TESTING = int(environ.get("PERCENT_OF_TESTING"))
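
# A minimal .env sketch showing the variables read above. The values are
# illustrative assumptions only; point them at your own directories and
# captcha-aggregator instance.
#
#   CAPTCHA_AGGREGATOR_API=http://localhost:8000
#   DOWNLOAD_PATH=./dataset/download
#   TESTING_PATH=./dataset/testing
#   TRAINING_PATH=./dataset/training
#   PERCENT_OF_TESTING=20
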
def prepare_dirs():
    """Create necessary directories for downloading and storing images."""
    makedirs(DOWNLOAD_PATH, exist_ok=True)
    makedirs(TESTING_PATH, exist_ok=True)
    makedirs(TRAINING_PATH, exist_ok=True)


def fetch_captcha(id):
    """Fetch a captcha image by its ID and save it to the download path."""
    try:
        response = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/{id}")
        response.raise_for_status()
        captcha = response.json()["captcha"]
        captcha_file_path = path.join(DOWNLOAD_PATH, f"{captcha['hash']}_{captcha['solution']}.jpeg")
        with open(captcha_file_path, 'wb') as captcha_file:
            captcha_file.write(b64decode(captcha['image']))
    except requests.RequestException as e:
        print(f"Error fetching captcha {id}: {e}")

def search_saved_captcha(hash, directory):
    """Check if a captcha with the given hash exists in the specified directory."""
    # Saved captchas are named "<hash>_<6-character solution>.jpeg".
    regex = re.compile(f"{hash}_\\w{{6}}\\.jpeg")
    for _, _, files in walk(directory):
        for file in files:
            if regex.match(file):
                return True
    return False


def search_and_download_new(captchas):
    """Search for new captchas and download them if they don't already exist."""
    for captcha in captchas:
        id = captcha["id"]
        hash = captcha["hash"]
        if not (search_saved_captcha(hash, TRAINING_PATH) or
                search_saved_captcha(hash, TESTING_PATH) or
                search_saved_captcha(hash, DOWNLOAD_PATH)):
            fetch_captcha(id)


def sort_datasets():
    """Sort downloaded captchas into training and testing datasets."""
    files = [file for file in listdir(DOWNLOAD_PATH) if path.isfile(path.join(DOWNLOAD_PATH, file))]
    amount_to_send_to_test = round(len(files) * (PERCENT_OF_TESTING / 100))
    for index, file in enumerate(files):
        if index < amount_to_send_to_test:
            move(path.join(DOWNLOAD_PATH, file), TESTING_PATH)
        else:
            move(path.join(DOWNLOAD_PATH, file), TRAINING_PATH)


def download_dataset():
    """Download the dataset of captchas and sort them into training and testing sets."""
    prepare_dirs()
    try:
        response = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/all")
        response.raise_for_status()
        captchas = response.json()["captchas"]
        search_and_download_new(captchas)
        sort_datasets()
    except requests.RequestException as e:
        print(f"Error downloading dataset: {e}")


def load_dataset(dataset_path):
    """Load images and their corresponding solutions from the specified dataset path."""
    images = []
    solutions = []
    for filename in listdir(dataset_path):
        img = cv2.imread(path.join(dataset_path, filename))
        if img is None:  # Skip files OpenCV cannot read as images
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = img / 255.0
        images.append(img)
        # Filenames look like "<hash>_<solution>.jpeg"; the solution is the label.
        solution = path.splitext(filename)[0].split('_')[1]
        solutions.append(solution)
    # Map each distinct solution string to an integer class index.
    unique_solutions = sorted(set(solutions))
    solution_to_label = {solution: i for i, solution in enumerate(unique_solutions)}
    labels = [solution_to_label[solution] for solution in solutions]
    # Add the trailing channel dimension expected by the model's
    # (IMAGE_HEIGHT, IMAGE_WIDTH, 1) input shape.
    return np.expand_dims(np.array(images), axis=-1), np.array(labels), unique_solutions

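# For illustration (hypothetical filenames): a directory containing
# "3fa9..._qwerty.jpeg" and "81bc..._asdfgh.jpeg" produces, assuming the images
# already match IMAGE_HEIGHT x IMAGE_WIDTH, an image array of shape
# (2, IMAGE_HEIGHT, IMAGE_WIDTH, 1), unique_solutions == ['asdfgh', 'qwerty'],
# and a labels array giving each file's index into that sorted list.

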
def load_training_dataset():
    """Load the training dataset."""
    return load_dataset(TRAINING_PATH)


def load_testing_dataset():
    """Load the testing dataset."""
    return load_dataset(TESTING_PATH)


def train_nn():
    """Train the neural network on the training dataset."""
    training_images, training_labels, unique_solutions = load_training_dataset()
    testing_images, testing_labels = (None, None)
    if PERCENT_OF_TESTING > 0:
        testing_images, testing_labels, _ = load_testing_dataset()
    model = keras.Sequential([
        keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 1)),
        keras.layers.MaxPooling2D((2, 2)),
        keras.layers.Conv2D(256, (3, 3), activation='relu'),
        keras.layers.MaxPooling2D((2, 2)),
        keras.layers.Conv2D(256, (3, 3), activation='relu'),
        keras.layers.Flatten(),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),  # Dropout for regularization
        keras.layers.Dense(len(unique_solutions), activation='softmax')  # One class per unique solution
    ])
    model.summary()
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    callbacks = [
        EarlyStopping(monitor='accuracy', patience=3),
        ModelCheckpoint('best_model.keras', save_best_only=True)
    ]
    EPOCHS = 100
    BATCH_SIZE = 8
    if PERCENT_OF_TESTING > 0:
        model.fit(training_images, training_labels,
                  epochs=EPOCHS, batch_size=BATCH_SIZE, callbacks=callbacks,
                  validation_data=(testing_images, testing_labels))
    else:
        model.fit(training_images, training_labels,
                  epochs=EPOCHS, batch_size=BATCH_SIZE, callbacks=callbacks)
    keras.saving.save_model(model, 'captcha_solver.keras')

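# A minimal inference sketch, not part of the original script: it assumes the
# model saved above and the label ordering returned by load_training_dataset().
# The function name solve_captcha and its arguments are illustrative only.
def solve_captcha(image_path, model, unique_solutions):
    """Predict the solution string for a single captcha image."""
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) / 255.0
    batch = np.expand_dims(img, axis=(0, -1))  # shape (1, IMAGE_HEIGHT, IMAGE_WIDTH, 1)
    prediction = model.predict(batch)
    return unique_solutions[int(np.argmax(prediction))]

# Example usage (hypothetical path):
#   model = keras.saving.load_model('captcha_solver.keras')
#   _, _, unique_solutions = load_training_dataset()
#   print(solve_captcha('example_captcha.jpeg', model, unique_solutions))

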
if __name__ == "__main__":
    download_dataset()
    train_nn()