Skip to content

Benchmarking functions

benchmark_suite_with_injections

benchmark_suite_with_injections(
    agent_pipeline: BasePipelineElement,
    suite: TaskSuite[Env],
    attack: BaseAttack,
    logdir: Path | None,
    force_rerun: bool,
    user_tasks: Sequence[str] | None = None,
    injection_tasks: Sequence[str] | None = None,
    verbose: bool = True,
) -> SuiteResults

Benchmark a suite with injections.

Parameters:

  • agent_pipeline (BasePipelineElement) –

    The agent pipeline to use.

  • suite (TaskSuite[Env]) –

    The suite to benchmark.

  • attack (BaseAttack) –

    The attack to use.

  • logdir (Path | None) –

    The directory to save logs to.

  • force_rerun (bool) –

    Whether to rerun the task even if a log for it already exists in the logdir.

  • user_tasks (Sequence[str] | None, default: None ) –

    A subset of user tasks to run. If None, all user tasks are run.

  • injection_tasks (Sequence[str] | None, default: None ) –

    A subset of injection tasks to run. If None, all injection tasks are run.

  • verbose (bool, default: True ) –

    Whether to print the results.

Source code in src/agentdojo/benchmark.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def benchmark_suite_with_injections(
    agent_pipeline: BasePipelineElement,
    suite: TaskSuite[Env],
    attack: BaseAttack,
    logdir: Path | None,
    force_rerun: bool,
    user_tasks: Sequence[str] | None = None,
    injection_tasks: Sequence[str] | None = None,
    verbose: bool = True,
) -> SuiteResults:
    """Benchmark a suite with injections.

    Unless the attack is a DoS attack, every selected injection task is first run
    on its own (without injections) and a warning is emitted if any of them was
    not solved. Each selected user task is then run under attack.

    Args:
        agent_pipeline: The agent pipeline to use.
        suite: The suite to benchmark.
        attack: The attack to use.
        logdir: The directory to save logs to.
        force_rerun: Whether to rerun a task even if a log for it already exists in the logdir.
        user_tasks: A subset of user tasks to run. If None, all user tasks are run.
        injection_tasks: A subset of injection tasks to run. If None, all injection tasks are run.
        verbose: Whether to print the results. Not read by this function's body.

    Returns:
        The suite results: utility and security results keyed by
        `(user task ID, injection task ID)`, plus the utility of each injection task
        run on its own (empty for DoS attacks).
    """
    # Resolve the requested IDs up front, before any task is run.
    if user_tasks is None:
        selected_user_tasks = suite.user_tasks.values()
    else:
        selected_user_tasks = [suite.get_user_task_by_id(task_id) for task_id in user_tasks]

    if injection_tasks is None:
        selected_injection_tasks = suite.injection_tasks
    else:
        selected_injection_tasks = {
            task_id: suite.get_injection_task_by_id(task_id) for task_id in injection_tasks
        }

    injection_tasks_utility_results = {}
    if not attack.is_dos_attack:
        # Check whether the pipeline can solve each injection task when it is given as a plain task.
        injection_tasks_utility_results = {
            task_id: run_task_without_injection_tasks(suite, agent_pipeline, task, logdir, force_rerun)[0]
            for task_id, task in selected_injection_tasks.items()
        }
        if any(not solved for solved in injection_tasks_utility_results.values()):
            warnings.warn("Not all injection tasks were solved as user tasks.")

    combined_utility: dict[tuple[str, str], bool] = {}
    combined_security: dict[tuple[str, str], bool] = {}
    for user_task in selected_user_tasks:
        # The raw ID sequence (not the resolved tasks) is forwarded; the callee resolves IDs itself.
        task_utility, task_security = run_task_with_injection_tasks(
            suite, agent_pipeline, user_task, attack, logdir, force_rerun, injection_tasks
        )
        combined_utility |= task_utility
        combined_security |= task_security

    return SuiteResults(
        utility_results=combined_utility,
        security_results=combined_security,
        injection_tasks_utility_results=injection_tasks_utility_results,
    )

benchmark_suite_without_injections

