Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ jobs:
id: filter
with:
filters: |
flight-reader:
- 'cmd/flight-reader/**'
flight-processor:
- 'cmd/flight-processor/**'
flight-poster:
- 'cmd/flight-poster/**'
reader:
- 'cmd/reader/**'
processor:
- 'cmd/processor/**'
poster:
- 'cmd/poster/**'
pkg:
- 'pkg/**'
internal:
Expand Down
31 changes: 15 additions & 16 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
PROTO_DIR = proto
OUTPUT_DIR = src
.PHONY: help
help:
@echo "Available make targets:"
@grep '^\.PHONY:' Makefile | sed 's/\.PHONY: //g' | tr ' ' '\n' | sort | uniq | \
while read target; do \
echo " make $$target"; \
done

.PHONY: summarizer
summarizer:
@mkdir -p ${PROTO_DIR}/${OUTPUT_DIR}/summarizer
@protoc \
-I${PROTO_DIR} \
--go_out=${PROTO_DIR}/${OUTPUT_DIR}/summarizer --go_opt=paths=source_relative \
--go-grpc_out=${PROTO_DIR}/${OUTPUT_DIR}/summarizer --go-grpc_opt=paths=source_relative \
${PROTO_DIR}/summarizer.proto
.PHONY: reader
reader:
docker build -t flight-reader -f docker/reader.Dockerfile .

.PHONY: processor
processor:
docker build -t flight-processor -f docker/processor.Dockerfile .

.PHONY: poster
poster:
@mkdir -p ${PROTO_DIR}/${OUTPUT_DIR}/poster
@protoc \
-I${PROTO_DIR} \
--go_out=${PROTO_DIR}/${OUTPUT_DIR}/poster --go_opt=paths=source_relative \
--go-grpc_out=${PROTO_DIR}/${OUTPUT_DIR}/poster --go-grpc_opt=paths=source_relative \
${PROTO_DIR}/poster.proto
docker build -t flight-poster -f docker/poster.Dockerfile .
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
67 changes: 67 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9093:9093"
- "9092:9092" # For internal use
- "9101:9101" # JMX port for monitoring
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,HOST://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,HOST://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

kafka-init:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
entrypoint: ["/bin/bash", "-c"]
command: >
"
echo 'Waiting for Kafka to be ready...' &&
cub kafka-ready -b kafka:9092 1 20 &&
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic reader.flights --partitions 1 --replication-factor 1 &&
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic processor.statistic --partitions 1 --replication-factor 1
"

reader:
env_file:
- .env
build:
context: .
dockerfile: ./docker/reader.Dockerfile
ports:
- 8080:8080
restart: on-failure:5

processor:
env_file:
- .env
build:
context: .
dockerfile: ./docker/processor.Dockerfile
restart: on-failure:5

poster:
env_file:
- .env
build:
context: .
dockerfile: ./docker/poster.Dockerfile
restart: on-failure:5
6 changes: 3 additions & 3 deletions configs/reader-config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
http_server:
port: 9099
timeout: 10
port: 8080
timeout: 75
http_client:
timeout: 10
timeout: 70
flight_api:
url: ''
user: ''
Expand Down
26 changes: 26 additions & 0 deletions docker/poster.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM golang:1.24-alpine AS builder

ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64

WORKDIR /app

COPY go.mod go.sum ./

RUN go mod download
RUN go mod verify

COPY pkg ./pkg
COPY cmd/poster ./cmd/poster
COPY internal/poster ./internal/poster
COPY configs/poster-config.yaml ./configs/poster-config.yaml

RUN go build -ldflags="-w -s" -o /app/bin/poster ./cmd/poster

FROM alpine:latest

WORKDIR /app
COPY --from=builder /app/bin/poster .
COPY --from=builder app/configs ./configs
CMD ["./poster"]
26 changes: 26 additions & 0 deletions docker/processor.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM golang:1.24-alpine AS builder

ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64

WORKDIR /app

COPY go.mod go.sum ./

RUN go mod download
RUN go mod verify

COPY pkg ./pkg
COPY cmd/processor ./cmd/processor
COPY internal/processor ./internal/processor
COPY configs/processor-config.yaml ./configs/processor-config.yaml

RUN go build -ldflags="-w -s" -o /app/bin/processor ./cmd/processor

FROM alpine:latest

WORKDIR /app
COPY --from=builder /app/bin/processor .
COPY --from=builder app/configs ./configs
CMD ["./processor"]
26 changes: 26 additions & 0 deletions docker/reader.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM golang:1.24-alpine AS builder

ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64

WORKDIR /app

COPY go.mod go.sum ./

RUN go mod download
RUN go mod verify

COPY pkg ./pkg
COPY cmd/reader ./cmd/reader
COPY internal/reader ./internal/reader
COPY configs/reader-config.yaml ./configs/reader-config.yaml

RUN go build -ldflags="-w -s" -o /app/bin/reader ./cmd/reader

FROM alpine:latest

WORKDIR /app
COPY --from=builder /app/bin/reader .
COPY --from=builder app/configs ./configs
CMD ["./reader"]
3 changes: 2 additions & 1 deletion internal/poster/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type TwitterAPIConfig struct {
func LoadConfig() (*FlightPosterConfig, error) {
viper.SetConfigName("poster-config")
viper.SetConfigType("yaml")
viper.AddConfigPath("../../configs")
viper.AddConfigPath("../../../configs")
viper.AddConfigPath("../../configs")
viper.AddConfigPath("./configs")
viper.AutomaticEnv()
viper.SetEnvPrefix("FLIGHT_POSTER")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
Expand Down
3 changes: 2 additions & 1 deletion internal/processor/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type SummarizerConfig struct {
func LoadConfig() (*FlightProcessorConfig, error) {
viper.SetConfigName("processor-config")
viper.SetConfigType("yaml")
viper.AddConfigPath("../../configs")
viper.AddConfigPath("../../../configs")
viper.AddConfigPath("../../configs")
viper.AddConfigPath("./configs")
viper.AutomaticEnv()
viper.SetEnvPrefix("FLIGHT_PROCESSOR")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
Expand Down
2 changes: 2 additions & 0 deletions internal/processor/service/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ processingLoop:
airport = string(msg.Value)
slog.Info("Started processing stream for airport", "airport", airport)
case "end_of_stream":

date := string(msg.Value)
slog.Info("Ended processing stream for airport", "date", date)

summary, err := p.summarizer.SummarizeFlights(flights, date, airport)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/processor/service/summarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func topNKeysByValue(m map[string]int, n int) []string {
for _, value := range buckets[i] {
result = append(result, value)
if len(result) == n {
break
return result
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/reader/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type RouteAPIConfig struct {
func LoadConfig() (*FlightReaderConfig, error) {
viper.SetConfigName("reader-config")
viper.SetConfigType("yaml")
viper.AddConfigPath("../../configs")
viper.AddConfigPath("../../../configs")
viper.AddConfigPath("../../configs")
viper.AddConfigPath("./configs")
viper.AutomaticEnv()
viper.SetEnvPrefix("FLIGHT_READER")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
Expand Down
12 changes: 6 additions & 6 deletions internal/reader/config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ func TestLoadConfig_ValidConfigFile_ShouldSucceed(t *testing.T) {
cfg, err := config.LoadConfig()
require.NoError(t, err)
require.NotNil(t, cfg)
require.Equal(t, "9099", cfg.HTTPServerConfig.Port)
require.Equal(t, 10, cfg.HTTPServerConfig.Timeout)
require.Equal(t, 10, cfg.HTTPClientConfig.Timeout)
require.Equal(t, "8080", cfg.HTTPServerConfig.Port)
require.Equal(t, 75, cfg.HTTPServerConfig.Timeout)
require.Equal(t, 70, cfg.HTTPClientConfig.Timeout)
require.Equal(t, "test", cfg.FlightAPIClientConfig.URL)
require.Equal(t, "test", cfg.FlightAPIClientConfig.User)
require.Equal(t, "test", cfg.FlightAPIClientConfig.Pass)
Expand Down Expand Up @@ -92,9 +92,9 @@ func TestLoadConfig_EnvOverride_ShouldSucceed(t *testing.T) {
cfg, err := config.LoadConfig()
require.NoError(t, err)
require.NotNil(t, cfg)
require.Equal(t, "9099", cfg.HTTPServerConfig.Port)
require.Equal(t, 10, cfg.HTTPServerConfig.Timeout)
require.Equal(t, 10, cfg.HTTPClientConfig.Timeout)
require.Equal(t, "8080", cfg.HTTPServerConfig.Port)
require.Equal(t, 75, cfg.HTTPServerConfig.Timeout)
require.Equal(t, 70, cfg.HTTPClientConfig.Timeout)
require.Equal(t, "test", cfg.FlightAPIClientConfig.URL)
require.Equal(t, "test", cfg.FlightAPIClientConfig.User)
require.Equal(t, "test", cfg.FlightAPIClientConfig.Pass)
Expand Down
9 changes: 3 additions & 6 deletions internal/reader/service/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func (r *Reader) processFlights(
return fmt.Errorf("failed to process flights: %w", err)
}

slog.Info("Fetched flights successfully", "airport", airport, "flights_count", len(flights))

if err := r.processRoute(ctx, flights, airport, date); err != nil {
return fmt.Errorf("failed to process routes: %w", err)
}
Expand Down Expand Up @@ -189,12 +191,7 @@ func (r *Reader) sendFlightAndRouteMessage(
}

func (r *Reader) sendStreamControlMessage(ctx context.Context, key, message string) error {
value, err := json.Marshal(map[string]string{"message": message})
if err != nil {
return fmt.Errorf("failed to marshal %s message: %w", key, err)
}

if err := r.messageWriter.WriteMessage(ctx, []byte(key), value); err != nil {
if err := r.messageWriter.WriteMessage(ctx, []byte(key), []byte(message)); err != nil {
return fmt.Errorf("failed to write %s message to the message queue: %w", key, err)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func TestServe_ServerError_ShouldError(t *testing.T) {
server, err := server.NewServer(cfg, http.HandlerFunc(testHandler))
require.NoError(t, err)

err = server.Serve(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
err = server.Serve(ctx)
require.ErrorContains(t, err, "failed to start HTTP server")
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -59,8 +60,9 @@ func NewKafkaReader(cfg ReaderConfig) (*Reader, error) {
return nil, fmt.Errorf("kafka group ID is empty")
}

addresses := strings.Split(cfg.Address, ",")
opts := []kgo.Opt{
kgo.SeedBrokers([]string{cfg.Address}...),
kgo.SeedBrokers(addresses...),
kgo.ConsumerGroup(cfg.GroupID),
kgo.ConsumeTopics(cfg.Topic),
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kafka/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/twmb/franz-go/pkg/kgo"
)
Expand Down Expand Up @@ -43,8 +44,9 @@ func NewKafkaWriter(cfg WriterConfig) (*Writer, error) {
return nil, fmt.Errorf("kafka topic is empty")
}

addresses := strings.Split(cfg.Address, ",")
opts := []kgo.Opt{
kgo.SeedBrokers([]string{cfg.Address}...),
kgo.SeedBrokers(addresses...),
kgo.DefaultProduceTopic(cfg.Topic),
}
client, err := kgo.NewClient(opts...)
Expand Down