From 3904509f1868d9341b579ddd7cb81105f3ffe465 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 1 Sep 2022 17:45:28 +0200 Subject: [PATCH] ClickHouse bugs fix (#707) * fix(backend): fixed issues in CH connector --- backend/pkg/db/postgres/session-updates.go | 2 +- backend/pkg/messages/batch.go | 1 - backend/pkg/messages/raw.go | 3 +-- ee/backend/internal/db/datasaver/messages.go | 9 +++++++-- ee/backend/pkg/db/clickhouse/connector.go | 17 +++++++++++------ 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/backend/pkg/db/postgres/session-updates.go b/backend/pkg/db/postgres/session-updates.go index 14260c2c6..47e374355 100644 --- a/backend/pkg/db/postgres/session-updates.go +++ b/backend/pkg/db/postgres/session-updates.go @@ -1,7 +1,7 @@ package postgres // Mechanism of combination several session updates into one -const sessionUpdateReq = `UPDATE sessions SET (pages_count, events_count) = (pages_count + $1, events_count + $2) WHERE session_id = $3` +const sessionUpdateReq = `UPDATE sessions SET pages_count = pages_count + $1, events_count = events_count + $2 WHERE session_id = $3` type sessionUpdates struct { sessionID uint64 diff --git a/backend/pkg/messages/batch.go b/backend/pkg/messages/batch.go index f10052ea5..955d0cfc0 100644 --- a/backend/pkg/messages/batch.go +++ b/backend/pkg/messages/batch.go @@ -99,7 +99,6 @@ func (i *iteratorImpl) Next() bool { i.version = m.Version i.url = m.Url isBatchMeta = true - log.Printf("new batch version: %d", i.version) if i.version > 1 { log.Printf("incorrect batch version, skip current batch") return false diff --git a/backend/pkg/messages/raw.go b/backend/pkg/messages/raw.go index b121de8d9..daa59accd 100644 --- a/backend/pkg/messages/raw.go +++ b/backend/pkg/messages/raw.go @@ -26,12 +26,11 @@ func (m *RawMessage) Encode() []byte { m.data[0] = uint8(m.tp) m.encoded = true *m.skipped = false - n, err := io.ReadFull(m.reader, m.data[1:]) + _, err := io.ReadFull(m.reader, m.data[1:]) if err != nil { log.Printf("message encode err: %s", err) return nil } - log.Printf("encode: read %d of %d bytes", n, m.size) return m.data } diff --git a/ee/backend/internal/db/datasaver/messages.go b/ee/backend/internal/db/datasaver/messages.go index 63a70ffed..3187a0c91 100644 --- a/ee/backend/internal/db/datasaver/messages.go +++ b/ee/backend/internal/db/datasaver/messages.go @@ -52,8 +52,13 @@ func (mi *Saver) InsertMessage(sessionID uint64, msg messages.Message) error { if err != nil { log.Printf("can't get session info for CH: %s", err) } else { - if err := mi.ch.InsertRequest(session, m); err != nil { - log.Printf("can't insert request event into clickhouse: %s", err) + project, err := mi.pg.GetProject(session.ProjectID) + if err != nil { + log.Printf("can't get project: %s", err) + } else { + if err := mi.ch.InsertRequest(session, m, project.SaveRequestPayloads); err != nil { + log.Printf("can't insert request event into clickhouse: %s", err) + } } } return mi.pg.InsertWebFetchEvent(sessionID, m) diff --git a/ee/backend/pkg/db/clickhouse/connector.go b/ee/backend/pkg/db/clickhouse/connector.go index 904371cf4..0ad91120e 100644 --- a/ee/backend/pkg/db/clickhouse/connector.go +++ b/ee/backend/pkg/db/clickhouse/connector.go @@ -56,6 +56,7 @@ func (b *bulkImpl) Send() error { for _, set := range b.values { if err := batch.Append(set...); err != nil { log.Printf("can't append value set to batch, err: %s", err) + log.Printf("failed query: %s", b.query) } } b.values = make([][]interface{}, 0) @@ -77,7 +78,7 @@ type Connector interface { InsertWebErrorEvent(session *types.Session, msg *messages.ErrorEvent) error InsertWebPerformanceTrackAggr(session *types.Session, msg *messages.PerformanceTrackAggr) error InsertAutocomplete(session *types.Session, msgType, msgValue string) error - InsertRequest(session *types.Session, msg *messages.FetchEvent) error + InsertRequest(session *types.Session, msg *messages.FetchEvent, savePayload bool) error InsertCustom(session *types.Session, msg *messages.CustomEvent) error InsertGraphQL(session *types.Session, msg *messages.GraphQLEvent) error } @@ -180,7 +181,6 @@ func (c *connectorImpl) checkError(name string, err error) { } func (c *connectorImpl) InsertWebSession(session *types.Session) error { - log.Printf("insert session: %+v", session) if session.Duration == nil { return errors.New("trying to insert session with nil duration") } @@ -366,17 +366,22 @@ func (c *connectorImpl) InsertAutocomplete(session *types.Session, msgType, msgV return nil } -func (c *connectorImpl) InsertRequest(session *types.Session, msg *messages.FetchEvent) error { +func (c *connectorImpl) InsertRequest(session *types.Session, msg *messages.FetchEvent, savePayload bool) error { + var request, response *string + if savePayload { + request = &msg.Request + response = &msg.Response + } if err := c.batches["requests"].Append( session.SessionID, uint16(session.ProjectID), datetime(msg.Timestamp), msg.URL, - nullableString(msg.Request), - nullableString(msg.Response), + request, + response, uint16(msg.Status), url.EnsureMethod(msg.Method), - msg.Duration, + uint16(msg.Duration), msg.Status < 400, "REQUEST", ); err != nil {