""" Title: OCR model for reading Captchas Author: [A_K_Nain](https://twitter.com/A_K_Nain) Date created: 2020/06/14 Last modified: 2024/03/13 Description: How to implement an OCR model using CNNs, RNNs and CTC loss. Accelerator: GPU Converted to Keras 3 by: [Sitam Meur](https://github.com/sitamgithub-MSIT) """ """ ## Introduction This example demonstrates a simple OCR model built with the Functional API. Apart from combining CNN and RNN, it also illustrates how you can instantiate a new layer and use it as an "Endpoint layer" for implementing CTC loss. For a detailed guide to layer subclassing, please check out [this page](https://keras.io/guides/making_new_layers_and_models_via_subclassing/) in the developer guides. """ """ ## Setup """ import requests import re from os import makedirs, walk, environ, path, listdir from dotenv import load_dotenv load_dotenv() # Constants IMAGE_HEIGHT = 70 IMAGE_WIDTH = 200 DOWNLOAD_PATH = environ.get("DOWNLOAD_PATH") TESTING_PATH = environ.get("TESTING_PATH") TRAINING_PATH = environ.get("TRAINING_PATH") PERCENT_OF_TESTING = int(environ.get("PERCENT_OF_TESTING")) environ["KERAS_BACKEND"] = "tensorflow" import numpy as np import matplotlib.pyplot as plt from pathlib import Path import tensorflow as tf import keras from keras import ops from keras import layers def prepare_dirs(): """Create necessary directories for downloading and storing images.""" makedirs(DOWNLOAD_PATH, exist_ok=True) makedirs(TESTING_PATH, exist_ok=True) makedirs(TRAINING_PATH, exist_ok=True) def fetch_captcha(id): """Fetch a captcha image by its ID and save it to the download path.""" try: response = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/{id}") response.raise_for_status() captcha = response.json()["captcha"] captcha_file_path = path.join(DOWNLOAD_PATH, f"{captcha['hash']}_{captcha['solution']}.jpeg") with open(captcha_file_path, 'wb') as captcha_file: captcha_file.write(b64decode(captcha['image'])) except requests.RequestException as e: print(f"Error fetching captcha {id}: {e}") def search_saved_captcha(hash, path): """Check if a captcha with the given hash exists in the specified path.""" regex = re.compile(f"{hash}_\\w{{6}}\\.jpeg") for _, _, files in walk(path): for file in files: if regex.match(file): return True return False def search_and_download_new(captchas): """Search for new captchas and download them if they don't already exist.""" for captcha in captchas: id = captcha["id"] hash = captcha["hash"] if not (search_saved_captcha(hash, TRAINING_PATH) or search_saved_captcha(hash, TESTING_PATH) or search_saved_captcha(hash, DOWNLOAD_PATH)): fetch_captcha(id) def sort_datasets(): """Sort downloaded captchas into training and testing datasets.""" amount_of_new_data = len([file for file in listdir(DOWNLOAD_PATH) if path.isfile(path.join(DOWNLOAD_PATH, file))]) amount_to_send_to_test = round(amount_of_new_data * (PERCENT_OF_TESTING / 100)) files = listdir(DOWNLOAD_PATH) for index, file in enumerate(files): if index < amount_to_send_to_test: move(path.join(DOWNLOAD_PATH, file), TESTING_PATH) else: move(path.join(DOWNLOAD_PATH, file), TRAINING_PATH) def download_dataset(): """Download the dataset of captchas and sort them into training and testing sets.""" prepare_dirs() try: response = requests.get(f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/all") response.raise_for_status() captchas = response.json()["captchas"] search_and_download_new(captchas) sort_datasets() except requests.RequestException as e: print(f"Error downloading dataset: {e}") 
download_dataset()

# Path to the data directory (captchas sorted into the training set above)
data_dir = Path(TRAINING_PATH)

# Get list of all the images
images = sorted(list(map(str, list(data_dir.glob("*.jpeg")))))
labels = [
    img.split(path.sep)[-1].split(".jpeg")[0].split("_")[1].upper() for img in images
]
characters = set(char for label in labels for char in label)
characters = sorted(list(characters))

print("Number of images found: ", len(images))
print("Number of labels found: ", len(labels))
print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)

# Batch size for training and validation
batch_size = 16

# Desired image dimensions
img_width = IMAGE_WIDTH
img_height = IMAGE_HEIGHT

# Factor by which the image is going to be downsampled
# by the convolutional blocks. We will be using two
# convolution blocks and each block will have
# a pooling layer which downsamples the features by a factor of 2.
# Hence total downsampling factor would be 4.
downsample_factor = 4

# Maximum length of any captcha in the dataset
max_length = max([len(label) for label in labels])

"""
## Preprocessing
"""

# Mapping characters to integers
char_to_num = layers.StringLookup(vocabulary=list(characters), mask_token=None)

# Mapping integers back to original characters
num_to_char = layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True
)


def split_data(images, labels, train_size=0.9, shuffle=True):
    # 1. Get the total size of the dataset
    size = len(images)
    # 2. Make an indices array and shuffle it, if required
    indices = ops.arange(size)
    if shuffle:
        indices = keras.random.shuffle(indices)
    # 3. Get the size of training samples
    train_samples = int(size * train_size)
    # 4. Split data into training and validation sets
    x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]]
    x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]]
    return x_train, x_valid, y_train, y_valid


# Splitting data into training and validation sets
x_train, x_valid, y_train, y_valid = split_data(np.array(images), np.array(labels))


def encode_single_sample(img_path, label):
    # 1. Read image
    img = tf.io.read_file(img_path)
    # 2. Decode and convert to grayscale
    img = tf.io.decode_jpeg(img, channels=1)
    # 3. Convert to float32 in [0, 1] range
    img = tf.image.convert_image_dtype(img, tf.float32)
    # 4. Resize to the desired size
    img = ops.image.resize(img, [img_height, img_width])
    # 5. Transpose the image because we want the time
    # dimension to correspond to the width of the image.
    img = ops.transpose(img, axes=[1, 0, 2])
    # 6. Map the characters in label to numbers
    label = char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
    # 7. Return a dict as our model is expecting two inputs
    return {"image": img, "label": label}
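# A quick, illustrative sanity check (not part of the original example): run the
# preprocessing function on the first sample and inspect the result. After the
# transpose, the image width is the leading "time" dimension.
sample = encode_single_sample(images[0], labels[0])
print("Image shape: ", sample["image"].shape)  # (img_width, img_height, 1)
print("Encoded label: ", sample["label"].numpy())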
"""
## Create `Dataset` objects
"""

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = (
    train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid, y_valid))
validation_dataset = (
    validation_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

"""
## Visualize the data
"""

_, ax = plt.subplots(4, 4, figsize=(10, 5))
for batch in train_dataset.take(1):
    images = batch["image"]
    labels = batch["label"]
    for i in range(16):
        img = (images[i] * 255).numpy().astype("uint8")
        label = tf.strings.reduce_join(num_to_char(labels[i])).numpy().decode("utf-8")
        ax[i // 4, i % 4].imshow(img[:, :, 0].T, cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        ax[i // 4, i % 4].axis("off")
plt.show()

"""
## Model
"""


def ctc_batch_cost(y_true, y_pred, input_length, label_length):
    label_length = ops.cast(ops.squeeze(label_length, axis=-1), dtype="int32")
    input_length = ops.cast(ops.squeeze(input_length, axis=-1), dtype="int32")
    sparse_labels = ops.cast(
        ctc_label_dense_to_sparse(y_true, label_length), dtype="int32"
    )

    y_pred = ops.log(ops.transpose(y_pred, axes=[1, 0, 2]) + keras.backend.epsilon())

    return ops.expand_dims(
        tf.compat.v1.nn.ctc_loss(
            inputs=y_pred, labels=sparse_labels, sequence_length=input_length
        ),
        1,
    )


def ctc_label_dense_to_sparse(labels, label_lengths):
    label_shape = ops.shape(labels)
    num_batches_tns = ops.stack([label_shape[0]])
    max_num_labels_tns = ops.stack([label_shape[1]])

    def range_less_than(old_input, current_input):
        return ops.expand_dims(ops.arange(ops.shape(old_input)[1]), 0) < tf.fill(
            max_num_labels_tns, current_input
        )

    init = ops.cast(tf.fill([1, label_shape[1]], 0), dtype="bool")
    dense_mask = tf.compat.v1.scan(
        range_less_than, label_lengths, initializer=init, parallel_iterations=1
    )
    dense_mask = dense_mask[:, 0, :]

    label_array = ops.reshape(
        ops.tile(ops.arange(0, label_shape[1]), num_batches_tns), label_shape
    )
    label_ind = tf.compat.v1.boolean_mask(label_array, dense_mask)

    batch_array = ops.transpose(
        ops.reshape(
            ops.tile(ops.arange(0, label_shape[0]), max_num_labels_tns),
            tf.reverse(label_shape, [0]),
        )
    )
    batch_ind = tf.compat.v1.boolean_mask(batch_array, dense_mask)
    indices = ops.transpose(
        ops.reshape(ops.concatenate([batch_ind, label_ind], axis=0), [2, -1])
    )

    vals_sparse = tf.compat.v1.gather_nd(labels, indices)

    return tf.SparseTensor(
        ops.cast(indices, dtype="int64"),
        vals_sparse,
        ops.cast(label_shape, dtype="int64"),
    )
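# An illustrative shape check (not part of the original example): `ctc_batch_cost`
# expects per-sample input and label lengths of shape (batch, 1) and returns one
# loss value per sample. The random tensors below stand in for real model output;
# the class count mirrors the output layer defined further down.
num_classes = len(char_to_num.get_vocabulary()) + 1  # vocabulary + CTC blank
dummy_pred = tf.nn.softmax(keras.random.normal((2, 50, num_classes)))
dummy_labels = ops.ones((2, max_length), dtype="int32")
dummy_loss = ctc_batch_cost(
    dummy_labels,
    dummy_pred,
    input_length=50 * ops.ones((2, 1), dtype="int32"),
    label_length=max_length * ops.ones((2, 1), dtype="int32"),
)
print("CTC loss shape: ", dummy_loss.shape)  # (2, 1)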
class CTCLayer(layers.Layer):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = ctc_batch_cost

    def call(self, y_true, y_pred):
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = ops.cast(ops.shape(y_true)[0], dtype="int64")
        input_length = ops.cast(ops.shape(y_pred)[1], dtype="int64")
        label_length = ops.cast(ops.shape(y_true)[1], dtype="int64")

        input_length = input_length * ops.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * ops.ones(shape=(batch_len, 1), dtype="int64")
        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred


def build_model():
    # Inputs to the model
    input_img = layers.Input(
        shape=(img_width, img_height, 1), name="image", dtype="float32"
    )
    labels = layers.Input(name="label", shape=(None,), dtype="float32")

    # First conv block
    x = layers.Conv2D(
        32,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv1",
    )(input_img)
    x = layers.MaxPooling2D((2, 2), name="pool1")(x)

    # Second conv block
    x = layers.Conv2D(
        64,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv2",
    )(x)
    x = layers.MaxPooling2D((2, 2), name="pool2")(x)

    # We have used two max pooling layers with pool size and strides 2.
    # Hence, downsampled feature maps are 4x smaller. The number of
    # filters in the last layer is 64. Reshape accordingly before
    # passing the output to the RNN part of the model.
    new_shape = ((img_width // 4), (img_height // 4) * 64)
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = layers.Dense(64, activation="relu", name="dense1")(x)
    x = layers.Dropout(0.2)(x)

    # RNNs
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=0.25))(x)
    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.25))(x)

    # Output layer
    x = layers.Dense(
        len(char_to_num.get_vocabulary()) + 1, activation="softmax", name="dense2"
    )(x)

    # Add CTC layer for calculating CTC loss at each step
    output = CTCLayer(name="ctc_loss")(labels, x)

    # Define the model
    model = keras.models.Model(
        inputs=[input_img, labels], outputs=output, name="captcha_solver"
    )
    # Optimizer
    opt = keras.optimizers.Adam()
    # Compile the model and return
    model.compile(optimizer=opt)
    return model


# Get the model
model = build_model()
model.summary()

"""
## Training
"""

epochs = 100
early_stopping_patience = 10
# Add early stopping
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=early_stopping_patience, restore_best_weights=True
)

# Train the model
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[early_stopping],
)
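# Illustrative (not in the original example): plot the training and validation
# loss curves recorded by `model.fit` to see when early stopping kicked in.
plt.figure()
plt.plot(history.history["loss"], label="training loss")
plt.plot(history.history["val_loss"], label="validation loss")
plt.xlabel("Epoch")
plt.ylabel("CTC loss")
plt.legend()
plt.show()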
""" def ctc_decode(y_pred, input_length, greedy=True, beam_width=100, top_paths=1): input_shape = ops.shape(y_pred) num_samples, num_steps = input_shape[0], input_shape[1] y_pred = ops.log(ops.transpose(y_pred, axes=[1, 0, 2]) + keras.backend.epsilon()) input_length = ops.cast(input_length, dtype="int32") if greedy: (decoded, log_prob) = tf.nn.ctc_greedy_decoder( inputs=y_pred, sequence_length=input_length ) else: (decoded, log_prob) = tf.compat.v1.nn.ctc_beam_search_decoder( inputs=y_pred, sequence_length=input_length, beam_width=beam_width, top_paths=top_paths, ) decoded_dense = [] for st in decoded: st = tf.SparseTensor(st.indices, st.values, (num_samples, num_steps)) decoded_dense.append(tf.sparse.to_dense(sp_input=st, default_value=-1)) return (decoded_dense, log_prob) # Get the prediction model by extracting layers till the output layer prediction_model = keras.models.Model( model.input[0], model.get_layer(name="dense2").output ) prediction_model.summary() # A utility function to decode the output of the network def decode_batch_predictions(pred): input_len = np.ones(pred.shape[0]) * pred.shape[1] # Use greedy search. For complex tasks, you can use beam search results = ctc_decode(pred, input_length=input_len, greedy=True)[0][0][ :, :max_length ] # Iterate over the results and get back the text output_text = [] for res in results: res = tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8") output_text.append(res) return output_text # Let's check results on some validation samples for batch in validation_dataset.take(1): batch_images = batch["image"] batch_labels = batch["label"] preds = prediction_model.predict(batch_images) pred_texts = decode_batch_predictions(preds) orig_texts = [] for label in batch_labels: label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8") orig_texts.append(label) _, ax = plt.subplots(4, 4, figsize=(15, 5)) for i in range(len(pred_texts)): img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8) img = img.T title = f"Prediction: {pred_texts[i]}" ax[i // 4, i % 4].imshow(img, cmap="gray") ax[i // 4, i % 4].set_title(title) ax[i // 4, i % 4].axis("off") plt.show()