---
title: "Running an AnVIL workflow within R"
author:
- name: Kayla Interdonato
  affiliation: Roswell Park Comprehensive Cancer Center
- name: Yubo Cheng
  affiliation: Roswell Park Comprehensive Cancer Center
- name: Martin Morgan
  affiliation: Roswell Park Comprehensive Cancer Center
  email: Martin.Morgan@RoswellPark.org
package: AnVIL
output:
  BiocStyle::html_document
abstract: |
  This vignette demonstrates how a user can edit, run, and stop a
  Terra / AnVIL workflow from within their R session. The configuration of the
  workflow can be retrieved and edited. This new configuration can then be
  sent back to the Terra / AnVIL workspace for future use. With the new
  configuration defined, the user can run the workflow and stop any
  running jobs.
vignette: |
  %\VignetteIndexEntry{Running an AnVIL workflow within R}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
has_gcloud <- AnVILBase::has_avworkspace(
    strict = TRUE, platform = AnVILGCP::gcp()
)
knitr::opts_chunk$set(
    eval =  has_gcloud, collapse = TRUE, cache = TRUE
)
options(width=75)
```

# Installation

Install the _AnVIL_ package with

```{r, eval = FALSE}
if (!requireNamespace("BiocManager", quietly = TRUE))
    install.packages("BiocManager", repos = "https://cran.r-project.org")
BiocManager::install("AnVIL")
```

Once installed, load the package with

```{r, message = FALSE, eval = TRUE, cache = FALSE}
library(AnVILGCP)
library(AnVIL)
```

# Workflow setup: DESeq2

## Setting up the workspace and choosing a workflow

The first step is to define the namespace (billing project) and name
of the workspace used by the functions in this vignette. Here we use
the `bioconductor-rpci-anvil` namespace and the
`Bioconductor-Workflow-DESeq2` workspace.

```{r workspace, eval = has_gcloud}
avworkspace("bioconductor-rpci-anvil/Bioconductor-Workflow-DESeq2")
```

Each workspace can have zero or more workflows. Workflows have a
`name` and `namespace`, just as workspaces do. Discover the workflows
available in a workspace with `avworkflows()`.

```{r workflows, eval = has_gcloud}
avworkflows()
```

From the table returned by `avworkflows()`, record the namespace and
name of the workflow of interest using `avworkflow()`.

```{r workflow, eval = has_gcloud}
avworkflow("bioconductor-rpci-anvil/AnVILBulkRNASeq")
```
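
The `namespace/name` string passed to `avworkflow()` can be assembled
from the table columns rather than typed by hand. The following is a
minimal sketch using a small hand-made tibble as a stand-in for the
result of `avworkflows()`; the second row (`AnotherWorkflow`) is
hypothetical, and only the `namespace` and `name` columns mentioned
above are assumed.

```{r, eval = FALSE}
library(dplyr)

## Hypothetical stand-in for the table returned by avworkflows()
workflows <- tibble::tibble(
    namespace = c("bioconductor-rpci-anvil", "bioconductor-rpci-anvil"),
    name = c("AnVILBulkRNASeq", "AnotherWorkflow")
)

## Select the workflow of interest and build its "namespace/name" string
workflow_id <-
    workflows |>
    filter(name == "AnVILBulkRNASeq") |>
    mutate(id = paste(namespace, name, sep = "/")) |>
    pull(id)
workflow_id

## avworkflow(workflow_id) would then record the choice
```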

## Retrieving the configuration

Each workflow defines inputs, outputs, and the code to execute. These
workflow 'configurations' can be retrieved with
`avworkflow_configuration_get()`.

```{r configuration, eval = has_gcloud}
config <- avworkflow_configuration_get()
config
```

This function uses the workspace namespace, workspace name,
workflow namespace, and workflow name recorded above with
`avworkspace()` and `avworkflow()`.

# Updating workflows

## Changing the inputs / outputs

The configuration contains a lot of information, but the variables of
most interest to the user are the inputs and outputs. In our case the
inputs and outputs are pre-defined, so we don't have to change them.
For some workflows, however, the inputs / outputs may be blank and
need to be defined by the user. We will change one of our input values
to show how this is done.

Two functions help users see the content of the inputs and outputs:
`avworkflow_configuration_inputs()` and
`avworkflow_configuration_outputs()`. These functions display the
information as a `tibble`, a structure most users are likely familiar
with.

```{r inputs_outputs, eval = has_gcloud}
inputs <- avworkflow_configuration_inputs(config)
inputs

outputs <- avworkflow_configuration_outputs(config)
outputs
```

Let's change the `salmon.transcriptome_index_name` field; this is an
arbitrary string identifier in our workflow.

```{r change_input, eval = has_gcloud}
inputs <-
    inputs |>
    mutate(
        attribute = ifelse(
            name == "salmon.transcriptome_index_name",
            '"new_index_name"',
            attribute
        )
    )
inputs
```
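
`ifelse()` leaves the table unchanged when the input name is
misspelled, which can be hard to notice. A minimal sketch of a more
defensive update follows. `set_input()` is a hypothetical helper (not
part of _AnVIL_), and the toy table, including the `salmon.fastq_1`
row and its `this.fastq_1` value, only mimics the `name` and
`attribute` columns used above.

```{r, eval = FALSE}
library(dplyr)

## Toy stand-in for avworkflow_configuration_inputs(config)
inputs_demo <- tibble::tibble(
    name = c("salmon.transcriptome_index_name", "salmon.fastq_1"),
    attribute = c('"old_index_name"', "this.fastq_1")
)

## Hypothetical helper: update one input, failing loudly on unknown names
set_input <- function(inputs, input_name, value) {
    stopifnot(input_name %in% inputs$name)
    inputs |>
        mutate(attribute = if_else(name == input_name, value, attribute))
}

## The value carries its own double quotes, as in the chunk above
updated <- set_input(
    inputs_demo, "salmon.transcriptome_index_name", '"new_index_name"'
)
updated
```

Calling `set_input()` with a name that is not in the table stops with
an error instead of silently returning the original table.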

## Update configuration locally

Since the inputs have been modified, we need to put this information into
the configuration of the workflow. We can do this with
`avworkflow_configuration_update()`. By default, this function takes the
inputs and outputs from the original configuration, so only the modified
component needs to be supplied (in our example, the outputs are unchanged).

```{r update_config, eval = has_gcloud}
new_config <- avworkflow_configuration_update(config, inputs)
new_config
```
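
Before sending a configuration back to AnVIL, it can be useful to
confirm exactly which inputs differ from the original. One way to do
this, sketched here on toy tables rather than real configuration
inputs, is `dplyr::anti_join()`; the `salmon.threads` row is
hypothetical.

```{r, eval = FALSE}
library(dplyr)

## Toy stand-ins for the original and modified input tables
original <- tibble::tibble(
    name = c("salmon.transcriptome_index_name", "salmon.threads"),
    attribute = c('"old_index_name"', "4")
)
modified <-
    original |>
    mutate(
        attribute = if_else(
            name == "salmon.transcriptome_index_name",
            '"new_index_name"',
            attribute
        )
    )

## Rows of 'modified' with no identical row in 'original'
changed <- anti_join(modified, original, by = c("name", "attribute"))
changed
```

Assuming the real tables share the `name` and `attribute` columns, the
same comparison could be applied to `inputs` and
`avworkflow_configuration_inputs(config)`.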

## Set a workflow configuration for reuse in AnVIL

Use `avworkflow_configuration_set()` to permanently update the
workflow to new parameter values.

```{r set_config, eval = has_gcloud}
avworkflow_configuration_set(new_config)
```

By default, the previous command only validates `new_config`; to update
the configuration in AnVIL (i.e., replacing the values in the
workspace workflow graphical user interface), add the argument `dry = FALSE`.

```{r set_config_not_dry}
## avworkflow_configuration_set(new_config, dry = FALSE)
```
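
The `dry` argument follows a common 'dry run' convention: the default
validates and reports, and the side effect happens only on explicit
request. A minimal sketch of the pattern follows, using a hypothetical
function `send_value()` that is not part of _AnVIL_ and contacts no
service.

```{r, eval = FALSE}
## Hypothetical illustration of the dry-run convention; not AnVIL code
send_value <- function(value, dry = TRUE) {
    ## validation happens whether or not the run is dry
    stopifnot(is.character(value), length(value) == 1L, !is.na(value))
    if (dry) {
        message(
            "'", value, "' validated but not sent; use 'dry = FALSE' to send"
        )
        return(invisible(FALSE))
    }
    ## a real implementation would perform the remote update here
    invisible(TRUE)
}

sent_dry <- send_value("new_index_name")                # validates only
sent_real <- send_value("new_index_name", dry = FALSE)  # would send
```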

# Running and stopping workflows

## Running a workflow

To run the new workflow we need the name of the data set to be used in
the workflow. Discover it by looking at the table of interest, here
`participant_set`, and selecting an entry from its identifier column.

```{r entityName, eval = has_gcloud}
entityName <- avtable("participant_set") |>
    pull(participant_set_id) |>
    head(1)
avworkflow_run(new_config, entityName)
```

Again, actually running the new configuration requires the argument
`dry = FALSE`.

```{r run_not_dry}
## avworkflow_run(new_config, entityName, dry = FALSE)
```

The `config` argument of `avworkflow_run()` is used to set the
`rootEntityType` and workflow method name and namespace; other
components of `config` are ignored (the other components will be read
by Terra / AnVIL from values updated with
`avworkflow_configuration_set()`).

## Monitoring workflows

We can see that the workflow is running by using the `avworkflow_jobs`
function. The elements of the table are ordered chronologically, with
the most recent submission (most likely the job we just started!)
listed first.

```{r checking_workflow, eval = has_gcloud}
avworkflow_jobs()
```
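
The jobs table can be filtered like any other tibble. The following
sketch uses a toy table in place of `avworkflow_jobs()`; the
`submissionDate` column, the identifiers, and the 'Done' status are
assumptions made for illustration, while `submissionId` and the
'Submitted' and 'Aborted' statuses are those referred to in this
vignette.

```{r, eval = FALSE}
library(dplyr)

## Toy stand-in for the table returned by avworkflow_jobs()
jobs_demo <- tibble::tibble(
    submissionId = c("id-3", "id-2", "id-1"),
    submissionDate = as.POSIXct(
        c("2024-03-03 10:00:00", "2024-03-02 10:00:00", "2024-03-01 10:00:00"),
        tz = "UTC"
    ),
    status = c("Submitted", "Done", "Aborted")
)

## Identifier of the most recent submission, without relying on row order
latest_id <-
    jobs_demo |>
    arrange(desc(submissionDate)) |>
    slice(1) |>
    pull(submissionId)

## Submissions that have not yet finished or been aborted
active <- jobs_demo |> filter(status == "Submitted")
```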

## Stopping workflows

Use `avworkflow_stop()` to stop a currently running workflow. This
will change the status of the job, reported by `avworkflow_jobs()`,
from 'Submitted' to 'Aborted'.

```{r stop_workflow, eval = has_gcloud}
avworkflow_stop() # dry = FALSE to stop

avworkflow_jobs()
```

# Managing workflow output

## Workflow files

Workflows can generate a large number of intermediate files (including
diagnostic logs), as well as final outputs for more interactive
analysis. Use the `submissionId` from `avworkflow_jobs()` to discover
files produced by a submission; the default behavior lists files
produced by the most recent job.

```{r files, eval = has_gcloud}
submissionId <- "fb8e35b7-df5d-49e6-affa-9893aaeebf37"
avworkflow_files(submissionId)
```

Workflow files are stored in the workspace bucket. The files can be
localized to the persistent disk of the current runtime using
`avworkflow_localize()`. The default is again to localize files from
the most recently submitted job. Use `type=` to choose which files are
localized: 'control' (e.g., log files), 'output', or 'all'.

```{r localize, eval = has_gcloud}
avworkflow_localize(
    submissionId,
    type = "output"
    ## dry = FALSE to localize
)
```
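
The distinction between 'control' and 'output' files can also be
explored on the file listing itself. This is a minimal sketch on a toy
table; the `file`, `type`, and `path` columns and the `gs://` paths
are assumptions for illustration and may not match the columns
returned by `avworkflow_files()`.

```{r, eval = FALSE}
library(dplyr)

## Toy stand-in for a file listing; bucket and file names are hypothetical
files_demo <- tibble::tibble(
    file = c("stderr", "stdout", "counts.rds"),
    type = c("control", "control", "output"),
    path = paste0(
        "gs://hypothetical-bucket/submission/",
        c("stderr", "stdout", "counts.rds")
    )
)

## Keep only the final outputs, dropping logs and other control files
output_paths <-
    files_demo |>
    filter(type == "output") |>
    pull(path)
output_paths
```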

## Workflow information

Information on workflows (status, start, and end times, and input and
output parameters) is available with `avworkflow_info()`. The examples
below are from workflows using the [Rcollectl][] package to measure
time spent in different parts of a single-cell RNA-seq analysis. The
workspace is not publicly available, so results from
`avworkflow_info()` are read from files in this package.

[Rcollectl]: https://bioconductor.org/packages/Rcollectl

A single job submission can launch multiple workflows. This occurs,
e.g., when a workflow uses several rows from a DATA table to perform
independent analyses. In the example used here, the workflows were
configured to use different numbers of cores (3 or 8) and different
ways of storing single-cell expression data (an in-memory `dgCMatrix`
or an on-disk representation). Thus a single job submission started
four workflows.  This example was retrieved with

```{r, eval = FALSE}
avworkspace("bioconductor-rpci-yubo/Rcollectlworkflowh5ad")
submissionId <- "9385fd75-4cb7-470f-9e07-1979e2c8f193"
info_1 <- avworkflow_info(submissionId)
```

Read the saved version of this result into *R*.

```{r, message=FALSE, warning=FALSE}
info_file_1 <- system.file(package = "AnVIL", "extdata", "avworkflow_info_1.rds")
info_1 <- readRDS(info_file_1)

## view result of avworkflow_info()
info_1
```

Three of the workflows were successful; one failed.

```{r}
info_1 |>
    select(workflowId, status, inputs, outputs)
```

Inputs and outputs for each workflow are stored as lists. Strategies
for working with list-columns in tibbles are described in the
rectangling chapter of [R for Data Science][r4ds]. Use
`tidyr::unnest_wider()` to expand the inputs. The failed workflow used
8 cores (`core = 8`) with the on-disk data representation
(`dgCMatrix = FALSE`).

[r4ds]: https://r4ds.hadley.nz/rectangling

```{r}
info_1 |>
    select(workflowId, status, inputs) |>
    tidyr::unnest_wider(inputs)
```
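
To see what `unnest_wider()` does in isolation, here is a minimal
sketch on a toy tibble. The workflow identifiers and status labels are
hypothetical; the `core` and `dgCMatrix` names mirror the inputs
described above.

```{r, eval = FALSE}
library(tidyr)

## Toy tibble with a list-column: one named list of inputs per workflow
info_demo <- tibble::tibble(
    workflowId = c("wf-1", "wf-2"),
    status = c("Succeeded", "Failed"),
    inputs = list(
        list(core = 3, dgCMatrix = TRUE),
        list(core = 8, dgCMatrix = FALSE)
    )
)

## Each name in the list becomes its own column
wide <- info_demo |> unnest_wider(inputs)
wide
```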

The outputs (files summarizing the single-cell analysis, and the
timestamps associated with each step in the analysis) involve two
levels of nesting. Following the strategy outlined in
[R for Data Science][r4ds], the outputs (Google bucket locations) are
expanded with `tidyr::unnest_wider()` followed by
`tidyr::unnest_longer()`.

```{r}
info_1 |>
    select(workflowId, outputs) |>
    tidyr::unnest_wider(outputs) |>
    tidyr::unnest_longer(starts_with("Rcollectl"), keep_empty = TRUE)
```
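
The two-step expansion can be tried on a toy example. In this sketch
the workflow identifiers, the `Rcollectl_result` name, and the `gs://`
paths are hypothetical; the first workflow has two output files and
the second has one.

```{r, eval = FALSE}
library(tidyr)

## Toy tibble: each workflow has a named list holding a vector of paths
outputs_demo <- tibble::tibble(
    workflowId = c("wf-1", "wf-2"),
    outputs = list(
        list(Rcollectl_result = c(
            "gs://hypothetical-bucket/wf-1/a.tab",
            "gs://hypothetical-bucket/wf-1/b.tab"
        )),
        list(Rcollectl_result = "gs://hypothetical-bucket/wf-2/c.tab")
    )
)

long <-
    outputs_demo |>
    unnest_wider(outputs) |>          # first level: names become columns
    unnest_longer(Rcollectl_result)   # second level: one row per file
long
```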

In the example used so far, each workflow produces a single file. A
different example is a workflow that produces multiple output
files. This corresponds to the following `submissionId`:

```{r, eval = FALSE}
submissionId <- "35280de1-42d8-492b-aa8c-5feff984bffa"
info_2 <- avworkflow_info(submissionId)
```

Reading the result from the stored version:

```{r, message=FALSE, warning=FALSE}
info_file_2 <- system.file(package = "AnVIL", "extdata", "avworkflow_info_2.rds")
info_2 <- readRDS(info_file_2)
info_2
```

Inputs and outputs are manipulated in the same way as before, but this
time there are multiple output files.

```{r, message=FALSE, warning=FALSE}
info_2 |>
    select(workflowId, outputs) |>
    tidyr::unnest_wider(outputs)
```

To see the output files, expand the outputs column using `unnest_longer()`.

```{r}
output_files <-
    info_2 |>
    select(workflowId, outputs) |>
    tidyr::unnest_wider(outputs) |>
    select(RcollectlWorkflowDelayedArrayParameters.Rcollectl_result) |>
    tidyr::unnest_longer(
        "RcollectlWorkflowDelayedArrayParameters.Rcollectl_result"
    )
output_files
```

The full file paths are available using `pull()` or `as.vector()`.

```{r}
output_files |>
    as.vector()
```
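
The two functions return different structures. A minimal sketch on a
toy one-column tibble (the `gs://` paths are hypothetical): `pull()`
extracts the column as a character vector, whereas `as.vector()`
applied to the whole tibble keeps a list-like object with one element
per column.

```{r, eval = FALSE}
library(dplyr)

## Toy one-column tibble of file paths
output_demo <- tibble::tibble(
    result = c(
        "gs://hypothetical-bucket/a.tab",
        "gs://hypothetical-bucket/b.tab"
    )
)

paths <- output_demo |> pull(result)    # character vector of length 2
as_list <- output_demo |> as.vector()   # still list-like, one column

is.character(paths)
is.list(as_list)
```

When a plain character vector is needed, for instance to pass paths to
another function, `pull()` is the more direct choice.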

# Session information

```{r sessionInfo}
sessionInfo()
```
