Skip to content

Index

omero_annotate_ai.processing #

Image and file processing functionality.

generate_patch_coordinates(image_shape: Tuple[int, int], patch_size: List[int], n_patches: int, random_patch: bool = True) -> Tuple[List[Tuple[int, int]], Tuple[int, int]] #

Generate non-overlapping patch coordinates for an image.

CRUCIAL: Ensures patches do not overlap when generating multiple patches.

Parameters:

Name Type Description Default
image_shape Tuple[int, int]

(height, width) of the image

required
patch_size List[int]

(height, width) of patches

required
n_patches int

Number of patches to generate

required
random_patch bool

Whether to generate random patches or grid-based patches

True

Returns:

Type Description
Tuple[List[Tuple[int, int]], Tuple[int, int]]

Tuple containing:
  • List of (x, y) coordinates for patch top-left corners (non-overlapping)
  • Actual patch size (height, width) to use (adjusted if image smaller than patch)
Source code in src/omero_annotate_ai/processing/image_functions.py
def generate_patch_coordinates(
    image_shape: Tuple[int, int],
    patch_size: List[int],
    n_patches: int,
    random_patch: bool = True,
) -> Tuple[List[Tuple[int, int]], Tuple[int, int]]:
    """Generate non-overlapping patch coordinates for an image.

    CRUCIAL: Ensures patches do not overlap when generating multiple patches.

    Args:
        image_shape: (height, width) of the image
        patch_size: (height, width) of patches
        n_patches: Number of patches to generate
        random_patch: Whether to generate random patches or grid-based patches

    Returns:
        Tuple containing:
        - List of (x, y) coordinates for patch top-left corners (non-overlapping)
        - Actual patch size (height, width) to use (adjusted if image smaller than patch)
    """
    height, width = image_shape
    patch_h, patch_w = patch_size

    # Image smaller than the requested patch: fall back to a single
    # full-image "patch" anchored at the origin.
    if width < patch_w or height < patch_h:
        print("Image smaller than patch size, using full image")
        actual_patch_size = (height, width)
        return [(0, 0)], actual_patch_size

    # Image is large enough for the requested patch size
    actual_patch_size = (patch_h, patch_w)

    # Largest valid top-left corner so a patch stays fully inside the image
    max_x = max(0, width - patch_w)
    max_y = max(0, height - patch_h)

    coordinates = []

    if random_patch:
        # Rejection-sample random positions, keeping only those that do not
        # overlap an already-accepted patch.
        used_areas = []  # Accepted rectangles as (x0, y0, x1, y1)
        max_attempts = n_patches * 20  # Bound the loop to avoid spinning forever
        attempts = 0

        while len(coordinates) < n_patches and attempts < max_attempts:
            attempts += 1
            x = rnd.randint(0, max_x)
            y = rnd.randint(0, max_y)

            # Check if this patch overlaps with any existing patch
            new_rect = (x, y, x + patch_w, y + patch_h)
            overlaps = False

            for used_rect in used_areas:
                if _rectangles_overlap(new_rect, used_rect):
                    overlaps = True
                    break

            if not overlaps:
                coordinates.append((x, y))
                used_areas.append(new_rect)

        if len(coordinates) < n_patches:
            print(
                f"Could only place {len(coordinates)} non-overlapping patches out of {n_patches} requested"
            )

    else:
        # Grid-based placement. FIX: use FLOOR division — only whole patches
        # fit without overlap. The previous ceiling division overestimated
        # capacity, yielding a step smaller than the patch size and therefore
        # overlapping patches, which violated the non-overlap guarantee.
        patches_x = max(1, width // patch_w)
        patches_y = max(1, height // patch_h)
        max_grid_patches = patches_x * patches_y

        if n_patches > max_grid_patches:
            print(
                f"Requested {n_patches} patches, but only {max_grid_patches} non-overlapping patches fit"
            )
            n_patches = max_grid_patches

        # Spread patches evenly across each dimension. Since
        # patches_x * patch_w <= width, the step is always >= patch_w,
        # so grid patches never overlap.
        if patches_x > 1:
            step_x = (width - patch_w) // (patches_x - 1)
        else:
            step_x = 0

        if patches_y > 1:
            step_y = (height - patch_h) // (patches_y - 1)
        else:
            step_y = 0

        # Generate grid coordinates row by row until n_patches are placed
        patch_count = 0
        for row in range(patches_y):
            for col in range(patches_x):
                if patch_count >= n_patches:
                    break

                x = min(col * step_x, max_x)
                y = min(row * step_y, max_y)
                coordinates.append((x, y))
                patch_count += 1

            if patch_count >= n_patches:
                break

    return coordinates, actual_patch_size

label_to_rois(label_img, z_slice, channel, timepoint, is_volumetric=False, patch_offset=None) #

Convert a 2D or 3D label image to OMERO ROI shapes

Parameters:

Name Type Description Default
label_img ndarray

2D labeled image or 3D labeled stack

required
z_slice int or list

Z-slice index or list/range of Z indices

required
channel int

Channel index

required
timepoint int

Time point index

required
is_volumetric bool

Whether the label image is 3D volumetric data

False
patch_offset

Optional (x,y) offset for placing ROIs in a larger image

None

Returns:

Name Type Description
list

List of OMERO shape objects

Source code in src/omero_annotate_ai/processing/image_functions.py
def label_to_rois(
    label_img,
    z_slice,
    channel,
    timepoint,
    is_volumetric=False,
    patch_offset=None,
):
    """
    Convert a 2D or 3D label image into a flat list of OMERO ROI shapes.

    Args:
        label_img (np.ndarray): 2D labeled image or 3D labeled stack
        z_slice (int or list): Z-slice index or list/range of Z indices
        channel (int): Channel index
        timepoint (int): Time point index
        is_volumetric (bool): Whether the label image is 3D volumetric data
        patch_offset: Optional (x,y) offset for placing ROIs in a larger image

    Returns:
        list: List of OMERO shape objects
    """
    # Resolve the (x, y) offset; default to the origin when none is given
    if patch_offset is None:
        x_offset, y_offset = 0, 0
    else:
        x_offset, y_offset = patch_offset

    # Plain 2D case: a single plane at the given z index
    if not (is_volumetric and label_img.ndim > 2):
        return list(
            process_label_plane(
                label_img, z_slice, channel, timepoint, x_offset, y_offset
            )
        )

    # 3D case: walk the stack plane by plane, mapping each plane back to
    # its real z index.
    shapes = []
    for z_index, z_plane in enumerate(label_img):
        if isinstance(z_slice, (range, list)):
            # Use the provided z values; past the end, extrapolate from the start
            actual_z = (
                z_slice[z_index] if z_index < len(z_slice) else z_slice[0] + z_index
            )
        else:
            # A scalar z_slice is treated as the starting index of the stack
            actual_z = z_slice + z_index

        print(f"Processing volumetric ROIs for z-slice {actual_z}")
        shapes.extend(
            process_label_plane(
                z_plane,
                actual_z,
                channel,
                timepoint,
                x_offset,
                y_offset,
            )
        )

    return shapes

mask_to_contour(mask) #

Converts a binary mask to a list of ROI coordinates.

Parameters:

Name Type Description Default
mask ndarray

binary mask

required

Returns:

Name Type Description
list

list of ROI coordinates

Source code in src/omero_annotate_ai/processing/image_functions.py
def mask_to_contour(mask):
    """Converts a binary mask to a list of ROI coordinates.

    Args:
        mask (np.ndarray): binary mask

    Returns:
        list: list of ROI coordinates
    """
    # cv2.findContours returns (contours, hierarchy); only the contour
    # point arrays are needed here, so the hierarchy is dropped.
    return cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[0]

prepare_training_data_from_table(conn: Any, table_id: int, output_dir: Union[str, Path], training_name: str = 'micro_sam_training', validation_split: float = 0.2, clean_existing: bool = True, tmp_dir: Optional[Union[str, Path]] = None, verbose: bool = False, label_channel: Optional[int] = None, training_channels: Optional[List[int]] = None, upload_label_input: bool = False) -> Dict[str, Any] #

Prepare training data from OMERO annotation table.

Downloads images and labels from OMERO based on annotation table data, splits into training/validation sets, and organizes into directory structure suitable for micro-SAM training.

Parameters:

Name Type Description Default
conn Any

OMERO connection object

required
table_id int

ID of the annotation table in OMERO

required
output_dir Union[str, Path]

Directory to store training data

required
training_name str

Name for the training session (used in directory naming)

'micro_sam_training'
validation_split float

Fraction of data to use for validation (0.0-1.0) if not already defined in the table

0.2
clean_existing bool

Whether to clean existing output directories

True
tmp_dir Optional[Union[str, Path]]

Temporary directory for downloads (optional)

None
verbose bool

If True, show detailed debug information in console output

False
label_channel Optional[int]

Optional channel index for label/segmentation images. If provided and different from training_channels, downloads label channel images to *_label_input directories alongside the training data.

None
training_channels Optional[List[int]]

Optional list of channel indices for training input images. If different from label_channel, downloads from these channels for training_input and val_input. Currently uses first channel if multiple specified.

None
upload_label_input bool

If True and using separate channels, uploads the label_input images back to OMERO as file annotations. Default is False.

False

Returns:

Type Description
Dict[str, Any]

Dictionary with paths to created directories:

{
  'base_dir': Path to base output directory,
  'training_input': Path to training images,
  'training_label': Path to training labels (segmentation masks),
  'training_label_input': Path to label channel images (only if separate channels),
  'val_input': Path to validation images,
  'val_label': Path to validation labels (segmentation masks),
  'val_label_input': Path to label channel images for validation (only if separate channels),
  'stats': Statistics about the prepared data
}

Raises:

Type Description
ValueError

If table not found or invalid parameters

ImportError

If required dependencies missing

Source code in src/omero_annotate_ai/processing/training_functions.py
def prepare_training_data_from_table(
    conn: Any,
    table_id: int,
    output_dir: Union[str, Path],
    training_name: str = "micro_sam_training",
    validation_split: float = 0.2,
    clean_existing: bool = True,
    tmp_dir: Optional[Union[str, Path]] = None,
    verbose: bool = False,
    label_channel: Optional[int] = None,
    training_channels: Optional[List[int]] = None,
    upload_label_input: bool = False,
) -> Dict[str, Any]:
    """
    Prepare training data from OMERO annotation table.

    Downloads images and labels from OMERO based on annotation table data,
    splits into training/validation sets, and organizes into directory structure
    suitable for micro-SAM training.

    Args:
        conn: OMERO connection object
        table_id: ID of the annotation table in OMERO
        output_dir: Directory to store training data
        training_name: Name for the training session (used in directory naming)
        validation_split: Fraction of data to use for validation (0.0-1.0) if not already defined in the table
        clean_existing: Whether to clean existing output directories
        tmp_dir: Temporary directory for downloads (optional)
        verbose: If True, show detailed debug information in console output
        label_channel: Optional channel index for label/segmentation images. If provided
            and different from training_channels, downloads label channel images to
            *_label_input directories alongside the training data.
        training_channels: Optional list of channel indices for training input images.
            If different from label_channel, downloads from these channels for
            training_input and val_input. Currently uses first channel if multiple specified.
        upload_label_input: If True and using separate channels, uploads the label_input
            images back to OMERO as file annotations. Default is False.

    Returns:
        Dictionary with paths to created directories:
        {
            'base_dir': Path to base output directory,
            'training_input': Path to training images,
            'training_label': Path to training labels (segmentation masks),
            'training_label_input': Path to label channel images (only if separate channels),
            'val_input': Path to validation images,
            'val_label': Path to validation labels (segmentation masks),
            'val_label_input': Path to label channel images for validation (only if separate channels),
            'stats': Statistics about the prepared data
        }

    Raises:
        ValueError: If table not found or invalid parameters
        ImportError: If required dependencies missing
    """
    # Validate parameters
    if not 0.0 <= validation_split <= 1.0:
        raise ValueError("validation_split must be between 0.0 and 1.0")

    # Convert paths
    output_dir = Path(output_dir)
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        raise OSError(f"Failed to create output directory {output_dir}: {e}")

    # Set up logger for this training session
    logger = create_training_logger(output_dir, verbose=verbose)
    logger.info(f"Starting training data preparation from table {table_id}")
    logger.debug(
        f"Parameters: output_dir={output_dir}, validation_split={validation_split}, clean_existing={clean_existing}"
    )

    if tmp_dir is None:
        tmp_dir = output_dir / "tmp"
    tmp_dir = Path(tmp_dir)
    try:
        tmp_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        raise OSError(f"Failed to create temporary directory {tmp_dir}: {e}")

    try:
        table = ezomero.get_table(conn, table_id)
    except Exception as e:
        raise ValueError(f"Failed to load table {table_id}: {e}")

    if table is None or len(table) == 0:
        raise ValueError(f"Table {table_id} is empty or not found")

    logger.info(f"Loaded table with {len(table)} rows")

    # Save the table locally for inspection
    table_path = output_dir / f"table_{table_id}.csv"
    try:
        table.to_csv(table_path, index=True)
        logger.info(f"Table saved to: {table_path}")
    except Exception as e:
        logger.warning(f"Failed to save table: {e}")

    # Check if 'processed' column exists and filter to only processed rows
    if "processed" in table.columns:
        initial_count = len(table)
        unprocessed_count = len(table[~table["processed"]])

        if unprocessed_count > 0:
            logger.warning(
                f"Found {unprocessed_count} unprocessed rows out of {initial_count} total rows"
            )
            logger.info(
                f"Proceeding with {initial_count - unprocessed_count} processed rows for training"
            )

        # Filter to only processed rows
        table = table[table["processed"]].copy()

        if len(table) == 0:
            raise ValueError(
                "No processed rows found in the table. Cannot proceed with training."
            )

        logger.info(f"Using {len(table)} processed rows for training")

    else:
        logger.warning(
            "No 'processed' column found - assuming all rows are ready for training"
        )

    # Validate table schema and data integrity
    validate_table_schema(table, logger)
    logger.info("Table schema validated for processing")

    # Determine if we're using separate channels for labeling and training
    uses_separate_channels = (
        label_channel is not None
        and training_channels is not None
        and label_channel not in training_channels
    )

    if uses_separate_channels:
        logger.info(
            f"Using separate channels: label_channel={label_channel}, training_channels={training_channels}"
        )

    # Determine the effective training channel to use
    effective_train_channel = training_channels[0] if training_channels else None

    # Create standard directory structure
    created_dirs = _create_training_directories(
        output_dir=output_dir,
        uses_separate_channels=uses_separate_channels,
        include_test=False,  # Table function doesn't support test category
        clean_existing=clean_existing,
    )

    # Split data based on existing 'train'/'validate' columns or automatic split
    if "train" in table.columns and "validate" in table.columns:
        # Use existing split from table
        train_images = table[table["train"]]
        val_images = table[table["validate"]]
        logger.info("Using existing train/validate split from table")
    else:
        # Automatic split
        n_val = int(len(table) * validation_split)
        shuffled_indices = np.random.permutation(len(table))
        val_indices = shuffled_indices[:n_val]
        train_indices = shuffled_indices[n_val:]

        train_images = table.iloc[train_indices]
        val_images = table.iloc[val_indices]
        logger.info(f"Applied automatic split with validation_split={validation_split}")

    logger.info(
        f"Using {len(train_images)} training images and {len(val_images)} validation images"
    )

    # Prepare training data (uses training channel if specified)
    training_input_dir, training_label_dir = _prepare_dataset_from_table(
        conn,
        train_images,
        output_dir,
        subset_type="training",
        tmp_dir=tmp_dir,
        train_channel=effective_train_channel,
        logger=logger,
        verbose=verbose,
    )

    # Prepare validation data (uses training channel if specified)
    val_input_dir, val_label_dir = _prepare_dataset_from_table(
        conn,
        val_images,
        output_dir,
        subset_type="val",
        tmp_dir=tmp_dir,
        train_channel=effective_train_channel,
        logger=logger,
        verbose=verbose,
    )

    # Update created_dirs with actual paths from _prepare_dataset_from_table
    created_dirs["training_input"] = training_input_dir
    created_dirs["training_label"] = training_label_dir
    created_dirs["val_input"] = val_input_dir
    created_dirs["val_label"] = val_label_dir

    # If using separate channels, also prepare label channel images
    training_label_input_dir = None
    val_label_input_dir = None
    label_input_upload_ids = []
    if uses_separate_channels:
        logger.info(
            f"Preparing label channel ({label_channel}) images for separate channel workflow"
        )
        training_label_input_dir, _ = _prepare_dataset_from_table(
            conn,
            train_images,
            output_dir,
            subset_type="training_label",
            tmp_dir=tmp_dir,
            train_channel=label_channel,
            logger=logger,
            verbose=verbose,
        )
        val_label_input_dir, _ = _prepare_dataset_from_table(
            conn,
            val_images,
            output_dir,
            subset_type="val_label",
            tmp_dir=tmp_dir,
            train_channel=label_channel,
            logger=logger,
            verbose=verbose,
        )

        # Update created_dirs with label_input paths
        created_dirs["training_label_input"] = training_label_input_dir
        created_dirs["val_label_input"] = val_label_input_dir

        # Upload label_input images to OMERO if requested
        if upload_label_input:
            logger.info("Uploading label_input images to OMERO...")
            all_images = pd.concat([train_images, val_images])
            all_label_input_dirs = [training_label_input_dir, val_label_input_dir]

            for label_input_dir in all_label_input_dirs:
                if label_input_dir and label_input_dir.exists():
                    for tif_file in sorted(label_input_dir.glob("*.tif")):
                        # Extract index from filename (e.g., input_00001.tif -> 1)
                        try:
                            file_idx = int(tif_file.stem.split("_")[-1])
                            if file_idx < len(all_images):
                                row = all_images.iloc[file_idx]
                                image_id = int(row["image_id"])
                                timepoint = (
                                    int(row["timepoint"])
                                    if pd.notna(row.get("timepoint"))
                                    else None
                                )
                                z_slice = (
                                    int(row["z_slice"])
                                    if pd.notna(row.get("z_slice"))
                                    else None
                                )

                                # Lazy import to avoid circular dependency
                                from ..omero.omero_functions import (
                                    upload_label_input_image,
                                )

                                file_ann_id = upload_label_input_image(
                                    conn,
                                    image_id=image_id,
                                    label_input_file=str(tif_file),
                                    trainingset_name=training_name,
                                    channel=label_channel,
                                    timepoint=timepoint,
                                    z_slice=z_slice,
                                )
                                label_input_upload_ids.append(file_ann_id)
                        except (ValueError, IndexError) as e:
                            logger.warning(f"Could not upload {tif_file}: {e}")

            logger.info(
                f"Uploaded {len(label_input_upload_ids)} label_input images to OMERO"
            )

    # Clean up temporary directory
    if tmp_dir.exists():
        shutil.rmtree(tmp_dir)
        logger.debug(f"Cleaned up temporary directory: {tmp_dir}")

    # Collect statistics
    stats = {
        "n_training_images": len(list(training_input_dir.glob("*.tif"))),
        "n_training_labels": len(list(training_label_dir.glob("*.tif"))),
        "n_val_images": len(list(val_input_dir.glob("*.tif"))),
        "n_val_labels": len(list(val_label_dir.glob("*.tif"))),
        "total_rows_processed": len(table),
    }

    # Add label input stats if using separate channels
    if uses_separate_channels:
        stats["n_training_label_input"] = len(
            list(training_label_input_dir.glob("*.tif"))
        )
        stats["n_val_label_input"] = len(list(val_label_input_dir.glob("*.tif")))
        if label_input_upload_ids:
            stats["n_label_input_uploaded"] = len(label_input_upload_ids)

    # Build standard result dictionary
    extra_fields = {}
    if label_input_upload_ids:
        extra_fields["label_input_upload_ids"] = label_input_upload_ids

    result = _build_standard_result(
        base_dir=output_dir, created_dirs=created_dirs, stats=stats, **extra_fields
    )

    # Check if preparation actually succeeded
    if stats["n_training_images"] == 0 and stats["n_val_images"] == 0:
        logger.error(f"Training data preparation FAILED in: {output_dir}")
        logger.error(f"Statistics: {stats}")
        raise ValueError(
            "Training data preparation failed - no images were processed successfully. Check the error messages above."
        )
    else:
        logger.info(f"Training data prepared successfully in: {output_dir}")
        logger.info(f"Statistics: {stats}")

    # Close logger handlers to release file locks (important for Windows).
    # FIX: this must happen only ONCE and only AFTER the final success/failure
    # messages above; an earlier duplicate close detached the handlers before
    # those messages were emitted, so they never reached the log file.
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)

    return result

process_label_plane(label_plane, z_slice, channel, timepoint, x_offset=0, y_offset=0) #

Process a single 2D label plane to generate OMERO shapes with optional offset

Parameters:

Name Type Description Default
label_plane

2D label plane (numpy array)

required
z_slice

Z-slice index

required
channel

Channel index

required
timepoint

Time point index

required
x_offset

X offset for contour coordinates (default: 0)

0
y_offset

Y offset for contour coordinates (default: 0)

0

Returns:

Name Type Description
list

List of OMERO shapes

Source code in src/omero_annotate_ai/processing/image_functions.py
def process_label_plane(
    label_plane, z_slice, channel, timepoint, x_offset=0, y_offset=0
):
    """
    Process a single 2D label plane to generate OMERO shapes with optional offset

    Args:
        label_plane: 2D label plane (numpy array)
        z_slice: Z-slice index
        channel: Channel index
        timepoint: Time point index
        x_offset: X offset for contour coordinates (default: 0)
        y_offset: Y offset for contour coordinates (default: 0)

    Returns:
        list: List of OMERO shapes
    """

    shapes = []
    unique_labels = np.unique(label_plane)

    for label in unique_labels:
        # FIX: skip only the background label (0). Previously the first
        # sorted unique value was skipped unconditionally, which silently
        # dropped a real object whenever the plane contained no background
        # pixels (e.g. a patch fully covered by labels).
        if label == 0:
            continue

        # Create binary mask isolating this object
        mask = (label_plane == label).astype(np.uint8)

        # Trace the object outline(s)
        contours = mask_to_contour(mask)

        # Convert each contour to polygon ROI
        for contour in contours:
            contour = contour[:, 0, :]  # OpenCV contours are (N, 1, 2); reshape to (N, 2)

            # Shift contour into full-image coordinates when processing a patch
            if x_offset != 0 or y_offset != 0:
                contour = contour + np.array([x_offset, y_offset])

            # NOTE(review): any scalar z_slice > 0 is tagged "volumetric" even
            # for a single 2D plane — presumably intentional; confirm with callers.
            segmentation_type = (
                "volumetric"
                if isinstance(z_slice, (list, range)) or z_slice > 0
                else "manual"
            )
            poly = ezomero.rois.Polygon(
                points=contour,  # explicitly name the points parameter
                z=z_slice,
                c=channel,
                t=timepoint,
                label=f"micro_sam.{segmentation_type}_instance_segmentation",
            )
            shapes.append(poly)

    return shapes

reorganize_local_data_for_training(config: AnnotationConfig, annotation_dir: Union[str, Path], output_dir: Optional[Union[str, Path]] = None, file_mode: Literal['copy', 'move', 'symlink'] = 'copy', clean_existing: bool = True, include_test: Optional[bool] = None, verbose: bool = False) -> Dict[str, Any] #

Reorganize locally-stored annotation data into training folder structure.

Works entirely offline - no OMERO connection required. This function takes the flat folder structure from the annotation pipeline (input/, output/) and reorganizes it into the split-based structure expected by training workflows (training_input/, training_label/, val_input/, val_label/).

Parameters:

Name Type Description Default
config AnnotationConfig

AnnotationConfig with populated annotations (contains category info)

required
annotation_dir Union[str, Path]

Directory containing annotation output (input/, output/ folders)

required
output_dir Optional[Union[str, Path]]

Target directory for training structure (default: same as annotation_dir)

None
file_mode Literal['copy', 'move', 'symlink']

How to handle files: - "copy": Copy files (keeps originals) - default - "move": Move files (removes originals) - "symlink": Create symbolic links (falls back to copy on Windows if symlinks fail)

'copy'
clean_existing bool

Remove existing training folders before reorganization

True
include_test Optional[bool]

Whether to create test_input/test_label folders for test category. - None (default): Auto-detect - include if test annotations exist - True: Always include test folders - False: Never include test folders (skip test annotations)

None
verbose bool

Show detailed progress

False

Returns:

Type Description
Dict[str, Any]

Dictionary with paths to created directories and statistics:

{
  'base_dir': Path to base output directory,
  'training_input': Path to training images,
  'training_label': Path to training labels,
  'training_label_input': Path to label channel images (only if separate channels),
  'val_input': Path to validation images,
  'val_label': Path to validation labels,
  'val_label_input': Path to validation label channel images (only if separate channels),
  'test_input': Path to test images (only if include_test=True),
  'test_label': Path to test labels (only if include_test=True),
  'stats': Statistics about the reorganized data,
  'file_mapping': Mapping of annotation_id to output files
}

Raises:

Type Description
ValueError

If config has no annotations or no processed annotations

FileNotFoundError

If annotation_dir doesn't exist or is missing input/output folders

Source code in src/omero_annotate_ai/processing/training_functions.py
(Rendered line-number gutter removed; the source listing covers lines 1168–1521 of src/omero_annotate_ai/processing/training_functions.py.)
def reorganize_local_data_for_training(
    config: "AnnotationConfig",
    annotation_dir: Union[str, Path],
    output_dir: Optional[Union[str, Path]] = None,
    file_mode: Literal["copy", "move", "symlink"] = "copy",
    clean_existing: bool = True,
    include_test: Optional[bool] = None,
    verbose: bool = False,
) -> Dict[str, Any]:
    """
    Reorganize locally-stored annotation data into training folder structure.

    Works entirely offline - no OMERO connection required. This function takes
    the flat folder structure from the annotation pipeline (input/, output/) and
    reorganizes it into the split-based structure expected by training workflows
    (training_input/, training_label/, val_input/, val_label/).

    Args:
        config: AnnotationConfig with populated annotations (contains category info)
        annotation_dir: Directory containing annotation output (input/, output/ folders)
        output_dir: Target directory for training structure (default: same as annotation_dir)
        file_mode: How to handle files:
            - "copy": Copy files (keeps originals) - default
            - "move": Move files (removes originals)
            - "symlink": Create symbolic links (falls back to copy on Windows if symlinks fail)
        clean_existing: Remove existing training folders before reorganization
        include_test: Whether to create test_input/test_label folders for test category.
            - None (default): Auto-detect - include if test annotations exist
            - True: Always include test folders
            - False: Never include test folders (skip test annotations)
        verbose: Show detailed progress

    Returns:
        Dictionary with paths to created directories and statistics:
        {
            'base_dir': Path to base output directory,
            'training_input': Path to training images,
            'training_label': Path to training labels,
            'training_label_input': Path to label channel images (only if separate channels),
            'val_input': Path to validation images,
            'val_label': Path to validation labels,
            'val_label_input': Path to validation label channel images (only if separate channels),
            'test_input': Path to test images (only if include_test=True),
            'test_label': Path to test labels (only if include_test=True),
            'stats': Statistics about the reorganized data,
            'file_mapping': Mapping of annotation_id to output files
        }

    Raises:
        ValueError: If config has no annotations or no processed annotations
        FileNotFoundError: If annotation_dir doesn't exist or is missing input/output folders
    """
    # Convert paths
    annotation_dir = Path(annotation_dir)
    output_dir = Path(output_dir) if output_dir else annotation_dir

    # Validate annotation directory structure BEFORE setting up logger
    # (logger tries to create directories which would fail for invalid paths)
    if not annotation_dir.exists():
        raise FileNotFoundError(f"Annotation directory not found: {annotation_dir}")

    output_source = annotation_dir / "output"

    # New layout:    label_input/ + training_input/
    # Legacy layout: input/
    label_input_source = annotation_dir / "label_input"
    training_input_source = annotation_dir / "training_input"
    input_source = annotation_dir / "input"

    # Either the new or the legacy input layout must be present; the output
    # (mask) folder is required in both layouts.
    if not label_input_source.exists() and not input_source.exists():
        raise FileNotFoundError(
            f"No input folder found in: {annotation_dir} "
            "(expected 'label_input/' or 'input/')"
        )
    if not output_source.exists():
        raise FileNotFoundError(f"Output folder not found: {output_source}")

    # Set up logger (after validation so we know paths are valid)
    logger = create_training_logger(output_dir, verbose=verbose)
    logger.info("Reorganizing local annotation data for training")
    logger.info(f"Source: {annotation_dir}, Target: {output_dir}, Mode: {file_mode}")

    # Helper to close logger handlers (important for Windows file locks)
    def _close_logger_handlers() -> None:
        for handler in logger.handlers[:]:
            handler.close()
            logger.removeHandler(handler)

    # Validate config has annotations
    if not config.annotations:
        _close_logger_handlers()
        raise ValueError("Config has no annotations. Run annotation workflow first.")

    # Filter to processed annotations only
    processed_annotations = [ann for ann in config.annotations if ann.processed]
    if not processed_annotations:
        _close_logger_handlers()
        raise ValueError("No processed annotations found in config")

    logger.info(
        f"Found {len(processed_annotations)} processed annotations out of {len(config.annotations)} total"
    )

    # Auto-detect test annotations if include_test is None
    if include_test is None:
        has_test_annotations = any(
            ann.category == "test" for ann in processed_annotations
        )
        include_test = has_test_annotations
        if has_test_annotations:
            logger.info(
                "Auto-detected test annotations - will create test_input/test_label folders"
            )

    # Check if using separate channels
    uses_separate_channels = config.spatial_coverage.uses_separate_channels()
    if uses_separate_channels:
        logger.info(
            "Separate channel workflow detected - will create *_label_input folders"
        )

    # Determine which categories we have
    categories = set(ann.category for ann in processed_annotations)
    logger.info(f"Categories found: {categories}")

    # Create standard directory structure using helper
    created_dirs = _create_training_directories(
        output_dir=output_dir,
        uses_separate_channels=uses_separate_channels,
        include_test=include_test,
        clean_existing=clean_existing,
    )

    # Process annotations by category
    # Aggregate counters used both for the summary log and the returned result.
    stats: Dict[str, Any] = {
        "n_training_images": 0,
        "n_training_labels": 0,
        "n_training_label_input": 0,
        "n_val_images": 0,
        "n_val_labels": 0,
        "n_val_label_input": 0,
        "n_test_images": 0,
        "n_test_labels": 0,
        "n_test_label_input": 0,
        "n_skipped": 0,
        "n_missing_input": 0,
        "n_missing_label": 0,
        "file_operations": {},
    }

    # Maps annotation_id -> destination paths; returned for traceability.
    file_mapping: Dict[str, Dict[str, Any]] = {}
    # Per-category sequential counters used to number the destination files.
    category_counters: Dict[str, int] = {"training": 0, "validation": 0, "test": 0}

    # Build folder structure once (consistent with directory creation above)
    folder_structure = _get_standard_folder_structure(uses_separate_channels, include_test)

    # Copy/move/symlink each processed annotation into its category folder,
    # numbering files sequentially per category (input_00000.tif, ...).
    for ann in processed_annotations:
        category = ann.category

        if category == "test" and not include_test:
            stats["n_skipped"] += 1
            continue

        # Only process known categories
        if category not in ("training", "validation", "test"):
            stats["n_skipped"] += 1
            continue

        annotation_id = ann.annotation_id

        # label_input/{id}.tif (new) or input/{id}.tif (legacy single-channel)
        label_input_file = label_input_source / f"{annotation_id}.tif"
        if not label_input_file.exists():
            label_input_file = input_source / f"{annotation_id}.tif"

        # training_input/{id}.tif only resolved when needed (separate-channel path)
        # NOTE: train_input_file is only bound here; it is only read in the
        # separate-channel branch below, so this conditional binding is safe.
        if uses_separate_channels:
            train_input_file = training_input_source / f"{annotation_id}.tif"

        label_file = output_source / f"{annotation_id}_mask.tif"

        # Get sequential index for this category
        idx = category_counters[category]
        category_counters[category] += 1

        # Determine destination folder names
        # (fall back to the conventional "{category}_input" names if the
        # folder-structure helper does not define an entry)
        input_folder = folder_structure.get(f"{category}_input", f"{category}_input")
        label_folder = folder_structure.get(f"{category}_label", f"{category}_label")

        label_dest = output_dir / label_folder / f"label_{idx:05d}.tif"

        # Track mapping
        file_mapping[annotation_id] = {
            "category": category,
            "index": idx,
            "label_dest": str(label_dest),
        }

        if uses_separate_channels:
            # Separate-channel workflow:
            #   - label-channel image (fluorescence) → *_label_input/
            #   - training-channel image (e.g. brightfield) → *_input/
            label_input_folder = folder_structure.get(
                f"{category}_label_input", f"{category}_label_input"
            )
            label_input_dest = output_dir / label_input_folder / f"input_{idx:05d}.tif"
            input_dest = output_dir / input_folder / f"input_{idx:05d}.tif"

            file_mapping[annotation_id]["input_dest"] = str(input_dest)
            file_mapping[annotation_id]["label_input_dest"] = str(label_input_dest)

            # Copy label-channel image → *_label_input/
            if label_input_file.exists():
                label_input_dest.parent.mkdir(parents=True, exist_ok=True)
                operation = _create_file_link_or_copy(
                    label_input_file, label_input_dest, file_mode, logger
                )
                stats["file_operations"][operation] = (
                    stats["file_operations"].get(operation, 0) + 1
                )
                if category == "training":
                    stats["n_training_label_input"] += 1
                elif category == "validation":
                    stats["n_val_label_input"] += 1
                elif category == "test":
                    stats["n_test_label_input"] += 1
                logger.debug(
                    f"[{operation}] {label_input_file.name} -> {label_input_dest.name} (label_input)"
                )
            else:
                logger.warning(f"Label-channel input file not found: {label_input_file}")

            # Copy training-channel image → *_input/
            if train_input_file.exists():
                input_dest.parent.mkdir(parents=True, exist_ok=True)
                operation = _create_file_link_or_copy(
                    train_input_file, input_dest, file_mode, logger
                )
                stats["file_operations"][operation] = (
                    stats["file_operations"].get(operation, 0) + 1
                )
                if category == "training":
                    stats["n_training_images"] += 1
                elif category == "validation":
                    stats["n_val_images"] += 1
                elif category == "test":
                    stats["n_test_images"] += 1
                logger.debug(
                    f"[{operation}] {train_input_file.name} -> {input_dest.name} (train_input)"
                )
            else:
                stats["n_missing_input"] += 1
                logger.warning(
                    f"Training-channel input file not found: {train_input_file}. "
                    "Re-run the annotation pipeline to generate training channel images."
                )
        else:
            # Single-channel workflow: label-channel image goes directly to *_input/
            input_dest = output_dir / input_folder / f"input_{idx:05d}.tif"
            file_mapping[annotation_id]["input_dest"] = str(input_dest)

            if label_input_file.exists():
                input_dest.parent.mkdir(parents=True, exist_ok=True)
                operation = _create_file_link_or_copy(
                    label_input_file, input_dest, file_mode, logger
                )
                stats["file_operations"][operation] = (
                    stats["file_operations"].get(operation, 0) + 1
                )
                if category == "training":
                    stats["n_training_images"] += 1
                elif category == "validation":
                    stats["n_val_images"] += 1
                elif category == "test":
                    stats["n_test_images"] += 1
                logger.debug(f"[{operation}] {label_input_file.name} -> {input_dest.name}")
            else:
                stats["n_missing_input"] += 1
                logger.warning(f"Input file not found: {label_input_file}")

        # Process label/mask file (same for both workflows)
        if label_file.exists():
            label_dest.parent.mkdir(parents=True, exist_ok=True)
            operation = _create_file_link_or_copy(
                label_file, label_dest, file_mode, logger
            )
            stats["file_operations"][operation] = (
                stats["file_operations"].get(operation, 0) + 1
            )
            if category == "training":
                stats["n_training_labels"] += 1
            elif category == "validation":
                stats["n_val_labels"] += 1
            elif category == "test":
                stats["n_test_labels"] += 1
            logger.debug(f"[{operation}] {label_file.name} -> {label_dest.name}")
        else:
            stats["n_missing_label"] += 1
            logger.warning(f"Label file not found: {label_file}")

    # Build standard result dictionary
    result = _build_standard_result(
        base_dir=output_dir,
        created_dirs=created_dirs,
        stats=stats,
        file_mapping=file_mapping,
    )

    # Log summary
    total_processed = (
        stats["n_training_images"] + stats["n_val_images"] + stats["n_test_images"]
    )
    logger.info(f"Reorganization complete: {total_processed} images processed")
    if uses_separate_channels:
        logger.info(
            f"  Training: {stats['n_training_images']} train images, "
            f"{stats['n_training_label_input']} label-channel images, "
            f"{stats['n_training_labels']} labels"
        )
        logger.info(
            f"  Validation: {stats['n_val_images']} train images, "
            f"{stats['n_val_label_input']} label-channel images, "
            f"{stats['n_val_labels']} labels"
        )
    else:
        logger.info(
            f"  Training: {stats['n_training_images']} images, {stats['n_training_labels']} labels"
        )
        logger.info(
            f"  Validation: {stats['n_val_images']} images, {stats['n_val_labels']} labels"
        )
    if include_test:
        logger.info(
            f"  Test: {stats['n_test_images']} images, {stats['n_test_labels']} labels"
        )
    if stats["n_missing_input"] > 0 or stats["n_missing_label"] > 0:
        logger.warning(
            f"  Missing files: {stats['n_missing_input']} inputs, {stats['n_missing_label']} labels"
        )
    logger.info(f"  File operations: {stats['file_operations']}")

    # After a move, remove empty source folders
    # (rmdir only succeeds on empty dirs; OSError is swallowed deliberately
    # because leftover source folders are harmless)
    if file_mode == "move":
        for src in [label_input_source, training_input_source, input_source, output_source]:
            try:
                if src.exists() and not any(src.iterdir()):
                    src.rmdir()
                    logger.debug(f"Removed empty source folder: {src.name}/")
            except OSError:
                pass

    # Close handlers explicitly so the log file is released (Windows file locks).
    _close_logger_handlers()

    return result

run_training(training_config: Dict[str, Any], framework: str = 'microsam') -> Dict[str, Any] #

Execute training with framework-specific implementation.

Parameters:

Name Type Description Default
training_config Dict[str, Any]

Configuration dictionary from setup_training()

required
framework str

Training framework to use ("microsam", future: "cellpose", etc.)

'microsam'

Returns:

Type Description
Dict[str, Any]

Dictionary containing training results and model paths

Raises:

Type Description
ValueError

If framework is not supported

ImportError

If required framework packages are not available

Source code in src/omero_annotate_ai/processing/training_utils.py
def run_training(
    training_config: Dict[str, Any],
    framework: str = "microsam"
) -> Dict[str, Any]:
    """
    Execute training with framework-specific implementation.

    Args:
        training_config: Configuration dictionary from setup_training()
        framework: Training framework to use ("microsam", future: "cellpose", etc.)

    Returns:
        Dictionary containing training results and model paths

    Raises:
        ValueError: If framework is not supported
        ImportError: If required framework packages are not available
    """
    # Guard clause: reject unknown frameworks up front, then dispatch.
    supported_frameworks = ["microsam"]
    if framework.lower() not in supported_frameworks:
        raise ValueError(
            f"Unsupported framework: {framework}. "
            f"Supported frameworks: {supported_frameworks}"
        )
    return _run_microsam_training(training_config)

setup_training(training_result: Dict[str, Any], model_name: str = '', model_type: str = 'vit_b_lm', epochs: int = 50, n_iterations: Optional[int] = None, batch_size: int = 2, learning_rate: float = 1e-05, patch_shape: Union[Tuple[int, int], Tuple[int, int, int]] = (512, 512), n_objects_per_batch: int = 25, save_every: int = 1000, validate_every: int = 500, **kwargs) -> Dict[str, Any] #

Setup training configuration from training_result dict.

Parameters:

Name Type Description Default
training_result Dict[str, Any]

Dictionary from prepare_training_data_from_table()

required
model_name str

Name for the training session/model

''
model_type str

SAM model variant ("vit_b", "vit_l", "vit_h")

'vit_b_lm'
epochs int

Number of training epochs (primary training parameter)

50
n_iterations Optional[int]

Number of training iterations (calculated from epochs if None)

None
batch_size int

Training batch size

2
learning_rate float

Learning rate for training

1e-05
patch_shape Union[Tuple[int, int], Tuple[int, int, int]]

Input patch dimensions (height, width) or (slices, height, width)

(512, 512)
n_objects_per_batch int

Number of objects per batch for sampling

25
save_every int

Save checkpoint every N iterations

1000
validate_every int

Run validation every N iterations

500
**kwargs

Framework-specific parameters

{}

Returns:

Type Description
Dict[str, Any]

Dictionary containing all training configuration and paths

Raises:

Type Description
ValueError

If training_result is missing required keys

FileNotFoundError

If training directories don't exist

Source code in src/omero_annotate_ai/processing/training_utils.py
def setup_training(
    training_result: Dict[str, Any],
    model_name: str = "",
    # Model parameters
    model_type: str = "vit_b_lm",
    # Training parameters
    epochs: int = 50,
    n_iterations: Optional[int] = None,
    batch_size: int = 2,
    learning_rate: float = 1e-5,

    # Data parameters
    patch_shape: Union[Tuple[int, int], Tuple[int, int, int]] = (512, 512),
    n_objects_per_batch: int = 25,

    # Checkpointing
    save_every: int = 1000,
    validate_every: int = 500,

    **kwargs
) -> Dict[str, Any]:
    """
    Setup training configuration from training_result dict.

    Args:
        training_result: Dictionary from prepare_training_data_from_table()
        model_name: Name for the training session/model
        model_type: SAM model variant ("vit_b", "vit_l", "vit_h")
        epochs: Number of training epochs (primary training parameter)
        n_iterations: Number of training iterations (calculated from epochs if None)
        batch_size: Training batch size
        learning_rate: Learning rate for training
        patch_shape: Input patch dimensions (height, width) or (slices, height, width)
        n_objects_per_batch: Number of objects per batch for sampling
        save_every: Save checkpoint every N iterations
        validate_every: Run validation every N iterations
        **kwargs: Framework-specific parameters

    Returns:
        Dictionary containing all training configuration and paths

    Raises:
        ValueError: If training_result is missing required keys
        FileNotFoundError: If training directories don't exist
    """
    # The four split directories every training run needs.
    required_keys = ['training_input', 'training_label', 'val_input', 'val_label']

    missing_keys = [key for key in required_keys if key not in training_result]
    if missing_keys:
        raise ValueError(f"training_result missing required keys: {missing_keys}")

    # Each referenced directory must actually exist on disk.
    for key in required_keys:
        directory = Path(training_result[key])
        if not directory.exists():
            raise FileNotFoundError(f"Training directory does not exist: {directory}")

    # Fall back to a timestamped default when no (or a blank) name was supplied.
    if not model_name or not model_name.strip():
        stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        model_name = f"micro_sam_training_{stamp}"

    # Prefer an explicit output_dir; otherwise infer it as the parent of
    # the training input directory.
    if 'output_dir' in training_result:
        output_dir = Path(training_result['output_dir'])
    else:
        output_dir = Path(training_result['training_input']).parent

    checkpoint_folder = output_dir / "checkpoints"

    # Derive an iteration count from epochs when not given explicitly.
    # iterations_per_epoch ~= dataset_size / batch_size (dataset size falls
    # back to 100 when stats are unavailable).
    if n_iterations is None and epochs > 0:
        dataset_size = training_result.get('stats', {}).get('n_training_images', 100)
        per_epoch = max(1, dataset_size // batch_size)
        n_iterations = epochs * per_epoch

    # Assemble the full configuration: paths, model, training, data and
    # checkpointing parameters, plus the original result for reference.
    training_config = {
        # Paths
        'training_input': Path(training_result['training_input']),
        'training_label': Path(training_result['training_label']),
        'val_input': Path(training_result['val_input']),
        'val_label': Path(training_result['val_label']),
        'output_dir': output_dir,
        'checkpoint_folder': checkpoint_folder,
        'model_name': model_name,

        # Model parameters
        'model_type': model_type,

        # Training parameters
        'epochs': epochs,
        'n_iterations': n_iterations,
        'batch_size': batch_size,
        'learning_rate': learning_rate,

        # Data parameters
        'patch_shape': patch_shape,
        'n_objects_per_batch': n_objects_per_batch,

        # Checkpointing
        'save_every': save_every,
        'validate_every': validate_every,

        # Original training result for reference
        'training_result': training_result,
    }

    # Framework-specific overrides/extensions win over the defaults above.
    training_config.update(kwargs)

    return training_config

validate_table_schema(df: pd.DataFrame, logger=None) -> None #

Validate that the table has the required columns and basic data integrity.

Parameters:

Name Type Description Default
df DataFrame

DataFrame from OMERO table

required
logger

Optional logger instance for logging messages

None

Raises:

Type Description
ValueError

If required columns are missing or data integrity issues found

Source code in src/omero_annotate_ai/processing/utils.py
def validate_table_schema(df: pd.DataFrame, logger=None) -> None:
    """
    Validate that the table has the required columns and basic data integrity.

    Args:
        df: DataFrame from OMERO table
        logger: Optional logger instance for logging messages

    Raises:
        ValueError: If required columns are missing or data integrity issues found
    """
    # Columns the training-data preparation step cannot work without.
    required_columns = {
        'image_id', 'channel', 'z_slice', 'timepoint', 'label_id', 
        'train', 'validate', 'is_patch', 'patch_x', 'patch_y', 
        'patch_width', 'patch_height'
    }

    # Columns that enable extra functionality when present.
    optional_columns = {'is_volumetric'}

    present = set(df.columns)

    missing_columns = required_columns - present
    if missing_columns:
        raise ValueError(f"Missing required columns: {sorted(missing_columns)}")

    # Reject columns that exist but carry no usable values at all.
    for col in ('image_id', 'label_id'):
        if col in present and df[col].isna().all():
            raise ValueError(f"Column '{col}' contains no valid data")

    # image_id must be numeric (or at least convertible to numeric).
    if not pd.api.types.is_numeric_dtype(df['image_id']):
        try:
            pd.to_numeric(df['image_id'], errors='raise')
        except (ValueError, TypeError):
            raise ValueError("Column 'image_id' contains non-numeric data")

    # Report which optional columns were found (debug-level when a logger
    # is available, stdout otherwise).
    found_optional = optional_columns.intersection(present)
    if found_optional:
        message = f"Optional columns found: {sorted(found_optional)}"
        if logger:
            logger.debug(message)
        else:
            print(message)