284 changes: 200 additions & 84 deletions tutorials/advanced/checkpoint.ipynb
@@ -52,12 +52,12 @@
"\n",
"Lazy | Filesystem | Permanent | Deterministic | Explicit | Spark Equivalent | Fugue Equivalent\n",
":---:|:---:|:---:|:---:|:---:|:---|:---\n",
"Yes | No | No | No | No | `persist()` | `weak_checkpoint(lazy=True)`\n",
"No | No | No | No | No | | `persist()` or `weak_checkpoint(lazy=False)`\n",
"Yes | Yes | No | No | No | | `strong_checkpoint(lazy=True)`\n",
"Yes | No | No | No | No | `persist()` | `persist(lazy=True)`\n",
"No | No | No | No | No | | `persist()` or `persist(lazy=False)`\n",
"Yes | Yes | No | No | No | | `transform(..., save_path=None, checkpoint=True)`\n",
"No | Yes | No | No | No | | `checkpoint()` or `strong_checkpoint(lazy=False)`\n",
"Yes | Yes | Yes | No | No | | \n",
"No | Yes | Yes | No | No | `checkpoint` | `yield_as`\n",
"No | Yes | Yes | No | No | `checkpoint()` | `yield_as`\n",
"Yes | Yes | Yes | Yes | No | | `deterministic_checkpoint(lazy=True)`\n",
"No | Yes | Yes | Yes | No | | `deterministic_checkpoint()`\n",
"Yes | Yes | Yes | Yes | Yes | | \n",
@@ -70,26 +70,26 @@
"\n",
"### Weak Checkpoint\n",
"\n",
"Weak checkpoint is in memory checkpoint. We don't use 'in memory' because it may also use executor's local disk space, depending on the execution engine we use. `persist()` is an alias of `weak_checkpoint(lazy=False)`. \n",
"Weak checkpoint is roughly an in-memory checkpoint. We avoid saying 'in memory' because, depending on the execution engine, it may also spill to the executor's local disk. To invoke it, use `persist(lazy=False)`. \n",
"\n",
"Note Fugue persist is eager but Spark persist is lazy."
"Note that Fugue's persist is eager by default, while Spark's persist is lazy."
]
},
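{
"cell_type": "markdown",
"metadata": {},
"source": [

The eager vs lazy distinction can be illustrated engine-agnostically. The following plain-Python sketch is an analogy only, not Fugue's implementation: an eager checkpoint materializes at checkpoint time, a lazy one on first access.

```python
# Plain-Python analogy (NOT Fugue's implementation) of the eager/lazy axis:
# eager persist pays the cost at checkpoint time, lazy on first access.
class Checkpoint:
    def __init__(self, compute, lazy=True):
        self._compute = compute   # expensive step, like just_wait above
        self._value = None
        self._done = False
        if not lazy:              # eager: materialize immediately
            self.get()

    def get(self):
        if not self._done:        # lazy: materialize on first access
            self._value = self._compute()
            self._done = True
        return self._value        # later accesses reuse the cache

calls = []
def expensive():
    calls.append(1)               # count how often we really compute
    return [1, 2, 3]

lazy_cp = Checkpoint(expensive, lazy=True)
assert calls == []                # lazy: nothing computed yet
lazy_cp.get(); lazy_cp.get()
assert calls == [1]               # computed exactly once, then cached

eager_cp = Checkpoint(expensive, lazy=False)
assert calls == [1, 1]            # eager: computed at construction time
```

Either way, repeated accesses after materialization hit the cache; the axis only decides when the cost is paid.

]
},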
{
"cell_type": "code",
"execution_count": null,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from fugue import FugueWorkflow\n",
"from fugue_spark import SparkExecutionEngine\n",
"from time import sleep\n",
"import pandas as pd\n",
"import fugue.api as fa\n",
"\n",
"# schema: *\n",
"def just_wait(df:pd.DataFrame) -> pd.DataFrame:\n",
" sleep(5)\n",
" return df"
" return df\n",
"\n",
"df = pd.DataFrame({\"a\": [1,2,3], \"b\": [1,2,3]})"
]
},
{
@@ -101,37 +101,141 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"with FugueWorkflow(SparkExecutionEngine()) as dag:\n",
" df = dag.df([[0]],\"a:int\")\n",
" df = df.transform(just_wait)\n",
" df.show()\n",
" df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the following code, due to Spark lineage, the two shows will duplicate the transform step, so it takes about 10 seconds"
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()"
]
},
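{
"cell_type": "markdown",
"metadata": {},
"source": [

Why the two shows cost roughly 10 seconds can be sketched with a hypothetical model of lineage (not Spark's actual machinery): without persist, every action replays the whole transform chain.

```python
# Hypothetical model of Spark lineage: without persist, every action
# (each show) re-executes the full transform chain.
run_count = {"n": 0}

def transform_step(rows):
    run_count["n"] += 1        # stands in for the 5-second just_wait
    return rows

def show(compute):
    return compute()           # an action replays the lineage

lineage = lambda: transform_step([[1, 1], [2, 2], [3, 3]])
show(lineage)
show(lineage)
assert run_count["n"] == 2     # transform ran twice -> ~10s wall time
```

A checkpoint replaces the `compute()` call with a cache lookup, which is exactly what the next sections demonstrate.

]
},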
{
"cell_type": "code",
"execution_count": null,
"execution_count": 7,
"metadata": {},
"outputs": [],
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>a:long</th>\n",
" <th>b:long</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3</td>\n",
" <td>3</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<font size=\"-1\">SparkDataFrame: a:long,b:long</font>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>a:long</th>\n",
" <th>b:long</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>2</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3</td>\n",
" <td>3</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>\n",
"<font size=\"-1\">SparkDataFrame: a:long,b:long</font>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 49.5 ms, sys: 13.7 ms, total: 63.2 ms\n",
"Wall time: 10.6 s\n"
]
}
],
"source": [
"%%time\n",
"with FugueWorkflow(SparkExecutionEngine()) as dag:\n",
" df = dag.df([[0]],\"a:int\")\n",
" df = df.transform(just_wait).persist()\n",
" df.show()\n",
" df.show()"
"with fa.engine_context(spark):\n",
" res = fa.transform(df, just_wait, schema=\"*\")\n",
" fa.show(res)\n",
" fa.show(res)"
]
},
{
@@ -143,35 +247,23 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 11,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 26.4 ms, sys: 6.85 ms, total: 33.2 ms\n",
"Wall time: 83.9 ms\n"
]
}
],
"source": [
"%%time\n",
"with FugueWorkflow(SparkExecutionEngine()) as dag:\n",
" df = dag.df([[0]],\"a:int\")\n",
" df = df.transform(just_wait).weak_checkpoint(lazy=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following takes 5 seconds because the first show triggers the persist and the second reuses the cache. However, it is usually pointless to use a lazy checkpoint in a predefined DAG like this."
]
},
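{
"cell_type": "markdown",
"metadata": {},
"source": [

The lazy-persist behavior described above can be sketched in plain Python (an analogy, not Fugue's implementation): with no action, nothing runs at all; the first action materializes the cache and later actions reuse it.

```python
# Sketch of lazy persist: defining the DAG does no work; the first
# action pays the cost, subsequent actions hit the cache.
computed = []

def lazy_persist(compute):
    cache = {}
    def get():
        if "v" not in cache:
            cache["v"] = compute()   # first access pays the 5 seconds
        return cache["v"]
    return get

cached = lazy_persist(lambda: (computed.append(1), [[0]])[1])
assert computed == []                # DAG defined, nothing executed yet
cached()                             # first show triggers the persist
cached()                             # second show reuses the cache
assert computed == [1]
```

This also shows why a lazy checkpoint with no downstream action (as in the 83.9 ms cell above) finishes almost instantly: the expensive step is never invoked.

]
},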
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"with FugueWorkflow(SparkExecutionEngine()) as dag:\n",
" df = dag.df([[0]],\"a:int\")\n",
" df = df.transform(just_wait).weak_checkpoint(lazy=True)\n",
" df.show()\n",
" df.show()"
"with fa.engine_context(spark):\n",
" res = fa.transform(df, just_wait, schema=\"*\")\n",
" fa.persist(res, lazy=True)"
]
},
{
@@ -180,7 +272,7 @@
"source": [
"### Strong Checkpoint\n",
"\n",
"Strong checkpoints are in file but non-permanent checkpoints. They are removed after the DAG execution done.\n",
"Strong checkpoints are file-based but non-permanent checkpoints. They are removed after the execution is completed.\n",
"\n",
"Note Fugue checkpoint is non-permanent but Spark checkpoint is permanent.\n",
"\n",
@@ -189,32 +281,40 @@
},
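{
"cell_type": "markdown",
"metadata": {},
"source": [

A file-based, non-permanent checkpoint can be sketched with the standard library. This is an analogy for the behavior described above, not Fugue's implementation; the class and file names are illustrative.

```python
import os
import pickle
import tempfile

# Sketch of a strong checkpoint: data is written under a checkpoint
# directory, later steps read it back instead of recomputing, and the
# file is removed once execution completes (non-permanent).
class StrongCheckpoint:
    def __init__(self, path):
        self.path = path
    def save(self, data):
        with open(self.path, "wb") as f:
            pickle.dump(data, f)
    def load(self):
        with open(self.path, "rb") as f:
            return pickle.load(f)
    def cleanup(self):
        os.remove(self.path)      # mirrors removal after the run

tmpdir = tempfile.mkdtemp()
cp = StrongCheckpoint(os.path.join(tmpdir, "step1.pkl"))
cp.save([[1, 1], [2, 2], [3, 3]])
restored = cp.load()              # downstream reads the file, no recompute
cp.cleanup()
exists_after = os.path.exists(cp.path)
assert restored == [[1, 1], [2, 2], [3, 3]]
assert not exists_after           # non-permanent: gone after execution
```

Fugue's `fugue.workflow.checkpoint.path` config plays the role of `tmpdir` here: the framework, not the user, chooses the concrete file under it.

]
},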
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"engine = SparkExecutionEngine(conf={\"fugue.workflow.checkpoint.path\":\"/tmp\"})\n",
"with FugueWorkflow(engine) as dag:\n",
" df = dag.df([[0]],\"a:int\")\n",
" df = df.transform(just_wait).checkpoint()\n",
" df.show()\n",
" df.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 15,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---+\n",
"| a| b|\n",
"+---+---+\n",
"| 3| 3|\n",
"| 1| 1|\n",
"| 2| 2|\n",
"+---+---+\n",
"\n",
"+---+---+\n",
"| a| b|\n",
"+---+---+\n",
"| 3| 3|\n",
"| 1| 1|\n",
"| 2| 2|\n",
"+---+---+\n",
"\n",
"CPU times: user 36.3 ms, sys: 9.8 ms, total: 46.1 ms\n",
"Wall time: 5.57 s\n"
]
}
],
"source": [
"%%time\n",
"engine = SparkExecutionEngine(conf={\"fugue.workflow.checkpoint.path\":\"/tmp\"})\n",
"with FugueWorkflow(engine) as dag:\n",
" df = dag.df([[0]],\"a:int\")\n",
" df = df.transform(just_wait).strong_checkpoint(lazy=True)\n",
" df.show()\n",
" df.show()"
"with fa.engine_context(spark, engine_conf={\"fugue.workflow.checkpoint.path\":\"/tmp\"}):\n",
" res = fa.transform(df, just_wait, schema=\"*\", checkpoint=True)\n",
" res.show()\n",
" res.show()"
]
},
{
@@ -223,7 +323,7 @@
"source": [
"### Deterministic Checkpoint\n",
"\n",
"Deterministic checkpoint is mainly for 'resuming'. Expecially when in an interactive environment, you may not want to rerun the whole DAG every time you make a minor change. But note that, practically, you should avoid using deterministic checkpoint in production, also whenever you use deterministic checkpoint, ask yourself, is the dag overly complicated? Can you make it more modulized?\n",
"Deterministic checkpoint is mainly for 'resuming'. Especially in an interactive environment, you may not want to rerun the whole DAG every time you make a minor change. But note that, practically, you should avoid deterministic checkpoints in production. Also, whenever you use a deterministic checkpoint, ask yourself: is the DAG overly complicated? Can you make it more modularized?\n",
"\n",
"See this example"
]
@@ -319,6 +419,22 @@
"It is common to have your logic well separated into several DAGs and to need to pass dataframes between them. This can be done by `save` and `load` with an explicit file path, but for intermediate data you don't always want to specify paths, and you also want determinism to work across DAGs. This is the reason to use `yield_as`"
]
},
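{
"cell_type": "markdown",
"metadata": {},
"source": [

The idea behind `yield_as` — a named, path-free hand-off between DAGs — can be sketched with a hypothetical registry. The names here are illustrative, not Fugue's API.

```python
# Hypothetical registry illustrating yield_as: the producing workflow
# publishes a result under a name; a later workflow looks it up by that
# name, with no explicit file path appearing in user code.
registry = {}

def run_dag_one():
    result = [[1, 1], [2, 2], [3, 3]]   # imagine an expensive transform
    registry["result"] = result          # plays the role of yield_as
    return result

def run_dag_two():
    df = registry["result"]              # consume the yielded dataframe
    return [[a, b * 10] for a, b in df]

run_dag_one()
out = run_dag_two()
assert out == [[1, 10], [2, 20], [3, 30]]
```

In Fugue the registry is backed by the checkpoint path, so combined with deterministic checkpoints the hand-off also survives reruns.

]
},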
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"query = \"\"\"\n",
"TRANSFORM df USING just_wait SCHEMA *\n",
"DETERMINISTIC CHECKPOINT\n",
"YIELD DATAFRAME AS result\n",
"\"\"\"\n",
"\n",
"res = fa.fugue_sql_flow(query).run(engine=spark, conf={\"fugue.workflow.checkpoint.path\":\"/tmp\"})\n",
"res['result']"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -383,7 +499,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.9"
"version": "3.8.13"
},
"vscode": {
"interpreter": {
6 changes: 5 additions & 1 deletion tutorials/advanced/index.md
@@ -12,6 +12,7 @@ Since you already have experience in Spark or distributed computing in general,
:hidden:

useful_config
checkpoint
execution_engine
validation
schema_dataframes
@@ -30,8 +31,11 @@ These configurations can have significant impact on building and running the Fug
## [Execution Engine](execution_engine.ipynb)
The heart of Fugue. It is the layer that unifies many of the core concepts of distributed computing, and separates the underlying computing frameworks from user level logic. Normally you don't directly interact with execution engines. But it's good to understand some basics.

## [Checkpoint](checkpoint.ipynb)
Checkpoints are the operations that avoid duplicate computation when working with larger datasets. Fugue offers more granular checkpoint operations than Spark.

## [Validation](validation.ipynb)
Fugue applies input validation.
Fugue applies input validation to make sure the data has the expected format before a transformation is applied.

## [Data Type, Schema & DataFrames](schema_dataframes.ipynb)
Fugue data types and schema are strictly based on [Apache Arrow](https://arrow.apache.org/docs/index.html). Dataframe is an abstract concept with several built-in implementations to adapt to different dataframes. In this tutorial, we will go through the basic APIs and focus on the most common use cases.