fix (integrations): ignore token expired + some logs

This commit is contained in:
ShiKhu 2021-05-20 23:56:18 +02:00
parent 4938fb262a
commit cc35c64047
4 changed files with 24 additions and 18 deletions

View file

@ -16,6 +16,7 @@ func insertMessage(sessionID uint64, msg Message) error {
// Web
case *SessionStart:
log.Printf("Session Start: %v", sessionID)
return pg.InsertWebSessionStart(sessionID, m)
case *SessionEnd:
return pg.InsertWebSessionEnd(sessionID, m)

View file

@ -82,6 +82,9 @@ func (b *builder) iterateReadyMessage(iter func(msg Message)) {
}
func (b *builder) buildSessionEnd() {
if b.timestamp == 0 {
return
}
sessionEnd := &SessionEnd{
Timestamp: b.timestamp, // + delay?
}
@ -106,16 +109,25 @@ func (b *builder) buildInputEvent() {
func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := uint64(message.Meta().Timestamp)
if b.timestamp <= timestamp {
if b.timestamp <= timestamp { // unnecessary. TODO: test and remove
b.timestamp = timestamp
}
// Start from the first timestamp.
// Before the first timestamp.
switch msg := message.(type) {
case *SessionStart,
*Metadata,
*UserID,
*UserAnonymousID:
b.appendReadyMessage(msg)
case *RawErrorEvent:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Source: msg.Source,
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
})
}
if b.timestamp == 0 {
return
@ -177,15 +189,6 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
Timestamp: b.timestamp,
})
}
case *RawErrorEvent:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,
Timestamp: msg.Timestamp,
Source: msg.Source,
Name: msg.Name,
Message: msg.Message,
Payload: msg.Payload,
})
case *JSException:
b.appendReadyMessage(&ErrorEvent{
MessageID: messageID,

View file

@ -111,7 +111,7 @@ PageLoop:
c.errChan <- err
continue
}
if sessionID == 0 { // We can't felter them on request
if token == "" && sessionID == 0 { // We can't felter them on request
continue
}

View file

@ -19,7 +19,7 @@ import (
func main() {
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
TOPIC_TRIGGER := env.String("TOPIC_TRIGGER")
TOPIC_RAW := env.String("TOPIC_RAW")
POSTGRES_STRING := env.String("POSTGRES_STRING")
pg := postgres.NewConn(POSTGRES_STRING)
@ -43,6 +43,7 @@ func main() {
})
producer:= queue.NewProducer()
defer producer.Close(15000)
listener, err := postgres.NewIntegrationsListener(POSTGRES_STRING)
if err != nil {
@ -72,13 +73,14 @@ func main() {
sessionID := event.SessionID
if sessionID == 0 {
sessData, err := tokenizer.Parse(event.Token)
if err != nil {
if err != nil && err != token.EXPIRED {
log.Printf("Error on token parsing: %v; Token: %v", err, event.Token)
continue
}
sessionID = sessData.ID
}
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(event.RawErrorEvent))
// TODO: send to ready-events topic. Otherwise it have to go through the events worker.
producer.Produce(TOPIC_RAW, sessionID, messages.Encode(event.RawErrorEvent))
case err := <-manager.Errors:
log.Printf("Integration error: %v\n", err)
case i := <-manager.RequestDataUpdates:
@ -86,10 +88,10 @@ func main() {
if err := pg.UpdateIntegrationRequestData(&i); err != nil {
log.Printf("Postgres Update request_data error: %v\n", err)
}
//case err := <-listener.Errors:
//log.Printf("Postgres listen error: %v\n", err)
case err := <-listener.Errors:
log.Printf("Postgres listen error: %v\n", err)
case iPointer := <-listener.Integrations:
// log.Printf("Integration update: %v\n", *iPointer)
log.Printf("Integration update: %v\n", *iPointer)
err := manager.Update(iPointer)
if err != nil {
log.Printf("Integration parse error: %v | Integration: %v\n", err, *iPointer)