"""
docstring
"""
from typing import Callable, List, Optional, Tuple, Union
from warnings import warn
from itertools import combinations
import random
from joblib import Parallel, delayed
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tomsup.ktom_functions import k_tom, init_k_tom, inv_logit
from tomsup.payoffmatrix import PayoffMatrix
from tomsup.plot import (
choice,
score,
ResultsDf,
plot_history,
plot_p_k,
plot_p_op_1,
plot_p_self,
plot_op_states,
plot_heatmap,
)
from wasabi import msg
###################
# ___ AGENT ___
###################
[docs]class Agent:
"""
Agent is super (parent) class for creating agents in tomsup.
"""
[docs] def __init__(
self, strategy: Optional[str] = None, save_history: bool = False, **kwargs
):
"""
Args:
strategy (Optional[str], optional): The strategy of the agent you wish to create. Defaults to None.
It is recommended to use create_agents() to create instead of the Agent class
save_history (bool, optional): Should the history of the agent be saved. Defaults to False.
kwargs (dict) The specifications passed to the strategy
"""
self.choice = None # the last choice of the agent
if save_history:
self.history = pd.DataFrame() # history of choices
else:
self.history = None
if strategy:
if "TOM" in strategy.upper():
k = strategy.split("-")[0]
kwargs["level"] = int(k)
strategy = strategy.split("-")[1].upper()
kwargs["save_history"] = save_history
self.__class__ = eval(strategy)
self.__init__(**kwargs)
[docs] def reset(self, save_history: Optional[bool] = None):
"""resets the agent to its starting parameters
Args:
save_history (Optional[bool], optional): Should the agent history be saved? Defaults to None, which keeps the previous state.
"""
if self._start_params:
if save_history is not None:
self._start_params["save_history"] = save_history
self.__init__(**self._start_params)
else:
self.__init__()
def _add_to_history(self, **kwargs):
if self.history is None:
pass
elif self.history.empty:
self.history = pd.DataFrame(data=kwargs, index=[0])
if self.strategy.split("-")[-1] == "TOM":
self.history = self.history.append(kwargs, ignore_index=True)
self.history = self.history.drop([0]).reset_index()
else:
self.history = self.history.append(kwargs, ignore_index=True)
# Getters
[docs] def get_start_params(self) -> dict:
"""
Returns:
dict: The starting parameters of the agent.
"""
return self._start_params
[docs] def get_strategy(self) -> str:
"""
Returns:
str: The strategy of the agent
"""
return self.strategy
[docs] def get_choice(self) -> int:
"""
Returns:
int: the agents choice in the previous round
"""
return self.choice
[docs] def get_history(
self, key: Optional[str] = None, format: str = "df"
) -> Union[dict, pd.DataFrame, list]:
"""Return the agents history. This include only information relevant to
the agent. E.g. for a Random bias (RB) agent only its choice is saved, while
opponent choice is not as it used by the agent.
Args:
key (Optional[str], optional): The item of interest in the history. Defaults to None, which returns all the entire history.
format (str, optional): Format of the return, options include "list", "dict" and "df", which is a pandas dataframe. Defaults to "df".
Returns:
Union[dict, pd.DataFrame, list]: The history in the specified format
"""
if self.history is None:
raise Exception(
"save_history is unspecified or set to False. \
Consequently you can't get history. \
Set save_history=True if you want to save agent history"
)
if key is None:
_return = self.history
elif isinstance(key, (list, str)):
_return = self.history[key]
else:
raise Exception(
"Please input valid key. Key should be either a list or a string. \
Alternatively, key can be left unspecified."
)
if format == "df":
return _return
if format == "dict":
return dict(_return)
if format == "list":
return list(_return)
raise Exception(
"Please input valid format e.g. 'df' or 'list' or \
leave format unspecified"
)
# plotters
[docs] def plot_choice(self, show: bool = True) -> None:
"""Plot the choices of the agent
Args:
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
df = self.get_history()
plt.plot(df.index, df["choice"], color="lightblue", linewidth=4)
plt.xlabel("Round")
plt.ylabel("Choice")
plt.ylim(0, 1)
if show is True:
plt.show()
[docs] def plot_internal(self, fun: Callable, show: bool = True) -> None:
"""
Function for plotting internal states of agent
Args:
fun (Callable): a function which to use to extract from the internal states dict
show (bool): Should it run plt.show at the end of plotting? Default to True
Examples:
>>> # plotting the est. probability of opp. choosing one over trials
>>> tom1.plot_internal(fun=lambda internal_states: internal_states["own_states"]["p_op"])
>>> # plotting the agent belief about its opponents theory of mind level (p_k)
>>> # probability of sophistication level k=0
>>> tom2.plot_internal(fun=lambda internal_states: internal_states["own_states"]["p_k"][0])
>>> # probability of sophistication level k=1
>>> tom2.plot_internal(fun=lambda internal_states: internal_states["own_states"]["p_k"][1])
"""
df = self.get_history()
df["extracted"] = df["internal_states"].apply(fun)
plt.plot(df.index, df["extracted"], color="lightblue", linewidth=4)
if show is True:
plt.show()
[docs]class RB(Agent):
"""
'RB': Random bias agent
Examples:
>>> rb = ts.agent.RB(bias = 1, save_history = True)
>>> rb.compete()
1
>>> rb.get_start_params()
{'bias': 1, 'save_history': True}
>>> rb.compete()
1
>>> rb.get_history(key='choice', format="list")
[1, 1]
"""
[docs] def __init__(self, bias: float = 0.5, **kwargs) -> Agent:
"""
Args:
bias (float, optional): The probability of the agent to choose 1. Defaults to 0.5.
"""
self._start_params = {"bias": bias, **kwargs}
self.bias = bias
self.strategy = "RB"
super().__init__(**kwargs)
[docs] def compete(self, **kwargs) -> int:
"""
Returns:
int: The choice of the agent
"""
self.choice = np.random.binomial(1, self.bias)
self._add_to_history(choice=self.choice)
return self.choice
[docs] def get_bias(self) -> float:
"""
Returns:
float: The bias of the agent
"""
return self.bias
[docs]class WSLS(Agent):
"""
'WSLS': Win-stay, lose-switch is an agent which employ a simple heuristic.
It simply takes the same choice if it wins and switches when it looses.
Examples:
>>> sigmund = WSLS()
>>> sigmund.choice = 0 # Manually setting choice
>>> penny = PayoffMatrix(name="penny_competitive")
>>> sigmund.compete(op_choice=1, p_matrix=penny)
0
>>> sigmund.choice = 1 # Manually setting choice
>>> sigmund.compete(op_choice=0)
0
"""
[docs] def __init__(self, prob_stay: float = 1, prob_switch: float = 1, **kwargs) -> Agent:
"""
Args:
prob_stay (float, optional): he probability to stay if the agent wins. Defaults to 1.
prob_switch (float, optional): The probability to switch if the agent loose. Defaults to 1.
Returns:
Agent: The WSLS agent
"""
self.strategy = "WSLS"
self.prob_switch = prob_switch
self.prob_stay = prob_stay
super().__init__(**kwargs)
self._start_params = {
"prob_stay": prob_stay,
"prob_switch": prob_switch,
**kwargs,
}
[docs] def compete(
self, op_choice: Optional[int], p_matrix: PayoffMatrix, agent: int, **kwargs
) -> int:
"""
Args:
op_choice (Optional[int]): The choice of the opponent, should be None in the first round.
p_matrix (PayoffMatrix): The payoff matrix which the agent plays in
agent (int): The agent role (either 0, 1) in which the agent have in the payoff matrix.
Returns:
int: The choice of the agent
"""
# if a choice haven't been made: Choose randomly (0 or 1)
if self.choice is None:
self.choice = np.random.binomial(1, 0.5)
else: # if a choice have been made:
if op_choice is None:
raise TypeError(
"compete() missing 1 required positional \
argument: 'op_choice', which should be given \
for all round except the first."
)
payoff = p_matrix.payoff(self.choice, op_choice, agent)
# if you lost change action (e.g. if payoff is less the mean
# outcome)
if payoff < p_matrix().mean():
switch = np.random.binomial(1, self.prob_switch)
self.choice = switch * (1 - self.choice) + (1 - switch) * self.choice
else: # if you won
stay = np.random.binomial(1, self.prob_stay)
self.choice = stay * self.choice + (1 - stay) * (1 - self.choice)
self._add_to_history(choice=self.choice)
return self.choice
[docs]class TFT(Agent):
"""
'TFT': Tit-for-Tat is a heuritstic theory of mind strategy. An agent using
this strategy will initially cooperate, then subsequently replicate an
opponent's previous action.
If the opponent previously was cooperative, the TFT agent is cooperative.
Examples:
>>> shelling = ts.agent.TFT()
>>> p_dilemma = ts.PayoffMatrix(name="prisoners_dilemma")
>>> shelling.compete(op_choice=1, p_matrix=p_dilemma)
1
>>> shelling.compete(op_choice=0, p_matrix=p_dilemma)
0
"""
[docs] def __init__(self, copy_prob: float = 1, **kwargs) -> Agent:
"""
Args:
copy_prob (float, optional): the probability that the TFT copies the behaviour
of the opponent, hereby introducing noise to the original TFT strategy by
Shelling (1981). Defaults to 1.
Returns:
Agent: The TFT agent
"""
self.strategy = "TFT"
self.copy_prob = copy_prob
super().__init__(**kwargs)
self._start_params = {"copy_prob": copy_prob, **kwargs}
[docs] def compete(
self,
op_choice: Optional[int] = None,
p_matrix: PayoffMatrix = PayoffMatrix("prisoners_dilemma"),
verbose: bool = True,
**kwargs,
) -> int:
"""
Args:
op_choice (Optional[int]): The choice of the opponent, should be None in the first round.
p_matrix (PayoffMatrix): The payoff matrix which the agent plays in. Defualt to the prisoners dilemma.
agent (int): The agent role (either 0, 1) in which the agent have in the payoff matrix.
Returns:
int: The choice of the agent
"""
if p_matrix.name != "prisoners_dilemma" and verbose:
warn(
"Tit-for-Tat is designed for the prisoners dilemma and might \
not perform well with other payoff matrices.",
Warning,
)
# If a choice haven't been made: Cooperate
if self.choice is None:
self.choice = 1 # assumes 1 to be cooperate
else: # if a choice have been made
if op_choice is None:
raise TypeError(
"compete() missing 1 required positional \
argument: 'op_choice', which should be given \
for all round except the first."
)
self.op_choice = op_choice
copy = np.random.binomial(1, self.copy_prob)
# Calculate resulting choice
self.choice = copy * op_choice + (1 - copy) * (1 - op_choice)
self._add_to_history(choice=self.choice)
return self.choice
[docs]class QL(Agent):
"""
'QL': The Q-learning model by Watkinns (1992)
"""
[docs] def __init__(
self,
learning_rate: float = 0.5,
b_temp: float = 0.01,
expec_val: Tuple[float, float] = (0.5, 0.5),
**kwargs,
):
"""
Args:
learning_rate (float, optional): The degree to which the agent learns. If the learning rate 0 the agent will not learn.
Defaults to 0.5.
b_temp (float, optional): The behavioural temperature of the Q-Learning agent. Defaults to 0.01.
expec_val (Tuple[float, float], optional): The preference for choice 0 and 1. Defaults to (0.5, 0.5).
"""
self.strategy = "QL"
self.learning_rate = learning_rate
self.expec_val = list(expec_val)
self.b_temp = b_temp
super().__init__(**kwargs)
self._start_params = {
"learning_rate": learning_rate,
"b_temp": b_temp,
"expec_val": list(expec_val),
**kwargs,
}
[docs] def compete(
self, op_choice: Optional[int], p_matrix: PayoffMatrix, agent=int, **kwargs
) -> int:
"""
Args:
op_choice (Optional[int]): The choice of the opponent, should be None in the first round.
p_matrix (PayoffMatrix): The payoff matrix which the agent plays in
agent (int): The agent role (either 0, 1) in which the agent have in the payoff matrix.
Returns:
int: The choice of the agent
"""
if self.choice and op_choice: # if not first round
# Calculate whether or not last round was a victory
payoff = p_matrix.payoff(self.choice, op_choice, agent)
if payoff > p_matrix().mean(): # if you won last round
reward = 1 # Save a win
else: # and if you lost
reward = 0 # Save a loss
# Update perceived values of options. Only the last chosen option
# is updated
self.expec_val[self.choice] = self.expec_val[
self.choice
] + self.learning_rate * (reward - self.expec_val[self.choice])
elif self.choice and op_choice is None:
raise TypeError(
"compete() missing 1 required positional argument: \
'op_choice', which should be given for all rounds \
except the first."
)
# Softmax
p_self = np.exp(self.expec_val[1] / self.b_temp) / sum(
np.exp(np.array(self.expec_val) / self.b_temp)
)
# Make choice
self.choice = np.random.binomial(1, p_self)
self._add_to_history(
choice=self.choice,
expected_value0=self.expec_val[0],
expected_value1=self.expec_val[1],
)
return self.choice
# Define getters
[docs] def get_expected_values(self) -> Tuple[float, float]:
"""
Returns:
Tuple[float, float]: The preference for choice 0 and 1.
"""
return tuple(self.expec_val)
[docs] def get_learning_rate(self) -> float:
"""
Returns:
float: The learning rate of the agent
"""
return self.learning_rate
[docs]class TOM(Agent):
"""
This Theory of Mind agent is the variational implementation of the
recursive ToM agent initially proposed by Devaine (2014), but have
been further developed since.
It recursively estimates its opponent and estimate their beliefs
about itself.
"""
[docs] def __init__(
self,
level: int,
volatility: float = -2,
b_temp: float = -1,
bias: Optional[float] = 0,
dilution: Optional[float] = None,
init_states: Union[dict, str] = "default",
**kwargs,
) -> Agent:
"""
Args:
level (int): Sophistication level of the agent.
volatility (float, optional): Volatility (σ) indicate how much the agent
thinks the opponent might shift their parameters over time.
Volatility is a number in the range (0,∞), but for for
computational reasons is inputted on a log scale. I.e. if you want
to have a volatility of 0.13 you should input ts.log(0.13) ≈ -2.
default is -2 as this was used in the initial implementation of
the model.
b_temp (float, optional): The behavioural temperature (also called the
exploration temperature) indicates how noisy the k-ToM decision
process is. Behavioural temperature Is a number in the range (0,∞),
but for for computational reasons is inputted on a log odds scale.
I.e. to have a temperature of 0.37 you should input ts.log(0.37)
≈ -1. Default is -1 as this was used in the initial implementation of
the model.
bias (Optional[float], optional): The Bias indicates the preference k-ToM decision
to choose 1. It is added to the expected payoff. I.e. if the expected
payoff of choosing 1 is -1 and bias is +2 the updated 'expected payoff'
would be +1. Defaults to 0.
dilution (Optional[float], optional): The dilution indicates the degree to which
beliefs about the opponent’s sophistication level are forgotten over
time. dilution is a number in the range (0, 1),
but for for computational reasons is inputted on a log odds scale.
I.e. to have a dilution of 0.62 you should input ts.inv_logit(0.62)
≈ 0.5. Default is None as this was used in the initial implementation
of the model and this there is no dilution of its beliefs about its
oppoenent's sophistication level.. Defaults to None.
init_states (Union[dict, str], optional): The initialization states of the agent.
Defaults to "default". See tutorial on setting initialization states for more info.
Returns:
Agent: The k-ToM agent
"""
if level > 5:
warn(
"It is quite computationally expensive to run a TOM with a \
level > 5. Make sure this is your intention.",
Warning,
)
self.volatility = volatility
self.b_temp = b_temp
self.bias = bias
self.dilution = dilution
self.level = level
self.strategy = str(level) + "-TOM"
params = {"volatility": volatility, "b_temp": b_temp}
if dilution is not None:
params["dilution"] = dilution
if bias is not None:
params["bias"] = bias
self.params = params
if init_states == "default":
self.internal = init_k_tom(params, level, priors=init_states)
else:
self.internal = init_states
self.__kwargs = kwargs
super().__init__(**kwargs)
self._start_params = {
"volatility": volatility,
"level": level,
"b_temp": b_temp,
"bias": bias,
"dilution": dilution,
"init_states": self.internal,
**kwargs,
}
[docs] def compete(
self, p_matrix: PayoffMatrix, agent: int, op_choice: Optional[int] = None
) -> int:
"""
Args:
op_choice (Optional[int]): The choice of the opponent, should be None in the first round.
p_matrix (PayoffMatrix): The payoff matrix which the agent plays in
agent (int): The agent role (either 0, 1) in which the agent have in the payoff matrix.
Returns:
int: The choice of the agent
"""
self.op_choice = op_choice
self.choice, self.internal = k_tom(
self.internal,
self.params,
self.choice,
op_choice,
self.level,
agent,
p_matrix,
**self.__kwargs,
)
self._add_to_history(choice=self.choice, internal_states=self.internal)
return self.choice
# Define getters
[docs] def get_volatility(self) -> float:
"""
Returns:
float: The volatility of the agent
"""
return self.volatility
[docs] def get_behav_temperature(self) -> float:
"""
Returns:
float: The behavioural temperature of the agent
"""
return self.b_temp
[docs] def get_bias(self) -> Optional[float]:
"""
Returns:
Optional[float]: The bias of the agent
"""
if self.bias is None:
msg.warn("TOM does not have a bias.")
return self.bias
[docs] def get_dilution(self) -> Optional[float]:
"""
Returns:
Optional[float]: The dilution of the agent
"""
if self.dilution is None:
msg.warn("TOM does not have a dilution parameter.")
return self.dilution
[docs] def get_level(self) -> float:
"""
Returns:
float: The sophistication level of the agent
"""
return self.level
[docs] def get_internal_states(self) -> dict:
"""
Returns:
dict: The current internal states of the agent
"""
return self.internal
[docs] def set_internal_states(self, internal_states: dict) -> None:
"""
Args:
internal_states (dict): The desired internal states of the agent.
"""
self.internal = internal_states
self._start_params["init_states"] = self.internal
[docs] def get_parameters(self) -> dict:
"""
Returns:
dict: The agents parameters
"""
return self.params
def __print(
self, d, n=0, keys=None, readability_transform={}, print_level=None
) -> None:
"""A helper function for printing dictionaries"""
for key in d:
if (
(print_level is not None)
and isinstance(key, int)
and (key not in print_level)
):
continue
p_key = str(key) + "-ToM" if isinstance(key, int) else key
if p_key in readability_transform:
p_key = readability_transform[p_key]
p_str = "| " * n + str(p_key)
if isinstance(d[key], dict) is False:
x = d[key].tolist() if isinstance(d[key], np.ndarray) else d[key]
p_str = p_str + ": " + " " * (30 - len(p_key)) + str(x)
if (keys is None) or (str(key) in keys) or isinstance(key, int):
print(p_str)
if isinstance(d[key], dict):
self.__print(d[key], n + 1, keys, readability_transform, print_level)
[docs] def print_parameters(
self,
keys: Optional[list] = None,
):
"""
Args:
keys (Optional[list], optional): The key which you wish to print. Defaults to None, indicating all.
"""
readability_transform = {
"volatility": "volatility (log scale)",
"dilution": "dilution (log odds)",
"b_temp": "b_temp (log odds)",
}
self.__print(
self.params,
n=0,
keys=keys,
readability_transform=readability_transform,
print_level=None,
)
[docs] def print_internal(
self,
keys: Optional[list] = None,
level: Optional[list] = None,
):
"""
prints the internal states of the agent.
Explanation of internal states:
opponent_states: indicate that the following states belong to the
simulated opponent
own_states: indicate that the states belongs to the agent itself
p_k: is the estimated sophistication level of the opponent.
p_op_mean: The mean estimate of the opponents choice probability in
log odds.
param_mean: the mean estimate of opponent parameters (in the scale
used for the given parameter). If estimating another k-ToM the order
of estimates is 1) Volatility, 2) Behavioural temperature, 3) Dilution,
4) Bias. Note that bias is 3) if Dilution is not estimated.
param_var: the variance in log scale (same order as in param_mean)
gradient: the local-linear gradient for each estimate (same order as
in param_mean)
p_self: the probability of the agent itself choosing 1
p_op: the aggregate probability of the opponent choosing 1
Args:
keys (Optional[list], optional): The keys which you desire to print. Defaults to None.
level (Optional[list], optional): List of integers containing levels to print
None indicate all levels will be printed. Defaults to None.
"""
readability_transform = {
"p_k": "p_k (probability)",
"p_op_mean": "p_op_mean (log odds)",
"param_var": "param_var (log scale)",
"p_op": "p_op (probability)",
"p_self": "p_self (probability)",
"p_op_mean0": "p_op_mean0 (log odds)",
"p_op_var0": "p_op_var0 (log scale)",
}
# Convert all elements to string
if keys:
keys = [str(key) for key in keys]
keys += ["own_states", "opponent_states"]
self.__print(
self.internal,
n=0,
keys=keys,
readability_transform=readability_transform,
print_level=level,
)
#########################
# ___ AGENT GROUP ___
#########################
[docs]class AgentGroup:
"""
An agent group is a group of agents. It is a utility class to allow for
easily setting up tournaments.
Examples:
>>> round_table = AgentGroup(agents=['RB']*2, \
start_params=[{'bias': 1}]*2)
>>> round_table.agent_names
['RB_0', 'RB_1']
>>> RB_0 = round_table.get_agent('RB_0') # extract an agent
>>> RB_0.bias == 1 # should naturally be 1, as we specified it
True
>>> round_table.set_env('round_robin')
>>> result = round_table.compete(p_matrix="penny_competitive", \
n_rounds=100, n_sim=10)
Currently the pair, ('RB_0', 'RB_1'), is competing for 10 simulations, \
each containg 100 rounds.
Running simulation 1 out of 10
Running simulation 2 out of 10
Running simulation 3 out of 10
Running simulation 4 out of 10
Running simulation 5 out of 10
Running simulation 6 out of 10
Running simulation 7 out of 10
Running simulation 8 out of 10
Running simulation 9 out of 10
Running simulation 10 out of 10
Simulation complete
>>> result.shape[0] == 10*100 # As there is 10 simulations each containing\
100 round
True
>>> result['payoff_agent0'].mean() == 1 # Given that both agents have \
always choose 1, it is clear that agent0 always win, when playing the \
competitive pennygame
True
"""
[docs] def __init__(self, agents: List[str], start_params: Optional[List[dict]] = None):
"""
Args:
agents (List[str]): A list of agents
start_params (Optional[List[dict]], optional): The starting parameters of the agents specified
as a dictionary pr. agent. Defaults to None, indicating default for all agent. Use empty to
use default of an agent.
"""
self.agents = agents
if start_params:
if len(agents) != len(start_params):
raise ValueError(
"the length of agents is not equal to the \
length of starting parameters."
)
else:
start_params = [{}] * len(agents)
self.start_params = start_params
self.environment = None
self.pairing = None
# create unique agent ID's, e.g. a list of 3 RB agent becomes:
# [RB_0, RB_1, RB_2]
self.agent_names = [
v + "_" + str(agents[:i].count(v)) if agents.count(v) > 1 else v
for i, v in enumerate(agents)
]
self._agents = {
name: Agent(name.split("_")[0], **param)
for name, param in zip(self.agent_names, start_params)
}
[docs] def get_environment_name(self) -> str:
"""
Returns:
str: The name of the set environment
"""
if self.environment:
return self.environment
raise Exception(
"Environment in not set, use set_env() to set \
environment"
)
[docs] def get_environment(self):
"""
Returns:
the pairing resulted from the set environment
"""
if self.pairing:
return self.pairing
raise Exception(
"Environment in not set, use set_env() to set \
environment"
)
[docs] def get_names(self) -> List[str]:
"""
Returns:
List[str]: the names of the agents
"""
return self.agent_names
[docs] def get_agent(self, agent: str) -> Agent:
if agent in self.agent_names:
return self._agents[agent]
raise Exception(
"agent is not in agent names, to get a list of \
agent names, use get_names()"
)
[docs] def set_env(self, env: str) -> None:
"""Set environment of the agent group.
Args:
env (str): The string for the environment you wish to set.
Valid environment strings include:
'round_robin': Matches all participant against all others
'random_pairs': Combines the agent in random pairs (the number of
agent must be even)
"""
self.environment = env.lower()
if self.environment == "round_robin":
self.pairing = list(combinations(self.agent_names, 2))
elif self.environment == "random_pairs":
L = self.agent_names[:]
if len(L) % 2 != 0:
raise Exception(
"List is agent in Agent_group should be \
even if environment is set to random pairs. \
Otherwise one would be lonely and we can't \
have that."
)
random.shuffle(L)
self.pairing = list(zip(L[: len(L) // 2], L[len(L) // 2 :]))
else:
raise TypeError(
f"{env} is not a valid environment. Use help() to \
see valid environments"
)
[docs] def compete(
self,
p_matrix: PayoffMatrix,
n_rounds: int = 10,
n_sim: int = 1,
reset_agent: bool = True,
env: Optional[str] = None,
save_history: bool = False,
verbose: bool = True,
n_jobs: Optional[int] = None,
) -> pd.DataFrame:
"""for each pair competes using the specified parameters
Args:
p_matrix (PayoffMatrix): The payoffmatrix in which the agents compete
n_rounds (int, optional): Number of rounds the agent should play in each simulation.
Defaults to 10.
n_sim (int, optional): The number of simulations. Defaults to 1.
reset_agent (bool, optional): Should the agent be reset ? Defaults to True.
env (Optional[str], optional): The environment in which the agent should compete.
Defaults to None, indicating the already set environment.
save_history (bool, optional): Should the history of agent be saved.
Defaults to False, as this is memory intensive.
verbose (bool, optional): Toggles the verbosity of the function. Defaults to True.
n_jobs (Optional[int], optional): Number of parallel jobs. Defaults to None, indicating no parallelization.
-1 indicate as many jobs as there is cores on your unit.
Returns:
pd.DataFrame: A pandas dataframe of the results.
"""
if self.environment is None and env is None:
raise TypeError(
"No env was specified, either specify environment \
using set_env() or by specifying env for \
compete()"
)
if env:
self.set_env(env)
result = []
for pair in self.pairing:
if verbose:
msg.info(
f"Currently the pair, {pair}, is competing for {n_sim} \
simulations, each containg {n_rounds} rounds."
)
res = compete(
self._agents[pair[0]],
self._agents[pair[1]],
p_matrix=p_matrix,
n_rounds=n_rounds,
n_sim=n_sim,
reset_agent=reset_agent,
return_val="df",
save_history=save_history,
verbose=verbose,
n_jobs=n_jobs,
)
res["agent0"] = pair[0]
res["agent1"] = pair[1]
result.append(res)
if verbose:
msg.good("Simulation complete")
self.__df = ResultsDf(pd.concat(result)) # concatenate into one df
return self.__df
[docs] def get_results(self) -> pd.DataFrame:
"""
Returns:
pd.DataFrame: The results
"""
return self.__df
[docs] def plot_heatmap(
self,
aggregate_col: str = "payoff_agent",
aggregate_fun: Callable = np.mean,
certainty_fun: Union[Callable, str] = "mean_ci_95",
cmap: str = "Blues",
na_color: str = "xkcd:white",
xlab: str = "Agent",
ylab: str = "Opponent",
cbarlabel: str = "Average score of the agent",
show: bool = True,
):
"""plot a heatmap of the results.
Args:
aggregate_col (str, optional): The column to aggregate on. Defaults to "payoff_agent".
aggregate_fun (Callable, optional): The function to aggregate by. Defaults to np.mean.
certainty_fun (Union[Callable, str], optional): The certainty function specified as a string on
the form "mean_ci_X" where X denote the confidence interval, or a function.
Defaults to "mean_ci_95".
cmap (str, optional): The color map. Defaults to "Blues".
na_color (str, optional): The color of NAs. Defaults to "xkcd:white", e.g. white.
xlab (str, optional): The name on the x-axis. Defaults to "Agent".
ylab (str, optional): The name of the y-axis. Defaults to "Opponent".
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
plot_heatmap(
self.__df,
aggregate_col,
aggregate_fun,
certainty_fun,
cmap,
na_color,
xlab,
ylab,
cbarlabel=cbarlabel,
show=show,
)
[docs] def plot_choice(
self,
agent0: str,
agent1: str,
agent: int = 0,
sim: Optional[int] = None,
plot_individual_sim: bool = False,
show: bool = True,
):
"""plots the choice of an agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
plot_individual_sim (bool, optional): Should you plot each individual simulation. Defaults to False.
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
choice(
self.__df,
agent0=agent0,
agent1=agent1,
agent=agent,
plot_individual_sim=plot_individual_sim,
sim=sim,
show=show,
)
[docs] def plot_score(self, agent0: str, agent1: str, agent: int = 0, show: bool = True):
"""plots the score of an agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
score(self.__df, agent0, agent1, agent=agent, show=show)
[docs] def plot_history(
self,
agent0: int,
agent1: int,
state: str,
agent: int = 0,
fun: Callable = lambda x: x[state],
ylab: str = "",
xlab: str = "Round",
show: bool = True,
):
"""Plots the history of an agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
state (str): The state of the agent you wish to plot.
fun (Callable, optional): A function for extracting the state. Defaults to lambdax:x[state].
xlab (str, optional): The name on the x-axis. Defaults to "Agent".
ylab (str, optional): The name of the y-axis. Defaults to "Opponent".
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
plot_history(
self.__df,
agent0,
agent1,
state,
agent,
fun,
ylab=ylab,
xlab=xlab,
show=show,
)
[docs] def plot_p_op_1(
self, agent0: str, agent1: str, agent: int = 0, show: bool = True
) -> None:
"""plots the p_op_1 of a k-ToM agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
self.__tom_in_group(agent0, agent1, agent)
plot_p_op_1(self.__df, agent0, agent1, agent, show=show)
[docs] def plot_p_k(
self, agent0: str, agent1: str, level: int, agent: int = 0, show: bool = True
):
"""plots the p_k of a k-ToM agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
self.__tom_in_group(agent0, agent1, agent)
plot_p_k(self.__df, agent0, agent1, agent=agent, level=level, show=show)
[docs] def plot_p_self(self, agent0: str, agent1: str, agent: int = 0, show: bool = True):
"""plots the p_self of a k-ToM agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
self.__tom_in_group(agent0, agent1, agent)
plot_p_self(self.__df, agent0, agent1, agent, show=show)
[docs] def plot_op_states(
self,
agent0: str,
agent1: str,
state: str,
level: int = 0,
agent: int = 0,
show: bool = True,
):
"""plots the p_self of a k-ToM agent in a defined agent pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
state (str): a state of the simulated opponent you wish to plot.
level (str): level of the similated opponent you wish to plot.
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
self.__tom_in_group(agent0, agent1, agent)
plot_op_states(self.__df, agent0, agent1, state, level=0, agent=0, show=show)
[docs] def plot_tom_op_estimate(
self,
agent0: int,
agent1: int,
level: int,
estimate: str,
agent: int = 0,
plot: str = "mean",
transformation: Optional[bool] = None,
show: bool = True,
):
"""plot a k-ToM's estimates the opponent in a given pair
Args:
agent0 (str): The name of agent0
agent1 (str): The name of agent1
agent (int, optional): An int denoting which of agent 0 or 1 you should plot. Defaults to 0.
estimate (str): The desired estimate to plot options include:
"volatility",
"behav_temp" (Behavoural Temperature),
"bias",
"dilution".
level (str): Sophistication level of the similated opponent you wish to plot.
plot (str, optional): Toggle between plotting mean ("mean") or variance ("var"). Default to "mean".
show (bool, optional): Should plt.show be run at the end. Defaults to True.
"""
a = self.__tom_in_group(agent0, agent1, agent)
d_e = {
"volatility": ("Volatility", (0, 0)),
"behav_temp": ("Behavoural Temperature", (0, 1)),
"bias": ("Bias", (0, -1)),
"dilution": ("Dilution", (0, 2)),
}
if estimate not in d_e:
raise ValueError(f"Invalid estimate: {estimate}.")
ylab, loc = d_e[estimate]
if plot not in {"mean", "var"}:
raise ValueError(
"plot must be either 'mean' or 'var', for\
plotting either mean or variance"
)
if plot == "mean":
p_str = "mean"
p_key = "param_mean"
else:
p_str = "variance"
p_key = "param_var"
if estimate == "dilution" and a.dilution is not None:
raise ValueError("The desired agent does not estimate dilution")
if estimate == "bias" and a.dilution is not None:
raise ValueError("The desired agent does not estimate a bias")
if transformation is True:
if (estimate == "bias") and (plot == "mean"):
transformation = False
elif plot == "var":
t, t_str = np.exp, "exp"
else:
d_t = {
"volatility": (np.exp, "exp"),
"behav_temp": (inv_logit, "inverse_logit"),
"dilution": (inv_logit, "inverse_logit"),
}
t, t_str = d_t[estimate]
elif callable(transformation):
t = transformation
t_str = transformation.__name__
else:
transformation = False
def t(x):
return x
if transformation is False:
ylab = f"{ylab} ({p_str})"
else:
ylab = f"{t_str}({p_str} - {ylab})"
plot_history(
self.__df,
agent0,
agent1,
state=None,
agent=agent,
fun=lambda x: t(
x["internal_states"]["opponent_states"][level]["own_states"][p_key][loc]
),
ylab=ylab,
show=show,
)
def __tom_in_group(self, agent0: str, agent1: str, agent: int) -> TOM:
a = agent0 if agent == 0 else agent1
agent = self._agents[a]
if isinstance(agent, TOM):
return agent
raise ValueError(
f"The function called requires desired agent to be a ToM agent\
but the specified agent ({a}) is a agent of type {type(agent)}"
)
def __str__(self) -> str:
header = f"<Class AgentGroup, envinment = {self.environment}> \n\n"
info = "\n".join(
("\t | \t".join(str(ii) for ii in i))
for i in list(zip(self.agent_names, self.start_params))
)
return header + info
###################
# ___ UTILS ___
###################
[docs]def compete(
agent_0: Agent,
agent_1: Agent,
p_matrix: PayoffMatrix,
n_rounds: int = 1,
n_sim: Optional[int] = None,
reset_agent: bool = True,
return_val: str = "df",
save_history: bool = False,
verbose: bool = False,
n_jobs: Optional[int] = None,
):
"""
Args:
agent_0 (Agent): objects of class Agent which should compete
agent_1 (Agent): objects of class Agent which should compete
p_matrix (PayoffMatrix): The payoffmatrix in which the agents compete
n_rounds (int, optional): Number of rounds the agent should play in each simulation.
Defaults to 10.
n_sim (int, optional): The number of simulations. Defaults to 1.
reset_agent (bool, optional): Should the agent be reset ? Defaults to True.
save_history (bool, optional): Should the history of agent be saved.
Defaults to False, as this is memory intensive.
return_val (str): Should values be returns as a pandas dataframe ("df"), or a "list".
verbose (bool, optional): Toggles the verbosity of the function. Defaults to True.
n_jobs (Optional[int], optional): Number of parallel jobs. Defaults to None, indicating no parallelization.
-1 indicate as many jobs as there is cores on your unit, i.e. os.cpu_count().
Examples:
>>> sirRB = RB(bias = 0.7)
>>> sirWSLS = WSLS()
>>> result = compete(sirRB, sirWSLS, p_matrix = "penny_competitive", \
n_rounds = 10)
>>> type(result)
pandas.core.frame.DataFrame
>>> result.columns
Index(['round', 'action_agent0', 'action_agent1', 'payoff_agent0',
'payoff_agent1'],
dtype='object')
>>> result = compete(sirRB, sirWSLS, p_matrix = "penny_competitive", \
n_rounds = 10, n_sim = 3, return_val = 'list')
>>> len(result) == 3*10
True
>>> result = compete(sirRB, sirWSLS, p_matrix = "penny_competitive", \
n_rounds = 100, n_sim = 3, return_val = 'df', verbose = True)
Running simulation 1 out of 3
Running simulation 2 out of 3
Running simulation 3 out of 3
>>> result['payoff_agent1'].mean() > 0 # We see that the WSLS() on \
average win more than it lose vs. the biased agent (RB)
True
"""
if isinstance(p_matrix, str):
p_matrix = PayoffMatrix(name=p_matrix)
if reset_agent:
agent_0.reset(save_history=save_history)
agent_1.reset(save_history=save_history)
if n_sim:
# make sure people don't do things they regret
if reset_agent is False:
warn(
"n_sim is set and reset_agent is False, the agent will \
maintain their knowledge across simulations. Is this \
the intended outcome?",
Warning,
)
def __compete(sim):
if verbose:
msg.info(f"\tRunning simulation {sim+1} out of {n_sim}")
res = compete(
agent_0,
agent_1,
p_matrix,
n_rounds,
None,
reset_agent,
return_val="list",
save_history=save_history,
)
if reset_agent and sim != n_sim - 1:
agent_0.reset()
agent_1.reset()
return [(sim,) + tup for tup in res]
if n_jobs is not None:
result = [
t
for trial in Parallel(n_jobs=n_jobs)(
delayed(__compete)(i) for i in range(n_sim)
)
for t in trial
]
else:
result = [t for trial in map(__compete, range(n_sim)) for t in trial]
else:
c_0, c_1 = None, None
result = []
for i in range(n_rounds):
c_0_prev, c_1_prev = c_0, c_1
c_0 = agent_0.compete(
p_matrix=p_matrix, agent=0, op_choice=c_1_prev
) # c for choice
c_1 = agent_1.compete(p_matrix=p_matrix, agent=1, op_choice=c_0_prev)
payoff = (
p_matrix.payoff(c_0, c_1, agent=0),
p_matrix.payoff(c_0, c_1, agent=1),
)
if save_history:
history0 = agent_0.history.tail(1).to_dict("r")[0]
history1 = agent_1.history.tail(1).to_dict("r")[0]
result.append((i, c_0, c_1, payoff[0], payoff[1], history0, history1))
else:
result.append((i, c_0, c_1, payoff[0], payoff[1]))
if return_val == "df":
if save_history:
return ResultsDf(
result,
columns=[
"round",
"choice_agent0",
"choice_agent1",
"payoff_agent0",
"payoff_agent1",
"history_agent0",
"history_agent1",
],
)
return ResultsDf(
result,
columns=[
"round",
"choice_agent0",
"choice_agent1",
"payoff_agent0",
"payoff_agent1",
],
)
if return_val == "list":
return result
if return_val == "df":
if save_history:
return ResultsDf(
result,
columns=[
"n_sim",
"round",
"choice_agent0",
"choice_agent1",
"payoff_agent0",
"payoff_agent1",
"history_agent0",
"history_agent1",
],
)
return ResultsDf(
result,
columns=[
"n_sim",
"round",
"choice_agent0",
"choice_agent1",
"payoff_agent0",
"payoff_agent1",
],
)
raise TypeError("Invalid return_val, please use either 'df' or 'list'")