benchmark_suite_without_injections(
    agent_pipeline: BasePipelineElement,
    suite: TaskSuite,
    logdir: Path | None,
    force_rerun: bool,
    user_tasks: Sequence[str] | None = None,
) -> SuiteResults

Benchmark a suite without injections.

Parameters:

  • agent_pipeline (BasePipelineElement) –

    The agent pipeline to use.

  • suite (TaskSuite) –

    The suite to benchmark.

  • logdir (Path | None) –

    The directory to save logs to.

  • force_rerun (bool) –

    Whether to rerun the task even if a log for it already exists in the logdir.

  • user_tasks (Sequence[str] | None, default: None ) –

    A subset of user tasks to run. If None, all user tasks are run.

Source code in src/agentdojo/benchmark.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def benchmark_suite_without_injections(
    agent_pipeline: BasePipelineElement,
    suite: TaskSuite,
    logdir: Path | None,
    force_rerun: bool,
    user_tasks: Sequence[str] | None = None,
) -> SuiteResults:
    """Benchmark a suite without injections.

    Args:
        agent_pipeline: The agent pipeline to use.
        suite: The suite to benchmark.
        logdir: The directory to save logs to.
        force_rerun: Whether to rerun a task even if a log for it already exists in the logdir.
        user_tasks: A subset of user tasks to run. If None, all user tasks are run.

    Returns:
        The suite results. Utility and security results are keyed by
        `(user task ID, "")`; the injection-task utility results are empty.
    """
    if user_tasks is None:
        selected_tasks = suite.user_tasks.values()
    else:
        selected_tasks = [suite.get_user_task_by_id(task_id) for task_id in user_tasks]

    utility_by_task: dict[tuple[str, str], bool] = {}
    security_by_task: dict[tuple[str, str], bool] = {}
    for task in selected_tasks:
        task_utility, task_security = run_task_without_injection_tasks(
            suite, agent_pipeline, task, logdir, force_rerun
        )
        # No injection task is involved, so the second key component is the empty string.
        result_key = (task.ID, "")
        utility_by_task[result_key] = task_utility
        security_by_task[result_key] = task_security

    return SuiteResults(
        utility_results=utility_by_task,
        security_results=security_by_task,
        injection_tasks_utility_results={},
    )

run_task_with_injection_tasks

run_task_with_injection_tasks(
    suite: TaskSuite,
    agent_pipeline: BasePipelineElement,
    user_task: BaseUserTask,
    attack: BaseAttack,
    logdir: Path | None,
    force_rerun: bool,
    injection_tasks: Sequence[str] | None = None,
) -> tuple[
    dict[tuple[str, str], bool], dict[tuple[str, str], bool]
]

Runs the given user task against the given injection tasks (or all injection tasks if none are provided), injecting the environment using the given attack.

Parameters:

  • suite (TaskSuite) –

    The task suite to run.

  • agent_pipeline (BasePipelineElement) –

    The agent pipeline to use.

  • user_task (BaseUserTask) –

    The user task to run.

  • attack (BaseAttack) –

    The attack to run.

  • logdir (Path | None) –

    The directory to save logs to.

  • force_rerun (bool) –

    Whether to rerun the task even if a log for it already exists in the logdir.

  • injection_tasks (Sequence[str] | None, default: None ) –

    The injection tasks to run. If None, all injection tasks in the suite will be run.

Returns:

  • tuple[dict[tuple[str, str], bool], dict[tuple[str, str], bool]]

    A tuple of two dictionaries. The first dictionary contains the utility results for each injection task and user task couple. The second dictionary contains the security results for each injection and user task couple.

