Connectors improvements (#1469)

This commit is contained in:
Alexander 2023-09-12 13:12:23 +02:00 committed by GitHub
parent c415b26bc5
commit 5eaed7e351
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 1892 additions and 128 deletions

View file

@ -1,6 +1,4 @@
FROM golang:1.19-alpine3.17 AS prepare
RUN apk add --no-cache git openssh openssl-dev pkgconf gcc g++ make libc-dev bash librdkafka-dev cyrus-sasl cyrus-sasl-gssapiv2 krb5
FROM openreplayalexander/golang_with_librdkafka:v1-release AS prepare
WORKDIR /root
@ -8,7 +6,6 @@ COPY go.mod .
COPY go.sum .
RUN go mod download
FROM prepare AS build
COPY cmd cmd
COPY pkg pkg
@ -18,11 +15,10 @@ ARG SERVICE_NAME
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o service -tags dynamic openreplay/backend/cmd/$SERVICE_NAME
FROM alpine AS entrypoint
FROM openreplayalexander/alpine_with_librdkafka:v1-release AS entrypoint
ARG GIT_SHA
LABEL GIT_SHA=$GIT_SHA
RUN apk add --no-cache ca-certificates librdkafka-dev cyrus-sasl cyrus-sasl-gssapiv2 krb5
RUN adduser -u 1001 openreplay -D
ARG SERVICE_NAME

View file

@ -3,22 +3,22 @@ module openreplay/backend
go 1.18
require (
cloud.google.com/go/logging v1.6.1
cloud.google.com/go/logging v1.7.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/ClickHouse/clickhouse-go/v2 v2.2.0
github.com/Masterminds/semver v1.5.0
github.com/andybalholm/brotli v1.0.5
github.com/aws/aws-sdk-go v1.44.98
github.com/btcsuite/btcutil v1.0.2
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/elastic/go-elasticsearch/v7 v7.13.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/jackc/pgconn v1.6.0
github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
github.com/jackc/pgtype v1.3.0
github.com/jackc/pgx/v4 v4.6.0
github.com/jackc/pgtype v1.14.0
github.com/jackc/pgx/v5 v5.4.3
github.com/klauspost/compress v1.15.15
github.com/klauspost/pgzip v1.2.5
github.com/oschwald/maxminddb-golang v1.7.0
@ -27,54 +27,50 @@ require (
github.com/sethvargo/go-envconfig v0.7.0
github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce
github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe
golang.org/x/net v0.8.0
google.golang.org/api v0.103.0
golang.org/x/net v0.10.0
google.golang.org/api v0.114.0
)
require (
cloud.google.com/go v0.107.0 // indirect
cloud.google.com/go/compute v1.15.1 // indirect
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.8.0 // indirect
cloud.google.com/go/storage v1.14.0 // indirect
cloud.google.com/go/longrunning v0.3.0 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
cloud.google.com/go/longrunning v0.4.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.0.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 // indirect
github.com/jackc/puddle v1.2.2-0.20220404125616-4e959849469a // indirect
github.com/jackc/pgproto3/v2 v2.1.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stretchr/testify v1.8.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

File diff suppressed because it is too large Load diff

View file

@ -5,7 +5,7 @@ import (
"strings"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/metrics/database"

View file

@ -5,7 +5,7 @@ import (
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
)
func IsPkeyViolation(err error) bool {

View file

@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"openreplay/backend/pkg/metrics/database"
)
@ -19,13 +20,21 @@ type Pool interface {
Exec(sql string, arguments ...interface{}) error
SendBatch(b *pgx.Batch) pgx.BatchResults
Begin() (*_Tx, error)
IsConnected() bool
Close()
}
type poolImpl struct {
url string
conn *pgxpool.Pool
}
func (p *poolImpl) IsConnected() bool {
stat := p.conn.Stat()
log.Println("stat: ", stat.AcquireCount(), stat.IdleConns(), stat.MaxConns(), stat.TotalConns())
return true
}
func (p *poolImpl) Query(sql string, args ...interface{}) (pgx.Rows, error) {
start := time.Now()
res, err := p.conn.Query(getTimeoutContext(), sql, args...)
@ -73,17 +82,32 @@ func (p *poolImpl) Close() {
p.conn.Close()
}
func (p *poolImpl) checker() {
for {
time.Sleep(time.Second * 5)
if p.conn != nil {
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
if err := p.conn.Ping(ctx); err != nil {
log.Println("pgxpool.Ping error: ", err)
}
}
}
}
func New(url string) (Pool, error) {
if url == "" {
return nil, errors.New("pg connection url is empty")
}
conn, err := pgxpool.Connect(context.Background(), url)
conn, err := pgxpool.New(context.Background(), url)
if err != nil {
return nil, fmt.Errorf("pgxpool.Connect error: %v", err)
}
return &poolImpl{
res := &poolImpl{
url: url,
conn: conn,
}, nil
}
go res.checker()
return res, nil
}
// TX - start

View file

@ -4,9 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"openreplay/backend/pkg/db/postgres/pool"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"openreplay/backend/pkg/db/postgres/pool"
)
type Listener struct {

View file

@ -3,11 +3,11 @@ package sessions
import (
"fmt"
"log"
"openreplay/backend/pkg/db/postgres/pool"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"openreplay/backend/pkg/db/postgres/pool"
"openreplay/backend/pkg/metrics/database"
)

View file

@ -3,13 +3,14 @@ package kafka
import (
"log"
"os"
"strings"
"time"
"openreplay/backend/pkg/env"
"openreplay/backend/pkg/messages"
"openreplay/backend/pkg/queue/types"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pkg/errors"
)
@ -41,6 +42,7 @@ func NewConsumer(
"go.application.rebalance.enable": true,
"max.poll.interval.ms": env.Int("KAFKA_MAX_POLL_INTERVAL_MS"),
"max.partition.fetch.bytes": messageSizeLimit,
"go.logs.channel.enable": true,
}
// Apply ssl configuration
if env.Bool("KAFKA_USE_SSL") {
@ -90,7 +92,17 @@ func NewConsumer(
if err := c.Subscribe(subREx, consumer.reBalanceCallback); err != nil {
log.Fatalln(err)
}
go func() {
for {
logMsg := <-consumer.c.Logs()
if logMsg.Tag == "MAXPOLL" && strings.Contains(logMsg.Message, "leaving group") {
// By some reason service logic took too much time and was kicked out from the group
log.Printf("Kafka consumer left the group, exiting...")
os.Exit(1)
}
log.Printf("Kafka consumer log: %s", logMsg.String())
}
}()
return consumer
}

View file

@ -5,7 +5,7 @@ import (
"log"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"openreplay/backend/pkg/env"
)