diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 23e77e0..1d16eaf 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -18,12 +18,12 @@ jobs: id: filter with: filters: | - flight-reader: - - 'cmd/flight-reader/**' - flight-processor: - - 'cmd/flight-processor/**' - flight-poster: - - 'cmd/flight-poster/**' + reader: + - 'cmd/reader/**' + processor: + - 'cmd/processor/**' + poster: + - 'cmd/poster/**' pkg: - 'pkg/**' internal: diff --git a/Makefile b/Makefile index 36cb7c8..5fa6336 100644 --- a/Makefile +++ b/Makefile @@ -1,20 +1,19 @@ -PROTO_DIR = proto -OUTPUT_DIR = src +.PHONY: help +help: + @echo "Available make targets:" + @grep '^\.PHONY:' Makefile | sed 's/\.PHONY: //g' | tr ' ' '\n' | sort | uniq | \ + while read target; do \ + echo " make $$target"; \ + done -.PHONY: summarizer -summarizer: - @mkdir -p ${PROTO_DIR}/${OUTPUT_DIR}/summarizer - @protoc \ - -I${PROTO_DIR} \ - --go_out=${PROTO_DIR}/${OUTPUT_DIR}/summarizer --go_opt=paths=source_relative \ - --go-grpc_out=${PROTO_DIR}/${OUTPUT_DIR}/summarizer --go-grpc_opt=paths=source_relative \ - ${PROTO_DIR}/summarizer.proto +.PHONY: reader +reader: + docker build -t flight-reader -f docker/reader.Dockerfile . + +.PHONY: processor +processor: + docker build -t flight-processor -f docker/processor.Dockerfile . .PHONY: poster poster: - @mkdir -p ${PROTO_DIR}/${OUTPUT_DIR}/poster - @protoc \ - -I${PROTO_DIR} \ - --go_out=${PROTO_DIR}/${OUTPUT_DIR}/poster --go_opt=paths=source_relative \ - --go-grpc_out=${PROTO_DIR}/${OUTPUT_DIR}/poster --go-grpc_opt=paths=source_relative \ - ${PROTO_DIR}/poster.proto + docker build -t flight-poster -f docker/poster.Dockerfile . diff --git a/cmd/flight-poster/main.go b/cmd/poster/main.go similarity index 100% rename from cmd/flight-poster/main.go rename to cmd/poster/main.go diff --git a/cmd/flight-processor/main.go b/cmd/processor/main.go similarity index 100% rename from cmd/flight-processor/main.go rename to cmd/processor/main.go diff --git a/cmd/flight-reader/README.md b/cmd/reader/README.md similarity index 100% rename from cmd/flight-reader/README.md rename to cmd/reader/README.md diff --git a/cmd/flight-reader/main.go b/cmd/reader/main.go similarity index 100% rename from cmd/flight-reader/main.go rename to cmd/reader/main.go diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..71250f9 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,67 @@ + +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - "9093:9093" + - "9092:9092" # For internal use + - "9101:9101" # JMX port for monitoring + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,HOST://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,HOST://localhost:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + + kafka-init: + image: confluentinc/cp-kafka:latest + depends_on: + - kafka + entrypoint: ["/bin/bash", "-c"] + command: > + " + echo 'Waiting for Kafka to be ready...' && + cub kafka-ready -b kafka:9092 1 20 && + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic reader.flights --partitions 1 --replication-factor 1 && + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic processor.statistic --partitions 1 --replication-factor 1 + " + + reader: + env_file: + - .env + build: + context: . + dockerfile: ./docker/reader.Dockerfile + ports: + - 8080:8080 + restart: on-failure:5 + + processor: + env_file: + - .env + build: + context: . + dockerfile: ./docker/processor.Dockerfile + restart: on-failure:5 + + poster: + env_file: + - .env + build: + context: . + dockerfile: ./docker/poster.Dockerfile + restart: on-failure:5 diff --git a/configs/reader-config.yaml b/configs/reader-config.yaml index ca9368f..c127898 100644 --- a/configs/reader-config.yaml +++ b/configs/reader-config.yaml @@ -1,8 +1,8 @@ http_server: - port: 9099 - timeout: 10 + port: 8080 + timeout: 75 http_client: - timeout: 10 + timeout: 70 flight_api: url: '' user: '' diff --git a/docker/poster.Dockerfile b/docker/poster.Dockerfile new file mode 100644 index 0000000..82e0c73 --- /dev/null +++ b/docker/poster.Dockerfile @@ -0,0 +1,26 @@ +FROM golang:1.24-alpine AS builder + +ENV CGO_ENABLED=0 +ENV GOOS=linux +ENV GOARCH=amd64 + +WORKDIR /app + +COPY go.mod go.sum ./ + +RUN go mod download +RUN go mod verify + +COPY pkg ./pkg +COPY cmd/poster ./cmd/poster +COPY internal/poster ./internal/poster +COPY configs/poster-config.yaml ./configs/poster-config.yaml + +RUN go build -ldflags="-w -s" -o /app/bin/poster ./cmd/poster + +FROM alpine:latest + +WORKDIR /app +COPY --from=builder /app/bin/poster . +COPY --from=builder app/configs ./configs +CMD ["./poster"] diff --git a/docker/processor.Dockerfile b/docker/processor.Dockerfile new file mode 100644 index 0000000..2919034 --- /dev/null +++ b/docker/processor.Dockerfile @@ -0,0 +1,26 @@ +FROM golang:1.24-alpine AS builder + +ENV CGO_ENABLED=0 +ENV GOOS=linux +ENV GOARCH=amd64 + +WORKDIR /app + +COPY go.mod go.sum ./ + +RUN go mod download +RUN go mod verify + +COPY pkg ./pkg +COPY cmd/processor ./cmd/processor +COPY internal/processor ./internal/processor +COPY configs/processor-config.yaml ./configs/processor-config.yaml + +RUN go build -ldflags="-w -s" -o /app/bin/processor ./cmd/processor + +FROM alpine:latest + +WORKDIR /app +COPY --from=builder /app/bin/processor . +COPY --from=builder app/configs ./configs +CMD ["./processor"] diff --git a/docker/reader.Dockerfile b/docker/reader.Dockerfile new file mode 100644 index 0000000..c39cf5c --- /dev/null +++ b/docker/reader.Dockerfile @@ -0,0 +1,26 @@ +FROM golang:1.24-alpine AS builder + +ENV CGO_ENABLED=0 +ENV GOOS=linux +ENV GOARCH=amd64 + +WORKDIR /app + +COPY go.mod go.sum ./ + +RUN go mod download +RUN go mod verify + +COPY pkg ./pkg +COPY cmd/reader ./cmd/reader +COPY internal/reader ./internal/reader +COPY configs/reader-config.yaml ./configs/reader-config.yaml + +RUN go build -ldflags="-w -s" -o /app/bin/reader ./cmd/reader + +FROM alpine:latest + +WORKDIR /app +COPY --from=builder /app/bin/reader . +COPY --from=builder app/configs ./configs +CMD ["./reader"] diff --git a/internal/poster/config/env.go b/internal/poster/config/env.go index 90ad216..4795dde 100644 --- a/internal/poster/config/env.go +++ b/internal/poster/config/env.go @@ -41,8 +41,9 @@ type TwitterAPIConfig struct { func LoadConfig() (*FlightPosterConfig, error) { viper.SetConfigName("poster-config") viper.SetConfigType("yaml") - viper.AddConfigPath("../../configs") viper.AddConfigPath("../../../configs") + viper.AddConfigPath("../../configs") + viper.AddConfigPath("./configs") viper.AutomaticEnv() viper.SetEnvPrefix("FLIGHT_POSTER") viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) diff --git a/internal/processor/config/env.go b/internal/processor/config/env.go index 1cf6f17..5dbdcd1 100644 --- a/internal/processor/config/env.go +++ b/internal/processor/config/env.go @@ -29,8 +29,9 @@ type SummarizerConfig struct { func LoadConfig() (*FlightProcessorConfig, error) { viper.SetConfigName("processor-config") viper.SetConfigType("yaml") - viper.AddConfigPath("../../configs") viper.AddConfigPath("../../../configs") + viper.AddConfigPath("../../configs") + viper.AddConfigPath("./configs") viper.AutomaticEnv() viper.SetEnvPrefix("FLIGHT_PROCESSOR") viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) diff --git a/internal/processor/service/processor.go b/internal/processor/service/processor.go index 6045e26..926140a 100644 --- a/internal/processor/service/processor.go +++ b/internal/processor/service/processor.go @@ -85,7 +85,9 @@ processingLoop: airport = string(msg.Value) slog.Info("Started processing stream for airport", "airport", airport) case "end_of_stream": + date := string(msg.Value) + slog.Info("Ended processing stream for airport", "date", date) summary, err := p.summarizer.SummarizeFlights(flights, date, airport) if err != nil { diff --git a/internal/processor/service/summarizer.go b/internal/processor/service/summarizer.go index 744af6e..1c3288e 100644 --- a/internal/processor/service/summarizer.go +++ b/internal/processor/service/summarizer.go @@ -90,7 +90,7 @@ func topNKeysByValue(m map[string]int, n int) []string { for _, value := range buckets[i] { result = append(result, value) if len(result) == n { - break + return result } } } diff --git a/internal/reader/config/env.go b/internal/reader/config/env.go index 1b2982d..97e5685 100644 --- a/internal/reader/config/env.go +++ b/internal/reader/config/env.go @@ -40,8 +40,9 @@ type RouteAPIConfig struct { func LoadConfig() (*FlightReaderConfig, error) { viper.SetConfigName("reader-config") viper.SetConfigType("yaml") - viper.AddConfigPath("../../configs") viper.AddConfigPath("../../../configs") + viper.AddConfigPath("../../configs") + viper.AddConfigPath("./configs") viper.AutomaticEnv() viper.SetEnvPrefix("FLIGHT_READER") viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) diff --git a/internal/reader/config/env_test.go b/internal/reader/config/env_test.go index 288738e..82728e5 100644 --- a/internal/reader/config/env_test.go +++ b/internal/reader/config/env_test.go @@ -19,9 +19,9 @@ func TestLoadConfig_ValidConfigFile_ShouldSucceed(t *testing.T) { cfg, err := config.LoadConfig() require.NoError(t, err) require.NotNil(t, cfg) - require.Equal(t, "9099", cfg.HTTPServerConfig.Port) - require.Equal(t, 10, cfg.HTTPServerConfig.Timeout) - require.Equal(t, 10, cfg.HTTPClientConfig.Timeout) + require.Equal(t, "8080", cfg.HTTPServerConfig.Port) + require.Equal(t, 75, cfg.HTTPServerConfig.Timeout) + require.Equal(t, 70, cfg.HTTPClientConfig.Timeout) require.Equal(t, "test", cfg.FlightAPIClientConfig.URL) require.Equal(t, "test", cfg.FlightAPIClientConfig.User) require.Equal(t, "test", cfg.FlightAPIClientConfig.Pass) @@ -92,9 +92,9 @@ func TestLoadConfig_EnvOverride_ShouldSucceed(t *testing.T) { cfg, err := config.LoadConfig() require.NoError(t, err) require.NotNil(t, cfg) - require.Equal(t, "9099", cfg.HTTPServerConfig.Port) - require.Equal(t, 10, cfg.HTTPServerConfig.Timeout) - require.Equal(t, 10, cfg.HTTPClientConfig.Timeout) + require.Equal(t, "8080", cfg.HTTPServerConfig.Port) + require.Equal(t, 75, cfg.HTTPServerConfig.Timeout) + require.Equal(t, 70, cfg.HTTPClientConfig.Timeout) require.Equal(t, "test", cfg.FlightAPIClientConfig.URL) require.Equal(t, "test", cfg.FlightAPIClientConfig.User) require.Equal(t, "test", cfg.FlightAPIClientConfig.Pass) diff --git a/internal/reader/service/reader.go b/internal/reader/service/reader.go index dee4a2b..f0ea3f8 100644 --- a/internal/reader/service/reader.go +++ b/internal/reader/service/reader.go @@ -97,6 +97,8 @@ func (r *Reader) processFlights( return fmt.Errorf("failed to process flights: %w", err) } + slog.Info("Fetched flights successfully", "airport", airport, "flights_count", len(flights)) + if err := r.processRoute(ctx, flights, airport, date); err != nil { return fmt.Errorf("failed to process routes: %w", err) } @@ -189,12 +191,7 @@ func (r *Reader) sendFlightAndRouteMessage( } func (r *Reader) sendStreamControlMessage(ctx context.Context, key, message string) error { - value, err := json.Marshal(map[string]string{"message": message}) - if err != nil { - return fmt.Errorf("failed to marshal %s message: %w", key, err) - } - - if err := r.messageWriter.WriteMessage(ctx, []byte(key), value); err != nil { + if err := r.messageWriter.WriteMessage(ctx, []byte(key), []byte(message)); err != nil { return fmt.Errorf("failed to write %s message to the message queue: %w", key, err) } diff --git a/pkg/http/server_test.go b/pkg/http/server_test.go index 9397611..c3bb51d 100644 --- a/pkg/http/server_test.go +++ b/pkg/http/server_test.go @@ -111,7 +111,9 @@ func TestServe_ServerError_ShouldError(t *testing.T) { server, err := server.NewServer(cfg, http.HandlerFunc(testHandler)) require.NoError(t, err) - err = server.Serve(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + err = server.Serve(ctx) require.ErrorContains(t, err, "failed to start HTTP server") } diff --git a/pkg/kafka/reader.go b/pkg/kafka/reader.go index f6d541b..793ac24 100644 --- a/pkg/kafka/reader.go +++ b/pkg/kafka/reader.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "strings" "time" "github.com/twmb/franz-go/pkg/kgo" @@ -59,8 +60,9 @@ func NewKafkaReader(cfg ReaderConfig) (*Reader, error) { return nil, fmt.Errorf("kafka group ID is empty") } + addresses := strings.Split(cfg.Address, ",") opts := []kgo.Opt{ - kgo.SeedBrokers([]string{cfg.Address}...), + kgo.SeedBrokers(addresses...), kgo.ConsumerGroup(cfg.GroupID), kgo.ConsumeTopics(cfg.Topic), } diff --git a/pkg/kafka/writer.go b/pkg/kafka/writer.go index f76968d..e377957 100644 --- a/pkg/kafka/writer.go +++ b/pkg/kafka/writer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "strings" "github.com/twmb/franz-go/pkg/kgo" ) @@ -43,8 +44,9 @@ func NewKafkaWriter(cfg WriterConfig) (*Writer, error) { return nil, fmt.Errorf("kafka topic is empty") } + addresses := strings.Split(cfg.Address, ",") opts := []kgo.Opt{ - kgo.SeedBrokers([]string{cfg.Address}...), + kgo.SeedBrokers(addresses...), kgo.DefaultProduceTopic(cfg.Topic), } client, err := kgo.NewClient(opts...)