Source code in src/agentdojo/benchmark.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def run_task_with_injection_tasks(
    suite: TaskSuite,
    agent_pipeline: BasePipelineElement,
    user_task: BaseUserTask,
    attack: BaseAttack,
    logdir: Path | None,
    force_rerun: bool,
    injection_tasks: Sequence[str] | None = None,
) -> tuple[dict[tuple[str, str], bool], dict[tuple[str, str], bool]]:
    """Run one user task against injection tasks, injecting the environment with the given attack.

    For a DoS attack only the first injection task of the suite is used and
    `injection_tasks` is ignored. Otherwise the given injection tasks are used,
    or all injection tasks of the suite when none are given.

    A previously logged result is reused when `logdir` and the pipeline name are
    both set, a result can be loaded, and `force_rerun` is false. When the run
    fails with one of the handled API errors (context length exceeded or an
    internal server error), the pair is recorded with utility False and
    security True instead of aborting.

    Args:
        suite: The task suite to run.
        agent_pipeline: The agent pipeline to use.
        user_task: The user task to run.
        attack: The attack to run.
        logdir: The directory to save logs to.
        force_rerun: Whether to rerun a task even if a log for it already exists in the logdir.
        injection_tasks: The injection tasks to run. If None, all injection tasks in the suite will be run.

    Returns:
        A tuple of two dictionaries, both keyed by `(user task ID, injection task ID)`.
            The first holds the utility results, the second the security results.
    """
    if attack.is_dos_attack:
        # No need to run all injection tasks since we don't care about the injection target!
        injection_task_ids = [next(iter(suite.injection_tasks))]
    elif injection_tasks is not None:
        injection_task_ids = injection_tasks
    else:
        injection_task_ids = suite.injection_tasks

    utility_results: dict[tuple[str, str], bool] = {}
    security_results: dict[tuple[str, str], bool] = {}

    for injection_task_id in injection_task_ids:
        injection_task = suite.get_injection_task_by_id(injection_task_id)
        task_injections = attack.attack(user_task, injection_task)
        result_key = (user_task.ID, injection_task_id)

        # Look for an already logged result; a missing or invalid log counts as "not run yet".
        cached_result = None
        if logdir is not None and agent_pipeline.name is not None:
            try:
                cached_result = load_task_results(
                    agent_pipeline.name,
                    suite.name,
                    user_task.ID,
                    attack_name=attack.name,
                    injection_task=injection_task.ID,
                    logdir=logdir,
                )
            except (ValidationError, FileNotFoundError):
                pass

        if cached_result is not None and not force_rerun:
            logging.info(f"Skipping task '{user_task.ID}' with '{injection_task.ID}' because it was already run.")
            utility_results[result_key] = cached_result.utility
            security_results[result_key] = cached_result.security
            continue

        # a trace logger captures logs and saves them, but also passes them to the delegate logger
        with TraceLogger(
            delegate=Logger.get(),
            suite_name=suite.name,
            user_task_id=user_task.ID,
            injection_task_id=injection_task_id,
            injections=task_injections,
            attack_type=attack.name,
            pipeline_name=agent_pipeline.name,
        ) as logger:
            try:
                utility, security = suite.run_task_with_pipeline(
                    agent_pipeline, user_task, injection_task, task_injections
                )
            except BadRequestError as err:
                context_too_long = err.code == "context_length_exceeded" or err.param == "max_tokens"
                if not context_too_long:
                    raise err
                logger.log_error(
                    f"Skipping task '{user_task.ID}' with '{injection_task.ID}' due to context_length_exceeded: {err}"
                )
                utility, security = False, True
            except ApiError as err:
                if "internal server error" not in str(err):
                    raise err
                logger.log_error(
                    f"Skipping task '{user_task.ID}' with '{injection_task.ID}' because of internal server error: {err}"
                )
                utility, security = False, True
            except InternalServerError as err:
                logger.log_error(
                    f"Skipping task '{user_task.ID}' with '{injection_task.ID}' because of internal server error: {err}"
                )
                utility, security = False, True

            if attack.is_dos_attack:
                # For a DoS attack, security is defined as the inverse of utility.
                security = not utility

        # The outcome is attached to the trace logger after its context has exited.
        logger.set_contextarg("utility", utility)
        logger.set_contextarg("security", security)

        utility_results[result_key] = utility
        security_results[result_key] = security

    return utility_results, security_results

run_task_without_injection_tasks

run_task_without_injection_tasks(
    suite: TaskSuite,
    agent_pipeline: BasePipelineElement,
    task: BaseUserTask | BaseInjectionTask,
    logdir: Path | None,
    force_rerun: bool,
) -> tuple[bool, bool]

