New backend logs integrations (#2717)

* feat(integrations): new version of backend integrations

* feat(integrations): added ingress rule

* feat(integrations): fixed a port number

* feat(integrations): enabled ingress in values.yaml

* feat(integrations): added startup log

* feat(integrations): added extra logger for 3 of 4 backend logs integrations.

* feat(integrations): removed a logs loop call

* feat(integrations): fixed a table name

* feat(integrations): disabled extra logger

* feat(integrations): made extra logger as an option

* feat(integrations): changed contentType for logs file

* feat(integrations): bug fix

* feat(integrations): struct/string config support for datadog provider

* feat(integrations): map config support for datadog provider

* feat(integrations): removed unnecessary transformation

* feat(integrations): fixed datadog and sentry response format

* feat(integrations): added correct creds parser for sentry provider

* feat(integrations): removed unnecessary return statement

* feat(integrations): added correct creds parser for elastic search

* feat(integrations): changed elastic to elasticsearch

* feat(integrations): added correct creds parser for dynatrace

* feat(integrations): fixed an issue in query request for elasticsearch provider

* feat(integrations): made extra logger configurable by env var

* feat(integrations): removed debug logs
This commit is contained in:
Alexander 2024-10-31 15:28:38 +01:00 committed by GitHub
parent c0910b015a
commit 30a69893bb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
44 changed files with 1381 additions and 22271 deletions

View file

@ -6,55 +6,52 @@ import (
"os/signal"
"syscall"
"github.com/jackc/pgx/v4"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/pkg/integrations"
"openreplay/backend/internal/http/server"
"openreplay/backend/pkg/db/postgres/pool"
integration "openreplay/backend/pkg/integrations"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/metrics"
databaseMetrics "openreplay/backend/pkg/metrics/database"
"openreplay/backend/pkg/queue"
"openreplay/backend/pkg/token"
"openreplay/backend/pkg/metrics/database"
)
func main() {
ctx := context.Background()
log := logger.New()
cfg := config.New(log)
metrics.New(log, databaseMetrics.List())
metrics.New(log, append(database.List()))
pgConn, err := pgx.Connect(context.Background(), cfg.Postgres.String())
pgConn, err := pool.New(cfg.Postgres.String())
if err != nil {
log.Fatal(ctx, "can't init postgres connection: %s", err)
}
defer pgConn.Close(context.Background())
defer pgConn.Close()
producer := queue.NewProducer(cfg.MessageSizeLimit, true)
defer producer.Close(15000)
storage := integrations.NewStorage(pgConn, log)
if err := storage.Listen(); err != nil {
log.Fatal(ctx, "can't init storage listener: %s", err)
}
listener, err := integrations.New(log, cfg, storage, producer, integrations.NewManager(log), token.NewTokenizer(cfg.TokenSecret))
services, err := integration.NewServiceBuilder(log, cfg, pgConn)
if err != nil {
log.Fatal(ctx, "can't init service: %s", err)
log.Fatal(ctx, "can't init services: %s", err)
}
defer listener.Close()
router, err := integration.NewRouter(cfg, log, services)
if err != nil {
log.Fatal(ctx, "failed while creating router: %s", err)
}
dataIntegrationServer, err := server.New(router.GetHandler(), cfg.HTTPHost, cfg.HTTPPort, cfg.HTTPTimeout)
if err != nil {
log.Fatal(ctx, "failed while creating server: %s", err)
}
go func() {
if err := dataIntegrationServer.Start(); err != nil {
log.Fatal(ctx, "http server error: %s", err)
}
}()
log.Info(ctx, "server successfully started on port %s", cfg.HTTPPort)
// Wait stop signal to shut down server gracefully
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
log.Info(ctx, "integration service started")
for {
select {
case sig := <-sigchan:
log.Info(ctx, "caught signal %v: terminating", sig)
os.Exit(0)
case err := <-listener.Errors:
log.Error(ctx, "listener error: %s", err)
os.Exit(0)
}
}
<-sigchan
log.Info(ctx, "shutting down the server")
dataIntegrationServer.Stop()
}

View file

@ -7,8 +7,9 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/ClickHouse/clickhouse-go/v2 v2.2.0
github.com/DataDog/datadog-api-client-go/v2 v2.30.0
github.com/Masterminds/semver v1.5.0
github.com/andybalholm/brotli v1.0.5
github.com/andybalholm/brotli v1.1.0
github.com/aws/aws-sdk-go v1.44.98
github.com/btcsuite/btcutil v1.0.2
github.com/confluentinc/confluent-kafka-go/v2 v2.4.0
@ -16,6 +17,7 @@ require (
github.com/docker/distribution v2.8.3+incompatible
github.com/elastic/go-elasticsearch/v7 v7.13.1
github.com/elastic/go-elasticsearch/v8 v8.13.0
github.com/getsentry/sentry-go v0.29.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/uuid v1.6.0
@ -24,7 +26,7 @@ require (
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
github.com/jackc/pgtype v1.14.0
github.com/jackc/pgx/v4 v4.18.2
github.com/klauspost/compress v1.17.4
github.com/klauspost/compress v1.17.7
github.com/klauspost/pgzip v1.2.5
github.com/lib/pq v1.10.2
github.com/oschwald/maxminddb-golang v1.7.0
@ -35,7 +37,7 @@ require (
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe
go.uber.org/zap v1.17.0
golang.org/x/net v0.22.0
golang.org/x/net v0.23.0
google.golang.org/api v0.169.0
)
@ -47,6 +49,7 @@ require (
cloud.google.com/go/longrunning v0.5.6 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect
github.com/DataDog/zstd v1.5.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
@ -54,6 +57,7 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect

View file

@ -35,6 +35,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClickHouse/clickhouse-go/v2 v2.2.0 h1:dj00TDKY+xwuTJdbpspCSmTLFyWzRJerTHwaBxut1C0=
github.com/ClickHouse/clickhouse-go/v2 v2.2.0/go.mod h1:8f2XZUi7XoeU+uPIytSi1cvx8fmJxi7vIgqpvYTF1+o=
github.com/DataDog/datadog-api-client-go/v2 v2.30.0 h1:WHAo6RA8CqAzaUh3dERqz/n6SuG2GJ/WthBkccn0MIQ=
github.com/DataDog/datadog-api-client-go/v2 v2.30.0/go.mod h1:QKOu6vscsh87fMY1lHfLEmNSunyXImj8BUaUWJXOehc=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
@ -46,8 +50,8 @@ github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7
github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/aws/aws-sdk-go v1.44.98 h1:fX+NxebSdO/9T6DTNOLhpC+Vv6RNkKRfsMg0a7o/yBo=
github.com/aws/aws-sdk-go v1.44.98/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v1.17.6 h1:Y773UK7OBqhzi5VDXMi1zVGsoj+CVHs2eaC2bDsLwi0=
@ -167,6 +171,10 @@ github.com/fsnotify/fsevents v0.1.1/go.mod h1:+d+hS27T6k5J8CRaPLKFgwKYcpS7GwW3Ul
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fvbommel/sortorder v1.0.2 h1:mV4o8B2hKboCdkJm+a7uX/SIpZob4JzUpc5GGnM45eo=
github.com/fvbommel/sortorder v1.0.2/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/getsentry/sentry-go v0.29.0 h1:YtWluuCFg9OfcqnaujpY918N/AhCCwarIDWOYSBAjCA=
github.com/getsentry/sentry-go v0.29.0/go.mod h1:jhPesDAL0Q0W2+2YEuVOvdWmVtdsr1+jtBrlDEVWwLY=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@ -188,6 +196,8 @@ github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGK
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
@ -339,8 +349,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -363,8 +373,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
@ -372,8 +382,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk=
@ -439,6 +449,8 @@ github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -646,8 +658,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=

View file

@ -1,21 +1,33 @@
package integrations
import (
"time"
"openreplay/backend/internal/config/common"
"openreplay/backend/internal/config/configurator"
"openreplay/backend/internal/config/objectstorage"
"openreplay/backend/internal/config/redis"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/logger"
)
type Config struct {
common.Config
common.Postgres
TopicAnalytics string `env:"TOPIC_ANALYTICS,required"`
TokenSecret string `env:"TOKEN_SECRET,required"`
UseProfiler bool `env:"PROFILER_ENABLED,default=false"`
redis.Redis
objectstorage.ObjectsConfig
HTTPHost string `env:"HTTP_HOST,default="`
HTTPPort string `env:"HTTP_PORT,required"`
HTTPTimeout time.Duration `env:"HTTP_TIMEOUT,default=60s"`
JsonSizeLimit int64 `env:"JSON_SIZE_LIMIT,default=131072"` // 128KB
UseAccessControlHeaders bool `env:"USE_CORS,default=false"`
ProjectExpiration time.Duration `env:"PROJECT_EXPIRATION,default=10m"`
JWTSecret string `env:"JWT_SECRET,required"`
WorkerID uint16
}
func New(log logger.Logger) *Config {
cfg := &Config{}
cfg := &Config{WorkerID: env.WorkerID()}
configurator.Process(log, cfg)
return cfg
}

View file

@ -0,0 +1,36 @@
package data_integration
import (
"openreplay/backend/internal/config/integrations"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/flakeid"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
"openreplay/backend/pkg/objectstorage/store"
"openreplay/backend/pkg/spot/auth"
)
type ServiceBuilder struct {
Flaker *flakeid.Flaker
ObjStorage objectstorage.ObjectStorage
Auth auth.Auth
Integrator Service
}
func NewServiceBuilder(log logger.Logger, cfg *integrations.Config, pgconn pool.Pool) (*ServiceBuilder, error) {
objStore, err := store.NewStore(&cfg.ObjectsConfig)
if err != nil {
return nil, err
}
integrator, err := NewService(log, pgconn, objStore)
if err != nil {
return nil, err
}
flaker := flakeid.NewFlaker(cfg.WorkerID)
return &ServiceBuilder{
Flaker: flaker,
ObjStorage: objStore,
Auth: auth.NewAuth(log, cfg.JWTSecret, "", pgconn),
Integrator: integrator,
}, nil
}

View file

@ -1,121 +0,0 @@
package clients
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
"openreplay/backend/pkg/messages"
)
/*
- Bugsnag messages usually recived later then others
*/
type bugsnag struct {
BugsnagProjectId string // `json:"bugsnag_project_id"`
AuthorizationToken string // `json:"auth_token"`
}
type bugsnagEvent struct {
MetaData struct {
SpecialInfo struct {
AsayerSessionId uint64 `json:"asayerSessionId,string"`
OpenReplaySessionToken string `json:"openReplaySessionToken"`
} `json:"special_info"`
} `json:"metaData"`
ReceivedAt string `json:"received_at"` // can use time.Time as it implements UnmarshalJSON from RFC3339 format
Exceptions [1]struct {
Message string
}
}
// need result chan and lastMessageTs
func (b *bugsnag) Request(c *client) error {
sinceTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second
sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339)
requestURL := fmt.Sprintf("https://api.bugsnag.com/projects/%v/events", b.BugsnagProjectId)
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
return err
}
q := req.URL.Query()
// q.Add("per_page", "100") // Up to a maximum of 30. Default: 30
// q.Add("sort", "timestamp") // Default: timestamp (timestamp == ReceivedAt ??)
q.Add("direction", "asc") // Default: desc
q.Add("full_reports", "true") // Default: false
q.Add("filters[event.since][][type]", "eq")
q.Add("filters[event.since][][value]", sinceFormatted) // seems like inclusively
req.URL.RawQuery = q.Encode()
authToken := "token " + b.AuthorizationToken
req.Header.Add("Authorization", authToken)
req.Header.Add("X-Version", "2")
for {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// Status code
// 401 (unauthorised)
if resp.StatusCode >= 400 {
io.Copy(io.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Bugsnag: server respond with the code %v | data: %v ", resp.StatusCode, *b)
}
var jsonEventList []json.RawMessage
err = json.NewDecoder(resp.Body).Decode(&jsonEventList)
if err != nil {
return err
}
for _, jsonEvent := range jsonEventList {
var e bugsnagEvent
err = json.Unmarshal(jsonEvent, &e)
if err != nil {
c.errChan <- err
continue
}
sessionID := e.MetaData.SpecialInfo.AsayerSessionId
token := e.MetaData.SpecialInfo.OpenReplaySessionToken
if sessionID == 0 && token == "" {
// c.errChan <- "No AsayerSessionId found. | Message: %v", e
continue
}
parsedTime, err := time.Parse(time.RFC3339, e.ReceivedAt)
if err != nil {
c.errChan <- err
continue
}
timestamp := uint64(parsedTime.UnixMilli())
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "bugsnag",
Timestamp: timestamp,
Name: e.Exceptions[0].Message,
Payload: string(jsonEvent),
},
}
}
linkHeader := resp.Header.Get("Link")
if linkHeader == "" {
break
}
nextLink := GetLinkFromAngularBrackets(linkHeader)
req.URL, err = url.Parse(nextLink)
if err != nil {
return err
}
}
return nil
}

File diff suppressed because it is too large Load diff

View file

@ -1,110 +1,5 @@
package clients
import (
"encoding/json"
"fmt"
"openreplay/backend/pkg/integrations/model"
"openreplay/backend/pkg/messages"
"sync"
)
type requester interface {
Request(*client) error
}
type client struct {
requester
requestData *model.RequestInfo
integration *model.Integration
mux sync.Mutex
updateChan chan<- model.Integration
evChan chan<- *SessionErrorEvent
errChan chan<- error
}
type SessionErrorEvent struct {
SessionID uint64
Token string
*messages.IntegrationEvent
}
type ClientMap map[string]*client
func NewClient(i *model.Integration, updateChan chan<- model.Integration, evChan chan<- *SessionErrorEvent, errChan chan<- error) (*client, error) {
ri, err := i.GetRequestInfo()
if err != nil {
return nil, err
}
c := &client{
evChan: evChan,
errChan: errChan,
updateChan: updateChan,
requestData: ri,
}
return c, nil
}
func (c *client) Update(i *model.Integration) error {
var r requester
switch i.Provider {
case "bugsnag":
r = new(bugsnag)
case "cloudwatch":
r = new(cloudwatch)
case "datadog":
r = new(datadog)
case "elasticsearch":
r = new(elasticsearch)
case "newrelic":
r = new(newrelic)
case "rollbar":
r = new(rollbar)
case "sentry":
r = new(sentry)
case "stackdriver":
r = new(stackdriver)
case "sumologic":
r = new(sumologic)
}
if err := json.Unmarshal(i.Options, r); err != nil {
return err
}
c.mux.Lock()
defer c.mux.Unlock()
c.integration = i
c.requester = r
return nil
}
func (c *client) handleError(err error) {
c.errChan <- fmt.Errorf("%v | Integration: %v", err, *c.integration)
}
func (c *client) Request() {
c.mux.Lock()
defer c.mux.Unlock()
if !c.requestData.CanAttempt() {
return
}
c.requestData.UpdateLastAttempt()
err := c.requester.Request(c)
if err != nil {
c.requestData.Inc()
c.handleError(fmt.Errorf("ERRROR L139, err: %s", err))
} else {
c.requestData.Reset()
}
rd, err := c.requestData.Encode()
if err != nil {
c.handleError(err)
}
// RequestData is a byte array (pointer-like type), but it's replacement
// won't affect the previous value sent by channel
c.integration.RequestData = rd
c.updateChan <- *c.integration
type Client interface {
FetchSessionData(credentials interface{}, sessionID uint64) (interface{}, error)
}

View file

@ -1,85 +0,0 @@
package clients
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"openreplay/backend/pkg/messages"
"regexp"
"strings"
)
var reIsException = regexp.MustCompile(`(?i)exception|error`)
type cloudwatch struct {
AwsAccessKeyId string // `json:"aws_access_key_id"`
AwsSecretAccessKey string // `json:"aws_secret_access_key"`
LogGroupName string // `json:"log_group_name"`
Region string // `json:"region"`
}
func (cw *cloudwatch) Request(c *client) error {
startTs := int64(c.requestData.GetLastMessageTimestamp() + 1) // From next millisecond
//endTs := utils.CurrentTimestamp()
sess, err := session.NewSession(aws.NewConfig().
WithRegion(cw.Region).
WithCredentials(
credentials.NewStaticCredentials(cw.AwsAccessKeyId, cw.AwsSecretAccessKey, ""),
),
)
if err != nil {
return err
}
svc := cloudwatchlogs.New(sess)
filterOptions := new(cloudwatchlogs.FilterLogEventsInput).
SetStartTime(startTs). // Inclusively both startTime and endTime
// SetEndTime(endTs). // Default nil?
// SetLimit(10000). // Default 10000
SetLogGroupName(cw.LogGroupName).
SetFilterPattern("openReplaySessionToken")
//SetFilterPattern("asayer_session_id")
for {
output, err := svc.FilterLogEvents(filterOptions)
if err != nil {
return err
}
for _, e := range output.Events {
if e.Message == nil || e.Timestamp == nil {
continue
}
if !reIsException.MatchString(*e.Message) { // too weak condition ?
continue
}
token, err := GetToken(*e.Message)
if err != nil {
c.errChan <- err
continue
}
name := ""
if e.LogStreamName != nil {
name = *e.LogStreamName
}
timestamp := uint64(*e.Timestamp)
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "cloudwatch",
Timestamp: timestamp, // e.IngestionTime ??
Name: name,
Payload: strings.ReplaceAll(e.String(), "\n", ""),
},
}
}
if output.NextToken == nil {
break
}
filterOptions.NextToken = output.NextToken
}
return nil
}

View file

@ -1,130 +1,79 @@
package clients
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"openreplay/backend/pkg/messages"
"github.com/DataDog/datadog-api-client-go/v2/api/datadog"
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
)
/*
We collect Logs. Datadog also has Events
type dataDogClient struct{}
*/
type datadog struct {
ApplicationKey string //`json:"application_key"`
ApiKey string //`json:"api_key"`
func NewDataDogClient() Client {
return &dataDogClient{}
}
type datadogResponce struct {
Logs []json.RawMessage
NextLogId *string
Status string
type datadogConfig struct {
Site string `json:"site"`
ApiKey string `json:"api_key"`
AppKey string `json:"app_key"`
}
type datadogLog struct {
Content struct {
Timestamp string
Message string
Attributes struct {
Error struct { // Not sure about this
Message string
}
func (d *dataDogClient) FetchSessionData(credentials interface{}, sessionID uint64) (interface{}, error) {
cfg, ok := credentials.(datadogConfig)
if !ok {
// Not a struct, will try to parse as JSON string
strCfg, ok := credentials.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid credentials, got: %+v", credentials)
}
cfg = datadogConfig{}
if site, ok := strCfg["site"].(string); ok {
cfg.Site = site
}
if apiKey, ok := strCfg["api_key"].(string); ok {
cfg.ApiKey = apiKey
}
if appKey, ok := strCfg["app_key"].(string); ok {
cfg.AppKey = appKey
}
}
}
func (d *datadog) makeRequest(nextLogId *string, fromTs uint64, toTs uint64) (*http.Request, error) {
requestURL := fmt.Sprintf(
"https://api.datadoghq.com/api/v1/logs-queries/list?api_key=%v&application_key=%v",
d.ApiKey,
d.ApplicationKey,
)
startAt := "null"
if nextLogId != nil && *nextLogId != "" {
startAt = *nextLogId
body := datadogV2.LogsListRequest{
Filter: &datadogV2.LogsQueryFilter{
Indexes: []string{
"main",
},
},
Sort: datadogV2.LOGSSORT_TIMESTAMP_ASCENDING.Ptr(),
Page: &datadogV2.LogsListRequestPage{
Limit: datadog.PtrInt32(1),
},
}
// Query: status:error/info/warning?
// openReplaySessionToken instead of asayer_session_id
jsonBody := fmt.Sprintf(`{
"limit": 1000,
"query": "status:error openReplaySessionToken",
"sort": "asc",
"time": {
"from": %v,
"to": %v
},
"startAt": %v
}`, fromTs, toTs, startAt) // from/to: both inclusively, both required; limit: default=10, max 1000
req, err := http.NewRequest("POST", requestURL, bytes.NewBuffer([]byte(jsonBody)))
if sessionID != 0 {
body.Filter.Query = datadog.PtrString(fmt.Sprintf("openReplaySession.id=%d", sessionID))
body.Page.Limit = datadog.PtrInt32(1000)
}
ctx := context.WithValue(context.Background(), datadog.ContextServerVariables, map[string]string{"site": cfg.Site})
ctx = context.WithValue(ctx, datadog.ContextAPIKeys, map[string]datadog.APIKey{
"apiKeyAuth": {Key: cfg.ApiKey},
"appKeyAuth": {Key: cfg.AppKey},
})
configuration := datadog.NewConfiguration()
apiClient := datadog.NewAPIClient(configuration)
api := datadogV2.NewLogsApi(apiClient)
resp, r, err := api.ListLogs(ctx, *datadogV2.NewListLogsOptionalParameters().WithBody(body))
if err != nil {
return nil, err
fmt.Printf("error when calling `LogsApi.ListLogs`: %v", err)
fmt.Printf("full HTTP response: %v\n", r)
}
req.Header.Add("Content-Type", "application/json")
return req, nil
}
func (d *datadog) Request(c *client) error {
fromTs := c.requestData.GetLastMessageTimestamp() + 1 // From next millisecond
toTs := uint64(time.Now().UnixMilli())
var nextLogId *string
for {
req, err := d.makeRequest(nextLogId, fromTs, toTs)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
io.Copy(io.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Datadog: server respond with the code %v", resp.StatusCode)
}
var ddResp datadogResponce
if err = json.NewDecoder(resp.Body).Decode(&ddResp); err != nil {
return err
}
// if ddResp.Status != "done"/ "ok"
for _, jsonLog := range ddResp.Logs {
var ddLog datadogLog
err = json.Unmarshal(jsonLog, &ddLog)
if err != nil {
c.errChan <- err
continue
}
token, err := GetToken(ddLog.Content.Message) // sure here?
if err != nil {
c.errChan <- err
continue
}
parsedTime, err := time.Parse(time.RFC3339, ddLog.Content.Timestamp)
if err != nil {
c.errChan <- err
continue
}
timestamp := uint64(parsedTime.UnixMilli())
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "datadog",
Timestamp: timestamp,
Name: ddLog.Content.Attributes.Error.Message,
Payload: string(jsonLog),
},
}
}
nextLogId = ddResp.NextLogId // ensure
if nextLogId == nil {
return nil
}
logs := resp.Data
if logs == nil || len(logs) == 0 {
return nil, fmt.Errorf("no logs found")
}
responseContent, _ := json.Marshal(logs)
return responseContent, nil
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,153 @@
package clients
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
)
type dynatraceClient struct{}
func NewDynatraceClient() Client {
return &dynatraceClient{}
}
type dynatraceConfig struct {
Environment string `json:"environment"`
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
Resource string `json:"resource"`
}
type AuthToken struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
func (d *dynatraceClient) FetchSessionData(credentials interface{}, sessionID uint64) (interface{}, error) {
// Try to parse the credentials as a Config struct
cfg, ok := credentials.(dynatraceConfig)
if !ok {
strCfg, ok := credentials.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid credentials, got: %+v", credentials)
}
cfg = dynatraceConfig{}
if val, ok := strCfg["environment"].(string); ok {
cfg.Environment = val
}
if val, ok := strCfg["client_id"].(string); ok {
cfg.ClientID = val
}
if val, ok := strCfg["client_secret"].(string); ok {
cfg.ClientSecret = val
}
if val, ok := strCfg["resource"].(string); ok {
cfg.Resource = val
}
}
token, err := d.requestOAuthToken(cfg.ClientID, cfg.ClientSecret, cfg.Resource)
if err != nil {
return nil, fmt.Errorf("error while requesting oauth token: %w", err)
}
logs, err := d.requestLogs(token.AccessToken, cfg.Environment, sessionID)
if err != nil {
return nil, fmt.Errorf("error while requesting logs: %w", err)
}
return logs, nil
}
func (d *dynatraceClient) requestOAuthToken(clientID, clientSecret, resource string) (*AuthToken, error) {
requestURL := "https://sso.dynatrace.com/sso/oauth2/token"
params := url.Values{}
params.Add("grant_type", "client_credentials")
params.Add("client_id", clientID)
params.Add("client_secret", clientSecret)
params.Add("resource", resource)
params.Add("scope", "storage:buckets:read storage:logs:read")
requestURL += "?" + params.Encode()
request, err := http.NewRequest("POST", requestURL, nil)
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{}
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
newToken := &AuthToken{}
err = json.Unmarshal(body, newToken)
if err != nil {
return nil, err
}
if newToken.AccessToken == "" {
return nil, fmt.Errorf("empty access token, body: %s", string(body))
}
return newToken, nil
}
type Logs struct {
Results []interface{} `json:"results"`
}
func testRequestParams() url.Values {
params := url.Values{}
params.Add("limit", "1")
return params
}
func requestParams(sessionID uint64) url.Values {
params := url.Values{}
params.Add("limit", "1000")
params.Add("query", "(status=\"WARN\" OR status=\"ERROR\") AND openReplaySession.id="+fmt.Sprint(sessionID))
return params
}
func (d *dynatraceClient) requestLogs(token, environmentID string, sessionID uint64) (*Logs, error) {
requestURL := fmt.Sprintf("https://%s.live.dynatrace.com/api/v2/logs/search", environmentID)
if sessionID == 0 {
requestURL += "?" + testRequestParams().Encode()
} else {
requestURL += "?" + requestParams(sessionID).Encode()
}
request, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
client := &http.Client{}
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
logs := &Logs{}
err = json.Unmarshal(body, logs)
if err != nil {
return nil, err
}
if len(logs.Results) == 0 {
return nil, fmt.Errorf("empty logs, body: %s", string(body))
}
return logs, nil
}

View file

@ -0,0 +1,103 @@
package clients
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"strings"
"github.com/elastic/go-elasticsearch/v8"
)
type elasticsearchClient struct{}
func NewElasticClient() Client {
return &elasticsearchClient{}
}
type elasticsearchConfig struct {
URL string `json:"url"`
APIKeyId string `json:"api_key_id"`
APIKey string `json:"api_key"`
Indexes string `json:"indexes"`
}
func (e *elasticsearchClient) FetchSessionData(credentials interface{}, sessionID uint64) (interface{}, error) {
cfg, ok := credentials.(elasticsearchConfig)
if !ok {
strCfg, ok := credentials.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid credentials, got: %+v", credentials)
}
cfg = elasticsearchConfig{}
if val, ok := strCfg["url"].(string); ok {
cfg.URL = val
}
if val, ok := strCfg["api_key_id"].(string); ok {
cfg.APIKeyId = val
}
if val, ok := strCfg["api_key"].(string); ok {
cfg.APIKey = val
}
if val, ok := strCfg["indexes"].(string); ok {
cfg.Indexes = val
}
}
clientCfg := elasticsearch.Config{
Addresses: []string{
cfg.URL,
},
APIKey: base64.StdEncoding.EncodeToString([]byte(cfg.APIKeyId + ":" + cfg.APIKey)),
}
// Create Elasticsearch client
es, err := elasticsearch.NewClient(clientCfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
var buf strings.Builder
query := `{"size": 1}`
if sessionID != 0 {
query = fmt.Sprintf(`{
"size": 1000,
"query": {
"match_phrase": {
"message": "openReplaySession.id=%d"
}
}
}`, sessionID)
}
buf.WriteString(query)
res, err := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex("logs"),
es.Search.WithBody(strings.NewReader(buf.String())),
es.Search.WithTrackTotalHits(true),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
if r["hits"] == nil {
return nil, fmt.Errorf("no logs found")
}
logHits := r["hits"].(map[string]interface{})["hits"].([]interface{})
if logHits == nil || len(logHits) == 0 {
return nil, fmt.Errorf("no logs found")
}
responseContent, _ := json.Marshal(logHits)
return responseContent, nil
}

View file

@ -1,219 +0,0 @@
package clients
import (
"bytes"
"context"
b64 "encoding/base64"
"encoding/json"
"fmt"
elasticlib "github.com/elastic/go-elasticsearch/v7"
"log"
"strconv"
"time"
"openreplay/backend/pkg/messages"
)
type elasticsearch struct {
Host string
Port json.Number
ApiKeyId string //`json:"api_key_id"`
ApiKey string //`json:"api_key"`
Indexes string
}
type elasticsearchLog struct {
Message string
Time time.Time `json:"utc_time"` // Should be parsed automatically from RFC3339
}
func (es *elasticsearch) Request(c *client) error {
address := es.Host + ":" + es.Port.String()
apiKey := b64.StdEncoding.EncodeToString([]byte(es.ApiKeyId + ":" + es.ApiKey))
cfg := elasticlib.Config{
Addresses: []string{
address,
},
APIKey: apiKey,
}
esC, err := elasticlib.NewClient(cfg)
if err != nil {
log.Println("Error while creating new ES client")
log.Println(err)
return err
}
gteTs := c.requestData.GetLastMessageTimestamp() + 1000
log.Printf("gteTs: %v ", gteTs)
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []map[string]interface{}{
{
"match": map[string]interface{}{
"message": map[string]interface{}{
"query": "openReplaySessionToken=", // asayer_session_id=
},
},
},
{
"range": map[string]interface{}{
"utc_time": map[string]interface{}{
"gte": strconv.FormatUint(gteTs, 10),
"lte": "now",
},
},
},
{
"term": map[string]interface{}{
"tags": "error",
},
},
},
},
},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return fmt.Errorf("Error encoding the query: %s", err)
}
res, err := esC.Search(
esC.Search.WithContext(context.Background()),
esC.Search.WithIndex(es.Indexes),
esC.Search.WithSize(1000),
esC.Search.WithScroll(time.Minute*2),
esC.Search.WithBody(&buf),
esC.Search.WithSort("utc_time:asc"),
)
if err != nil {
return fmt.Errorf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Printf("Error parsing the Error response body: %v\n", err)
return fmt.Errorf("Error parsing the Error response body: %v", err)
} else {
log.Printf("Elasticsearch Error [%s] %s: %s\n",
res.Status(),
e["error"],
e["error"],
)
return fmt.Errorf("Elasticsearch Error [%s] %s: %s",
res.Status(),
e["error"],
e["error"],
)
}
}
for {
var esResp map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
return fmt.Errorf("Error parsing the response body: %s", err)
// If no error, then convert response to a map[string]interface
}
if _, ok := esResp["hits"]; !ok {
log.Printf("Hits not found in \n%v\n", esResp)
break
}
hits := esResp["hits"].(map[string]interface{})["hits"].([]interface{})
if len(hits) == 0 {
log.Println("No hits found")
break
}
log.Printf("received %d hits", len(hits))
for _, hit := range hits {
// Parse the attributes/fields of the document
doc := hit.(map[string]interface{})
source := doc["_source"].(map[string]interface{})
if _, ok := source["message"]; !ok {
log.Printf("message not found in doc \n%v\n", doc)
c.errChan <- fmt.Errorf("message not found in doc '%v' ", doc)
continue
}
if _, ok := source["utc_time"]; !ok {
log.Printf("utc_time not found in doc \n%v\n", doc)
c.errChan <- fmt.Errorf("utc_time not found in doc '%v' ", doc)
continue
}
parsedTime, err := time.Parse(time.RFC3339, source["utc_time"].(string))
if err != nil {
log.Println("cannot parse time")
c.errChan <- fmt.Errorf("cannot parse RFC3339 time of doc '%v' ", doc)
continue
}
esLog := elasticsearchLog{Message: source["message"].(string), Time: parsedTime}
docID := doc["_id"]
token, err := GetToken(esLog.Message)
if err != nil {
log.Printf("Error generating token: %s\n", err)
c.errChan <- err
continue
}
timestamp := uint64(esLog.Time.UnixMilli())
c.requestData.SetLastMessageTimestamp(timestamp)
var sessionID uint64
sessionID, err = strconv.ParseUint(token, 10, 64)
if err != nil {
log.Printf("Error converting token to uint46: %s\n", err)
sessionID = 0
}
payload, err := json.Marshal(source)
if err != nil {
log.Printf("Error converting source to json: %v\n", source)
continue
}
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "elasticsearch",
Timestamp: timestamp,
Name: fmt.Sprintf("%v", docID),
Payload: string(payload),
},
}
}
if _, ok := esResp["_scroll_id"]; !ok {
log.Println("_scroll_id not found")
break
}
log.Println("Scrolling...")
scrollId := esResp["_scroll_id"]
res, err = esC.Scroll(
esC.Scroll.WithContext(context.Background()),
esC.Scroll.WithScrollID(fmt.Sprintf("%v", scrollId)),
esC.Scroll.WithScroll(time.Minute*2),
)
if err != nil {
return fmt.Errorf("Error getting scroll response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
return fmt.Errorf("Error parsing the response body: %v", err)
} else {
return fmt.Errorf("Elasticsearch [%s] %s: %s",
res.Status(),
e["error"], //.(map[string]interface{})["type"],
e["error"], //.(map[string]interface{})["reason"],
)
}
}
}
return nil
}

