Skip to content

AsynchronousCommunicator

synapseclient.models.mixins.AsynchronousCommunicator

Mixin to handle communication with the Synapse Asynchronous Job service.

Source code in synapseclient/models/mixins/asynchronous_job.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class AsynchronousCommunicator:
    """Mixin to handle communication with the Synapse Asynchronous Job service."""

    def to_synapse_request(self) -> None:
        """Converts the request to a request expected of the Synapse REST API."""
        raise NotImplementedError("to_synapse_request must be implemented.")

    def fill_from_dict(self, synapse_response: Dict[str, str]) -> "Self":
        """
        Converts a response from the REST API into this dataclass.

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            An instance of this class.
        """
        raise NotImplementedError("fill_from_dict must be implemented.")

    async def _post_exchange_async(
        self, synapse_client: Optional[Synapse] = None, **kwargs
    ) -> None:
        """Any additional logic to run after the exchange with Synapse.

        Arguments:
            synapse_client: The Synapse client to use for the request.
            **kwargs: Additional arguments to pass to the request.
        """
        pass

    async def send_job_and_wait_async(
        self,
        post_exchange_args: Optional[Dict[str, Any]] = None,
        timeout: int = 60,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> "Self":
        """Send the job to the Asynchronous Job service and wait for it to complete.
        Intended to be called by a class inheriting from this mixin to start a job
        in the Synapse API and wait for it to complete. The inheriting class needs to
        represent an asynchronous job request and response and include all necessary attributes.
        This was initially implemented to be used in the AgentPrompt class which can be used
        as an example.

        Arguments:
            post_exchange_args: Additional arguments to pass to the request.
            timeout: The number of seconds to wait for the job to complete or progress
                before raising a SynapseTimeoutError. Defaults to 60.
            synapse_client: If not passed in and caching was not disabled by
                    `Synapse.allow_client_caching(False)` this will use the last created
                    instance from the Synapse class constructor.

        Returns:
            An instance of this class.

        Example: Using this function
            This function was initially implemented to be used in the AgentPrompt class
            to send a prompt to an AI agent and wait for the response. It can also be used
            in any other class that needs to use an Asynchronous Job.

            The inheriting class (AgentPrompt) will typically not be used directly, but rather
            through a higher level class (AgentSession), but this example shows how you would
            use this function.

                from synapseclient import Synapse
                from synapseclient.models.agent import AgentPrompt

                syn = Synapse()
                syn.login()

                agent_prompt = AgentPrompt(
                    id=None,
                    session_id="123",
                    prompt="Hello",
                    response=None,
                    enable_trace=True,
                    trace=None,
                )
                # This will fill the id, response, and trace
                # attributes with the response from the API
                agent_prompt.send_job_and_wait_async()
        """
        results = await send_job_and_wait_async(
            request=self.to_synapse_request(),
            request_type=self.concrete_type,
            timeout=timeout,
            synapse_client=synapse_client,
        )
        if "results" in results:
            failure_messages = []
            for result in results["results"]:
                if "updateResults" in result:
                    for update_result in result["updateResults"]:
                        failure_code = update_result.get("failureCode", None)
                        failure_message = update_result.get("failureMessage", None)
                        if failure_code or failure_message:
                            client = Synapse.get_client(synapse_client=synapse_client)
                            failure_messages.append(update_result)
            if failure_messages:
                client.logger.warning(
                    f"Failed to send a portion of the async job to Synapse: {failure_messages}"
                )
        self.fill_from_dict(synapse_response=results)
        if not post_exchange_args:
            post_exchange_args = {}
        await self._post_exchange_async(
            **post_exchange_args, synapse_client=synapse_client
        )
        return self

Functions

to_synapse_request

to_synapse_request() -> None

Converts the request to a request expected of the Synapse REST API.

Source code in synapseclient/models/mixins/asynchronous_job.py
40
41
42
def to_synapse_request(self) -> None:
    """Converts the request to a request expected of the Synapse REST API."""
    raise NotImplementedError("to_synapse_request must be implemented.")

fill_from_dict

fill_from_dict(synapse_response: Dict[str, str]) -> Self

Converts a response from the REST API into this dataclass.

PARAMETER DESCRIPTION
synapse_response

The response from the REST API.

TYPE: Dict[str, str]

RETURNS DESCRIPTION
Self

An instance of this class.

Source code in synapseclient/models/mixins/asynchronous_job.py
44
45
46
47
48
49
50
51
52
53
54
def fill_from_dict(self, synapse_response: Dict[str, str]) -> "Self":
    """
    Converts a response from the REST API into this dataclass.

    Arguments:
        synapse_response: The response from the REST API.

    Returns:
        An instance of this class.
    """
    raise NotImplementedError("fill_from_dict must be implemented.")

send_job_and_wait_async async

send_job_and_wait_async(post_exchange_args: Optional[Dict[str, Any]] = None, timeout: int = 60, *, synapse_client: Optional[Synapse] = None) -> Self

Send the job to the Asynchronous Job service and wait for it to complete. Intended to be called by a class inheriting from this mixin to start a job in the Synapse API and wait for it to complete. The inheriting class needs to represent an asynchronous job request and response and include all necessary attributes. This was initially implemented to be used in the AgentPrompt class which can be used as an example.

PARAMETER DESCRIPTION
post_exchange_args

Additional arguments to pass to the request.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

timeout

The number of seconds to wait for the job to complete or progress before raising a SynapseTimeoutError. Defaults to 60.

TYPE: int DEFAULT: 60

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Self

An instance of this class.

Using this function

This function was initially implemented to be used in the AgentPrompt class to send a prompt to an AI agent and wait for the response. It can also be used in any other class that needs to use an Asynchronous Job.

The inheriting class (AgentPrompt) will typically not be used directly, but rather through a higher level class (AgentSession), but this example shows how you would use this function.

from synapseclient import Synapse
from synapseclient.models.agent import AgentPrompt

syn = Synapse()
syn.login()

agent_prompt = AgentPrompt(
    id=None,
    session_id="123",
    prompt="Hello",
    response=None,
    enable_trace=True,
    trace=None,
)
# This will fill the id, response, and trace
# attributes with the response from the API
agent_prompt.send_job_and_wait_async()
Source code in synapseclient/models/mixins/asynchronous_job.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
async def send_job_and_wait_async(
    self,
    post_exchange_args: Optional[Dict[str, Any]] = None,
    timeout: int = 60,
    *,
    synapse_client: Optional[Synapse] = None,
) -> "Self":
    """Send the job to the Asynchronous Job service and wait for it to complete.
    Intended to be called by a class inheriting from this mixin to start a job
    in the Synapse API and wait for it to complete. The inheriting class needs to
    represent an asynchronous job request and response and include all necessary attributes.
    This was initially implemented to be used in the AgentPrompt class which can be used
    as an example.

    Arguments:
        post_exchange_args: Additional arguments to pass to the request.
        timeout: The number of seconds to wait for the job to complete or progress
            before raising a SynapseTimeoutError. Defaults to 60.
        synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

    Returns:
        An instance of this class.

    Example: Using this function
        This function was initially implemented to be used in the AgentPrompt class
        to send a prompt to an AI agent and wait for the response. It can also be used
        in any other class that needs to use an Asynchronous Job.

        The inheriting class (AgentPrompt) will typically not be used directly, but rather
        through a higher level class (AgentSession), but this example shows how you would
        use this function.

            from synapseclient import Synapse
            from synapseclient.models.agent import AgentPrompt

            syn = Synapse()
            syn.login()

            agent_prompt = AgentPrompt(
                id=None,
                session_id="123",
                prompt="Hello",
                response=None,
                enable_trace=True,
                trace=None,
            )
            # This will fill the id, response, and trace
            # attributes with the response from the API
            agent_prompt.send_job_and_wait_async()
    """
    results = await send_job_and_wait_async(
        request=self.to_synapse_request(),
        request_type=self.concrete_type,
        timeout=timeout,
        synapse_client=synapse_client,
    )
    if "results" in results:
        failure_messages = []
        for result in results["results"]:
            if "updateResults" in result:
                for update_result in result["updateResults"]:
                    failure_code = update_result.get("failureCode", None)
                    failure_message = update_result.get("failureMessage", None)
                    if failure_code or failure_message:
                        client = Synapse.get_client(synapse_client=synapse_client)
                        failure_messages.append(update_result)
        if failure_messages:
            client.logger.warning(
                f"Failed to send a portion of the async job to Synapse: {failure_messages}"
            )
    self.fill_from_dict(synapse_response=results)
    if not post_exchange_args:
        post_exchange_args = {}
    await self._post_exchange_async(
        **post_exchange_args, synapse_client=synapse_client
    )
    return self