Run a task without injections. Useful to benchmark the utility of a pipeline.

Parameters:

  • suite (TaskSuite) –

    The suite to use to run the benchmark.

  • agent_pipeline (BasePipelineElement) –

    The agent pipeline to benchmark.

  • task (BaseUserTask | BaseInjectionTask) –

    The task to run.

  • logdir (Path | None) –

    The directory to save logs to.

  • force_rerun (bool) –

    Whether to rerun the task even if a log for it already exists in the logdir.

Returns:

  • tuple[bool, bool]

    A tuple with the utility and security of the task. The security is always True.

Source code in src/agentdojo/benchmark.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def run_task_without_injection_tasks(
    suite: TaskSuite,
    agent_pipeline: BasePipelineElement,
    task: BaseUserTask | BaseInjectionTask,
    logdir: Path | None,
    force_rerun: bool,
) -> tuple[bool, bool]:
    """Run a single task without injections. Useful to benchmark the utility of a pipeline.

    A previously logged result is reused when `logdir` and the pipeline name are
    both set, a result can be loaded, and `force_rerun` is false. When the run
    fails with one of the handled API errors (context length exceeded or an
    internal server error), the task is reported with utility False and
    security True instead of aborting.

    Args:
        suite: The suite to use to run the benchmark.
        agent_pipeline: The agent pipeline to benchmark.
        task: The task to run.
        logdir: The directory to save logs to.
        force_rerun: Whether to rerun the task even if a log for it already exists in the logdir.

    Returns:
        A tuple with the utility and security of the task, taken from the cached
        result or from `suite.run_task_with_pipeline`. On the handled API errors
        the tuple is `(False, True)`.
    """
    # Look for an already logged result; a missing or invalid log counts as "not run yet".
    cached_result = None
    if logdir is not None and agent_pipeline.name is not None:
        try:
            cached_result = load_task_results(
                agent_pipeline.name,
                suite.name,
                task.ID,
                attack_name="none",
                injection_task="none",
                logdir=logdir,
            )
        except (ValidationError, FileNotFoundError):
            pass

    if cached_result is not None and not force_rerun:
        logging.info(f"Skipping task '{task.ID}' with no injections because it was already run.")
        return cached_result.utility, cached_result.security

    trace_logger = TraceLogger(
        delegate=Logger.get(),
        suite_name=suite.name,
        user_task_id=task.ID,
        injection_task_id=None,
        injections={},
        attack_type="none",
        pipeline_name=agent_pipeline.name,
    )
    with trace_logger as logger:
        try:
            utility, security = suite.run_task_with_pipeline(agent_pipeline, task, injection_task=None, injections={})
        except BadRequestError as err:
            context_too_long = err.code == "context_length_exceeded" or err.param == "max_tokens"
            if not context_too_long:
                raise err
            logger.log_error(f"Skipping task {task.ID} due to context_length_exceeded: {err}")
            utility, security = False, True
        except ApiError as err:
            if "internal server error" not in str(err):
                raise err
            logger.log_error(f"Skipping task {task.ID} because of internal server error: {err}")
            utility, security = False, True
        except InternalServerError as err:
            logger.log_error(f"Skipping task {task.ID} because of internal server error: {err}")
            utility, security = False, True

        # Attach the outcome to the trace while the logger context is still open.
        logger.set_contextarg("utility", utility)
        logger.set_contextarg("security", security)

    return utility, security

SuiteResults

Bases: TypedDict

The results of running a suite of tasks with a given pipeline.

utility_results instance-attribute

utility_results: dict[tuple[str, str], bool]

The utility results for each injection task and user task couple. The key is a tuple of the user task ID and the injection task ID.

security_results instance-attribute

security_results: dict[tuple[str, str], bool]

The security results for each injection task and user task couple. The key is a tuple of the user task ID and the injection task ID.

injection_tasks_utility_results instance-attribute

injection_tasks_utility_results: dict[str, bool]

The utility results for each injection task. The key is the injection task ID.