Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIGOV-27681 - event generator updates for processing #864

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ main-resources.json
sub-resources.json
vendor
sample.sequence
**/data/cache
220 changes: 144 additions & 76 deletions pkg/transaction/eventgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transaction

import (
"encoding/json"
"errors"
"strings"
"time"

Expand All @@ -16,7 +17,7 @@ import (
"github.com/Axway/agent-sdk/pkg/transaction/metric"
"github.com/Axway/agent-sdk/pkg/transaction/models"
transutil "github.com/Axway/agent-sdk/pkg/transaction/util"
"github.com/Axway/agent-sdk/pkg/util/errors"
sdkErrors "github.com/Axway/agent-sdk/pkg/util/errors"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -28,6 +29,7 @@ type EventGenerator interface {
CreateEvent(logEvent LogEvent, eventTime time.Time, metaData common.MapStr, fields common.MapStr, privateData interface{}) (event beat.Event, err error) // DEPRECATED
CreateEvents(summaryEvent LogEvent, detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, fields common.MapStr, privateData interface{}) (events []beat.Event, err error)
SetUseTrafficForAggregation(useTrafficForAggregation bool)
CreateFromEventReport(eventReport EventReport) (events []beat.Event, err error)
}

// Generator - Create the events to be published to Condor
Expand Down Expand Up @@ -59,19 +61,127 @@ func (e *Generator) SetUseTrafficForAggregation(useTrafficForAggregation bool) {

// CreateEvent - Creates a new event to be sent to Amplify Observability, expects sampling is handled by agent
func (e *Generator) CreateEvent(logEvent LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) (beat.Event, error) {
// if CreateEvent is being used, sampling will not work, so all events need to be sent
if metaData == nil {
metaData = common.MapStr{}
builder := NewEventReportBuilder().
SetEventTime(eventTime).
SetMetadata(metaData).
SetFields(eventFields).
SetPrivateData(privateData).
SetForceSample()

// set the proper log event type
if logEvent.TransactionSummary != nil {
builder = builder.SetSummaryEvent(logEvent)
} else {
builder = builder.SetDetailEvents([]LogEvent{logEvent}).SetSkipMetricTracking()
}
metaData.Put(sampling.SampleKey, true)

if logEvent.TransactionSummary != nil {
report, err := builder.Build()
if err != nil {
return beat.Event{}, err
}

events, err := e.CreateFromEventReport(report)
if err != nil {
return beat.Event{}, err
}
if len(events) == 0 {
return beat.Event{}, errors.New("an event was not created")
}
if len(events) > 1 {
return events[0], errors.New("unexpectedly, more than one event was created, only returning the first")
}

// will only ever have 1 beat event returned
return events[0], nil
}

// CreateEvents - Creates new events to be sent to Amplify Observability
func (e *Generator) CreateEvents(summaryEvent LogEvent, detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) ([]beat.Event, error) {
report, err := NewEventReportBuilder().
SetSummaryEvent(summaryEvent).
SetDetailEvents(detailEvents).
SetEventTime(eventTime).
SetMetadata(metaData).
SetFields(eventFields).
SetPrivateData(privateData).
Build()
if err != nil {
return []beat.Event{}, err
}

return e.CreateFromEventReport(report)
}

// CreateEvent - Creates a new event to be sent to Amplify Observability, expects sampling is handled by agent
func (e *Generator) CreateFromEventReport(eventReport EventReport) ([]beat.Event, error) {
events := make([]beat.Event, 0)
logger := e.logger

// add logging fields from summary event
if eventReport.GetSummaryEvent() != (LogEvent{}) {
logger.WithField("transactionID", eventReport.GetSummaryEvent().TransactionID)
} else if len(eventReport.GetDetailEvents()) > 0 {
logger.WithField("transactionID", eventReport.GetDetailEvents()[0].TransactionID)
}

bytes := e.getBytesSent(eventReport.GetDetailEvents())
if eventReport.ShouldTrackMetrics() && eventReport.GetSummaryEvent() != (LogEvent{}) {
e.trackMetrics(eventReport.GetSummaryEvent(), int64(bytes))
}

if eventReport.ShouldOnlyTrackMetrics() {
logger.Trace("not generating events, only tracking for metrics")
return events, nil
}

// See if the uri is in the api exceptions list
if e.isInAPIExceptionsList(eventReport.GetDetailEvents()) {
logger.Debug("found api path in traceability api exceptions list, ignore transaction event")
return events, nil
}

//set up the sampling metadata if set to force it
metadata := eventReport.GetMetadata()
if eventReport.ShouldForceSample() {
logger.Trace("sampling event")
metadata = SetSampleInMetadata(metadata)
}

//if no summary is sent then prepare the array of TransactionEvents for publishing
if eventReport.GetSummaryEvent() == (LogEvent{}) {
return e.handleTransactionEvents(eventReport.GetDetailEvents(), eventReport.GetEventTime(), metadata, eventReport.GetFields(), eventReport.GetPrivateData())
}

// Check to see if marketplace provisioning/subs is enabled
newSummaryEvent, err := e.processTxnSummary(eventReport.GetSummaryEvent())
if err != nil {
logger.WithError(err).Trace("handling summary event")
return events, err
}

if eventReport.ShouldHandleSampling() && !eventReport.ShouldForceSample() {
shouldSample, err := sampling.ShouldSampleTransaction(e.createSamplingTransactionDetails(eventReport.GetSummaryEvent()))
if err != nil || !shouldSample {
// do not need to create the event structure if it will not be sampled
return events, err
}
metadata = SetSampleInMetadata(metadata)
}

newEvent, err := e.createEvent(newSummaryEvent, eventReport.GetEventTime(), metadata, eventReport.GetFields(), eventReport.GetPrivateData())
if err != nil {
logger.WithError(err).Trace("handling summary event")
return events, err
}

e.processTxnSummary(logEvent)
e.trackMetrics(logEvent, 0)
detailEvents, err := e.handleTransactionEvents(eventReport.GetDetailEvents(), eventReport.GetEventTime(), metadata, eventReport.GetFields(), eventReport.GetPrivateData())
if err != nil {
logger.WithError(err).Trace("handling detail event(s)")
return events, err
}

return e.createEvent(logEvent, eventTime, metaData, eventFields, privateData)
events = append(events, newEvent)
return append(events, detailEvents...), nil
}

func (e *Generator) trackMetrics(summaryEvent LogEvent, bytes int64) {
Expand All @@ -93,7 +203,7 @@ func (e *Generator) trackMetrics(summaryEvent LogEvent, bytes int64) {
appDetails := models.AppDetails{}
if summaryEvent.TransactionSummary.Application != nil {
appDetails.Name = summaryEvent.TransactionSummary.Application.Name
appDetails.ID = strings.TrimLeft(summaryEvent.TransactionSummary.Application.ID, SummaryEventApplicationIDPrefix)
appDetails.ID = strings.ReplaceAll(summaryEvent.TransactionSummary.Application.ID, SummaryEventApplicationIDPrefix, "")
}

collector := metric.GetMetricCollector()
Expand All @@ -118,11 +228,8 @@ func (e *Generator) createEvent(logEvent LogEvent, eventTime time.Time, metaData
return event, err
}

eventData := eventFields
// No need to get the other field data if not being sampled
if sampled, found := metaData[sampling.SampleKey]; found && sampled.(bool) {
eventData, err = e.createEventData(serializedLogEvent, eventFields)
}
eventData, err := e.createEventData(serializedLogEvent, eventFields)
if err != nil {
return event, err
}
Expand All @@ -135,61 +242,14 @@ func (e *Generator) createEvent(logEvent LogEvent, eventTime time.Time, metaData
}, nil
}

// CreateEvents - Creates new events to be sent to Amplify Observability
func (e *Generator) CreateEvents(summaryEvent LogEvent, detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) ([]beat.Event, error) {
events := make([]beat.Event, 0)

// See if the uri is in the api exceptions list
if e.isInAPIExceptionsList(detailEvents) {
e.logger.Debug("Found api path in traceability api exceptions list. Ignore transaction event")
return events, nil
func (e *Generator) getBytesSent(detailEvents []LogEvent) int {
if len(detailEvents) == 0 || detailEvents[0].TransactionEvent == nil || detailEvents[0].TransactionEvent.Protocol == nil {
return 0
}

// Check to see if marketplace provisioning/subs is enabled
err := e.processTxnSummary(summaryEvent)
if err != nil {
return nil, err
}

//if no summary is sent then prepare the array of TransactionEvents for publishing
if summaryEvent == (LogEvent{}) {
return e.handleTransactionEvents(detailEvents, eventTime, metaData, eventFields, privateData)
}

shouldSample, err := sampling.ShouldSampleTransaction(e.createSamplingTransactionDetails(summaryEvent))
if err != nil {
return events, err
if httpEvent, ok := detailEvents[0].TransactionEvent.Protocol.(*Protocol); ok {
return httpEvent.BytesSent
}
if shouldSample {
if metaData == nil {
metaData = common.MapStr{}
}
metaData.Put(sampling.SampleKey, true)
}

newEvent, err := e.createEvent(summaryEvent, eventTime, metaData, eventFields, privateData)

if err != nil {
return events, err
}

events = append(events, newEvent)
for _, event := range detailEvents {
newEvent, err := e.createEvent(event, eventTime, metaData, eventFields, privateData)
if err == nil {
events = append(events, newEvent)
}
}

bytes := 0
if len(detailEvents) > 0 {
if httpEvent, ok := detailEvents[0].TransactionEvent.Protocol.(*Protocol); ok {
bytes = httpEvent.BytesSent
}
}
e.trackMetrics(summaryEvent, int64(bytes))

return events, nil
return 0
}

func (e *Generator) handleTransactionEvents(detailEvents []LogEvent, eventTime time.Time, metaData common.MapStr, eventFields common.MapStr, privateData interface{}) ([]beat.Event, error) {
Expand All @@ -200,32 +260,32 @@ func (e *Generator) handleTransactionEvents(detailEvents []LogEvent, eventTime t
}
metaData.Put(sampling.SampleKey, true)
newEvent, err := e.createEvent(event, eventTime, metaData, eventFields, privateData)
if err == nil {
events = append(events, newEvent)
if err != nil {
return nil, err
}
events = append(events, newEvent)
}

return events, nil

}

func (e *Generator) processTxnSummary(summaryEvent LogEvent) error {
func (e *Generator) processTxnSummary(summaryEvent LogEvent) (LogEvent, error) {
// only process if there is a central client and marketplace subs are enabled
if agent.GetCentralClient() == nil {
return nil
return summaryEvent, nil
}
if summaryEvent.TransactionSummary != nil {
txnSummary := e.updateTxnSummaryByAccessRequest(summaryEvent)
if txnSummary != nil {
jsonData, err := json.Marshal(&txnSummary)
if err != nil {
return err
return summaryEvent, err
}
e.logger.Trace(string(jsonData))
summaryEvent.TransactionSummary = txnSummary
}
}
return nil
return summaryEvent, nil
}

// updateTxnSummaryByAccessRequest - get the consumer information to add to transaction event. If we don't have any
Expand Down Expand Up @@ -329,7 +389,7 @@ func (e *Generator) createSamplingTransactionDetails(summaryEvent LogEvent) samp
func (e *Generator) isInAPIExceptionsList(logEvents []LogEvent) bool {

// Sanity check.
if len(logEvents) == 0 {
if len(logEvents) == 0 || logEvents[0].TransactionEvent == nil || logEvents[0].TransactionEvent.Protocol == nil {
return false
}

Expand Down Expand Up @@ -362,7 +422,7 @@ func (e *Generator) healthcheck(name string) *hc.Status {
if err != nil {
status = &hc.Status{
Result: hc.FAIL,
Details: errors.Wrap(apic.ErrAuthenticationCall, err.Error()).Error(),
Details: sdkErrors.Wrap(apic.ErrAuthenticationCall, err.Error()).Error(),
}
}

Expand Down Expand Up @@ -403,6 +463,14 @@ func (e *Generator) createEventFields() (fields map[string]string, err error) {
return
}

func SetSampleInMetadata(metadata common.MapStr) common.MapStr {
if metadata == nil {
metadata = common.MapStr{}
}
metadata.Put(sampling.SampleKey, true)
return metadata
}

// updateWithProviderDetails -
func updateWithProviderDetails(accessRequest *management.AccessRequest, managedApp *v1.ResourceInstance, summaryEvent *Summary, log log.FieldLogger) *Summary {

Expand Down
Loading
Loading