code(backend): use 1.18 time features

This commit is contained in:
ShiKhu 2022-04-01 19:43:46 +02:00
parent 6196e79d00
commit c8872064ec
15 changed files with 184 additions and 213 deletions

View file

@@ -22,8 +22,8 @@ func NewTokenizer(secret string) *Tokenizer {
}
type TokenData struct {
ID uint64
ExpTime int64
ID uint64
ExpTime int64
}
func (tokenizer *Tokenizer) sign(body string) []byte {
@@ -33,7 +33,7 @@ func (tokenizer *Tokenizer) sign(body string) []byte {
}
func (tokenizer *Tokenizer) Compose(d TokenData) string {
body := strconv.FormatUint(d.ID, 36) +
body := strconv.FormatUint(d.ID, 36) +
"." + strconv.FormatInt(d.ExpTime, 36)
sign := base58.Encode(tokenizer.sign(body))
return body + "." + sign
@@ -58,8 +58,8 @@ func (tokenizer *Tokenizer) Parse(token string) (*TokenData, error) {
if err != nil {
return nil, err
}
if expTime <= time.Now().UnixNano()/1e6 {
return &TokenData{id,expTime}, EXPIRED
if expTime <= time.Now().UnixMilli() {
return &TokenData{id, expTime}, EXPIRED
}
return &TokenData{id,expTime}, nil
return &TokenData{id, expTime}, nil
}

View file

@@ -1,11 +0,0 @@
package utime
import "time"
func CurrentTimestamp() int64 {
return time.Now().UnixNano() / 1e6
}
func ToMilliseconds(t time.Time) int64 {
return t.UnixNano() / 1e6
}

View file

@@ -110,11 +110,11 @@ func (b *builder) buildInputEvent() {
func (b *builder) handleMessage(message Message, messageID uint64) {
timestamp := GetTimestamp(message)
if b.timestamp <= timestamp { // unnecessary? TODO: test and remove
if b.timestamp < timestamp { // unnecessary? TODO: test and remove
b.timestamp = timestamp
}
b.lastProcessedTimestamp = time.Now().UnixNano() / 1e6
b.lastProcessedTimestamp = time.Now().UnixMilli()
// Might happen before the first timestamp.
switch msg := message.(type) {

View file

@@ -56,7 +56,7 @@ func main() {
consumer.Close()
os.Exit(0)
case <-tick:
builderMap.IterateReadyMessages(time.Now().UnixNano()/1e6, func(sessionID uint64, readyMsg messages.Message) {
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg))
})
// TODO: why exactly do we need Flush here and not in any other place?

View file

@@ -85,14 +85,14 @@ func startSessionHandlerIOS(w http.ResponseWriter, r *http.Request) {
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
sessionID, err := flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
responseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)

View file

@@ -76,14 +76,14 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
responseWithError(w, http.StatusForbidden, errors.New("browser not recognized"))
return
}
sessionID, err := flaker.Compose(uint64(startTime.UnixNano() / 1e6))
sessionID, err := flaker.Compose(uint64(startTime.UnixMilli()))
if err != nil {
responseWithError(w, http.StatusInternalServerError, err)
return
}
// TODO: if EXPIRED => send message for two sessions association
expTime := startTime.Add(time.Duration(p.MaxSessionDuration) * time.Millisecond)
tokenData = &token.TokenData{sessionID, expTime.UnixNano() / 1e6}
tokenData = &token.TokenData{sessionID, expTime.UnixMilli()}
country := geoIP.ExtractISOCodeFromHTTPRequest(r)
producer.Produce(TOPIC_RAW_WEB, tokenData.ID, Encode(&SessionStart{
@@ -108,8 +108,8 @@ func startSessionHandlerWeb(w http.ResponseWriter, r *http.Request) {
//delayDuration := time.Now().Sub(startTime)
responseWithJSON(w, &response{
//Timestamp: startTime.UnixNano() / 1e6,
//Delay: delayDuration.Nanoseconds() / 1e6,
//Timestamp: startTime.UnixMilli(),
//Delay: delayDuration.Milliseconds(),
Token: tokenizer.Compose(*tokenData),
UserUUID: userUUID,
SessionID: strconv.FormatUint(tokenData.ID, 10),

View file

@@ -1,15 +1,14 @@
package integration
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"encoding/json"
"net/url"
"time"
"io"
"io/ioutil"
"openreplay/backend/pkg/utime"
"openreplay/backend/pkg/messages"
)
@@ -18,15 +17,14 @@ import (
*/
type bugsnag struct {
BugsnagProjectId string // `json:"bugsnag_project_id"`
BugsnagProjectId string // `json:"bugsnag_project_id"`
AuthorizationToken string // `json:"auth_token"`
}
type bugsnagEvent struct {
MetaData struct {
SpecialInfo struct {
AsayerSessionId uint64 `json:"asayerSessionId,string"`
AsayerSessionId uint64 `json:"asayerSessionId,string"`
OpenReplaySessionToken string `json:"openReplaySessionToken"`
} `json:"special_info"`
} `json:"metaData"`
@@ -38,7 +36,7 @@ type bugsnagEvent struct {
func (b *bugsnag) Request(c *client) error {
sinceTs := c.getLastMessageTimestamp() + 1000 // From next second
sinceFormatted := time.Unix(0, int64(sinceTs*1e6)).Format(time.RFC3339)
sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339)
requestURL := fmt.Sprintf("https://api.bugsnag.com/projects/%v/events", b.BugsnagProjectId)
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
@@ -47,10 +45,10 @@ func (b *bugsnag) Request(c *client) error {
q := req.URL.Query()
// q.Add("per_page", "100") // Up to a maximum of 30. Default: 30
// q.Add("sort", "timestamp") // Default: timestamp (timestamp == ReceivedAt ??)
q.Add("direction", "asc") // Default: desc
q.Add("direction", "asc") // Default: desc
q.Add("full_reports", "true") // Default: false
q.Add("filters[event.since][][type]", "eq")
q.Add("filters[event.since][][value]", sinceFormatted) // seems like inclusively
q.Add("filters[event.since][][type]", "eq")
q.Add("filters[event.since][][value]", sinceFormatted) // seems like inclusively
req.URL.RawQuery = q.Encode()
authToken := "token " + b.AuthorizationToken
@@ -85,7 +83,7 @@ func (b *bugsnag) Request(c *client) error {
}
sessionID := e.MetaData.SpecialInfo.AsayerSessionId
token := e.MetaData.SpecialInfo.OpenReplaySessionToken
if sessionID == 0 && token == "" {
if sessionID == 0 && token == "" {
// c.errChan <- "No AsayerSessionId found. | Message: %v", e
continue
}
@@ -94,16 +92,16 @@ func (b *bugsnag) Request(c *client) error {
c.errChan <- err
continue
}
timestamp := uint64(utime.ToMilliseconds(parsedTime))
timestamp := uint64(parsedTime.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
SessionID: sessionID,
Token: token,
Token: token,
RawErrorEvent: &messages.RawErrorEvent{
Source: "bugsnag",
Source: "bugsnag",
Timestamp: timestamp,
Name: e.Exceptions[0].Message,
Payload: string(jsonEvent),
Name: e.Exceptions[0].Message,
Payload: string(jsonEvent),
},
}
}

View file

@@ -5,10 +5,10 @@ import (
"fmt"
"log"
"sync"
"time"
"openreplay/backend/pkg/db/postgres"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/utime"
)
const MAX_ATTEMPTS_IN_A_ROW = 4
@@ -20,10 +20,10 @@ type requester interface {
}
type requestData struct {
LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"`
LastMessageId string
LastMessageTimestamp uint64 // `json:"lastMessageTimestamp, string"`
LastMessageId string
UnsuccessfullAttemptsCount int
LastAttemptTimestamp int64
LastAttemptTimestamp int64
}
type client struct {
@@ -31,19 +31,19 @@ type client struct {
requester
integration *postgres.Integration
// TODO: timeout ?
mux sync.Mutex
mux sync.Mutex
updateChan chan<- postgres.Integration
evChan chan<- *SessionErrorEvent
errChan chan<- error
evChan chan<- *SessionErrorEvent
errChan chan<- error
}
type SessionErrorEvent struct {
SessionID uint64
Token string
Token string
*messages.RawErrorEvent
}
type ClientMap map[ string ]*client
type ClientMap map[string]*client
func NewClient(i *postgres.Integration, updateChan chan<- postgres.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
c := new(client)
@@ -60,15 +60,14 @@ func NewClient(i *postgres.Integration, updateChan chan<- postgres.Integration,
// TODO: RequestData manager
if c.requestData.LastMessageTimestamp == 0 {
// ?
c.requestData.LastMessageTimestamp = uint64(utime.CurrentTimestamp() - 24*60*60*1000)
c.requestData.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli())
}
return c, nil
}
// from outside
func (c* client) Update(i *postgres.Integration) error {
func (c *client) Update(i *postgres.Integration) error {
c.mux.Lock()
defer c.mux.Unlock()
var r requester
@@ -111,8 +110,8 @@ func (c *client) getLastMessageTimestamp() uint64 {
}
func (c *client) setLastMessageId(timestamp uint64, id string) {
//if timestamp >= c.requestData.LastMessageTimestamp {
c.requestData.LastMessageId = id
c.requestData.LastMessageTimestamp = timestamp
c.requestData.LastMessageId = id
c.requestData.LastMessageTimestamp = timestamp
//}
}
func (c *client) getLastMessageId() string {
@@ -128,18 +127,18 @@ func (c *client) Request() {
c.mux.Lock()
defer c.mux.Unlock()
if c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS ||
(c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW &&
utime.CurrentTimestamp() - c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL) {
(c.requestData.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW &&
time.Now().UnixMilli()-c.requestData.LastAttemptTimestamp < ATTEMPTS_INTERVAL) {
return
}
c.requestData.LastAttemptTimestamp = utime.CurrentTimestamp()
c.requestData.LastAttemptTimestamp = time.Now().UnixMilli()
err := c.requester.Request(c)
if err != nil {
log.Println("ERRROR L139")
log.Println(err)
c.handleError(err)
c.requestData.UnsuccessfullAttemptsCount++;
c.requestData.UnsuccessfullAttemptsCount++
} else {
c.requestData.UnsuccessfullAttemptsCount = 0
}
@@ -152,5 +151,3 @@ func (c *client) Request() {
c.integration.RequestData = rd
c.updateChan <- *c.integration
}

View file

@@ -1,38 +1,37 @@
package integration
import (
"fmt"
"net/http"
"encoding/json"
"bytes"
"time"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"io/ioutil"
"net/http"
"time"
"openreplay/backend/pkg/utime"
"openreplay/backend/pkg/messages"
)
/*
/*
We collect Logs. Datadog also has Events
*/
type datadog struct {
ApplicationKey string //`json:"application_key"`
ApiKey string //`json:"api_key"`
ApplicationKey string //`json:"application_key"`
ApiKey string //`json:"api_key"`
}
type datadogResponce struct {
Logs []json.RawMessage
Logs []json.RawMessage
NextLogId *string
Status string
Status string
}
type datadogLog struct {
Content struct {
Timestamp string
Message string
Timestamp string
Message string
Attributes struct {
Error struct { // Not sure about this
Message string
@@ -48,10 +47,10 @@ func (d *datadog) makeRequest(nextLogId *string, fromTs uint64, toTs uint64) (*h
d.ApplicationKey,
)
startAt := "null"
if nextLogId != nil && *nextLogId != "" {
if nextLogId != nil && *nextLogId != "" {
startAt = *nextLogId
}
// Query: status:error/info/warning?
// Query: status:error/info/warning?
// openReplaySessionToken instead of asayer_session_id
jsonBody := fmt.Sprintf(`{
"limit": 1000,
@@ -72,8 +71,8 @@ }
}
func (d *datadog) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond
toTs := uint64(utime.CurrentTimestamp())
fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond
toTs := uint64(time.Now().UnixMilli())
var nextLogId *string
for {
req, err := d.makeRequest(nextLogId, fromTs, toTs)
@@ -111,16 +110,16 @@ func (d *datadog) Request(c *client) error {
c.errChan <- err
continue
}
timestamp := uint64(utime.ToMilliseconds(parsedTime))
timestamp := uint64(parsedTime.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
RawErrorEvent: &messages.RawErrorEvent{
Source: "datadog",
Source: "datadog",
Timestamp: timestamp,
Name: ddLog.Content.Attributes.Error.Message,
Payload: string(jsonLog),
Name: ddLog.Content.Attributes.Error.Message,
Payload: string(jsonLog),
},
}
}
@@ -129,4 +128,4 @@ func (d *datadog) Request(c *client) error {
return nil
}
}
}
}

View file

@@ -12,7 +12,6 @@ import (
"time"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/utime"
)
type elasticsearch struct {
@@ -164,7 +163,7 @@ func (es *elasticsearch) Request(c *client) error {
c.errChan <- err
continue
}
timestamp := uint64(utime.ToMilliseconds(esLog.Time))
timestamp := uint64(esLog.Time.UnixMilli())
c.setLastMessageTimestamp(timestamp)
var sessionID uint64

View file

@@ -2,25 +2,24 @@ package integration
import (
"encoding/json"
"time"
"errors"
"fmt"
"net/http"
"io"
"io/ioutil"
"errors"
"io/ioutil"
"net/http"
"time"
"openreplay/backend/pkg/messages"
)
/*
We use insights-api for query. They also have Logs and Events
We use insights-api for query. They also have Logs and Events
*/
// TODO: Eu/us
type newrelic struct {
ApplicationId string //`json:"application_id"`
XQueryKey string //`json:"x_query_key"`
ApplicationId string //`json:"application_id"`
XQueryKey string //`json:"x_query_key"`
}
// TODO: Recheck
@@ -34,14 +33,14 @@ type newrelicResponce struct {
type newrelicEvent struct {
//AsayerSessionID uint64 `json:"asayer_session_id,string"` // string/int decoder?
OpenReplaySessionToken string `json:"openReplaySessionToken"`
ErrorClass string `json:"error.class"`
Timestamp uint64 `json:"timestamp"`
ErrorClass string `json:"error.class"`
Timestamp uint64 `json:"timestamp"`
}
func (nr *newrelic) Request(c *client) error {
sinceTs := c.getLastMessageTimestamp() + 1000 // From next second
// In docs - format "yyyy-mm-dd HH:MM:ss", but time.RFC3339 works fine too
sinceFormatted := time.Unix(0, int64(sinceTs*1e6)).Format(time.RFC3339)
sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339)
// US/EU endpoint ??
requestURL := fmt.Sprintf("https://insights-api.eu.newrelic.com/v1/accounts/%v/query", nr.ApplicationId)
req, err := http.NewRequest("GET", requestURL, nil)
@@ -64,11 +63,10 @@ func (nr *newrelic) Request(c *client) error {
}
defer resp.Body.Close()
// 401 (unauthorised) if wrong XQueryKey/deploymentServer is wrong or 403 (Forbidden) if ApplicationId is wrong
// 400 if Query has problems
if resp.StatusCode >= 400 {
io.Copy(ioutil.Discard, resp.Body) // Read the body to free socket
io.Copy(ioutil.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Newrelic: server respond with the code %v| Request: ", resp.StatusCode, *req)
}
// Pagination depending on returning metadata ?
@@ -92,13 +90,13 @@ func (nr *newrelic) Request(c *client) error {
c.evChan <- &SessionErrorEvent{
Token: e.OpenReplaySessionToken,
RawErrorEvent: &messages.RawErrorEvent{
Source: "newrelic",
Source: "newrelic",
Timestamp: e.Timestamp,
Name: e.ErrorClass,
Payload: string(jsonEvent),
Name: e.ErrorClass,
Payload: string(jsonEvent),
},
}
}
}
return nil
}
}

View file

@@ -1,44 +1,41 @@
package integration
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"encoding/json"
"strings"
"fmt"
"time"
"strconv"
"io"
"io/ioutil"
"strings"
"time"
"openreplay/backend/pkg/utime"
"openreplay/backend/pkg/messages"
)
/*
/*
They also have different stuff
Documentation says:
Documentation says:
"Note: This endpoint is experimental and may be removed without notice."
*/
type sentry struct {
OrganizationSlug string // `json:"organization_slug"`
ProjectSlug string // `json:"project_slug"`
Token string // `json:"token"`
ProjectSlug string // `json:"project_slug"`
Token string // `json:"token"`
}
type sentryEvent struct {
Tags []struct {
Key string
Value string `json:"value"`
Key string
Value string `json:"value"`
}
DateCreated string `json:"dateCreated"` // or dateReceived ?
Title string
EventID string `json:"eventID"`
DateCreated string `json:"dateCreated"` // or dateReceived ?
Title string
EventID string `json:"eventID"`
}
func (sn *sentry) Request(c *client) error {
requestURL := fmt.Sprintf("https://sentry.io/api/0/projects/%v/%v/events/", sn.OrganizationSlug, sn.ProjectSlug)
req, err := http.NewRequest("GET", requestURL, nil)
@@ -88,9 +85,9 @@ PageLoop:
c.errChan <- fmt.Errorf("%v | Event: %v", err, e)
continue
}
timestamp := uint64(utime.ToMilliseconds(parsedTime))
timestamp := uint64(parsedTime.UnixMilli())
// TODO: not to receive all the messages (use default integration timestamp)
if firstEvent { // TODO: reverse range?
if firstEvent { // TODO: reverse range?
c.setLastMessageId(timestamp, e.EventID)
firstEvent = false
}
@@ -117,12 +114,12 @@ PageLoop:
c.evChan <- &SessionErrorEvent{
SessionID: sessionID,
Token: token,
Token: token,
RawErrorEvent: &messages.RawErrorEvent{
Source: "sentry",
Source: "sentry",
Timestamp: timestamp,
Name: e.Title,
Payload: string(jsonEvent),
Name: e.Title,
Payload: string(jsonEvent),
},
}
}
@@ -137,7 +134,7 @@ PageLoop:
return fmt.Errorf("Link header format error. Got: '%v'", linkHeader)
}
nextLinkInfo := pagInfo[ 1 ]
nextLinkInfo := pagInfo[1]
if strings.Contains(nextLinkInfo, `results="false"`) {
break
}
@@ -151,4 +148,4 @@ PageLoop:
}
}
return nil
}
}

View file

@@ -1,22 +1,19 @@
package integration
import (
"google.golang.org/api/option"
"cloud.google.com/go/logging/logadmin"
"google.golang.org/api/iterator"
//"strconv"
"encoding/json"
"time"
"fmt"
"context"
"google.golang.org/api/option"
//"strconv"
"context"
"encoding/json"
"fmt"
"time"
"openreplay/backend/pkg/utime"
"openreplay/backend/pkg/messages"
)
// Old: asayerSessionId
const SD_FILTER_QUERY = `
@@ -28,7 +25,7 @@ const SD_FILTER_QUERY = `
type stackdriver struct {
ServiceAccountCredentials string // `json:"service_account_credentials"`
LogName string // `json:"log_name"`
LogName string // `json:"log_name"`
}
type saCreds struct {
@@ -37,10 +34,10 @@ type saCreds struct {
func (sd *stackdriver) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond
fromFormatted := time.Unix(0, int64(fromTs *1e6)).Format(time.RFC3339Nano)
fromFormatted := time.UnixMilli(int64(fromTs)).Format(time.RFC3339Nano)
ctx := context.Background()
var parsedCreds saCreds
var parsedCreds saCreds
err := json.Unmarshal([]byte(sd.ServiceAccountCredentials), &parsedCreds)
if err != nil {
return err
@@ -49,56 +46,56 @@ func (sd *stackdriver) Request(c *client) error {
opt := option.WithCredentialsJSON([]byte(sd.ServiceAccountCredentials))
client, err := logadmin.NewClient(ctx, parsedCreds.ProjectId, opt)
if err != nil {
return err
return err
}
defer client.Close()
filter := fmt.Sprintf(SD_FILTER_QUERY, parsedCreds.ProjectId, sd.LogName, fromFormatted)
// By default, Entries are listed from oldest to newest.
/* ResourceNames(rns []string)
"projects/[PROJECT_ID]"
"organizations/[ORGANIZATION_ID]"
"billingAccounts/[BILLING_ACCOUNT_ID]"
"folders/[FOLDER_ID]"
*/
it := client.Entries(ctx, logadmin.Filter(filter))
// TODO: Pagination:
//pager := iterator.NewPager(it, 1000, "")
//nextToken, err := pager.NextPage(&entries)
//if nextToken == "" { break }
for {
e, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
filter := fmt.Sprintf(SD_FILTER_QUERY, parsedCreds.ProjectId, sd.LogName, fromFormatted)
// By default, Entries are listed from oldest to newest.
/* ResourceNames(rns []string)
"projects/[PROJECT_ID]"
"organizations/[ORGANIZATION_ID]"
"billingAccounts/[BILLING_ACCOUNT_ID]"
"folders/[FOLDER_ID]"
*/
it := client.Entries(ctx, logadmin.Filter(filter))
token := e.Labels["openReplaySessionToken"]
// sessionID, err := strconv.ParseUint(strSessionID, 10, 64)
// if err != nil {
// c.errChan <- err
// continue
// }
jsonEvent, err := json.Marshal(e)
if err != nil {
c.errChan <- err
continue
}
timestamp := uint64(utime.ToMilliseconds(e.Timestamp))
c.setLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
// TODO: Pagination:
//pager := iterator.NewPager(it, 1000, "")
//nextToken, err := pager.NextPage(&entries)
//if nextToken == "" { break }
for {
e, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
token := e.Labels["openReplaySessionToken"]
// sessionID, err := strconv.ParseUint(strSessionID, 10, 64)
// if err != nil {
// c.errChan <- err
// continue
// }
jsonEvent, err := json.Marshal(e)
if err != nil {
c.errChan <- err
continue
}
timestamp := uint64(e.Timestamp.UnixMilli())
c.setLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
RawErrorEvent: &messages.RawErrorEvent{
Source: "stackdriver",
Source: "stackdriver",
Timestamp: timestamp,
Name: e.InsertID, // not sure about that
Payload: string(jsonEvent),
Name: e.InsertID, // not sure about that
Payload: string(jsonEvent),
},
}
}
return nil
}
}

View file

@@ -1,20 +1,19 @@
package integration
import (
"net/http"
"time"
"encoding/json"
"fmt"
"strings"
"io"
"io/ioutil"
"io/ioutil"
"net/http"
"strings"
"time"
"openreplay/backend/pkg/utime"
"openreplay/backend/pkg/messages"
)
/*
The maximum value for limit is 10,000 messages or 100 MB in total message size,
/*
The maximum value for limit is 10,000 messages or 100 MB in total message size,
which means the query may return less than 10,000 messages if you exceed the size limit.
API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API
@@ -22,31 +21,30 @@ import (
const SL_LIMIT = 10000
type sumologic struct {
AccessId string // `json:"access_id"`
AccessKey string // `json:"access_key"`
cookies []*http.Cookie
AccessId string // `json:"access_id"`
AccessKey string // `json:"access_key"`
cookies []*http.Cookie
}
type sumplogicJobResponce struct {
Id string
}
type sumologicJobStatusResponce struct {
State string
State string
MessageCount int
//PendingErrors []string
}
type sumologicResponce struct {
Messages [] struct {
Messages []struct {
Map json.RawMessage
}
}
type sumologicEvent struct {
Timestamp uint64 `json:"_messagetime,string"`
Raw string `json:"_raw"`
Raw string `json:"_raw"`
}
func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) {
@@ -68,10 +66,9 @@ func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) {
resp.Body.Close()
}
func (sl *sumologic) Request(c *client) error {
fromTs := c.getLastMessageTimestamp() + 1 // From next millisecond
toTs := utime.CurrentTimestamp()
toTs := time.Now().UnixMilli()
requestURL := fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs", "eu.") // deployment server??
jsonBody := fmt.Sprintf(`{
"query": "\"openReplaySessionToken=\" AND (*error* OR *fail* OR *exception*)",
@@ -132,7 +129,7 @@ func (sl *sumologic) Request(c *client) error {
tick := time.Tick(5 * time.Second)
for {
<- tick
<-tick
resp, err = http.DefaultClient.Do(req)
if err != nil {
return err // TODO: retry, counter/timeout
@@ -147,12 +144,12 @@ func (sl *sumologic) Request(c *client) error {
}
if jobStatus.State == "DONE GATHERING RESULTS" {
offset := 0
for ;offset < jobStatus.MessageCount; {
for offset < jobStatus.MessageCount {
requestURL = fmt.Sprintf(
"https://api.%vsumologic.com/api/v1/search/jobs/%v/messages?offset=%v&limit=%v",
"eu.",
jobResponce.Id,
offset,
"https://api.%vsumologic.com/api/v1/search/jobs/%v/messages?offset=%v&limit=%v",
"eu.",
jobResponce.Id,
offset,
SL_LIMIT,
)
req, err = http.NewRequest("GET", requestURL, nil)
@@ -190,17 +187,17 @@ func (sl *sumologic) Request(c *client) error {
}
name := e.Raw
if len(name) > 20 {
name = name[:20] // not sure about that
name = name[:20] // not sure about that
}
c.setLastMessageTimestamp(e.Timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
RawErrorEvent: &messages.RawErrorEvent{
Source: "sumologic",
Source: "sumologic",
Timestamp: e.Timestamp,
Name: name,
Payload: string(m.Map), //e.Raw ?
Name: name,
Payload: string(m.Map), //e.Raw ?
},
}
@@ -209,11 +206,11 @@ func (sl *sumologic) Request(c *client) error {
}
break
}
if jobStatus.State != "NOT STARTED" &&
if jobStatus.State != "NOT STARTED" &&
jobStatus.State != "GATHERING RESULTS" {
// error
break
}
}
return nil
}
}

View file

@@ -147,7 +147,7 @@ func (consumer *Consumer) ConsumeNext() error {
if e.TopicPartition.Error != nil {
return errors.Wrap(e.TopicPartition.Error, "Consumer Partition Error")
}
ts := e.Timestamp.UnixNano() / 1e6
ts := e.Timestamp.UnixMilli()
consumer.messageHandler(decodeKey(e.Key), e.Value, &types.Meta{
Topic: *(e.TopicPartition.Topic),
ID: uint64(e.TopicPartition.Offset),