Pythonic Markov Decision Process (MDP)

By vrde

I’ve been studying a subject called probabilistic methods for decisions.
In this course there are a lot of interesting topics like Bayesian Networks (BN), inference and querying in BNs, probabilistic reasoning over time etc. and, in the book (Artificial Intelligence: A Modern Approach – Russell, Norvig) there are a lot of interesting pseudocode ;) to implement.

The simplest algorithm to implement is -IMHO- the Value-Iteration algorithm and my goal is to reproduce the graph that shows the evolution of the utilities shown in Figure 17.5.

MDP example
(image source)

What is a MDP

Wikipedia has an article about MDP.
Simplifying, we need three things:

  1. an initial state: S0
  2. a transition model: T(s, a, s’)
  3. a reward function: R(s)

In this example I assume the environment is fully observable and I consider the horizon to be infinite.

The scope is to find an optimal policy, namely a solution that specifies what we should do for any state other than the goal.

tools & libraries

I used

code for an MDP interface

In my toy problem I need (but is not mandatory) an interface to determine the MDP:

class MarkovDecisionProcess:
    def transition(self, from_state, action, to_state):
	raise NotImplementedError

    def initial_state(self):
	raise NotImplementedError

    def reward(self, state):
	raise NotImplementedError

    def discount(self, state):
	raise NotImplementedError

code for the modified dict class

I need also a slightly modified dict class, up ahead you will understand why:

class SumDict(dict):
    def __setitem__(self, key, value):
        if self.has_key(key):
            value += self.get(key)
        dict.__setitem__(self, key, value)

code for the Toy MDP

After this I can define the world for the agent, it is identical to the one in the book (Section 17.1):

import numpy as np

class ToyMDP(MarkovDecisionProcess):

    def __init__(self):
        #this is the same world of the book
        self.world = np.array([
            [-0.04, -0.04, -0.04,     1],
            [-0.04,  None, -0.04,    -1],
            [-0.04, -0.04, -0.04, -0.04],
        ])
        self.initial_state = (0, 0)
        self.finals = [(0,3), (1,3)]
        self.actions = ('l', 'r', 'u', 'd')

    def __iter__(self):
        class Iterator:
            def __init__(self, iterator, finals):
                self.iterator = iterator
                self.finals = finals
            def next(self):
                while True:
                    coords = self.iterator.coords
                    val = self.iterator.next()
                    if val and coords not in self.finals: break
                return coords, val
        return Iterator(self.world.flat, self.finals)

    def _move(self, state, action):
        """calculates the next state given an action"""
        shape = self.world.shape
        next = list(state)
        if state in self.finals:
            pass
        elif action == 'l' and \
            (state[1] > 0 and self.world[state[0]][state[1]-1] != None):
            next[1] -= 1
        #... other three elif for up, down and right
        return tuple(next)

    def successors(self, state, action):
        """this function returns a dict with all the successors of
        a state and the relative probability given an action,
        for example if you are in (1,0) and you want to go right
        the function returns {(1,0):0.8, (0,0):0.1, (2,0):0.1}"""

        #I'm using SumDict because if two or more "_move"
        #return the same state (and the state is the key of the dict)
        #we need to sum out the values not overwrite the old value
        d = SumDict()
        if action == 'l':
            d[self._move(state, 'l')] = 0.8
            d[self._move(state, 'u')] = 0.1
            d[self._move(state, 'd')] = 0.1
        #... other three elif for up, down and right
        return d

    # from_state, to_state: a tuple
    # action: l, r, u, d (left, right, up, down)
    def transition(self, from_state, action, to_state):
        return self.successors(from_state, action)[to_state]

    def initial_state(self):
        return self.initial_state

    def reward(self, state):
        return self.world[state[0]][state[1]]

    def discount(self):
        return 1

code for the Value-Iteration algorithm

Now we are ready to compute the utility of every state. The function is a little bit more complex if compared with another one that I have just found (really, just 5 minutes ago) :/

There is a little difference between my implementation and the one in the book.
The book uses U'[s]=R[s] + \gamma \max_{a}\sum_{s'}T(s, a, s')U[s']; the difference is in the summation: the book formula iterates over all the states, including the unreachable ones, while, using the successor, I iterate only over the reachable states.
I think this is correct: from a state s I can go only to successors(s) and I can iterate over them instead of over every state in the world.

def value_iteration(mdp, e, hook=lambda u, mdp: 1):
    _u = np.zeros(mdp.world.shape)
    for final in mdp.finals:
        _u[final[0]][final[1]] = mdp.world.item(final)
    while True:
        u = _u.copy()
        hook(u, mdp)
        d = 0
        for state, reward in mdp:
            summ = max([sum([prob * u.item(next_state) for next_state,prob in
                mdp.successors(state, action).items()]) for action in
                mdp.actions])
            _u[state[0]][state[1]] = reward + mdp.discount() * summ
            diff = fabs(_u.item(state) - u.item(state))
            if diff > d:
                d = diff
        if d <= e*(1-mdp.discount())/mdp.discount(): break
    return u

results

The output of the utility of the states matrix is equal to the one in the book:

vrde@pandora:~/other/blog/mdp$ python mdp.py
[[ 0.81155822  0.86780822  0.91780822  1.        ]
 [ 0.76155822  0.          0.66027397 -1.        ]
 [ 0.70530822  0.65530822  0.61141553  0.38792491]]

and also the graph:
utility of the states evolution

That’s all folks, if you want to try it at home you can download here: mdp.tar.bz2 (or, if you want to use wget, here).

Tag: , ,

Lascia un commento