Dynamic Programming Case Study¶

Dynamic programming solution for the 3x3 grid-world case study.¶

End-to-end story of this file:

  1. Import the gridworld environment and a helper to print policies.
  2. policy_evaluation() computes state values $V^\pi$ for a fixed policy $\pi$, taking expectation over both:
    • action probabilities $\pi(a|s)$
    • transition probabilities $P(s', r | s, a)$
  3. compute_q_from_v() converts those state values into action values $Q^\pi$.
  4. policy_improvement() converts those action values into an epsilon-greedy policy, producing a better policy candidate.
  5. policy_iteration() alternates evaluation and improvement until the policy stops changing, which yields the optimal policy for this finite MDP.
  6. policy_iteration_with_history() does the same work but also records every intermediate snapshot so the learning process can be printed or visualized.
  7. main() runs the procedure end-to-end and prints the value tables and policies for each iteration.

Imports¶

Load the shared Python features used throughout the notebook. The environment definition is kept local in the next section so the notebook can run on its own without depending on package imports.

In [62]:
from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace
from typing import Dict, List, Tuple

import numpy as np

GridWorld Environment¶

Shared 3x3 grid-world used by the DP vs Q-learning case study.¶

End-to-end story of this file:

  1. GridWorldConfig defines the static rules of the world: grid size, start state, goal state, rewards, and transition probabilities.
  2. GridWorldCaseStudyEnv.__init__() uses that config to build a small Gym-like environment object with:
    • observation_space.n: how many discrete states exist
    • action_space.n: how many discrete actions exist
    • P: the tabular transition model for the MDP
    • state: the current live state while an episode is running
  3. _build_transition_model() precomputes every (state, action) transition by calling _transition_distribution() for all state-action pairs.
  4. _transition_distribution() contains the actual environment dynamics: given one state and one action, it builds the full probability distribution over next states and rewards.
  5. reset() starts a new episode by placing the agent back at the start state.
  6. step(action) advances the environment by one action, sampling the outcome from the precomputed transition model P.
  7. format_policy(policy) is a helper for visualization; it converts a policy table into a text grid with arrows.
In [63]:
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3

ACTION_NAMES = {
    UP: 'UP',
    RIGHT: 'RIGHT',
    DOWN: 'DOWN',
    LEFT: 'LEFT',
}

This is a pure data container. It does not execute the environment; it only stores the fixed parameters needed to construct one.

Precondition: - The values should describe a valid gridworld. In particular, start and goal state indices should be within the total number of cells.

Postcondition: - A read-only configuration object exists and can be safely shared.

Immutable configuration bundle for the environment - This is the structural metadata a Gym-style env usually keeps: grid size, start/goal states, reward settings, and the probability of each possible movement outcome after an action is chosen.

In [64]:
@dataclass(frozen=True)
class GridWorldConfig:
    rows: int = 3
    cols: int = 3
    start_state: int = 0
    goal_state: int = 8
    step_reward: float = -1.0
    goal_reward: float = 10.0
    intended_move_prob: float = 0.8
    slip_left_prob: float = 0.1
    slip_right_prob: float = 0.1

Minimal tabular environment with a Gym-like interface.

Story: This class packages the full MDP environment:

  • static settings in config
  • the set of possible states in observation_space
  • the set of possible actions in action_space
  • the transition model in P
  • the current episode state in state
In [65]:
class GridWorldCaseStudyEnv:
    config: GridWorldConfig
    observation_space: SimpleNamespace
    action_space: SimpleNamespace
    P: Dict[int, Dict[int, List[Tuple[float, int, float, bool]]]]
    state: int
    rng: np.random.Generator

    def __init__(self, config: GridWorldConfig | None = None) -> None:
        # Precondition:
        # `config` is either:
        # - a `GridWorldConfig` object supplied by the caller, or
        # - `None`, meaning the default config should be used.
        #
        # What happens:
        # 1. Choose the provided config, or create a default one.
        # 2. Build Gym-like `observation_space` and `action_space` objects.
        # 3. Precompute the full transition table `P`.
        # 4. Create a random-number generator for stochastic sampling in `step()`.
        # 5. Initialize the live state to the start state.
        #
        # Postcondition:
        # The environment is ready to be used with `reset()` and `step()`.
        self.config = config or GridWorldConfig()
        # Gym exposes an observation space object. For a discrete tabular MDP,
        # the only field needed here is `n`: the total number of valid states.
        self.observation_space = SimpleNamespace(
            n=self.config.rows * self.config.cols
        )
        # Gym also exposes an action space object. This environment uses four
        # discrete actions, encoded by the integer constants defined above.
        self.action_space = SimpleNamespace(n=4)
        # `P` is the classic tabular transition model used by many Gym toy-text
        # environments.
        # The `(state, action)` pair is represented by the nested dictionary
        # lookup `P[state][action]` 
        # The value at that lookup is a list of all possible outcomes for the
        # state-transition distribution P(s', r, done | s, a).
        # Each outcome tuple is:
        # (probability, next_state, reward, done)
        #   So the structure is:                  
        #    P[s][a] = [
        #        (prob_1, next_state_1, reward_1, done_1),
        #        (prob_2, next_state_2, reward_2, done_2),
        #        ...
        #    ]
        # This corresponds to P(s', r | s, a) in the MDP definition.
        # P[s][a][k] = kth environment outcome for that state-action pair.
        # The number of items in this inner list is the number of distinct
        # probability-mass entries in the transition distribution for the chosen
        # state-action pair.
        # In this version the list can contain multiple outcomes, so one
        # `(state, action)` pair can represent a full stochastic transition
        # distribution rather than a single deterministic move.
        # env.P[0][1] (state 0, action Right) is
        #   [(0.8, 1, -1.0, False), (0.1, 0, -1.0, False), (0.1, 3, -1.0, False)]          
        #  Meaning:
        #    - with probability 0.8, go to state 1, reward -1, not terminal 
        #    - with probability 0.1, stay in state 0, reward -1, not terminal
        #    - with probability 0.1, go to state 3, reward -1, not terminal 
        self.P = self._build_transition_model()
        self.rng = np.random.default_rng()
        # Mutable runtime state storing the agent's current position.
        # `reset()` reinitializes it and `step()` advances it.
        self.state = self.config.start_state

    def _build_transition_model(self):
        # Precondition:
        # `self.config`, `self.observation_space`, and `self.action_space` have
        # already been initialized.
        #
        # What happens:
        # 1. Allocate an empty nested dictionary `transitions`.
        # 2. Loop over every state `s`.
        # 3. For each state, loop over every action `a`.
        # 4. Call `_transition_distribution(s, a)` to compute all stochastic
        #    outcomes for that state-action pair.
        # 5. Store the full probability distribution at `transitions[s][a]`.
        #
        # Postcondition:
        # Returns a complete tabular model of the MDP where every valid
        # state-action pair has an entry.
        # Nested dictionary structure:
        # - outer key: current state id `s`
        # - inner key: action id `a`
        # - `transitions[s][a]` is the list of outcomes in the transition
        #   probability distribution for that state-action pair
        transitions = {}
        for state in range(self.observation_space.n):
            transitions[state] = {}
            for action in range(self.action_space.n):
                transitions[state][action] = self._transition_distribution(state, action)
        return transitions

    def _transition_distribution(self, state: int, action: int):
        # Precondition:
        # - `state` must be a valid discrete state index.
        # - `action` must be one of {UP, RIGHT, DOWN, LEFT}.
        #
        # What happens:
        # 1. If `state` is already the goal state, keep the agent there and mark
        #    the transition as terminal.
        # 2. Otherwise compute three candidate directions:
        #    intended move, slip-left move, and slip-right move.
        # 3. Use the probabilities from `GridWorldConfig` to assign probability
        #    mass to those candidate movement directions.
        # 4. Convert each candidate move into a next state.
        # 5. If multiple outcomes land in the same next state, merge their
        #    probabilities into one probability-mass entry.
        # 6. Attach reward and terminal status to each merged outcome.
        #
        # Postcondition:
        # Returns the full transition distribution for `(state, action)`.
        if state == self.config.goal_state:
            # Terminal states loop to themselves with probability 1.
            return [(1.0, state, 0.0, True)]
        # candidate_moves is a list of (action, probability) pairs for the intended
        # move and the two slip moves. The probabilities come from the config.
        # Note: these are movement candidates, not transitions. A transition is the
        # result after applying the dynamics, i.e. a (prob, next_state, reward, done)
        # tuple obtained by applying the move to the state.
        # example candidate_moves:
        # candidate_moves = [
        #     (UP, 0.8),     # intended move: UP (=0) with 0.8 probability
        #     (LEFT, 0.1),   # slip-left move: LEFT (=3) with 0.1 probability
        #     (RIGHT, 0.1),  # slip-right move: RIGHT (=1) with 0.1 probability
        # ]
        candidate_moves = [
            # The intended move is the chosen action, but with some probability mass from the config.
            (action, self.config.intended_move_prob),
            # The slip-left move is the action to the left of the intended action, with its own probability mass.
            ((action - 1) % self.action_space.n, self.config.slip_left_prob),
            # The slip-right move is the action to the right of the intended action, with its own probability mass.
            ((action + 1) % self.action_space.n, self.config.slip_right_prob),
        ]
        # next_state_probs[next_state] stores the marginal next-state probability P(s' | s, a)
        # after aggregating multiple stochastic movement branches that land in the same next state.
        next_state_probs = {}
        for move_action, prob in candidate_moves:
            if prob == 0.0:
                continue
            next_state = self._move(state, move_action)
            next_state_probs[next_state] = next_state_probs.get(next_state, 0.0) + prob

        # transition_outcomes stores the full environment transition representation for this (s, a) pair.
        # each tuple is one support point of the joint conditional distribution P(s', r | s, a).
        transition_outcomes = []
        for next_state, prob in next_state_probs.items():
            done = next_state == self.config.goal_state
            reward = self.config.goal_reward if done else self.config.step_reward
            transition_outcomes.append((prob, next_state, reward, done))
        return transition_outcomes

    def _move(self, state: int, action: int) -> int:
        # Precondition:
        # - `state` must be a valid discrete state index.
        # - `action` must be one of the four directional actions.
        #
        # What happens:
        # 1. Convert the flat state index into `(row, col)`.
        # 2. Apply the chosen directional move.
        # 3. Clamp movement to stay inside the grid boundaries.
        # 4. Convert the resulting grid cell back into a flat state index.
        #
        # Postcondition:
        # Returns the next discrete state reached by that single movement.
        if state == self.config.goal_state:
            return state
        
        # States are stored as a single integer, but movement logic is easier in
        # (row, col) form. `divmod` converts the flat state id into grid coords.
        row, col = divmod(state, self.config.cols)
        next_row, next_col = row, col
        if action == UP:
            next_row = max(0, row - 1)
        elif action == RIGHT:
            next_col = min(self.config.cols - 1, col + 1)
        elif action == DOWN:
            next_row = min(self.config.rows - 1, row + 1)
        elif action == LEFT:
            next_col = max(0, col - 1)
        else:
            raise ValueError(f"Invalid action: {action}")
        
        # Convert the next grid cell back into the flattened discrete state id.
        return next_row * self.config.cols + next_col

    def reset(self):
        # Precondition:
        # The environment object has been constructed.
        #
        # What happens:
        # 1. Discard any previous episode progress.
        # 2. Set the live state back to the configured start state.
        #
        # Postcondition:
        # - `self.state == self.config.start_state`
        # - returns `(start_state, {})`
        # - the environment is ready for a fresh episode
        # Gym-style reset returns the initial observation plus an info dict.
        self.state = self.config.start_state
        return self.state, {}

    def step(self, action: int):
        # Precondition:
        # - `action` must be a valid discrete action id.
        # - `self.state` must currently hold a valid state index.
        # - The transition table `P` must already exist.
        #
        # What happens:
        # 1. Use the current state and chosen action to look up `P[self.state][action]`.
        # 2. Read the full list of possible transition outcomes.
        # 3. Sample one outcome according to its probability.
        # 4. Extract `next_state`, `reward`, and `done`.
        # 5. Update the live environment state to `next_state`.
        # 6. Return the Gym-like step result tuple.
        #
        # Postcondition:
        # - `self.state` is updated to the returned `next_state`
        # - caller receives `(next_state, reward, terminated, truncated, info)`
        # - `truncated` is always `False` in this simple environment
        # Execute one action in the environment.
        # In RL terms this maps:
        # current state s + chosen action a -> next state s', reward, terminal flag
        # `P[self.state][action]` can contain multiple transition tuples, one
        # for each support point in the stochastic transition distribution.
        # Sample one of those outcomes from the tabular model `P`,
        # update the live state, and return the standard Gym-like step tuple:
        # (observation, reward, terminated, truncated, info)
        # `info` is an optional dictionary for extra metadata; it is empty here.
        outcomes = self.P[self.state][action]
        probabilities = [prob for prob, _, _, _ in outcomes]
        outcome_index = int(self.rng.choice(len(outcomes), p=probabilities))
        _, next_state, reward, done = outcomes[outcome_index]
        self.state = next_state
        return next_state, reward, done, False, {}


def _format_action_grid(rows) -> str:
    lines = []
    cols = GridWorldConfig().cols
    # Build the separator to match the configured width ('+---+---+---+' for 3 cols).
    separator = '+' + '---+' * cols
    lines.append(separator)
    for idx in range(0, len(rows), cols):
        lines.append('|' + '|'.join(rows[idx: idx + cols]) + '|')
        lines.append(separator)
    return '\n'.join(lines)


def format_policy(policy) -> str:
    # Precondition:
    # `policy` should be indexable by state and each row should support
    # selecting the highest-probability action(s) for that state.
    #
    # What happens:
    # 1. Map the best action in each state to an arrow symbol.
    # 2. If multiple actions are tied for the largest probability, display all
    #    tied arrows in the same cell so the visualization does not hide ties.
    # 3. Replace the goal state's arrow with `G`.
    # 4. Group the states row by row into a text grid.
    #
    # Postcondition:
    # Returns a multi-line string that visually represents the policy.
    arrows = {UP: '^', RIGHT: '>', DOWN: 'v', LEFT: '<'}
    rows = []
    for state in range(policy.shape[0]):
        if state == GridWorldConfig().goal_state:
            rows.append(' G ')
        else:
            row = np.asarray(policy[state], dtype=float)
            best_value = row.max()
            best_actions = np.flatnonzero(np.isclose(row, best_value))
            cell = ''.join(arrows[action] for action in best_actions)
            rows.append(f'{cell:^3}')
    return _format_action_grid(rows)


def format_greedy_actions(Q) -> str:
    arrows = {UP: '^', RIGHT: '>', DOWN: 'v', LEFT: '<'}
    rows = []
    for state in range(Q.shape[0]):
        if state == GridWorldConfig().goal_state:
            rows.append(' G ')
        else:
            row = np.asarray(Q[state], dtype=float)
            best_value = row.max()
            best_actions = np.flatnonzero(np.isclose(row, best_value))
            cell = ''.join(arrows[action] for action in best_actions)
            rows.append(f'{cell:^3}')
    return _format_action_grid(rows)
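
The sampling pattern inside `step()` can be exercised on its own. This standalone sketch (not part of the notebook's environment class) uses the same `(prob, next_state, reward, done)` outcome format and the same `numpy.random.Generator.choice` call, with the `P[0][RIGHT]` outcomes shown earlier:

```python
import numpy as np

# One transition distribution in the same format as env.P[s][a];
# the values mirror the P[0][RIGHT] example above.
outcomes = [(0.8, 1, -1.0, False), (0.1, 0, -1.0, False), (0.1, 3, -1.0, False)]

rng = np.random.default_rng(seed=0)
probabilities = [prob for prob, _, _, _ in outcomes]

# Sample many transitions and tally how often each next state occurs.
counts = {}
for _ in range(10_000):
    idx = int(rng.choice(len(outcomes), p=probabilities))
    _, next_state, reward, done = outcomes[idx]
    counts[next_state] = counts.get(next_state, 0) + 1

for state in sorted(counts):
    print(f"s'={state}: empirical prob {counts[state] / 10_000:.3f}")
```

The empirical frequencies should land close to the 0.8 / 0.1 / 0.1 probability masses stored in the outcome list.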

Visualizing P[state][action] and policy[state]¶

There are two different probability objects in dynamic programming for an MDP:

  1. policy[s][a] = $\pi(a|s)$

    • this belongs to the agent
    • it tells us how likely the agent is to choose action $a$ in state $s$
  2. P[s][a]

    • this belongs to the environment model
    • it stores a list of possible outcomes for the chosen state-action pair
    • each entry has the form (prob, next_state, reward, done)
    • each such tuple is one support point of the joint conditional distribution $P(s', r | s, a)$

So the Bellman expectation uses both layers:

  • first average over actions using $\pi(a|s)$
  • then average over outcomes using $P(s', r | s, a)$
In [66]:
env = GridWorldCaseStudyEnv()
example_state = 0
example_action = RIGHT

uniform_policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
action_names = {UP: 'Up', RIGHT: 'Right', DOWN: 'Down', LEFT: 'Left'}

print('How the two probability objects are stored')
print()
print(f'policy[{example_state}] -> pi(a|s={example_state})')
for a, action_prob in enumerate(uniform_policy[example_state]):
    print(f"  pi({action_names[a]}|s={example_state}) = {action_prob:.2f}")

print()
print(f'P[{example_state}][{example_action}] -> P(s\', r | s={example_state}, a={action_names[example_action]})')
for prob, next_state, reward, done in env.P[example_state][example_action]:
    print(
        f"  prob={prob:.2f}  outcome=(s\'={next_state}, r={reward}, done={done})"
    )

print()
print('Structure summary')
print('  policy[s][a] -> action probability pi(a|s)')
print('  P[s][a] -> list of (prob, next_state, reward, done)')
print("  each tuple in P[s][a] is one support point of the joint conditional distribution P(s', r | s, a)")
How the two probability objects are stored

policy[0] -> pi(a|s=0)
  pi(Up|s=0) = 0.25
  pi(Right|s=0) = 0.25
  pi(Down|s=0) = 0.25
  pi(Left|s=0) = 0.25

P[0][1] -> P(s', r | s=0, a=Right)
  prob=0.80  outcome=(s'=1, r=-1.0, done=False)
  prob=0.10  outcome=(s'=0, r=-1.0, done=False)
  prob=0.10  outcome=(s'=3, r=-1.0, done=False)

Structure summary
  policy[s][a] -> action probability pi(a|s)
  P[s][a] -> list of (prob, next_state, reward, done)
  each tuple in P[s][a] is one support point of the joint conditional distribution P(s', r | s, a)

SVG view of the two probability structures¶

Dynamic programming probability structures

Dynamic Programming Updates¶

These functions implement policy evaluation, one-step lookahead from V to Q, epsilon-greedy policy improvement, and policy iteration with history tracking. The loop alternates between evaluating the current policy and improving it until the greedy action in every state stops changing.

Steps:

  1. policy_evaluation(policy) -> $V^\pi$ : compute the state-value function of the given policy in order to evaluate it.
  2. compute_q_from_v(V) -> $Q^\pi$ : convert state values into action values for the next step.
  3. policy_improvement(Q) -> new_policy : from step 1 we know the value of the current policy (initialized as a uniform seed policy). To improve it, we need the value of each possible action from each state, so we compute $Q^\pi(s, a)$ from $V^\pi(s)$. Whichever action has the highest Q-value in a state receives the bulk of the probability mass in the new policy.
  4. policy_iteration() -> optimal_policy
  5. policy_iteration_with_history() -> optimal_policy, history

Policy evaluation: compute $V^\pi$ for a fixed policy $\pi$.¶

$$ V^\pi(s) = \sum_a \pi(a \mid s)\sum_{s',r} p(s',r \mid s,a)\left[r + \gamma V^\pi(s')\right]$$

$$V^\pi(s) = \sum_a \pi(a \mid s)\sum_{s',r} p(s',r \mid s,a)\left[r + \gamma (1-\text{done}) V^\pi(s')\right]$$

The second form is the one implemented below: the $(1-\text{done})$ factor zeroes out the discounted future-value term on transitions that end the episode.

Meanings:

  • $V^\pi(s)$ = state-value function of policy $\pi$ - expected return when starting in state $s$ and following policy $\pi$
  • $\pi(a|s)$ = policy - probability that the agent chooses action $a$ when it is in state $s$
  • $p(s', r|s, a)$ = transition dynamics or environment dynamics - probability that after taking action $a$ in state $s$, the environment produces next state $s'$ and reward $r$
  • $s$ = current state - the state for which we are computing the value
  • $s'$ = next state - the possible states the environment can transition to after taking action $a$ in state $s$
  • $r$ = reward - immediate reward received on that transition
  • $\gamma$ = discount factor - controls how much future rewards matter
  • $\text{done}$ = terminal indicator; 1 means the episode ends after this transition
  • $V^\pi(s')$ = state-value function of the successor state under policy $\pi$ - expected return from the next state $s'$ if the same policy $\pi$ continues to be followed

Interpretation:

  • first average over actions using $\pi(a|s)$
  • then average over environment outcomes using $p(s',r|s,a)$
  • for each outcome, add immediate reward plus discounted future value
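
As a tiny worked example of the inner sum (hand-computed here, using the `P[0][RIGHT]` outcomes printed earlier and a value table still at zero), the backup for one fixed action reduces to the expected immediate reward:

```python
import numpy as np

gamma = 0.9
V = np.zeros(9)  # value table at the start of the first sweep

# P[0][RIGHT] from the environment section: (prob, next_state, reward, done)
outcomes = [(0.8, 1, -1.0, False), (0.1, 0, -1.0, False), (0.1, 3, -1.0, False)]

# Inner sum of the Bellman expectation for one fixed action:
# sum_{s',r} p(s',r|s,a) * [r + gamma * (1 - done) * V(s')]
backup = sum(
    prob * (reward + gamma * (1 - done) * V[next_state])
    for prob, next_state, reward, done in outcomes
)
print(backup)  # with V = 0 everywhere this is just the expected reward: -1.0
```

In policy_evaluation() this quantity is additionally weighted by $\pi(a|s)$ and summed over all actions.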

Precondition:

  • env must expose observation_space.n and the transition model P[state][action]
  • policy must have shape (num_states, num_actions)
  • each row of policy should represent action probabilities for one state
  • gamma should be in a sensible discount range, usually 0 <= gamma <= 1
  • theta should be a small positive convergence tolerance

What happens:

  1. Create a value table V initialized to zero for every state.
  2. Repeatedly sweep through all states.
  3. For each state, compute a new expected value under the current policy:
    • first loop over actions weighted by policy[s][a] = $\pi(a|s)$
    • then, for each action, loop over all transitions in env.P[s][a], weighted by their transition probability
    • accumulate the Bellman expectation update over both probability distributions
  4. Track delta, the largest absolute value change in this sweep.
  5. Stop when the value table changes by less than theta.

Postcondition:
Returns an approximate fixed point V for the Bellman expectation equation under the supplied policy.

In [67]:
def policy_evaluation(env, policy, gamma=0.9, theta=1e-8, max_iterations=10000):
    num_states = env.observation_space.n
    V = np.zeros(num_states)
    for _ in range(max_iterations):
        delta = 0.0
        for s in range(num_states):
            old_v = V[s]
            new_v = 0.0
            for a, action_prob in enumerate(policy[s]):
                if action_prob == 0.0:
                    continue
                for prob, next_state, reward, done in env.P[s][a]:
                    new_v += action_prob * prob * (
                        reward + gamma * (1 - done) * V[next_state]
                    )
            V[s] = new_v
            delta = max(delta, abs(old_v - new_v))
        if delta < theta:
            return V
    raise RuntimeError(
        f'policy_evaluation did not converge within max_iterations={max_iterations}'
    )

$$Q^\pi(s,a) = \sum_{s',r} P(s',r|s,a)[r + \gamma V^\pi(s')]$$

Why convert V to Q?

  • $V^\pi(s)$ tells us how good state $s$ is overall, but policy improvement must compare the individual actions available in that state, and that comparison depends on both $\pi(a|s)$ and $P(s', r|s, a)$.
  • $Q^\pi(s, a)$ answers that finer question: "If I am in state $s$, how good is it to take action $a$?"
  • Once $Q^\pi$ is available, the best action in each state is $\arg\max_a Q^\pi(s, a)$. A policy is just a table of action probabilities per state, so the improved policy assigns high probability to the action(s) with the highest Q-value. Computing $Q^\pi$ from $V^\pi$ is what lets us iterate over all actions in a state, identify the one with the highest return, and move its probability mass in $\pi(a|s)$ accordingly.

Precondition:

  • env must expose observation_space.n, action_space.n, and P
  • V must contain one value per state
  • gamma should be a valid discount factor

What happens:

  1. Allocate an empty Q-table with one row per state and one column per action.
  2. For each state-action pair, sum over all transitions in env.P[s][a]. This means Q[s, a] averages over the transition probabilities in P, but not over action probabilities from $\pi$, because the action a is assumed to already be fixed.
  3. Apply the Bellman one-step lookahead using the provided state values V.

Postcondition:
Returns a Q-table where Q[s, a] estimates the value of taking action a in state s and then following the value function encoded in V.

In [68]:
def compute_q_from_v(env, V, gamma=0.9):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    Q = np.zeros((num_states, num_actions))
    for s in range(num_states):
        for a in range(num_actions):
            for prob, next_state, reward, done in env.P[s][a]:
                Q[s, a] += prob * (
                    reward + gamma * (1 - done) * V[next_state]
                )
    return Q


def format_q_tie_details(Q):
    lines = []
    for state, row in enumerate(np.asarray(Q, dtype=float)):
        best_value = row.max()
        tied_actions = np.flatnonzero(np.isclose(row, best_value))
        if len(tied_actions) > 1:
            row_values = ', '.join(f'{value:.15f}' for value in row)
            action_names = ', '.join(ACTION_NAMES[int(action)] for action in tied_actions)
            lines.append(
                f'state {state}: Q=[{row_values}] tied_best_actions=[{action_names}]'
            )
    return '\n'.join(lines) if lines else 'No tied greedy actions.'
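
The tie detection used throughout (np.isclose against the row max) can be seen in isolation. The toy Q row below is made up for illustration: two actions differ only by floating-point noise, and an exact equality test would miss the tie that np.isclose catches.

```python
import numpy as np

# Toy Q row where actions 1 and 2 differ only by floating-point noise.
row = np.array([-6.2, -5.596000001, -5.596, -6.2])

best_value = row.max()
exact_ties = np.flatnonzero(row == best_value)             # misses the noisy twin
robust_ties = np.flatnonzero(np.isclose(row, best_value))  # treats both as tied
print(exact_ties, robust_ties)
```

This is why format_policy(), format_greedy_actions(), and policy_improvement() all use np.isclose rather than `==` when selecting greedy actions.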

Why this step needs Q instead of only V:
V gives one value per state, but improvement must choose among multiple actions in each state. So we first compute Q[s, a] for all actions and then assign most of the probability mass to the best action.

Precondition:

  • env must be a valid tabular environment
  • V must contain one value per state
  • gamma should be a valid discount factor
  • epsilon should satisfy 0 <= epsilon <= 1

What happens:

  1. Convert the current state values V into action values Q.
  2. For each state, find all actions tied for the largest Q-value.
  3. Build an epsilon-greedy policy:
    • distribute epsilon uniformly across all actions: policy = np.full((num_states, num_actions), epsilon / num_actions)
    • split the remaining 1 - epsilon probability equally across all tied best actions
    • this produces a policy row representing action probabilities $\pi(a|s)$ without arbitrarily breaking ties among equally good actions
    • in the fully greedy case (epsilon = 0), the best action gets probability 1 and all others get 0

Postcondition:
Returns:

  • policy: an epsilon-greedy policy with shape (num_states, num_actions)
  • Q: the action-value table used to choose that policy

In [69]:
def policy_improvement(env, V, gamma=0.9, epsilon=0.1):
    Q = compute_q_from_v(env, V, gamma=gamma)
    num_states, num_actions = Q.shape
    policy = np.full((num_states, num_actions), epsilon / num_actions)
    for s in range(num_states):
        row = np.asarray(Q[s], dtype=float)
        best_value = row.max()
        best_actions = np.flatnonzero(np.isclose(row, best_value))
        greedy_share = (1.0 - epsilon) / len(best_actions)
        policy[s, best_actions] += greedy_share
    return policy, Q
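
A quick standalone sanity check of the epsilon-greedy construction (the Q table here is made up and does not come from the gridworld): every row should remain a valid probability distribution, and tied best actions should share the greedy mass equally.

```python
import numpy as np

epsilon = 0.1
Q = np.array([
    [1.0, 3.0, 3.0, 0.5],   # two actions tied for best
    [0.0, 2.0, 1.0, -1.0],  # a single best action
])
num_states, num_actions = Q.shape

# Same construction as policy_improvement(): epsilon spread uniformly,
# then the remaining 1 - epsilon split across the tied best actions.
policy = np.full((num_states, num_actions), epsilon / num_actions)
for s in range(num_states):
    best = np.flatnonzero(np.isclose(Q[s], Q[s].max()))
    policy[s, best] += (1.0 - epsilon) / len(best)

print(policy)
print(policy.sum(axis=1))  # each row sums to 1
```

In the first row the two tied actions each get 0.025 + 0.45 = 0.475; in the second row the single best action gets 0.025 + 0.9 = 0.925.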

Precondition:

  • env must be a finite tabular MDP
  • gamma, theta, and epsilon should be valid numeric hyperparameters

What happens:

  1. Run the same evaluation/improvement loop as policy iteration.
  2. After each improvement step, record a snapshot containing:
    • iteration index
    • policy
    • state values V
    • action values Q
    • whether the policy is already stable
  3. Stop once stability is reached.

Postcondition:
Returns the final policy, V, Q, and a history list of intermediate snapshots for debugging, teaching, or visualization.

In [70]:
def policy_iteration_with_history(env, gamma=0.9, theta=1e-8, epsilon=0.1, max_policy_iterations=1000):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    policy = np.ones((num_states, num_actions)) / num_actions
    history = []
    iteration = 0
    for _ in range(max_policy_iterations):
        V = policy_evaluation(env, policy, gamma=gamma, theta=theta)
        new_policy, Q = policy_improvement(env, V, gamma=gamma, epsilon=epsilon)
        stable = np.array_equal(new_policy.argmax(axis=1), policy.argmax(axis=1))
        history.append({
            'iteration': iteration,
            'policy': new_policy.copy(),
            'V': V.copy(),
            'Q': Q.copy(),
            'stable': stable,
        })
        if stable:
            return new_policy, V, Q, history
        policy = new_policy
        iteration += 1
    raise RuntimeError(
        f'policy_iteration_with_history did not converge within max_policy_iterations={max_policy_iterations}'
    )
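
The overview mentions a plain policy_iteration() without history tracking. As a standalone miniature of that evaluate/improve alternation, here is a fully greedy version on a deterministic 2-state MDP (the tiny P table below is made up for illustration and is not the gridworld):

```python
import numpy as np

# Tiny deterministic MDP in the same P[s][a] = [(prob, s', r, done)] format:
# state 0 can move to terminal state 1 (reward +1) or stay put (reward -1).
P = {
    0: {0: [(1.0, 1, 1.0, True)],     # action 0: reach the goal
        1: [(1.0, 0, -1.0, False)]},  # action 1: stay put
    1: {0: [(1.0, 1, 0.0, True)],     # terminal state loops to itself
        1: [(1.0, 1, 0.0, True)]},
}
gamma, theta = 0.9, 1e-8
num_states, num_actions = 2, 2
policy = np.ones((num_states, num_actions)) / num_actions  # uniform seed policy

for _ in range(100):
    # Policy evaluation: sweep until the value table stops changing.
    V = np.zeros(num_states)
    while True:
        delta = 0.0
        for s in range(num_states):
            new_v = sum(
                policy[s, a] * prob * (r + gamma * (1 - done) * V[s2])
                for a in range(num_actions)
                for prob, s2, r, done in P[s][a]
            )
            delta = max(delta, abs(V[s] - new_v))
            V[s] = new_v
        if delta < theta:
            break
    # Greedy improvement from the one-step lookahead Q.
    Q = np.zeros((num_states, num_actions))
    for s in range(num_states):
        for a in range(num_actions):
            Q[s, a] = sum(p * (r + gamma * (1 - done) * V[s2])
                          for p, s2, r, done in P[s][a])
    new_policy = np.eye(num_actions)[Q.argmax(axis=1)]  # one-hot greedy policy
    if np.array_equal(new_policy, policy):
        break  # policy is stable: optimal for this finite MDP
    policy = new_policy

print(policy.argmax(axis=1))  # state 0 should pick action 0 (go to the goal)
```

policy_iteration_with_history() above does the same loop on the gridworld, but with an epsilon-greedy improvement step and a snapshot recorded per iteration.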

Run The Case Study¶

Instantiate the gridworld, run policy iteration, and print the intermediate snapshots. The output shows how the value function, action values, and epsilon-greedy policy evolve until convergence.

In [71]:
env = GridWorldCaseStudyEnv()
epsilon = 0.1
policy, V, Q, history = policy_iteration_with_history(env, epsilon=epsilon)

np.set_printoptions(precision=3, suppress=True)
print('Dynamic Programming Snapshots')
for snapshot in history:
    print(f"\nIteration {snapshot['iteration']} (stable={snapshot['stable']})")
    print('V(s)')
    print(snapshot['V'].reshape(3, 3))
    print('Q(s,a)')
    print(snapshot['Q'])
    print('Full-precision tied greedy Q rows')
    print(format_q_tie_details(snapshot['Q']))
    print('Greedy actions from Q (ties shown)')
    print(format_greedy_actions(snapshot['Q']))
    print(f"Epsilon-greedy policy (epsilon={epsilon})")
    print(format_policy(snapshot['policy']))

print('Optimal state values V*')
print(V.reshape(3, 3))
print('\nOptimal action values Q*')
print(Q)
print('\nFull-precision tied greedy Q rows')
print(format_q_tie_details(Q))
print('\nGreedy actions from Q* (ties shown)')
print(format_greedy_actions(Q))
print(f"\nEpsilon-greedy policy from DP (epsilon={epsilon})")
print(format_policy(policy))
Dynamic Programming Snapshots

Iteration 0 (stable=False)
V(s)
[[-5.922 -5.016 -3.767]
 [-5.016 -3.144  0.251]
 [-3.767  0.251  0.   ]]
Q(s,a)
[[-6.249 -5.596 -5.596 -6.249]
 [-5.484 -4.447 -4.136 -5.999]
 [-4.503 -4.029 -1.61  -4.928]
 [-5.999 -4.136 -4.447 -5.484]
 [-5.041 -1.248 -1.248 -5.041]
 [-3.973 -0.058  7.54  -2.503]
 [-4.928 -1.61  -4.029 -4.503]
 [-2.503  7.54  -0.058 -3.973]
 [ 0.     0.     0.     0.   ]]
Full-precision tied greedy Q rows
state 0: Q=[-6.248688540283291, -5.596286558429437, -5.596286558429436, -6.248688540283291] tied_best_actions=[RIGHT, DOWN]
state 4: Q=[-5.040632604322425, -1.247802953320184, -1.247802953320183, -5.040632604322424] tied_best_actions=[RIGHT, DOWN]
state 8: Q=[0.000000000000000, 0.000000000000000, 0.000000000000000, 0.000000000000000] tied_best_actions=[UP, RIGHT, DOWN, LEFT]
Greedy actions from Q (ties shown)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+
Epsilon-greedy policy (epsilon=0.1)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+

Iteration 1 (stable=True)
V(s)
[[2.71  4.352 6.22 ]
 [4.352 6.458 8.927]
 [6.22  8.927 0.   ]]
Q(s,a)
[[1.587 2.769 2.769 1.587]
 [2.937 4.451 4.454 1.924]
 [4.43  4.842 6.379 3.497]
 [1.924 4.454 4.451 2.937]
 [3.329 6.623 6.623 3.329]
 [4.863 7.087 9.185 5.31 ]
 [3.497 6.379 4.842 4.43 ]
 [5.31  9.185 7.087 4.863]
 [0.    0.    0.    0.   ]]
Full-precision tied greedy Q rows
state 0: Q=[1.587030309833386, 2.769380343525431, 2.769380343525431, 1.587030309833386] tied_best_actions=[RIGHT, DOWN]
state 4: Q=[3.328918326062884, 6.622872152670462, 6.622872152670463, 3.328918326062884] tied_best_actions=[RIGHT, DOWN]
state 8: Q=[0.000000000000000, 0.000000000000000, 0.000000000000000, 0.000000000000000] tied_best_actions=[UP, RIGHT, DOWN, LEFT]
Greedy actions from Q (ties shown)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+
Epsilon-greedy policy (epsilon=0.1)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+
Optimal state values V*
[[2.71  4.352 6.22 ]
 [4.352 6.458 8.927]
 [6.22  8.927 0.   ]]

Optimal action values Q*
[[1.587 2.769 2.769 1.587]
 [2.937 4.451 4.454 1.924]
 [4.43  4.842 6.379 3.497]
 [1.924 4.454 4.451 2.937]
 [3.329 6.623 6.623 3.329]
 [4.863 7.087 9.185 5.31 ]
 [3.497 6.379 4.842 4.43 ]
 [5.31  9.185 7.087 4.863]
 [0.    0.    0.    0.   ]]

Full-precision tied greedy Q rows
state 0: Q=[1.587030309833386, 2.769380343525431, 2.769380343525431, 1.587030309833386] tied_best_actions=[RIGHT, DOWN]
state 4: Q=[3.328918326062884, 6.622872152670462, 6.622872152670463, 3.328918326062884] tied_best_actions=[RIGHT, DOWN]
state 8: Q=[0.000000000000000, 0.000000000000000, 0.000000000000000, 0.000000000000000] tied_best_actions=[UP, RIGHT, DOWN, LEFT]

Greedy actions from Q* (ties shown)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+

Epsilon-greedy policy from DP (epsilon=0.1)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+

Final Output¶

Print only the converged result without the intermediate snapshots.

In [72]:
print('Final Dynamic Programming Output')
print('Stable policy:', history[-1]['stable'])
print('\nFinal state values V*')
print(V.reshape(3, 3))
print('\nFinal action values Q*')
print(Q)
print('\nTied best actions from Q*')
print(format_q_tie_details(Q))
print('\nGreedy actions from Q* (ties shown)')
print(format_greedy_actions(Q))
print(f"\nFinal epsilon-greedy policy (epsilon={epsilon})")
print(policy)
print('\nFinal epsilon-greedy policy visualization')
print(format_policy(policy))
Final Dynamic Programming Output
Stable policy: True

Final state values V*
[[2.71  4.352 6.22 ]
 [4.352 6.458 8.927]
 [6.22  8.927 0.   ]]

Final action values Q*
[[1.587 2.769 2.769 1.587]
 [2.937 4.451 4.454 1.924]
 [4.43  4.842 6.379 3.497]
 [1.924 4.454 4.451 2.937]
 [3.329 6.623 6.623 3.329]
 [4.863 7.087 9.185 5.31 ]
 [3.497 6.379 4.842 4.43 ]
 [5.31  9.185 7.087 4.863]
 [0.    0.    0.    0.   ]]

Tied best actions from Q*
state 0: Q=[1.587030309833386, 2.769380343525431, 2.769380343525431, 1.587030309833386] tied_best_actions=[RIGHT, DOWN]
state 4: Q=[3.328918326062884, 6.622872152670462, 6.622872152670463, 3.328918326062884] tied_best_actions=[RIGHT, DOWN]
state 8: Q=[0.000000000000000, 0.000000000000000, 0.000000000000000, 0.000000000000000] tied_best_actions=[UP, RIGHT, DOWN, LEFT]

Greedy actions from Q* (ties shown)
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+

Final epsilon-greedy policy (epsilon=0.1)
[[0.025 0.475 0.475 0.025]
 [0.025 0.025 0.925 0.025]
 [0.025 0.025 0.925 0.025]
 [0.025 0.925 0.025 0.025]
 [0.025 0.475 0.475 0.025]
 [0.025 0.025 0.925 0.025]
 [0.025 0.925 0.025 0.025]
 [0.025 0.925 0.025 0.025]
 [0.25  0.25  0.25  0.25 ]]

Final epsilon-greedy policy visualization
+---+---+---+
|>v | v | v |
+---+---+---+
| > |>v | v |
+---+---+---+
| > | > | G |
+---+---+---+
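The probability values in the final policy matrix (0.925 for a single greedy action, 0.475 when two actions tie, 0.25 in the all-tied terminal state) follow directly from the epsilon-greedy construction: every action gets ε/|A|, and the remaining 1−ε mass is split evenly across the tied greedy actions. A quick stand-alone check with ε = 0.1 and 4 actions (the Q rows below are hypothetical, chosen only to produce one, two, and four ties):

```python
import numpy as np

def epsilon_greedy_row(q_row, epsilon=0.1):
    # epsilon/|A| on every action, plus (1 - epsilon) shared evenly
    # across the actions tied for the best Q-value.
    n = len(q_row)
    probs = np.full(n, epsilon / n)
    best = np.flatnonzero(np.isclose(q_row, q_row.max()))
    probs[best] += (1.0 - epsilon) / len(best)
    return probs

print(epsilon_greedy_row(np.array([1.0, 4.0, 2.0, 0.5])))  # one best action
print(epsilon_greedy_row(np.array([1.0, 4.0, 4.0, 0.5])))  # two tied actions
print(epsilon_greedy_row(np.zeros(4)))                     # all four tied
```

With ε = 0.1 the base mass is 0.1/4 = 0.025 per action, so a lone greedy action gets 0.025 + 0.9 = 0.925, two tied actions get 0.025 + 0.45 = 0.475 each, and four tied actions get 0.025 + 0.225 = 0.25 each, matching the rows printed above.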

Export Policies To JSON¶

Write the final converged policy, together with the value tables and the transition model, to a JSON file for downstream use.

In [ ]:
import json
from pathlib import Path


transition_model = {
    str(state): {
        ACTION_NAMES[action].upper(): [
            {
                'probability': float(prob),
                'next_state': int(next_state),
                'reward': float(reward),
                'done': bool(done),
            }
            for prob, next_state, reward, done in outcomes
        ]
        for action, outcomes in actions.items()
    }
    for state, actions in env.P.items()
}


payload = {
    'epsilon': epsilon,
    'action_order': [ACTION_NAMES[i].upper() for i in range(4)],
    'final': {
        'stable': bool(history[-1]['stable']) if history else True,
        'policy_matrix': policy.tolist(),
        'V': V.tolist(),
        'Q': Q.tolist(),
        'transition_model': transition_model,
    },
}

output_path = Path('dynamic_programming_policies.json')
output_path.write_text(json.dumps(payload, indent=2), encoding='utf-8')
print(output_path.resolve())
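A downstream consumer can read the exported file back and recover the greedy action per state from `policy_matrix`, checking along the way that each row is a valid probability distribution. A minimal sketch using a stand-in payload of the same shape (the file name, states, and probabilities here are hypothetical placeholders, not the real export):

```python
import json
from pathlib import Path

# Stand-in payload mirroring the structure written above.
payload = {
    'epsilon': 0.1,
    'action_order': ['UP', 'RIGHT', 'DOWN', 'LEFT'],
    'final': {
        'policy_matrix': [
            [0.025, 0.475, 0.475, 0.025],
            [0.025, 0.025, 0.925, 0.025],
        ],
    },
}
path = Path('example_policies.json')
path.write_text(json.dumps(payload), encoding='utf-8')

loaded = json.loads(path.read_text(encoding='utf-8'))
actions = loaded['action_order']
for state, row in enumerate(loaded['final']['policy_matrix']):
    # Each row must sum to 1; the greedy action is the row's argmax
    # (ties resolve to the first maximal index, as with np.argmax).
    assert abs(sum(row) - 1.0) < 1e-9
    best = max(range(len(row)), key=row.__getitem__)
    print(f'state {state}: greedy action = {actions[best]}')
path.unlink()
```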