fix (integrations): ignore token expired + some logs
This commit is contained in:
parent
6d259279a0
commit
dbf3297c7f
4 changed files with 24 additions and 18 deletions
|
|
@ -16,6 +16,7 @@ func insertMessage(sessionID uint64, msg Message) error {
|
|||
|
||||
// Web
|
||||
case *SessionStart:
|
||||
log.Printf("Session Start: %v", sessionID)
|
||||
return pg.InsertWebSessionStart(sessionID, m)
|
||||
case *SessionEnd:
|
||||
return pg.InsertWebSessionEnd(sessionID, m)
|
||||
|
|
|
|||
|
|
@ -82,6 +82,9 @@ func (b *builder) iterateReadyMessage(iter func(msg Message)) {
|
|||
}
|
||||
|
||||
func (b *builder) buildSessionEnd() {
|
||||
if b.timestamp == 0 {
|
||||
return
|
||||
}
|
||||
sessionEnd := &SessionEnd{
|
||||
Timestamp: b.timestamp, // + delay?
|
||||
}
|
||||
|
|
@ -106,16 +109,25 @@ func (b *builder) buildInputEvent() {
|
|||
|
||||
func (b *builder) handleMessage(message Message, messageID uint64) {
|
||||
timestamp := uint64(message.Meta().Timestamp)
|
||||
if b.timestamp <= timestamp {
|
||||
if b.timestamp <= timestamp { // unnecessary. TODO: test and remove
|
||||
b.timestamp = timestamp
|
||||
}
|
||||
// Start from the first timestamp.
|
||||
// Before the first timestamp.
|
||||
switch msg := message.(type) {
|
||||
case *SessionStart,
|
||||
*Metadata,
|
||||
*UserID,
|
||||
*UserAnonymousID:
|
||||
b.appendReadyMessage(msg)
|
||||
case *RawErrorEvent:
|
||||
b.appendReadyMessage(&ErrorEvent{
|
||||
MessageID: messageID,
|
||||
Timestamp: msg.Timestamp,
|
||||
Source: msg.Source,
|
||||
Name: msg.Name,
|
||||
Message: msg.Message,
|
||||
Payload: msg.Payload,
|
||||
})
|
||||
}
|
||||
if b.timestamp == 0 {
|
||||
return
|
||||
|
|
@ -177,15 +189,6 @@ func (b *builder) handleMessage(message Message, messageID uint64) {
|
|||
Timestamp: b.timestamp,
|
||||
})
|
||||
}
|
||||
case *RawErrorEvent:
|
||||
b.appendReadyMessage(&ErrorEvent{
|
||||
MessageID: messageID,
|
||||
Timestamp: msg.Timestamp,
|
||||
Source: msg.Source,
|
||||
Name: msg.Name,
|
||||
Message: msg.Message,
|
||||
Payload: msg.Payload,
|
||||
})
|
||||
case *JSException:
|
||||
b.appendReadyMessage(&ErrorEvent{
|
||||
MessageID: messageID,
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ PageLoop:
|
|||
c.errChan <- err
|
||||
continue
|
||||
}
|
||||
if sessionID == 0 { // We can't felter them on request
|
||||
if token == "" && sessionID == 0 { // We can't felter them on request
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
TOPIC_TRIGGER := env.String("TOPIC_TRIGGER")
|
||||
TOPIC_RAW := env.String("TOPIC_RAW")
|
||||
POSTGRES_STRING := env.String("POSTGRES_STRING")
|
||||
|
||||
pg := postgres.NewConn(POSTGRES_STRING)
|
||||
|
|
@ -43,6 +43,7 @@ func main() {
|
|||
})
|
||||
|
||||
producer:= queue.NewProducer()
|
||||
defer producer.Close(15000)
|
||||
|
||||
listener, err := postgres.NewIntegrationsListener(POSTGRES_STRING)
|
||||
if err != nil {
|
||||
|
|
@ -72,13 +73,14 @@ func main() {
|
|||
sessionID := event.SessionID
|
||||
if sessionID == 0 {
|
||||
sessData, err := tokenizer.Parse(event.Token)
|
||||
if err != nil {
|
||||
if err != nil && err != token.EXPIRED {
|
||||
log.Printf("Error on token parsing: %v; Token: %v", err, event.Token)
|
||||
continue
|
||||
}
|
||||
sessionID = sessData.ID
|
||||
}
|
||||
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(event.RawErrorEvent))
|
||||
// TODO: send to ready-events topic. Otherwise it have to go through the events worker.
|
||||
producer.Produce(TOPIC_RAW, sessionID, messages.Encode(event.RawErrorEvent))
|
||||
case err := <-manager.Errors:
|
||||
log.Printf("Integration error: %v\n", err)
|
||||
case i := <-manager.RequestDataUpdates:
|
||||
|
|
@ -86,10 +88,10 @@ func main() {
|
|||
if err := pg.UpdateIntegrationRequestData(&i); err != nil {
|
||||
log.Printf("Postgres Update request_data error: %v\n", err)
|
||||
}
|
||||
//case err := <-listener.Errors:
|
||||
//log.Printf("Postgres listen error: %v\n", err)
|
||||
case err := <-listener.Errors:
|
||||
log.Printf("Postgres listen error: %v\n", err)
|
||||
case iPointer := <-listener.Integrations:
|
||||
// log.Printf("Integration update: %v\n", *iPointer)
|
||||
log.Printf("Integration update: %v\n", *iPointer)
|
||||
err := manager.Update(iPointer)
|
||||
if err != nil {
|
||||
log.Printf("Integration parse error: %v | Integration: %v\n", err, *iPointer)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue