Made standart project layout for ender service
This commit is contained in:
parent
50bbd0fe98
commit
700ef0dcc6
14 changed files with 38 additions and 16 deletions
|
|
@ -23,7 +23,7 @@ function build_service() {
|
|||
image="$1"
|
||||
echo "BUILDING $image"
|
||||
case "$image" in
|
||||
http | db)
|
||||
http | db | ender)
|
||||
echo build http
|
||||
docker build -t ${DOCKER_REPO:-'local'}/$image:${git_sha1} --build-arg SERVICE_NAME=$image -f ./cmd/Dockerfile .
|
||||
[[ $PUSH_IMAGE -eq 1 ]] && {
|
||||
|
|
|
|||
|
|
@ -2,37 +2,34 @@ package main
|
|||
|
||||
import (
|
||||
"log"
|
||||
"openreplay/backend/internal/builder"
|
||||
"openreplay/backend/internal/config/ender"
|
||||
"time"
|
||||
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"openreplay/backend/pkg/env"
|
||||
"openreplay/backend/pkg/intervals"
|
||||
logger "openreplay/backend/pkg/log"
|
||||
"openreplay/backend/pkg/messages"
|
||||
"openreplay/backend/pkg/queue"
|
||||
"openreplay/backend/pkg/queue/types"
|
||||
"openreplay/backend/services/ender/builder"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Llongfile)
|
||||
|
||||
GROUP_EVENTS := env.String("GROUP_ENDER")
|
||||
TOPIC_TRIGGER := env.String("TOPIC_TRIGGER")
|
||||
cfg := ender.New()
|
||||
|
||||
builderMap := builder.NewBuilderMap()
|
||||
|
||||
statsLogger := logger.NewQueueStats(env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"))
|
||||
|
||||
statsLogger := logger.NewQueueStats(cfg.LoggerTimeout)
|
||||
producer := queue.NewProducer()
|
||||
consumer := queue.NewMessageConsumer(
|
||||
GROUP_EVENTS,
|
||||
cfg.GroupEvents,
|
||||
[]string{
|
||||
env.String("TOPIC_RAW_WEB"),
|
||||
env.String("TOPIC_RAW_IOS"),
|
||||
cfg.TopicRawWeb,
|
||||
cfg.TopicRawIOS,
|
||||
},
|
||||
func(sessionID uint64, msg messages.Message, meta *types.Meta) {
|
||||
statsLogger.Collect(sessionID, meta)
|
||||
|
|
@ -51,17 +48,17 @@ func main() {
|
|||
select {
|
||||
case sig := <-sigchan:
|
||||
log.Printf("Caught signal %v: terminating\n", sig)
|
||||
producer.Close(2000)
|
||||
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
|
||||
producer.Close(cfg.ProducerTimeout)
|
||||
consumer.Commit()
|
||||
consumer.Close()
|
||||
os.Exit(0)
|
||||
case <-tick:
|
||||
builderMap.IterateReadyMessages(time.Now().UnixMilli(), func(sessionID uint64, readyMsg messages.Message) {
|
||||
producer.Produce(TOPIC_TRIGGER, sessionID, messages.Encode(readyMsg))
|
||||
producer.Produce(cfg.TopicTrigger, sessionID, messages.Encode(readyMsg))
|
||||
})
|
||||
// TODO: why exactly do we need Flush here and not in any other place?
|
||||
producer.Flush(2000)
|
||||
consumer.CommitBack(intervals.EVENTS_BACK_COMMIT_GAP)
|
||||
producer.Flush(cfg.ProducerTimeout)
|
||||
consumer.Commit()
|
||||
default:
|
||||
if err := consumer.ConsumeNext(); err != nil {
|
||||
log.Fatalf("Error on consuming: %v", err)
|
||||
25
backend/internal/config/ender/config.go
Normal file
25
backend/internal/config/ender/config.go
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
package ender
|
||||
|
||||
import (
|
||||
"openreplay/backend/pkg/env"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
GroupEvents string
|
||||
TopicTrigger string
|
||||
LoggerTimeout int
|
||||
TopicRawWeb string
|
||||
TopicRawIOS string
|
||||
ProducerTimeout int
|
||||
}
|
||||
|
||||
func New() *Config {
|
||||
return &Config{
|
||||
GroupEvents: env.String("GROUP_ENDER"),
|
||||
TopicTrigger: env.String("TOPIC_TRIGGER"),
|
||||
LoggerTimeout: env.Int("LOG_QUEUE_STATS_INTERVAL_SEC"),
|
||||
TopicRawWeb: env.String("TOPIC_RAW_WEB"),
|
||||
TopicRawIOS: env.String("TOPIC_RAW_IOS"),
|
||||
ProducerTimeout: 2000,
|
||||
}
|
||||
}
|
||||
0
backend/services/ender/build_hack
Normal file
0
backend/services/ender/build_hack
Normal file
Loading…
Add table
Reference in a new issue