From dbf3297c7f92a979564580818952e19fca915346 Mon Sep 17 00:00:00 2001 From: ShiKhu Date: Thu, 20 May 2021 23:56:18 +0200 Subject: [PATCH] fix (integrations): ignore token expired + some logs --- backend/services/db/messages.go | 1 + backend/services/ender/builder/builder.go | 25 +++++++++++-------- .../integrations/integration/sentry.go | 2 +- backend/services/integrations/main.go | 14 ++++++----- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/backend/services/db/messages.go b/backend/services/db/messages.go index 6aa4ac076..511165c5f 100644 --- a/backend/services/db/messages.go +++ b/backend/services/db/messages.go @@ -16,6 +16,7 @@ func insertMessage(sessionID uint64, msg Message) error { // Web case *SessionStart: + log.Printf("Session Start: %v", sessionID) return pg.InsertWebSessionStart(sessionID, m) case *SessionEnd: return pg.InsertWebSessionEnd(sessionID, m) diff --git a/backend/services/ender/builder/builder.go b/backend/services/ender/builder/builder.go index cccf96bcf..246b2f7e0 100644 --- a/backend/services/ender/builder/builder.go +++ b/backend/services/ender/builder/builder.go @@ -82,6 +82,9 @@ func (b *builder) iterateReadyMessage(iter func(msg Message)) { } func (b *builder) buildSessionEnd() { + if b.timestamp == 0 { + return + } sessionEnd := &SessionEnd{ Timestamp: b.timestamp, // + delay? } @@ -106,16 +109,25 @@ func (b *builder) buildInputEvent() { func (b *builder) handleMessage(message Message, messageID uint64) { timestamp := uint64(message.Meta().Timestamp) - if b.timestamp <= timestamp { + if b.timestamp <= timestamp { // unnecessary. TODO: test and remove b.timestamp = timestamp } - // Start from the first timestamp. + // Before the first timestamp. switch msg := message.(type) { case *SessionStart, *Metadata, *UserID, *UserAnonymousID: b.appendReadyMessage(msg) + case *RawErrorEvent: + b.appendReadyMessage(&ErrorEvent{ + MessageID: messageID, + Timestamp: msg.Timestamp, + Source: msg.Source, + Name: msg.Name, + Message: msg.Message, + Payload: msg.Payload, + }) } if b.timestamp == 0 { return @@ -177,15 +189,6 @@ func (b *builder) handleMessage(message Message, messageID uint64) { Timestamp: b.timestamp, }) } - case *RawErrorEvent: - b.appendReadyMessage(&ErrorEvent{ - MessageID: messageID, - Timestamp: msg.Timestamp, - Source: msg.Source, - Name: msg.Name, - Message: msg.Message, - Payload: msg.Payload, - }) case *JSException: b.appendReadyMessage(&ErrorEvent{ MessageID: messageID, diff --git a/backend/services/integrations/integration/sentry.go b/backend/services/integrations/integration/sentry.go index 39443f51a..0330430c3 100644 --- a/backend/services/integrations/integration/sentry.go +++ b/backend/services/integrations/integration/sentry.go @@ -111,7 +111,7 @@ PageLoop: c.errChan <- err continue } - if sessionID == 0 { // We can't felter them on request + if token == "" && sessionID == 0 { // We can't felter them on request continue } diff --git a/backend/services/integrations/main.go b/backend/services/integrations/main.go index 68b4ec5aa..e1ea58ebd 100644 --- a/backend/services/integrations/main.go +++ b/backend/services/integrations/main.go @@ -19,7 +19,7 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile) - TOPIC_TRIGGER := env.String("TOPIC_TRIGGER") + TOPIC_RAW := env.String("TOPIC_RAW") POSTGRES_STRING := env.String("POSTGRES_STRING") pg := postgres.NewConn(POSTGRES_STRING) @@ -43,6 +43,7 @@ func main() { }) producer:= queue.NewProducer() + defer producer.Close(15000) listener, err := postgres.NewIntegrationsListener(POSTGRES_STRING) if err != nil { @@ -72,13 +73,14 @@ func main() { sessionID := event.SessionID if sessionID == 0 { sessData, err := tokenizer.Parse(event.Token) - if err != nil { + if err != nil && err != token.EXPIRED { log.Printf("Error on token parsing: %v; Token: %v", err, event.Token) continue } sessionID = sessData.ID } - producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(event.RawErrorEvent)) + // TODO: send to ready-events topic. Otherwise it have to go through the events worker. + producer.Produce(TOPIC_RAW, sessionID, messages.Encode(event.RawErrorEvent)) case err := <-manager.Errors: log.Printf("Integration error: %v\n", err) case i := <-manager.RequestDataUpdates: @@ -86,10 +88,10 @@ func main() { if err := pg.UpdateIntegrationRequestData(&i); err != nil { log.Printf("Postgres Update request_data error: %v\n", err) } - //case err := <-listener.Errors: - //log.Printf("Postgres listen error: %v\n", err) + case err := <-listener.Errors: + log.Printf("Postgres listen error: %v\n", err) case iPointer := <-listener.Integrations: - // log.Printf("Integration update: %v\n", *iPointer) + log.Printf("Integration update: %v\n", *iPointer) err := manager.Update(iPointer) if err != nil { log.Printf("Integration parse error: %v | Integration: %v\n", err, *iPointer)