diff --git a/backend/pkg/token/tokenizer.go b/backend/pkg/token/tokenizer.go index 3f1069a63..f61e1f145 100644 --- a/backend/pkg/token/tokenizer.go +++ b/backend/pkg/token/tokenizer.go @@ -22,8 +22,8 @@ func NewTokenizer(secret string) *Tokenizer { } type TokenData struct { - ID uint64 - ExpTime int64 + ID uint64 + ExpTime int64 } func (tokenizer *Tokenizer) sign(body string) []byte { @@ -33,7 +33,7 @@ func (tokenizer *Tokenizer) sign(body string) []byte { } func (tokenizer *Tokenizer) Compose(d TokenData) string { - body := strconv.FormatUint(d.ID, 36) + + body := strconv.FormatUint(d.ID, 36) + "." + strconv.FormatInt(d.ExpTime, 36) sign := base58.Encode(tokenizer.sign(body)) return body + "." + sign @@ -58,8 +58,8 @@ func (tokenizer *Tokenizer) Parse(token string) (*TokenData, error) { if err != nil { return nil, err } - if expTime <= time.Now().UnixNano()/1e6 { - return &TokenData{id,expTime}, EXPIRED + if expTime <= time.Now().UnixMilli() { + return &TokenData{id, expTime}, EXPIRED } - return &TokenData{id,expTime}, nil + return &TokenData{id, expTime}, nil } diff --git a/backend/pkg/utime/utime.go b/backend/pkg/utime/utime.go deleted file mode 100644 index e3b5a2751..000000000 --- a/backend/pkg/utime/utime.go +++ /dev/null @@ -1,11 +0,0 @@ -package utime - -import "time" - -func CurrentTimestamp() int64 { - return time.Now().UnixNano() / 1e6 -} - -func ToMilliseconds(t time.Time) int64 { - return t.UnixNano() / 1e6 -} diff --git a/backend/services/ender/builder/builder.go b/backend/services/ender/builder/builder.go index e36bdcbe3..9c2067985 100644 --- a/backend/services/ender/builder/builder.go +++ b/backend/services/ender/builder/builder.go @@ -110,11 +110,11 @@ func (b *builder) buildInputEvent() { func (b *builder) handleMessage(message Message, messageID uint64) { timestamp := GetTimestamp(message) - if b.timestamp <= timestamp { // unnecessary? TODO: test and remove + if b.timestamp < timestamp { // unnecessary? TODO: test and remove b.timestamp = timestamp } - b.lastProcessedTimestamp = time.Now().UnixNano() / 1e6 + b.lastProcessedTimestamp = time.Now().UnixMilli() // Might happen before the first timestamp. switch msg := message.(type) { diff --git a/backend/services/ender/main.go b/backend/services/ender/main.go index f0f139dce..f2430f3a0 100644 --- a/backend/services/ender/main.go +++ b/backend/services/ender/main.go @@ -56,7 +56,7 @@ func main() { consumer.Close() os.Exit(0) case <-tick: - builderMap.IterateReadyMessages(time.Now().UnixNano()/1e6, func(sessionID uint64, readyMsg messages.Message) { + builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) { producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg)) }) // TODO: why exactly do we need Flush here and not in any other place? diff --git a/backend/services/http/handlers-ios.go b/backend/services/http/handlers-ios.go index affcab59d..f15a6af60 100644 --- a/backend/services/http/handlers-ios.go +++ b/backend/services/http/handlers-ios.go @@ -85,14 +85,14 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) { responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) return } - sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6)) + sessionID, err := flaker.Compose(uint64(startTime.UnixMilli())) if err != nil { responseWithError(w, http.StatusInternalServerError, err) return } // TODO: if EXPIRED => send message for two sessions association expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} + tokenData = &token.TokenData{sessionID, expTime.UnixMilli()} country := geoIP.ExtractISOCodeFromHTTPRequest(r) diff --git a/backend/services/http/handlers-web.go b/backend/services/http/handlers-web.go index 6020c3eb1..29dcf161d 100644 --- a/backend/services/http/handlers-web.go +++ b/backend/services/http/handlers-web.go @@ -76,14 +76,14 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { responseWithError(w, http.StatusForbidden, errors.New("browser not recognized")) return } - sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6)) + sessionID, err := flaker.Compose(uint64(startTime.UnixMilli())) if err != nil { responseWithError(w, http.StatusInternalServerError, err) return } // TODO: if EXPIRED => send message for two sessions association expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond) - tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6} + tokenData = &token.TokenData{sessionID, expTime.UnixMilli()} country := geoIP.ExtractISOCodeFromHTTPRequest(r) producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{ @@ -108,8 +108,8 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) { //delayDuration := time.Now().Sub(startTime) responseWithJSON(w, &response{ - //Timestamp: startTime.UnixNano() / 1e6, - //Delay: delayDuration.Nanoseconds() / 1e6, + //Timestamp: startTime.UnixMilli(), + //Delay: delayDuration.Milliseconds(), Token: tokenizer.Compose(*tokenData), UserUUID: userUUID, SessionID: strconv.FormatUint(tokenData.ID, 10), diff --git a/backend/services/integrations/integration/bugsnag.go b/backend/services/integrations/integration/bugsnag.go index 7c31db3cb..118cdb84d 100644 --- a/backend/services/integrations/integration/bugsnag.go +++ b/backend/services/integrations/integration/bugsnag.go @@ -1,15 +1,14 @@ package integration import ( + "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" - "encoding/json" "net/url" "time" - "io" - "io/ioutil" - "openreplay/backend/pkg/utime" "openreplay/backend/pkg/messages" ) @@ -18,15 +17,14 @@ import ( */ type bugsnag struct { - BugsnagProjectId string // `json:"bugsnag_project_id"` + BugsnagProjectId string // `json:"bugsnag_project_id"` AuthorizationToken string // `json:"auth_token"` } - type bugsnagEvent struct { MetaData struct { SpecialInfo struct { - AsayerSessionId uint64 `json:"asayerSessionId,string"` + AsayerSessionId uint64 `json:"asayerSessionId,string"` OpenReplaySessionToken string `json:"openReplaySessionToken"` } `json:"special_info"` } `json:"metaData"` @@ -38,7 +36,7 @@ type bugsnagEvent struct { func (b *bugsnag) Request(c *client) error { sinceTs := c.getLastMessageTimestamp() + 1000 // From next second - sinceFormatted := time.Unix(0, int64(sinceTs*1e6)).Format(time.RFC3339) + sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339) requestURL := fmt.Sprintf("https://api.bugsnag.com/projects/%v/events", b.BugsnagProjectId) req, err := http.NewRequest("GET", requestURL, nil) if err != nil { @@ -47,10 +45,10 @@ func (b *bugsnag) Request(c *client) error { q := req.URL.Query() // q.Add("per_page", "100") // Up to a maximum of 30. Default: 30 // q.Add("sort", "timestamp") // Default: timestamp (timestamp == ReceivedAt ??) - q.Add("direction", "asc") // Default: desc + q.Add("direction", "asc") // Default: desc q.Add("full_reports", "true") // Default: false - q.Add("filters[event.since][][type]", "eq") - q.Add("filters[event.since][][value]", sinceFormatted) // seems like inclusively + q.Add("filters[event.since][][type]", "eq") + q.Add("filters[event.since][][value]", sinceFormatted) // seems like inclusively req.URL.RawQuery = q.Encode() authToken := "token " + b.AuthorizationToken @@ -85,7 +83,7 @@ func (b *bugsnag) Request(c *client) error { } sessionID := e.MetaData.SpecialInfo.AsayerSessionId token := e.MetaData.SpecialInfo.OpenReplaySessionToken - if sessionID == 0 && token == "" { + if sessionID == 0 && token == "" { // c.errChan <- "No AsayerSessionId found. | Message: %v", e continue } @@ -94,16 +92,16 @@ func (b *bugsnag) Request(c *client) error { c.errChan <- err continue } - timestamp := uint64(utime.ToMilliseconds(parsedTime)) + timestamp := uint64(parsedTime.UnixMilli()) c.setLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ SessionID: sessionID, - Token: token, + Token: token, RawErrorEvent: &messages.RawErrorEvent{ - Source: "bugsnag", + Source: "bugsnag", Timestamp: timestamp, - Name: e.Exceptions[0].Message, - Payload: string(jsonEvent), + Name: e.Exceptions[0].Message, + Payload: string(jsonEvent), }, } } diff --git a/backend/services/integrations/integration/client.go b/backend/services/integrations/integration/client.go index 2abf9913d..315bfe4e9 100644 --- a/backend/services/integrations/integration/client.go +++ b/backend/services/integrations/integration/client.go @@ -5,10 +5,10 @@ import ( "fmt" "log" "sync" + "time" "openreplay/backend/pkg/db/postgres" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/utime" ) const MAX_ATTEMPTS_IN_A_ROW = 4 @@ -20,10 +20,10 @@ type requester interface { } type requestData struct { - LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"` - LastMessageId string + LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"` + LastMessageId string UnsuccessfullAttemptsCount int - LastAttemptTimestamp int64 + LastAttemptTimestamp int64 } type client struct { @@ -31,19 +31,19 @@ type client struct { requester integration *postgres.Integration // TODO: timeout ? - mux sync.Mutex + mux sync.Mutex updateChan chan<- postgres.Integration - evChan chan<- *SessionErrorEvent - errChan chan<- error + evChan chan<- *SessionErrorEvent + errChan chan<- error } type SessionErrorEvent struct { SessionID uint64 - Token string + Token string *messages.RawErrorEvent } -type ClientMap map[ string ]*client +type ClientMap map[string]*client func NewClient(i *postgres.Integration, updateChan chan<- postgres.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) { c := new(client) @@ -60,15 +60,14 @@ func NewClient(i *postgres.Integration, updateChan chan<- postgres.Integration, // TODO: RequestData manager if c.requestData.LastMessageTimestamp == 0 { // ? - c.requestData.LastMessageTimestamp = uint64(utime.CurrentTimestamp() - 24*60*60*1000) + c.requestData.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli()) } return c, nil } - // from outside -func (c* client) Update(i *postgres.Integration) error { +func (c *client) Update(i *postgres.Integration) error { c.mux.Lock() defer c.mux.Unlock() var r requester @@ -111,8 +110,8 @@ func (c *client) getLastMessageTimestamp() uint64 { } func (c *client) setLastMessageId(timestamp uint64, id string) { //if timestamp >= c.requestData.LastMessageTimestamp { - c.requestData.LastMessageId = id - c.requestData.LastMessageTimestamp = timestamp + c.requestData.LastMessageId = id + c.requestData.LastMessageTimestamp = timestamp //} } func (c *client) getLastMessageId() string { @@ -128,18 +127,18 @@ func (c *client) Request() { c.mux.Lock() defer c.mux.Unlock() if c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS || - (c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW && - utime.CurrentTimestamp() - c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL) { + (c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW && + time.Now().UnixMilli()-c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL) { return } - c.requestData.LastAttemptTimestamp = utime.CurrentTimestamp() + c.requestData.LastAttemptTimestamp = time.Now().UnixMilli() err := c.requester.Request(c) if err != nil { log.Println("ERRROR L139") log.Println(err) c.handleError(err) - c.requestData.UnsuccessfullAttemptsCount++; + c.requestData.UnsuccessfullAttemptsCount++ } else { c.requestData.UnsuccessfullAttemptsCount = 0 } @@ -152,5 +151,3 @@ func (c *client) Request() { c.integration.RequestData = rd c.updateChan <- *c.integration } - - diff --git a/backend/services/integrations/integration/datadog.go b/backend/services/integrations/integration/datadog.go index eb7b5daee..096c3b822 100644 --- a/backend/services/integrations/integration/datadog.go +++ b/backend/services/integrations/integration/datadog.go @@ -1,38 +1,37 @@ package integration import ( - "fmt" - "net/http" - "encoding/json" "bytes" - "time" + "encoding/json" + "fmt" "io" - "io/ioutil" + "io/ioutil" + "net/http" + "time" - "openreplay/backend/pkg/utime" "openreplay/backend/pkg/messages" ) -/* +/* We collect Logs. Datadog also has Events */ type datadog struct { - ApplicationKey string //`json:"application_key"` - ApiKey string //`json:"api_key"` + ApplicationKey string //`json:"application_key"` + ApiKey string //`json:"api_key"` } type datadogResponce struct { - Logs []json.RawMessage + Logs []json.RawMessage NextLogId *string - Status string + Status string } type datadogLog struct { Content struct { - Timestamp string - Message string + Timestamp string + Message string Attributes struct { Error struct { // Not sure about this Message string @@ -48,10 +47,10 @@ func (d *datadog) makeRequest(nextLogId *string, fromTs uint64, toTs uint64) (*h d.ApplicationKey, ) startAt := "null" - if nextLogId != nil && *nextLogId != "" { + if nextLogId != nil && *nextLogId != "" { startAt = *nextLogId } - // Query: status:error/info/warning? + // Query: status:error/info/warning? // openReplaySessionToken instead of asayer_session_id jsonBody := fmt.Sprintf(`{ "limit": 1000, @@ -72,8 +71,8 @@ func (d *datadog) makeRequest(nextLogId *string, fromTs uint64, toTs uint64) (*h } func (d *datadog) Request(c *client) error { - fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond - toTs := uint64(utime.CurrentTimestamp()) + fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond + toTs := uint64(time.Now().UnixMilli()) var nextLogId *string for { req, err := d.makeRequest(nextLogId, fromTs, toTs) @@ -111,16 +110,16 @@ func (d *datadog) Request(c *client) error { c.errChan <- err continue } - timestamp := uint64(utime.ToMilliseconds(parsedTime)) + timestamp := uint64(parsedTime.UnixMilli()) c.setLastMessageTimestamp(timestamp) c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: token, RawErrorEvent: &messages.RawErrorEvent{ - Source: "datadog", + Source: "datadog", Timestamp: timestamp, - Name: ddLog.Content.Attributes.Error.Message, - Payload: string(jsonLog), + Name: ddLog.Content.Attributes.Error.Message, + Payload: string(jsonLog), }, } } @@ -129,4 +128,4 @@ func (d *datadog) Request(c *client) error { return nil } } -} \ No newline at end of file +} diff --git a/backend/services/integrations/integration/elasticsearch.go b/backend/services/integrations/integration/elasticsearch.go index 14480e0b8..dd6f5d5f9 100644 --- a/backend/services/integrations/integration/elasticsearch.go +++ b/backend/services/integrations/integration/elasticsearch.go @@ -12,7 +12,6 @@ import ( "time" "openreplay/backend/pkg/messages" - "openreplay/backend/pkg/utime" ) type elasticsearch struct { @@ -164,7 +163,7 @@ func (es *elasticsearch) Request(c *client) error { c.errChan <- err continue } - timestamp := uint64(utime.ToMilliseconds(esLog.Time)) + timestamp := uint64(esLog.Time.UnixMilli()) c.setLastMessageTimestamp(timestamp) var sessionID uint64 diff --git a/backend/services/integrations/integration/newrelic.go b/backend/services/integrations/integration/newrelic.go index 937ab166d..2dce79aa5 100644 --- a/backend/services/integrations/integration/newrelic.go +++ b/backend/services/integrations/integration/newrelic.go @@ -2,25 +2,24 @@ package integration import ( "encoding/json" - "time" + "errors" "fmt" - "net/http" "io" - "io/ioutil" - "errors" + "io/ioutil" + "net/http" + "time" "openreplay/backend/pkg/messages" ) /* - We use insights-api for query. They also have Logs and Events + We use insights-api for query. They also have Logs and Events */ - // TODO: Eu/us type newrelic struct { - ApplicationId string //`json:"application_id"` - XQueryKey string //`json:"x_query_key"` + ApplicationId string //`json:"application_id"` + XQueryKey string //`json:"x_query_key"` } // TODO: Recheck @@ -34,14 +33,14 @@ type newrelicResponce struct { type newrelicEvent struct { //AsayerSessionID uint64 `json:"asayer_session_id,string"` // string/int decoder? OpenReplaySessionToken string `json:"openReplaySessionToken"` - ErrorClass string `json:"error.class"` - Timestamp uint64 `json:"timestamp"` + ErrorClass string `json:"error.class"` + Timestamp uint64 `json:"timestamp"` } func (nr *newrelic) Request(c *client) error { sinceTs := c.getLastMessageTimestamp() + 1000 // From next second // In docs - format "yyyy-mm-dd HH:MM:ss", but time.RFC3339 works fine too - sinceFormatted := time.Unix(0, int64(sinceTs*1e6)).Format(time.RFC3339) + sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339) // US/EU endpoint ?? requestURL := fmt.Sprintf("https://insights-api.eu.newrelic.com/v1/accounts/%v/query", nr.ApplicationId) req, err := http.NewRequest("GET", requestURL, nil) @@ -64,11 +63,10 @@ func (nr *newrelic) Request(c *client) error { } defer resp.Body.Close() - // 401 (unauthorised) if wrong XQueryKey/deploymentServer is wrong or 403 (Forbidden) if ApplicationId is wrong // 400 if Query has problems if resp.StatusCode >= 400 { - io.Copy(ioutil.Discard, resp.Body) // Read the body to free socket + io.Copy(ioutil.Discard, resp.Body) // Read the body to free socket return fmt.Errorf("Newrelic: server respond with the code %v| Request: ", resp.StatusCode, *req) } // Pagination depending on returning metadata ? @@ -92,13 +90,13 @@ func (nr *newrelic) Request(c *client) error { c.evChan <- &SessionErrorEvent{ Token: e.OpenReplaySessionToken, RawErrorEvent: &messages.RawErrorEvent{ - Source: "newrelic", + Source: "newrelic", Timestamp: e.Timestamp, - Name: e.ErrorClass, - Payload: string(jsonEvent), + Name: e.ErrorClass, + Payload: string(jsonEvent), }, } } } return nil -} \ No newline at end of file +} diff --git a/backend/services/integrations/integration/sentry.go b/backend/services/integrations/integration/sentry.go index 0330430c3..1c5bfdaad 100644 --- a/backend/services/integrations/integration/sentry.go +++ b/backend/services/integrations/integration/sentry.go @@ -1,44 +1,41 @@ package integration import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" "net/http" "net/url" - "encoding/json" - "strings" - "fmt" - "time" "strconv" - "io" - "io/ioutil" + "strings" + "time" - "openreplay/backend/pkg/utime" "openreplay/backend/pkg/messages" ) - -/* +/* They also have different stuff - Documentation says: + Documentation says: "Note: This endpoint is experimental and may be removed without notice." */ type sentry struct { OrganizationSlug string // `json:"organization_slug"` - ProjectSlug string // `json:"project_slug"` - Token string // `json:"token"` + ProjectSlug string // `json:"project_slug"` + Token string // `json:"token"` } type sentryEvent struct { Tags []struct { - Key string - Value string `json:"value"` + Key string + Value string `json:"value"` } - DateCreated string `json:"dateCreated"` // or dateReceived ? - Title string - EventID string `json:"eventID"` + DateCreated string `json:"dateCreated"` // or dateReceived ? + Title string + EventID string `json:"eventID"` } - func (sn *sentry) Request(c *client) error { requestURL := fmt.Sprintf("https://sentry.io/api/0/projects/%v/%v/events/", sn.OrganizationSlug, sn.ProjectSlug) req, err := http.NewRequest("GET", requestURL, nil) @@ -88,9 +85,9 @@ PageLoop: c.errChan <- fmt.Errorf("%v | Event: %v", err, e) continue } - timestamp := uint64(utime.ToMilliseconds(parsedTime)) + timestamp := uint64(parsedTime.UnixMilli()) // TODO: not to receive all the messages (use default integration timestamp) - if firstEvent { // TODO: reverse range? + if firstEvent { // TODO: reverse range? c.setLastMessageId(timestamp, e.EventID) firstEvent = false } @@ -117,12 +114,12 @@ PageLoop: c.evChan <- &SessionErrorEvent{ SessionID: sessionID, - Token: token, + Token: token, RawErrorEvent: &messages.RawErrorEvent{ - Source: "sentry", + Source: "sentry", Timestamp: timestamp, - Name: e.Title, - Payload: string(jsonEvent), + Name: e.Title, + Payload: string(jsonEvent), }, } } @@ -137,7 +134,7 @@ PageLoop: return fmt.Errorf("Link header format error. Got: '%v'", linkHeader) } - nextLinkInfo := pagInfo[ 1 ] + nextLinkInfo := pagInfo[1] if strings.Contains(nextLinkInfo, `results="false"`) { break } @@ -151,4 +148,4 @@ PageLoop: } } return nil -} \ No newline at end of file +} diff --git a/backend/services/integrations/integration/stackdriver.go b/backend/services/integrations/integration/stackdriver.go index bb8e3cef9..e852d5d36 100644 --- a/backend/services/integrations/integration/stackdriver.go +++ b/backend/services/integrations/integration/stackdriver.go @@ -1,22 +1,19 @@ package integration - import ( - "google.golang.org/api/option" "cloud.google.com/go/logging/logadmin" "google.golang.org/api/iterator" - - //"strconv" - "encoding/json" - "time" - "fmt" - "context" + "google.golang.org/api/option" + + //"strconv" + "context" + "encoding/json" + "fmt" + "time" - "openreplay/backend/pkg/utime" "openreplay/backend/pkg/messages" ) - // Old: asayerSessionId const SD_FILTER_QUERY = ` @@ -28,7 +25,7 @@ const SD_FILTER_QUERY = ` type stackdriver struct { ServiceAccountCredentials string // `json:"service_account_credentials"` - LogName string // `json:"log_name"` + LogName string // `json:"log_name"` } type saCreds struct { @@ -37,10 +34,10 @@ type saCreds struct { func (sd *stackdriver) Request(c *client) error { fromTs := c.getLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond - fromFormatted := time.Unix(0, int64(fromTs *1e6)).Format(time.RFC3339Nano) + fromFormatted := time.UnixMilli(int64(fromTs)).Format(time.RFC3339Nano) ctx := context.Background() - var parsedCreds saCreds + var parsedCreds saCreds err := json.Unmarshal([]byte(sd.ServiceAccountCredentials), &parsedCreds) if err != nil { return err @@ -49,56 +46,56 @@ func (sd *stackdriver) Request(c *client) error { opt := option.WithCredentialsJSON([]byte(sd.ServiceAccountCredentials)) client, err := logadmin.NewClient(ctx, parsedCreds.ProjectId, opt) if err != nil { - return err + return err } defer client.Close() - - filter := fmt.Sprintf(SD_FILTER_QUERY, parsedCreds.ProjectId, sd.LogName, fromFormatted) - // By default, Entries are listed from oldest to newest. - /* ResourceNames(rns []string) - "projects/[PROJECT_ID]" - "organizations/[ORGANIZATION_ID]" - "billingAccounts/[BILLING_ACCOUNT_ID]" - "folders/[FOLDER_ID]" - */ - it := client.Entries(ctx, logadmin.Filter(filter)) - // TODO: Pagination: - //pager := iterator.NewPager(it, 1000, "") - //nextToken, err := pager.NextPage(&entries) - //if nextToken == "" { break } - for { - e, err := it.Next() - if err == iterator.Done { - break - } - if err != nil { - return err - } + filter := fmt.Sprintf(SD_FILTER_QUERY, parsedCreds.ProjectId, sd.LogName, fromFormatted) + // By default, Entries are listed from oldest to newest. + /* ResourceNames(rns []string) + "projects/[PROJECT_ID]" + "organizations/[ORGANIZATION_ID]" + "billingAccounts/[BILLING_ACCOUNT_ID]" + "folders/[FOLDER_ID]" + */ + it := client.Entries(ctx, logadmin.Filter(filter)) - token := e.Labels["openReplaySessionToken"] - // sessionID, err := strconv.ParseUint(strSessionID, 10, 64) - // if err != nil { - // c.errChan <- err - // continue - // } - jsonEvent, err := json.Marshal(e) - if err != nil { - c.errChan <- err - continue - } - timestamp := uint64(utime.ToMilliseconds(e.Timestamp)) - c.setLastMessageTimestamp(timestamp) - c.evChan <- &SessionErrorEvent{ + // TODO: Pagination: + //pager := iterator.NewPager(it, 1000, "") + //nextToken, err := pager.NextPage(&entries) + //if nextToken == "" { break } + for { + e, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + + token := e.Labels["openReplaySessionToken"] + // sessionID, err := strconv.ParseUint(strSessionID, 10, 64) + // if err != nil { + // c.errChan <- err + // continue + // } + jsonEvent, err := json.Marshal(e) + if err != nil { + c.errChan <- err + continue + } + timestamp := uint64(e.Timestamp.UnixMilli()) + c.setLastMessageTimestamp(timestamp) + c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: token, RawErrorEvent: &messages.RawErrorEvent{ - Source: "stackdriver", + Source: "stackdriver", Timestamp: timestamp, - Name: e.InsertID, // not sure about that - Payload: string(jsonEvent), + Name: e.InsertID, // not sure about that + Payload: string(jsonEvent), }, } } return nil -} \ No newline at end of file +} diff --git a/backend/services/integrations/integration/sumologic.go b/backend/services/integrations/integration/sumologic.go index 2660dd6ac..8ff39ec9e 100644 --- a/backend/services/integrations/integration/sumologic.go +++ b/backend/services/integrations/integration/sumologic.go @@ -1,20 +1,19 @@ package integration import ( - "net/http" - "time" "encoding/json" "fmt" - "strings" "io" - "io/ioutil" + "io/ioutil" + "net/http" + "strings" + "time" - "openreplay/backend/pkg/utime" "openreplay/backend/pkg/messages" ) -/* - The maximum value for limit is 10,000 messages or 100 MB in total message size, +/* + The maximum value for limit is 10,000 messages or 100 MB in total message size, which means the query may return less than 10,000 messages if you exceed the size limit. API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API @@ -22,31 +21,30 @@ import ( const SL_LIMIT = 10000 type sumologic struct { - AccessId string // `json:"access_id"` - AccessKey string // `json:"access_key"` - cookies []*http.Cookie + AccessId string // `json:"access_id"` + AccessKey string // `json:"access_key"` + cookies []*http.Cookie } - type sumplogicJobResponce struct { Id string } type sumologicJobStatusResponce struct { - State string + State string MessageCount int //PendingErrors []string } type sumologicResponce struct { - Messages [] struct { + Messages []struct { Map json.RawMessage } } type sumologicEvent struct { Timestamp uint64 `json:"_messagetime,string"` - Raw string `json:"_raw"` + Raw string `json:"_raw"` } func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) { @@ -68,10 +66,9 @@ func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) { resp.Body.Close() } - func (sl *sumologic) Request(c *client) error { fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond - toTs := utime.CurrentTimestamp() + toTs := time.Now().UnixMilli() requestURL := fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs", "eu.") // deployment server?? jsonBody := fmt.Sprintf(`{ "query": "\"openReplaySessionToken=\" AND (*error* OR *fail* OR *exception*)", @@ -132,7 +129,7 @@ func (sl *sumologic) Request(c *client) error { tick := time.Tick(5 * time.Second) for { - <- tick + <-tick resp, err = http.DefaultClient.Do(req) if err != nil { return err // TODO: retry, counter/timeout @@ -147,12 +144,12 @@ func (sl *sumologic) Request(c *client) error { } if jobStatus.State == "DONE GATHERING RESULTS" { offset := 0 - for ;offset < jobStatus.MessageCount; { + for offset < jobStatus.MessageCount { requestURL = fmt.Sprintf( - "https://api.%vsumologic.com/api/v1/search/jobs/%v/messages?offset=%v&limit=%v", - "eu.", - jobResponce.Id, - offset, + "https://api.%vsumologic.com/api/v1/search/jobs/%v/messages?offset=%v&limit=%v", + "eu.", + jobResponce.Id, + offset, SL_LIMIT, ) req, err = http.NewRequest("GET", requestURL, nil) @@ -190,17 +187,17 @@ func (sl *sumologic) Request(c *client) error { } name := e.Raw if len(name) > 20 { - name = name[:20] // not sure about that + name = name[:20] // not sure about that } c.setLastMessageTimestamp(e.Timestamp) c.evChan <- &SessionErrorEvent{ //SessionID: sessionID, Token: token, RawErrorEvent: &messages.RawErrorEvent{ - Source: "sumologic", + Source: "sumologic", Timestamp: e.Timestamp, - Name: name, - Payload: string(m.Map), //e.Raw ? + Name: name, + Payload: string(m.Map), //e.Raw ? }, } @@ -209,11 +206,11 @@ func (sl *sumologic) Request(c *client) error { } break } - if jobStatus.State != "NOT STARTED" && + if jobStatus.State != "NOT STARTED" && jobStatus.State != "GATHERING RESULTS" { // error break } } return nil -} \ No newline at end of file +} diff --git a/ee/backend/pkg/kafka/consumer.go b/ee/backend/pkg/kafka/consumer.go index 82aa56d50..cb3714316 100644 --- a/ee/backend/pkg/kafka/consumer.go +++ b/ee/backend/pkg/kafka/consumer.go @@ -147,7 +147,7 @@ func (consumer *Consumer) ConsumeNext() error { if e.TopicPartition.Error != nil { return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error") } - ts := e.Timestamp.UnixNano() / 1e6 + ts := e.Timestamp.UnixMilli() consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{ Topic: *(e.TopicPartition.Topic), ID: uint64(e.TopicPartition.Offset),