View file

@ -1,469 +0,0 @@
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 10,
"relation" : "eq"
},
"max_score" : null,
"hits" : [
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "TaSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://www.elastic-elastic-elastic.com/success/christopher-loria",
"request" : "/elasticsearch",
"agent" : "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1",
"extension" : "",
"memory" : null,
"ip" : "245.169.167.67",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]245.169.167.67 - - [2018-08-03T16:20:11.988Z] "GET /elasticsearch HTTP/1.1" 200 9950 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1"""",
"url" : "https://www.elastic.co/downloads/elasticsearch",
"tags" : [
"success",
"info",
"error"
],
"geo" : {
"srcdest" : "PK:RW",
"src" : "PK",
"coordinates" : {
"lon" : -76.069,
"lat" : 38.80416667
},
"dest" : "RW"
},
"utc_time" : "2020-02-21T16:20:11.988Z",
"bytes" : 9950,
"machine" : {
"os" : "osx",
"ram" : 18253611008
},
"response" : 200,
"clientip" : "245.169.167.67",
"host" : "www.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:20:11.988Z"
},
"sort" : [
1582302011988
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "fKSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://facebook.com/error/charles-fullerton",
"request" : "/elasticsearch",
"agent" : "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"extension" : "",
"memory" : null,
"ip" : "26.150.255.63",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]26.150.255.63 - - [2018-08-03T16:20:52.802Z] "GET /elasticsearch HTTP/1.1" 200 7330 "-" "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"""",
"url" : "https://www.elastic.co/downloads/elasticsearch",
"tags" : [
"success",
"info",
"error"
],
"geo" : {
"srcdest" : "US:FR",
"src" : "US",
"coordinates" : {
"lon" : -82.70288889,
"lat" : 34.80997222
},
"dest" : "FR"
},
"utc_time" : "2020-02-21T16:20:52.802Z",
"bytes" : 7330,
"machine" : {
"os" : "ios",
"ram" : 17179869184
},
"response" : 200,
"clientip" : "26.150.255.63",
"host" : "www.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:20:52.802Z"
},
"sort" : [
1582302052802
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "QqSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://nytimes.com/warning/yelena-kondakova",
"request" : "/enterprise",
"agent" : "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1",
"extension" : "",
"memory" : null,
"ip" : "6.63.231.192",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]6.63.231.192 - - [2018-08-03T16:24:33.154Z] "GET /enterprise HTTP/1.1" 503 0 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1"""",
"url" : "https://www.elastic.co/downloads/enterprise",
"tags" : [
"success",
"login",
"error"
],
"geo" : {
"srcdest" : "BR:IN",
"src" : "BR",
"coordinates" : {
"lon" : -91.67833333,
"lat" : 40.12916667
},
"dest" : "IN"
},
"utc_time" : "2020-02-21T16:24:33.154Z",
"bytes" : 0,
"machine" : {
"os" : "osx",
"ram" : 21474836480
},
"response" : 503,
"clientip" : "6.63.231.192",
"host" : "www.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:24:33.154Z"
},
"sort" : [
1582302273154
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "t6SPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://nytimes.com/success/ronald-grabe",
"request" : "/",
"agent" : "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"extension" : "",
"memory" : null,
"ip" : "111.58.155.54",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]111.58.155.54 - - [2018-08-03T16:26:12.645Z] "GET / HTTP/1.1" 200 3354 "-" "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"""",
"url" : "https://www.elastic.co/downloads",
"tags" : [
"success",
"info",
"error"
],
"geo" : {
"srcdest" : "CN:CN",
"src" : "CN",
"coordinates" : {
"lon" : -93.30739306,
"lat" : 31.99071694
},
"dest" : "CN"
},
"utc_time" : "2020-02-21T16:26:12.645Z",
"bytes" : 3354,
"machine" : {
"os" : "win 8",
"ram" : 9663676416
},
"response" : 200,
"clientip" : "111.58.155.54",
"host" : "www.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:26:12.645Z"
},
"sort" : [
1582302372645
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "lqSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://www.elastic-elastic-elastic.com/warning/gregory-h-johnson",
"request" : "/styles/ad-blocker.css",
"agent" : "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"extension" : "css",
"memory" : null,
"ip" : "85.130.95.75",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]85.130.95.75 - - [2018-08-03T16:36:29.881Z] "GET /styles/ad-blocker.css HTTP/1.1" 200 7755 "-" "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"""",
"url" : "https://cdn.elastic-elastic-elastic.org/styles/ad-blocker.css",
"tags" : [
"warning",
"login",
"error"
],
"geo" : {
"srcdest" : "US:CD",
"src" : "US",
"coordinates" : {
"lon" : -80.09559417,
"lat" : 26.68316194
},
"dest" : "CD"
},
"utc_time" : "2020-02-21T16:36:29.881Z",
"bytes" : 7755,
"machine" : {
"os" : "win 8",
"ram" : 18253611008
},
"response" : 200,
"clientip" : "85.130.95.75",
"host" : "cdn.elastic-elastic-elastic.org",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:36:29.881Z"
},
"sort" : [
1582302989881
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "qqSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://nytimes.com/success/liu-wang",
"request" : "/elasticsearch",
"agent" : "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1",
"extension" : "",
"memory" : null,
"ip" : "107.238.94.231",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]107.238.94.231 - - [2018-08-03T16:52:30.202Z] "GET /elasticsearch HTTP/1.1" 503 0 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1"""",
"url" : "https://www.elastic.co/downloads/elasticsearch",
"tags" : [
"success",
"security",
"error"
],
"geo" : {
"srcdest" : "CN:BR",
"src" : "CN",
"coordinates" : {
"lon" : -85.24216667,
"lat" : 37.63355556
},
"dest" : "BR"
},
"utc_time" : "2020-02-21T16:52:30.202Z",
"bytes" : 0,
"machine" : {
"os" : "osx",
"ram" : 9663676416
},
"response" : 503,
"clientip" : "107.238.94.231",
"host" : "www.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:52:30.202Z"
},
"sort" : [
1582303950202
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "5KSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://facebook.com/success/edward-gibson",
"request" : "/beats/filebeat",
"agent" : "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"extension" : "",
"memory" : null,
"ip" : "231.185.3.235",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]231.185.3.235 - - [2018-08-03T16:54:58.604Z] "GET /beats/filebeat HTTP/1.1" 200 7616 "-" "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"""",
"url" : "https://www.elastic.co/downloads/beats/filebeat",
"tags" : [
"success",
"security",
"error"
],
"geo" : {
"srcdest" : "BO:IN",
"src" : "BO",
"coordinates" : {
"lon" : -151.1913661,
"lat" : 61.965295
},
"dest" : "IN"
},
"utc_time" : "2020-02-21T16:54:58.604Z",
"bytes" : 7616,
"machine" : {
"os" : "osx",
"ram" : 13958643712
},
"response" : 200,
"clientip" : "231.185.3.235",
"host" : "www.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T16:54:58.604Z"
},
"sort" : [
1582304098604
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "6aSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://www.elastic-elastic-elastic.com/success/umberto-guidoni",
"request" : "/beats/filebeat/filebeat-6.3.2-linux-x86.tar.gz",
"agent" : "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"extension" : "gz",
"memory" : null,
"ip" : "251.250.144.158",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]251.250.144.158 - - [2018-08-03T17:01:11.333Z] "GET /beats/filebeat/filebeat-6.3.2-linux-x86.tar.gz HTTP/1.1" 200 9860 "-" "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"""",
"url" : "https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86.tar.gz",
"tags" : [
"success",
"info",
"error"
],
"geo" : {
"srcdest" : "IN:IT",
"src" : "IN",
"coordinates" : {
"lon" : -92.464455,
"lat" : 39.72870694
},
"dest" : "IT"
},
"utc_time" : "2020-02-21T17:01:11.333Z",
"bytes" : 9860,
"machine" : {
"os" : "ios",
"ram" : 6442450944
},
"response" : 200,
"clientip" : "251.250.144.158",
"host" : "artifacts.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T17:01:11.333Z"
},
"sort" : [
1582304471333
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "uaSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://www.elastic-elastic-elastic.com/success/daniel-barry",
"request" : "/kibana/kibana-6.3.2-linux-x86_64.tar.gz",
"agent" : "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1",
"extension" : "gz",
"memory" : null,
"ip" : "247.50.93.227",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]247.50.93.227 - - [2018-08-03T17:08:38.706Z] "GET /kibana/kibana-6.3.2-linux-x86_64.tar.gz HTTP/1.1" 200 3119 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1"""",
"url" : "https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-linux-x86_64.tar.gz",
"tags" : [
"success",
"info",
"error"
],
"geo" : {
"srcdest" : "IN:US",
"src" : "IN",
"coordinates" : {
"lon" : -90.32484722,
"lat" : 36.77394444
},
"dest" : "US"
},
"utc_time" : "2020-02-21T17:08:38.706Z",
"bytes" : 3119,
"machine" : {
"os" : "osx",
"ram" : 9663676416
},
"response" : 200,
"clientip" : "247.50.93.227",
"host" : "artifacts.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T17:08:38.706Z"
},
"sort" : [
1582304918706
]
},
{
"_index" : "kibana_sample_data_logs",
"_type" : "_doc",
"_id" : "MKSPaHABZ3Nzf4SJcTnh",
"_score" : null,
"_source" : {
"referer" : "http://twitter.com/success/anthony-w-england",
"request" : "/elasticsearch/elasticsearch-6.3.2.zip",
"agent" : "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"extension" : "zip",
"memory" : null,
"ip" : "179.153.116.46",
"index" : "kibana_sample_data_logs",
"message" : """[asayer_session_id=123456677890]179.153.116.46 - - [2018-08-03T17:11:18.968Z] "GET /elasticsearch/elasticsearch-6.3.2.zip HTTP/1.1" 200 4685 "-" "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24"""",
"url" : "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.zip",
"tags" : [
"success",
"info",
"error"
],
"geo" : {
"srcdest" : "IN:IT",
"src" : "IN",
"coordinates" : {
"lon" : -97.10931306,
"lat" : 40.86525806
},
"dest" : "IT"
},
"utc_time" : "2020-02-21T17:11:18.968Z",
"bytes" : 4685,
"machine" : {
"os" : "ios",
"ram" : 4294967296
},
"response" : 200,
"clientip" : "179.153.116.46",
"host" : "artifacts.elastic.co",
"phpmemory" : null,
"timestamp" : "2020-02-21T17:11:18.968Z"
},
"sort" : [
1582305078968
]
}
]
}
}

View file

@ -1,99 +0,0 @@
package clients
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
"openreplay/backend/pkg/messages"
)
/*
We use insights-api for query. They also have Logs and Events
*/
// TODO: Eu/us
type newrelic struct {
ApplicationId string //`json:"application_id"`
XQueryKey string //`json:"x_query_key"`
}
// TODO: Recheck
type newrelicResponce struct {
Results []struct {
Events []json.RawMessage
}
}
type newrelicEvent struct {
OpenReplaySessionToken string `json:"openReplaySessionToken"`
ErrorClass string `json:"error.class"`
Timestamp uint64 `json:"timestamp"`
}
func (nr *newrelic) Request(c *client) error {
sinceTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second
// In docs - format "yyyy-mm-dd HH:MM:ss", but time.RFC3339 works fine too
sinceFormatted := time.UnixMilli(int64(sinceTs)).Format(time.RFC3339)
// US/EU endpoint ??
requestURL := fmt.Sprintf("https://insights-api.eu.newrelic.com/v1/accounts/%v/query", nr.ApplicationId)
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
return err
}
// The docks and api are awfull. Seems like SINCE works inclusively.
nrql := fmt.Sprintf("SELECT * FROM TransactionError SINCE '%v' WHERE openReplaySessionToken IS NOT NULL", sinceFormatted)
q := req.URL.Query()
q.Add("nrql", nrql)
req.URL.RawQuery = q.Encode()
req.Header.Add("X-Query-Key", nr.XQueryKey)
req.Header.Add("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// 401 (unauthorised) if wrong XQueryKey/deploymentServer is wrong or 403 (Forbidden) if ApplicationId is wrong
// 400 if Query has problems
if resp.StatusCode >= 400 {
io.Copy(io.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Newrelic: server respond with the code %v| Request: ", resp.StatusCode, *req)
}
// Pagination depending on returning metadata ?
var nrResp newrelicResponce
if err = json.NewDecoder(resp.Body).Decode(&nrResp); err != nil {
return err
}
for _, r := range nrResp.Results {
for _, jsonEvent := range r.Events {
var e newrelicEvent
if err = json.Unmarshal(jsonEvent, &e); err != nil {
c.errChan <- err
continue
}
if e.OpenReplaySessionToken == "" {
c.errChan <- errors.New("Token is empty")
continue
}
c.requestData.SetLastMessageTimestamp(e.Timestamp)
c.evChan <- &SessionErrorEvent{
Token: e.OpenReplaySessionToken,
IntegrationEvent: &messages.IntegrationEvent{
Source: "newrelic",
Timestamp: e.Timestamp,
Name: e.ErrorClass,
Payload: string(jsonEvent),
},
}
}
}
return nil
}

View file

@ -1,153 +0,0 @@
{
"results": [
{
"events": [
{
"aggregateFacet": "WebTransaction/Function/flaskr.auth:register::builtins:ZeroDivisionError",
"appId": 28925082,
"appName": "My Flask Python Application",
"duration": 0.0017523765563964844,
"entityGuid": "MjYyNzMzN3xBUE18QVBQTElDQVRJT058Mjg5MjUwODI",
"error.class": "builtins:ZeroDivisionError",
"error.message": "division by zero",
"host": "new-relic-flask2",
"port": 8080,
"realAgentId": 28925084,
"request.headers.accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"request.headers.host": "35.236.108.92:8080",
"request.headers.referer": "http://35.236.108.92:8080/",
"request.headers.userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36",
"request.method": "GET",
"request.uri": "/auth/register",
"thread.concurrency": 0.9917959662585014,
"timestamp": 1582133149351,
"transactionName": "WebTransaction/Function/flaskr.auth:register",
"transactionUiName": "/flaskr.auth:register"
},
{
"aggregateFacet": "WebTransaction/Function/flaskr.auth:register::builtins:ZeroDivisionError",
"appId": 28925082,
"appName": "My Flask Python Application",
"duration": 0.0011684894561767578,
"entityGuid": "MjYyNzMzN3xBUE18QVBQTElDQVRJT058Mjg5MjUwODI",
"error.class": "builtins:ZeroDivisionError",
"error.message": "division by zero",
"host": "new-relic-flask2",
"port": 8080,
"realAgentId": 28925084,
"request.headers.accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"request.headers.host": "35.236.108.92:8080",
"request.headers.referer": "http://35.236.108.92:8080/",
"request.headers.userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36",
"request.method": "GET",
"request.uri": "/auth/register",
"thread.concurrency": 0.9858882285247911,
"timestamp": 1582133148418,
"transactionName": "WebTransaction/Function/flaskr.auth:register",
"transactionUiName": "/flaskr.auth:register"
},
{
"aggregateFacet": "WebTransaction/Function/flaskr.auth:register::builtins:ZeroDivisionError",
"appId": 28925082,
"appName": "My Flask Python Application",
"duration": 0.0011768341064453125,
"entityGuid": "MjYyNzMzN3xBUE18QVBQTElDQVRJT058Mjg5MjUwODI",
"error.class": "builtins:ZeroDivisionError",
"error.message": "division by zero",
"host": "new-relic-flask2",
"port": 8080,
"realAgentId": 28925084,
"request.headers.accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"request.headers.host": "35.236.108.92:8080",
"request.headers.referer": "http://35.236.108.92:8080/",
"request.headers.userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36",
"request.method": "GET",
"request.uri": "/auth/register",
"thread.concurrency": 0.9890943792544556,
"timestamp": 1582133147569,
"transactionName": "WebTransaction/Function/flaskr.auth:register",
"transactionUiName": "/flaskr.auth:register"
},
{
"aggregateFacet": "WebTransaction/Function/flaskr.auth:register::builtins:ZeroDivisionError",
"appId": 28925082,
"appName": "My Flask Python Application",
"duration": 0.0011293888092041016,
"entityGuid": "MjYyNzMzN3xBUE18QVBQTElDQVRJT058Mjg5MjUwODI",
"error.class": "builtins:ZeroDivisionError",
"error.message": "division by zero",
"host": "new-relic-flask2",
"port": 8080,
"realAgentId": 28925084,
"request.headers.accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"request.headers.host": "35.236.108.92:8080",
"request.headers.referer": "http://35.236.108.92:8080/",
"request.headers.userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36",
"request.method": "GET",
"request.uri": "/auth/register",
"thread.concurrency": 0.986374214903945,
"timestamp": 1582133146601,
"transactionName": "WebTransaction/Function/flaskr.auth:register",
"transactionUiName": "/flaskr.auth:register"
},
{
"aggregateFacet": "WebTransaction/Function/flaskr.auth:register::builtins:ZeroDivisionError",
"appId": 28925082,
"appName": "My Flask Python Application",
"duration": 0.0011687278747558594,
"entityGuid": "MjYyNzMzN3xBUE18QVBQTElDQVRJT058Mjg5MjUwODI",
"error.class": "builtins:ZeroDivisionError",
"error.message": "division by zero",
"host": "new-relic-flask2",
"port": 8080,
"realAgentId": 28925084,
"request.headers.accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"request.headers.host": "35.236.108.92:8080",
"request.headers.referer": "http://35.236.108.92:8080/",
"request.headers.userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36",
"request.method": "GET",
"request.uri": "/auth/register",
"thread.concurrency": 0.9839758465932258,
"timestamp": 1582133139999,
"transactionName": "WebTransaction/Function/flaskr.auth:register",
"transactionUiName": "/flaskr.auth:register"
}
]
}
],
"performanceStats": {
"inspectedCount": 5,
"omittedCount": 0,
"matchCount": 5,
"wallClockTime": 18
},
"metadata": {
"eventTypes": [
"TransactionError"
],
"eventType": "TransactionError",
"openEnded": false,
"beginTime": "2020-02-18T18:00:00Z",
"endTime": "2020-02-19T19:38:58Z",
"beginTimeMillis": 1582048800000,
"endTimeMillis": 1582141138000,
"rawSince": "'2020-02-18T18:00:00Z'",
"rawUntil": "'2020-02-19T19:38:58Z'",
"rawCompareWith": "",
"guid": "5890296a-5092-5fcc-f05d-b55dbeb8a500",
"routerGuid": "729a50a2-a73f-5d88-cd1f-ea0a9ace54a6",
"messages": [
"Your query's end time is in the future. Results may be incomplete."
],
"contents": [
{
"function": "events",
"limit": 100,
"order": {
"column": "timestamp",
"descending": true
}
}
]
}
}

View file

@ -1,42 +0,0 @@
{
"results": [
{
"events": []
}
],
"performanceStats": {
"inspectedCount": 0,
"omittedCount": 0,
"matchCount": 0,
"wallClockTime": 10
},
"metadata": {
"eventTypes": [
"TransactionError"
],
"eventType": "TransactionError",
"openEnded": true,
"beginTime": "2014-08-04T00:00:00Z",
"endTime": "2020-02-14T18:04:23Z",
"beginTimeMillis": 1407110400000,
"endTimeMillis": 1581703463865,
"rawSince": "'2014-08-04 00:00:00'",
"rawUntil": "NOW",
"rawCompareWith": "",
"guid": "27d3fd61-735a-ab46-51b4-f96f07226d2b",
"routerGuid": "979f0b32-b13b-fad1-7577-60f2025da513",
"messages": [
"No events found -- do you have the correct event type and time range?"
],
"contents": [
{
"function": "events",
"limit": 100,
"order": {
"column": "timestamp",
"descending": true
}
}
]
}
}

View file

@ -1,175 +0,0 @@
package clients
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"openreplay/backend/pkg/messages"
)
// Old name: asayerSessionId
// QUERY: what can be modified?
const RB_QUERY = "SELECT item.id, item.title,body.message.openReplaySessionToken,item.level," +
" item.counter,item.environment,body.crash_report.raw,body.message.body,timestamp" +
" FROM item_occurrence" +
" WHERE body.message.openReplaySessionToken != null" +
" AND timestamp>= %v" +
" AND item.level>30" +
" ORDER BY timestamp" +
" LIMIT 1000"
// ASC by default
// \n\t symbols can spoil the request body, so it wouldn't work (OR probably it happend because of job hashing)
/*
- `read` Access Token required
- timstamp in seconds
*/
type rollbar struct {
AccessToken string // `json:"access_token"`
}
type rollbarJobResponce struct {
Err int
Message string
Result struct {
Id int
}
}
type rollbarJobStatusResponce struct {
Err int
Result struct {
Status string
Result struct {
Rows [][]json.Number
Columns []string
}
}
}
type rollbarEvent map[string]string
/*
It is possible to use /api/1/instances (20 per page)
Jobs for the identical requests are hashed
*/
func (rb *rollbar) Request(c *client) error {
fromTs := c.requestData.GetLastMessageTimestamp() + 1000 // From next second
c.requestData.SetLastMessageTimestamp(fromTs) // anti-job-hashing
fromTsSec := fromTs / 1e3
query := fmt.Sprintf(RB_QUERY, fromTsSec)
jsonBody := fmt.Sprintf(`{
"access_token": "%v",
"query_string": "%v"
}`, rb.AccessToken, query)
req, err := http.NewRequest("POST", "https://api.rollbar.com/api/1/rql/jobs", strings.NewReader(jsonBody))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// status != 200 || 201
// status can be 403 then should report about wrong token
if resp.StatusCode >= 400 {
io.Copy(io.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Rollbar: server respond with the code %v", resp.StatusCode)
}
var jobResponce rollbarJobResponce
if err = json.NewDecoder(resp.Body).Decode(&jobResponce); err != nil {
return err
}
if jobResponce.Err != 0 {
return fmt.Errorf("Rollbar job responce error: %v", jobResponce.Message)
}
requestURL := fmt.Sprintf(
"https://api.rollbar.com/api/1/rql/job/%v?access_token=%v&expand=result",
jobResponce.Result.Id,
rb.AccessToken,
)
req, err = http.NewRequest("GET", requestURL, nil)
if err != nil {
return err
}
tick := time.Tick(5 * time.Second)
for {
<-tick
resp, err = http.DefaultClient.Do(req)
if err != nil {
return err // continue + timeout/maxAttempts
}
defer resp.Body.Close()
// status != 200
var jobStatus rollbarJobStatusResponce
err := json.NewDecoder(resp.Body).Decode(&jobStatus)
if err != nil {
return err
}
//todo: pagintation; limit: 1000
if jobStatus.Result.Status == "success" {
for _, row := range jobStatus.Result.Result.Rows {
e := make(rollbarEvent)
for i, col := range jobStatus.Result.Result.Columns {
//if len(row) <= i { error }
e[col] = row[i].String() // here I make them all string. That's not good
}
// sessionID, err := strconv.ParseUint(e[ "body.message.asayerSessionId" ], 10, 64)
// if err != nil {
// c.errChan <- err
// continue
// }
if e["body.message.openReplaySessionToken"] == "" {
c.errChan <- errors.New("Token is empty!")
continue
}
payload, err := json.Marshal(e)
if err != nil {
c.errChan <- err
continue
}
timestampSec, err := strconv.ParseUint(e["timestamp"], 10, 64)
if err != nil {
c.errChan <- err
continue
}
timestamp := timestampSec * 1000
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
Token: e["body.message.openReplaySessionToken"],
IntegrationEvent: &messages.IntegrationEvent{
Source: "rollbar",
Timestamp: timestamp,
Name: e["item.title"],
Payload: string(payload),
},
}
}
break
}
if jobStatus.Result.Status != "new" &&
jobStatus.Result.Status != "running" {
// error
break
}
}
return nil
}

View file

@ -1,4 +0,0 @@
{
"err": 1,
"message": "insufficient privileges: read scope is required but the access token only has post_server_item."
}

View file

@ -1,63 +0,0 @@
{
"err": 0,
"result": {
"status": "success",
"job_hash": "2d7141f57204ea9cfe59304beed947313b538608",
"result": {
"isSimpleSelect": true,
"errors": [
],
"warnings": [
],
"executionTime": 0.021143198013305664,
"effectiveTimestamp": 1582125712,
"rowcount": 1,
"rows": [
[
866038147,
"division by zero",
"250250",
40,
2,
"production",
null,
"division by zero",
1582125312,
115079899839
]
],
"selectionColumns": [
"item.id",
"item.title",
"body.message.asayerSessionId",
"item.level",
"item.counter",
"item.environment",
"body.crash_report.raw",
"body.message.body",
"timestamp"
],
"projectIds": [
349886
],
"columns": [
"item.id",
"item.title",
"body.message.asayerSessionId",
"item.level",
"item.counter",
"item.environment",
"body.crash_report.raw",
"body.message.body",
"timestamp",
"occurrence_id"
]
},
"date_modified": 1582125712,
"query_string": "SELECT item.id, item.title,body.message.asayerSessionId,item.level,item.counter,item.environment,body.crash_report.raw,body.message.body,timestamp FROM item_occurrence WHERE body.message.asayerSessionId != null\tAND timestamp>= 1582124402\tAND item.level>30\tORDER BY timestamp\tLIMIT 1000",
"date_created": 1582125712,
"project_id": 349886,
"id": 82765415,
"project_group_id": 16555
}
}

View file

@ -1,13 +0,0 @@
{
"err": 0,
"result": {
"status": "new",
"job_hash": "2d7141f57204ea9cfe59304beed947313b538608",
"date_modified": 1582125712,
"query_string": "SELECT item.id, item.title,body.message.asayerSessionId,item.level,item.counter,item.environment,body.crash_report.raw,body.message.body,timestamp FROM item_occurrence WHERE body.message.asayerSessionId != null\tAND timestamp>= 1582124402\tAND item.level>30\tORDER BY timestamp\tLIMIT 1000",
"date_created": 1582125712,
"project_id": 349886,
"id": 82765415,
"project_group_id": 16555
}
}

View file

@ -4,147 +4,102 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"openreplay/backend/pkg/messages"
)
/*
They also have different stuff
Documentation says:
"Note: This endpoint is experimental and may be removed without notice."
*/
type sentry struct {
OrganizationSlug string // `json:"organization_slug"`
ProjectSlug string // `json:"project_slug"`
Token string // `json:"token"`
type sentryClient struct {
//
}
type sentryEvent struct {
Tags []struct {
Key string
Value string `json:"value"`
func NewSentryClient() Client {
return &sentryClient{}
}
type sentryConfig struct {
OrganizationSlug string `json:"organization_slug"`
ProjectSlug string `json:"project_slug"`
Token string `json:"token"`
}
type SentryEvent struct {
ID string `json:"id"`
Title string `json:"title"`
Message string `json:"message"`
Environment string `json:"environment"`
}
func (s *sentryClient) FetchSessionData(credentials interface{}, sessionID uint64) (interface{}, error) {
cfg, ok := credentials.(sentryConfig)
if !ok {
strCfg, ok := credentials.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid credentials, got: %+v", credentials)
}
cfg = sentryConfig{}
if val, ok := strCfg["organization_slug"].(string); ok {
cfg.OrganizationSlug = val
}
if val, ok := strCfg["project_slug"].(string); ok {
cfg.ProjectSlug = val
}
if val, ok := strCfg["token"].(string); ok {
cfg.Token = val
}
}
DateCreated string `json:"dateCreated"` // or dateReceived ?
Title string
EventID string `json:"eventID"`
}
requestUrl := fmt.Sprintf("https://sentry.io/api/0/projects/%s/%s/events/", cfg.OrganizationSlug, cfg.ProjectSlug)
func (sn *sentry) Request(c *client) error {
requestURL := fmt.Sprintf("https://sentry.io/api/0/projects/%v/%v/events/", sn.OrganizationSlug, sn.ProjectSlug)
req, err := http.NewRequest("GET", requestURL, nil)
testCallLimit := 1
params := url.Values{}
if sessionID != 0 {
params.Add("query", fmt.Sprintf("openReplaySession.id=%d", sessionID))
} else {
params.Add("per_page", fmt.Sprintf("%d", testCallLimit))
}
requestUrl += "?" + params.Encode()
// Create a new request
req, err := http.NewRequest("GET", requestUrl, nil)
if err != nil {
return err
log.Fatalf("Failed to create request: %v", err)
}
authHeader := "Bearer " + sn.Token
req.Header.Add("Authorization", authHeader)
// by link ?
lastEventId := c.requestData.GetLastMessageId()
firstEvent := true
// Add Authorization header
req.Header.Set("Authorization", "Bearer "+cfg.Token)
PageLoop:
for {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
io.Copy(io.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Sentry: server respond with the code %v", resp.StatusCode)
}
var jsonEventList []json.RawMessage
err = json.NewDecoder(resp.Body).Decode(&jsonEventList)
if err != nil {
return err
}
for _, jsonEvent := range jsonEventList {
var e sentryEvent
err = json.Unmarshal(jsonEvent, &e)
if err != nil {
c.errChan <- err
continue
}
if lastEventId == e.EventID {
break PageLoop
}
parsedTime, err := time.Parse(time.RFC3339, e.DateCreated)
if err != nil {
c.errChan <- fmt.Errorf("%v | Event: %v", err, e)
continue
}
timestamp := uint64(parsedTime.UnixMilli())
// TODO: not to receive all the messages (use default integration timestamp)
if firstEvent { // TODO: reverse range?
c.requestData.SetLastMessageId(timestamp, e.EventID)
firstEvent = false
}
var sessionID uint64
var token string
for _, tag := range e.Tags {
if tag.Key == "openReplaySessionToken" {
token = tag.Value
break
}
if tag.Key == "asayer_session_id" {
sessionID, err = strconv.ParseUint(tag.Value, 10, 64)
break
}
}
if err != nil {
c.errChan <- err
continue
}
if token == "" && sessionID == 0 { // We can't felter them on request
continue
}
c.evChan <- &SessionErrorEvent{
SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "sentry",
Timestamp: timestamp,
Name: e.Title,
Payload: string(jsonEvent),
},
}
}
// check link before parsing body?
linkHeader := resp.Header.Get("Link")
if linkHeader == "" {
return fmt.Errorf("No Link header found in the responce.")
}
pagInfo := strings.Split(linkHeader, ",")
if len(pagInfo) < 2 {
return fmt.Errorf("Link header format error. Got: '%v'", linkHeader)
}
nextLinkInfo := pagInfo[1]
if strings.Contains(nextLinkInfo, `results="false"`) {
break
}
if !strings.Contains(nextLinkInfo, `results="true"`) {
return fmt.Errorf("Link header format error. Results status not found. Got: '%v'", linkHeader)
}
nextLink := GetLinkFromAngularBrackets(nextLinkInfo)
req.URL, err = url.Parse(nextLink)
if err != nil {
return err
}
// Send the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Fatalf("Failed to send request: %v", err)
}
return nil
defer resp.Body.Close()
// Check if the response status is OK
if resp.StatusCode != http.StatusOK {
log.Fatalf("Failed to fetch logs, status code: %v", resp.StatusCode)
}
// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Failed to read response body: %v", err)
}
// Parse the JSON response
var events []SentryEvent
err = json.Unmarshal(body, &events)
if err != nil {
log.Fatalf("Failed to parse JSON: %v", err)
}
if events == nil || len(events) == 0 {
return nil, fmt.Errorf("no logs found")
}
result, err := json.Marshal(events)
if err != nil {
return nil, err
}
return result, nil
}

File diff suppressed because it is too large Load diff

View file

@ -1,100 +0,0 @@
package clients
import (
"cloud.google.com/go/logging/logadmin"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
//"strconv"
"context"
"encoding/json"
"fmt"
"time"
"openreplay/backend/pkg/messages"
)
// Old: asayerSessionId
const SD_FILTER_QUERY = `
logName = "projects/%v/logs/%v"
labels.openReplaySessionToken!=null AND
severity>=ERROR AND
timestamp>="%v"
`
type stackdriver struct {
ServiceAccountCredentials string // `json:"service_account_credentials"`
LogName string // `json:"log_name"`
}
type saCreds struct {
ProjectId string `json:"project_id"`
}
func (sd *stackdriver) Request(c *client) error {
fromTs := c.requestData.GetLastMessageTimestamp() + 1 // Timestamp is RFC3339Nano, so we take the next millisecond
fromFormatted := time.UnixMilli(int64(fromTs)).Format(time.RFC3339Nano)
ctx := context.Background()
var parsedCreds saCreds
err := json.Unmarshal([]byte(sd.ServiceAccountCredentials), &parsedCreds)
if err != nil {
return err
}
opt := option.WithCredentialsJSON([]byte(sd.ServiceAccountCredentials))
client, err := logadmin.NewClient(ctx, parsedCreds.ProjectId, opt)
if err != nil {
return err
}
defer client.Close()
filter := fmt.Sprintf(SD_FILTER_QUERY, parsedCreds.ProjectId, sd.LogName, fromFormatted)
// By default, Entries are listed from oldest to newest.
/* ResourceNames(rns []string)
"projects/[PROJECT_ID]"
"organizations/[ORGANIZATION_ID]"
"billingAccounts/[BILLING_ACCOUNT_ID]"
"folders/[FOLDER_ID]"
*/
it := client.Entries(ctx, logadmin.Filter(filter))
// TODO: Pagination:
//pager := iterator.NewPager(it, 1000, "")
//nextToken, err := pager.NextPage(&entries)
//if nextToken == "" { break }
for {
e, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
token := e.Labels["openReplaySessionToken"]
// sessionID, err := strconv.ParseUint(strSessionID, 10, 64)
// if err != nil {
// c.errChan <- err
// continue
// }
jsonEvent, err := json.Marshal(e)
if err != nil {
c.errChan <- err
continue
}
timestamp := uint64(e.Timestamp.UnixMilli())
c.requestData.SetLastMessageTimestamp(timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "stackdriver",
Timestamp: timestamp,
Name: e.InsertID, // not sure about that
Payload: string(jsonEvent),
},
}
}
return nil
}

View file

@ -1,215 +0,0 @@
package clients
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"openreplay/backend/pkg/messages"
)
/*
The maximum value for limit is 10,000 messages or 100 MB in total message size,
which means the query may return less than 10,000 messages if you exceed the size limit.
API Documentation: https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API
*/
const SL_LIMIT = 10000
type sumologic struct {
AccessId string // `json:"access_id"`
AccessKey string // `json:"access_key"`
cookies []*http.Cookie
}
type sumplogicJobResponce struct {
Id string
}
type sumologicJobStatusResponce struct {
State string
MessageCount int
//PendingErrors []string
}
type sumologicResponce struct {
Messages []struct {
Map json.RawMessage
}
}
type sumologicEvent struct {
Timestamp uint64 `json:"_messagetime,string"`
Raw string `json:"_raw"`
}
func (sl *sumologic) deleteJob(jobId string, errChan chan<- error) {
requestURL := fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs/%v", "eu.", jobId)
req, err := http.NewRequest("DELETE", requestURL, nil)
if err != nil {
errChan <- fmt.Errorf("Error on DELETE request creation: %v", err)
return
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
req.SetBasicAuth(sl.AccessId, sl.AccessKey)
resp, err := http.DefaultClient.Do(req)
if err != nil {
errChan <- fmt.Errorf("Error on DELETE request: %v", err)
return
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
func (sl *sumologic) Request(c *client) error {
fromTs := c.requestData.GetLastMessageTimestamp() + 1 // From next millisecond
toTs := time.Now().UnixMilli()
requestURL := fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs", "eu.") // deployment server??
jsonBody := fmt.Sprintf(`{
"query": "\"openReplaySessionToken=\" AND (*error* OR *fail* OR *exception*)",
"from": %v,
"to": %v
}`, fromTs, toTs) // docs and api are awful. from/to seems to work inclusively
req, err := http.NewRequest("POST", requestURL, strings.NewReader(jsonBody))
if err != nil {
return err
}
//q := req.URL.Query()
//q.Add("query", "\"asayer_session_id=\" AND (*error* OR *fail* OR *exception*)")
//q.Add("from", )
//q.Add("to")
//q.Add("timeZone", "UTC")
//q.Add("byReceiptTime", "true")
for _, cookie := range sl.cookies {
req.AddCookie(cookie)
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
req.SetBasicAuth(sl.AccessId, sl.AccessKey)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("Error while requesting search job start: %v", err)
}
defer resp.Body.Close()
// Can be 202/400/415 according to docs
// https://help.sumologic.com/APIs/Search-Job-API/About-the-Search-Job-API#status-codes
// responce body is NOT the same as in docs (look at the sumologic_job_start.json)
if resp.StatusCode >= 400 {
io.Copy(io.Discard, resp.Body) // Read the body to free socket
return fmt.Errorf("Sumologic: server respond with the code %v | req %v |Resp: %v", resp.StatusCode, *req, *resp)
}
sl.cookies = resp.Cookies()
var jobResponce sumplogicJobResponce
if err = json.NewDecoder(resp.Body).Decode(&jobResponce); err != nil {
return fmt.Errorf("Error on parsing responce: %v", err)
}
defer sl.deleteJob(jobResponce.Id, c.errChan)
requestURL = fmt.Sprintf("https://api.%vsumologic.com/api/v1/search/jobs/%v", "eu.", jobResponce.Id)
req, err = http.NewRequest("GET", requestURL, nil)
if err != nil {
return err
}
req.Header.Add("Accept", "application/json")
req.SetBasicAuth(sl.AccessId, sl.AccessKey)
for _, cookie := range sl.cookies {
req.AddCookie(cookie)
}
tick := time.Tick(5 * time.Second)
for {
<-tick
resp, err = http.DefaultClient.Do(req)
if err != nil {
return err // TODO: retry, counter/timeout
}
defer resp.Body.Close()
// TODO: check resp.StatusCode
//sl.cookies = resp.Cookies() TODO?
var jobStatus sumologicJobStatusResponce
err := json.NewDecoder(resp.Body).Decode(&jobStatus)
if err != nil {
return err // TODO: retry, counter/timeout
}
if jobStatus.State == "DONE GATHERING RESULTS" {
offset := 0
for offset < jobStatus.MessageCount {
requestURL = fmt.Sprintf(
"https://api.%vsumologic.com/api/v1/search/jobs/%v/messages?offset=%v&limit=%v",
"eu.",
jobResponce.Id,
offset,
SL_LIMIT,
)
req, err = http.NewRequest("GET", requestURL, nil)
if err != nil {
return err // TODO: retry, counter/timeout
}
req.Header.Add("Accept", "application/json")
req.SetBasicAuth(sl.AccessId, sl.AccessKey)
for _, cookie := range sl.cookies {
req.AddCookie(cookie)
}
resp, err = http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
var slResp sumologicResponce
err := json.NewDecoder(resp.Body).Decode(&slResp)
if err != nil {
return err
}
for _, m := range slResp.Messages {
var e sumologicEvent
err = json.Unmarshal(m.Map, &e)
if err != nil {
c.errChan <- err
continue
}
token, err := GetToken(e.Raw)
if err != nil {
c.errChan <- err
continue
}
name := e.Raw
if len(name) > 20 {
name = name[:20] // not sure about that
}
c.requestData.SetLastMessageTimestamp(e.Timestamp)
c.evChan <- &SessionErrorEvent{
//SessionID: sessionID,
Token: token,
IntegrationEvent: &messages.IntegrationEvent{
Source: "sumologic",
Timestamp: e.Timestamp,
Name: name,
Payload: string(m.Map), //e.Raw ?
},
}
}
offset += len(slResp.Messages)
}
break
}
if jobStatus.State != "NOT STARTED" &&
jobStatus.State != "GATHERING RESULTS" {
// error
break
}
}
return nil
}

View file

@ -1,6 +0,0 @@
{
"status": 400,
"id": "EV5YQ-N96Y8-RNDUF",
"code": "searchjob.offset.missing",
"message": "Offset is missing."
}

View file

@ -1,146 +0,0 @@
{
"fields": [
{
"name": "_blockid",
"fieldType": "long",
"keyField": false
},
{
"name": "_collector",
"fieldType": "string",
"keyField": false
},
{
"name": "_collectorid",
"fieldType": "long",
"keyField": false
},
{
"name": "_format",
"fieldType": "string",
"keyField": false
},
{
"name": "_messagecount",
"fieldType": "int",
"keyField": false
},
{
"name": "_messageid",
"fieldType": "long",
"keyField": false
},
{
"name": "_messagetime",
"fieldType": "long",
"keyField": false
},
{
"name": "_raw",
"fieldType": "string",
"keyField": false
},
{
"name": "_receipttime",
"fieldType": "long",
"keyField": false
},
{
"name": "_size",
"fieldType": "long",
"keyField": false
},
{
"name": "_source",
"fieldType": "string",
"keyField": false
},
{
"name": "_sourcecategory",
"fieldType": "string",
"keyField": false
},
{
"name": "_sourcehost",
"fieldType": "string",
"keyField": false
},
{
"name": "_sourceid",
"fieldType": "long",
"keyField": false
},
{
"name": "_sourcename",
"fieldType": "string",
"keyField": false
},
{
"name": "_view",
"fieldType": "string",
"keyField": false
}
],
"messages": [
{
"map": {
"_blockid": "-9223372036854750790",
"_messagetime": "1582286762789",
"_raw": "ZeroDivisionError: [asayer_session_id=9999999999] division by zero",
"_collectorid": "108084884",
"_sourceid": "148051672",
"_collector": "new-relic-flask2",
"_messagecount": "11",
"_sourcehost": "new-relic-flask2",
"_messageid": "-9223372036854423342",
"_sourcename": "/home/tahay_asayer_io/tutorial/example.log",
"_size": "66",
"_view": "",
"_receipttime": "1582286762789",
"_sourcecategory": "linux/system",
"_format": "t:fail:o:-1:l:0:p:null",
"_source": "Linux System_2"
}
},
{
"map": {
"_blockid": "-9223372036854750791",
"_messagetime": "1582286727679",
"_raw": "ZeroDivisionError: [asayer_session_id=9999999999] division by zero",
"_collectorid": "108084884",
"_sourceid": "148051672",
"_collector": "new-relic-flask2",
"_messagecount": "8",
"_sourcehost": "new-relic-flask2",
"_messageid": "-9223372036854423345",
"_sourcename": "/home/tahay_asayer_io/tutorial/example.log",
"_size": "66",
"_view": "",
"_receipttime": "1582286727679",
"_sourcecategory": "linux/system",
"_format": "t:fail:o:-1:l:0:p:null",
"_source": "Linux System_2"
}
},
{
"map": {
"_blockid": "-9223372036854750791",
"_messagetime": "1582286723677",
"_raw": "ZeroDivisionError: [asayer_session_id=9999999999] division by zero",
"_collectorid": "108084884",
"_sourceid": "148051672",
"_collector": "new-relic-flask2",
"_messagecount": "5",
"_sourcehost": "new-relic-flask2",
"_messageid": "-9223372036854423348",
"_sourcename": "/home/tahay_asayer_io/tutorial/example.log",
"_size": "66",
"_view": "",
"_receipttime": "1582286723677",
"_sourcecategory": "linux/system",
"_format": "t:fail:o:-1:l:0:p:null",
"_source": "Linux System_2"
}
}
]
}

View file

@ -1,7 +0,0 @@
{
"id":"6F17F4197B438B68",
"link": {
"rel":"self",
"href":"https://api.eu.sumologic.com/api/v1/search/jobs/6F17F4197B438B68"
}
}

View file

@ -1,16 +0,0 @@
{
"state": "DONE GATHERING RESULTS",
"histogramBuckets": [
{
"startTimestamp": 1582243200000,
"length": 55087270,
"count": 194
}
],
"messageCount": 194,
"recordCount": 0,
"pendingWarnings": [
],
"pendingErrors": [
]
}

View file

@ -1,26 +0,0 @@
package clients
import (
"fmt"
"regexp"
"strings"
)
func GetLinkFromAngularBrackets(s string) string {
beg := strings.Index(s, "<") + 1
end := strings.Index(s, ">")
if end < 0 {
return ""
}
return strings.TrimSpace(s[beg:end])
}
var reToken = regexp.MustCompile(`(?i)openReplaySessionToken=([0-9a-zA-Z\.]+)`)
func GetToken(s string) (string, error) {
matches := reToken.FindStringSubmatch(s)
if len(matches) < 2 {
return "", fmt.Errorf("'openReplaySessionToken' not found in '%v' ", s)
}
return matches[1], nil
}

View file

@ -0,0 +1,224 @@
package data_integration
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"
"github.com/gorilla/mux"
metrics "openreplay/backend/pkg/metrics/heuristics"
)
func getIntegrationsArgs(r *http.Request) (string, uint64, error) {
vars := mux.Vars(r)
name := vars["name"]
if name == "" {
return "", 0, fmt.Errorf("empty integration name")
}
project := vars["project"]
if project == "" {
return "", 0, fmt.Errorf("project id is empty")
}
projID, err := strconv.ParseUint(project, 10, 64)
if err != nil || projID <= 0 {
return "", 0, fmt.Errorf("invalid project id")
}
return name, projID, nil
}
func getIntegrationSession(r *http.Request) (uint64, error) {
vars := mux.Vars(r)
session := vars["session"]
if session == "" {
return 0, fmt.Errorf("session id is empty")
}
sessID, err := strconv.ParseUint(session, 10, 64)
if err != nil || sessID <= 0 {
return 0, fmt.Errorf("invalid session id")
}
return sessID, nil
}
type IntegrationRequest struct {
IntegrationData map[string]string `json:"data"`
}
func (e *Router) createIntegration(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
return
}
bodySize = len(bodyBytes)
integration, project, err := getIntegrationsArgs(r)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
req := &IntegrationRequest{}
if err := json.Unmarshal(bodyBytes, req); err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
if err := e.services.Integrator.AddIntegration(project, integration, req.IntegrationData); err != nil {
e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
}
func (e *Router) getIntegration(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
integration, project, err := getIntegrationsArgs(r)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
intParams, err := e.services.Integrator.GetIntegration(project, integration)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
e.ResponseWithJSON(r.Context(), w, intParams, startTime, r.URL.Path, bodySize)
}
func (e *Router) updateIntegration(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
bodyBytes, err := e.readBody(w, r, e.cfg.JsonSizeLimit)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusRequestEntityTooLarge, err, startTime, r.URL.Path, bodySize)
return
}
bodySize = len(bodyBytes)
integration, project, err := getIntegrationsArgs(r)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
req := &IntegrationRequest{}
if err := json.Unmarshal(bodyBytes, req); err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
if err := e.services.Integrator.UpdateIntegration(project, integration, req.IntegrationData); err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
}
func (e *Router) deleteIntegration(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
integration, project, err := getIntegrationsArgs(r)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
if err := e.services.Integrator.DeleteIntegration(project, integration); err != nil {
e.ResponseWithError(r.Context(), w, http.StatusInternalServerError, err, startTime, r.URL.Path, bodySize)
return
}
e.ResponseOK(r.Context(), w, startTime, r.URL.Path, bodySize)
}
func (e *Router) getIntegrationData(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
bodySize := 0
integration, project, err := getIntegrationsArgs(r)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
session, err := getIntegrationSession(r)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
url, err := e.services.Integrator.GetSessionDataURL(project, integration, session)
if err != nil {
e.ResponseWithError(r.Context(), w, http.StatusBadRequest, err, startTime, r.URL.Path, bodySize)
return
}
resp := map[string]string{"url": url}
e.ResponseWithJSON(r.Context(), w, resp, startTime, r.URL.Path, bodySize)
}
func recordMetrics(requestStart time.Time, url string, code, bodySize int) {
if bodySize > 0 {
metrics.RecordRequestSize(float64(bodySize), url, code)
}
metrics.IncreaseTotalRequests()
metrics.RecordRequestDuration(float64(time.Now().Sub(requestStart).Milliseconds()), url, code)
}
func (e *Router) readBody(w http.ResponseWriter, r *http.Request, limit int64) ([]byte, error) {
body := http.MaxBytesReader(w, r.Body, limit)
bodyBytes, err := io.ReadAll(body)
// Close body
if closeErr := body.Close(); closeErr != nil {
e.log.Warn(r.Context(), "error while closing request body: %s", closeErr)
}
if err != nil {
return nil, err
}
return bodyBytes, nil
}
func (e *Router) ResponseOK(ctx context.Context, w http.ResponseWriter, requestStart time.Time, url string, bodySize int) {
w.WriteHeader(http.StatusOK)
e.log.Info(ctx, "response ok")
recordMetrics(requestStart, url, http.StatusOK, bodySize)
}
func (e *Router) ResponseWithJSON(ctx context.Context, w http.ResponseWriter, res interface{}, requestStart time.Time, url string, bodySize int) {
e.log.Info(ctx, "response ok")
body, err := json.Marshal(res)
if err != nil {
e.log.Error(ctx, "can't marshal response: %s", err)
}
w.Header().Set("Content-Type", "application/json")
w.Write(body)
recordMetrics(requestStart, url, http.StatusOK, bodySize)
}
type response struct {
Error string `json:"error"`
}
func (e *Router) ResponseWithError(ctx context.Context, w http.ResponseWriter, code int, err error, requestStart time.Time, url string, bodySize int) {
e.log.Error(ctx, "response error, code: %d, error: %s", code, err)
body, err := json.Marshal(&response{err.Error()})
if err != nil {
e.log.Error(ctx, "can't marshal response: %s", err)
}
w.WriteHeader(code)
w.Write(body)
recordMetrics(requestStart, url, code, bodySize)
}

View file

@ -1,97 +0,0 @@
package integrations
import (
"context"
"fmt"
"strings"
"time"
config "openreplay/backend/internal/config/integrations"
"openreplay/backend/pkg/intervals"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/queue/types"
"openreplay/backend/pkg/token"
)
type Listener struct {
log logger.Logger
cfg *config.Config
storage Storage
producer types.Producer
manager *Manager
tokenizer *token.Tokenizer
Errors chan error
}
func New(log logger.Logger, cfg *config.Config, storage Storage, producer types.Producer, manager *Manager, tokenizer *token.Tokenizer) (*Listener, error) {
listener := &Listener{
log: log,
cfg: cfg,
storage: storage,
Errors: make(chan error),
producer: producer,
manager: manager,
tokenizer: tokenizer,
}
ints, err := storage.GetAll()
if err != nil {
return nil, err
}
for _, i := range ints {
// Add new integration to manager
if err = manager.Update(i); err != nil {
log.Error(context.Background(), "integration parse error: %v | integration: %v", err, *i)
}
}
manager.RequestAll()
go listener.worker()
return listener, nil
}
func (l *Listener) worker() {
clientsCheckTick := time.Tick(intervals.INTEGRATIONS_REQUEST_INTERVAL * time.Millisecond)
for {
select {
case <-clientsCheckTick:
l.manager.RequestAll()
case event := <-l.manager.Events:
l.log.Info(context.Background(), "new integration event: %+v", *event.IntegrationEvent)
sessionID := event.SessionID
if sessionID == 0 {
sessData, err := l.tokenizer.Parse(event.Token)
if err != nil && err != token.EXPIRED {
l.log.Error(context.Background(), "error on token parsing: %v; token: %v", err, event.Token)
continue
}
sessionID = sessData.ID
}
// Why do we produce integration events to analytics topic
l.producer.Produce(l.cfg.TopicAnalytics, sessionID, event.IntegrationEvent.Encode())
case err := <-l.manager.Errors:
l.log.Error(context.Background(), "integration error: %v", err)
case i := <-l.manager.RequestDataUpdates:
if err := l.storage.Update(&i); err != nil {
l.log.Error(context.Background(), "Postgres update request_data error: %v", err)
}
default:
newNotification, err := l.storage.CheckNew()
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
continue
}
l.Errors <- fmt.Errorf("Integration storage error: %v", err)
continue
}
l.log.Info(context.Background(), "integration update: %v", *newNotification)
err = l.manager.Update(newNotification)
if err != nil {
l.log.Error(context.Background(), "integration parse error: %v | integration: %v", err, *newNotification)
}
}
}
}
func (l *Listener) Close() error {
return l.storage.UnListen()
}

View file

@ -1,51 +0,0 @@
package integrations
import (
"context"
"openreplay/backend/pkg/integrations/clients"
"openreplay/backend/pkg/integrations/model"
"openreplay/backend/pkg/logger"
)
type Manager struct {
log logger.Logger
clientMap clients.ClientMap
Events chan *clients.SessionErrorEvent
Errors chan error
RequestDataUpdates chan model.Integration // not pointer because it could change in other thread
}
func NewManager(log logger.Logger) *Manager {
return &Manager{
log: log,
clientMap: make(clients.ClientMap),
RequestDataUpdates: make(chan model.Integration, 100),
Events: make(chan *clients.SessionErrorEvent, 100),
Errors: make(chan error, 100),
}
}
func (m *Manager) Update(i *model.Integration) (err error) {
m.log.Info(context.Background(), "Integration initialization: %v\n", *i)
key := i.GetKey()
if i.Options == nil {
delete(m.clientMap, key)
return nil
}
c, exists := m.clientMap[key]
if !exists {
c, err = clients.NewClient(i, m.RequestDataUpdates, m.Events, m.Errors)
if err != nil {
return err
}
m.clientMap[key] = c
}
return c.Update(i)
}
func (m *Manager) RequestAll() {
m.log.Info(context.Background(), "Requesting all...")
for _, c := range m.clientMap {
go c.Request()
}
}

View file

@ -1,38 +0,0 @@
package model
import (
"encoding/json"
"fmt"
"time"
)
type Integration struct {
ProjectID uint32 `json:"project_id"`
Provider string `json:"provider"`
RequestData json.RawMessage `json:"request_data"`
Options json.RawMessage `json:"options"`
}
func (i *Integration) Encode() []byte {
b, _ := json.Marshal(i)
return b
}
func (i *Integration) Decode(data []byte) error {
return json.Unmarshal(data, i)
}
func (i *Integration) GetKey() string {
return fmt.Sprintf("%d%s", i.ProjectID, i.Provider)
}
func (i *Integration) GetRequestInfo() (*RequestInfo, error) {
ri := new(RequestInfo)
if err := json.Unmarshal(i.RequestData, ri); err != nil {
return nil, err
}
if ri.LastMessageTimestamp == 0 {
ri.LastMessageTimestamp = uint64(time.Now().Add(-time.Hour * 24).UnixMilli())
}
return ri, nil
}

View file

@ -1,58 +0,0 @@
package model
import (
"encoding/json"
"time"
)
const MAX_ATTEMPTS_IN_A_ROW = 4
const MAX_ATTEMPTS = 40
const ATTEMPTS_INTERVAL = 3 * 60 * 60 * 1000
type RequestInfo struct {
LastMessageId string
LastMessageTimestamp uint64
LastAttemptTimestamp int64
UnsuccessfullAttemptsCount int
}
func (c *RequestInfo) SetLastMessageTimestamp(timestamp uint64) {
if timestamp > c.LastMessageTimestamp {
c.LastMessageTimestamp = timestamp
}
}
func (c *RequestInfo) GetLastMessageTimestamp() uint64 {
return c.LastMessageTimestamp
}
func (c *RequestInfo) SetLastMessageId(timestamp uint64, id string) {
c.LastMessageId = id
c.LastMessageTimestamp = timestamp
}
func (c *RequestInfo) GetLastMessageId() string {
return c.LastMessageId
}
func (c *RequestInfo) CanAttempt() bool {
if c.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS ||
(c.UnsuccessfullAttemptsCount >= MAX_ATTEMPTS_IN_A_ROW &&
time.Now().UnixMilli()-c.LastAttemptTimestamp < ATTEMPTS_INTERVAL) {
return false
}
return true
}
func (c *RequestInfo) UpdateLastAttempt() {
c.LastAttemptTimestamp = time.Now().UnixMilli()
}
func (c *RequestInfo) Inc() {
c.UnsuccessfullAttemptsCount++
}
func (c *RequestInfo) Reset() {
c.UnsuccessfullAttemptsCount = 0
}
func (c *RequestInfo) Encode() ([]byte, error) {
return json.Marshal(c)
}

View file

@ -0,0 +1,170 @@
package data_integration
import (
"bytes"
"fmt"
"io"
"net/http"
"time"
"github.com/docker/distribution/context"
"github.com/gorilla/mux"
integration "openreplay/backend/internal/config/integrations"
"openreplay/backend/internal/http/util"
"openreplay/backend/pkg/logger"
limiter "openreplay/backend/pkg/spot/api"
"openreplay/backend/pkg/spot/auth"
)
type Router struct {
log logger.Logger
cfg *integration.Config
router *mux.Router
services *ServiceBuilder
limiter *limiter.UserRateLimiter
}
func NewRouter(cfg *integration.Config, log logger.Logger, services *ServiceBuilder) (*Router, error) {
switch {
case cfg == nil:
return nil, fmt.Errorf("config is empty")
case services == nil:
return nil, fmt.Errorf("services is empty")
case log == nil:
return nil, fmt.Errorf("logger is empty")
}
e := &Router{
log: log,
cfg: cfg,
services: services,
limiter: limiter.NewUserRateLimiter(10, 30, 1*time.Minute, 5*time.Minute),
}
e.init()
return e, nil
}
func (e *Router) init() {
e.router = mux.NewRouter()
// Root route
e.router.HandleFunc("/", e.ping)
e.router.HandleFunc("/v1/integrations/{name}/{project}", e.createIntegration).Methods("POST", "OPTIONS")
e.router.HandleFunc("/v1/integrations/{name}/{project}", e.getIntegration).Methods("GET", "OPTIONS")
e.router.HandleFunc("/v1/integrations/{name}/{project}", e.updateIntegration).Methods("PATCH", "OPTIONS")
e.router.HandleFunc("/v1/integrations/{name}/{project}", e.deleteIntegration).Methods("DELETE", "OPTIONS")
e.router.HandleFunc("/v1/integrations/{name}/{project}/data/{session}", e.getIntegrationData).Methods("GET", "OPTIONS")
// CORS middleware
e.router.Use(e.corsMiddleware)
e.router.Use(e.authMiddleware)
e.router.Use(e.rateLimitMiddleware)
e.router.Use(e.actionMiddleware)
}
func (e *Router) ping(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (e *Router) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
next.ServeHTTP(w, r)
}
if e.cfg.UseAccessControlHeaders {
// Prepare headers for preflight requests
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST,GET,PATCH,DELETE")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization,Content-Encoding")
}
if r.Method == http.MethodOptions {
w.Header().Set("Cache-Control", "max-age=86400")
w.WriteHeader(http.StatusOK)
return
}
r = r.WithContext(context.WithValues(r.Context(), map[string]interface{}{"httpMethod": r.Method, "url": util.SafeString(r.URL.Path)}))
next.ServeHTTP(w, r)
})
}
func (e *Router) authMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
next.ServeHTTP(w, r)
}
// Check if the request is authorized
user, err := e.services.Auth.IsAuthorized(r.Header.Get("Authorization"), nil, false)
if err != nil {
e.log.Warn(r.Context(), "Unauthorized request: %s", err)
w.WriteHeader(http.StatusUnauthorized)
return
}
r = r.WithContext(context.WithValues(r.Context(), map[string]interface{}{"userData": user}))
next.ServeHTTP(w, r)
})
}
func (e *Router) rateLimitMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
next.ServeHTTP(w, r)
}
user := r.Context().Value("userData").(*auth.User)
rl := e.limiter.GetRateLimiter(user.ID)
if !rl.Allow() {
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
type statusWriter struct {
http.ResponseWriter
statusCode int
}
func (w *statusWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
func (w *statusWriter) Write(b []byte) (int, error) {
if w.statusCode == 0 {
w.statusCode = http.StatusOK
}
return w.ResponseWriter.Write(b)
}
func (e *Router) actionMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
next.ServeHTTP(w, r)
}
// Read body and restore the io.ReadCloser to its original state
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "can't read body", http.StatusBadRequest)
return
}
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
// Use custom response writer to get the status code
sw := &statusWriter{ResponseWriter: w}
// Serve the request
next.ServeHTTP(sw, r)
e.logRequest(r, bodyBytes, sw.statusCode)
})
}
func (e *Router) logRequest(r *http.Request, bodyBytes []byte, statusCode int) {
e.log.Info(r.Context(), "Request: %s %s %s %d", r.Method, r.URL.Path, bodyBytes, statusCode)
}
func (e *Router) GetHandler() http.Handler {
return e.router
}

View file

@ -0,0 +1,214 @@
package data_integration
import (
"bytes"
"context"
"errors"
"fmt"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/integrations/clients"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/objectstorage"
)
var PROVIDERS = []string{"datadog", "sentry", "elasticsearch", "dynatrace"}
func isValidProviderName(provider string) bool {
for _, p := range PROVIDERS {
if p == provider {
return true
}
}
return false
}
// Service is the interface that provides methods for backend logs integrations (DataDog, etc).
type Service interface {
AddIntegration(projectID uint64, provider string, data interface{}) error
UpdateIntegration(projectID uint64, provider string, data interface{}) error
GetIntegration(projectID uint64, provider string) (interface{}, error)
DeleteIntegration(projectID uint64, provider string) error
GetSessionDataURL(projectID uint64, provider string, sessionID uint64) (string, error)
}
type serviceImpl struct {
log logger.Logger
conn pool.Pool
storage objectstorage.ObjectStorage
}
func (s *serviceImpl) AddIntegration(projectID uint64, provider string, data interface{}) error {
switch {
case projectID == 0:
return errors.New("project_id is empty")
case provider == "":
return errors.New("provider is empty")
case !isValidProviderName(provider):
return errors.New("invalid provider name")
case data == nil:
return errors.New("data is empty")
}
sql := `INSERT INTO public.integrations (project_id, provider, options) VALUES ($1, $2, $3)`
if err := s.conn.Exec(sql, projectID, provider, data); err != nil {
return fmt.Errorf("failed to add integration: %v", err)
}
return nil
}
func (s *serviceImpl) GetIntegration(projectID uint64, provider string) (interface{}, error) {
switch {
case projectID == 0:
return nil, errors.New("project_id is empty")
case provider == "":
return nil, errors.New("provider is empty")
case !isValidProviderName(provider):
return nil, errors.New("invalid provider name")
}
sql := `SELECT options FROM public.integrations WHERE project_id = $1 AND provider = $2`
var options interface{}
if err := s.conn.QueryRow(sql, projectID, provider).Scan(&options); err != nil {
return nil, fmt.Errorf("failed to get integration: %v", err)
}
return options, nil
}
func (s *serviceImpl) UpdateIntegration(projectID uint64, provider string, data interface{}) error {
switch {
case projectID == 0:
return errors.New("project_id is empty")
case provider == "":
return errors.New("provider is empty")
case !isValidProviderName(provider):
return errors.New("invalid provider name")
case data == nil:
return errors.New("data is empty")
}
sql := `UPDATE public.integrations SET options = $1 WHERE project_id = $2 AND provider = $3`
if err := s.conn.Exec(sql, data, projectID, provider); err != nil {
return fmt.Errorf("failed to update integration: %v", err)
}
return nil
}
func (s *serviceImpl) DeleteIntegration(projectID uint64, provider string) error {
switch {
case projectID == 0:
return errors.New("project_id is empty")
case provider == "":
return errors.New("provider is empty")
case !isValidProviderName(provider):
return errors.New("invalid provider name")
}
sql := `DELETE FROM public.integrations WHERE project_id = $1 AND provider = $2`
if err := s.conn.Exec(sql, projectID, provider); err != nil {
return fmt.Errorf("failed to delete integration: %v", err)
}
return nil
}
func (s *serviceImpl) GetSessionDataURL(projectID uint64, provider string, sessionID uint64) (string, error) {
switch {
case projectID == 0:
return "", errors.New("project_id is empty")
case provider == "":
return "", errors.New("provider is empty")
case !isValidProviderName(provider):
return "", errors.New("invalid provider name")
case sessionID == 0:
return "", errors.New("session_id is empty")
}
if s.hasSessionData(projectID, provider, sessionID) {
return s.generateSessionDataURL(provider, sessionID)
}
creds, err := s.getProviderCredentials(projectID, provider)
if err != nil {
return "", fmt.Errorf("failed to get provider credentials: %v", err)
}
data, err := s.fetchSessionData(provider, creds, sessionID)
if err != nil {
return "", fmt.Errorf("failed to fetch session data: %v", err)
}
if err := s.uploadSessionData(provider, sessionID, data); err != nil {
return "", fmt.Errorf("failed to upload session data to s3: %v", err)
}
if err := s.markSessionData(projectID, provider, sessionID); err != nil {
s.log.Warn(context.Background(), "failed to mark session data: %v", err)
}
return s.generateSessionDataURL(provider, sessionID)
}
func (s *serviceImpl) hasSessionData(projectID uint64, provider string, sessionID uint64) bool {
sql := `SELECT EXISTS(SELECT 1 FROM session_integrations WHERE project_id = $1 AND provider = $2 AND session_id = $3)`
val := false
if err := s.conn.QueryRow(sql, projectID, provider, sessionID).Scan(&val); err != nil {
s.log.Error(context.Background(), "failed to check session data existence: %v", err)
return false
}
return val
}
func (s *serviceImpl) getProviderCredentials(projectID uint64, provider string) (interface{}, error) {
sql := `SELECT options FROM public.integrations WHERE project_id = $1 AND provider = $2`
var credentials interface{}
if err := s.conn.QueryRow(sql, projectID, provider).Scan(&credentials); err != nil {
return nil, fmt.Errorf("failed to get provider credentials: %v", err)
}
return credentials, nil
}
func (s *serviceImpl) fetchSessionData(provider string, credentials interface{}, sessionID uint64) (interface{}, error) {
var newClient clients.Client
switch provider {
case "datadog":
newClient = clients.NewDataDogClient()
case "sentry":
newClient = clients.NewSentryClient()
case "elasticsearch":
newClient = clients.NewElasticClient()
case "dynatrace":
newClient = clients.NewDynatraceClient()
default:
return nil, fmt.Errorf("unknown provider: %s", provider)
}
return newClient.FetchSessionData(credentials, sessionID)
}
func (s *serviceImpl) uploadSessionData(provider string, sessionID uint64, data interface{}) error {
key := fmt.Sprintf("%d/%s.logs", sessionID, provider)
dataBytes, _ := data.([]byte)
return s.storage.Upload(bytes.NewReader(dataBytes), key, "text/plain", objectstorage.NoCompression)
}
func (s *serviceImpl) markSessionData(projectID uint64, provider string, sessionID uint64) error {
sql := `INSERT INTO session_integrations (project_id, provider, session_id) VALUES ($1, $2, $3)`
if err := s.conn.Exec(sql, projectID, provider, sessionID); err != nil {
return fmt.Errorf("failed to mark session data: %v", err)
}
return nil
}
func (s *serviceImpl) generateSessionDataURL(provider string, sessionID uint64) (string, error) {
key := fmt.Sprintf("%d/%s.logs", sessionID, provider)
dataURL, err := s.storage.GetPreSignedDownloadUrl(key)
if err != nil {
return "", fmt.Errorf("failed to generate session data URL: %v", err)
}
return dataURL, nil
}
func NewService(log logger.Logger, pgConn pool.Pool, objStorage objectstorage.ObjectStorage) (Service, error) {
switch {
case log == nil:
return nil, errors.New("logger is empty")
case pgConn == nil:
return nil, errors.New("postgres connection is empty")
case objStorage == nil:
return nil, errors.New("object storage is empty")
}
return &serviceImpl{
log: log,
conn: pgConn,
storage: objStorage,
}, nil
}

View file

@ -1,95 +0,0 @@
package integrations
import (
"context"
"encoding/json"
"fmt"
"openreplay/backend/pkg/logger"
"openreplay/backend/pkg/integrations/model"
"time"
"github.com/jackc/pgx/v4"
)
type Storage interface {
Listen() error
UnListen() error
CheckNew() (*model.Integration, error)
GetAll() ([]*model.Integration, error)
Update(i *model.Integration) error
}
type storageImpl struct {
conn *pgx.Conn
log logger.Logger
}
func NewStorage(conn *pgx.Conn, log logger.Logger) Storage {
return &storageImpl{
conn: conn,
log: log,
}
}
func (s *storageImpl) Listen() error {
_, err := s.conn.Exec(context.Background(), "LISTEN integration")
return err
}
func (s *storageImpl) UnListen() error {
_, err := s.conn.Exec(context.Background(), "UNLISTEN integration")
return err
}
func (s *storageImpl) CheckNew() (*model.Integration, error) {
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
notification, err := s.conn.WaitForNotification(ctx)
if err != nil {
return nil, err
}
if notification.Channel == "integration" {
integrationP := new(model.Integration)
if err := json.Unmarshal([]byte(notification.Payload), integrationP); err != nil {
return nil, err
}
return integrationP, nil
}
return nil, fmt.Errorf("unknown notification channel: %s", notification.Channel)
}
func (s *storageImpl) GetAll() ([]*model.Integration, error) {
rows, err := s.conn.Query(context.Background(), `
SELECT project_id, provider, options, request_data
FROM integrations
`)
if err != nil {
return nil, err
}
defer rows.Close()
integrations := make([]*model.Integration, 0)
for rows.Next() {
i := new(model.Integration)
if err := rows.Scan(&i.ProjectID, &i.Provider, &i.Options, &i.RequestData); err != nil {
s.log.Error(context.Background(), "postgres scan error: %v", err)
continue
}
integrations = append(integrations, i)
}
if err = rows.Err(); err != nil {
return nil, err
}
return integrations, nil
}
func (s *storageImpl) Update(i *model.Integration) error {
_, err := s.conn.Exec(context.Background(), `
UPDATE integrations
SET request_data = $1
WHERE project_id=$2 AND provider=$3`,
i.RequestData, i.ProjectID, i.Provider,
)
return err
}

154
backend/pkg/logger/extra.go Normal file
View file

@ -0,0 +1,154 @@
package logger
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/getsentry/sentry-go"
"net/http"
"openreplay/backend/pkg/env"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
type extraLogger struct {
hasSentry bool
hasElastic bool
dataDogAPIKey string
elasticLogger *elasticsearch.Client
}
type ExtraLogger interface {
Log(ctx context.Context, log string)
}
func NewExtraLogger() ExtraLogger {
// Init sentry
hasSentry := true
SENTRY_DSN := env.String("SENTRY_DSN")
err := sentry.Init(sentry.ClientOptions{
Dsn: SENTRY_DSN,
TracesSampleRate: 1.0,
})
if err != nil {
fmt.Printf("sentry.Init: %s", err)
hasSentry = false
}
// Init elasticsearch
ELASTIC_HOST := env.String("ELASTIC_HOST")
ELASTIC_API_KEY := env.String("ELASTIC_API_KEY")
hasElastic := true
es, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{ELASTIC_HOST},
APIKey: ELASTIC_API_KEY,
})
if err != nil {
fmt.Printf("Error creating the ES client: %s", err)
hasElastic = false
}
// Init
DATADOG_API_KEY := env.String("DATADOG_API_KEY")
if DATADOG_API_KEY == "" {
fmt.Printf("DATADOG_API_KEY is empty")
}
return &extraLogger{
hasSentry: hasSentry,
hasElastic: hasElastic,
elasticLogger: es,
dataDogAPIKey: DATADOG_API_KEY,
}
}
// LogMessage defines the structure of your log message
type LogMessage struct {
Timestamp time.Time `json:"@timestamp"`
Message string `json:"message"`
Level string `json:"level"`
}
func sendLog(es *elasticsearch.Client, logMessage LogMessage) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(logMessage); err != nil {
fmt.Printf("Error encoding log message: %s", err)
return
}
req := esapi.IndexRequest{
Index: "logs",
DocumentID: "",
Body: &buf,
Refresh: "true",
}
res, err := req.Do(context.Background(), es)
if err != nil {
fmt.Printf("Error sending log to Elasticsearch: %s", err)
return
}
defer res.Body.Close()
// Check the response status
if res.IsError() {
fmt.Printf("Error response from Elasticsearch: %s", res.String())
} else {
fmt.Printf("Log successfully sent to Elasticsearch.")
}
}
func (el *extraLogger) Log(ctx context.Context, msg string) {
if sID, ok := ctx.Value("sessionID").(string); ok {
msg = fmt.Sprintf("%s openReplaySession.id=%s", msg, sID)
}
if el.hasSentry {
sentry.CaptureMessage(msg)
}
if el.hasElastic {
esMsg := LogMessage{
Timestamp: time.Now(),
Message: msg,
Level: "INFO",
}
sendLog(el.elasticLogger, esMsg)
}
if el.dataDogAPIKey != "" {
url := "https://http-intake.logs.datadoghq.com/v1/input"
logMessage := `{
"message": "` + msg + `",
"ddsource": "go",
"service": "myservice",
"hostname": "myhost",
"ddtags": "env:development"
}`
req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(logMessage)))
if err != nil {
fmt.Println("Failed to create request:", err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("DD-API-KEY", el.dataDogAPIKey)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Println("Failed to send log to DataDog:", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Println("Failed to send log to DataDog, status code:", resp.StatusCode)
} else {
fmt.Println("Log sent to DataDog successfully!")
}
}
}

View file

@ -17,7 +17,9 @@ type Logger interface {
}
type loggerImpl struct {
l *zap.Logger
l *zap.Logger
useExtra bool
extra ExtraLogger
}
func New() Logger {
@ -27,7 +29,14 @@ func New() Logger {
core := zapcore.NewCore(jsonEncoder, zapcore.AddSync(os.Stdout), zap.InfoLevel)
baseLogger := zap.New(core, zap.AddCaller())
logger := baseLogger.WithOptions(zap.AddCallerSkip(1))
return &loggerImpl{l: logger}
customLogger := &loggerImpl{l: logger}
// Use it only for debugging purposes
if doExtra := os.Getenv("ENABLE_EXTRA_LOGS"); doExtra == "true" {
customLogger.extra = NewExtraLogger()
customLogger.useExtra = true
}
return customLogger
}
func (l *loggerImpl) prepare(ctx context.Context, logger *zap.Logger) *zap.Logger {
@ -53,21 +62,41 @@ func (l *loggerImpl) prepare(ctx context.Context, logger *zap.Logger) *zap.Logge
}
func (l *loggerImpl) Debug(ctx context.Context, message string, args ...interface{}) {
l.prepare(ctx, l.l.With(zap.String("level", "debug"))).Debug(fmt.Sprintf(message, args...))
logStr := fmt.Sprintf(message, args...)
l.prepare(ctx, l.l.With(zap.String("level", "debug"))).Debug(logStr)
if l.useExtra {
l.extra.Log(ctx, logStr)
}
}
func (l *loggerImpl) Info(ctx context.Context, message string, args ...interface{}) {
l.prepare(ctx, l.l.With(zap.String("level", "info"))).Info(fmt.Sprintf(message, args...))
logStr := fmt.Sprintf(message, args...)
l.prepare(ctx, l.l.With(zap.String("level", "info"))).Info(logStr)
if l.useExtra {
l.extra.Log(ctx, logStr)
}
}
func (l *loggerImpl) Warn(ctx context.Context, message string, args ...interface{}) {
l.prepare(ctx, l.l.With(zap.String("level", "warn"))).Warn(fmt.Sprintf(message, args...))
logStr := fmt.Sprintf(message, args...)
l.prepare(ctx, l.l.With(zap.String("level", "warn"))).Warn(logStr)
if l.useExtra {
l.extra.Log(ctx, logStr)
}
}
func (l *loggerImpl) Error(ctx context.Context, message string, args ...interface{}) {
l.prepare(ctx, l.l.With(zap.String("level", "error"))).Error(fmt.Sprintf(message, args...))
logStr := fmt.Sprintf(message, args...)
l.prepare(ctx, l.l.With(zap.String("level", "error"))).Error(logStr)
if l.useExtra {
l.extra.Log(ctx, logStr)
}
}
func (l *loggerImpl) Fatal(ctx context.Context, message string, args ...interface{}) {
l.prepare(ctx, l.l.With(zap.String("level", "fatal"))).Fatal(fmt.Sprintf(message, args...))
logStr := fmt.Sprintf(message, args...)
l.prepare(ctx, l.l.With(zap.String("level", "fatal"))).Fatal(logStr)
if l.useExtra {
l.extra.Log(ctx, logStr)
}
}

View file

@ -1,6 +1,10 @@
package heuristics
import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
"openreplay/backend/pkg/metrics/common"
"strconv"
)
var heuristicsTotalEvents = prometheus.NewCounterVec(
prometheus.CounterOpts{
@ -15,6 +19,46 @@ func IncreaseTotalEvents(eventType string) {
heuristicsTotalEvents.WithLabelValues(eventType).Inc()
}
var heuristicsRequestSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "heuristics",
Name: "request_size_bytes",
Help: "A histogram displaying the size of each HTTP request in bytes.",
Buckets: common.DefaultSizeBuckets,
},
[]string{"url", "response_code"},
)
func RecordRequestSize(size float64, url string, code int) {
heuristicsRequestSize.WithLabelValues(url, strconv.Itoa(code)).Observe(size)
}
var heuristicsRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "heuristics",
Name: "request_duration_seconds",
Help: "A histogram displaying the duration of each HTTP request in seconds.",
Buckets: common.DefaultDurationBuckets,
},
[]string{"url", "response_code"},
)
func RecordRequestDuration(durMillis float64, url string, code int) {
heuristicsRequestDuration.WithLabelValues(url, strconv.Itoa(code)).Observe(durMillis / 1000.0)
}
var heuristicsTotalRequests = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "heuristics",
Name: "requests_total",
Help: "A counter displaying the number all HTTP requests.",
},
)
func IncreaseTotalRequests() {
heuristicsTotalRequests.Inc()
}
func List() []prometheus.Collector {
return []prometheus.Collector{
heuristicsTotalEvents,

View file

@ -1,62 +1,36 @@
{{- if .Values.ingress.enabled -}}
{{- if .Values.ingress.enabled }}
{{- $fullName := include "integrations.fullname" . -}}
{{- $svcPort := .Values.service.ports.http -}}
{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }}
{{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }}
{{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}}
{{- end }}
{{- end }}
{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1
{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}}
apiVersion: networking.k8s.io/v1beta1
{{- else -}}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
name: {{ $fullName }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "integrations.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- with .Values.ingress.annotations }}
{{- toYaml . | nindent 4 }}
{{- end }}
nginx.ingress.kubernetes.io/rewrite-target: /$1
nginx.ingress.kubernetes.io/upstream-hash-by: $http_x_forwarded_for
spec:
{{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }}
ingressClassName: {{ .Values.ingress.className }}
{{- end }}
{{- if .Values.ingress.tls }}
ingressClassName: "{{ tpl .Values.ingress.className . }}"
tls:
{{- range .Values.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
{{- end }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
- {{ .Values.global.domainName }}
{{- if .Values.ingress.tls.secretName}}
secretName: {{ .Values.ingress.tls.secretName }}
{{- end}}
rules:
{{- range .Values.ingress.hosts }}
- host: {{ .host | quote }}
- host: {{ .Values.global.domainName }}
http:
paths:
{{- range .paths }}
- path: {{ .path }}
{{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }}
pathType: {{ .pathType }}
{{- end }}
- pathType: Prefix
backend:
{{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }}
service:
name: {{ $fullName }}
port:
number: {{ $svcPort }}
{{- else }}
serviceName: {{ $fullName }}
servicePort: {{ $svcPort }}
{{- end }}
{{- end }}
{{- end }}
path: /integrations/(.*)
{{- end }}

View file

@ -47,7 +47,7 @@ podSecurityContext:
service:
type: ClusterIP
ports:
http: 9000
http: 8080
metrics: 8888
serviceMonitor:
@ -63,20 +63,20 @@ serviceMonitor:
scrapeTimeout: 10s
ingress:
enabled: false
className: ""
annotations: {}
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
enabled: true
className: "{{ .Values.global.ingress.controller.ingressClassResource.name }}"
annotations:
cert-manager.io/cluster-issuer: "letsencrypt-prod"
nginx.ingress.kubernetes.io/proxy-connect-timeout: "120"
nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
nginx.ingress.kubernetes.io/cors-allow-methods: POST,PATCH,DELETE
nginx.ingress.kubernetes.io/cors-allow-headers: Content-Type,Authorization,Content-Encoding,X-Openreplay-Batch
nginx.ingress.kubernetes.io/cors-allow-origin: '*'
nginx.ingress.kubernetes.io/enable-cors: "true"
nginx.ingress.kubernetes.io/cors-expose-headers: "Content-Length"
tls:
secretName: openreplay-ssl
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious