Skip to content

DtoAPI

Bases: AbstractRecordsOnlyAPI

A class to interact with the DTO API.

Retrieves dto data from the database.

Source code in yeastdnnexplorer/interface/DtoAPI.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class DtoAPI(AbstractRecordsOnlyAPI):
    """
    A class to interact with the DTO API.

    Retrieves dto data from the database.

    """

    def __init__(self, **kwargs) -> None:
        """
        Initialize the DTO object. This will serve as an interface to the DTO endpoint
        of both the database and the application cache.

        :param url: The URL of the DTO API
        :param kwargs: Additional parameters to pass to AbstractAPI.

        """
        super().__init__(
            url=kwargs.pop("url", os.getenv("DTO_URL", "")),
            **kwargs,
        )

    async def submit(
        self,
        post_dict: dict[str, Any],
        **kwargs,
    ) -> Any:
        """
        Submit a DTO task to the DTO API.

        :param post_dict: The dictionary to submit to the DTO API. The typing needs to
            be adjusted -- it can take a list of dictionaries to submit a batch.
        :return: The group_task_id of the submitted task.

        """
        # make a post request with the post_dict to dto_url
        dto_url = f"{self.url.rstrip('/')}/submit/"
        self.logger.debug("dto_url: %s", dto_url)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                dto_url, headers=self.header, json=post_dict
            ) as response:
                try:
                    response.raise_for_status()
                except aiohttp.ClientResponseError as e:
                    self.logger.error(
                        "Failed to submit DTO task: Status %s, Reason %s",
                        e.status,
                        e.message,
                    )
                    raise
                result = await response.json()
                try:
                    return result["group_task_id"]
                except KeyError:
                    self.logger.error(
                        "Expected 'group_task_id' in response: %s", json.dumps(result)
                    )
                    raise

    async def retrieve(
        self,
        group_task_id: str,
        timeout: int = 300,
        polling_interval: int = 2,
        **kwargs,
    ) -> dict[str, pd.DataFrame]:
        """
        Periodically check the task status and retrieve the result when the task
        completes.

        :param group_task_id: The task ID to retrieve results for.
        :param timeout: The maximum time to wait for the task to complete (in seconds).
        :param polling_interval: The time to wait between status checks (in seconds).
        :return: Records from the DTO API of the successfully completed task.

        """
        # Start time for timeout check
        start_time = time.time()

        # Task status URL
        status_url = f"{self.url.rstrip('/')}/status/"

        while True:
            async with aiohttp.ClientSession() as session:
                # Send a GET request to check the task status
                async with session.get(
                    status_url,
                    headers=self.header,
                    params={"group_task_id": group_task_id},
                ) as response:
                    response.raise_for_status()  # Raise an error for bad status codes
                    status_response = await response.json()

                    # Check if the task is complete
                    if status_response.get("status") == "SUCCESS":

                        if error_tasks := status_response.get("error_tasks"):
                            self.logger.error(
                                f"Tasks {group_task_id} failed: {error_tasks}"
                            )
                        if success_tasks := status_response.get("success_pks"):
                            params = {"id": ",".join(str(pk) for pk in success_tasks)}
                            return await self.read(params=params)
                    elif status_response.get("status") == "FAILURE":
                        raise Exception(
                            f"Task {group_task_id} failed: {status_response}"
                        )

                    # Check if we have reached the timeout
                    elapsed_time = time.time() - start_time
                    if elapsed_time > timeout:
                        raise TimeoutError(
                            f"Task {group_task_id} did not "
                            "complete within {timeout} seconds."
                        )

                    # Wait for the specified polling interval before checking again
                    await asyncio.sleep(polling_interval)

    def create(self, data: dict[str, Any], **kwargs) -> requests.Response:
        raise NotImplementedError("The DTO does not support create.")

    def update(self, df: pd.DataFrame, **kwargs) -> Any:
        raise NotImplementedError("The DTO does not support update.")

    def delete(self, id: str, **kwargs) -> Any:
        """
        Delete a DTO record from the database.

        :param id: The ID of the DTO record to delete.
        :return: A dictionary with a status message indicating success or failure.

        """
        # Include the Authorization header with the token
        headers = kwargs.get("headers", {})
        headers["Authorization"] = f"Token {self.token}"

        # Make the DELETE request with the updated headers
        response = requests.delete(f"{self.url}/{id}/", headers=headers, **kwargs)

        if response.status_code == 204:
            return {"status": "success", "message": "DTO deleted successfully."}

        # Raise an error if the response indicates failure
        response.raise_for_status()

__init__(**kwargs)

Initialize the DTO object. This will serve as an interface to the DTO endpoint of both the database and the application cache.

Parameters:

Name Type Description Default
url

The URL of the DTO API

required
kwargs

Additional parameters to pass to AbstractAPI.

