Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions lcs/agents/facs2/Classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from __future__ import annotations

from typing import Optional, Union

import lcs.agents.acs2 as acs
from lcs.agents.acs import PMark
from lcs import Perception
from .Condition import Condition
from . import Configuration, Effect


class Classifier(acs.Classifier):
__slots__ = ['condition', 'action', 'effect', 'mark', 'q', 'r',
'ir', 'num', 'exp', 'talp', 'tga', 'tav', 'ee', 'cfg']

def __init__(self,
condition: Union[Condition, str, None] = None,
action: Optional[int] = None,
effect: Union[Effect, str, None] = None,
quality: float = None,
reward: float = 0.5,
immediate_reward: float = 0.0,
numerosity: int = 1,
experience: int = 1,
talp=None,
tga: int = 0,
tav: float = 0.0,
cfg: Optional[Configuration] = None) -> None:

if cfg is None:
raise TypeError("Configuration should be passed to Classifier")

self.cfg = cfg

def build_perception_string(cls, initial,
length=self.cfg.classifier_length):
if initial:
return cls(initial)

return cls.empty(length=length)

self.condition = build_perception_string(Condition, condition)
self.action = action
self.effect = build_perception_string(Effect, effect)

self.mark = PMark(cfg=self.cfg)
if quality is None:
self.q = self.cfg.initial_q
else:
self.q = quality

self.r = reward
self.ir = immediate_reward
self.num = numerosity
self.exp = experience
self.talp = talp
self.tga = tga
self.tav = tav
self.ee = False

def specialize(self,
p0: Perception,
p1: Perception,
leave_specialized=False) -> None:
for idx in range(len(p1)):
if leave_specialized:
if self.effect[idx] != self.cfg.classifier_wildcard:
continue

if p0[idx] != p1[idx]:
if self.effect[idx] == self.cfg.classifier_wildcard:
if p1[idx] != '0.0':
self.effect[idx] = '1.0'
else:
self.effect[idx] = '0.0'

if p0[idx] != '0.0':
self.condition[idx] = '1.0'
else:
self.condition[idx] = '0.0'
137 changes: 137 additions & 0 deletions lcs/agents/facs2/ClassifiersList.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from __future__ import annotations

import random
from typing import Optional

import lcs.agents.acs2 as acs
import lcs.agents.facs2.alp as alp_acs2
import lcs.strategies.anticipatory_learning_process as alp
import lcs.strategies.genetic_algorithms as ga
import lcs.strategies.reinforcement_learning as rl
from lcs import Perception
from lcs.agents.facs2 import Configuration
from . import Classifier


class ClassifiersList(acs.ClassifiersList):

def __init__(self, *args, oktypes=(Classifier,)) -> None:
super().__init__(*args, oktypes=oktypes)

@staticmethod
def apply_alp(population: ClassifiersList,
match_set: ClassifiersList,
action_set: ClassifiersList,
p0: Perception,
action: int,
p1: Perception,
time: int,
theta_exp: int,
cfg: Configuration):

new_list = ClassifiersList()
new_cl: Optional[Classifier] = None
was_expected_case = False
delete_count = 0

for cl in action_set:
cl.increase_experience()
cl.update_application_average(time)

if cl.does_anticipate_correctly(p0, p1):
new_cl = alp_acs2.expected_case(cl, p0, time)
was_expected_case = True
else:
new_cl = alp_acs2.unexpected_case(cl, p0, p1, time)

if cl.is_inadequate():
# Removes classifier from population, match set
# and current list
delete_count += 1
lists = [x for x in [population, match_set, action_set]
if x]
for lst in lists:
lst.safe_remove(cl)

if new_cl is not None:
new_cl.tga = time
alp.add_classifier(new_cl, action_set, new_list, theta_exp)

# No classifier anticipated correctly - generate new one
if not was_expected_case:
new_cl = alp_acs2.cover(p0, action, p1, time, cfg)
alp.add_classifier(new_cl, action_set, new_list, theta_exp)

# Merge classifiers from new_list into self and population
action_set.extend(new_list)
population.extend(new_list)

if match_set is not None:
new_matching = [cl for cl in new_list if
cl.condition.does_match(p1)]
match_set.extend(new_matching)

