Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bundle/manifests/oran-o2ims.clusterserviceversion.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rules:
- /o2ims-infrastructureCluster/v1/alarmDictionaries
- /o2ims-infrastructureCluster/v1/nodeClusterTypes
- /o2ims-infrastructureCluster/v1/nodeClusters
- /o2ims-infrastructureInventory/v1/resourceTypes
verbs:
- get
- list
Expand All @@ -30,6 +31,8 @@ rules:
- /o2ims-infrastructureCluster/v1/alarmDictionaries/*
- /o2ims-infrastructureCluster/v1/nodeClusterTypes/*
- /o2ims-infrastructureCluster/v1/nodeClusters/*
- /o2ims-infrastructureInventory/v1/internal/resources/*
- /o2ims-infrastructureInventory/v1/resourceTypes/*
verbs:
- get
- apiGroups:
Expand Down
28 changes: 28 additions & 0 deletions internal/controllers/inventory_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ import (
//+kubebuilder:rbac:urls="/o2ims-infrastructureCluster/v1/nodeClusterTypes/*",verbs=get
//+kubebuilder:rbac:urls="/o2ims-infrastructureCluster/v1/nodeClusters/*",verbs=get
//+kubebuilder:rbac:urls="/o2ims-infrastructureCluster/v1/alarmDictionaries/*",verbs=get
//+kubebuilder:rbac:urls="/o2ims-infrastructureInventory/v1/internal/resources/*",verbs=get
//+kubebuilder:rbac:urls="/o2ims-infrastructureInventory/v1/resourceTypes",verbs=get;list
//+kubebuilder:rbac:urls="/o2ims-infrastructureInventory/v1/resourceTypes/*",verbs=get
//+kubebuilder:rbac:urls="/hardware-manager/inventory/*",verbs=get;list
//+kubebuilder:rbac:groups="batch",resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=route.openshift.io,resources=routes,verbs=get;list;watch
Expand Down Expand Up @@ -1380,6 +1383,31 @@ func (t *reconcilerTask) createAlarmServerClusterRole(ctx context.Context) error
"get",
},
},
{
NonResourceURLs: []string{
"/o2ims-infrastructureInventory/v1/resourceTypes",
},
Verbs: []string{
"get",
"list",
},
},
{
NonResourceURLs: []string{
"/o2ims-infrastructureInventory/v1/resourceTypes/*",
},
Verbs: []string{
"get",
},
},
{
NonResourceURLs: []string{
"/o2ims-infrastructureInventory/v1/internal/resources/*",
},
Verbs: []string{
"get",
},
},
},
}

Expand Down
2 changes: 1 addition & 1 deletion internal/service/alarms/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (a *AlarmsServer) UpdateAlarmServiceConfiguration(ctx context.Context, requ
func (a *AlarmsServer) AmNotification(ctx context.Context, request api.AmNotificationRequestObject) (api.AmNotificationResponseObject, error) {
// TODO: AM auto retries if it receives 5xx error code. That means any error, even if permanent (e.g postgres syntax), will be processed the same way. Once we have a better retry mechanism for pg, update all 5xx to 4xx as needed.

if err := alertmanager.HandleAlerts(ctx, a.Infrastructure.Clients, a.AlarmsRepository, &request.Body.Alerts, alertmanager.Webhook); err != nil {
if err := alertmanager.HandleAlerts(ctx, a.Infrastructure.ClusterServer, a.Infrastructure.ResourceServer, a.AlarmsRepository, &request.Body.Alerts, alertmanager.Webhook); err != nil {
msg := "failed to handle alerts"
slog.Error(msg, "error", err)
return nil, fmt.Errorf("%s: %w", msg, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/service/alarms/internal/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *AMClient) SyncAlerts(ctx context.Context) error {
// Covert to Webhook payload to allow us to maintain a single point of entry in the DB
webhookPayload := ConvertAPIAlertsToWebhook(&apiPayload)
if len(webhookPayload) != 0 {
if err := HandleAlerts(ctx, c.infrastructure.Clients, c.alarmsRepository, &webhookPayload, API); err != nil {
if err := HandleAlerts(ctx, c.infrastructure.ClusterServer, c.infrastructure.ResourceServer, c.alarmsRepository, &webhookPayload, API); err != nil {
return fmt.Errorf("failed to handle alerts during full sync: %w", err)
}
}
Expand Down
5 changes: 2 additions & 3 deletions internal/service/alarms/internal/alertmanager/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ var _ = Describe("Alertmanager API Client", func() {
Build()

infra = &infrastructure.Infrastructure{
Clients: []infrastructure.Client{
&infrastructure.ClusterServer{},
},
ClusterServer: &infrastructure.ClusterServer{},
ResourceServer: &infrastructure.ResourceServer{},
}

amClient = alertmanager.NewAlertmanagerClient(fakeClient, mockRepo, infra)
Expand Down
44 changes: 41 additions & 3 deletions internal/service/alarms/internal/alertmanager/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
api "github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/db/models"
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/internal/infrastructure"
"github.com/openshift-kni/oran-o2ims/internal/service/common"
)

// ConvertAmToAlarmEventRecordModels get alarmEventRecords based on the alertmanager notification and AlarmDefinition
func ConvertAmToAlarmEventRecordModels(ctx context.Context, alerts *[]api.Alert, infrastructureClient infrastructure.Client) []models.AlarmEventRecord {
func ConvertAmToAlarmEventRecordModels(ctx context.Context, alerts *[]api.Alert, clusterServer, resourceServer infrastructure.Client) []models.AlarmEventRecord {
records := make([]models.AlarmEventRecord, 0, len(*alerts))
for _, alert := range *alerts {
record := models.AlarmEventRecord{}
Expand Down Expand Up @@ -69,8 +70,21 @@ func ConvertAmToAlarmEventRecordModels(ctx context.Context, alerts *[]api.Alert,
// Update Extensions with things we didn't really process
record.Extensions = getExtensions(alert)

// for caas alerts object is the cluster ID
record.ObjectID = getClusterID(labels)
// Determine alert type (CaaS vs hardware) and select appropriate infrastructure client
// Hardware alerts have both type=hardware AND component=ironic labels
isHardware := isHardwareAlert(labels)
var infrastructureClient infrastructure.Client
if isHardware {
// For hardware alerts, object is the resource ID from instance_uuid
record.ObjectID = getResourceID(labels)
record.AlarmSource = models.AlarmSourceHardware
infrastructureClient = resourceServer
} else {
// For CaaS alerts, object is the cluster ID from managed_cluster
record.ObjectID = getClusterID(labels)
record.AlarmSource = models.AlarmSourceCaaS
infrastructureClient = clusterServer
}

// derive ObjectTypeID from ObjectID
if record.ObjectID != nil {
Expand Down Expand Up @@ -118,6 +132,30 @@ func getClusterID(labels map[string]string) *uuid.UUID {
return &id
}

// isHardwareAlert checks if an alert is a hardware alert by checking for type=hardware AND component=ironic labels
func isHardwareAlert(labels map[string]string) bool {
alertType, hasType := labels[common.HardwareAlertTypeLabel]
component, hasComponent := labels[common.HardwareAlertComponentLabel]
return hasType && alertType == common.HardwareAlertTypeValue && hasComponent && component == common.HardwareAlertComponentValue
}

// getResourceID extracts resource ID from instance_uuid label for hardware alerts
func getResourceID(labels map[string]string) *uuid.UUID {
val, ok := labels["instance_uuid"]
if !ok {
slog.Warn("Could not find instance_uuid for hardware alert", "labels", labels)
return nil
}

id, err := uuid.Parse(val)
if err != nil {
slog.Warn("Could not convert instance_uuid string to uuid", "labels", labels, "err", err.Error())
return nil
}

return &id
}

// getAlertName extract name from alert label
func getAlertName(labels map[string]string) string {
val, ok := labels["alertname"]
Expand Down
14 changes: 7 additions & 7 deletions internal/service/alarms/internal/alertmanager/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var _ = Describe("Alertmanager Functions", func() {
},
}

records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient)
records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient, mockInfraClient)

// Assert
Expect(records).To(HaveLen(1))
Expand Down Expand Up @@ -131,7 +131,7 @@ var _ = Describe("Alertmanager Functions", func() {
GetAlarmDefinitionID(gomock.Any(), objectTypeIDUUID, "TestAlert", "critical").
Return(alarmDefUUID, nil)

records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient)
records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient, mockInfraClient)

// Assert
Expect(records).To(HaveLen(1))
Expand Down Expand Up @@ -188,9 +188,9 @@ var _ = Describe("Alertmanager Functions", func() {
},
}

Expect(alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts1, mockInfraClient)).To(BeEmpty())
Expect(alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts2, mockInfraClient)).To(BeEmpty())
Expect(alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts3, mockInfraClient)).To(BeEmpty())
Expect(alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts1, mockInfraClient, mockInfraClient)).To(BeEmpty())
Expect(alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts2, mockInfraClient, mockInfraClient)).To(BeEmpty())
Expect(alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts3, mockInfraClient, mockInfraClient)).To(BeEmpty())
})

It("should handle infrastructure client errors gracefully", func() {
Expand Down Expand Up @@ -220,7 +220,7 @@ var _ = Describe("Alertmanager Functions", func() {
},
}

records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient)
records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient, mockInfraClient)

// Assert - should still create record but without ObjectTypeID
Expect(records).To(HaveLen(1))
Expand Down Expand Up @@ -265,7 +265,7 @@ var _ = Describe("Alertmanager Functions", func() {
},
}

records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient)
records := alertmanager.ConvertAmToAlarmEventRecordModels(ctx, &alerts, mockInfraClient, mockInfraClient)

// Assert
Expect(records).To(HaveLen(1))
Expand Down
19 changes: 2 additions & 17 deletions internal/service/alarms/internal/alertmanager/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

// HandleAlerts can be called when a payload from Webhook or API `/alerts` is received
// Webhook is our primary and API as our backup and sync mechanism
func HandleAlerts(ctx context.Context, clients []infrastructure.Client, repository repo.AlarmRepositoryInterface, alerts *[]api.Alert, source SourceType) error {
func HandleAlerts(ctx context.Context, clusterServer, resourceServer infrastructure.Client, repository repo.AlarmRepositoryInterface, alerts *[]api.Alert, source SourceType) error {
// Handle nil alerts
if alerts == nil {
return nil
Expand All @@ -37,23 +37,8 @@ func HandleAlerts(ctx context.Context, clients []infrastructure.Client, reposito
return nil
}

// Get cached cluster server data
var (
clusterServer infrastructure.Client
found bool
)
for i := range clients {
if clients[i].Name() == infrastructure.Name {
clusterServer = clients[i]
found = true
}
}
if !found {
return fmt.Errorf("no cluster server found with name %q", infrastructure.Name)
}

// Combine possible definitions with events
aerModels := ConvertAmToAlarmEventRecordModels(ctx, alerts, clusterServer)
aerModels := ConvertAmToAlarmEventRecordModels(ctx, alerts, clusterServer, resourceServer)

// Insert and update AlarmEventRecord and optionally resolve stale
if err := repository.WithTransaction(ctx, func(tx pgx.Tx) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
"github.com/openshift-kni/oran-o2ims/internal/service/alarms/api/generated"
)

const (
// AlarmSourceCaaS represents alarms from Container-as-a-Service (cluster alerts)
AlarmSourceCaaS = "caas"
// AlarmSourceHardware represents alarms from hardware resources
AlarmSourceHardware = "hardware"
)

// AlarmEventRecord represents a record in the alarm_event_record table.
type AlarmEventRecord struct {
AlarmEventRecordID uuid.UUID `db:"alarm_event_record_id" json:"alarm_event_record_id"`
Expand Down
18 changes: 9 additions & 9 deletions internal/service/alarms/internal/db/repo/alarms_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (ar *AlarmsRepository) UpsertAlarmEventCaaSRecord(ctx context.Context, tx p
"AlarmAcknowledged", "PerceivedSeverity", "Extensions",
"ObjectID", "ObjectTypeID", "AlarmStatus",
"Fingerprint", "AlarmDefinitionID", "ProbableCauseID",
"GenerationID",
"GenerationID", "AlarmSource",
})

// Set values
Expand All @@ -152,7 +152,7 @@ func (ar *AlarmsRepository) UpsertAlarmEventCaaSRecord(ctx context.Context, tx p
record.AlarmAcknowledged, record.PerceivedSeverity, record.Extensions,
record.ObjectID, record.ObjectTypeID, record.AlarmStatus,
record.Fingerprint, record.AlarmDefinitionID, record.ProbableCauseID,
generationID,
generationID, record.AlarmSource,
)))
}
query.Apply(values...)
Expand All @@ -169,7 +169,7 @@ func (ar *AlarmsRepository) UpsertAlarmEventCaaSRecord(ctx context.Context, tx p
im.SetExcluded(dbTags["AlarmDefinitionID"]),
im.SetExcluded(dbTags["ProbableCauseID"]),
im.SetExcluded(dbTags["GenerationID"]),
im.Where(psql.Quote(m.TableName(), dbTags["AlarmSource"]).EQ(psql.Arg("alertmanager"))),
im.SetExcluded(dbTags["AlarmSource"]),
))

sql, params, err := query.Build(ctx)
Expand Down Expand Up @@ -209,12 +209,12 @@ func (ar *AlarmsRepository) ResolveStaleAlarmEventCaaSRecord(ctx context.Context

query := psql.Update(
um.Table(tableName),
um.SetCol(alarmStatus).ToArg(api.Resolved), // Set to resolved
um.SetCol(perceivedSeverity).ToArg(api.CLEARED), // Set corresponding perceivedSeverity
um.Set(psql.Raw(updateClearedTimeCase, TimeNow())), // Set a resolved time if not there already
um.Where(psql.Quote(generationIDCol).LT(psql.Arg(generationID))), // An alert is stale if its GenID is less than current
um.Where(psql.Quote(alarmSource).EQ(psql.Arg("alertmanager"))), // This is only applicable for alertmanager rows
um.Where(psql.Quote(alarmStatus).NE(psql.Arg(api.Resolved))), // If already resolved no need to process that row
um.SetCol(alarmStatus).ToArg(api.Resolved), // Set to resolved
um.SetCol(perceivedSeverity).ToArg(api.CLEARED), // Set corresponding perceivedSeverity
um.Set(psql.Raw(updateClearedTimeCase, TimeNow())), // Set a resolved time if not there already
um.Where(psql.Quote(generationIDCol).LT(psql.Arg(generationID))), // An alert is stale if its GenID is less than current
um.Where(psql.Quote(alarmSource).In(psql.Arg(models.AlarmSourceCaaS, models.AlarmSourceHardware))), // Support both CaaS and hardware alerts
um.Where(psql.Quote(alarmStatus).NE(psql.Arg(api.Resolved))), // If already resolved no need to process that row
um.Returning(psql.Quote(alarmEventRecordID)),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ var _ = Describe("AlarmsRepository", func() {
PerceivedSeverity: api.WARNING,
ObjectID: &id,
Fingerprint: "9a9e2d82a78cf2b9",
AlarmSource: models.AlarmSourceCaaS,
},
}
// Expect transaction begin
Expand All @@ -284,7 +285,7 @@ var _ = Describe("AlarmsRepository", func() {
records[0].PerceivedSeverity, records[0].Extensions,
records[0].ObjectID, records[0].ObjectTypeID,
records[0].AlarmStatus, records[0].Fingerprint,
records[0].AlarmDefinitionID, records[0].ProbableCauseID, int64(0), "alertmanager",
records[0].AlarmDefinitionID, records[0].ProbableCauseID, int64(0), records[0].AlarmSource,
).
WillReturnResult(pgxmock.NewResult("INSERT", 1))

Expand Down Expand Up @@ -312,12 +313,14 @@ var _ = Describe("AlarmsRepository", func() {
PerceivedSeverity: api.WARNING,
ObjectID: &id1,
Fingerprint: "9a9e2d82a78cf2b7",
AlarmSource: models.AlarmSourceCaaS,
},
{
AlarmRaisedTime: now,
PerceivedSeverity: api.CRITICAL,
ObjectID: &id2,
Fingerprint: "9a9e2d82a78cf2b9",
AlarmSource: models.AlarmSourceCaaS,
},
}

Expand All @@ -331,14 +334,13 @@ var _ = Describe("AlarmsRepository", func() {
records[0].PerceivedSeverity, records[0].Extensions,
records[0].ObjectID, records[0].ObjectTypeID,
records[0].AlarmStatus, records[0].Fingerprint,
records[0].AlarmDefinitionID, records[0].ProbableCauseID, int64(0),
records[0].AlarmDefinitionID, records[0].ProbableCauseID, int64(0), records[0].AlarmSource,
records[1].AlarmRaisedTime, records[1].AlarmClearedTime,
records[1].AlarmAcknowledgedTime, records[1].AlarmAcknowledged,
records[1].PerceivedSeverity, records[1].Extensions,
records[1].ObjectID, records[1].ObjectTypeID,
records[1].AlarmStatus, records[1].Fingerprint,
records[1].AlarmDefinitionID, records[1].ProbableCauseID, int64(0),
"alertmanager",
records[1].AlarmDefinitionID, records[1].ProbableCauseID, int64(0), records[1].AlarmSource,
).
WillReturnResult(pgxmock.NewResult("INSERT", 2))

Expand Down Expand Up @@ -427,7 +429,7 @@ var _ = Describe("AlarmsRepository", func() {
mock.ExpectBegin()

mock.ExpectQuery(fmt.Sprintf("UPDATE %s SET", models.AlarmEventRecord{}.TableName())).
WithArgs(api.Resolved, api.CLEARED, clearTime, int64(2), "alertmanager", api.Resolved).
WithArgs(api.Resolved, api.CLEARED, clearTime, int64(2), models.AlarmSourceCaaS, models.AlarmSourceHardware, api.Resolved).
WillReturnRows(pgxmock.NewRows([]string{"alarm_event_record_id"}).AddRow(uuid.New()))

// Expect transaction commit
Expand Down
Loading