Skip to content

# mihcsme_py.uploader

Upload MIHCSME metadata to OMERO using omero-py directly.

download_metadata_from_omero(conn: BlitzGateway, target_type: Literal['Screen', 'Plate'], target_id: int, namespace: str = DEFAULT_NS_BASE) -> MIHCSMEMetadata #

Download MIHCSME metadata from OMERO and convert to Pydantic model.

Parameters:

Name Type Description Default
conn BlitzGateway

OMERO connection

required
target_type Literal['Screen', 'Plate']

"Screen" or "Plate"

required
target_id int

ID of the target object

required
namespace str

Namespace for annotations (default: "MIHCSME")

DEFAULT_NS_BASE

Returns:

Type Description
MIHCSMEMetadata

MIHCSMEMetadata instance populated with data from OMERO

Example

```python
from mihcsme_py import download_metadata_from_omero
import ezomero

conn = ezomero.connect("omero.example.com", "user", "password")
metadata = download_metadata_from_omero(conn, "Screen", 123)
print(metadata.investigation_information.investigation_info.project_id)
```

Source code in src/mihcsme_py/uploader.py
def download_metadata_from_omero(
    conn: BlitzGateway,
    target_type: Literal["Screen", "Plate"],
    target_id: int,
    namespace: str = DEFAULT_NS_BASE,
) -> MIHCSMEMetadata:
    """
    Download MIHCSME metadata from OMERO and convert to Pydantic model.

    Args:
        conn: OMERO connection
        target_type: "Screen" or "Plate"
        target_id: ID of the target object
        namespace: Namespace for annotations (default: "MIHCSME")

    Returns:
        MIHCSMEMetadata instance populated with data from OMERO

    Raises:
        ValueError: If no object of ``target_type`` with ``target_id`` exists.

    Example:
        >>> from mihcsme_py import download_metadata_from_omero
        >>> import ezomero
        >>>
        >>> conn = ezomero.connect("omero.example.com", "user", "password")
        >>> metadata = download_metadata_from_omero(conn, "Screen", 123)
        >>> print(metadata.investigation_information.investigation_info.project_id)
    """
    logger.info(f"Downloading metadata from {target_type} {target_id}...")

    # Resolve the target object; fail fast if it does not exist.
    target_obj = conn.getObject(target_type, target_id)
    if not target_obj:
        raise ValueError(f"{target_type} with ID {target_id} not found")

    # Accumulates sheet-level and well-level metadata for from_omero_dict().
    metadata_dict: Dict[str, Any] = {}

    def get_annotations_as_dict(obj, ns_filter: str) -> Dict[str, Dict[str, Any]]:
        """Get MapAnnotations from an object and organize by namespace.

        The namespace structure is expected to be:
        - MIHCSME/InvestigationInformation/DataOwner
        - MIHCSME/InvestigationInformation/InvestigationInfo
        - MIHCSME/StudyInformation/Study
        - MIHCSME/AssayInformation/Assay
        etc.

        Where the format is: namespace_base/sheet_name/group_name
        """
        result: Dict[str, Dict[str, Any]] = {}
        for ann in obj.listAnnotations():
            # Skip annotations without a namespace, or outside our filter.
            if not hasattr(ann, "getNs"):
                continue
            ns = ann.getNs()
            if not (ns and ns.startswith(ns_filter)):
                continue

            parts = ns.split("/")
            if len(parts) >= 2:
                # e.g. "MIHCSME/InvestigationInformation/DataOwner"
                # -> sheet_name="InvestigationInformation", group_name="DataOwner"
                sheet_name = parts[1]
                group_name = parts[2] if len(parts) >= 3 else None
            else:
                # Legacy format with no "/" separators: the whole ns is the sheet.
                sheet_name = ns
                group_name = None

            # Only MapAnnotations expose getValue(); skip other annotation types.
            if not hasattr(ann, "getValue"):
                continue
            kv_pairs = dict(ann.getValue())
            if not kv_pairs:
                continue

            sheet = result.setdefault(sheet_name, {})
            if group_name:
                # Organize key-value pairs under their group within the sheet.
                sheet.setdefault(group_name, {}).update(kv_pairs)
            else:
                # No group name: store directly under the sheet.
                sheet.update(kv_pairs)

        return result

    # 1. Get object-level metadata (Investigation, Study, Assay).
    # Annotations come back already organized by sheet and group, e.g.
    # {"InvestigationInformation": {"DataOwner": {...}, "InvestigationInfo": {...}}}
    object_annotations = get_annotations_as_dict(target_obj, namespace)

    for sheet_const, dict_key in (
        (SHEET_INVESTIGATION, "InvestigationInformation"),
        (SHEET_STUDY, "StudyInformation"),
        (SHEET_ASSAY, "AssayInformation"),
    ):
        if sheet_const in object_annotations:
            metadata_dict[dict_key] = object_annotations[sheet_const]

    # 2. Get well-level metadata (AssayConditions). A Screen contributes the
    # wells of all its child plates; a Plate contributes its own wells.
    if target_type == "Screen":
        plates = list(target_obj.listChildren())
    else:
        plates = [target_obj]

    assay_conditions = []
    for plate in plates:
        plate_name = plate.getName()
        for well in plate.listChildren():
            well_data = _get_well_metadata(well, namespace, plate_name)
            if well_data:
                assay_conditions.append(well_data)

    if assay_conditions:
        metadata_dict["AssayConditions"] = assay_conditions

    logger.info(f"Downloaded {len(assay_conditions)} well metadata entries")

    # Convert the collected dictionary into the Pydantic model.
    return MIHCSMEMetadata.from_omero_dict(metadata_dict)

