diff --git a/lcs/agents/facs2/Classifier.py b/lcs/agents/facs2/Classifier.py new file mode 100644 index 00000000..6a9d8d00 --- /dev/null +++ b/lcs/agents/facs2/Classifier.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +from typing import Optional, Union + +import lcs.agents.acs2 as acs +from lcs.agents.acs import PMark +from lcs import Perception +from .Condition import Condition +from . import Configuration, Effect + + +class Classifier(acs.Classifier): + __slots__ = ['condition', 'action', 'effect', 'mark', 'q', 'r', + 'ir', 'num', 'exp', 'talp', 'tga', 'tav', 'ee', 'cfg'] + + def __init__(self, + condition: Union[Condition, str, None] = None, + action: Optional[int] = None, + effect: Union[Effect, str, None] = None, + quality: float = None, + reward: float = 0.5, + immediate_reward: float = 0.0, + numerosity: int = 1, + experience: int = 1, + talp=None, + tga: int = 0, + tav: float = 0.0, + cfg: Optional[Configuration] = None) -> None: + + if cfg is None: + raise TypeError("Configuration should be passed to Classifier") + + self.cfg = cfg + + def build_perception_string(cls, initial, + length=self.cfg.classifier_length): + if initial: + return cls(initial) + + return cls.empty(length=length) + + self.condition = build_perception_string(Condition, condition) + self.action = action + self.effect = build_perception_string(Effect, effect) + + self.mark = PMark(cfg=self.cfg) + if quality is None: + self.q = self.cfg.initial_q + else: + self.q = quality + + self.r = reward + self.ir = immediate_reward + self.num = numerosity + self.exp = experience + self.talp = talp + self.tga = tga + self.tav = tav + self.ee = False + + def specialize(self, + p0: Perception, + p1: Perception, + leave_specialized=False) -> None: + for idx in range(len(p1)): + if leave_specialized: + if self.effect[idx] != self.cfg.classifier_wildcard: + continue + + if p0[idx] != p1[idx]: + if self.effect[idx] == self.cfg.classifier_wildcard: + if p1[idx] != '0.0': + self.effect[idx] = '1.0' + else: + self.effect[idx] = '0.0' + + if p0[idx] != '0.0': + self.condition[idx] = '1.0' + else: + self.condition[idx] = '0.0' diff --git a/lcs/agents/facs2/ClassifiersList.py b/lcs/agents/facs2/ClassifiersList.py new file mode 100644 index 00000000..215c85c0 --- /dev/null +++ b/lcs/agents/facs2/ClassifiersList.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import random +from typing import Optional + +import lcs.agents.acs2 as acs +import lcs.agents.facs2.alp as alp_acs2 +import lcs.strategies.anticipatory_learning_process as alp +import lcs.strategies.genetic_algorithms as ga +import lcs.strategies.reinforcement_learning as rl +from lcs import Perception +from lcs.agents.facs2 import Configuration +from . 
import Classifier
+
+
+class ClassifiersList(acs.ClassifiersList):
+
+    def __init__(self, *args, oktypes=(Classifier,)) -> None:
+        super().__init__(*args, oktypes=oktypes)
+
+    @staticmethod
+    def apply_alp(population: ClassifiersList,
+                  match_set: ClassifiersList,
+                  action_set: ClassifiersList,
+                  p0: Perception,
+                  action: int,
+                  p1: Perception,
+                  time: int,
+                  theta_exp: int,
+                  cfg: Configuration):
+
+        new_list = ClassifiersList()
+        new_cl: Optional[Classifier] = None
+        was_expected_case = False
+        delete_count = 0
+
+        for cl in action_set:
+            cl.increase_experience()
+            cl.update_application_average(time)
+
+            if cl.does_anticipate_correctly(p0, p1):
+                new_cl = alp_acs2.expected_case(cl, p0, time)
+                was_expected_case = True
+            else:
+                new_cl = alp_acs2.unexpected_case(cl, p0, p1, time)
+
+            if cl.is_inadequate():
+                # Remove the classifier from the population, the match set
+                # and the current action set
+                delete_count += 1
+                lists = [x for x in [population, match_set, action_set]
+                         if x]
+                for lst in lists:
+                    lst.safe_remove(cl)
+
+            if new_cl is not None:
+                new_cl.tga = time
+                alp.add_classifier(new_cl, action_set, new_list, theta_exp)
+
+        # No classifier anticipated correctly - generate a new one
+        if not was_expected_case:
+            new_cl = alp_acs2.cover(p0, action, p1, time, cfg)
+            alp.add_classifier(new_cl, action_set, new_list, theta_exp)
+
+        # Merge classifiers from new_list into the action set and population
+        action_set.extend(new_list)
+        population.extend(new_list)
+
+        if match_set is not None:
+            new_matching = [cl for cl in new_list if
+                            cl.condition.does_match(p1)]
+            match_set.extend(new_matching)
+
+        return 0
+
+    @staticmethod
+    def apply_ga(time: int,
+                 population: ClassifiersList,
+                 match_set: ClassifiersList,
+                 action_set: ClassifiersList,
+                 p: Perception,
+                 theta_ga: int,
+                 mu: float,
+                 chi: float,
+                 theta_as: int,
+                 do_subsumption: bool,
+                 theta_exp: int) -> None:
+
+        if ga.should_apply(action_set, time, theta_ga):
+            ga.set_timestamps(action_set, time)
+
+            # Select parents
+            parent1, parent2 = ga.roulette_wheel_selection(
+                action_set, lambda cl: pow(cl.q, 3) * cl.num)
+
+            child1 = Classifier.copy_from(parent1, time)
+            child2 = Classifier.copy_from(parent2, time)
+
+            # Execute mutation
+            ga.generalizing_mutation(child1, mu)
+            ga.generalizing_mutation(child2, mu)
+
+            # Execute cross-over
+            if random.random() < chi:
+                if child1.effect == child2.effect:
+                    ga.two_point_crossover(child1, child2)
+
+                    # Update quality and reward
+                    child1.q = child2.q = float(sum([child1.q, child2.q]) / 2)
+                    child1.r = child2.r = float(sum([child1.r, child2.r]) / 2)
+
+            child1.q /= 2
+            child2.q /= 2
+
+            # We are interested only in classifiers with specialized condition
+            unique_children = {cl for cl in [child1, child2]
+                               if cl.condition.specificity > 0}
+
+            ga.delete_classifiers(
+                population, match_set, action_set,
+                len(unique_children), theta_as)
+
+            # Check for subsumers / similar classifiers
+            for child in unique_children:
+                ga.add_classifier(child, p,
+                                  population, match_set, action_set,
+                                  do_subsumption, theta_exp)
+
+    @staticmethod
+    def apply_reinforcement_learning(action_set: ClassifiersList,
+                                     reward: int,
+                                     p: float,
+                                     beta: float,
+                                     gamma: float) -> None:
+
+        for cl in action_set:
+            rl.update_classifier(cl, reward, p, beta, gamma)
diff --git a/lcs/agents/facs2/Condition.py b/lcs/agents/facs2/Condition.py
new file mode 100644
index 00000000..d7c99383
--- /dev/null
+++ b/lcs/agents/facs2/Condition.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+from lcs import Perception
+import lcs.agents.acs as acs
+
+
+class 
Condition(acs.Condition): + + def specialize_with_condition(self, other: Condition) -> None: + for idx, new_el in enumerate(other): + if new_el != self.WILDCARD: + if new_el != '0.0': + self[idx] = '1.0' + else: + self[idx] = '0.0' + + def does_match(self, p: Perception) -> bool: + """ + Check if condition match given observations + + Parameters + ---------- + p: Union[Perception, Condition] + perception or condition object + + Returns + ------- + bool + True if condition match given list, False otherwise + """ + j = 0 + for ci, oi in zip(self, p): + i = j + check = False + for obs in oi: + if str(ci) != '0.0' and str(obs) != '0.0': + check = True + i += 1 + break + i += 1 + if '1.0' not in ci[j:i]: + j += len(oi) + continue + j += len(oi) + if not check: + return False + + return True diff --git a/lcs/agents/facs2/Configuration.py b/lcs/agents/facs2/Configuration.py new file mode 100644 index 00000000..ebff3530 --- /dev/null +++ b/lcs/agents/facs2/Configuration.py @@ -0,0 +1,20 @@ +import lcs.agents.acs as acs + + +class Configuration(acs.Configuration): + def __init__(self, **kwargs): + + super(Configuration, self).__init__(**kwargs) + + self.gamma: float = kwargs.get('gamma', 0.95) + self.do_ga: bool = kwargs.get('do_ga', False) + self.initial_q: float = kwargs.get('initial_q', 0.5) + self.biased_exploration_prob: float = kwargs.get( + 'biased_exploration_prob', 0.05) + self.theta_ga: int = kwargs.get('theta_ga', 100) + self.mu: float = kwargs.get('mu', 0.3) + self.chi: float = kwargs.get('chi', 0.8) + + def __str__(self) -> str: + return str(vars(self)) + diff --git a/lcs/agents/facs2/Effect.py b/lcs/agents/facs2/Effect.py new file mode 100644 index 00000000..3f697c1b --- /dev/null +++ b/lcs/agents/facs2/Effect.py @@ -0,0 +1,20 @@ +from __future__ import annotations +import lcs.agents.acs2 as acs + + +class Effect(acs.Effect): + + def __init__(self, observation): + super().__init__(observation) + + @classmethod + def item_anticipate_change(cls, item, p0_item, p1_item) -> bool: + if item == cls.WILDCARD or item == '0.0': + if p0_item != p1_item: + return False + else: + if p0_item == p1_item: + return False + + return True + diff --git a/lcs/agents/facs2/__init__.py b/lcs/agents/facs2/__init__.py new file mode 100644 index 00000000..053338aa --- /dev/null +++ b/lcs/agents/facs2/__init__.py @@ -0,0 +1,6 @@ +from .Configuration import Configuration +from .Effect import Effect +from .Classifier import Classifier +from .ClassifiersList import ClassifiersList +from .fACS2 import fACS2 +from .Condition import Condition diff --git a/lcs/agents/facs2/adapters/CartPoleFuzzyEnvironmentAdapter.py b/lcs/agents/facs2/adapters/CartPoleFuzzyEnvironmentAdapter.py new file mode 100644 index 00000000..06216c28 --- /dev/null +++ b/lcs/agents/facs2/adapters/CartPoleFuzzyEnvironmentAdapter.py @@ -0,0 +1,126 @@ +from lcs.agents.facs2.adapters.FuzzyEnvironmentAdapter import \ + FuzzyEnvironmentAdapter +import numpy as np + + +class CartPoleFuzzyEnvironmentAdapter(FuzzyEnvironmentAdapter): + _cart_position_min = -4.8 + _cart_position_max = 4.8 + _cart_velocity_min = -np.inf + _cart_velocity_max = np.inf + _pole_angle_min = -0.418 + _pole_angle_max = 0.418 + _pole_angular_velocity_min = -np.inf + _pole_angular_velocity_max = np.inf + + condition_length = 18 + + def __init__(self, env): + super().__init__(env) + self._position_functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_triangular_function, + 
self._generate_right_linear_function + ] + + self._velocity_functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_right_linear_function + ] + + self._pole_angle_functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_right_linear_function + ] + + self._angular_velocity_functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_right_linear_function + ] + + self._position_ranges = [ + (self._cart_position_min, -2.4), (self._cart_position_min, -2.4, 0), + (-2.4, 0, 2.4), (0, 2.4, self._cart_position_max), + (2.4, self._cart_position_max) + ] + + self._velocity_ranges = [ + (-1, 0), (-1, 0, 1), (0, 1) + ] + + self._pole_angle_ranges = [ + (-0.418, -0.279), (-0.418, -0.279, -0.139), (-0.279, -0.139, 0), + (-0.139, 0, 0.139), (0, 0.139, 0.279), + (0.139, 0.279, 0.418), (0.279, 0.418) + ] + + self._angular_velocity_ranges = [ + (-2, 0), (-2, 0, 2), (0, 2) + ] + + self._action_ranges = [ + (-1.0, 1.0), + (0.0, 2.0) + ] + + @classmethod + def to_genotype(cls, phenotype): + state = [] + for p in phenotype: + state.append(str(p)) + return tuple(state) + + def to_membership_function(self, obs): + cart_position = float(obs[0]) + cart_velocity = float(obs[1]) + pole_angle = float(obs[2]) + pole_angular_velocity = float(obs[3]) + membership_function_values = [[], [], [], []] + for pos_func, pos_range in zip(self._position_functions, + self._position_ranges): + membership_function_values[0].append(pos_func(cart_position, + pos_range)) + for vel_func, vel_range in zip(self._velocity_functions, + self._velocity_ranges): + membership_function_values[1].append(vel_func(cart_velocity, + vel_range)) + + for pole_angle_func, angle_range in zip(self._pole_angle_functions, + self._pole_angle_ranges): + membership_function_values[2].append(pole_angle_func(pole_angle, + angle_range)) + + for angular_vel_func, pole_angular_velocity_range in zip( + self._angular_velocity_functions, self._angular_velocity_ranges): + membership_function_values[3].append(angular_vel_func( + pole_angular_velocity, pole_angular_velocity_range)) + + return tuple(membership_function_values) + + def calculate_final_actions_func_shape(self, values): + final_ranges = [] + for value, action_range in zip(values, self._action_ranges): + if not value: + final_ranges.append([action_range[0], 0]) + final_ranges.append([action_range[1], 0]) + continue + middle = (action_range[1] + action_range[0]) / 2. 
+            if value == 1:
+                final_ranges.append([action_range[0], 0])
+                final_ranges.append([middle, 1])
+                final_ranges.append([action_range[1], 0])
+            diff = middle - action_range[0]
+            final_ranges.append([action_range[0], 0])
+            final_ranges.append([middle - (value * diff), value])
+            final_ranges.append([middle + (value * diff), value])
+            final_ranges.append([action_range[1], 0])
+        return final_ranges
diff --git a/lcs/agents/facs2/adapters/FuzzyEnvironmentAdapter.py b/lcs/agents/facs2/adapters/FuzzyEnvironmentAdapter.py
new file mode 100644
index 00000000..77101f3f
--- /dev/null
+++ b/lcs/agents/facs2/adapters/FuzzyEnvironmentAdapter.py
@@ -0,0 +1,198 @@
+import gym
+import numpy as np
+
+
+class FuzzyEnvironmentAdapter(gym.ObservationWrapper):
+
+    def __init__(self, env):
+        super().__init__(env)
+
+    def observation(self, observation):
+        return observation
+
+    @classmethod
+    def _generate_triangular_function(cls,
+                                      x: float,
+                                      abc: [float]) -> float:
+        """
+        Triangular membership function used to calculate
+        the membership value of a given variable.
+
+        Parameters
+        ----------
+        x: float
+            given position
+        abc: [float]
+            values to describe shape of triangle
+
+        Returns
+        -------
+        float
+            value of given point
+        """
+        assert len(abc) == 3
+        a, b, c = np.r_[abc]
+        assert a <= b <= c
+
+        if x < a or x > c:
+            return 0.0
+
+        if x > b:
+            y = (c - x) / (c - b)
+        elif x < b:
+            y = (x - a) / (b - a)
+        else:
+            y = 1.0
+
+        return y
+
+    def change_state_type(self, raw_state):
+        """
+        Flatten the 2D list of membership values into a 1D list
+
+        Parameters
+        ----------
+        raw_state
+            current state of environment
+
+        Returns
+        -------
+        [float]
+            1D list of membership values
+        """
+        final_state = []
+        state = self.to_membership_function(raw_state)
+        for obs in state:
+            for o in obs:
+                final_state.append(str(o))
+        return final_state
+
+    @classmethod
+    def _generate_left_linear_function(cls,
+                                       x: float,
+                                       ab: [float]) -> float:
+        """
+        Left linear (decreasing) membership function used to
+        calculate the membership value of a given x
+
+        Parameters
+        ----------
+        x: float
+            given position
+        ab: [float]
+            values to describe shape of function
+
+        Returns
+        -------
+        float
+            value of given point
+        """
+
+        assert len(ab) == 2
+        a, b = np.r_[ab]
+        assert a <= b
+
+        if x <= a:
+            y = 1.0
+        elif a < x < b:
+            y = (b - x) / (b - a)
+        else:
+            y = 0.0
+
+        return y
+
+    @classmethod
+    def _generate_right_linear_function(cls,
+                                        x: float,
+                                        ab: [float]) -> float:
+        """
+        Right linear (increasing) membership function used to
+        calculate the membership value of a given x
+
+        Parameters
+        ----------
+        x: float
+            given position
+        ab: [float]
+            values to describe shape of function
+
+        Returns
+        -------
+        float
+            value of given point
+        """
+        assert len(ab) == 2
+        a, b = np.r_[ab]
+        assert a <= b
+
+        if x >= b:
+            y = 1.0
+        elif a < x < b:
+            y = (x - a) / (b - a)
+        else:
+            y = 0.0
+
+        return y
+
+    def calculate_centroid(self, ranges):
+        """
+        Calculate centroid coordinates for final action function
+
+        Parameters
+        ----------
+        ranges
+            list of points describing shape of action func
+
+        Returns
+        -------
+        float, float
+            Coordinates x and y of calculated centroid
+        """
+        a = cx = cy = 0
+
+        for i, (x, y) in enumerate(ranges):
+            if i == len(ranges) - 1:
+                break
+            next_xy = ranges[i + 1]
+            cx += (x + next_xy[0]) * (x * next_xy[1] - next_xy[0] * y)
+            cy += (y + next_xy[1]) * (x * next_xy[1] - next_xy[0] * y)
+            a += (x * next_xy[1] - next_xy[0] * y)
+        a /= 2
+        cx /= 6 * a
+        cy /= 6 * a
+        return cx, cy
+
+    def calculate_final_actions_func_shape(self, values):
+        """
+        Calculate final shape of action function
+
+        
Parameters + ---------- + values + calculated max value for each possible action + from all classifiers + + Returns + ------- + [float, float] + coordinates for every point describing + function shape + + """ + raise NotImplementedError() + + def to_membership_function(self, obs): + """ + Change given observation to membership values + + Parameters + ---------- + obs + observations from environment + + Returns + ------- + [float] + list of membership values + """ + raise NotImplementedError() diff --git a/lcs/agents/facs2/adapters/Mazev2FuzzyEnvironmentAdapter.py b/lcs/agents/facs2/adapters/Mazev2FuzzyEnvironmentAdapter.py new file mode 100644 index 00000000..58e620fe --- /dev/null +++ b/lcs/agents/facs2/adapters/Mazev2FuzzyEnvironmentAdapter.py @@ -0,0 +1,61 @@ +from .FuzzyEnvironmentAdapter import FuzzyEnvironmentAdapter + + +class Mazev2FuzzyEnvironmentAdapter(FuzzyEnvironmentAdapter): + _path = 0 + _wall = 1 + _reward = 9 + condition_length = 12 + + def __init__(self, env): + super().__init__(env) + self._functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_right_linear_function + ] + self._ranges = [ + (0, 1.5), (0.5, 2.0, 2.5), (9.0, 10.0) + ] + + self._action_ranges = [ + (-0.5, 1), (0.5, 2), (1.5, 3), + (2.5, 4), (3.5, 5), (4.5, 6), + (5.5, 7), (6.5, 8) + ] + + @classmethod + def to_genotype(cls, phenotype): + state = [] + for p in phenotype: + state.append(str(p)) + return tuple(state) + + def to_membership_function(self, obs): + obs = list(map(float, obs)) + memberships_values = [[] for _ in range(4)] + for idx, _ in enumerate(obs[::2]): + o = obs[idx * 2] + obs[idx * 2 + 1] + for func, rang in zip(self._functions, self._ranges): + memberships_values[idx].append(func(o, rang)) + return tuple(memberships_values) + + def calculate_final_actions_func_shape(self, values): + final_ranges = [] + for value, action_range in zip(values, self._action_ranges): + if not value: + final_ranges.append([action_range[0], 0]) + final_ranges.append([action_range[1], 0]) + continue + middle = (action_range[1] + action_range[0]) / 2. 
+ if value == 1: + final_ranges.append([action_range[0], 0]) + final_ranges.append([middle, 1]) + final_ranges.append([action_range[1], 0]) + diff = middle - action_range[0] + final_ranges.append([action_range[0], 0]) + final_ranges.append([middle - (value * diff), value]) + final_ranges.append([middle + (value * diff), value]) + final_ranges.append([action_range[1], 0]) + return final_ranges + diff --git a/lcs/agents/facs2/adapters/MountainCarFuzzyEnvironmentAdapter.py b/lcs/agents/facs2/adapters/MountainCarFuzzyEnvironmentAdapter.py new file mode 100644 index 00000000..d3c3582c --- /dev/null +++ b/lcs/agents/facs2/adapters/MountainCarFuzzyEnvironmentAdapter.py @@ -0,0 +1,81 @@ +from lcs.agents.facs2.adapters.FuzzyEnvironmentAdapter import \ + FuzzyEnvironmentAdapter + + +class MountainCarFuzzyEnvironmentAdapter(FuzzyEnvironmentAdapter): + _position_min = -1.2 + _position_max = 0.6 + _velocity_min = -0.07 + _velocity_max = 0.07 + condition_length = 9 + + def __init__(self, env): + super().__init__(env) + self._position_functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_right_linear_function] + + self._velocity_functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_triangular_function, + self._generate_right_linear_function] + + self._position_ranges = [ + (self._position_min, -1), (-1.1, -0.8, -0.5), (-0.6, -0.3, 0), + (-0.1, 0.2, 0.5), (0.4, self._position_max) + ] + self._velocity_ranges = [ + (self._velocity_min, -0.04), (-0.05, -0.02, 0.01), + (-0.01, 0.02, 0.05), (0.04, self._velocity_max) + ] + self._action_ranges = [ + (-1.0, 1.0), + (0.0, 2.0), + (1.0, 3.0) + ] + + @classmethod + def to_genotype(cls, phenotype): + state = [] + for p in phenotype: + state.append(str(p)) + return tuple(state) + + def to_membership_function(self, obs): + position = float(obs[0]) + velocity = float(obs[1]) + membership_function_values = [[], []] + for pos_func, pos_range in zip(self._position_functions, + self._position_ranges): + membership_function_values[0].append(pos_func(position, + pos_range)) + + for vel_func, vel_range in zip(self._velocity_functions, + self._velocity_ranges): + membership_function_values[1].append(vel_func(velocity, + vel_range)) + + return tuple(membership_function_values) + + def calculate_final_actions_func_shape(self, values): + final_ranges = [] + for value, action_range in zip(values, self._action_ranges): + if not value: + final_ranges.append([action_range[0], 0]) + final_ranges.append([action_range[1], 0]) + continue + middle = (action_range[1] + action_range[0]) / 2. 
+ if value == 1: + final_ranges.append([action_range[0], 0]) + final_ranges.append([middle, 1]) + final_ranges.append([action_range[1], 0]) + diff = middle - action_range[0] + final_ranges.append([action_range[0], 0]) + final_ranges.append([middle - (value * diff), value]) + final_ranges.append([middle + (value * diff), value]) + final_ranges.append([action_range[1], 0]) + return final_ranges diff --git a/lcs/agents/facs2/adapters/WoodsFuzzyEnvironmentAdapter.py b/lcs/agents/facs2/adapters/WoodsFuzzyEnvironmentAdapter.py new file mode 100644 index 00000000..3681e666 --- /dev/null +++ b/lcs/agents/facs2/adapters/WoodsFuzzyEnvironmentAdapter.py @@ -0,0 +1,65 @@ +from .FuzzyEnvironmentAdapter import FuzzyEnvironmentAdapter + + +class WoodsFuzzyEnvironmentAdapter(FuzzyEnvironmentAdapter): + _path = 0 + _wall = 1 + _reward = 9 + condition_length = 12 + + def __init__(self, env): + super().__init__(env) + self._functions = [ + self._generate_left_linear_function, + self._generate_triangular_function, + self._generate_right_linear_function + ] + self._ranges = [ + (0, 1.5), (0.5, 2.0, 2.5), (9.0, 10.0) + ] + + self._action_ranges = [ + (-0.5, 0.5), (0.5, 1.5), (1.5, 2.5), + (2.5, 3.5), (3.5, 4.5), (4.5, 5.5), + (5.5, 6.5), (6.5, 7.5) + ] + + @classmethod + def to_genotype(cls, phenotype): + state = [] + for p in phenotype: + if p == 'O': + state.append('1.0') + elif p == '.': + state.append('0.0') + else: + state.append('9.0') + return tuple(state) + + def to_membership_function(self, obs): + obs = list(map(float, obs)) + memberships_values = [[] for _ in range(4)] + for idx, _ in enumerate(obs[::2]): + o = obs[idx * 2] + obs[idx * 2 + 1] + for func, rang in zip(self._functions, self._ranges): + memberships_values[idx].append(func(o, rang)) + return tuple(memberships_values) + + def calculate_final_actions_func_shape(self, values): + final_ranges = [] + for value, action_range in zip(values, self._action_ranges): + if not value: + final_ranges.append([action_range[0], 0]) + final_ranges.append([action_range[1], 0]) + continue + middle = (action_range[1] + action_range[0]) / 2. 
+ if value == 1: + final_ranges.append([action_range[0], 0]) + final_ranges.append([middle, 1]) + final_ranges.append([action_range[1], 0]) + diff = middle - action_range[0] + final_ranges.append([action_range[0], 0]) + final_ranges.append([middle - (value * diff), value]) + final_ranges.append([middle + (value * diff), value]) + final_ranges.append([action_range[1], 0]) + return final_ranges diff --git a/lcs/agents/facs2/adapters/__init__.py b/lcs/agents/facs2/adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lcs/agents/facs2/alp.py b/lcs/agents/facs2/alp.py new file mode 100644 index 00000000..41c642b2 --- /dev/null +++ b/lcs/agents/facs2/alp.py @@ -0,0 +1,81 @@ +from random import random +from typing import Optional + +from lcs import Perception +from lcs.agents.facs2 import Classifier, Configuration + + +def cover(p0: Perception, + action: int, + p1: Perception, + time: int, + cfg: Configuration) -> Classifier: + + new_cl = Classifier(action=action, experience=0, reward=0, cfg=cfg) + new_cl.tga = time + new_cl.talp = time + + new_cl.specialize(p0, p1) + + return new_cl + + +def expected_case(cl: Classifier, + p0: Perception, + time: int) -> Optional[Classifier]: + + diff = cl.mark.get_differences(p0) + + if diff.specificity == 0: + cl.increase_quality() + return None + + no_spec = len(cl.specified_unchanging_attributes) + no_spec_new = diff.specificity + child = cl.copy_from(cl, time) + + if no_spec >= cl.cfg.u_max: + while no_spec >= cl.cfg.u_max: + res = cl.generalize_unchanging_condition_attribute() + assert res is True + no_spec -= 1 + + while no_spec + no_spec_new > cl.cfg.u_max: + if random() < 0.5: + diff.generalize_specific_attribute_randomly() + no_spec_new -= 1 + else: + if cl.generalize_unchanging_condition_attribute(): + no_spec -= 1 + else: + while no_spec + no_spec_new > cl.cfg.u_max: + diff.generalize_specific_attribute_randomly() + no_spec_new -= 1 + + child.condition.specialize_with_condition(diff) + + if child.q < 0.5: + child.q = 0.5 + + return child + + +def unexpected_case(cl: Classifier, + p0: Perception, + p1: Perception, + time: int) -> Optional[Classifier]: + + cl.decrease_quality() + cl.set_mark(p0) + + if not cl.effect.is_specializable(p0, p1): + return None + + child = cl.copy_from(cl, time) + + child.specialize(p0, p1, leave_specialized=True) + + if child.q < 0.5: + child.q = 0.5 + + return child diff --git a/lcs/agents/facs2/fACS2.py b/lcs/agents/facs2/fACS2.py new file mode 100644 index 00000000..014aa427 --- /dev/null +++ b/lcs/agents/facs2/fACS2.py @@ -0,0 +1,257 @@ +import logging +import random +from lcs import Perception +from lcs.agents.Agent import TrialMetrics +from . 
import ClassifiersList, Configuration, Classifier +from ...agents import Agent + +logger = logging.getLogger(__name__) + + +class fACS2(Agent): + + def __init__(self, + cfg: Configuration, + population: ClassifiersList = None) -> None: + self.cfg = cfg + self.population = population or ClassifiersList() + + def get_population(self): + return self.population + + def get_cfg(self): + return self.cfg + + def _run_trial_explore(self, env, time, current_trial=None): + + logger.debug("** Running trial explore ** ") + # Initial conditions + steps = 0 + state = env.reset() + action = env.action_space.sample() + last_reward = 0 + prev_state = Perception.empty() + action_set = ClassifiersList() + done = False + + while not done: + state = Perception(state) + state_to_calculate = Perception(env.change_state_type(state)) + membership_func_values = env.to_membership_function(state) + match_set = self.population.form_match_set(membership_func_values) + + if steps > 0: + # Apply learning in the last action set + ClassifiersList.apply_alp( + self.population, + match_set, + action_set, + prev_state, + action, + state_to_calculate, + time + steps, + self.cfg.theta_exp, + self.cfg) + ClassifiersList.apply_reinforcement_learning( + action_set, + last_reward, + match_set.get_maximum_fitness(), + self.cfg.beta, + self.cfg.gamma + ) + if self.cfg.do_ga: + ClassifiersList.apply_ga( + time + steps, + self.population, + match_set, + action_set, + state_to_calculate, + self.cfg.theta_ga, + self.cfg.mu, + self.cfg.chi, + self.cfg.theta_as, + self.cfg.do_subsumption, + self.cfg.theta_exp) + + if random.random() > self.cfg.epsilon: + action = self.select_action(env, match_set, membership_func_values) + else: + action = random.choice(range(self.cfg.number_of_possible_actions)) + + action_set = match_set.form_action_set(action) + + prev_state = Perception(state_to_calculate) + raw_state, last_reward, done, _ = env.step(action) + + state = Perception(raw_state) + state_to_calculate = Perception(env.change_state_type(state)) + + if done: + ClassifiersList.apply_alp( + self.population, + ClassifiersList(), + action_set, + prev_state, + action, + state_to_calculate, + time + steps, + self.cfg.theta_exp, + self.cfg) + ClassifiersList.apply_reinforcement_learning( + action_set, + last_reward, + 0, + self.cfg.beta, + self.cfg.gamma) + if self.cfg.do_ga: + ClassifiersList.apply_ga( + time + steps, + self.population, + ClassifiersList(), + action_set, + state_to_calculate, + self.cfg.theta_ga, + self.cfg.mu, + self.cfg.chi, + self.cfg.theta_as, + self.cfg.do_subsumption, + self.cfg.theta_exp) + + steps += 1 + + return TrialMetrics(steps, last_reward) + + def _run_trial_exploit(self, env, time=None, current_trial=None) \ + -> TrialMetrics: + + logger.debug("** Running trial exploit **") + # Initial conditions + steps = 0 + + state = Perception(env.reset()) + + last_reward = 0 + action_set = ClassifiersList() + done = False + + while not done: + env.render() + state = Perception(state) + membership_func_values = env.to_membership_function(state) + + match_set = self.population.form_match_set(membership_func_values) + + if steps > 0: + ClassifiersList.apply_reinforcement_learning( + action_set, + last_reward, + match_set.get_maximum_fitness(), + self.cfg.beta, + self.cfg.gamma) + + # Here when exploiting always choose best action + action = self.select_action(env, match_set, membership_func_values) + action_set = match_set.form_action_set(action) + + raw_state, last_reward, done, _ = env.step(action) + + state = 
Perception(raw_state)
+
+            if done:
+                ClassifiersList.apply_reinforcement_learning(
+                    action_set, last_reward, 0, self.cfg.beta, self.cfg.gamma)
+
+            steps += 1
+
+        return TrialMetrics(steps, last_reward)
+
+    def calculate_min_value_for_each_classifier(self,
+                                                match_set,
+                                                memberships_values):
+        """
+        Select the minimum membership value over the condition
+        attributes of each classifier whose rule is active.
+
+        Parameters
+        ----------
+        match_set
+            match set of classifiers
+        memberships_values
+            membership values for current environment state
+
+        Returns
+        -------
+        [[float, int]]
+            minimum membership value for each classifier
+            together with that classifier's action
+
+        """
+
+        if not match_set:
+            return
+        elif isinstance(match_set, Classifier):
+            conditions = [match_set.condition]
+            actions = [match_set.action]
+        else:
+            conditions = [clf.condition for clf in match_set]
+            actions = [clf.action for clf in match_set]
+        values = []
+        for conds, a in zip(conditions, actions):
+            conditions_values = []
+            for input_values in memberships_values:
+                for c, m in zip(conds, input_values):
+                    if c == self.cfg.classifier_wildcard:
+                        continue
+                    conditions_values.append(float(c) * m)
+            if any(c > 0 for c in conditions_values):
+                values.append((min(c for c in conditions_values if c > 0), a))
+        return values
+
+    def select_max_action_value(self, output_values):
+        """
+        Select max membership value for each possible action
+
+        Parameters
+        ----------
+        output_values
+            min values of each classifier and proposed action
+
+        Returns
+        -------
+        possible_actions
+            all possible actions with the max membership
+            value for each of them
+
+        """
+        possible_actions = [0 for _ in range(
+            self.cfg.number_of_possible_actions)]
+        for value, action_index in output_values:
+            if possible_actions[action_index] < value:
+                possible_actions[action_index] = value
+        return possible_actions
+
+    def select_action(self, env, match_set, memberships_values):
+        """
+        Select final action from match_set
+
+        Parameters
+        ----------
+        env
+        match_set
+        memberships_values
+
+        Returns
+        -------
+        action
+            selected action
+
+        """
+        calculate = self.calculate_min_value_for_each_classifier
+        min_values = calculate(match_set, memberships_values)
+        if not min_values:
+            return random.choice(range(self.cfg.number_of_possible_actions))
+        actions = self.select_max_action_value(min_values)
+        actions_func_shape = env.calculate_final_actions_func_shape(
+            actions)
+        return round(env.calculate_centroid(
+            actions_func_shape)[0])
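
Note (not part of the diff): a minimal sketch of how the fuzzy pipeline added above turns a raw observation into a crisp action - membership degrees are computed per input variable, each matching classifier contributes the minimum degree over its specified condition attributes, the per-action maxima shape the aggregated output function, and the x-coordinate of its centroid is rounded to an action index. The gym environment id and the hand-picked activation levels below are illustrative assumptions, not something this PR ships.

import gym

from lcs.agents.facs2.adapters.MountainCarFuzzyEnvironmentAdapter import \
    MountainCarFuzzyEnvironmentAdapter

# Wrap a classic-control task with the fuzzy adapter introduced in this PR
# (assumes the classic gym step/reset API; 'MountainCar-v0' is just an example).
env = MountainCarFuzzyEnvironmentAdapter(gym.make('MountainCar-v0'))

raw_obs = (-0.5, 0.01)  # (cart position, velocity)
memberships = env.to_membership_function(raw_obs)
# -> one list of membership degrees per input variable:
#    ([5 position degrees], [4 velocity degrees])

# Hypothetical per-action activation levels; in fACS2 these come from
# calculate_min_value_for_each_classifier + select_max_action_value.
action_strengths = [0.2, 0.0, 0.7]

# Defuzzification: build the aggregated output shape and take the
# x-coordinate of its centroid as the crisp action index.
shape = env.calculate_final_actions_func_shape(action_strengths)
action = round(env.calculate_centroid(shape)[0])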