feat(analytics): timeseries queries with filters and events
This commit is contained in:
parent
b0bf357be1
commit
5c0139b66c
6 changed files with 585 additions and 339 deletions
|
|
@ -32,7 +32,7 @@ func (s *chartsImpl) GetData(projectId int, userID uint64, req *MetricPayload) (
|
|||
return nil, fmt.Errorf("request is empty")
|
||||
}
|
||||
|
||||
payload := &Payload{
|
||||
payload := Payload{
|
||||
ProjectId: projectId,
|
||||
UserId: userID,
|
||||
MetricPayload: req,
|
||||
|
|
|
|||
|
|
@ -4,6 +4,6 @@ import "openreplay/backend/pkg/analytics/db"
|
|||
|
||||
type FunnelQueryBuilder struct{}
|
||||
|
||||
func (f FunnelQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) {
|
||||
func (f FunnelQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
|
||||
return "-- Funnel query placeholder", nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,11 +8,11 @@ import (
|
|||
|
||||
type TableQueryBuilder struct{}
|
||||
|
||||
func (t TableQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) {
|
||||
func (t TableQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
|
||||
return t.buildQuery(p)
|
||||
}
|
||||
|
||||
func (t TableQueryBuilder) buildQuery(r *Payload) (string, error) {
|
||||
func (t TableQueryBuilder) buildQuery(r Payload) (string, error) {
|
||||
s := r.Series[0]
|
||||
sessionFilters, eventFilters := partitionFilters(s.Filter.Filters)
|
||||
sessionWhere := buildSessionWhere(sessionFilters)
|
||||
|
|
@ -86,168 +86,3 @@ func (t TableQueryBuilder) buildQuery(r *Payload) (string, error) {
|
|||
|
||||
return final, nil
|
||||
}
|
||||
|
||||
func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
|
||||
for _, f := range filters {
|
||||
if f.IsEvent {
|
||||
eventFilters = append(eventFilters, f)
|
||||
} else {
|
||||
sessionFilters = append(sessionFilters, f)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func buildSessionWhere(filters []Filter) []string {
|
||||
var conds []string
|
||||
for _, f := range filters {
|
||||
switch f.Type {
|
||||
case FilterUserCountry:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value)))
|
||||
case FilterUserCity:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value)))
|
||||
case FilterUserState:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value)))
|
||||
case FilterUserId:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value)))
|
||||
case FilterUserAnonymousId:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value)))
|
||||
case FilterUserOs:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value)))
|
||||
case FilterUserBrowser:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value)))
|
||||
case FilterUserDevice:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value)))
|
||||
case FilterPlatform:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value)))
|
||||
case FilterRevId:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value)))
|
||||
case FilterReferrer:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value)))
|
||||
case FilterDuration:
|
||||
if len(f.Value) == 2 {
|
||||
conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0]))
|
||||
conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1]))
|
||||
}
|
||||
case FilterUtmSource:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value)))
|
||||
case FilterUtmMedium:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value)))
|
||||
case FilterUtmCampaign:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value)))
|
||||
case FilterMetadata:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value)))
|
||||
}
|
||||
}
|
||||
// adding /n to each condition for better readability, can be removed.
|
||||
for i := range conds {
|
||||
conds[i] += "\n"
|
||||
}
|
||||
return conds
|
||||
}
|
||||
|
||||
func concatValues(v []string) string {
|
||||
return strings.Join(v, "")
|
||||
}
|
||||
|
||||
func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) {
|
||||
basicEventTypes := "(" +
|
||||
strings.Join([]string{
|
||||
fmt.Sprintf("%s = 'CLICK'", ColEventName),
|
||||
fmt.Sprintf("%s = 'INPUT'", ColEventName),
|
||||
fmt.Sprintf("%s = 'LOCATION'", ColEventName),
|
||||
fmt.Sprintf("%s = 'CUSTOM'", ColEventName),
|
||||
fmt.Sprintf("%s = 'REQUEST'", ColEventName),
|
||||
}, " OR ") + ")"
|
||||
|
||||
var seq []string
|
||||
for _, f := range filters {
|
||||
switch f.Type {
|
||||
case FilterClick:
|
||||
seq = append(seq, seqCond("CLICK", "selector", f))
|
||||
case FilterInput:
|
||||
seq = append(seq, seqCond("INPUT", "label", f))
|
||||
case FilterLocation:
|
||||
seq = append(seq, seqCond("LOCATION", "url_path", f))
|
||||
case FilterCustom:
|
||||
seq = append(seq, seqCond("CUSTOM", "name", f))
|
||||
case FilterFetch:
|
||||
seq = append(seq, seqFetchCond("REQUEST", f))
|
||||
case FilterFetchStatusCode:
|
||||
seq = append(seq, seqCond("REQUEST", "status", f))
|
||||
default:
|
||||
seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type))))
|
||||
}
|
||||
}
|
||||
eventConditions = []string{basicEventTypes}
|
||||
|
||||
// then => sequenceMatch
|
||||
// or => OR
|
||||
// and => AND
|
||||
switch order {
|
||||
case EventOrderThen:
|
||||
var pattern []string
|
||||
for i := range seq {
|
||||
pattern = append(pattern, fmt.Sprintf("(?%d)", i+1))
|
||||
}
|
||||
having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)",
|
||||
strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n"))
|
||||
case EventOrderAnd:
|
||||
// build AND
|
||||
having = strings.Join(seq, " AND ")
|
||||
default:
|
||||
// default => OR
|
||||
var orParts []string
|
||||
for _, p := range seq {
|
||||
orParts = append(orParts, "("+p+")")
|
||||
}
|
||||
having = strings.Join(orParts, " OR ")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func seqCond(eventName, key string, f Filter) string {
|
||||
op := parseOperator(f.Operator)
|
||||
return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')",
|
||||
ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value))
|
||||
}
|
||||
|
||||
func seqFetchCond(eventName string, f Filter) string {
|
||||
w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))}
|
||||
var extras []string
|
||||
for _, c := range f.Filters {
|
||||
switch c.Type {
|
||||
case FilterFetch:
|
||||
if len(c.Value) > 0 {
|
||||
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value)))
|
||||
}
|
||||
case FilterFetchStatusCode:
|
||||
if len(c.Value) > 0 {
|
||||
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value)))
|
||||
}
|
||||
default:
|
||||
// placeholder if needed
|
||||
}
|
||||
}
|
||||
if len(extras) > 0 {
|
||||
w = append(w, strings.Join(extras, " AND "))
|
||||
}
|
||||
return "(" + strings.Join(w, " AND ") + ")"
|
||||
}
|
||||
|
||||
func parseOperator(op string) string {
|
||||
// TODO implement this properly
|
||||
switch strings.ToLower(op) {
|
||||
case OperatorStringContains:
|
||||
return "LIKE"
|
||||
case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny:
|
||||
return "="
|
||||
case OperatorStringStartsWith:
|
||||
return "LIKE"
|
||||
case OperatorStringEndsWith:
|
||||
// might interpret differently in real impl
|
||||
return "="
|
||||
default:
|
||||
return "="
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,11 +4,12 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"openreplay/backend/pkg/analytics/db"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type TimeSeriesQueryBuilder struct{}
|
||||
|
||||
func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interface{}, error) {
|
||||
func (t TimeSeriesQueryBuilder) Execute(p Payload, conn db.Connector) (interface{}, error) {
|
||||
query, err := t.buildQuery(p)
|
||||
if err != nil {
|
||||
log.Fatalf("Error building query: %v", err)
|
||||
|
|
@ -23,7 +24,6 @@ func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interfac
|
|||
defer rows.Close()
|
||||
|
||||
var results []DataPoint
|
||||
|
||||
for rows.Next() {
|
||||
var res DataPoint
|
||||
if err := rows.Scan(&res.Timestamp, &res.Count); err != nil {
|
||||
|
|
@ -37,7 +37,7 @@ func (t TimeSeriesQueryBuilder) Execute(p *Payload, conn db.Connector) (interfac
|
|||
return filled, nil
|
||||
}
|
||||
|
||||
func (t TimeSeriesQueryBuilder) buildQuery(p *Payload) (string, error) {
|
||||
func (t TimeSeriesQueryBuilder) buildQuery(p Payload) (string, error) {
|
||||
query := ""
|
||||
switch p.MetricOf {
|
||||
case "sessionCount":
|
||||
|
|
@ -50,67 +50,102 @@ func (t TimeSeriesQueryBuilder) buildQuery(p *Payload) (string, error) {
|
|||
return query, nil
|
||||
}
|
||||
|
||||
func (TimeSeriesQueryBuilder) buildSessionCountQuery(p *Payload) string {
|
||||
stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000))
|
||||
subquery := buildEventSubquery(p)
|
||||
return fmt.Sprintf(`SELECT toUnixTimestamp(
|
||||
toStartOfInterval(processed_sessions.datetime, INTERVAL %d second)
|
||||
) * 1000 AS timestamp,
|
||||
COUNT(processed_sessions.session_id) AS count
|
||||
FROM (
|
||||
%s
|
||||
) AS processed_sessions
|
||||
GROUP BY timestamp
|
||||
ORDER BY timestamp;`, stepSize, subquery)
|
||||
func (TimeSeriesQueryBuilder) buildSessionCountQuery(p Payload) string {
|
||||
eventConds, eventNames := buildEventConditions(p.Series[0].Filter.Filters)
|
||||
sessionConds := buildSessionConditions(p.Series[0].Filter.Filters)
|
||||
staticEvt := buildStaticEventWhere(p)
|
||||
sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds)
|
||||
eventsSubQuery := buildEventsSubQuery(eventConds, eventNames, staticEvt, sessWhere, sessJoin)
|
||||
mainQuery := buildMainQuery(p, eventsSubQuery)
|
||||
return mainQuery
|
||||
}
|
||||
|
||||
func (TimeSeriesQueryBuilder) buildUserCountQuery(p *Payload) string {
|
||||
stepSize := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000))
|
||||
subquery := buildEventSubquery(p)
|
||||
return fmt.Sprintf(`SELECT toUnixTimestamp(
|
||||
toStartOfInterval(processed_sessions.datetime, INTERVAL %d second)
|
||||
) * 1000 AS timestamp,
|
||||
COUNT(DISTINCT processed_sessions.user_id) AS count
|
||||
FROM (
|
||||
%s
|
||||
) AS processed_sessions
|
||||
GROUP BY timestamp
|
||||
ORDER BY timestamp;`, stepSize, subquery)
|
||||
func (TimeSeriesQueryBuilder) buildUserCountQuery(p Payload) string {
|
||||
eventConds, eventNames := buildEventConditions(p.Series[0].Filter.Filters)
|
||||
sessionConds := buildSessionConditions(p.Series[0].Filter.Filters)
|
||||
staticEvt := buildStaticEventWhere(p)
|
||||
sessWhere, sessJoin := buildStaticSessionWhere(p, sessionConds)
|
||||
eventsSubQuery := buildEventsSubQuery(eventConds, eventNames, staticEvt, sessWhere, sessJoin)
|
||||
mainQuery := buildMainQuery(p, eventsSubQuery)
|
||||
return mainQuery
|
||||
}
|
||||
|
||||
func FillMissingDataPoints(
|
||||
startTime, endTime int64,
|
||||
density int,
|
||||
neutral DataPoint,
|
||||
rows []DataPoint,
|
||||
timeCoefficient int64,
|
||||
) []DataPoint {
|
||||
if density <= 1 {
|
||||
return rows
|
||||
func buildEventsSubQuery(eventConds, eventNames []string, staticEvt, sessWhere, sessJoin string) string {
|
||||
if len(eventConds) == 0 && len(eventNames) == 0 {
|
||||
return fmt.Sprintf(noEventSubQueryTpl, sessJoin)
|
||||
}
|
||||
|
||||
stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000))
|
||||
bucketSize := stepSize * uint64(timeCoefficient)
|
||||
|
||||
lookup := make(map[uint64]DataPoint)
|
||||
for _, dp := range rows {
|
||||
if dp.Timestamp < uint64(startTime) {
|
||||
continue
|
||||
}
|
||||
bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize)
|
||||
lookup[bucket] = dp
|
||||
}
|
||||
|
||||
results := make([]DataPoint, 0, density)
|
||||
for i := 0; i < density; i++ {
|
||||
ts := uint64(startTime) + uint64(i)*bucketSize
|
||||
if dp, ok := lookup[ts]; ok {
|
||||
results = append(results, dp)
|
||||
} else {
|
||||
nd := neutral
|
||||
nd.Timestamp = ts
|
||||
results = append(results, nd)
|
||||
var evtNameClause string
|
||||
var unique []string
|
||||
for _, name := range eventNames {
|
||||
if !contains(unique, name) {
|
||||
unique = append(unique, name)
|
||||
}
|
||||
}
|
||||
return results
|
||||
if len(unique) > 0 {
|
||||
evtNameClause = fmt.Sprintf("AND main.`$event_name` IN (%s)", buildInClause(unique))
|
||||
}
|
||||
having := ""
|
||||
if len(eventConds) > 0 {
|
||||
having = buildHavingClause(eventConds)
|
||||
}
|
||||
evtWhere := staticEvt
|
||||
if len(eventConds) > 0 {
|
||||
evtWhere += " AND " + strings.Join(eventConds, " AND ")
|
||||
}
|
||||
return fmt.Sprintf(eventsSubQueryTpl, evtWhere, sessWhere, evtNameClause, having, sessJoin)
|
||||
}
|
||||
|
||||
func buildMainQuery(p Payload, subQuery string) string {
|
||||
step := int(getStepSize(p.StartTimestamp, p.EndTimestamp, p.Density, false, 1000))
|
||||
return fmt.Sprintf(mainQueryTpl, p.StartTimestamp, p.EndTimestamp, step, subQuery, step)
|
||||
}
|
||||
|
||||
var eventsSubQueryTpl = `
|
||||
SELECT multiIf(
|
||||
s.user_id IS NOT NULL AND s.user_id != '', s.user_id,
|
||||
s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id,
|
||||
toString(s.user_uuid)) AS user_id,
|
||||
s.datetime AS datetime
|
||||
FROM (
|
||||
SELECT main.session_id,
|
||||
MIN(main.created_at) AS first_event_ts,
|
||||
MAX(main.created_at) AS last_event_ts
|
||||
FROM product_analytics.events AS main
|
||||
WHERE %s
|
||||
AND main.session_id IN (
|
||||
SELECT s.session_id
|
||||
FROM experimental.sessions AS s
|
||||
WHERE %s
|
||||
)
|
||||
%s
|
||||
GROUP BY session_id
|
||||
%s
|
||||
INNER JOIN (
|
||||
SELECT *
|
||||
FROM experimental.sessions AS s
|
||||
WHERE %s
|
||||
) AS s ON (s.session_id = f.session_id)
|
||||
`
|
||||
|
||||
var noEventSubQueryTpl = `
|
||||
SELECT multiIf(
|
||||
s.user_id IS NOT NULL AND s.user_id != '', s.user_id,
|
||||
s.user_anonymous_id IS NOT NULL AND s.user_anonymous_id != '', s.user_anonymous_id,
|
||||
toString(s.user_uuid)) AS user_id,
|
||||
s.datetime AS datetime
|
||||
FROM experimental.sessions AS s
|
||||
WHERE %s
|
||||
`
|
||||
|
||||
var mainQueryTpl = `
|
||||
SELECT gs.generate_series AS timestamp,
|
||||
COALESCE(COUNT(DISTINCT processed_sessions.user_id), 0) AS count
|
||||
FROM generate_series(%d, %d, %d) AS gs
|
||||
LEFT JOIN (
|
||||
%s
|
||||
) AS processed_sessions ON (TRUE)
|
||||
WHERE processed_sessions.datetime >= toDateTime(timestamp / 1000)
|
||||
AND processed_sessions.datetime < toDateTime((timestamp + %d) / 1000)
|
||||
GROUP BY timestamp
|
||||
ORDER BY timestamp;
|
||||
`
|
||||
|
|
|
|||
|
|
@ -68,12 +68,14 @@ type MetricPayload struct {
|
|||
Series []Series `json:"series"`
|
||||
}
|
||||
|
||||
type FilterGroup struct {
|
||||
Filters []Filter `json:"filters"`
|
||||
EventsOrder EventOrder `json:"eventsOrder"`
|
||||
}
|
||||
|
||||
type Series struct {
|
||||
Name string `json:"name"`
|
||||
Filter struct {
|
||||
Filters []Filter `json:"filters"`
|
||||
EventsOrder EventOrder `json:"eventsOrder"`
|
||||
} `json:"filter"`
|
||||
Name string `json:"name"`
|
||||
Filter FilterGroup `json:"filter"`
|
||||
}
|
||||
|
||||
type Filter struct {
|
||||
|
|
@ -81,6 +83,7 @@ type Filter struct {
|
|||
IsEvent bool `json:"isEvent"`
|
||||
Value []string `json:"value"`
|
||||
Operator string `json:"operator"`
|
||||
Source string `json:"source,omitempty"`
|
||||
Filters []Filter `json:"filters"`
|
||||
}
|
||||
|
||||
|
|
@ -109,11 +112,10 @@ const (
|
|||
FilterClick FilterType = "click"
|
||||
FilterInput FilterType = "input"
|
||||
FilterLocation FilterType = "location"
|
||||
FilterTag FilterType = "tag"
|
||||
FilterCustom FilterType = "customEvent"
|
||||
FilterFetch FilterType = "fetch"
|
||||
FilterFetchStatusCode FilterType = "status"
|
||||
FilterTag FilterType = "tag"
|
||||
FilterNetworkRequest FilterType = "fetch"
|
||||
FilterFetchStatusCode FilterType = "fetchStatusCode" // Subfilter
|
||||
FilterGraphQLRequest FilterType = "graphql"
|
||||
FilterStateAction FilterType = "stateAction"
|
||||
FilterError FilterType = "error"
|
||||
|
|
@ -139,7 +141,7 @@ const (
|
|||
OperatorStringIsNot = "isNot"
|
||||
OperatorStringIsUndefined = "isUndefined"
|
||||
OperatorStringNotOn = "notOn"
|
||||
OperatorStringContains = "contains"
|
||||
OperatorContains = "contains"
|
||||
OperatorStringNotContains = "notContains"
|
||||
OperatorStringStartsWith = "startsWith"
|
||||
OperatorStringEndsWith = "endsWith"
|
||||
|
|
|
|||
|
|
@ -13,10 +13,10 @@ type Payload struct {
|
|||
}
|
||||
|
||||
type QueryBuilder interface {
|
||||
Execute(p *Payload, conn db.Connector) (interface{}, error)
|
||||
Execute(p Payload, conn db.Connector) (interface{}, error)
|
||||
}
|
||||
|
||||
func NewQueryBuilder(p *Payload) (QueryBuilder, error) {
|
||||
func NewQueryBuilder(p Payload) (QueryBuilder, error) {
|
||||
switch p.MetricType {
|
||||
case MetricTypeTimeseries:
|
||||
return TimeSeriesQueryBuilder{}, nil
|
||||
|
|
@ -29,122 +29,496 @@ func NewQueryBuilder(p *Payload) (QueryBuilder, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func buildEventSubquery(p *Payload) string {
|
||||
baseEventsWhere := buildBaseEventsWhere(p)
|
||||
sequenceCond := buildSequenceCondition(p.Series)
|
||||
sessionsWhere := buildSessionsWhere(p)
|
||||
//func pickIDField(p Payload) string {
|
||||
// if p.MetricOf == "userCount" {
|
||||
// return "user_id"
|
||||
// }
|
||||
// return "session_id"
|
||||
//}
|
||||
|
||||
if sequenceCond.seqPattern == "" {
|
||||
return fmt.Sprintf(`
|
||||
SELECT s.%[1]s AS %[1]s,
|
||||
s.datetime AS datetime
|
||||
FROM (
|
||||
SELECT main.session_id,
|
||||
MIN(main.created_at) AS first_event_ts,
|
||||
MAX(main.created_at) AS last_event_ts
|
||||
FROM product_analytics.events AS main
|
||||
WHERE %[2]s
|
||||
GROUP BY session_id
|
||||
) AS f
|
||||
INNER JOIN (
|
||||
SELECT *
|
||||
FROM experimental.sessions AS s
|
||||
WHERE %[3]s
|
||||
) AS s ON (s.session_id = f.session_id)
|
||||
`, pickIDField(p), baseEventsWhere, sessionsWhere)
|
||||
}
|
||||
//func buildBaseEventsWhere(p Payload) string {
|
||||
// ts := fmt.Sprintf(
|
||||
// `(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`,
|
||||
// p.StartTimestamp,
|
||||
// p.EndTimestamp,
|
||||
// )
|
||||
// return fmt.Sprintf(`main.project_id = %d AND %s`, p.ProjectId, ts)
|
||||
//}
|
||||
|
||||
return fmt.Sprintf(`
|
||||
SELECT s.%[1]s AS %[1]s,
|
||||
s.datetime AS datetime
|
||||
FROM (
|
||||
SELECT main.session_id,
|
||||
MIN(main.created_at) AS first_event_ts,
|
||||
MAX(main.created_at) AS last_event_ts
|
||||
FROM product_analytics.events AS main
|
||||
WHERE %[2]s
|
||||
GROUP BY session_id
|
||||
HAVING sequenceMatch('%[3]s')(toDateTime(main.created_at), %[4]s)
|
||||
) AS f
|
||||
INNER JOIN (
|
||||
SELECT *
|
||||
FROM experimental.sessions AS s
|
||||
WHERE %[5]s
|
||||
) AS s ON (s.session_id = f.session_id)
|
||||
`, pickIDField(p), baseEventsWhere, sequenceCond.seqPattern, sequenceCond.seqEvents, sessionsWhere)
|
||||
}
|
||||
//func buildSessionsWhere(p Payload) string {
|
||||
// ts := fmt.Sprintf(
|
||||
// `(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`,
|
||||
// p.StartTimestamp,
|
||||
// p.EndTimestamp,
|
||||
// )
|
||||
// return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, p.ProjectId, ts)
|
||||
//}
|
||||
|
||||
func pickIDField(p *Payload) string {
|
||||
if p.MetricOf == "userCount" {
|
||||
return "user_id"
|
||||
}
|
||||
return "session_id"
|
||||
}
|
||||
//type sequenceParts struct {
|
||||
// seqPattern string
|
||||
// seqEvents string
|
||||
//}
|
||||
|
||||
func buildBaseEventsWhere(p *Payload) string {
|
||||
ts := fmt.Sprintf(
|
||||
`(main.created_at >= toDateTime(%d / 1000) AND main.created_at <= toDateTime(%d / 1000))`,
|
||||
p.StartTimestamp,
|
||||
p.EndTimestamp,
|
||||
)
|
||||
return fmt.Sprintf(`main.project_id = %d AND %s`, p.ProjectId, ts)
|
||||
}
|
||||
//func buildSequenceCondition(series []Series) sequenceParts {
|
||||
// var events []string
|
||||
// for _, s := range series {
|
||||
// if len(s.Filter.Filters) > 0 {
|
||||
// events = append(events, buildOneSeriesSequence(s.Filter.Filters))
|
||||
// }
|
||||
// }
|
||||
// if len(events) < 2 {
|
||||
// return sequenceParts{"", ""}
|
||||
// }
|
||||
// pattern := ""
|
||||
// for i := 1; i <= len(events); i++ {
|
||||
// pattern += fmt.Sprintf("(?%d)", i)
|
||||
// }
|
||||
// return sequenceParts{
|
||||
// seqPattern: pattern,
|
||||
// seqEvents: strings.Join(events, ", "),
|
||||
// }
|
||||
//}
|
||||
|
||||
func buildSessionsWhere(p *Payload) string {
|
||||
ts := fmt.Sprintf(
|
||||
`(s.datetime >= toDateTime(%d / 1000) AND s.datetime <= toDateTime(%d / 1000))`,
|
||||
p.StartTimestamp,
|
||||
p.EndTimestamp,
|
||||
)
|
||||
return fmt.Sprintf(`s.project_id = %d AND isNotNull(s.duration) AND %s`, p.ProjectId, ts)
|
||||
}
|
||||
//func buildOneSeriesSequence(filters []Filter) string {
|
||||
// return strings.Join(buildFilterConditions(filters), " AND ")
|
||||
//}
|
||||
//
|
||||
//func buildFilterConditions(filters []Filter) []string {
|
||||
// var out []string
|
||||
// for _, f := range filters {
|
||||
// switch f.Type {
|
||||
// case FilterClick:
|
||||
// out = append(out,
|
||||
// fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
|
||||
// strings.Join(f.Value, "','")))
|
||||
// case FilterInput:
|
||||
// out = append(out,
|
||||
// fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
|
||||
// strings.Join(f.Value, "','")))
|
||||
//
|
||||
// default:
|
||||
// out = append(out,
|
||||
// fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type))))
|
||||
// }
|
||||
// }
|
||||
// return out
|
||||
//}
|
||||
|
||||
type sequenceParts struct {
|
||||
seqPattern string
|
||||
seqEvents string
|
||||
}
|
||||
|
||||
func buildSequenceCondition(series []Series) sequenceParts {
|
||||
var events []string
|
||||
for _, s := range series {
|
||||
if len(s.Filter.Filters) > 0 {
|
||||
events = append(events, buildOneSeriesSequence(s.Filter.Filters))
|
||||
func partitionFilters(filters []Filter) (sessionFilters []Filter, eventFilters []Filter) {
|
||||
for _, f := range filters {
|
||||
if f.IsEvent {
|
||||
eventFilters = append(eventFilters, f)
|
||||
} else {
|
||||
sessionFilters = append(sessionFilters, f)
|
||||
}
|
||||
}
|
||||
if len(events) < 2 {
|
||||
return sequenceParts{"", ""}
|
||||
}
|
||||
pattern := ""
|
||||
for i := 1; i <= len(events); i++ {
|
||||
pattern += fmt.Sprintf("(?%d)", i)
|
||||
}
|
||||
return sequenceParts{
|
||||
seqPattern: pattern,
|
||||
seqEvents: strings.Join(events, ", "),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func buildOneSeriesSequence(filters []Filter) string {
|
||||
return strings.Join(buildFilterConditions(filters), " AND ")
|
||||
}
|
||||
func buildEventsWhere(filters []Filter, order EventOrder) (eventConditions []string, having string) {
|
||||
basicEventTypes := "(" +
|
||||
strings.Join([]string{
|
||||
fmt.Sprintf("%s = 'CLICK'", ColEventName),
|
||||
fmt.Sprintf("%s = 'INPUT'", ColEventName),
|
||||
fmt.Sprintf("%s = 'LOCATION'", ColEventName),
|
||||
fmt.Sprintf("%s = 'CUSTOM'", ColEventName),
|
||||
fmt.Sprintf("%s = 'REQUEST'", ColEventName),
|
||||
}, " OR ") + ")"
|
||||
|
||||
func buildFilterConditions(filters []Filter) []string {
|
||||
var out []string
|
||||
var seq []string
|
||||
for _, f := range filters {
|
||||
switch f.Type {
|
||||
case FilterClick:
|
||||
out = append(out,
|
||||
fmt.Sprintf(`(main."$event_name" = 'CLICK' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
|
||||
strings.Join(f.Value, "','")))
|
||||
seq = append(seq, seqCond("CLICK", "selector", f))
|
||||
case FilterInput:
|
||||
out = append(out,
|
||||
fmt.Sprintf(`(main."$event_name" = 'INPUT' AND JSONExtractString(toString(main."$properties"), 'label') IN ('%s'))`,
|
||||
strings.Join(f.Value, "','")))
|
||||
|
||||
seq = append(seq, seqCond("INPUT", "label", f))
|
||||
case FilterLocation:
|
||||
seq = append(seq, seqCond("LOCATION", "url_path", f))
|
||||
case FilterCustom:
|
||||
seq = append(seq, seqCond("CUSTOM", "name", f))
|
||||
case FilterFetch:
|
||||
seq = append(seq, seqFetchCond("REQUEST", f))
|
||||
case FilterFetchStatusCode:
|
||||
seq = append(seq, seqCond("REQUEST", "status", f))
|
||||
default:
|
||||
out = append(out,
|
||||
fmt.Sprintf(`(main."$event_name" = '%s')`, strings.ToUpper(string(f.Type))))
|
||||
seq = append(seq, fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(string(f.Type))))
|
||||
}
|
||||
}
|
||||
return out
|
||||
eventConditions = []string{basicEventTypes}
|
||||
|
||||
// then => sequenceMatch
|
||||
// or => OR
|
||||
// and => AND
|
||||
switch order {
|
||||
case EventOrderThen:
|
||||
var pattern []string
|
||||
for i := range seq {
|
||||
pattern = append(pattern, fmt.Sprintf("(?%d)", i+1))
|
||||
}
|
||||
having = fmt.Sprintf("sequenceMatch('%s')(\n%s,\n%s)",
|
||||
strings.Join(pattern, ""), fmt.Sprintf("toUnixTimestamp(%s)", ColEventTime), strings.Join(seq, ",\n"))
|
||||
case EventOrderAnd:
|
||||
// build AND
|
||||
having = strings.Join(seq, " AND ")
|
||||
case EventOrderOr:
|
||||
default:
|
||||
// default => OR
|
||||
var orParts []string
|
||||
for _, p := range seq {
|
||||
orParts = append(orParts, "("+p+")")
|
||||
}
|
||||
having = strings.Join(orParts, " OR ")
|
||||
}
|
||||
return eventConditions, having
|
||||
}
|
||||
|
||||
func buildSessionWhere(filters []Filter) []string {
|
||||
var conds []string
|
||||
for _, f := range filters {
|
||||
switch f.Type {
|
||||
case FilterUserCountry:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCountry, concatValues(f.Value)))
|
||||
case FilterUserCity:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserCity, concatValues(f.Value)))
|
||||
case FilterUserState:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserState, concatValues(f.Value)))
|
||||
case FilterUserId:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserID, concatValues(f.Value)))
|
||||
case FilterUserAnonymousId:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserAnonymousID, concatValues(f.Value)))
|
||||
case FilterUserOs:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserOS, concatValues(f.Value)))
|
||||
case FilterUserBrowser:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserBrowser, concatValues(f.Value)))
|
||||
case FilterUserDevice:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDevice, concatValues(f.Value)))
|
||||
case FilterPlatform:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUserDeviceType, concatValues(f.Value)))
|
||||
case FilterRevId:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColRevID, concatValues(f.Value)))
|
||||
case FilterReferrer:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColBaseReferrer, concatValues(f.Value)))
|
||||
case FilterDuration:
|
||||
if len(f.Value) == 2 {
|
||||
conds = append(conds, fmt.Sprintf("%s >= '%s'", ColDuration, f.Value[0]))
|
||||
conds = append(conds, fmt.Sprintf("%s <= '%s'", ColDuration, f.Value[1]))
|
||||
}
|
||||
case FilterUtmSource:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmSource, concatValues(f.Value)))
|
||||
case FilterUtmMedium:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmMedium, concatValues(f.Value)))
|
||||
case FilterUtmCampaign:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColUtmCampaign, concatValues(f.Value)))
|
||||
case FilterMetadata:
|
||||
conds = append(conds, fmt.Sprintf("%s = toString('%s')", ColMetadata1, concatValues(f.Value)))
|
||||
}
|
||||
}
|
||||
// adding /n to each condition for better readability, can be removed.
|
||||
for i := range conds {
|
||||
conds[i] += "\n"
|
||||
}
|
||||
return conds
|
||||
}
|
||||
|
||||
func concatValues(v []string) string {
|
||||
return strings.Join(v, "")
|
||||
}
|
||||
|
||||
func seqCond(eventName, key string, f Filter) string {
|
||||
op := parseOperator(f.Operator)
|
||||
return fmt.Sprintf("(%s = '%s' AND JSONExtractString(toString(%s), '%s') %s '%s')",
|
||||
ColEventName, strings.ToUpper(eventName), ColEventProperties, key, op, concatValues(f.Value))
|
||||
}
|
||||
|
||||
func seqFetchCond(eventName string, f Filter) string {
|
||||
w := []string{fmt.Sprintf("(%s = '%s')", ColEventName, strings.ToUpper(eventName))}
|
||||
var extras []string
|
||||
for _, c := range f.Filters {
|
||||
switch c.Type {
|
||||
case FilterFetch:
|
||||
if len(c.Value) > 0 {
|
||||
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventURLPath, concatValues(c.Value)))
|
||||
}
|
||||
case FilterFetchStatusCode:
|
||||
if len(c.Value) > 0 {
|
||||
extras = append(extras, fmt.Sprintf("(%s = '%s')", ColEventStatus, concatValues(c.Value)))
|
||||
}
|
||||
default:
|
||||
// placeholder if needed
|
||||
}
|
||||
}
|
||||
if len(extras) > 0 {
|
||||
w = append(w, strings.Join(extras, " AND "))
|
||||
}
|
||||
return "(" + strings.Join(w, " AND ") + ")"
|
||||
}
|
||||
|
||||
func parseOperator(op string) string {
|
||||
// TODO implement this properly
|
||||
switch strings.ToLower(op) {
|
||||
case OperatorContains:
|
||||
return "LIKE"
|
||||
case OperatorStringIs, OperatorStringOn, "=", OperatorStringOnAny:
|
||||
return "="
|
||||
case OperatorStringStartsWith:
|
||||
return "LIKE"
|
||||
case OperatorStringEndsWith:
|
||||
// might interpret differently in real impl
|
||||
return "="
|
||||
default:
|
||||
return "="
|
||||
}
|
||||
}
|
||||
|
||||
func buildEventConditions(filters []Filter) (conds, names []string) {
|
||||
for _, f := range filters {
|
||||
if f.IsEvent {
|
||||
switch f.Type {
|
||||
case FilterClick:
|
||||
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, "is")
|
||||
if c != "" {
|
||||
conds = append(conds, c)
|
||||
}
|
||||
names = append(names, "CLICK")
|
||||
case FilterInput:
|
||||
c := buildCond("JSONExtractString(toString(main.`$properties`), 'label')", f.Value, f.Operator)
|
||||
if c != "" {
|
||||
conds = append(conds, c)
|
||||
}
|
||||
names = append(names, "INPUT")
|
||||
case FilterLocation:
|
||||
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", f.Value, f.Operator)
|
||||
if c != "" {
|
||||
conds = append(conds, c)
|
||||
}
|
||||
names = append(names, "LOCATION")
|
||||
case FilterCustom:
|
||||
c := buildCond("JSONExtractString(toString(main.`$properties`), 'name')", f.Value, f.Operator)
|
||||
if c != "" {
|
||||
conds = append(conds, c)
|
||||
}
|
||||
names = append(names, "CUSTOM")
|
||||
case FilterFetch:
|
||||
var fetchConds []string
|
||||
for _, nf := range f.Filters {
|
||||
switch nf.Type {
|
||||
case "fetchUrl":
|
||||
c := buildCond("JSONExtractString(toString(main.`$properties`), 'url_path')", nf.Value, f.Operator)
|
||||
if c != "" {
|
||||
fetchConds = append(fetchConds, c)
|
||||
}
|
||||
case "fetchStatusCode":
|
||||
c := buildCond("JSONExtractFloat(toString(main.`$properties`), 'status')", nf.Value, f.Operator)
|
||||
if c != "" {
|
||||
fetchConds = append(fetchConds, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(fetchConds) > 0 {
|
||||
conds = append(conds, strings.Join(fetchConds, " AND "))
|
||||
}
|
||||
names = append(names, "REQUEST")
|
||||
case FilterTag:
|
||||
c := buildCond("JSONExtractString(toString(main.`$properties`), 'tag')", f.Value, f.Operator)
|
||||
if c != "" {
|
||||
conds = append(conds, c)
|
||||
}
|
||||
names = append(names, "TAG")
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func buildSessionConditions(filters []Filter) []string {
|
||||
var conds []string
|
||||
for _, f := range filters {
|
||||
if !f.IsEvent {
|
||||
switch f.Type {
|
||||
case FilterUserCountry:
|
||||
conds = append(conds, buildCond("s.user_country", f.Value, f.Operator))
|
||||
case FilterUserCity:
|
||||
conds = append(conds, buildCond("s.user_city", f.Value, f.Operator))
|
||||
case FilterUserState:
|
||||
conds = append(conds, buildCond("s.user_state", f.Value, f.Operator))
|
||||
case FilterUserId:
|
||||
conds = append(conds, buildCond("s.user_id", f.Value, f.Operator))
|
||||
case FilterUserAnonymousId:
|
||||
conds = append(conds, buildCond("s.user_anonymous_id", f.Value, f.Operator))
|
||||
case FilterUserOs:
|
||||
conds = append(conds, buildCond("s.user_os", f.Value, f.Operator))
|
||||
case FilterUserBrowser:
|
||||
conds = append(conds, buildCond("s.user_browser", f.Value, f.Operator))
|
||||
case FilterUserDevice:
|
||||
conds = append(conds, buildCond("s.user_device", f.Value, f.Operator))
|
||||
case FilterPlatform:
|
||||
conds = append(conds, buildCond("s.user_device_type", f.Value, f.Operator))
|
||||
case FilterRevId:
|
||||
conds = append(conds, buildCond("s.rev_id", f.Value, f.Operator))
|
||||
case FilterReferrer:
|
||||
conds = append(conds, buildCond("s.base_referrer", f.Value, f.Operator))
|
||||
case FilterDuration:
|
||||
if len(f.Value) == 2 {
|
||||
conds = append(conds, fmt.Sprintf("s.duration >= '%s'", f.Value[0]))
|
||||
conds = append(conds, fmt.Sprintf("s.duration <= '%s'", f.Value[1]))
|
||||
}
|
||||
case FilterUtmSource:
|
||||
conds = append(conds, buildCond("s.utm_source", f.Value, f.Operator))
|
||||
case FilterUtmMedium:
|
||||
conds = append(conds, buildCond("s.utm_medium", f.Value, f.Operator))
|
||||
case FilterUtmCampaign:
|
||||
conds = append(conds, buildCond("s.utm_campaign", f.Value, f.Operator))
|
||||
case FilterMetadata:
|
||||
if f.Source != "" {
|
||||
conds = append(conds, buildCond(fmt.Sprintf("s.%s", f.Source), f.Value, f.Operator))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return conds
|
||||
}
|
||||
|
||||
func buildCond(expr string, values []string, operator string) string {
|
||||
if len(values) == 0 {
|
||||
return ""
|
||||
}
|
||||
switch operator {
|
||||
case "contains":
|
||||
var conds []string
|
||||
for _, v := range values {
|
||||
conds = append(conds, fmt.Sprintf("%s ILIKE '%%%s%%'", expr, v))
|
||||
}
|
||||
if len(conds) > 1 {
|
||||
return "(" + strings.Join(conds, " OR ") + ")"
|
||||
}
|
||||
return conds[0]
|
||||
case "notContains":
|
||||
var conds []string
|
||||
for _, v := range values {
|
||||
conds = append(conds, fmt.Sprintf("NOT (%s ILIKE '%%%s%%')", expr, v))
|
||||
}
|
||||
if len(conds) > 1 {
|
||||
return "(" + strings.Join(conds, " OR ") + ")"
|
||||
}
|
||||
return conds[0]
|
||||
case "startsWith":
|
||||
var conds []string
|
||||
for _, v := range values {
|
||||
conds = append(conds, fmt.Sprintf("%s ILIKE '%s%%'", expr, v))
|
||||
}
|
||||
if len(conds) > 1 {
|
||||
return "(" + strings.Join(conds, " OR ") + ")"
|
||||
}
|
||||
return conds[0]
|
||||
case "endsWith":
|
||||
var conds []string
|
||||
for _, v := range values {
|
||||
conds = append(conds, fmt.Sprintf("%s ILIKE '%%%s'", expr, v))
|
||||
}
|
||||
if len(conds) > 1 {
|
||||
return "(" + strings.Join(conds, " OR ") + ")"
|
||||
}
|
||||
return conds[0]
|
||||
default:
|
||||
if len(values) > 1 {
|
||||
var quoted []string
|
||||
for _, v := range values {
|
||||
quoted = append(quoted, fmt.Sprintf("'%s'", v))
|
||||
}
|
||||
return fmt.Sprintf("%s IN (%s)", expr, strings.Join(quoted, ","))
|
||||
}
|
||||
return fmt.Sprintf("%s = '%s'", expr, values[0])
|
||||
}
|
||||
}
|
||||
|
||||
func buildInClause(values []string) string {
|
||||
var quoted []string
|
||||
for _, v := range values {
|
||||
quoted = append(quoted, fmt.Sprintf("'%s'", v))
|
||||
}
|
||||
return strings.Join(quoted, ",")
|
||||
}
|
||||
|
||||
func buildStaticEventWhere(p Payload) string {
|
||||
return strings.Join([]string{
|
||||
fmt.Sprintf("main.project_id = %d", p.ProjectId),
|
||||
fmt.Sprintf("main.created_at >= toDateTime(%d / 1000)", p.StartTimestamp),
|
||||
fmt.Sprintf("main.created_at <= toDateTime(%d / 1000)", p.EndTimestamp),
|
||||
}, " AND ")
|
||||
}
|
||||
|
||||
func buildStaticSessionWhere(p Payload, sessionConds []string) (string, string) {
|
||||
static := []string{fmt.Sprintf("s.project_id = %d", p.ProjectId)}
|
||||
sessWhere := strings.Join(static, " AND ")
|
||||
if len(sessionConds) > 0 {
|
||||
sessWhere += " AND " + strings.Join(sessionConds, " AND ")
|
||||
}
|
||||
sessJoin := strings.Join(append(static, append(sessionConds,
|
||||
fmt.Sprintf("s.datetime >= toDateTime(%d / 1000)", p.StartTimestamp),
|
||||
fmt.Sprintf("s.datetime <= toDateTime(%d / 1000)", p.EndTimestamp))...), " AND ")
|
||||
return sessWhere, sessJoin
|
||||
}
|
||||
|
||||
func buildHavingClause(conds []string) string {
|
||||
seqConds := append([]string{}, conds...)
|
||||
if len(seqConds) == 1 {
|
||||
seqConds = append(seqConds, "1")
|
||||
}
|
||||
if len(seqConds) == 0 {
|
||||
return ""
|
||||
}
|
||||
var parts []string
|
||||
for i := range seqConds {
|
||||
parts = append(parts, fmt.Sprintf("(?%d)", i+1))
|
||||
}
|
||||
pattern := strings.Join(parts, "")
|
||||
args := []string{"toDateTime(main.created_at)"}
|
||||
args = append(args, seqConds...)
|
||||
return fmt.Sprintf("HAVING sequenceMatch('%s')(%s)) AS f", pattern, strings.Join(args, ",\n "))
|
||||
}
|
||||
|
||||
func contains(slice []string, s string) bool {
|
||||
for _, v := range slice {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func FillMissingDataPoints(
|
||||
startTime, endTime int64,
|
||||
density int,
|
||||
neutral DataPoint,
|
||||
rows []DataPoint,
|
||||
timeCoefficient int64,
|
||||
) []DataPoint {
|
||||
if density <= 1 {
|
||||
return rows
|
||||
}
|
||||
|
||||
stepSize := uint64(getStepSize(startTime, endTime, density, false, 1000))
|
||||
bucketSize := stepSize * uint64(timeCoefficient)
|
||||
|
||||
lookup := make(map[uint64]DataPoint)
|
||||
for _, dp := range rows {
|
||||
if dp.Timestamp < uint64(startTime) {
|
||||
continue
|
||||
}
|
||||
bucket := uint64(startTime) + (((dp.Timestamp - uint64(startTime)) / bucketSize) * bucketSize)
|
||||
lookup[bucket] = dp
|
||||
}
|
||||
|
||||
results := make([]DataPoint, 0, density)
|
||||
for i := 0; i < density; i++ {
|
||||
ts := uint64(startTime) + uint64(i)*bucketSize
|
||||
if dp, ok := lookup[ts]; ok {
|
||||
results = append(results, dp)
|
||||
} else {
|
||||
nd := neutral
|
||||
nd.Timestamp = ts
|
||||
results = append(results, nd)
|
||||
}
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue