PetClassManager

Experimental Feature

This component is part of the experimental Remote Injection system. Use with caution in production environments.

labchain.container.persistent.pet_class_manager.PetClassManager

Manager for persistent class storage with version tracking.

This class handles the serialization, storage, and retrieval of plugin classes using cloudpickle and a hash-based versioning system. Each class version is stored immutably by its content hash, with a 'latest' pointer for convenience.

Key Features:

- Hash-based version tracking using SHA-256 of source code
- Immutable storage of class versions
- 'latest' pointer management for development workflow
- Status checking (synced, untracked, out_of_sync)
- Push/pull operations for class synchronization

Storage Structure:

    plugins/
    ā”œā”€ā”€ ClassName/
    │   ā”œā”€ā”€ abc123...pkl  # Immutable class binary by hash
    │   ā”œā”€ā”€ def456...pkl  # Another version
    │   └── latest.json   # Pointer to current version
    └── AnotherClass/
        └── ...

Usage:

    from labchain.storage import LocalStorage
    from labchain.container.persistent import PetClassManager

    storage = LocalStorage("./storage")
    manager = PetClassManager(storage)

    # Check status
    status = manager.check_status(MyFilter)

    # Push class
    manager.push(MyFilter)

    # Pull latest
    cls = manager.pull("MyFilter")

    # Pull specific version
    cls = manager.pull("MyFilter", code_hash="abc123...")

Attributes:

    storage (BaseStorage): Storage backend for class persistence.

Methods:

    _get_remote_latest_meta(class_name: str) -> Optional[Dict[str, str]]
        Retrieve the 'latest.json' manifest from storage.

    check_status(class_obj: Type[TypePlugable]) -> str
        Compare local vs remote versions.

    get_class_hash(class_obj: Type[TypePlugable]) -> str
        Generate SHA-256 hash of class source code.

    persist_class(class_obj: Type[TypePlugable]) -> str
        Serialize and upload class if it doesn't exist.

    push(class_obj: Type[TypePlugable]) -> None
        Push local version and mark as 'latest' in storage.

    pull(class_name: str, code_hash: Optional[str] = None) -> Type[TypePlugable]
        Fetch a specific version or 'latest' from storage.

    recover_class(class_name: str, code_hash: str) -> Type[TypePlugable]
        Download and reconstruct class from its hash path.

Note

This class is designed to work with the PetFactory and PetContainer to provide seamless class persistence across different environments.

Source code in labchain/container/persistent/pet_class_manager.py
class PetClassManager:
    """
    Manager for persistent class storage with version tracking.

    This class handles the serialization, storage, and retrieval of plugin classes
    using cloudpickle and a hash-based versioning system. Each class version is
    stored immutably by its content hash, with a 'latest' pointer for convenience.

    Key Features:

        - Hash-based version tracking using SHA-256 of source code
        - Immutable storage of class versions
        - 'latest' pointer management for development workflow
        - Status checking (synced, untracked, out_of_sync)
        - Push/pull operations for class synchronization

    Storage Structure:
    ```
        plugins/
        ā”œā”€ā”€ ClassName/
        │   ā”œā”€ā”€ abc123...pkl  # Immutable class binary by hash
        │   ā”œā”€ā”€ def456...pkl  # Another version
        │   └── latest.json   # Pointer to current version
        └── AnotherClass/
            └── ...
    ```

    Usage:
    ```python
        from labchain.storage import LocalStorage
        from labchain.container.persistent import PetClassManager

        storage = LocalStorage("./storage")
        manager = PetClassManager(storage)

        # Check status
        status = manager.check_status(MyFilter)

        # Push class
        manager.push(MyFilter)

        # Pull latest
        cls = manager.pull("MyFilter")

        # Pull specific version
        cls = manager.pull("MyFilter", code_hash="abc123...")
    ```

    Attributes:
        storage (BaseStorage): Storage backend for class persistence.

    Methods:
        _get_remote_latest_meta(class_name: str) -> Optional[Dict[str, str]]:
            Retrieve the 'latest.json' manifest from storage.

        check_status(class_obj: Type[TypePlugable]) -> str:
            Compare local vs remote versions.

        get_class_hash(class_obj: Type[TypePlugable]) -> str:
            Generate SHA-256 hash of class source code.

        persist_class(class_obj: Type[TypePlugable]) -> str:
            Serialize and upload class if it doesn't exist.

        push(class_obj: Type[TypePlugable]) -> None:
            Push local version and mark as 'latest' in storage.

        pull(class_name: str, code_hash: Optional[str] = None) -> Type[TypePlugable]:
            Fetch a specific version or 'latest' from storage.

        recover_class(class_name: str, code_hash: str) -> Type[TypePlugable]:
            Download and reconstruct class from its hash path.

    Note:
        This class is designed to work with the PetFactory and PetContainer
        to provide seamless class persistence across different environments.
    """

    def __init__(self, storage: BaseStorage):
        """
        Initialize the PetClassManager with a storage backend.

        Args:
            storage (BaseStorage): Storage backend to use for class persistence.

        Example:
        ```python
            from labchain.storage import S3Storage

            storage = S3Storage(bucket="my-ml-models")
            manager = PetClassManager(storage)
        ```
        """
        self.storage = storage

    def _get_remote_latest_meta(self, class_name: str) -> Optional[Dict[str, str]]:
        """
        Retrieve the 'latest.json' manifest from storage for a specific class.

        This internal method fetches the metadata that points to the current
        'latest' version of a class in storage.

        Args:
            class_name (str): Name of the class to look up.

        Returns:
            Optional[Dict[str, str]]: Dictionary with 'hash' and 'class_name' keys,
                                      or None if not found or error occurs.

        Note:
            This method handles different return types from storage.download_file
            (bytes, str, or dict) and normalizes them to a dict.
        """
        # Path WITHOUT "plugins/" prefix since context="plugins" adds it
        path = f"{class_name}/latest.json"

        if self.storage.check_if_exists(path, context="plugins"):
            try:
                data = self.storage.download_file(path, context="plugins")

                if isinstance(data, bytes):
                    return json.loads(data.decode("utf-8"))
                elif isinstance(data, str):
                    return json.loads(data)
                return data

            except Exception as e:
                rprint(
                    f"[red]Error reading remote metadata for {class_name}: {e}[/red]"
                )
                return None

        return None

    def check_status(self, class_obj: Type[TypePlugable]) -> str:
        """
        Compare local class version vs remote version.

        This method computes the hash of the local class and compares it
        with the hash stored in the remote 'latest' pointer.

        Args:
            class_obj (Type[TypePlugable]): The class to check.

        Returns:

            str: Status string - one of:

                - 'synced': Local and remote hashes match
                - 'out_of_sync': Local and remote hashes differ
                - 'untracked': No remote version exists

        Example:
        ```python
            status = manager.check_status(MyFilter)
            if status == 'out_of_sync':
                manager.push(MyFilter)
        ```
        """
        local_hash = self.get_class_hash(class_obj)
        remote_meta = self._get_remote_latest_meta(class_obj.__name__)

        if not remote_meta:
            return "untracked"
        if local_hash == remote_meta["hash"]:
            return "synced"

        return "out_of_sync"

    def get_class_hash(self, class_obj: Type[TypePlugable]) -> str:
        """
        Generate SHA-256 hash based on class source code.

        This method hashes the class's module and qualified name, its base classes,
        and the bytecode, constants, names, and signature of each method defined on
        the class, then computes a SHA-256 digest. If introspection fails (e.g. for
        built-in or dynamically created classes), it falls back to hashing the module
        and qualified name.

        Args:
            class_obj (Type[TypePlugable]): The class to hash.

        Returns:
            str: Hex digest of the SHA-256 hash (64 characters).

        Example:
        ```python
            hash1 = manager.get_class_hash(MyFilter)
            # Modify MyFilter source code
            hash2 = manager.get_class_hash(MyFilter)
            assert hash1 != hash2  # Hashes differ after modification
        ```

        Note:
            The hash is deterministic - the same source code will always
            produce the same hash, enabling reliable version tracking.
        """
        try:
            h = hashlib.sha256()

            h.update(class_obj.__module__.encode())
            h.update(class_obj.__qualname__.encode())

            # 2. Base classes (order matters)
            for base in class_obj.__bases__:
                h.update(base.__module__.encode())
                h.update(base.__qualname__.encode())

            # 3. Methods defined in this class only
            for name, obj in sorted(class_obj.__dict__.items()):
                if isinstance(obj, types.FunctionType):
                    code = obj.__code__

                    h.update(name.encode())

                    # Core bytecode
                    h.update(code.co_code)

                    # Constants and names affect semantics
                    h.update(repr(code.co_consts).encode())
                    h.update(repr(code.co_names).encode())

                    # Signature (API-level change)
                    sig = inspect.signature(obj)
                    h.update(str(sig).encode())
            return h.hexdigest()

        except (TypeError, OSError):
            identifier = f"{class_obj.__module__}.{class_obj.__qualname__}"
            return hashlib.sha256(identifier.encode("utf-8")).hexdigest()

    def persist_class(self, class_obj: Type[TypePlugable]) -> str:
        """
        Serialize the class and upload to storage if it doesn't exist.

        This method computes the class hash, serializes it with cloudpickle,
        and uploads it to storage only if a class with that hash doesn't
        already exist (avoiding redundant uploads).

        Args:
            class_obj (Type[TypePlugable]): The class to persist.

        Returns:
            str: Hash of the persisted class.

        Example:
        ```python
            hash_value = manager.persist_class(MyFilter)
            print(f"Persisted MyFilter with hash: {hash_value}")
        ```

        Note:
            This method does not update the 'latest' pointer. Use push()
            for the full push workflow including pointer update.
        """

        code_hash = cast(BasePlugin, class_obj)._hash
        if code_hash is not None:
            path = f"{class_obj.__name__}/{code_hash}.pkl"

            if not self.storage.check_if_exists(path, context="plugins"):
                binary = cloudpickle.dumps(class_obj)
                self.storage.upload_file(binary, file_name=path, context="plugins")
            return code_hash
        else:
            raise ValueError("Class must have a hash attribute.")

    def push(self, class_obj: Type[TypePlugable]) -> None:
        """
        Push local version and mark it as 'latest' in storage.

        This method performs a complete push workflow:

        1. Computes the class hash
        2. Serializes and uploads the class binary (if not already present)
        3. Updates the 'latest.json' pointer to reference this version

        Args:
            class_obj (Type[TypePlugable]): The class to push.

        Returns:
            None

        Example:
        ```python
            # After modifying MyFilter locally
            manager.push(MyFilter)
            # Now remote 'latest' points to the new version
        ```

        Note:
            Pushing creates immutable snapshots. Old versions remain accessible
            by their hash, enabling rollback and version-specific reconstruction.
        """
        code_hash = cast(BasePlugin, class_obj)._hash
        class_name = class_obj.__name__

        # Path WITHOUT "plugins/" prefix since context="plugins" adds it
        path = f"{class_name}/{code_hash}.pkl"
        if not self.storage.check_if_exists(path, context="plugins"):
            self.storage.upload_file(class_obj, file_name=path, context="plugins")

        # 2. Update 'latest' development pointer
        manifest = {"hash": code_hash, "class_name": class_name}
        self.storage.upload_file(
            manifest,
            file_name=f"{class_name}/latest.json",
            context="plugins",
        )

    def pull(
        self, class_name: str, code_hash: Optional[str] = None
    ) -> Type[TypePlugable]:  # type: ignore
        """
        Fetch a specific version or 'latest' from storage.

        This method retrieves a class from storage. If no specific hash is
        provided, it follows the 'latest' pointer to get the current version.

        Args:
            class_name (str): Name of the class to fetch.
            code_hash (Optional[str]): Specific hash to fetch. If None, fetches 'latest'.

        Returns:
            Type[TypePlugable]: The reconstructed class object.

        Raises:
            ValueError: If no remote versions exist for the class.

        Example:
        ```python
            # Pull latest version
            MyFilter = manager.pull("MyFilter")

            # Pull specific version
            MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")
        ```

        Note:
            The returned class is fully functional and can be instantiated
            immediately. All methods and attributes are preserved.
        """
        target_hash = code_hash

        # If no specific hash requested, fetch latest pointer
        if not target_hash:
            remote_meta = self._get_remote_latest_meta(class_name)
            if not remote_meta:
                raise ValueError(f"No remote versions found for {class_name}")
            target_hash = remote_meta["hash"]

        # Recover the class using the final hash
        return self.recover_class(class_name, target_hash)

    @staticmethod
    def _rehydrate_class_globals(clz: Type) -> None:
        """
        Reinject module globals into all methods of a deserialized class.
        This is required for cloudpickle-loaded classes that reference
        external symbols (e.g. XYData, labchain, etc).
        """
        module_name = getattr(clz, "__module__", None)

        if not module_name:
            return

        module = sys.modules.get(module_name)
        if not module:
            return

        module_globals = module.__dict__

        for attr in clz.__dict__.values():
            if isinstance(attr, (types.FunctionType, types.MethodType)):
                attr.__globals__.update(module_globals)  # type: ignore

    def recover_class(self, class_name: str, code_hash: str) -> Type[TypePlugable]:  # type: ignore
        """
        Download and reconstruct class from its hash path.

        This method fetches the serialized class binary from storage using
        the class name and hash, then deserializes it with cloudpickle.

        Args:
            class_name (str): Name of the class.
            code_hash (str): Hash of the version to recover.

        Returns:
            Type[TypePlugable]: The reconstructed class object.

        Example:
        ```python
            # Recover a specific version directly
            MyFilterV1 = manager.recover_class("MyFilter", "abc123...")

            instance = MyFilterV1(param=42)
        ```

        Note:
            This is a lower-level method typically called by pull().
            Prefer using pull() for most use cases.
        """
        # Path WITHOUT "plugins/" prefix since context="plugins" adds it

        path = f"{class_name}/{code_hash}.pkl"
        class_obj = self.storage.download_file(path, context="plugins")
        if hasattr(class_obj.__init__, "__typeguard_original_function__"):
            # Restore the original function without type checking
            class_obj.__init__ = class_obj.__init__.__typeguard_original_function__

        PetClassManager._rehydrate_class_globals(class_obj)

        return class_obj

__init__(storage)

Initialize the PetClassManager with a storage backend.

Parameters:

    storage (BaseStorage): Storage backend to use for class persistence. Required.

Example:

    from labchain.storage import S3Storage

    storage = S3Storage(bucket="my-ml-models")
    manager = PetClassManager(storage)

Source code in labchain/container/persistent/pet_class_manager.py
def __init__(self, storage: BaseStorage):
    """
    Initialize the PetClassManager with a storage backend.

    Args:
        storage (BaseStorage): Storage backend to use for class persistence.

    Example:
    ```python
        from labchain.storage import S3Storage

        storage = S3Storage(bucket="my-ml-models")
        manager = PetClassManager(storage)
    ```
    """
    self.storage = storage

get_class_hash(class_obj)

Generate SHA-256 hash based on class source code.

This method hashes the class's module and qualified name, its base classes, and the bytecode, constants, names, and signature of each method defined on the class, then computes a SHA-256 digest. If introspection fails (e.g. for built-in or dynamically created classes), it falls back to hashing the module and qualified name.

Parameters:

    class_obj (Type[TypePlugable]): The class to hash. Required.

Returns:

    str: Hex digest of the SHA-256 hash (64 characters).

Example:

    hash1 = manager.get_class_hash(MyFilter)
    # Modify MyFilter source code
    hash2 = manager.get_class_hash(MyFilter)
    assert hash1 != hash2  # Hashes differ after modification

Note

The hash is deterministic - the same source code will always produce the same hash, enabling reliable version tracking.

Source code in labchain/container/persistent/pet_class_manager.py
def get_class_hash(self, class_obj: Type[TypePlugable]) -> str:
    """
    Generate SHA-256 hash based on class source code.

    This method hashes the class's module and qualified name, its base classes,
    and the bytecode, constants, names, and signature of each method defined on
    the class, then computes a SHA-256 digest. If introspection fails (e.g. for
    built-in or dynamically created classes), it falls back to hashing the module
    and qualified name.

    Args:
        class_obj (Type[TypePlugable]): The class to hash.

    Returns:
        str: Hex digest of the SHA-256 hash (64 characters).

    Example:
    ```python
        hash1 = manager.get_class_hash(MyFilter)
        # Modify MyFilter source code
        hash2 = manager.get_class_hash(MyFilter)
        assert hash1 != hash2  # Hashes differ after modification
    ```

    Note:
        The hash is deterministic - the same source code will always
        produce the same hash, enabling reliable version tracking.
    """
    try:
        h = hashlib.sha256()

        h.update(class_obj.__module__.encode())
        h.update(class_obj.__qualname__.encode())

        # 2. Base classes (order matters)
        for base in class_obj.__bases__:
            h.update(base.__module__.encode())
            h.update(base.__qualname__.encode())

        # 3. Methods defined in this class only
        for name, obj in sorted(class_obj.__dict__.items()):
            if isinstance(obj, types.FunctionType):
                code = obj.__code__

                h.update(name.encode())

                # Core bytecode
                h.update(code.co_code)

                # Constants and names affect semantics
                h.update(repr(code.co_consts).encode())
                h.update(repr(code.co_names).encode())

                # Signature (API-level change)
                sig = inspect.signature(obj)
                h.update(str(sig).encode())
        return h.hexdigest()

    except (TypeError, OSError):
        identifier = f"{class_obj.__module__}.{class_obj.__qualname__}"
        return hashlib.sha256(identifier.encode("utf-8")).hexdigest()

check_status(class_obj)

Compare local class version vs remote version.

This method computes the hash of the local class and compares it with the hash stored in the remote 'latest' pointer.

Parameters:

    class_obj (Type[TypePlugable]): The class to check. Required.

Returns:

str: Status string - one of:

    - 'synced': Local and remote hashes match
    - 'out_of_sync': Local and remote hashes differ
    - 'untracked': No remote version exists

Example:

    status = manager.check_status(MyFilter)
    if status == 'out_of_sync':
        manager.push(MyFilter)

Source code in labchain/container/persistent/pet_class_manager.py
def check_status(self, class_obj: Type[TypePlugable]) -> str:
    """
    Compare local class version vs remote version.

    This method computes the hash of the local class and compares it
    with the hash stored in the remote 'latest' pointer.

    Args:
        class_obj (Type[TypePlugable]): The class to check.

    Returns:

        str: Status string - one of:

            - 'synced': Local and remote hashes match
            - 'out_of_sync': Local and remote hashes differ
            - 'untracked': No remote version exists

    Example:
    ```python
        status = manager.check_status(MyFilter)
        if status == 'out_of_sync':
            manager.push(MyFilter)
    ```
    """
    local_hash = self.get_class_hash(class_obj)
    remote_meta = self._get_remote_latest_meta(class_obj.__name__)

    if not remote_meta:
        return "untracked"
    if local_hash == remote_meta["hash"]:
        return "synced"

    return "out_of_sync"

persist_class(class_obj)

Serialize the class and upload to storage if it doesn't exist.

This method computes the class hash, serializes it with cloudpickle, and uploads it to storage only if a class with that hash doesn't already exist (avoiding redundant uploads).

Parameters:

    class_obj (Type[TypePlugable]): The class to persist. Required.

Returns:

    str: Hash of the persisted class.

Example:

    hash_value = manager.persist_class(MyFilter)
    print(f"Persisted MyFilter with hash: {hash_value}")

Note

This method does not update the 'latest' pointer. Use push() for the full push workflow including pointer update.

Source code in labchain/container/persistent/pet_class_manager.py
def persist_class(self, class_obj: Type[TypePlugable]) -> str:
    """
    Serialize the class and upload to storage if it doesn't exist.

    This method computes the class hash, serializes it with cloudpickle,
    and uploads it to storage only if a class with that hash doesn't
    already exist (avoiding redundant uploads).

    Args:
        class_obj (Type[TypePlugable]): The class to persist.

    Returns:
        str: Hash of the persisted class.

    Example:
    ```python
        hash_value = manager.persist_class(MyFilter)
        print(f"Persisted MyFilter with hash: {hash_value}")
    ```

    Note:
        This method does not update the 'latest' pointer. Use push()
        for the full push workflow including pointer update.
    """

    code_hash = cast(BasePlugin, class_obj)._hash
    if code_hash is not None:
        path = f"{class_obj.__name__}/{code_hash}.pkl"

        if not self.storage.check_if_exists(path, context="plugins"):
            binary = cloudpickle.dumps(class_obj)
            self.storage.upload_file(binary, file_name=path, context="plugins")
        return code_hash
    else:
        raise ValueError("Class must have a hash attribute.")

push(class_obj)

Push local version and mark it as 'latest' in storage.

This method performs a complete push workflow:

  1. Computes the class hash
  2. Serializes and uploads the class binary (if not already present)
  3. Updates the 'latest.json' pointer to reference this version

Parameters:

    class_obj (Type[TypePlugable]): The class to push. Required.

Returns:

    None

Example:

    # After modifying MyFilter locally
    manager.push(MyFilter)
    # Now remote 'latest' points to the new version

Note

Pushing creates immutable snapshots. Old versions remain accessible by their hash, enabling rollback and version-specific reconstruction.

Source code in labchain/container/persistent/pet_class_manager.py
def push(self, class_obj: Type[TypePlugable]) -> None:
    """
    Push local version and mark it as 'latest' in storage.

    This method performs a complete push workflow:

    1. Computes the class hash
    2. Serializes and uploads the class binary (if not already present)
    3. Updates the 'latest.json' pointer to reference this version

    Args:
        class_obj (Type[TypePlugable]): The class to push.

    Returns:
        None

    Example:
    ```python
        # After modifying MyFilter locally
        manager.push(MyFilter)
        # Now remote 'latest' points to the new version
    ```

    Note:
        Pushing creates immutable snapshots. Old versions remain accessible
        by their hash, enabling rollback and version-specific reconstruction.
    """
    code_hash = cast(BasePlugin, class_obj)._hash
    class_name = class_obj.__name__

    # Path WITHOUT "plugins/" prefix since context="plugins" adds it
    path = f"{class_name}/{code_hash}.pkl"
    if not self.storage.check_if_exists(path, context="plugins"):
        self.storage.upload_file(class_obj, file_name=path, context="plugins")

    # 2. Update 'latest' development pointer
    manifest = {"hash": code_hash, "class_name": class_name}
    self.storage.upload_file(
        manifest,
        file_name=f"{class_name}/latest.json",
        context="plugins",
    )

pull(class_name, code_hash=None)

Fetch a specific version or 'latest' from storage.

This method retrieves a class from storage. If no specific hash is provided, it follows the 'latest' pointer to get the current version.

Parameters:

    class_name (str): Name of the class to fetch. Required.
    code_hash (Optional[str]): Specific hash to fetch. If None, fetches 'latest'. Default: None.

Returns:

    Type[TypePlugable]: The reconstructed class object.

Raises:

    ValueError: If no remote versions exist for the class.

Example:

    # Pull latest version
    MyFilter = manager.pull("MyFilter")

    # Pull specific version
    MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")

Note

The returned class is fully functional and can be instantiated immediately. All methods and attributes are preserved.

Source code in labchain/container/persistent/pet_class_manager.py
def pull(
    self, class_name: str, code_hash: Optional[str] = None
) -> Type[TypePlugable]:  # type: ignore
    """
    Fetch a specific version or 'latest' from storage.

    This method retrieves a class from storage. If no specific hash is
    provided, it follows the 'latest' pointer to get the current version.

    Args:
        class_name (str): Name of the class to fetch.
        code_hash (Optional[str]): Specific hash to fetch. If None, fetches 'latest'.

    Returns:
        Type[TypePlugable]: The reconstructed class object.

    Raises:
        ValueError: If no remote versions exist for the class.

    Example:
    ```python
        # Pull latest version
        MyFilter = manager.pull("MyFilter")

        # Pull specific version
        MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")
    ```

    Note:
        The returned class is fully functional and can be instantiated
        immediately. All methods and attributes are preserved.
    """
    target_hash = code_hash

    # If no specific hash requested, fetch latest pointer
    if not target_hash:
        remote_meta = self._get_remote_latest_meta(class_name)
        if not remote_meta:
            raise ValueError(f"No remote versions found for {class_name}")
        target_hash = remote_meta["hash"]

    # Recover the class using the final hash
    return self.recover_class(class_name, target_hash)

recover_class(class_name, code_hash)

Download and reconstruct class from its hash path.

This method fetches the serialized class binary from storage using the class name and hash, then deserializes it with cloudpickle.

Parameters:

    class_name (str): Name of the class. Required.
    code_hash (str): Hash of the version to recover. Required.

Returns:

    Type[TypePlugable]: The reconstructed class object.

Example:

    # Recover a specific version directly
    MyFilterV1 = manager.recover_class("MyFilter", "abc123...")

    instance = MyFilterV1(param=42)

Note

This is a lower-level method typically called by pull(). Prefer using pull() for most use cases.

Source code in labchain/container/persistent/pet_class_manager.py
def recover_class(self, class_name: str, code_hash: str) -> Type[TypePlugable]:  # type: ignore
    """
    Download and reconstruct class from its hash path.

    This method fetches the serialized class binary from storage using
    the class name and hash, then deserializes it with cloudpickle.

    Args:
        class_name (str): Name of the class.
        code_hash (str): Hash of the version to recover.

    Returns:
        Type[TypePlugable]: The reconstructed class object.

    Example:
    ```python
        # Recover a specific version directly
        MyFilterV1 = manager.recover_class("MyFilter", "abc123...")

        instance = MyFilterV1(param=42)
    ```

    Note:
        This is a lower-level method typically called by pull().
        Prefer using pull() for most use cases.
    """
    # Path WITHOUT "plugins/" prefix since context="plugins" adds it

    path = f"{class_name}/{code_hash}.pkl"
    class_obj = self.storage.download_file(path, context="plugins")
    if hasattr(class_obj.__init__, "__typeguard_original_function__"):
        # Restore the original function without type checking
        class_obj.__init__ = class_obj.__init__.__typeguard_original_function__

    PetClassManager._rehydrate_class_globals(class_obj)

    return class_obj

Overview

PetClassManager is the core component for managing persistent class storage with deterministic version tracking. It handles serialization, storage, and retrieval of plugin classes using CloudPickle and a hash-based versioning system.

Key Responsibilities

  • Hash Computation: Generates deterministic SHA-256 hashes from class bytecode
  • Serialization: Converts classes to binary format using CloudPickle (see the sketch after this list)
  • Storage Management: Handles upload/download of class binaries
  • Version Tracking: Maintains latest pointers and version metadata
  • Class Recovery: Deserializes and reconstructs classes from storage
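
The Serialization and Class Recovery responsibilities boil down to a cloudpickle round-trip of the class object itself. A minimal sketch of that mechanism, independent of the manager (the Doubler class below is a made-up stand-in for any plugin class):

import cloudpickle

class Doubler:
    """Hypothetical stand-in for a plugin class."""
    def predict(self, x):
        return x * 2

binary = cloudpickle.dumps(Doubler)   # the bytes persist_class/push upload
Restored = cloudpickle.loads(binary)  # the class recover_class reconstructs

assert Restored().predict(3) == 6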

Storage Structure

storage/
└── plugins/
    ā”œā”€ā”€ MyCustomFilter/
    │   ā”œā”€ā”€ abc123...pkl      # Immutable class binary (version 1)
    │   ā”œā”€ā”€ def456...pkl      # Immutable class binary (version 2)
    │   └── latest.json       # Pointer to current version
    ā”œā”€ā”€ AnotherFilter/
    │   ā”œā”€ā”€ 789xyz...pkl
    │   └── latest.json
    └── ...

Each class directory contains (path construction is sketched after this list):

  • {hash}.pkl: Immutable serialized class binary, named by its content hash
  • latest.json: Development pointer with structure:
      {
        "class_name": "MyCustomFilter",
        "hash": "abc123..."
      }
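
For illustration, this is how a manifest maps onto the layout above, mirroring the path construction in _get_remote_latest_meta and recover_class (the manifest values are made-up examples and the hash is shortened):

import json

# Example manifest as stored in latest.json
manifest = json.loads('{"class_name": "MyCustomFilter", "hash": "abc123"}')

# Paths are built WITHOUT the "plugins/" prefix; context="plugins" adds it.
latest_path = f"{manifest['class_name']}/latest.json"
binary_path = f"{manifest['class_name']}/{manifest['hash']}.pkl"

print(latest_path)  # MyCustomFilter/latest.json
print(binary_path)  # MyCustomFilter/abc123.pkl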
    

Basic Usage

Initialize Manager

from labchain.storage import S3Storage
from labchain.container.persistent import PetClassManager

# Create storage backend
storage = S3Storage(bucket="my-ml-models", region="us-east-1")

# Initialize manager
manager = PetClassManager(storage)

Check Class Status

from labchain.base import BaseFilter

class MyFilter(BaseFilter):
    def predict(self, x):
        return x * 2

# Check if class is synced with storage
status = manager.check_status(MyFilter)
print(status)  # "untracked" | "synced" | "out_of_sync"

Push Class to Storage

# Compute hash and upload class
manager.push(MyFilter)

# Verify it's synced
status = manager.check_status(MyFilter)
print(status)  # "synced"

Pull Class from Storage

# Pull latest version
MyFilterClass = manager.pull("MyFilter")

# Pull specific version by hash
MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")

# Create instance
instance = MyFilterClass(param=42)

Advanced Usage

Version Tracking Workflow

from labchain.base import BaseFilter
from labchain.container.persistent import PetClassManager
from labchain.storage import LocalStorage

storage = LocalStorage("./model_storage")
manager = PetClassManager(storage)

# Version 1
class MyModel(BaseFilter):
    def predict(self, x):
        return x * 1

hash_v1 = manager.get_class_hash(MyModel)
manager.push(MyModel)
print(f"V1 hash: {hash_v1[:8]}...")

# Modify and create Version 2
class MyModel(BaseFilter):  # Same name, different implementation
    def predict(self, x):
        return x * 2

hash_v2 = manager.get_class_hash(MyModel)
manager.push(MyModel)
print(f"V2 hash: {hash_v2[:8]}...")

# Both versions are now in storage
assert hash_v1 != hash_v2

# Pull specific versions
ModelV1 = manager.pull("MyModel", code_hash=hash_v1)
ModelV2 = manager.pull("MyModel", code_hash=hash_v2)

# Test both versions work correctly
import numpy as np
from labchain.base import XYData

test_data = XYData.mock(np.array([5]))
print(ModelV1().predict(test_data).value)  # [5]
print(ModelV2().predict(test_data).value)  # [10]

Checking Remote Metadata

# Get metadata for latest version
meta = manager._get_remote_latest_meta("MyFilter")

if meta:
    print(f"Latest version: {meta['hash'][:8]}...")
    print(f"Class name: {meta['class_name']}")
else:
    print("No remote version found")

Recovery with Validation

def safe_recover(manager, class_name, code_hash):
    """Safely recover a class with validation."""
    try:
        # Recover class
        recovered = manager.recover_class(class_name, code_hash)

        # Validate it's a proper class
        assert isinstance(recovered, type), "Not a class type"

        # Validate it has required methods
        assert hasattr(recovered, 'predict'), "Missing predict method"

        print(f"āœ… Successfully recovered {class_name}")
        return recovered

    except Exception as e:
        print(f"āŒ Recovery failed: {e}")
        raise

# Usage
MyFilter = safe_recover(manager, "MyFilter", "abc123...")

Hash Computation Details

The get_class_hash() method computes a deterministic SHA-256 hash based on the following components (a combined, runnable sketch follows the list):

Components Included in Hash

  1. Module and Qualified Name

       h.update(class_obj.__module__.encode())
       h.update(class_obj.__qualname__.encode())
    

  2. Base Classes (order matters)

       for base in class_obj.__bases__:
           h.update(base.__module__.encode())
           h.update(base.__qualname__.encode())
    

  3. Method Bytecode

       for name, method in sorted(class_obj.__dict__.items()):
           if isinstance(method, types.FunctionType):
               code = method.__code__
               h.update(name.encode())                    # Method name
               h.update(code.co_code)                     # Bytecode
               h.update(repr(code.co_consts).encode())    # Constants
               h.update(repr(code.co_names).encode())     # Names
               h.update(str(inspect.signature(method)).encode())  # Signature
    
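
Putting the three components together, a self-contained sketch of the same scheme; this is an approximation for experimentation outside the manager, not a drop-in replacement for get_class_hash:

import hashlib
import inspect
import types

def class_definition_hash(class_obj: type) -> str:
    """Standalone re-implementation of the hashing scheme shown above."""
    h = hashlib.sha256()

    # 1. Module and qualified name
    h.update(class_obj.__module__.encode())
    h.update(class_obj.__qualname__.encode())

    # 2. Base classes (order matters)
    for base in class_obj.__bases__:
        h.update(base.__module__.encode())
        h.update(base.__qualname__.encode())

    # 3. Methods defined directly on this class
    for name, obj in sorted(class_obj.__dict__.items()):
        if isinstance(obj, types.FunctionType):
            code = obj.__code__
            h.update(name.encode())
            h.update(code.co_code)                          # bytecode
            h.update(repr(code.co_consts).encode())         # constants
            h.update(repr(code.co_names).encode())          # referenced names
            h.update(str(inspect.signature(obj)).encode())  # signature

    return h.hexdigest()

class Example:
    def predict(self, x):
        return x * 2

print(class_definition_hash(Example))  # deterministic 64-character hex digest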

Example: Hash Sensitivity

import hashlib

class V1(BaseFilter):
    def predict(self, x):
        return x * 1

hash1 = manager.get_class_hash(V1)

# Change implementation
class V1(BaseFilter):
    def predict(self, x):
        return x * 2

hash2 = manager.get_class_hash(V1)

assert hash1 != hash2  # Different bytecode = different hash

# Change only whitespace
class V1(BaseFilter):
    def predict(self, x):
        return   x   *   2  # Extra spaces

hash3 = manager.get_class_hash(V1)

# Bytecode is identical despite whitespace
# (Python compiles both to the same bytecode)
assert hash2 == hash3

Integration with Container

PetClassManager works seamlessly with the Container system:

from labchain import Container
from labchain.storage import S3Storage

# Configure storage
Container.storage = S3Storage(bucket="my-models")

# Container.pcm is automatically initialized
# No need to create PetClassManager manually

# Use via Container
@Container.bind(persist=True)
class MyFilter(BaseFilter):
    def predict(self, x):
        return x

# Check status via Container
status = Container.pcm.check_status(MyFilter)

# Push via Container
Container.ppif.push_all()  # Uses Container.pcm internally


Error Handling

Common Exceptions

from labchain.container.persistent import PetClassManager

manager = PetClassManager(storage)

# ValueError: class not found in storage
try:
    cls = manager.pull("NonExistentClass")
except ValueError as e:
    print(f"Class not found: {e}")

# TypeError: Invalid class object
try:
    manager.push("not_a_class")  # Wrong type
except (TypeError, AttributeError) as e:
    print(f"Invalid input: {e}")

# Connection errors
try:
    manager.push(MyFilter)
except Exception as e:
    print(f"Storage error: {e}")
    # Handle network issues, permissions, etc.

Validation Helper

def validate_and_push(manager, class_obj):
    """Push class with comprehensive validation."""

    # 1. Validate it's a class
    if not isinstance(class_obj, type):
        raise TypeError(f"{class_obj} is not a class")

    # 2. Check status
    status = manager.check_status(class_obj)
    if status == "synced":
        print(f"āœ“ {class_obj.__name__} already synced")
        return

    # 3. Get hash
    try:
        hash_value = manager.get_class_hash(class_obj)
        print(f"Hash: {hash_value[:8]}...")
    except Exception as e:
        raise ValueError(f"Could not compute hash: {e}")

    # 4. Push
    try:
        manager.push(class_obj)
        print(f"āœ“ Pushed {class_obj.__name__}")
    except Exception as e:
        raise RuntimeError(f"Push failed: {e}")

    # 5. Verify
    final_status = manager.check_status(class_obj)
    assert final_status == "synced", "Push verification failed"

# Usage
validate_and_push(manager, MyFilter)

Best Practices

1. Always Check Status Before Push

# āœ… Good: Check before pushing
status = manager.check_status(MyFilter)
if status != "synced":
    manager.push(MyFilter)
    print("Pushed new version")
else:
    print("Already synced, skipping")

# āŒ Bad: Blind push
manager.push(MyFilter)  # Redundant if already synced

2. Use Specific Versions in Production

# āœ… Good: Pin to specific hash
production_hash = "abc123def456..."
MyFilter = manager.pull("MyFilter", code_hash=production_hash)

# āŒ Bad: Use 'latest' in production
MyFilter = manager.pull("MyFilter")  # May change unexpectedly

3. Version Tagging

# Keep a mapping of semantic versions to hashes
VERSION_MANIFEST = {
    "MyFilter": {
        "v1.0.0": "abc123...",
        "v1.1.0": "def456...",
        "v2.0.0": "789xyz...",
    }
}

def get_version(class_name, version_tag):
    """Get class by semantic version."""
    hash_value = VERSION_MANIFEST[class_name][version_tag]
    return manager.pull(class_name, code_hash=hash_value)

# Usage
MyFilterV1 = get_version("MyFilter", "v1.0.0")

4. Monitor Hash Collisions (Extremely Rare)

import logging

logger = logging.getLogger(__name__)

def safe_push(manager, class_obj):
    """Push with collision detection."""
    new_hash = manager.get_class_hash(class_obj)

    # Check if hash already exists
    try:
        existing = manager.recover_class(class_obj.__name__, new_hash)

        # If we can recover it, hash collision or duplicate
        logger.warning(
            f"Hash {new_hash[:8]} already exists for {class_obj.__name__}. "
            f"Skipping push (likely duplicate)."
        )
        return

    except FileNotFoundError:
        # Hash doesn't exist, safe to push
        manager.push(class_obj)
        logger.info(f"Pushed {class_obj.__name__} with hash {new_hash[:8]}")

Performance Considerations

Hash Computation Cost

import time

class ComplexFilter(BaseFilter):
    # Many methods
    def method1(self, x): return x
    def method2(self, x): return x
    # ... 50+ methods

# Measure hash computation time
start = time.time()
hash_value = manager.get_class_hash(ComplexFilter)
duration = time.time() - start

print(f"Hash computed in {duration:.3f}s")
# Typically < 0.01s for most classes

Optimization Tips:

  • Hash computation is fast (< 10ms for typical classes)
  • Cached in PetFactory._version_control after first computation
  • Only recomputed if class is modified

Storage I/O

# Minimize round-trips
classes = [Filter1, Filter2, Filter3, Filter4, Filter5]

# āŒ Bad: Multiple individual pushes
for cls in classes:
    manager.push(cls)  # 5 network round-trips

# āœ… Better: Batch check status
for cls in classes:
    if manager.check_status(cls) != "synced":
        manager.push(cls)

# āœ… Best: Use PetFactory.push_all()
# (Handles batching internally)
from labchain.container.persistent import PetFactory
factory = PetFactory(manager, Container.pif)
for cls in classes:
    factory[cls.__name__] = cls
factory.push_all()

Troubleshooting

Issue: Hash keeps changing

Symptom: Same class produces different hashes on different runs.

Possible Causes:

  1. Python version mismatch - Bytecode differs between versions
  2. Dynamic class attributes - Attributes set outside __init__
  3. Non-deterministic imports - Conditional imports

Solution:

# āœ… Deterministic class definition
import numpy as np  # Module-level import

class MyFilter(BaseFilter):
    def __init__(self, threshold: float):
        super().__init__(threshold=threshold)  # Stable params
        self._cache = None  # Private, not hashed

    def predict(self, x):
        return np.array(x.value > self.threshold)  # Stable logic

Issue: Cannot recover class

Symptom: FileNotFoundError when calling pull() or recover_class().

Diagnostic Steps:

# 1. Verify storage connectivity
print(f"Storage: {manager.storage}")
print(f"Storage type: {type(manager.storage)}")

# 2. Check if metadata exists
meta = manager._get_remote_latest_meta("MyFilter")
print(f"Remote metadata: {meta}")

# 3. List files in storage (if LocalStorage)
if hasattr(manager.storage, 'storage_path'):
    import os
    path = f"{manager.storage.storage_path}/plugins/MyFilter"
    if os.path.exists(path):
        print(f"Files: {os.listdir(path)}")

# 4. Try explicit hash
if meta:
    try:
        cls = manager.recover_class("MyFilter", meta['hash'])
        print("āœ“ Recovery successful")
    except Exception as e:
        print(f"āœ— Recovery failed: {e}")


API Reference

Manager for persistent class storage with version tracking.

This class handles the serialization, storage, and retrieval of plugin classes using cloudpickle and a hash-based versioning system. Each class version is stored immutably by its content hash, with a 'latest' pointer for convenience.

Key Features:

- Hash-based version tracking using SHA-256 of source code
- Immutable storage of class versions
- 'latest' pointer management for development workflow
- Status checking (synced, untracked, out_of_sync)
- Push/pull operations for class synchronization

Storage Structure:

    plugins/
    ā”œā”€ā”€ ClassName/
    │   ā”œā”€ā”€ abc123...pkl  # Immutable class binary by hash
    │   ā”œā”€ā”€ def456...pkl  # Another version
    │   └── latest.json   # Pointer to current version
    └── AnotherClass/
        └── ...

Usage:

    from labchain.storage import LocalStorage
    from labchain.container.persistent import PetClassManager

    storage = LocalStorage("./storage")
    manager = PetClassManager(storage)

    # Check status
    status = manager.check_status(MyFilter)

    # Push class
    manager.push(MyFilter)

    # Pull latest
    cls = manager.pull("MyFilter")

    # Pull specific version
    cls = manager.pull("MyFilter", code_hash="abc123...")

Attributes:

Name Type Description
storage BaseStorage

Storage backend for class persistence.

Methods:

Name Description
_get_remote_latest_meta

str) -> Optional[Dict[str, str]]: Retrieve the 'latest.json' manifest from storage.

check_status

Type[TypePlugable]) -> str: Compare local vs remote versions.

get_class_hash

Type[TypePlugable]) -> str: Generate SHA-256 hash of class source code.

persist_class

Type[TypePlugable]) -> str: Serialize and upload class if it doesn't exist.

push

Type[TypePlugable]) -> None: Push local version and mark as 'latest' in storage.

pull

str, code_hash: Optional[str] = None) -> Type[TypePlugable]: Fetch a specific version or 'latest' from storage.

recover_class

str, code_hash: str) -> Type[TypePlugable]: Download and reconstruct class from its hash path.

Note

This class is designed to work with the PetFactory and PetContainer to provide seamless class persistence across different environments.

Source code in labchain/container/persistent/pet_class_manager.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
class PetClassManager:
    """
    Manager for persistent class storage with version tracking.

    This class handles the serialization, storage, and retrieval of plugin classes
    using cloudpickle and a hash-based versioning system. Each class version is
    stored immutably by its content hash, with a 'latest' pointer for convenience.

    Key Features:

        - Hash-based version tracking using SHA-256 of source code
        - Immutable storage of class versions
        - 'latest' pointer management for development workflow
        - Status checking (synced, untracked, out_of_sync)
        - Push/pull operations for class synchronization

    Storage Structure:
    ```
        plugins/
        ā”œā”€ā”€ ClassName/
        │   ā”œā”€ā”€ abc123...pkl  # Immutable class binary by hash
        │   ā”œā”€ā”€ def456...pkl  # Another version
        │   └── latest.json   # Pointer to current version
        └── AnotherClass/
            └── ...
    ```

    Usage:
    ```python
        from labchain.storage import LocalStorage
        from labchain.container.persistent import PetClassManager

        storage = LocalStorage("./storage")
        manager = PetClassManager(storage)

        # Check status
        status = manager.check_status(MyFilter)

        # Push class
        manager.push(MyFilter)

        # Pull latest
        cls = manager.pull("MyFilter")

        # Pull specific version
        cls = manager.pull("MyFilter", code_hash="abc123...")
    ```

    Attributes:
        storage (BaseStorage): Storage backend for class persistence.

    Methods:
        _get_remote_latest_meta(class_name: str) -> Optional[Dict[str, str]]:
            Retrieve the 'latest.json' manifest from storage.

        check_status(class_obj: Type[TypePlugable]) -> str:
            Compare local vs remote versions.

        get_class_hash(class_obj: Type[TypePlugable]) -> str:
            Generate SHA-256 hash of class source code.

        persist_class(class_obj: Type[TypePlugable]) -> str:
            Serialize and upload class if it doesn't exist.

        push(class_obj: Type[TypePlugable]) -> None:
            Push local version and mark as 'latest' in storage.

        pull(class_name: str, code_hash: Optional[str] = None) -> Type[TypePlugable]:
            Fetch a specific version or 'latest' from storage.

        recover_class(class_name: str, code_hash: str) -> Type[TypePlugable]:
            Download and reconstruct class from its hash path.

    Note:
        This class is designed to work with the PetFactory and PetContainer
        to provide seamless class persistence across different environments.
    """

    def __init__(self, storage: BaseStorage):
        """
        Initialize the PetClassManager with a storage backend.

        Args:
            storage (BaseStorage): Storage backend to use for class persistence.

        Example:
        ```python
            from labchain.storage import S3Storage

            storage = S3Storage(bucket="my-ml-models")
            manager = PetClassManager(storage)
        ```
        """
        self.storage = storage

    def _get_remote_latest_meta(self, class_name: str) -> Optional[Dict[str, str]]:
        """
        Retrieve the 'latest.json' manifest from storage for a specific class.

        This internal method fetches the metadata that points to the current
        'latest' version of a class in storage.

        Args:
            class_name (str): Name of the class to look up.

        Returns:
            Optional[Dict[str, str]]: Dictionary with 'hash' and 'class_name' keys,
                                      or None if not found or error occurs.

        Note:
            This method handles different return types from storage.download_file
            (bytes, str, or dict) and normalizes them to a dict.
        """
        # Path WITHOUT "plugins/" prefix since context="plugins" adds it
        path = f"{class_name}/latest.json"

        if self.storage.check_if_exists(path, context="plugins"):
            try:
                data = self.storage.download_file(path, context="plugins")

                if isinstance(data, bytes):
                    return json.loads(data.decode("utf-8"))
                elif isinstance(data, str):
                    return json.loads(data)
                return data

            except Exception as e:
                rprint(
                    f"[red]Error reading remote metadata for {class_name}: {e}[/red]"
                )
                return None

        return None

    def check_status(self, class_obj: Type[TypePlugable]) -> str:
        """
        Compare local class version vs remote version.

        This method computes the hash of the local class and compares it
        with the hash stored in the remote 'latest' pointer.

        Args:
            class_obj (Type[TypePlugable]): The class to check.

        Returns:

            str: Status string - one of:

                - 'synced': Local and remote hashes match
                - 'out_of_sync': Local and remote hashes differ
                - 'untracked': No remote version exists

        Example:
        ```python
            status = manager.check_status(MyFilter)
            if status == 'out_of_sync':
                manager.push(MyFilter)
        ```
        """
        local_hash = self.get_class_hash(class_obj)
        remote_meta = self._get_remote_latest_meta(class_obj.__name__)

        if not remote_meta:
            return "untracked"
        if local_hash == remote_meta["hash"]:
            return "synced"

        return "out_of_sync"

    def get_class_hash(self, class_obj: Type[TypePlugable]) -> str:
        """
        Generate SHA-256 hash based on class source code.

        This method extracts the source code of the class using inspect.getsource
        and computes a SHA-256 hash. For built-in or dynamically created classes
        without accessible source, it falls back to hashing the module and qualified name.

        Args:
            class_obj (Type[TypePlugable]): The class to hash.

        Returns:
            str: Hex digest of the SHA-256 hash (64 characters).

        Example:
        ```python
            hash1 = manager.get_class_hash(MyFilter)
            # Modify MyFilter source code
            hash2 = manager.get_class_hash(MyFilter)
            assert hash1 != hash2  # Hashes differ after modification
        ```

        Note:
            The hash is deterministic - the same source code will always
            produce the same hash, enabling reliable version tracking.
        """
        try:
            h = hashlib.sha256()

            h.update(class_obj.__module__.encode())
            h.update(class_obj.__qualname__.encode())

            # 2. Base classes (order matters)
            for base in class_obj.__bases__:
                h.update(base.__module__.encode())
                h.update(base.__qualname__.encode())

            # 3. Methods defined in this class only
            for name, obj in sorted(class_obj.__dict__.items()):
                if isinstance(obj, types.FunctionType):
                    code = obj.__code__

                    h.update(name.encode())

                    # Core bytecode
                    h.update(code.co_code)

                    # Constants and names affect semantics
                    h.update(repr(code.co_consts).encode())
                    h.update(repr(code.co_names).encode())

                    # Signature (API-level change)
                    sig = inspect.signature(obj)
                    h.update(str(sig).encode())
            return h.hexdigest()

        except (TypeError, OSError):
            identifier = f"{class_obj.__module__}.{class_obj.__qualname__}"
            return hashlib.sha256(identifier.encode("utf-8")).hexdigest()

    def persist_class(self, class_obj: Type[TypePlugable]) -> str:
        """
        Serialize the class and upload to storage if it doesn't exist.

        This method reads the class's content hash (its _hash attribute),
        serializes the class with cloudpickle, and uploads it to storage only
        if a snapshot with that hash doesn't already exist (avoiding redundant
        uploads).

        Args:
            class_obj (Type[TypePlugable]): The class to persist.

        Returns:
            str: Hash of the persisted class.

        Raises:
            ValueError: If the class has no hash attribute set.

        Example:
        ```python
            hash_value = manager.persist_class(MyFilter)
            print(f"Persisted MyFilter with hash: {hash_value}")
        ```

        Note:
            This method does not update the 'latest' pointer. Use push()
            for the full push workflow including pointer update.
        """

        code_hash = cast(BasePlugin, class_obj)._hash
        if code_hash is not None:
            path = f"{class_obj.__name__}/{code_hash}.pkl"

            if not self.storage.check_if_exists(path, context="plugins"):
                binary = cloudpickle.dumps(class_obj)
                self.storage.upload_file(binary, file_name=path, context="plugins")
            return code_hash
        else:
            raise ValueError("Class must have a hash attribute.")

    def push(self, class_obj: Type[TypePlugable]) -> None:
        """
        Push local version and mark it as 'latest' in storage.

        This method performs a complete push workflow:

        1. Computes the class hash
        2. Serializes and uploads the class binary (if not already present)
        3. Updates the 'latest.json' pointer to reference this version

        Args:
            class_obj (Type[TypePlugable]): The class to push.

        Returns:
            None

        Example:
        ```python
            # After modifying MyFilter locally
            manager.push(MyFilter)
            # Now remote 'latest' points to the new version
        ```

        Note:
            Pushing creates immutable snapshots. Old versions remain accessible
            by their hash, enabling rollback and version-specific reconstruction.
        """
        code_hash = cast(BasePlugin, class_obj)._hash
        class_name = class_obj.__name__

        # Path WITHOUT "plugins/" prefix since context="plugins" adds it
        path = f"{class_name}/{code_hash}.pkl"
        if not self.storage.check_if_exists(path, context="plugins"):
            self.storage.upload_file(class_obj, file_name=path, context="plugins")

        # Update the 'latest' development pointer
        manifest = {"hash": code_hash, "class_name": class_name}
        self.storage.upload_file(
            manifest,
            file_name=f"{class_name}/latest.json",
            context="plugins",
        )

    def pull(
        self, class_name: str, code_hash: Optional[str] = None
    ) -> Type[TypePlugable]:  # type: ignore
        """
        Fetch a specific version or 'latest' from storage.

        This method retrieves a class from storage. If no specific hash is
        provided, it follows the 'latest' pointer to get the current version.

        Args:
            class_name (str): Name of the class to fetch.
            code_hash (Optional[str]): Specific hash to fetch. If None, fetches 'latest'.

        Returns:
            Type[TypePlugable]: The reconstructed class object.

        Raises:
            ValueError: If no remote versions exist for the class.

        Example:
        ```python
            # Pull latest version
            MyFilter = manager.pull("MyFilter")

            # Pull specific version
            MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")
        ```

        Note:
            The returned class is fully functional and can be instantiated
            immediately. All methods and attributes are preserved.
        """
        target_hash = code_hash

        # If no specific hash requested, fetch latest pointer
        if not target_hash:
            remote_meta = self._get_remote_latest_meta(class_name)
            if not remote_meta:
                raise ValueError(f"No remote versions found for {class_name}")
            target_hash = remote_meta["hash"]

        # Recover the class using the final hash
        return self.recover_class(class_name, target_hash)

    @staticmethod
    def _rehydrate_class_globals(clz: Type) -> None:
        """
        Reinject module globals into all methods of a deserialized class.
        This is required for cloudpickle-loaded classes that reference
        external symbols (e.g. XYData, labchain, etc).
        """
        module_name = getattr(clz, "__module__", None)

        if not module_name:
            return

        module = sys.modules.get(module_name)
        if not module:
            return

        module_globals = module.__dict__

        for attr in clz.__dict__.values():
            if isinstance(attr, (types.FunctionType, types.MethodType)):
                attr.__globals__.update(module_globals)  # type: ignore

    def recover_class(self, class_name: str, code_hash: str) -> Type[TypePlugable]:  # type: ignore
        """
        Download and reconstruct class from its hash path.

        This method fetches the serialized class binary from storage using
        the class name and hash, then deserializes it with cloudpickle.

        Args:
            class_name (str): Name of the class.
            code_hash (str): Hash of the version to recover.

        Returns:
            Type[TypePlugable]: The reconstructed class object.

        Example:
        ```python
            # Recover a specific version directly
            MyFilterV1 = manager.recover_class("MyFilter", "abc123...")

            instance = MyFilterV1(param=42)
        ```

        Note:
            This is a lower-level method typically called by pull().
            Prefer using pull() for most use cases.
        """
        # Path WITHOUT "plugins/" prefix since context="plugins" adds it

        path = f"{class_name}/{code_hash}.pkl"
        class_obj = self.storage.download_file(path, context="plugins")
        if hasattr(class_obj.__init__, "__typeguard_original_function__"):
            # Restore the original, unchecked __init__ (strip the typeguard wrapper)
            class_obj.__init__ = class_obj.__init__.__typeguard_original_function__

        PetClassManager._rehydrate_class_globals(class_obj)

        return class_obj

storage = storage instance-attribute

__init__(storage)

Initialize the PetClassManager with a storage backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `storage` | `BaseStorage` | Storage backend to use for class persistence. | required |

Example:

    from labchain.storage import S3Storage

    storage = S3Storage(bucket="my-ml-models")
    manager = PetClassManager(storage)

Source code in labchain/container/persistent/pet_class_manager.py
def __init__(self, storage: BaseStorage):
    """
    Initialize the PetClassManager with a storage backend.

    Args:
        storage (BaseStorage): Storage backend to use for class persistence.

    Example:
    ```python
        from labchain.storage import S3Storage

        storage = S3Storage(bucket="my-ml-models")
        manager = PetClassManager(storage)
    ```
    """
    self.storage = storage

_get_remote_latest_meta(class_name)

Retrieve the 'latest.json' manifest from storage for a specific class.

This internal method fetches the metadata that points to the current 'latest' version of a class in storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_name` | `str` | Name of the class to look up. | required |

Returns:

| Type | Description |
| --- | --- |
| `Optional[Dict[str, str]]` | Dictionary with 'hash' and 'class_name' keys, or None if not found or an error occurs. |

Note

This method handles different return types from storage.download_file (bytes, str, or dict) and normalizes them to a dict.

Source code in labchain/container/persistent/pet_class_manager.py
def _get_remote_latest_meta(self, class_name: str) -> Optional[Dict[str, str]]:
    """
    Retrieve the 'latest.json' manifest from storage for a specific class.

    This internal method fetches the metadata that points to the current
    'latest' version of a class in storage.

    Args:
        class_name (str): Name of the class to look up.

    Returns:
        Optional[Dict[str, str]]: Dictionary with 'hash' and 'class_name' keys,
                                  or None if not found or error occurs.

    Note:
        This method handles different return types from storage.download_file
        (bytes, str, or dict) and normalizes them to a dict.
    """
    # Path WITHOUT "plugins/" prefix since context="plugins" adds it
    path = f"{class_name}/latest.json"

    if self.storage.check_if_exists(path, context="plugins"):
        try:
            data = self.storage.download_file(path, context="plugins")

            if isinstance(data, bytes):
                return json.loads(data.decode("utf-8"))
            elif isinstance(data, str):
                return json.loads(data)
            return data

        except Exception as e:
            rprint(
                f"[red]Error reading remote metadata for {class_name}: {e}[/red]"
            )
            return None

    return None
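
The manifest written by push() is a small JSON document; the normalization above only accounts for storage backends that hand the payload back in different shapes. A minimal sketch (the hash value is illustrative):

```python
import json

# Shape of 'latest.json' as written by push()
manifest = {"hash": "abc123...", "class_name": "MyFilter"}

# A backend may return the payload as bytes, text, or an already-decoded dict;
# all three normalize to the same dictionary.
as_bytes = json.dumps(manifest).encode("utf-8")
as_text = json.dumps(manifest)

assert json.loads(as_bytes.decode("utf-8")) == json.loads(as_text) == manifest
```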

_rehydrate_class_globals(clz) staticmethod

Reinject module globals into all methods of a deserialized class. This is required for cloudpickle-loaded classes that reference external symbols (e.g. XYData, labchain, etc).

Source code in labchain/container/persistent/pet_class_manager.py
@staticmethod
def _rehydrate_class_globals(clz: Type) -> None:
    """
    Reinject module globals into all methods of a deserialized class.
    This is required for cloudpickle-loaded classes that reference
    external symbols (e.g. XYData, labchain, etc).
    """
    module_name = getattr(clz, "__module__", None)

    if not module_name:
        return

    module = sys.modules.get(module_name)
    if not module:
        return

    module_globals = module.__dict__

    for attr in clz.__dict__.values():
        if isinstance(attr, (types.FunctionType, types.MethodType)):
            attr.__globals__.update(module_globals)  # type: ignore
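
To see why this step matters, here is a minimal, self-contained sketch (not labchain code): a function whose __globals__ were lost during deserialization cannot resolve module-level symbols until they are injected back, which is what this helper does for every function in the class __dict__.

```python
import types

def helper(x):              # stands in for an external, module-level symbol
    return x * 2

def transform(self, x):     # stands in for a plugin method
    return helper(x)

# Simulate a deserialized method whose module globals were lost
bare = types.FunctionType(transform.__code__, {}, "transform")
try:
    bare(None, 3)
except NameError as err:
    print("before rehydration:", err)        # name 'helper' is not defined

# Reinject the module globals, as _rehydrate_class_globals does per method
bare.__globals__.update(globals())
print("after rehydration:", bare(None, 3))   # 6
```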

check_status(class_obj)

Compare local class version vs remote version.

This method computes the hash of the local class and compares it with the hash stored in the remote 'latest' pointer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_obj` | `Type[TypePlugable]` | The class to check. | required |

Returns:

str: Status string - one of:

    - 'synced': Local and remote hashes match
    - 'out_of_sync': Local and remote hashes differ
    - 'untracked': No remote version exists

Example:

    status = manager.check_status(MyFilter)
    if status == 'out_of_sync':
        manager.push(MyFilter)

Source code in labchain/container/persistent/pet_class_manager.py
def check_status(self, class_obj: Type[TypePlugable]) -> str:
    """
    Compare local class version vs remote version.

    This method computes the hash of the local class and compares it
    with the hash stored in the remote 'latest' pointer.

    Args:
        class_obj (Type[TypePlugable]): The class to check.

    Returns:

        str: Status string - one of:

            - 'synced': Local and remote hashes match
            - 'out_of_sync': Local and remote hashes differ
            - 'untracked': No remote version exists

    Example:
    ```python
        status = manager.check_status(MyFilter)
        if status == 'out_of_sync':
            manager.push(MyFilter)
    ```
    """
    local_hash = self.get_class_hash(class_obj)
    remote_meta = self._get_remote_latest_meta(class_obj.__name__)

    if not remote_meta:
        return "untracked"
    if local_hash == remote_meta["hash"]:
        return "synced"

    return "out_of_sync"

get_class_hash(class_obj)

Generate SHA-256 hash based on class source code.

This method hashes the class's structural definition: its module and qualified name, its base classes, and, for each function defined directly on the class, the method name, bytecode, constants, referenced names and signature, rather than the raw source text. If that introspection fails (TypeError or OSError), it falls back to hashing only the module and qualified name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_obj` | `Type[TypePlugable]` | The class to hash. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | Hex digest of the SHA-256 hash (64 characters). |

Example:

    hash1 = manager.get_class_hash(MyFilter)
    # Modify MyFilter source code
    hash2 = manager.get_class_hash(MyFilter)
    assert hash1 != hash2  # Hashes differ after modification

Note

The hash is deterministic - the same source code will always produce the same hash, enabling reliable version tracking.

Source code in labchain/container/persistent/pet_class_manager.py
def get_class_hash(self, class_obj: Type[TypePlugable]) -> str:
    """
    Generate SHA-256 hash based on class source code.

    This method hashes the class's structural definition: its module and
    qualified name, its base classes, and, for each function defined directly
    on the class, the method name, bytecode, constants, referenced names and
    signature, rather than the raw source text. If that introspection fails
    (TypeError or OSError), it falls back to hashing only the module and
    qualified name.

    Args:
        class_obj (Type[TypePlugable]): The class to hash.

    Returns:
        str: Hex digest of the SHA-256 hash (64 characters).

    Example:
    ```python
        hash1 = manager.get_class_hash(MyFilter)
        # Modify MyFilter source code
        hash2 = manager.get_class_hash(MyFilter)
        assert hash1 != hash2  # Hashes differ after modification
    ```

    Note:
        The hash is deterministic - the same source code will always
        produce the same hash, enabling reliable version tracking.
    """
    try:
        h = hashlib.sha256()

        # 1. Class identity: module and qualified name
        h.update(class_obj.__module__.encode())
        h.update(class_obj.__qualname__.encode())

        # 2. Base classes (order matters)
        for base in class_obj.__bases__:
            h.update(base.__module__.encode())
            h.update(base.__qualname__.encode())

        # 3. Methods defined in this class only
        for name, obj in sorted(class_obj.__dict__.items()):
            if isinstance(obj, types.FunctionType):
                code = obj.__code__

                h.update(name.encode())

                # Core bytecode
                h.update(code.co_code)

                # Constants and names affect semantics
                h.update(repr(code.co_consts).encode())
                h.update(repr(code.co_names).encode())

                # Signature (API-level change)
                sig = inspect.signature(obj)
                h.update(str(sig).encode())
        return h.hexdigest()

    except (TypeError, OSError):
        identifier = f"{class_obj.__module__}.{class_obj.__qualname__}"
        return hashlib.sha256(identifier.encode("utf-8")).hexdigest()
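
Because the hash is built from bytecode, constants, referenced names, and signatures rather than raw text, purely cosmetic edits (comments, blank lines) should leave it unchanged, while semantic edits move it. A rough sketch of that behaviour, assuming an already-constructed manager:

```python
class MyFilter:
    def run(self, x):
        return x + 1

h1 = manager.get_class_hash(MyFilter)

class MyFilter:                 # redefined with a comment-only change
    def run(self, x):
        # comments never reach co_code or co_consts
        return x + 1

h2 = manager.get_class_hash(MyFilter)

class MyFilter:                 # redefined with a changed constant
    def run(self, x):
        return x + 2

h3 = manager.get_class_hash(MyFilter)

assert h1 == h2   # cosmetic change: same hash
assert h1 != h3   # semantic change: different hash
```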

persist_class(class_obj)

Serialize the class and upload to storage if it doesn't exist.

This method reads the class's content hash (its _hash attribute), serializes the class with cloudpickle, and uploads it to storage only if a snapshot with that hash doesn't already exist (avoiding redundant uploads).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_obj` | `Type[TypePlugable]` | The class to persist. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | Hash of the persisted class. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the class has no hash attribute set. |

Example:

    hash_value = manager.persist_class(MyFilter)
    print(f"Persisted MyFilter with hash: {hash_value}")

Note

This method does not update the 'latest' pointer. Use push() for the full push workflow including pointer update.

Source code in labchain/container/persistent/pet_class_manager.py
def persist_class(self, class_obj: Type[TypePlugable]) -> str:
    """
    Serialize the class and upload to storage if it doesn't exist.

    This method reads the class's content hash (its _hash attribute),
    serializes the class with cloudpickle, and uploads it to storage only
    if a snapshot with that hash doesn't already exist (avoiding redundant
    uploads).

    Args:
        class_obj (Type[TypePlugable]): The class to persist.

    Returns:
        str: Hash of the persisted class.

    Raises:
        ValueError: If the class has no hash attribute set.

    Example:
    ```python
        hash_value = manager.persist_class(MyFilter)
        print(f"Persisted MyFilter with hash: {hash_value}")
    ```

    Note:
        This method does not update the 'latest' pointer. Use push()
        for the full push workflow including pointer update.
    """

    code_hash = cast(BasePlugin, class_obj)._hash
    if code_hash is not None:
        path = f"{class_obj.__name__}/{code_hash}.pkl"

        if not self.storage.check_if_exists(path, context="plugins"):
            binary = cloudpickle.dumps(class_obj)
            self.storage.upload_file(binary, file_name=path, context="plugins")
        return code_hash
    else:
        raise ValueError("Class must have a hash attribute.")
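
In practice the split between persist_class() and push() lets you upload an immutable snapshot first and only move the 'latest' pointer once you are happy with it. A minimal sketch (MyFilter is illustrative):

```python
snapshot_hash = manager.persist_class(MyFilter)   # upload the immutable snapshot only
# ... validate the snapshot, run tests, etc. ...
manager.push(MyFilter)                            # 'latest' now points at snapshot_hash
print(f"Published MyFilter @ {snapshot_hash}")
```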

pull(class_name, code_hash=None)

Fetch a specific version or 'latest' from storage.

This method retrieves a class from storage. If no specific hash is provided, it follows the 'latest' pointer to get the current version.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_name` | `str` | Name of the class to fetch. | required |
| `code_hash` | `Optional[str]` | Specific hash to fetch. If None, fetches 'latest'. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Type[TypePlugable]` | The reconstructed class object. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no remote versions exist for the class. |

Example:

    # Pull latest version
    MyFilter = manager.pull("MyFilter")

    # Pull specific version
    MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")

Note

The returned class is fully functional and can be instantiated immediately. All methods and attributes are preserved.

Source code in labchain/container/persistent/pet_class_manager.py
def pull(
    self, class_name: str, code_hash: Optional[str] = None
) -> Type[TypePlugable]:  # type: ignore
    """
    Fetch a specific version or 'latest' from storage.

    This method retrieves a class from storage. If no specific hash is
    provided, it follows the 'latest' pointer to get the current version.

    Args:
        class_name (str): Name of the class to fetch.
        code_hash (Optional[str]): Specific hash to fetch. If None, fetches 'latest'.

    Returns:
        Type[TypePlugable]: The reconstructed class object.

    Raises:
        ValueError: If no remote versions exist for the class.

    Example:
    ```python
        # Pull latest version
        MyFilter = manager.pull("MyFilter")

        # Pull specific version
        MyFilterV1 = manager.pull("MyFilter", code_hash="abc123...")
    ```

    Note:
        The returned class is fully functional and can be instantiated
        immediately. All methods and attributes are preserved.
    """
    target_hash = code_hash

    # If no specific hash requested, fetch latest pointer
    if not target_hash:
        remote_meta = self._get_remote_latest_meta(class_name)
        if not remote_meta:
            raise ValueError(f"No remote versions found for {class_name}")
        target_hash = remote_meta["hash"]

    # Recover the class using the final hash
    return self.recover_class(class_name, target_hash)
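
Since pull() raises ValueError when nothing has been pushed yet, a first-run workflow typically falls back to the local definition. A minimal sketch (MyFilter is illustrative):

```python
try:
    FilterCls = manager.pull("MyFilter")    # follow the remote 'latest' pointer
except ValueError:
    # Nothing pushed yet under this class name: publish the local definition
    manager.push(MyFilter)
    FilterCls = MyFilter
```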

push(class_obj)

Push local version and mark it as 'latest' in storage.

This method performs a complete push workflow:

  1. Computes the class hash
  2. Serializes and uploads the class binary (if not already present)
  3. Updates the 'latest.json' pointer to reference this version

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_obj` | `Type[TypePlugable]` | The class to push. | required |

Returns:

| Type | Description |
| --- | --- |
| `None` | None |

Example:

    # After modifying MyFilter locally
    manager.push(MyFilter)
    # Now remote 'latest' points to the new version

Note

Pushing creates immutable snapshots. Old versions remain accessible by their hash, enabling rollback and version-specific reconstruction.

Source code in labchain/container/persistent/pet_class_manager.py
def push(self, class_obj: Type[TypePlugable]) -> None:
    """
    Push local version and mark it as 'latest' in storage.

    This method performs a complete push workflow:

    1. Computes the class hash
    2. Serializes and uploads the class binary (if not already present)
    3. Updates the 'latest.json' pointer to reference this version

    Args:
        class_obj (Type[TypePlugable]): The class to push.

    Returns:
        None

    Example:
    ```python
        # After modifying MyFilter locally
        manager.push(MyFilter)
        # Now remote 'latest' points to the new version
    ```

    Note:
        Pushing creates immutable snapshots. Old versions remain accessible
        by their hash, enabling rollback and version-specific reconstruction.
    """
    code_hash = cast(BasePlugin, class_obj)._hash
    class_name = class_obj.__name__

    # Path WITHOUT "plugins/" prefix since context="plugins" adds it
    path = f"{class_name}/{code_hash}.pkl"
    if not self.storage.check_if_exists(path, context="plugins"):
        self.storage.upload_file(class_obj, file_name=path, context="plugins")

    # Update the 'latest' development pointer
    manifest = {"hash": code_hash, "class_name": class_name}
    self.storage.upload_file(
        manifest,
        file_name=f"{class_name}/latest.json",
        context="plugins",
    )

recover_class(class_name, code_hash)

Download and reconstruct class from its hash path.

This method fetches the serialized class binary from storage using the class name and hash, then deserializes it with cloudpickle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `class_name` | `str` | Name of the class. | required |
| `code_hash` | `str` | Hash of the version to recover. | required |

Returns:

| Type | Description |
| --- | --- |
| `Type[TypePlugable]` | The reconstructed class object. |

Example:

    # Recover a specific version directly
    MyFilterV1 = manager.recover_class("MyFilter", "abc123...")

    instance = MyFilterV1(param=42)

Note

This is a lower-level method typically called by pull(). Prefer using pull() for most use cases.

Source code in labchain/container/persistent/pet_class_manager.py
def recover_class(self, class_name: str, code_hash: str) -> Type[TypePlugable]:  # type: ignore
    """
    Download and reconstruct class from its hash path.

    This method fetches the serialized class binary from storage using
    the class name and hash, then deserializes it with cloudpickle.

    Args:
        class_name (str): Name of the class.
        code_hash (str): Hash of the version to recover.

    Returns:
        Type[TypePlugable]: The reconstructed class object.

    Example:
    ```python
        # Recover a specific version directly
        MyFilterV1 = manager.recover_class("MyFilter", "abc123...")

        instance = MyFilterV1(param=42)
    ```

    Note:
        This is a lower-level method typically called by pull().
        Prefer using pull() for most use cases.
    """
    # Path WITHOUT "plugins/" prefix since context="plugins" adds it

    path = f"{class_name}/{code_hash}.pkl"
    class_obj = self.storage.download_file(path, context="plugins")
    if hasattr(class_obj.__init__, "__typeguard_original_function__"):
        # Restore the original, unchecked __init__ (strip the typeguard wrapper)
        class_obj.__init__ = class_obj.__init__.__typeguard_original_function__

    PetClassManager._rehydrate_class_globals(class_obj)

    return class_obj