Skip to content

Index

omero_annotate_ai.processing #

Image and file processing functionality.

generate_patch_coordinates(image_shape: Tuple[int, int], patch_size: List[int], n_patches: int, random_patch: bool = True) -> Tuple[List[Tuple[int, int]], Tuple[int, int]] #

Generate non-overlapping patch coordinates for an image.

CRUCIAL: Ensures patches do not overlap when generating multiple patches.

Parameters:

Name Type Description Default
image_shape Tuple[int, int]

(height, width) of the image

required
patch_size List[int]

(height, width) of patches

required
n_patches int

Number of patches to generate

required
random_patch bool

Whether to generate random patches or grid-based patches

True

Returns:

Type Description
Tuple[List[Tuple[int, int]], Tuple[int, int]]

Tuple containing:

List[Tuple[int, int]]
  • List of (x, y) coordinates for patch top-left corners (non-overlapping)
Tuple[int, int]
  • Actual patch size (height, width) to use (adjusted if image smaller than patch)
Source code in src/omero_annotate_ai/processing/image_functions.py
def generate_patch_coordinates(
    image_shape: Tuple[int, int],
    patch_size: List[int],
    n_patches: int,
    random_patch: bool = True,
) -> Tuple[List[Tuple[int, int]], Tuple[int, int]]:
    """Generate non-overlapping patch coordinates for an image.

    CRUCIAL: Ensures patches do not overlap when generating multiple patches.

    Args:
        image_shape: (height, width) of the image
        patch_size: (height, width) of patches
        n_patches: Number of patches to generate
        random_patch: Whether to generate random patches or grid-based patches

    Returns:
        Tuple containing:
        - List of (x, y) coordinates for patch top-left corners (non-overlapping).
          May hold fewer than ``n_patches`` entries: random placement gives up
          after ``n_patches * 20`` attempts, and grid placement is capped at the
          number of whole patches that fit in the image.
        - Actual patch size (height, width) to use. If the image is smaller
          than the patch in either dimension, this is the full image shape and
          the coordinate list is ``[(0, 0)]``.
    """
    height, width = image_shape
    patch_h, patch_w = patch_size

    # Image smaller than the patch in at least one dimension: fall back to a
    # single patch covering the whole image.
    if width < patch_w or height < patch_h:
        print("⚠️ Image smaller than patch size, using full image")
        return [(0, 0)], (height, width)

    actual_patch_size = (patch_h, patch_w)

    # Largest top-left corner that still keeps the patch inside the image
    # (both are >= 0 because of the guard above).
    max_x = width - patch_w
    max_y = height - patch_h

    coordinates: List[Tuple[int, int]] = []

    if random_patch:
        # Rejection sampling: draw a corner, keep it only if it does not
        # overlap any patch accepted so far.
        used_areas = []  # accepted patches as (x0, y0, x1, y1)
        max_attempts = n_patches * 20  # bound the loop when the image is crowded
        attempts = 0

        while len(coordinates) < n_patches and attempts < max_attempts:
            attempts += 1
            x = rnd.randint(0, max_x)
            y = rnd.randint(0, max_y)
            new_rect = (x, y, x + patch_w, y + patch_h)

            if not any(_rectangles_overlap(new_rect, used) for used in used_areas):
                coordinates.append((x, y))
                used_areas.append(new_rect)

        if len(coordinates) < n_patches:
            print(
                f"Could only place {len(coordinates)} non-overlapping patches out of {n_patches} requested"
            )

    else:
        # Grid placement. Only WHOLE patches count: floor division guarantees
        # the step computed below is at least one patch wide/tall, so grid
        # cells can never overlap. (Ceiling division would squeeze an extra,
        # overlapping patch into any image that is not an exact multiple of
        # the patch size.)
        patches_x = max(1, width // patch_w)
        patches_y = max(1, height // patch_h)
        max_grid_patches = patches_x * patches_y

        if n_patches > max_grid_patches:
            print(
                f"Requested {n_patches} patches, but only {max_grid_patches} non-overlapping patches fit"
            )
            n_patches = max_grid_patches

        # Spread the grid over the full image extent; with a single column/row
        # the step is irrelevant and the patch sits at the origin.
        step_x = (width - patch_w) // (patches_x - 1) if patches_x > 1 else 0
        step_y = (height - patch_h) // (patches_y - 1) if patches_y > 1 else 0

        # Row-major order, truncated to the requested number of patches.
        grid = [
            (min(col * step_x, max_x), min(row * step_y, max_y))
            for row in range(patches_y)
            for col in range(patches_x)
        ]
        coordinates = grid[: max(0, n_patches)]

    return coordinates, actual_patch_size

label_to_rois(label_img, z_slice, channel, timepoint, model_type, is_volumetric=False, patch_offset=None) #

Convert a 2D or 3D label image to OMERO ROI shapes

Parameters:

Name Type Description Default
label_img ndarray

2D labeled image or 3D labeled stack

required
z_slice int or list

Z-slice index or list/range of Z indices

required
channel int

Channel index

required
timepoint int

Time point index

required
model_type str

SAM model type used

required
is_volumetric bool

Whether the label image is 3D volumetric data

False
patch_offset

Optional (x,y) offset for placing ROIs in a larger image

None

Returns:

Name Type Description
list

List of OMERO shape objects

Source code in src/omero_annotate_ai/processing/image_functions.py
def label_to_rois(
    label_img,
    z_slice,
    channel,
    timepoint,
    model_type,
    is_volumetric=False,
    patch_offset=None,
):
    """
    Turn a labeled image (single plane or z-stack) into OMERO ROI shapes.

    Args:
        label_img (np.ndarray): 2D labeled image or 3D labeled stack
        z_slice (int or list): Z index of the plane, or for stacks either the
            starting Z index or a list/range giving the Z index of each plane
        channel (int): Channel index
        timepoint (int): Time point index
        model_type (str): SAM model type used
        is_volumetric (bool): Treat ``label_img`` as a stack and iterate over
            its first axis (only takes effect when ``label_img.ndim > 2``)
        patch_offset: Optional (x, y) offset for placing ROIs in a larger image

    Returns:
        list: Shapes produced by ``process_label_plane`` for every plane
    """
    # No offset supplied means the ROIs stay in the label image's own frame.
    if patch_offset is None:
        x_offset, y_offset = 0, 0
    else:
        x_offset, y_offset = patch_offset

    # Single-plane case: everything that is not an explicitly volumetric,
    # more-than-2D array is handed over in one go.
    if not (is_volumetric and label_img.ndim > 2):
        return list(
            process_label_plane(
                label_img, z_slice, channel, timepoint, model_type, x_offset, y_offset
            )
        )

    z_is_sequence = isinstance(z_slice, (range, list))
    collected = []

    for plane_index, plane in enumerate(label_img):
        if not z_is_sequence:
            # Scalar z_slice is taken as the index of the first plane.
            plane_z = z_slice + plane_index
        elif plane_index < len(z_slice):
            plane_z = z_slice[plane_index]
        else:
            # More planes than listed indices: continue counting from the
            # first listed index.
            plane_z = z_slice[0] + plane_index

        print(f"Processing volumetric ROIs for z-slice {plane_z}")
        collected += process_label_plane(
            plane,
            plane_z,
            channel,
            timepoint,
            model_type,
            x_offset,
            y_offset,
        )

    return collected

mask_to_contour(mask) #

Converts a binary mask to a list of ROI coordinates.

Parameters:

Name Type Description Default
mask ndarray

binary mask

required

Returns:

Name Type Description
list

list of ROI coordinates

Source code in src/omero_annotate_ai/processing/image_functions.py
def mask_to_contour(mask):
    """Extract the contours of a binary mask with OpenCV.

    Args:
        mask (np.ndarray): binary mask

    Returns:
        The contour collection returned by ``cv2.findContours`` (one point
        array per contour); the hierarchy information is discarded.
    """
    retrieval_mode = cv2.RETR_TREE
    approximation = cv2.CHAIN_APPROX_SIMPLE

    found_contours, _hierarchy = cv2.findContours(
        mask, retrieval_mode, approximation
    )
    return found_contours

prepare_training_data_from_table(conn: Any, table_id: int, output_dir: Union[str, Path], training_name: str = 'micro_sam_training', validation_split: float = 0.2, clean_existing: bool = True, tmp_dir: Optional[Union[str, Path]] = None, verbose: bool = False) -> Dict[str, Any] #

Prepare training data from OMERO annotation table.

Downloads images and labels from OMERO based on annotation table data, splits into training/validation sets, and organizes into directory structure suitable for micro-SAM training.

Parameters:

Name Type Description Default
conn Any

OMERO connection object

required
table_id int

ID of the annotation table in OMERO

required
output_dir Union[str, Path]

Directory to store training data

required
training_name str

Name for the training session (used in directory naming)

'micro_sam_training'
validation_split float

Fraction of data to use for validation (0.0-1.0) if not already defined in the table

0.2
clean_existing bool

Whether to clean existing output directories

True
tmp_dir Optional[Union[str, Path]]

Temporary directory for downloads (optional)

None
verbose bool

If True, show detailed debug information in console output

False

Returns:

Type Description
Dict[str, Any]

Dictionary with paths to created directories:

{ 'base_dir': Path to base output directory, 'training_input': Path to training images, 'training_label': Path to training labels, 'val_input': Path to validation images, 'val_label': Path to validation labels, 'stats': Statistics about the prepared data }

Raises:

Type Description
ValueError

If table not found or invalid parameters

ImportError

If required dependencies missing

Source code in src/omero_annotate_ai/processing/training_functions.py
def prepare_training_data_from_table(
    conn: Any,
    table_id: int,
    output_dir: Union[str, Path],
    training_name: str = "micro_sam_training",
    validation_split: float = 0.2,
    clean_existing: bool = True,
    tmp_dir: Optional[Union[str, Path]] = None,
    verbose: bool = False
) -> Dict[str, Any]:
    """
    Prepare training data from OMERO annotation table.

    Downloads images and labels from OMERO based on annotation table data,
    splits into training/validation sets, and organizes into directory structure
    suitable for micro-SAM training.

    Args:
        conn: OMERO connection object
        table_id: ID of the annotation table in OMERO
        output_dir: Directory to store training data (created if missing)
        training_name: Name for the training session. Accepted for API
            compatibility; not referenced inside this function.
        validation_split: Fraction of data to use for validation (0.0-1.0) if not already defined in the table
        clean_existing: Whether to clean existing output directories
        tmp_dir: Temporary directory for downloads (defaults to
            ``output_dir / "tmp"``). The directory is removed when dataset
            preparation finishes, whether it succeeded or raised.
        verbose: If True, show detailed debug information in console output

    Returns:
        Dictionary with paths to created directories:
        {
            'base_dir': Path to base output directory,
            'training_input': Path to training images,
            'training_label': Path to training labels,
            'val_input': Path to validation images,
            'val_label': Path to validation labels,
            'stats': Statistics about the prepared data (counts of ``*.tif``
                     files per directory and number of table rows used)
        }

    Raises:
        ValueError: If validation_split is out of range, the table cannot be
            loaded, is empty, has no processed rows, fails schema validation,
            or no images were written
        OSError: If the output or temporary directory cannot be created
    """
    # Validate parameters before touching the filesystem or the server
    if not 0.0 <= validation_split <= 1.0:
        raise ValueError("validation_split must be between 0.0 and 1.0")

    # Convert paths
    output_dir = Path(output_dir)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        # Chain the cause so the original traceback is not lost
        raise OSError(f"Failed to create output directory {output_dir}: {e}") from e

    # Set up logger for this training session
    logger = create_training_logger(output_dir, verbose=verbose)
    logger.info(f"Starting training data preparation from table {table_id}")
    logger.debug(f"Parameters: output_dir={output_dir}, validation_split={validation_split}, clean_existing={clean_existing}")

    if tmp_dir is None:
        tmp_dir = output_dir / "tmp"
    tmp_dir = Path(tmp_dir)
    try:
        tmp_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        raise OSError(f"Failed to create temporary directory {tmp_dir}: {e}") from e

    try:
        table = ezomero.get_table(conn, table_id)
    except Exception as e:
        raise ValueError(f"Failed to load table {table_id}: {e}") from e

    if table is None or len(table) == 0:
        raise ValueError(f"Table {table_id} is empty or not found")

    logger.info(f"Loaded table with {len(table)} rows")

    # Save the table locally for inspection; failure here is not fatal
    table_path = output_dir / f"table_{table_id}.csv"
    try:
        table.to_csv(table_path, index=True)
        logger.info(f"Table saved to: {table_path}")
    except Exception as e:
        logger.warning(f"Failed to save table: {e}")

    # Check if 'processed' column exists and filter to only processed rows
    if 'processed' in table.columns:
        initial_count = len(table)
        unprocessed_count = len(table[~table['processed']])

        if unprocessed_count > 0:
            logger.warning(f"Found {unprocessed_count} unprocessed rows out of {initial_count} total rows")
            logger.info(f"Proceeding with {initial_count - unprocessed_count} processed rows for training")

        # Filter to only processed rows
        table = table[table['processed']].copy()

        if len(table) == 0:
            raise ValueError("No processed rows found in the table. Cannot proceed with training.")

        logger.info(f"Using {len(table)} processed rows for training")

    else:
        logger.warning("No 'processed' column found - assuming all rows are ready for training")

    # Validate table schema and data integrity
    validate_table_schema(table, logger)
    logger.info("Table schema validated for processing")

    # Clean existing directories if requested
    if clean_existing:
        folders = ["training_input", "training_label", "val_input", "val_label"]
        for folder in folders:
            folder_path = output_dir / folder
            if folder_path.exists():
                shutil.rmtree(folder_path)

    # Split data based on existing 'train'/'validate' columns or automatic split
    if 'train' in table.columns and 'validate' in table.columns:
        # Use existing split from table
        train_images = table[table['train']]
        val_images = table[table['validate']]
        logger.info("Using existing train/validate split from table")
    else:
        # Automatic split.
        # NOTE(review): validate_table_schema() above rejects tables lacking
        # 'train'/'validate', so this branch looks unreachable - confirm intent.
        n_val = int(len(table) * validation_split)
        shuffled_indices = np.random.permutation(len(table))
        val_indices = shuffled_indices[:n_val]
        train_indices = shuffled_indices[n_val:]

        train_images = table.iloc[train_indices]
        val_images = table.iloc[val_indices]
        logger.info(f"Applied automatic split with validation_split={validation_split}")

    logger.info(f"Using {len(train_images)} training images and {len(val_images)} validation images")

    try:
        # Prepare training data
        training_input_dir, training_label_dir = _prepare_dataset_from_table(
            conn, train_images, output_dir, subset_type="training", tmp_dir=tmp_dir, logger=logger, verbose=verbose
        )

        # Prepare validation data
        val_input_dir, val_label_dir = _prepare_dataset_from_table(
            conn, val_images, output_dir, subset_type="val", tmp_dir=tmp_dir, logger=logger, verbose=verbose
        )
    finally:
        # Clean up the temporary directory even when preparation fails, so
        # partial downloads are not left behind.
        if tmp_dir.exists():
            shutil.rmtree(tmp_dir)
            logger.debug(f"Cleaned up temporary directory: {tmp_dir}")

    # Collect statistics
    stats = {
        'n_training_images': len(list(training_input_dir.glob('*.tif'))),
        'n_training_labels': len(list(training_label_dir.glob('*.tif'))),
        'n_val_images': len(list(val_input_dir.glob('*.tif'))),
        'n_val_labels': len(list(val_label_dir.glob('*.tif'))),
        'total_rows_processed': len(table)
    }

    result = {
        'base_dir': output_dir,
        'training_input': training_input_dir,
        'training_label': training_label_dir,
        'val_input': val_input_dir,
        'val_label': val_label_dir,
        'stats': stats
    }

    # Check if preparation actually succeeded
    if stats['n_training_images'] == 0 and stats['n_val_images'] == 0:
        logger.error(f"Training data preparation FAILED in: {output_dir}")
        logger.error(f"Statistics: {stats}")
        raise ValueError("Training data preparation failed - no images were processed successfully. Check the error messages above.")
    else:
        logger.info(f"Training data prepared successfully in: {output_dir}")
        logger.info(f"Statistics: {stats}")

    return result

process_label_plane(label_plane, z_slice, channel, timepoint, model_type, x_offset=0, y_offset=0) #

Process a single 2D label plane to generate OMERO shapes with optional offset

Parameters:

Name Type Description Default
label_plane

2D label plane (numpy array)

required
z_slice

Z-slice index

required
channel

Channel index

required
timepoint

Time point index

required
model_type

SAM model type identifier

required
x_offset

X offset for contour coordinates (default: 0)

0
y_offset

Y offset for contour coordinates (default: 0)

0

Returns:

Name Type Description
list

List of OMERO shapes

Source code in src/omero_annotate_ai/processing/image_functions.py
def process_label_plane(
    label_plane, z_slice, channel, timepoint, model_type, x_offset=0, y_offset=0
):
    """
    Process a single 2D label plane to generate OMERO shapes with optional offset

    Every non-zero label value becomes a binary mask; each contour found in
    that mask becomes one ``ezomero.rois.Polygon``.

    Args:
        label_plane: 2D label plane (numpy array); 0 is treated as background
        z_slice: Z-slice index
        channel: Channel index
        timepoint: Time point index
        model_type: SAM model type identifier
        x_offset: X offset for contour coordinates (default: 0)
        y_offset: Y offset for contour coordinates (default: 0)

    Returns:
        list: List of OMERO shapes (empty if the plane holds only background)
    """
    shapes = []

    # Drop background explicitly. Slicing off the first unique value is only
    # correct when 0 is present; on a plane with no background pixels it would
    # silently discard the smallest real label.
    unique_labels = np.unique(label_plane)
    object_labels = unique_labels[unique_labels != 0]
    if object_labels.size == 0:
        return shapes

    # The ROI text label is identical for every polygon of this plane.
    # NOTE(review): any z_slice > 0 is tagged "volumetric" here - confirm
    # that this is intended for single planes taken from deeper in a stack.
    mode = (
        "volumetric"
        if isinstance(z_slice, (list, range)) or z_slice > 0
        else "manual"
    )
    roi_label = f"micro_sam.{mode}_instance_segmentation.{model_type}"

    apply_offset = x_offset != 0 or y_offset != 0
    offset = np.array([x_offset, y_offset])

    for label in object_labels:
        # Binary mask for this label, in the uint8 form findContours expects
        mask = (label_plane == label).astype(np.uint8)

        for contour in mask_to_contour(mask):
            contour = contour[:, 0, :]  # (N, 1, 2) -> (N, 2)

            # Shift into the coordinate frame of the full image if needed
            if apply_offset:
                contour = contour + offset

            shapes.append(
                ezomero.rois.Polygon(
                    points=contour,
                    z=z_slice,
                    c=channel,
                    t=timepoint,
                    label=roi_label,
                )
            )

    return shapes

run_training(training_config: Dict[str, Any], framework: str = 'microsam') -> Dict[str, Any] #

Execute training with framework-specific implementation.

Parameters:

Name Type Description Default
training_config Dict[str, Any]

Configuration dictionary from setup_training()

required
framework str

Training framework to use ("microsam", future: "cellpose", etc.)

'microsam'

Returns:

Type Description
Dict[str, Any]

Dictionary containing training results and model paths

Raises:

Type Description
ValueError

If framework is not supported

ImportError

If required framework packages are not available

Source code in src/omero_annotate_ai/processing/training_utils.py
def run_training(
    training_config: Dict[str, Any],
    framework: str = "microsam"
) -> Dict[str, Any]:
    """
    Dispatch a training run to the backend selected by ``framework``.

    Args:
        training_config: Configuration dictionary from setup_training()
        framework: Training framework to use, matched case-insensitively
            (currently only "microsam")

    Returns:
        Whatever the selected backend returns (training results and model paths)

    Raises:
        ValueError: If framework is not supported
        ImportError: If required framework packages are not available
    """
    # Reject unknown frameworks first so the happy path reads straight through.
    if framework.lower() != "microsam":
        supported_frameworks = ["microsam"]
        problem = (
            f"Unsupported framework: {framework}. "
            f"Supported frameworks: {supported_frameworks}"
        )
        raise ValueError(problem)

    return _run_microsam_training(training_config)

setup_training(training_result: Dict[str, Any], model_name: str = '', model_type: str = 'vit_b_lm', epochs: int = 50, n_iterations: Optional[int] = None, batch_size: int = 2, learning_rate: float = 1e-05, patch_shape: Union[Tuple[int, int], Tuple[int, int, int]] = (512, 512), n_objects_per_batch: int = 25, save_every: int = 1000, validate_every: int = 500, **kwargs) -> Dict[str, Any] #

Setup training configuration from training_result dict.

Parameters:

Name Type Description Default
training_result Dict[str, Any]

Dictionary from prepare_training_data_from_table()

required
model_name str

Name for the training session/model

''
model_type str

SAM model variant (e.g. "vit_b", "vit_l", "vit_h", or the default "vit_b_lm")

'vit_b_lm'
epochs int

Number of training epochs (primary training parameter)

50
n_iterations Optional[int]

Number of training iterations (calculated from epochs if None)

None
batch_size int

Training batch size

2
learning_rate float

Learning rate for training

1e-05
patch_shape Union[Tuple[int, int], Tuple[int, int, int]]

Input patch dimensions (height, width) or (slices, height, width)

(512, 512)
n_objects_per_batch int

Number of objects per batch for sampling

25
save_every int

Save checkpoint every N iterations

1000
validate_every int

Run validation every N iterations

500
**kwargs

Framework-specific parameters

{}

Returns:

Type Description
Dict[str, Any]

Dictionary containing all training configuration and paths

Raises:

Type Description
ValueError

If training_result is missing required keys

FileNotFoundError

If training directories don't exist

Source code in src/omero_annotate_ai/processing/training_utils.py
def setup_training(
    training_result: Dict[str, Any],
    model_name: str = "",
    # Model parameters
    model_type: str = "vit_b_lm",
    # Training parameters
    epochs: int = 50,
    n_iterations: Optional[int] = None,
    batch_size: int = 2,
    learning_rate: float = 1e-5,

    # Data parameters
    patch_shape: Union[Tuple[int, int], Tuple[int, int, int]] = (512, 512),
    n_objects_per_batch: int = 25,

    # Checkpointing
    save_every: int = 1000,
    validate_every: int = 500,

    **kwargs
) -> Dict[str, Any]:
    """
    Setup training configuration from training_result dict.

    Args:
        training_result: Dictionary from prepare_training_data_from_table()
        model_name: Name for the training session/model; when empty or blank,
            "micro_sam_training_<timestamp>" is generated
        model_type: SAM model variant (e.g. "vit_b", "vit_l", "vit_h", or the
            default "vit_b_lm")
        epochs: Number of training epochs (primary training parameter)
        n_iterations: Number of training iterations. When None and epochs > 0
            it is estimated as ``epochs * max(1, n_training_images // batch_size)``,
            with n_training_images read from ``training_result['stats']``
            (100 if unavailable)
        batch_size: Training batch size
        learning_rate: Learning rate for training
        patch_shape: Input patch dimensions (height, width) or (slices, height, width)
        n_objects_per_batch: Number of objects per batch for sampling
        save_every: Save checkpoint every N iterations
        validate_every: Run validation every N iterations
        **kwargs: Framework-specific parameters, merged into the returned
            configuration last (so they can override the keys set here)

    Returns:
        Dictionary containing all training configuration and paths

    Raises:
        ValueError: If training_result is missing required keys, or if
            n_iterations has to be derived and batch_size is not positive
        FileNotFoundError: If training directories don't exist
    """
    # Validate training_result dict
    required_keys = ['training_input', 'training_label', 'val_input', 'val_label']
    missing_keys = [key for key in required_keys if key not in training_result]
    if missing_keys:
        raise ValueError(f"training_result missing required keys: {missing_keys}")

    # Validate directories exist
    for key in required_keys:
        path = Path(training_result[key])
        if not path.exists():
            raise FileNotFoundError(f"Training directory does not exist: {path}")

    # Generate a timestamped model name when none was supplied
    if not model_name or not model_name.strip():
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        model_name = f"micro_sam_training_{timestamp}"

    # Determine output directory - use from training_result or infer it as the
    # parent of the training image directory
    if 'output_dir' in training_result:
        output_dir = Path(training_result['output_dir'])
    else:
        output_dir = Path(training_result['training_input']).parent

    checkpoint_folder = output_dir / "checkpoints"

    # Calculate n_iterations if not provided
    if n_iterations is None and epochs > 0:
        # Fail with a clear message instead of a bare ZeroDivisionError (or a
        # silently meaningless estimate for negative values).
        if batch_size <= 0:
            raise ValueError(
                f"batch_size must be a positive integer to derive n_iterations, got {batch_size}"
            )
        # 'stats' may be absent or None; fall back to a nominal dataset size
        training_stats = training_result.get('stats') or {}
        n_training_images = training_stats.get('n_training_images', 100)
        # Rough estimate: iterations per epoch = dataset_size / batch_size
        iterations_per_epoch = max(1, n_training_images // batch_size)
        n_iterations = epochs * iterations_per_epoch

    # Build training configuration
    training_config = {
        # Paths
        'training_input': Path(training_result['training_input']),
        'training_label': Path(training_result['training_label']),
        'val_input': Path(training_result['val_input']),
        'val_label': Path(training_result['val_label']),
        'output_dir': output_dir,
        'checkpoint_folder': checkpoint_folder,
        'model_name': model_name,

        # Model parameters
        'model_type': model_type,

        # Training parameters
        'epochs': epochs,
        'n_iterations': n_iterations,
        'batch_size': batch_size,
        'learning_rate': learning_rate,

        # Data parameters
        'patch_shape': patch_shape,
        'n_objects_per_batch': n_objects_per_batch,

        # Checkpointing
        'save_every': save_every,
        'validate_every': validate_every,

        # Original training result for reference
        'training_result': training_result,
    }

    # Add any additional framework-specific parameters
    training_config.update(kwargs)

    return training_config

validate_table_schema(df: pd.DataFrame, logger=None) -> None #

Validate that the table has the required columns and basic data integrity.

Parameters:

Name Type Description Default
df DataFrame

DataFrame from OMERO table

required
logger

Optional logger instance for logging messages

None

Raises:

Type Description
ValueError

If required columns are missing or data integrity issues found

Source code in src/omero_annotate_ai/processing/training_functions.py
def validate_table_schema(df: pd.DataFrame, logger=None) -> None:
    """
    Check that an annotation table is usable for training data preparation.

    The checks run in this order: all required columns exist, neither
    'image_id' nor 'label_id' is entirely null, and 'image_id' is numeric (or
    convertible to numeric). Finally, any recognised optional columns are
    reported via ``logger.debug`` or, without a logger, ``print``.

    Args:
        df: DataFrame from OMERO table
        logger: Optional logger instance for logging messages

    Raises:
        ValueError: If required columns are missing or data integrity issues found
    """
    present = set(df.columns)

    # Columns the training data preparation cannot work without
    mandatory = {
        'image_id', 'channel', 'z_slice', 'timepoint', 'label_id',
        'train', 'validate', 'is_patch', 'patch_x', 'patch_y',
        'patch_width', 'patch_height',
    }
    absent = mandatory - present
    if absent:
        raise ValueError(f"Missing required columns: {sorted(absent)}")

    # Identifier columns must carry at least one non-null value
    for name in ('image_id', 'label_id'):
        if df[name].isna().all():
            raise ValueError(f"Column '{name}' contains no valid data")

    # image_id may be stored as text, but must then be parseable as numbers
    image_ids = df['image_id']
    if not pd.api.types.is_numeric_dtype(image_ids):
        try:
            pd.to_numeric(image_ids, errors='raise')
        except (ValueError, TypeError):
            raise ValueError("Column 'image_id' contains non-numeric data")

    # Report optional columns that enhance functionality, if any are present
    extras = sorted({'is_volumetric'} & present)
    if not extras:
        return

    message = f"Optional columns found: {extras}"
    if logger:
        logger.debug(message)
    else:
        print(message)