upload_metadata_to_omero(conn: BlitzGateway, metadata: MIHCSMEMetadata, target_type: Literal['Screen', 'Plate'], target_id: int, namespace: str = DEFAULT_NS_BASE, replace: bool = False) -> Dict[str, Any] #

Upload MIHCSME metadata to OMERO from a Pydantic model.

Parameters:

Name Type Description Default
conn BlitzGateway

Active OMERO connection (BlitzGateway)

required
metadata MIHCSMEMetadata

MIHCSMEMetadata Pydantic model instance

required
target_type Literal['Screen', 'Plate']

Type of target object ("Screen" or "Plate")

required
target_id int

ID of the target OMERO object

required
namespace str

Base namespace for annotations (default: "MIHCSME")

DEFAULT_NS_BASE
replace bool

If True, remove existing annotations before uploading

False

Returns:

Type Description
Dict[str, Any]

Dictionary with upload summary:

- status: 'success', 'partial_success', or 'error'
- message: Human-readable message
- wells_processed: Number of wells processed
- wells_succeeded: Number of wells successfully annotated
- wells_failed: Number of wells that failed
- removed_annotations: Number of annotations removed (if replace=True)

Raises:

Type Description
ValueError

If target_type is not 'Screen' or 'Plate'

Source code in src/mihcsme_py/uploader.py
def upload_metadata_to_omero(
    conn: BlitzGateway,
    metadata: MIHCSMEMetadata,
    target_type: Literal["Screen", "Plate"],
    target_id: int,
    namespace: str = DEFAULT_NS_BASE,
    replace: bool = False,
) -> Dict[str, Any]:
    """
    Upload MIHCSME metadata to OMERO from a Pydantic model.

    Args:
        conn: Active OMERO connection (BlitzGateway)
        metadata: MIHCSMEMetadata Pydantic model instance
        target_type: Type of target object ("Screen" or "Plate")
        target_id: ID of the target OMERO object
        namespace: Base namespace for annotations (default: "MIHCSME")
        replace: If True, remove existing annotations before uploading

    Returns:
        Dictionary with upload summary:
            - status: 'success', 'partial_success', or 'error'
            - message: Human-readable message
            - wells_processed: Number of wells processed
            - wells_succeeded: Number of wells successfully annotated
            - wells_failed: Number of wells that failed
            - removed_annotations: Number of annotations removed (if replace=True)

    Raises:
        ValueError: If target_type is not 'Screen' or 'Plate'
    """
    summary = {
        "status": "error",
        "message": "Initialization failed",
        "target_type": target_type,
        "target_id": target_id,
        "wells_processed": 0,
        "wells_succeeded": 0,
        "wells_failed": 0,
        "removed_annotations": 0,
    }

    if target_type not in ["Screen", "Plate"]:
        summary["message"] = (
            f"This function only supports 'Screen' or 'Plate' as target object types, "
            f"not '{target_type}'."
        )
        logger.error(summary["message"])
        return summary

    processed_ok = True

    try:
        # If replace=True, remove existing annotations first
        if replace:
            logger.info(f"Replacing existing metadata for {target_type} {target_id}...")
            removal_count = _remove_metadata_recursive(conn, target_type, target_id, namespace)
            summary["removed_annotations"] = removal_count
            logger.info(f"Removed {removal_count} existing annotations")

        # 1. Apply Object-Level Metadata (Screen or Plate level)
        logger.info("\n" + "=" * 80)
        logger.info(f"UPLOADING METADATA TO {target_type} (ID: {target_id})")
        logger.info("=" * 80)

        # The three object-level sheets are handled identically; iterate over
        # (label, model section, namespace sheet constant) to avoid three
        # copy-pasted blocks. Log output is identical to the unrolled form.
        sections = (
            ("Investigation Information", metadata.investigation_information, SHEET_INVESTIGATION),
            ("Study Information", metadata.study_information, SHEET_STUDY),
            ("Assay Information", metadata.assay_information, SHEET_ASSAY),
        )
        for step, (label, section, sheet) in enumerate(sections, start=1):
            if section:
                logger.info(f"\n[{step}/4] Uploading {label}...")
                num_groups = len(section.groups)
                logger.info(f"  → {num_groups} group(s) to upload")
                # Use &= (not `and`) so _apply_grouped_metadata always runs
                # even after an earlier failure.
                processed_ok &= _apply_grouped_metadata(
                    conn,
                    target_type,
                    target_id,
                    section.groups,
                    f"{namespace}/{sheet}",
                )
            else:
                logger.info(f"\n[{step}/4] No {label} to upload")

        # 2. Apply Well-Level Metadata
        logger.info("\n[4/4] Uploading Well-Level Metadata (AssayConditions)...")

        if not metadata.assay_conditions:
            logger.info("  → No assay conditions to upload")
        else:
            logger.info(f"  → {len(metadata.assay_conditions)} well condition(s) to upload")
            # Convert to DataFrame for easier processing using to_dict() helper
            conditions_data = [condition.to_dict() for condition in metadata.assay_conditions]

            assay_conditions_df = pd.DataFrame(conditions_data)
            ns_conditions = f"{namespace}/{SHEET_CONDITIONS}"

            # Get plates to process
            plates = _get_plates_to_process(conn, target_type, target_id)

            if not plates:
                logger.warning(f"No plates found for {target_type} ID {target_id}")
            else:
                logger.info(f"Found {len(plates)} plate(s) to process")
                total_well_success = 0
                total_well_fail = 0

                for plate in plates:
                    plate_id = plate.getId()
                    plate_identifier = plate.getName()
                    logger.debug(f"Processing Plate ID: {plate_id}, Name: '{plate_identifier}'")

                    s, f = _apply_assay_conditions_to_wells(
                        conn, plate_id, plate_identifier, assay_conditions_df, ns_conditions
                    )
                    total_well_success += s
                    total_well_fail += f

                summary["wells_succeeded"] = total_well_success
                summary["wells_failed"] = total_well_fail
                summary["wells_processed"] = total_well_success + total_well_fail
                logger.info(
                    f"Well metadata summary: Processed={summary['wells_processed']}, "
                    f"Success={total_well_success}, Failures={total_well_fail}"
                )

        # Determine final status
        logger.info("\n" + "=" * 80)
        logger.info("UPLOAD SUMMARY")
        logger.info("=" * 80)

        if processed_ok:
            summary["status"] = "success"
            if replace:
                summary["message"] = (
                    f"Metadata replaced successfully: removed {summary['removed_annotations']} "
                    f"old annotations, applied new metadata."
                )
            else:
                summary["message"] = "Annotations applied successfully."
        else:
            summary["status"] = "partial_success"
            summary["message"] = (
                f"Some {target_type.lower()}-level annotations may have failed (check logs)."
            )

        # Log summary details
        if replace:
            logger.info(f"Mode: REPLACE (removed {summary['removed_annotations']} old annotations)")
        else:
            logger.info("Mode: APPEND (kept existing annotations)")

        logger.info(f"Target: {target_type} ID {target_id}")
        logger.info(f"Wells processed: {summary['wells_processed']}")
        logger.info(f"Wells succeeded: {summary['wells_succeeded']}")
        logger.info(f"Wells failed: {summary['wells_failed']}")
        logger.info(f"Status: {summary['status'].upper()}")
        logger.info("=" * 80 + "\n")

        logger.info(f"Annotation process finished for {target_type} {target_id}")

    except Exception as e:
        # Reset status: it may already have been set to "success" before a
        # late logging/summary step raised.
        summary["message"] = f"An unexpected error occurred during annotation: {e}"
        logger.error(summary["message"], exc_info=True)
        summary["status"] = "error"

    return summary