return 0

@staticmethod
def apply_ga(time: int,
population: ClassifiersList,
match_set: ClassifiersList,
action_set: ClassifiersList,
p: Perception,
theta_ga: int,
mu: float,
chi: float,
theta_as: int,
do_subsumption: bool,
theta_exp: int) -> None:

if ga.should_apply(action_set, time, theta_ga):
ga.set_timestamps(action_set, time)

# Select parents
parent1, parent2 = ga.roulette_wheel_selection(
action_set, lambda cl: pow(cl.q, 3) * cl.num)

child1 = Classifier.copy_from(parent1, time)
child2 = Classifier.copy_from(parent2, time)

# Execute mutation
ga.generalizing_mutation(child1, mu)
ga.generalizing_mutation(child2, mu)

# Execute cross-over
if random.random() < chi:
if child1.effect == child2.effect:
ga.two_point_crossover(child1, child2)

# Update quality and reward
child1.q = child2.q = float(sum([child1.q, child2.q]) / 2)
child2.r = child2.r = float(sum([child1.r, child2.r]) / 2)

child1.q /= 2
child2.q /= 2

# We are interested only in classifiers with specialized condition
unique_children = {cl for cl in [child1, child2]
if cl.condition.specificity > 0}

ga.delete_classifiers(
population, match_set, action_set,
len(unique_children), theta_as)

# check for subsumers / similar classifiers
for child in unique_children:
ga.add_classifier(child, p,
population, match_set, action_set,
do_subsumption, theta_exp)

@staticmethod
def apply_reinforcement_learning(action_set: ClassifiersList,
reward: int,
p: float,
beta: float,
gamma: float) -> None:

for cl in action_set:
rl.update_classifier(cl, reward, p, beta, gamma)
48 changes: 48 additions & 0 deletions lcs/agents/facs2/Condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

from lcs import Perception
import lcs.agents.acs as acs


class Condition(acs.Condition):

def specialize_with_condition(self, other: Condition) -> None:
for idx, new_el in enumerate(other):
if new_el != self.WILDCARD:
if new_el != '0.0':
self[idx] = '1.0'
else:
self[idx] = '0.0'

def does_match(self, p: Perception) -> bool:
"""
Check if condition match given observations

Parameters
----------
p: Union[Perception, Condition]
perception or condition object

Returns
-------
bool
True if condition match given list, False otherwise
"""
j = 0
for ci, oi in zip(self, p):
i = j
check = False
for obs in oi:
if str(ci) != '0.0' and str(obs) != '0.0':
check = True
i += 1
break
i += 1
if '1.0' not in ci[j:i]:
j += len(oi)
continue
j += len(oi)
if not check:
return False

return True
20 changes: 20 additions & 0 deletions lcs/agents/facs2/Configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import lcs.agents.acs as acs


class Configuration(acs.Configuration):
def __init__(self, **kwargs):

super(Configuration, self).__init__(**kwargs)

self.gamma: float = kwargs.get('gamma', 0.95)
self.do_ga: bool = kwargs.get('do_ga', False)
self.initial_q: float = kwargs.get('initial_q', 0.5)
self.biased_exploration_prob: float = kwargs.get(
'biased_exploration_prob', 0.05)
self.theta_ga: int = kwargs.get('theta_ga', 100)
self.mu: float = kwargs.get('mu', 0.3)
self.chi: float = kwargs.get('chi', 0.8)

def __str__(self) -> str:
return str(vars(self))

20 changes: 20 additions & 0 deletions lcs/agents/facs2/Effect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations
import lcs.agents.acs2 as acs


class Effect(acs.Effect):

def __init__(self, observation):
super().__init__(observation)

@classmethod
def item_anticipate_change(cls, item, p0_item, p1_item) -> bool:
if item == cls.WILDCARD or item == '0.0':
if p0_item != p1_item:
return False
else:
if p0_item == p1_item:
return False

return True

6 changes: 6 additions & 0 deletions lcs/agents/facs2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .Configuration import Configuration
from .Effect import Effect
from .Classifier import Classifier
from .ClassifiersList import ClassifiersList
from .fACS2 import fACS2
from .Condition import Condition
Loading