{}
Source code in yeastdnnexplorer/interface/DtoAPI.py
22
23
24
25
26
27
28
29
30
31
32
33
34
def __init__(self, **kwargs) -> None:
    """
    Initialize the DTO object. This will serve as an interface to the DTO endpoint
    of both the database and the application cache.

    :param url: The URL of the DTO API
    :param kwargs: Additional parameters to pass to AbstractAPI.

    """
    super().__init__(
        url=kwargs.pop("url", os.getenv("DTO_URL", "")),
        **kwargs,
    )

delete(id, **kwargs)

Delete a DTO record from the database.

Parameters:

Name Type Description Default
id str

The ID of the DTO record to delete.

required

Returns:

Type Description
Any

A dictionary with a status message indicating success or failure.

Source code in yeastdnnexplorer/interface/DtoAPI.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def delete(self, id: str, **kwargs) -> Any:
    """
    Delete a DTO record from the database.

    :param id: The ID of the DTO record to delete.
    :return: A dictionary with a status message indicating success or failure.

    """
    # Include the Authorization header with the token
    headers = kwargs.get("headers", {})
    headers["Authorization"] = f"Token {self.token}"

    # Make the DELETE request with the updated headers
    response = requests.delete(f"{self.url}/{id}/", headers=headers, **kwargs)

    if response.status_code == 204:
        return {"status": "success", "message": "DTO deleted successfully."}

    # Raise an error if the response indicates failure
    response.raise_for_status()

retrieve(group_task_id, timeout=300, polling_interval=2, **kwargs) async

Periodically check the task status and retrieve the result when the task completes.

Parameters:

Name Type Description Default
group_task_id str

The task ID to retrieve results for.

required
timeout int

The maximum time to wait for the task to complete (in seconds).

300
polling_interval int

The time to wait between status checks (in seconds).

2

Returns:

Type Description
dict[str, DataFrame]

Records from the DTO API of the successfully completed task.

Source code in yeastdnnexplorer/interface/DtoAPI.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def retrieve(
    self,
    group_task_id: str,
    timeout: int = 300,
    polling_interval: int = 2,
    **kwargs,
) -> dict[str, pd.DataFrame]:
    """
    Periodically check the task status and retrieve the result when the task
    completes.

    :param group_task_id: The task ID to retrieve results for.
    :param timeout: The maximum time to wait for the task to complete (in seconds).
    :param polling_interval: The time to wait between status checks (in seconds).
    :return: Records from the DTO API of the successfully completed task.

    """
    # Start time for timeout check
    start_time = time.time()

    # Task status URL
    status_url = f"{self.url.rstrip('/')}/status/"

    while True:
        async with aiohttp.ClientSession() as session:
            # Send a GET request to check the task status
            async with session.get(
                status_url,
                headers=self.header,
                params={"group_task_id": group_task_id},
            ) as response:
                response.raise_for_status()  # Raise an error for bad status codes
                status_response = await response.json()

                # Check if the task is complete
                if status_response.get("status") == "SUCCESS":

                    if error_tasks := status_response.get("error_tasks"):
                        self.logger.error(
                            f"Tasks {group_task_id} failed: {error_tasks}"
                        )
                    if success_tasks := status_response.get("success_pks"):
                        params = {"id": ",".join(str(pk) for pk in success_tasks)}
                        return await self.read(params=params)
                elif status_response.get("status") == "FAILURE":
                    raise Exception(
                        f"Task {group_task_id} failed: {status_response}"
                    )

                # Check if we have reached the timeout
                elapsed_time = time.time() - start_time
                if elapsed_time > timeout:
                    raise TimeoutError(
                        f"Task {group_task_id} did not "
                        "complete within {timeout} seconds."
                    )

                # Wait for the specified polling interval before checking again
                await asyncio.sleep(polling_interval)

submit(post_dict, **kwargs) async

Submit a DTO task to the DTO API.

Parameters:

Name Type Description Default
post_dict dict[str, Any]

The dictionary to submit to the DTO API. The typing needs to be adjusted – it can take a list of dictionaries to submit a batch.

required

Returns:

Type Description
Any

The group_task_id of the submitted task.

Source code in yeastdnnexplorer/interface/DtoAPI.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def submit(
    self,
    post_dict: dict[str, Any],
    **kwargs,
) -> Any:
    """
    Submit a DTO task to the DTO API.

    :param post_dict: The dictionary to submit to the DTO API. The typing needs to
        be adjusted -- it can take a list of dictionaries to submit a batch.
    :return: The group_task_id of the submitted task.

    """
    # make a post request with the post_dict to dto_url
    dto_url = f"{self.url.rstrip('/')}/submit/"
    self.logger.debug("dto_url: %s", dto_url)

    async with aiohttp.ClientSession() as session:
        async with session.post(
            dto_url, headers=self.header, json=post_dict
        ) as response:
            try:
                response.raise_for_status()
            except aiohttp.ClientResponseError as e:
                self.logger.error(
                    "Failed to submit DTO task: Status %s, Reason %s",
                    e.status,
                    e.message,
                )
                raise
            result = await response.json()
            try:
                return result["group_task_id"]
            except KeyError:
                self.logger.error(
                    "Expected 'group_task_id' in response: %s", json.dumps(result)
                )
                raise