To install the dependencies, run ``pip install "ray[train]" torch tiktoken datasets transformers``.

Then, import the required libraries.
"""

import time
train_ds = ray.data.from_huggingface(hf_ds["train"])
val_ds = ray.data.from_huggingface(hf_ds["validation"])

# Limit dataset size for fast iteration during smoke tests.
if SMOKE_TEST:
    train_ds = train_ds.limit(2500)
    val_ds = val_ds.limit(2500)

print(f"Dataset schema:\n{train_ds.schema()}")
###############################################################################
# The schema can look like this:
#
# .. code-block:: text
#
#    Column  Type
#    ------  ----
#    text    string
#
# This means that the dataset has one column called ``text`` and it is a string.
#
# Inspect raw data
# ~~~~~~~~~~~~~~~~
#
# Use ``take(n)`` to fetch a small number of rows for inspection.
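#
# For example, a quick sketch of printing the start of a few raw rows (the
# slice length is arbitrary):
#
# .. code-block:: python
#
#    for row in train_ds.take(3):
#        print(row["text"][:100])
#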
# 50,257). ``tiktoken`` is a fast, standalone tokenizer that has no
# dependency on the Hugging Face ``transformers`` library.
#
# The ``tokenize_and_chunk`` function does the following:
#
# * Tokenizes each batch of text, concatenating the tokens into a single
#   stream. Article title lines (for example, ``= Article Title =``) trigger
#   an ``<|endoftext|>`` separator so the model resets context at article
#   boundaries.
# * Splits the stream into fixed-length blocks of ``block_size + 1``
#   tokens.
# * Returns ``input_ids`` (the first ``block_size`` tokens) and
#   ``labels`` (shifted by one position for next-token prediction).

BLOCK_SIZE = 256
VOCAB_SIZE = 50257
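###############################################################################
# A minimal sketch of what such a function can look like, assuming
# ``tiktoken``'s GPT-2 encoding (the title-line check and array handling here
# are illustrative, not necessarily the tutorial's exact implementation):
#
# .. code-block:: python
#
#    enc = tiktoken.get_encoding("gpt2")
#
#    def tokenize_and_chunk(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
#        ids: list[int] = []
#        for text in batch["text"]:
#            stripped = text.strip()
#            # Article titles look like ``= Title =``; reset context there.
#            if stripped.startswith("= ") and stripped.endswith(" ="):
#                ids.append(enc.eot_token)
#            ids.extend(enc.encode_ordinary(text))
#        # Keep only complete blocks of block_size + 1 tokens.
#        n_blocks = len(ids) // (BLOCK_SIZE + 1)
#        arr = np.array(ids[: n_blocks * (BLOCK_SIZE + 1)], dtype=np.int64)
#        arr = arr.reshape(n_blocks, BLOCK_SIZE + 1)
#        return {"input_ids": arr[:, :-1], "labels": arr[:, 1:]}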
###############################################################################
# Streaming execution
# ~~~~~~~~~~~~~~~~~~~
#
# Internally, Ray divides the data into **blocks** and dispatches them to
# workers. This block-based architecture enables **streaming execution**: as
# soon as a stage outputs a block, the next stage can begin processing it
# immediately without waiting for previous stages to finish the entire
# dataset. This lets the pipeline process datasets that are too large
# to fit in memory at once.
#
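# For example, a minimal sketch (variable names are illustrative) of applying
# the tokenization stage lazily with ``map_batches``:
#
# .. code-block:: python
#
#    # Nothing executes yet; blocks stream through this stage once a consumer reads them.
#    train_tokens = train_ds.map_batches(tokenize_and_chunk, batch_format="numpy")
#    val_tokens = val_ds.map_batches(tokenize_and_chunk, batch_format="numpy")
#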
# When training starts, Ray Data logs the execution plan for this pipeline.

del model  # Free memory before training

###############################################################################
# The printed count is approximately 123.8M parameters.

###############################################################################
# Define the distributed training function
# Setting ``num_workers=8`` launches 8 parallel workers, one per GPU. Ray
# Train handles ``torch.distributed`` initialization, NCCL backend setup,
# and ``DistributedDataParallel`` wrapping behind the scenes. In the logs,
# you see each worker assigned a rank and device.
###############################################################################
# Inspect results
# ---------------
#
# After training, the ``Result`` object contains the final metrics and
# checkpoint. ``result.metrics`` comes from the last
# ``ray.train.report()`` call. ``result.checkpoint`` is ``None`` here
# because this tutorial doesn't save checkpoints.

print("\nTraining finished!")

###############################################################################
# Checkpointing
# ~~~~~~~~~~~~~
#
# In a production training run you would enable checkpointing so that
# training can resume from the last saved state after a failure. This
# requires a **shared storage path** (for example, an S3 bucket or NFS mount)
# accessible from all nodes. A sketch of what this can look like (the bucket
# path and experiment name are placeholders):
#
# .. code-block:: python
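#
#    # Persist checkpoints to shared storage that every node can reach.
#    run_config = ray.train.RunConfig(
#        storage_path="s3://your-bucket/ray-train-checkpoints",
#        name="gpt2-wikitext",
#    )
#
#    # Inside the training function, report a checkpoint alongside metrics.
#    import os
#    import tempfile
#
#    with tempfile.TemporaryDirectory() as tmpdir:
#        torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
#        ray.train.report(
#            {"loss": loss.item()},
#            checkpoint=ray.train.Checkpoint.from_directory(tmpdir),
#        )
#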
# To **resume training from a checkpoint**, call
# ``ray.train.get_checkpoint()`` at the top of your training function.
# When Ray Train restarts workers (for example, after a failure), it
# automatically provides the most recent checkpoint. If no checkpoint exists
# (as on a fresh run), the function returns ``None``. A sketch of the resume
# logic (the file name is a placeholder):
#
# .. code-block:: python
#
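#    checkpoint = ray.train.get_checkpoint()
#    if checkpoint:
#        with checkpoint.as_directory() as ckpt_dir:
#            model.load_state_dict(
#                torch.load(os.path.join(ckpt_dir, "model.pt"))
#            )
#    # Otherwise, training starts from freshly initialized weights.
#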
# No changes to the training function are needed. The same
# ``train_func_per_worker`` runs identically whether on 1 GPU or 256 GPUs.
#
# This tutorial uses ``DistributedDataParallel`` (DDP), which replicates
# the full model on every GPU. For larger models that don't fit on a
# single GPU, you can switch to
# `FullyShardedDataParallel <https://pytorch.org/docs/stable/fsdp.html>`__
# (FSDP) to shard parameters, gradients, and optimizer states across
# workers by setting ``prepare_model(parallel_strategy="fsdp")``.
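#
# A minimal sketch of that switch, shown in isolation (the rest of the
# training function is unchanged):
#
# .. code-block:: python
#
#    from ray.train.torch import prepare_model
#
#    # Shard parameters, gradients, and optimizer state across workers.
#    model = prepare_model(model, parallel_strategy="fsdp")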
#
# Heterogeneous clusters: separate data and training resources
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# Because Ray Data and Ray Train are separate systems, they don't have to
# share the same machines. By default, Ray Data preprocessing and training
# workers all run on the same nodes. However, you can optionally add
# **CPU-only nodes** to your cluster, and Ray Data automatically
# schedules preprocessing tasks on them, keeping your expensive GPU nodes
# free for training.
#
# This is useful when data preprocessing is a bottleneck. If you notice
# low GPU use because workers are waiting on data, you can add
# cheaper CPU-only nodes to the cluster and Ray Data scales out
# preprocessing to them.
#
# Ray Train's fault tolerance mechanisms include:
#
# * **Worker restart**: If a worker process crashes, Ray Train
#   automatically restarts it and resumes training.
# * **Checkpoint recovery**: Ray Train saves checkpoints to persistent
#   storage. When recovering from a failure, training resumes from the
#   latest checkpoint rather than starting over.
# * **Node failure handling**: If an entire node goes down, Ray Train
#   replaces the failed node and resumes training.
#
# To enable automatic failure recovery, configure ``FailureConfig`` in your ``RunConfig``, for example:
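#
# .. code-block:: python
#
#    # One possible configuration (sketch): retry up to three times on failure.
#    run_config = ray.train.RunConfig(
#        failure_config=ray.train.FailureConfig(max_failures=3),
#    )
#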
# * Monitor training progress across all workers
# * Inspect logs from individual workers
# * Identify data loading or communication bottlenecks
# * View resource use for CPU, GPU, and memory per worker
# * Debug failures with detailed error messages and stack traces
#
# For more information, see the `Ray Train monitoring