From 942e1132a4d6ff619340736254558130603ad0a1 Mon Sep 17 00:00:00 2001 From: Anson Chung Date: Tue, 13 May 2025 00:33:25 -0700 Subject: [PATCH] [REFACTOR] Move Poster to new folder structure --- cmd/flight-poster/config.yaml | 13 - cmd/flight-poster/internal/client/http.go | 17 -- cmd/flight-poster/internal/config/env.go | 67 ----- cmd/flight-poster/internal/config/env_test.go | 74 ----- cmd/flight-poster/internal/model/threads.go | 11 - cmd/flight-poster/internal/poster/poster.go | 15 -- cmd/flight-poster/internal/poster/twitter.go | 48 ---- cmd/flight-poster/internal/server/grpc.go | 104 ------- cmd/flight-poster/main.go | 122 ++++++--- cmd/flight-processor/main.go | 5 +- configs/poster-config.yaml | 21 ++ go.mod | 6 +- go.sum | 4 - .../poster/client}/instagram.go | 2 +- internal/poster/client/socials.go | 18 ++ .../poster/client}/threads.go | 122 +++++---- internal/poster/client/threads_test.go | 178 ++++++++++++ internal/poster/client/twitter.go | 61 +++++ internal/poster/client/twitter_test.go | 90 +++++++ internal/poster/config/env.go | 60 +++++ internal/poster/config/env_test.go | 77 ++++++ internal/poster/model/threads.go | 16 ++ internal/poster/service/poster.go | 102 +++++++ internal/poster/service/poster_test.go | 199 ++++++++++++++ internal/processor/model/summary.go | 24 -- internal/processor/service/processor.go | 6 +- internal/processor/service/processor_test.go | 27 +- internal/processor/service/summarizer.go | 9 +- internal/processor/service/summarizer_test.go | 11 +- internal/reader/service/reader.go | 39 ++- internal/reader/service/reader_test.go | 16 +- .../test/mock/mock_flight_summary_repo.go | 21 +- internal/test/mock/mock_socials.go | 55 ++++ internal/test/mock/mock_summarizer.go | 5 +- pkg/http/server_test.go | 4 +- pkg/model/summary.go | 73 +++++ .../repository/flight_summary.go | 26 +- .../repository/flight_summary_test.go | 16 +- proto/poster.proto | 21 -- proto/src/poster/poster.pb.go | 255 ------------------ proto/src/poster/poster_grpc.pb.go | 121 --------- 41 files changed, 1232 insertions(+), 929 deletions(-) delete mode 100644 cmd/flight-poster/config.yaml delete mode 100644 cmd/flight-poster/internal/client/http.go delete mode 100644 cmd/flight-poster/internal/config/env.go delete mode 100644 cmd/flight-poster/internal/config/env_test.go delete mode 100644 cmd/flight-poster/internal/model/threads.go delete mode 100644 cmd/flight-poster/internal/poster/poster.go delete mode 100644 cmd/flight-poster/internal/poster/twitter.go delete mode 100644 cmd/flight-poster/internal/server/grpc.go create mode 100644 configs/poster-config.yaml rename {cmd/flight-poster/internal/poster => internal/poster/client}/instagram.go (92%) create mode 100644 internal/poster/client/socials.go rename {cmd/flight-poster/internal/poster => internal/poster/client}/threads.go (56%) create mode 100644 internal/poster/client/threads_test.go create mode 100644 internal/poster/client/twitter.go create mode 100644 internal/poster/client/twitter_test.go create mode 100644 internal/poster/config/env.go create mode 100644 internal/poster/config/env_test.go create mode 100644 internal/poster/model/threads.go create mode 100644 internal/poster/service/poster.go create mode 100644 internal/poster/service/poster_test.go delete mode 100644 internal/processor/model/summary.go create mode 100644 internal/test/mock/mock_socials.go create mode 100644 pkg/model/summary.go rename {internal/processor => pkg}/repository/flight_summary.go (67%) rename {internal/processor => pkg}/repository/flight_summary_test.go (85%) delete mode 100644 proto/poster.proto delete mode 100644 proto/src/poster/poster.pb.go delete mode 100644 proto/src/poster/poster_grpc.pb.go diff --git a/cmd/flight-poster/config.yaml b/cmd/flight-poster/config.yaml deleted file mode 100644 index df1b197..0000000 --- a/cmd/flight-poster/config.yaml +++ /dev/null @@ -1,13 +0,0 @@ -grpc-server: - port: 9097 -http-client: - timeout: 10 -threads: - url: https://graph.threads.net - access-token: '' -twitter: - access-token-key: '' - access-token-secret: '' -logger: - json: true - level: 'info' diff --git a/cmd/flight-poster/internal/client/http.go b/cmd/flight-poster/internal/client/http.go deleted file mode 100644 index d3322b3..0000000 --- a/cmd/flight-poster/internal/client/http.go +++ /dev/null @@ -1,17 +0,0 @@ -package client - -import ( - "log/slog" - "net/http" - "time" - - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/config" -) - -func NewHTTP(cfg config.HTTPClientConfig) (*http.Client, error) { - slog.Info("Creating HTTP client for the service") - - return &http.Client{ - Timeout: time.Duration(cfg.Timeout) * time.Second, - }, nil -} diff --git a/cmd/flight-poster/internal/config/env.go b/cmd/flight-poster/internal/config/env.go deleted file mode 100644 index 0aca061..0000000 --- a/cmd/flight-poster/internal/config/env.go +++ /dev/null @@ -1,67 +0,0 @@ -package config - -import ( - "fmt" - "strings" - - "github.com/ansoncht/flight-microservices/pkg/logger" - "github.com/spf13/viper" -) - -// FlightPosterConfig holds all configurations related to flight poster. -type FlightPosterConfig struct { - GrpcServerConfig GrpcServerConfig `mapstructure:"grpc-server"` - HTTPClientConfig HTTPClientConfig `mapstructure:"http-client"` - ThreadsClientConfig ThreadsClientConfig `mapstructure:"threads"` - TwitterClientConfig TwitterClientConfig `mapstructure:"twitter"` - LoggerConfig logger.Config `mapstructure:"logger"` -} - -// GrpcServerConfig holds configuration settings for the gRPC server. -type GrpcServerConfig struct { - // Port specifies the port where the gRPC server listens for connections. - Port string `mapstructure:"port"` -} - -// HTTPClientConfig holds configuration settings for the HTTP client. -type HTTPClientConfig struct { - // Timeout specifies the timeout for reading HTTP headers in seconds. - Timeout int `mapstructure:"timeout"` -} - -// ThreadsClientConfig holds configuration settings for the Threads client. -type ThreadsClientConfig struct { - // URL specifies the base URL for the Threads API. - URL string `mapstructure:"url"` - // Token specifies the access token for authentication with the Threads API. - Token string `mapstructure:"access-token"` -} - -// TwitterClientConfig holds configuration settings for the Twitter client. -type TwitterClientConfig struct { - // Key specifies the API key for Twitter authentication. - Key string `mapstructure:"access-token-key"` - // Secret specifies the API secret key for Twitter authentication. - Secret string `mapstructure:"access-token-secret"` -} - -// LoadConfig loads configuration from environment variables and a YAML file. -func LoadConfig() (*FlightPosterConfig, error) { - viper.SetConfigName("config") - viper.SetConfigType("yaml") - viper.AddConfigPath("../../") - viper.AutomaticEnv() - viper.SetEnvPrefix("FLIGHT_POSTER") - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) - - if err := viper.ReadInConfig(); err != nil { - return nil, fmt.Errorf("failed to read config file: %w", err) - } - - var cfg FlightPosterConfig - if err := viper.Unmarshal(&cfg); err != nil { - return nil, fmt.Errorf("failed to unmarshal config: %w", err) - } - - return &cfg, nil -} diff --git a/cmd/flight-poster/internal/config/env_test.go b/cmd/flight-poster/internal/config/env_test.go deleted file mode 100644 index cb37a13..0000000 --- a/cmd/flight-poster/internal/config/env_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package config_test - -import ( - "os" - "testing" - - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/config" - "github.com/stretchr/testify/require" -) - -func TestLoadConfig_ValidConfigFile_ShouldSucceed(t *testing.T) { - t.Run("Valid Config File", func(t *testing.T) { - os.Setenv("GOTWI_API_KEY", "test") - os.Setenv("GOTWI_API_KEY_SECRET", "test") - os.Setenv("FLIGHT_POSTER_THREADS_ACCESS_TOKEN", "test") - os.Setenv("FLIGHT_POSTER_TWITTER_ACCESS_TOKEN_KEY", "test") - os.Setenv("FLIGHT_POSTER_TWITTER_ACCESS_TOKEN_SECRET", "test") - - cfg, err := config.LoadConfig() - - require.NoError(t, err) - require.NotNil(t, cfg) - require.Equal(t, "9097", cfg.GrpcServerConfig.Port) - require.Equal(t, 10, cfg.HTTPClientConfig.Timeout) - require.Equal(t, "https://graph.threads.net", cfg.ThreadsClientConfig.URL) - require.Equal(t, "test", cfg.ThreadsClientConfig.Token) - require.Equal(t, "test", cfg.TwitterClientConfig.Key) - require.Equal(t, "test", cfg.TwitterClientConfig.Secret) - require.True(t, cfg.LoggerConfig.JSON) - require.Equal(t, "info", cfg.LoggerConfig.Level) - }) -} - -func TestLoadConfig_MissingFile_ShouldError(t *testing.T) { - t.Run("Missing Config File", func(t *testing.T) { - // Temporarily rename the config file if it exists - originalPath := "../../config.yaml" - tempPath := "../../config.yaml.bak" - - // Restore the file after the test - if _, err := os.Stat(originalPath); err == nil { - err := os.Rename(originalPath, tempPath) - require.NoError(t, err) - defer func() { - err := os.Rename(tempPath, originalPath) - require.NoError(t, err, "failed to restore config file") - }() - } - - cfg, err := config.LoadConfig() - require.Error(t, err) - require.Nil(t, cfg) - }) -} - -func TestLoadConfig_EnvOverride_ShouldSucceed(t *testing.T) { - t.Run("Override Config File", func(t *testing.T) { - t.Setenv("FLIGHT_POSTER_LOGGER_LEVEL", "debug") - t.Setenv("FLIGHT_POSTER_LOGGER_JSON", "false") - - cfg, err := config.LoadConfig() - - require.NoError(t, err) - require.NotNil(t, cfg) - require.Equal(t, "9097", cfg.GrpcServerConfig.Port) - require.Equal(t, 10, cfg.HTTPClientConfig.Timeout) - require.Equal(t, "https://graph.threads.net", cfg.ThreadsClientConfig.URL) - require.Equal(t, "test", cfg.ThreadsClientConfig.Token) - require.Equal(t, "test", cfg.TwitterClientConfig.Key) - require.Equal(t, "test", cfg.TwitterClientConfig.Secret) - require.False(t, cfg.LoggerConfig.JSON) - require.Equal(t, "debug", cfg.LoggerConfig.Level) - }) -} diff --git a/cmd/flight-poster/internal/model/threads.go b/cmd/flight-poster/internal/model/threads.go deleted file mode 100644 index 4b0c3ed..0000000 --- a/cmd/flight-poster/internal/model/threads.go +++ /dev/null @@ -1,11 +0,0 @@ -package model - -// ThreadsUserResponse represents the structure of user data returned by the Threads API. -type ThreadsUserResponse struct { - ID string `json:"id"` // User ID for the Threads account -} - -// ThreadsPostCreationResponse represents the structure of post data returned by the Threads API. -type ThreadsPostCreationResponse struct { - ID string `json:"id"` // Container ID for the Threads post -} diff --git a/cmd/flight-poster/internal/poster/poster.go b/cmd/flight-poster/internal/poster/poster.go deleted file mode 100644 index f62bb22..0000000 --- a/cmd/flight-poster/internal/poster/poster.go +++ /dev/null @@ -1,15 +0,0 @@ -package poster - -import ( - "context" - "time" -) - -type Poster interface { - PublishPost(ctx context.Context, content string) (bool, error) -} - -type token struct { - accessToken string // Actual access token for the user - expiration time.Time // Expiration time of the token -} diff --git a/cmd/flight-poster/internal/poster/twitter.go b/cmd/flight-poster/internal/poster/twitter.go deleted file mode 100644 index ec7a26d..0000000 --- a/cmd/flight-poster/internal/poster/twitter.go +++ /dev/null @@ -1,48 +0,0 @@ -package poster - -import ( - "context" - "fmt" - "log/slog" - - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/config" - "github.com/michimani/gotwi" - "github.com/michimani/gotwi/tweet/managetweet" - "github.com/michimani/gotwi/tweet/managetweet/types" -) - -type TwitterClient struct { - client *gotwi.Client -} - -func NewTwitterClient(cfg config.TwitterClientConfig) (*TwitterClient, error) { - slog.Info("Creating Twitter client for the service") - - opts := &gotwi.NewClientInput{ - AuthenticationMethod: gotwi.AuthenMethodOAuth1UserContext, - OAuthToken: cfg.Key, - OAuthTokenSecret: cfg.Secret, - } - - client, err := gotwi.NewClient(opts) - if err != nil { - return nil, fmt.Errorf("failed to initialize api wrapper client: %w", err) - } - - return &TwitterClient{ - client: client, - }, nil -} - -func (t *TwitterClient) PublishPost(ctx context.Context, content string) (bool, error) { - in := &types.CreateInput{ - Text: gotwi.String(content), - } - - res, err := managetweet.Create(ctx, t.client, in) - if err != nil || res.Data.ID == nil || *res.Data.ID == "" { - return false, fmt.Errorf("failed to post Twitter post: %w", err) - } - - return true, nil -} diff --git a/cmd/flight-poster/internal/server/grpc.go b/cmd/flight-poster/internal/server/grpc.go deleted file mode 100644 index 14c12d5..0000000 --- a/cmd/flight-poster/internal/server/grpc.go +++ /dev/null @@ -1,104 +0,0 @@ -package server - -import ( - "context" - "fmt" - "log/slog" - "net" - "strconv" - - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/config" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/poster" - pb "github.com/ansoncht/flight-microservices/proto/src/poster" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" -) - -// GrpcServer represents the gRPC server structure. -type GrpcServer struct { - pb.UnimplementedPosterServer - server *grpc.Server // gRPC server instance - lis net.Listener // Network listener for incoming connections - posters []poster.Poster // Posters for social media -} - -// NewGRPC creates a new gRPC server instance. -func NewGRPC(cfg config.GrpcServerConfig, posters []poster.Poster) (*GrpcServer, error) { - slog.Info("Creating gRPC server for the service") - - // Validate the port number - if cfg.Port == "" { - return nil, fmt.Errorf("empty port number") - } - - port, _ := strconv.Atoi(cfg.Port) - if port < 1 { - return nil, fmt.Errorf("port number must be greater than 1") - } - - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) - if err != nil { - return nil, fmt.Errorf("failed to listen: %w", err) - } - - s := grpc.NewServer() - grpcServer := &GrpcServer{ - server: s, - lis: lis, - posters: posters, - } - - pb.RegisterPosterServer(s, grpcServer) - reflection.Register(s) - - return grpcServer, nil -} - -// ServeGRPC starts the gRPC server and handles incoming requests. -func (g *GrpcServer) ServeGRPC(ctx context.Context) error { - slog.Info("Starting gRPC server", "port", g.lis.Addr().String()) - - c := make(chan error) - - // Start the server in a goroutine - go func() { - if err := g.server.Serve(g.lis); err != nil { - slog.Error("Failed to start gRPC server", "error", err) - c <- fmt.Errorf("failed to start gRPC server: %w", err) - } - }() - - select { - case <-ctx.Done(): - slog.Info("Stopping gRPC server due to context cancellation") - return nil - case err := <-c: - return err - } -} - -// Close gracefully shuts down the gRPC server. -func (g *GrpcServer) Close() { - g.server.GracefulStop() -} - -func (g *GrpcServer) SendSummary(ctx context.Context, req *pb.SendSummaryRequest) (*pb.SendSummaryResponse, error) { - slog.Info("Receiving flight summary from Flight Processor, posting to social media") - - // Create a message for social media - message := fmt.Sprintf("✈️ Flight Summary for %s:\n", req.Date) - - // Iterate over the flight statistics to append all destinations - for _, flightStat := range req.FlightStats { - message += fmt.Sprintf("🌍 %s: %d\t\t", flightStat.Destination, flightStat.Frequency) - } - - for _, poster := range g.posters { - _, err := poster.PublishPost(ctx, message) - if err != nil { - return nil, fmt.Errorf("failed to post: %w", err) - } - } - - return &pb.SendSummaryResponse{}, nil -} diff --git a/cmd/flight-poster/main.go b/cmd/flight-poster/main.go index 2a9789a..0cf2ddf 100644 --- a/cmd/flight-poster/main.go +++ b/cmd/flight-poster/main.go @@ -9,10 +9,14 @@ import ( "os/signal" "syscall" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/client" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/config" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/poster" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/server" + "github.com/ansoncht/flight-microservices/internal/poster/client" + "github.com/ansoncht/flight-microservices/internal/poster/service" + appHTTP "github.com/ansoncht/flight-microservices/pkg/http" + "github.com/ansoncht/flight-microservices/pkg/kafka" + "github.com/ansoncht/flight-microservices/pkg/mongo" + "github.com/ansoncht/flight-microservices/pkg/repository" + + "github.com/ansoncht/flight-microservices/internal/poster/config" "github.com/ansoncht/flight-microservices/pkg/logger" "golang.org/x/sync/errgroup" ) @@ -36,63 +40,115 @@ func main() { slog.SetDefault(&logger) - // Create HTTP clients - httpClient, err := client.NewHTTP(cfg.HTTPClientConfig) + httpClient, err := appHTTP.NewClient(cfg.HTTPClientConfig) if err != nil { slog.Error("Failed to create HTTP client", "error", err) return } - // Create posters for different social media - posters, err := initializePosters(ctx, cfg.ThreadsClientConfig, cfg.TwitterClientConfig, httpClient) + mongoDB, err := mongo.NewMongoClient(ctx, cfg.MongoClientConfig) if err != nil { - slog.Error("Failed to create posters", "error", err) + slog.Error("Failed to create MongoDB client", "error", err) return } - // Create a gRPC server - grpcServer, err := server.NewGRPC(cfg.GrpcServerConfig, posters) + repo, err := repository.NewMongoSummaryRepository(mongoDB) if err != nil { - slog.Error("Failed to create gRPC server", "error", err) + slog.Error("Failed to create summary repository", "error", err) return } - g, gCtx := errgroup.WithContext(ctx) - - g.Go(func() error { - return grpcServer.ServeGRPC(gCtx) - }) - - <-gCtx.Done() + // Create posters for different social media + poster, err := initializePoster( + ctx, + cfg.ThreadsClientConfig, + cfg.TwitterClientConfig, + cfg.KafkaReaderConfig, + httpClient, + repo, + ) + if err != nil { + slog.Error("Failed to initialize poster service", "error", err) + return + } - if err := g.Wait(); err != nil { - slog.Error("Encounter unexpected error", "error", err) + // Run the poster in background + if err := startBackgroundJobs(ctx, poster); err != nil { + slog.Error("Failed to run background jobs concurrently", "error", err) return } - grpcServer.Close() + // Perform a safe shutdown + if err := safeShutDown(ctx, poster, mongoDB); err != nil { + slog.Error("Failed to perform graceful shutdown", "error", err) + return + } slog.Info("Flight Poster service has fully stopped") } -func initializePosters( +func initializePoster( ctx context.Context, - threadsCfg config.ThreadsClientConfig, - twittercfg config.TwitterClientConfig, + threadsCfg config.ThreadsAPIConfig, + twittercfg config.TwitterAPIConfig, + kafkaReaderCfg kafka.ReaderConfig, httpClient *http.Client, -) ([]poster.Poster, error) { + repo repository.SummaryRepository, +) (*service.Poster, error) { // Create posters for different social media - threads, err := poster.NewThreadsClient(ctx, threadsCfg, httpClient) + threads, err := client.NewThreadsAPI(ctx, threadsCfg, httpClient) if err != nil { - slog.Error("Failed to create Threads poster", "error", err) - return nil, fmt.Errorf("failed to create Threads poster: %w", err) + return nil, fmt.Errorf("failed to create Threads client: %w", err) } - twitter, err := poster.NewTwitterClient(twittercfg) + twitter, err := client.NewTwitterAPI(twittercfg) if err != nil { - slog.Error("Failed to create Twitter poster", "error", err) - return nil, fmt.Errorf("failed to create Twitter poster: %w", err) + return nil, fmt.Errorf("failed to create Twitter client: %w", err) } - return []poster.Poster{threads, twitter}, nil + kafkaReader, err := kafka.NewKafkaReader(kafkaReaderCfg) + if err != nil { + return nil, fmt.Errorf("failed to create kafka reader: %w", err) + } + + clients := []client.Socials{threads, twitter} + poster, err := service.NewPoster(clients, kafkaReader, repo) + if err != nil { + return nil, fmt.Errorf("failed to create poster service: %w", err) + } + + return poster, nil +} + +// startBackgroundJobs starts the processor service in background. +func startBackgroundJobs(ctx context.Context, poster *service.Poster) error { + // Use errgroup to manage concurrent tasks + g, gCtx := errgroup.WithContext(ctx) + + // Start the processor service + g.Go(func() error { + return poster.Post(gCtx) + }) + + // Wait for the context to be done (e.g., due to an interrupt signal) + <-gCtx.Done() + + // Wait for all goroutines to finish + if err := g.Wait(); err != nil { + return fmt.Errorf("failed to run poster: %w", err) + } + + return nil +} + +// safeShutDown shut down MongoDB client and kafka reader gracefully. +func safeShutDown(ctx context.Context, poster *service.Poster, mongodb *mongo.Client) error { + if err := mongodb.Client.Disconnect(ctx); err != nil { + slog.Error("Failed to shutdown MongoDB client", "error", err) + return fmt.Errorf("failed to shutdown mongodb client: %w", err) + } + + poster.Close() + + return nil } diff --git a/cmd/flight-processor/main.go b/cmd/flight-processor/main.go index 5381003..f6f7460 100644 --- a/cmd/flight-processor/main.go +++ b/cmd/flight-processor/main.go @@ -9,12 +9,11 @@ import ( "syscall" "github.com/ansoncht/flight-microservices/internal/processor/config" - "github.com/ansoncht/flight-microservices/internal/processor/repository" "github.com/ansoncht/flight-microservices/internal/processor/service" - "github.com/ansoncht/flight-microservices/pkg/kafka" "github.com/ansoncht/flight-microservices/pkg/logger" "github.com/ansoncht/flight-microservices/pkg/mongo" + "github.com/ansoncht/flight-microservices/pkg/repository" "golang.org/x/sync/errgroup" ) @@ -129,8 +128,6 @@ func startBackgroundJobs(ctx context.Context, processor *service.Processor) erro // safeShutDown shut down MongoDB client and kafka reader gracefully. func safeShutDown(ctx context.Context, processor *service.Processor, mongodb *mongo.Client) error { - slog.Info("Shutting down components") - if err := mongodb.Client.Disconnect(ctx); err != nil { slog.Error("Failed to shutdown MongoDB client", "error", err) return fmt.Errorf("failed to shutdown mongodb client: %w", err) diff --git a/configs/poster-config.yaml b/configs/poster-config.yaml new file mode 100644 index 0000000..4bad113 --- /dev/null +++ b/configs/poster-config.yaml @@ -0,0 +1,21 @@ +http_client: + timeout: 10 +threads_api: + url: https://graph.threads.net + access_token: '' +twitter_api: + access_token_key: '' + access_token_secret: '' +mongo: + uri: '' + db: flights + pool_size: 5 + connection_timeout: 5 + socket_timeout: 5 +kafka_reader: + address: '' + topic: '' + group_id: '' +logger: + json: true + level: 'info' diff --git a/go.mod b/go.mod index c622939..36b7924 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.3 require ( golang.org/x/sync v0.14.0 - google.golang.org/protobuf v1.36.5 + google.golang.org/protobuf v1.36.5 // indirect ) require ( @@ -71,6 +71,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.35.0 // indirect go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/sdk v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.35.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect @@ -80,7 +81,7 @@ require ( golang.org/x/net v0.39.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.25.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect + google.golang.org/grpc v1.70.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -96,5 +97,4 @@ require ( github.com/twmb/franz-go/pkg/kadm v1.16.0 go.mongodb.org/mongo-driver v1.17.1 go.uber.org/mock v0.5.2 - google.golang.org/grpc v1.70.0 ) diff --git a/go.sum b/go.sum index 9ad98da..82ee591 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -211,8 +209,6 @@ go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/ go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= -go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= -go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= diff --git a/cmd/flight-poster/internal/poster/instagram.go b/internal/poster/client/instagram.go similarity index 92% rename from cmd/flight-poster/internal/poster/instagram.go rename to internal/poster/client/instagram.go index d5fbc85..095925a 100644 --- a/cmd/flight-poster/internal/poster/instagram.go +++ b/internal/poster/client/instagram.go @@ -1,4 +1,4 @@ -package poster +package client import "log/slog" diff --git a/internal/poster/client/socials.go b/internal/poster/client/socials.go new file mode 100644 index 0000000..edc5b57 --- /dev/null +++ b/internal/poster/client/socials.go @@ -0,0 +1,18 @@ +package client + +import ( + "context" + "time" +) + +// Socials defines the interface for posting content to social media platforms. +type Socials interface { + // PublishPost publishes a post to the social media platform. + PublishPost(ctx context.Context, content string) error +} + +// token holds the access token and its expiration time. +type token struct { + accessToken string // Actual access token for the user + expiration time.Time // Expiration time of the token +} diff --git a/cmd/flight-poster/internal/poster/threads.go b/internal/poster/client/threads.go similarity index 56% rename from cmd/flight-poster/internal/poster/threads.go rename to internal/poster/client/threads.go index 13df635..7286947 100644 --- a/cmd/flight-poster/internal/poster/threads.go +++ b/internal/poster/client/threads.go @@ -1,4 +1,4 @@ -package poster +package client import ( "context" @@ -9,23 +9,35 @@ import ( "net/url" "time" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/config" - "github.com/ansoncht/flight-microservices/cmd/flight-poster/internal/model" + "github.com/ansoncht/flight-microservices/internal/poster/config" + "github.com/ansoncht/flight-microservices/internal/poster/model" ) -type ThreadsClient struct { - token token - user string - baseURL string - httpClient *http.Client +type Threads struct { + token token + user string + baseURL string + client *http.Client } -func NewThreadsClient( +func NewThreadsAPI( ctx context.Context, - cfg config.ThreadsClientConfig, - httpClient *http.Client, -) (*ThreadsClient, error) { - slog.Info("Creating Threads client for the service") + cfg config.ThreadsAPIConfig, + client *http.Client, +) (*Threads, error) { + slog.Info("Initializing Threads API client", "url", cfg.URL) + + if client == nil { + return nil, fmt.Errorf("http client is nil") + } + + if cfg.URL == "" { + return nil, fmt.Errorf("threads api url is empty") + } + + if cfg.Token == "" { + return nil, fmt.Errorf("threads api token is empty") + } // Assumes initial token expires after 60 days token := token{ @@ -33,38 +45,38 @@ func NewThreadsClient( expiration: time.Now().Add(60 * 24 * time.Hour), } - client := &ThreadsClient{ - baseURL: cfg.URL, - token: token, - httpClient: httpClient, + threads := &Threads{ + baseURL: cfg.URL, + token: token, + client: client, } - user, err := client.getUserID(ctx) + user, err := threads.getUserID(ctx) if err != nil { - return nil, fmt.Errorf("failed to get Threads user id: %w", err) + return nil, fmt.Errorf("failed to get user id: %w", err) } - client.user = user + threads.user = user - return client, nil + return threads, nil } -func (t *ThreadsClient) PublishPost(ctx context.Context, content string) (bool, error) { - postID, err := t.createPost(ctx, content, nil) +func (t *Threads) PublishPost(ctx context.Context, content string) error { + postID, err := t.createContainer(ctx, content, nil) if err != nil { - return false, fmt.Errorf("failed to post Threads post: %w", err) + return fmt.Errorf("failed to create Threads container: %w", err) } if t.needRefreshToken() { if err := t.refreshToken(ctx); err != nil { - return false, fmt.Errorf("failed to refresh Threads user token: %w", err) + return fmt.Errorf("failed to refresh Threads user token: %w", err) } } // Parse the base URL endpoint, err := url.Parse(t.baseURL) if err != nil { - return false, fmt.Errorf("failed to parse url: %w", err) + return fmt.Errorf("failed to parse url: %w", err) } // Add path segments @@ -79,40 +91,34 @@ func (t *ThreadsClient) PublishPost(ctx context.Context, content string) (bool, // Create a HTTP POST request req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), nil) if err != nil { - return false, fmt.Errorf("failed to create request for posting Threads post : %w", err) + return fmt.Errorf("failed to create request: %w", err) } - resp, err := t.httpClient.Do(req) + resp, err := t.client.Do(req) if err != nil { - return false, fmt.Errorf("failed to post Threads post: %w", err) + return fmt.Errorf("HTTP request failed: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - var post model.ThreadsPostCreationResponse + var post model.ThreadsPostResponse if err := json.NewDecoder(resp.Body).Decode(&post); err != nil { - return false, fmt.Errorf("failed to parse container: %w", err) + return fmt.Errorf("failed to decode Threads post response: %w", err) } - return true, nil + return nil } -func (t *ThreadsClient) createPost(ctx context.Context, content string, media []string) (string, error) { +func (t *Threads) createContainer(ctx context.Context, content string, media []string) (string, error) { if content == "" { - return "", fmt.Errorf("failed to create Threads post: content is empty") - } - - if len(media) == 0 { - slog.Warn("Threads post contains no media") + return "", fmt.Errorf("content is empty") } - if t.needRefreshToken() { - if err := t.refreshToken(ctx); err != nil { - return "", fmt.Errorf("failed to refresh Threads user token: %w", err) - } + if len(media) != 0 { + return "", fmt.Errorf("media is not supported") } // Parse the base URL @@ -134,12 +140,12 @@ func (t *ThreadsClient) createPost(ctx context.Context, content string, media [] // Create a HTTP POST request req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), nil) if err != nil { - return "", fmt.Errorf("failed to create request for initializing Threads post: %w", err) + return "", fmt.Errorf("failed to create request: %w", err) } - resp, err := t.httpClient.Do(req) + resp, err := t.client.Do(req) if err != nil { - return "", fmt.Errorf("failed to create Threads post container: %w", err) + return "", fmt.Errorf("HTTP request failed: %w", err) } defer resp.Body.Close() @@ -147,15 +153,15 @@ func (t *ThreadsClient) createPost(ctx context.Context, content string, media [] return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - var post model.ThreadsPostCreationResponse + var post model.ThreadsContainerResponse if err := json.NewDecoder(resp.Body).Decode(&post); err != nil { - return "", fmt.Errorf("failed to parse container: %w", err) + return "", fmt.Errorf("failed to decode Threads container response: %w", err) } return post.ID, nil } -func (t *ThreadsClient) getUserID(ctx context.Context) (string, error) { +func (t *Threads) getUserID(ctx context.Context) (string, error) { // Parse the base URL endpoint, err := url.Parse(t.baseURL) if err != nil { @@ -173,12 +179,12 @@ func (t *ThreadsClient) getUserID(ctx context.Context) (string, error) { // Create a HTTP GET request req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) if err != nil { - return "", fmt.Errorf("failed to create request for flight: %w", err) + return "", fmt.Errorf("failed to create request: %w", err) } - resp, err := t.httpClient.Do(req) + resp, err := t.client.Do(req) if err != nil { - return "", fmt.Errorf("failed to get user id: %w", err) + return "", fmt.Errorf("HTTP request failed: %w", err) } defer resp.Body.Close() @@ -188,13 +194,13 @@ func (t *ThreadsClient) getUserID(ctx context.Context) (string, error) { var user model.ThreadsUserResponse if err := json.NewDecoder(resp.Body).Decode(&user); err != nil { - return "", fmt.Errorf("failed to parse user: %w", err) + return "", fmt.Errorf("failed to decode Threads user response: %w", err) } return user.ID, nil } -func (t *ThreadsClient) refreshToken(ctx context.Context) error { +func (t *Threads) refreshToken(ctx context.Context) error { // Parse the base URL endpoint, err := url.Parse(t.baseURL) if err != nil { @@ -213,12 +219,12 @@ func (t *ThreadsClient) refreshToken(ctx context.Context) error { // Create a HTTP GET request req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil) if err != nil { - return fmt.Errorf("failed to create request for refreshing token: %w", err) + return fmt.Errorf("failed to create request: %w", err) } - resp, err := t.httpClient.Do(req) + resp, err := t.client.Do(req) if err != nil { - return fmt.Errorf("failed to refresh token: %w", err) + return fmt.Errorf("HTTP request failed: %w", err) } defer resp.Body.Close() @@ -231,7 +237,7 @@ func (t *ThreadsClient) refreshToken(ctx context.Context) error { return nil } -func (t *ThreadsClient) needRefreshToken() bool { +func (t *Threads) needRefreshToken() bool { // If the token's expiration time is within 7 days from now, it's time to refresh return time.Now().After(t.token.expiration.Add(-7 * 24 * time.Hour)) } diff --git a/internal/poster/client/threads_test.go b/internal/poster/client/threads_test.go new file mode 100644 index 0000000..411eea2 --- /dev/null +++ b/internal/poster/client/threads_test.go @@ -0,0 +1,178 @@ +package client_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/ansoncht/flight-microservices/internal/poster/client" + "github.com/ansoncht/flight-microservices/internal/poster/config" + "github.com/ansoncht/flight-microservices/internal/poster/model" + "github.com/stretchr/testify/require" +) + +func TestNewThreadsAPI_ValidConfig_ShouldSucceed(t *testing.T) { + expected := model.ThreadsUserResponse{ + ID: "testuser", + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(expected) + })) + + ctx := context.Background() + cfg := config.ThreadsAPIConfig{ + URL: server.URL, + Token: "test", + } + client, err := client.NewThreadsAPI(ctx, cfg, &http.Client{}) + require.NoError(t, err) + require.NotNil(t, client) +} + +func TestNewThreadsAPI_InvalidConfig_ShouldError(t *testing.T) { + tests := []struct { + name string + cfg config.ThreadsAPIConfig + client *http.Client + wantErr string + }{ + { + name: "Nil HTTP Client", + cfg: config.ThreadsAPIConfig{URL: "http://localhost:8080", Token: "test"}, + client: nil, + wantErr: "http client is nil", + }, + { + name: "Empty URL", + cfg: config.ThreadsAPIConfig{URL: "", Token: "test"}, + client: &http.Client{}, + wantErr: "threads api url is empty", + }, + { + name: "Empty Token", + cfg: config.ThreadsAPIConfig{URL: "http://localhost:8080", Token: ""}, + client: &http.Client{}, + wantErr: "threads api token is empty", + }, + { + name: "Empty User", + cfg: config.ThreadsAPIConfig{URL: "http://localhost:8080", Token: "test"}, + client: &http.Client{}, + wantErr: "failed to get user id", + }, + } + + ctx := context.Background() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := client.NewThreadsAPI(ctx, tt.cfg, tt.client) + require.Nil(t, client) + require.ErrorContains(t, err, tt.wantErr) + }) + } +} + +func TestPublishPost_ValidArgs_ShouldSucceed(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(&model.ThreadsPostResponse{ + ID: "testpost", + }) + })) + defer server.Close() + + ctx := context.Background() + cfg := config.ThreadsAPIConfig{ + URL: server.URL, + Token: "test", + } + client, err := client.NewThreadsAPI(ctx, cfg, server.Client()) + require.NoError(t, err) + require.NotNil(t, client) + + err = client.PublishPost(ctx, "Test post") + require.NoError(t, err) +} + +func TestPublishPost_ContainerFailure_ShouldError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(&model.ThreadsUserResponse{ + ID: "testuser", + }) + })) + + defer server.Close() + + ctx := context.Background() + cfg := config.ThreadsAPIConfig{ + URL: server.URL, + Token: "test", + } + client, err := client.NewThreadsAPI(ctx, cfg, server.Client()) + require.NoError(t, err) + require.NotNil(t, client) + + err = client.PublishPost(ctx, "") + require.ErrorContains(t, err, "content is empty") +} + +func TestPublishPost_InvalidContainerResponse_ShouldError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/me": + _ = json.NewEncoder(w).Encode(&model.ThreadsUserResponse{ + ID: "testuser", + }) + case "/testuser/threads": + _ = json.NewEncoder(w).Encode([]byte("{invalid json")) + default: + http.NotFound(w, r) + } + })) + + defer server.Close() + + ctx := context.Background() + cfg := config.ThreadsAPIConfig{ + URL: server.URL, + Token: "test", + } + client, err := client.NewThreadsAPI(ctx, cfg, server.Client()) + require.NoError(t, err) + require.NotNil(t, client) + + err = client.PublishPost(ctx, "Test post") + require.ErrorContains(t, err, "failed to decode Threads container response") +} + +func TestPublishPost_InvalidPostResponse_ShouldError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/me": + _ = json.NewEncoder(w).Encode(&model.ThreadsUserResponse{ + ID: "testuser", + }) + case "/testuser/threads": + _ = json.NewEncoder(w).Encode(&model.ThreadsContainerResponse{ + ID: "testcontainer", + }) + case "/testuser/threads_publish": + _ = json.NewEncoder(w).Encode([]byte("{invalid json")) + } + })) + defer server.Close() + + ctx := context.Background() + cfg := config.ThreadsAPIConfig{ + URL: server.URL, + Token: "test", + } + client, err := client.NewThreadsAPI(ctx, cfg, server.Client()) + require.NoError(t, err) + require.NotNil(t, client) + + err = client.PublishPost(ctx, "Test post") + require.ErrorContains(t, err, "failed to decode Threads post response") +} diff --git a/internal/poster/client/twitter.go b/internal/poster/client/twitter.go new file mode 100644 index 0000000..ccabd4b --- /dev/null +++ b/internal/poster/client/twitter.go @@ -0,0 +1,61 @@ +package client + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ansoncht/flight-microservices/internal/poster/config" + + "github.com/michimani/gotwi" + "github.com/michimani/gotwi/tweet/managetweet" + "github.com/michimani/gotwi/tweet/managetweet/types" +) + +type Twitter struct { + client *gotwi.Client +} + +func NewTwitterAPI(cfg config.TwitterAPIConfig) (*Twitter, error) { + slog.Info("Initializing Twitter API client") + + if cfg.Key == "" { + return nil, fmt.Errorf("twitter api key is empty") + } + + if cfg.Secret == "" { + return nil, fmt.Errorf("twitter api secret is empty") + } + + opts := &gotwi.NewClientInput{ + AuthenticationMethod: gotwi.AuthenMethodOAuth1UserContext, + OAuthToken: cfg.Key, + OAuthTokenSecret: cfg.Secret, + } + + client, err := gotwi.NewClient(opts) + if err != nil { + return nil, fmt.Errorf("failed to create twiiter api wrapper client: %w", err) + } + + return &Twitter{ + client: client, + }, nil +} + +func (t *Twitter) PublishPost(ctx context.Context, content string) error { + if content == "" { + return fmt.Errorf("content is empty") + } + + in := &types.CreateInput{ + Text: gotwi.String(content), + } + + res, err := managetweet.Create(ctx, t.client, in) + if err != nil || res.Data.ID == nil || *res.Data.ID == "" { + return fmt.Errorf("failed to create Twitter post: %w", err) + } + + return nil +} diff --git a/internal/poster/client/twitter_test.go b/internal/poster/client/twitter_test.go new file mode 100644 index 0000000..a47fc63 --- /dev/null +++ b/internal/poster/client/twitter_test.go @@ -0,0 +1,90 @@ +package client_test + +import ( + "context" + "os" + "testing" + + "github.com/ansoncht/flight-microservices/internal/poster/client" + "github.com/ansoncht/flight-microservices/internal/poster/config" + "github.com/stretchr/testify/require" +) + +func TestNewTwitterAPI_ValidConfig_ShouldSucceed(t *testing.T) { + t.Setenv("GOTWI_API_KEY", "test") + t.Setenv("GOTWI_API_KEY_SECRET", "test") + + cfg := config.TwitterAPIConfig{ + Key: os.Getenv("GOTWI_API_KEY"), + Secret: os.Getenv("GOTWI_API_KEY_SECRET"), + } + client, err := client.NewTwitterAPI(cfg) + require.NoError(t, err) + require.NotNil(t, client) +} + +func TestNewTwitterAPI_InvalidConfig_ShouldError(t *testing.T) { + tests := []struct { + name string + cfg config.TwitterAPIConfig + wantErr string + }{ + { + name: "Empty Key", + cfg: config.TwitterAPIConfig{Key: "", Secret: "test"}, + wantErr: "twitter api key is empty", + }, + { + name: "Empty Secret", + cfg: config.TwitterAPIConfig{Key: "test", Secret: ""}, + wantErr: "twitter api secret is empty", + }, + { + name: "Empty Environment Variables", + cfg: config.TwitterAPIConfig{Key: "test", Secret: "test"}, + wantErr: "failed to create twiiter api wrapper client", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := client.NewTwitterAPI(tt.cfg) + require.Nil(t, client) + require.ErrorContains(t, err, tt.wantErr) + }) + } +} + +func TestPublishPost_EmptyContent_ShouldError(t *testing.T) { + t.Setenv("GOTWI_API_KEY", "test") + t.Setenv("GOTWI_API_KEY_SECRET", "test") + + cfg := config.TwitterAPIConfig{ + Key: os.Getenv("GOTWI_API_KEY"), + Secret: os.Getenv("GOTWI_API_KEY_SECRET"), + } + client, err := client.NewTwitterAPI(cfg) + require.NoError(t, err) + require.NotNil(t, client) + + ctx := context.Background() + err = client.PublishPost(ctx, "") + require.ErrorContains(t, err, "content is empty") +} + +func TestPublishPost_TwitterWrapperError_ShouldError(t *testing.T) { + t.Setenv("GOTWI_API_KEY", "test") + t.Setenv("GOTWI_API_KEY_SECRET", "test") + + cfg := config.TwitterAPIConfig{ + Key: os.Getenv("GOTWI_API_KEY"), + Secret: os.Getenv("GOTWI_API_KEY_SECRET"), + } + client, err := client.NewTwitterAPI(cfg) + require.NoError(t, err) + require.NotNil(t, client) + + ctx := context.Background() + err = client.PublishPost(ctx, "test post ") + require.ErrorContains(t, err, "failed to create Twitter post") +} diff --git a/internal/poster/config/env.go b/internal/poster/config/env.go new file mode 100644 index 0000000..90ad216 --- /dev/null +++ b/internal/poster/config/env.go @@ -0,0 +1,60 @@ +package config + +import ( + "fmt" + "strings" + + "github.com/ansoncht/flight-microservices/pkg/http" + "github.com/ansoncht/flight-microservices/pkg/kafka" + "github.com/ansoncht/flight-microservices/pkg/logger" + "github.com/ansoncht/flight-microservices/pkg/mongo" + "github.com/spf13/viper" +) + +// FlightPosterConfig holds all configurations related to flight poster. +type FlightPosterConfig struct { + ThreadsClientConfig ThreadsAPIConfig `mapstructure:"threads_api"` + TwitterClientConfig TwitterAPIConfig `mapstructure:"twitter_api"` + KafkaReaderConfig kafka.ReaderConfig `mapstructure:"kafka_reader"` + MongoClientConfig mongo.ClientConfig `mapstructure:"mongo"` + HTTPClientConfig http.ClientConfig `mapstructure:"http_client"` + LoggerConfig logger.Config `mapstructure:"logger"` +} + +// ThreadsAPIConfig holds configuration settings for the Threads api client. +type ThreadsAPIConfig struct { + // URL specifies the base URL for the Threads API. + URL string `mapstructure:"url"` + // Token specifies the access token for authentication with the Threads API. + Token string `mapstructure:"access_token"` +} + +// TwitterAPIConfig holds configuration settings for the Twitter api client. +type TwitterAPIConfig struct { + // Key specifies the API key for Twitter authentication. + Key string `mapstructure:"access_token_key"` + // Secret specifies the API secret key for Twitter authentication. + Secret string `mapstructure:"access_token_secret"` +} + +// LoadConfig loads configuration from environment variables and a YAML file. +func LoadConfig() (*FlightPosterConfig, error) { + viper.SetConfigName("poster-config") + viper.SetConfigType("yaml") + viper.AddConfigPath("../../configs") + viper.AddConfigPath("../../../configs") + viper.AutomaticEnv() + viper.SetEnvPrefix("FLIGHT_POSTER") + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) + + if err := viper.ReadInConfig(); err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + var cfg FlightPosterConfig + if err := viper.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal config: %w", err) + } + + return &cfg, nil +} diff --git a/internal/poster/config/env_test.go b/internal/poster/config/env_test.go new file mode 100644 index 0000000..9c009f9 --- /dev/null +++ b/internal/poster/config/env_test.go @@ -0,0 +1,77 @@ +package config_test + +import ( + "os" + "testing" + + "github.com/ansoncht/flight-microservices/internal/poster/config" + "github.com/stretchr/testify/require" +) + +func TestLoadConfig_ValidConfigFile_ShouldSucceed(t *testing.T) { + os.Setenv("GOTWI_API_KEY", "test") + os.Setenv("GOTWI_API_KEY_SECRET", "test") + os.Setenv("FLIGHT_POSTER_THREADS_API_ACCESS_TOKEN", "test") + os.Setenv("FLIGHT_POSTER_TWITTER_API_ACCESS_TOKEN_KEY", "test") + os.Setenv("FLIGHT_POSTER_TWITTER_API_ACCESS_TOKEN_SECRET", "test") + os.Setenv("FLIGHT_POSTER_KAFKA_READER_ADDRESS", "test") + os.Setenv("FLIGHT_POSTER_KAFKA_READER_TOPIC", "test") + os.Setenv("FLIGHT_POSTER_KAFKA_READER_GROUP_ID", "test") + os.Setenv("FLIGHT_POSTER_MONGO_URI", "mongodb://localhost:27017") + + cfg, err := config.LoadConfig() + require.NoError(t, err) + require.NotNil(t, cfg) + require.Equal(t, 10, cfg.HTTPClientConfig.Timeout) + require.Equal(t, "https://graph.threads.net", cfg.ThreadsClientConfig.URL) + require.Equal(t, "test", cfg.ThreadsClientConfig.Token) + require.Equal(t, "test", cfg.TwitterClientConfig.Key) + require.Equal(t, "test", cfg.TwitterClientConfig.Secret) + require.Equal(t, "mongodb://localhost:27017", cfg.MongoClientConfig.URI) + require.Equal(t, "flights", cfg.MongoClientConfig.DB) + require.Equal(t, uint64(5), cfg.MongoClientConfig.PoolSize) + require.Equal(t, 5, cfg.MongoClientConfig.ConnectionTimeout) + require.Equal(t, 5, cfg.MongoClientConfig.SocketTimeout) + require.Equal(t, "test", cfg.KafkaReaderConfig.Address) + require.Equal(t, "test", cfg.KafkaReaderConfig.Topic) + require.Equal(t, "test", cfg.KafkaReaderConfig.GroupID) + require.True(t, cfg.LoggerConfig.JSON) + require.Equal(t, "info", cfg.LoggerConfig.Level) +} + +func TestLoadConfig_MissingFile_ShouldError(t *testing.T) { + // Temporarily rename the config file if it exists + originalPath := "../../../configs/poster-config.yaml" + tempPath := "../../../configs/poster-config.yaml.bak" + + // Restore the file after the test + if _, err := os.Stat(originalPath); err == nil { + err := os.Rename(originalPath, tempPath) + require.NoError(t, err) + defer func() { + err := os.Rename(tempPath, originalPath) + require.NoError(t, err, "failed to restore config file") + }() + } + + cfg, err := config.LoadConfig() + require.Error(t, err) + require.Nil(t, cfg) +} + +func TestLoadConfig_EnvOverride_ShouldSucceed(t *testing.T) { + os.Setenv("FLIGHT_POSTER_THREADS_API_URL", "test") + os.Setenv("FLIGHT_POSTER_MONGO_URI", "mongodb://localhost:37017") + os.Setenv("FLIGHT_POSTER_MONGO_DB", "test_db") + os.Setenv("FLIGHT_POSTER_LOGGER_LEVEL", "debug") + os.Setenv("FLIGHT_POSTER_LOGGER_JSON", "false") + + cfg, err := config.LoadConfig() + require.NoError(t, err) + require.NotNil(t, cfg) + require.Equal(t, "test", cfg.ThreadsClientConfig.URL) + require.Equal(t, "mongodb://localhost:37017", cfg.MongoClientConfig.URI) + require.Equal(t, "test_db", cfg.MongoClientConfig.DB) + require.False(t, cfg.LoggerConfig.JSON) + require.Equal(t, "debug", cfg.LoggerConfig.Level) +} diff --git a/internal/poster/model/threads.go b/internal/poster/model/threads.go new file mode 100644 index 0000000..556f871 --- /dev/null +++ b/internal/poster/model/threads.go @@ -0,0 +1,16 @@ +package model + +// ThreadsUserResponse holds the user ID information returned by the Threads API. +type ThreadsUserResponse struct { + ID string `json:"id"` +} + +// ThreadsContainerResponse holds the container response from the Threads API. +type ThreadsContainerResponse struct { + ID string `json:"id"` +} + +// ThreadsPostResponse holds the post response from the Threads API. +type ThreadsPostResponse struct { + ID string `json:"id"` +} diff --git a/internal/poster/service/poster.go b/internal/poster/service/poster.go new file mode 100644 index 0000000..223901b --- /dev/null +++ b/internal/poster/service/poster.go @@ -0,0 +1,102 @@ +package service + +import ( + "context" + "fmt" + + "github.com/ansoncht/flight-microservices/internal/poster/client" + "github.com/ansoncht/flight-microservices/pkg/kafka" + "github.com/ansoncht/flight-microservices/pkg/repository" + "github.com/twmb/franz-go/pkg/kgo" + "golang.org/x/sync/errgroup" +) + +// Poster holds dependencies for posting flight summaries to social media platforms. +type Poster struct { + // socials specifies the list of social media clients to post messages. + socials []client.Socials + // messageReader specifies the message reader to read messages from a message queue. + messageReader kafka.MessageReader + // repo specifies the repo to interact with the db collection. + repo repository.SummaryRepository +} + +// NewPoster creates a new Poster instance based on the provided social media clients, message reader, and repository. +func NewPoster( + socials []client.Socials, + messageReader kafka.MessageReader, + repo repository.SummaryRepository, +) (*Poster, error) { + if len(socials) == 0 { + return nil, fmt.Errorf("social media clients are empty") + } + + if messageReader == nil { + return nil, fmt.Errorf("message reader is nil") + } + + if repo == nil { + return nil, fmt.Errorf("repository is nil") + } + + return &Poster{ + socials: socials, + messageReader: messageReader, + repo: repo, + }, nil +} + +// Close closes the poster service. +func (p *Poster) Close() { + p.messageReader.Close() +} + +// Post posts the flight summary to all social media clients. +func (p *Poster) Post(ctx context.Context) error { + msgChan := make(chan kgo.Record) + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + return p.messageReader.ReadMessages(gCtx, msgChan) + }) + +postingLoop: + for { + select { + case <-gCtx.Done(): + break postingLoop + case msg, ok := <-msgChan: + if !ok { + break postingLoop + } + + summary, err := p.repo.Get(gCtx, string(msg.Value)) + if err != nil { + return fmt.Errorf("failed to get flight summary: %w", err) + } + + content := summary.FormatForSocialMedia() + + for _, social := range p.socials { + platform := social + g.Go(func() error { + if err := platform.PublishPost(gCtx, content); err != nil { + return fmt.Errorf("failed to post content: %w", err) + } + + return nil + }) + } + } + } + + if err := g.Wait(); err != nil { + return fmt.Errorf("error while reading messages: %w", err) + } + + if ctx.Err() != nil { + return fmt.Errorf("context canceled while posting content: %w", ctx.Err()) + } + + return nil +} diff --git a/internal/poster/service/poster_test.go b/internal/poster/service/poster_test.go new file mode 100644 index 0000000..07a0522 --- /dev/null +++ b/internal/poster/service/poster_test.go @@ -0,0 +1,199 @@ +package service_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ansoncht/flight-microservices/internal/poster/client" + "github.com/ansoncht/flight-microservices/internal/poster/service" + "github.com/ansoncht/flight-microservices/internal/test/mock" + "github.com/ansoncht/flight-microservices/pkg/kafka" + "github.com/ansoncht/flight-microservices/pkg/model" + "github.com/ansoncht/flight-microservices/pkg/repository" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/mock/gomock" +) + +func TestNewPoster_NonNilClients_ShouldSucceed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + socials := []client.Socials{mock.NewMockSocials(ctrl)} + reader := mock.NewMockMessageReader(ctrl) + repo := mock.NewMockSummaryRepository(ctrl) + + poster, err := service.NewPoster(socials, reader, repo) + require.NoError(t, err) + require.NotNil(t, poster) +} + +func TestNewPoster_NilDependencies_ShouldError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + tests := []struct { + name string + socials []client.Socials + reader kafka.MessageReader + repository repository.SummaryRepository + expectedErr string + }{ + { + name: "nil socials", + socials: nil, + reader: mock.NewMockMessageReader(ctrl), + repository: mock.NewMockSummaryRepository(ctrl), + expectedErr: "social media clients are empty", + }, + { + name: "nil reader", + socials: []client.Socials{mock.NewMockSocials(ctrl)}, + reader: nil, + repository: mock.NewMockSummaryRepository(ctrl), + expectedErr: "message reader is nil", + }, + { + name: "nil repository", + socials: []client.Socials{mock.NewMockSocials(ctrl)}, + reader: mock.NewMockMessageReader(ctrl), + repository: nil, + expectedErr: "repository is nil", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + processor, err := service.NewPoster(tt.socials, tt.reader, tt.repository) + require.ErrorContains(t, err, tt.expectedErr) + require.Nil(t, processor) + }) + } +} + +func TestPost_ValidContent_ShouldSucceed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + social := mock.NewMockSocials(ctrl) + socials := []client.Socials{social} + reader := mock.NewMockMessageReader(ctrl) + repo := mock.NewMockSummaryRepository(ctrl) + + poster, err := service.NewPoster(socials, reader, repo) + require.NoError(t, err) + require.NotNil(t, poster) + defer poster.Close() + + reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, msgChan chan<- kgo.Record) error { + defer close(msgChan) + msgChan <- kgo.Record{Key: []byte("summary_id"), Value: []byte("test_id")} + return nil + }, + ) + reader.EXPECT().Close() + repo.EXPECT().Get(gomock.Any(), "test_id").Return(&model.DailyFlightSummary{}, nil) + social.EXPECT().PublishPost(gomock.Any(), gomock.Any()).Return(nil) + + err = poster.Post(context.Background()) + require.NoError(t, err) +} + +func TestPost_RepoGetError_ShouldError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + social := mock.NewMockSocials(ctrl) + socials := []client.Socials{social} + reader := mock.NewMockMessageReader(ctrl) + repo := mock.NewMockSummaryRepository(ctrl) + + poster, err := service.NewPoster(socials, reader, repo) + require.NoError(t, err) + require.NotNil(t, poster) + defer poster.Close() + + reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, msgChan chan<- kgo.Record) error { + defer close(msgChan) + msgChan <- kgo.Record{Key: []byte("summary_id"), Value: []byte("test_id")} + return nil + }, + ) + reader.EXPECT().Close() + repo.EXPECT().Get(gomock.Any(), "test_id").Return(nil, errors.New("test error")) + + err = poster.Post(context.Background()) + require.ErrorContains(t, err, "failed to get flight summary") +} + +func TestPost_SocialPostError_ShouldError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + social := mock.NewMockSocials(ctrl) + socials := []client.Socials{social} + reader := mock.NewMockMessageReader(ctrl) + repo := mock.NewMockSummaryRepository(ctrl) + + poster, err := service.NewPoster(socials, reader, repo) + require.NoError(t, err) + require.NotNil(t, poster) + defer poster.Close() + + reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, msgChan chan<- kgo.Record) error { + defer close(msgChan) + msgChan <- kgo.Record{Key: []byte("summary_id"), Value: []byte("test_id")} + return nil + }, + ) + reader.EXPECT().Close() + repo.EXPECT().Get(gomock.Any(), "test_id").Return(&model.DailyFlightSummary{}, nil) + social.EXPECT().PublishPost(gomock.Any(), gomock.Any()).Return(errors.New("test error")) + + err = poster.Post(context.Background()) + require.ErrorContains(t, err, "failed to post content") +} + +func TestPost_ContextCanceled_ShouldError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + social := mock.NewMockSocials(ctrl) + socials := []client.Socials{social} + reader := mock.NewMockMessageReader(ctrl) + repo := mock.NewMockSummaryRepository(ctrl) + + poster, err := service.NewPoster(socials, reader, repo) + require.NoError(t, err) + require.NotNil(t, poster) + defer poster.Close() + + reader.EXPECT().ReadMessages(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, msgChan chan<- kgo.Record) error { + defer close(msgChan) + + select { + case <-ctx.Done(): + return ctx.Err() + case msgChan <- kgo.Record{Key: []byte("summary_id"), Value: []byte("test_id")}: + cancel() + time.Sleep(10 * time.Millisecond) + } + return nil + }, + ) + reader.EXPECT().Close() + repo.EXPECT().Get(gomock.Any(), "test_id").Return(&model.DailyFlightSummary{}, nil) + social.EXPECT().PublishPost(gomock.Any(), gomock.Any()).Return(nil) + + err = poster.Post(ctx) + require.ErrorContains(t, err, "context canceled while posting content") +} diff --git a/internal/processor/model/summary.go b/internal/processor/model/summary.go deleted file mode 100644 index de8bc23..0000000 --- a/internal/processor/model/summary.go +++ /dev/null @@ -1,24 +0,0 @@ -package model - -import ( - "time" - - "go.mongodb.org/mongo-driver/bson/primitive" -) - -// DailyFlightSummary holds aggregated statistics for all flights departing from a specific airport on a given day. -type DailyFlightSummary struct { - ID primitive.ObjectID `bson:"_id,omitempty"` - Date primitive.DateTime `bson:"date"` - Airport string `bson:"airport"` - TotalFlights int `bson:"totalFlights"` - AirlineCounts map[string]int `bson:"airlineCounts"` - DestinationCounts map[string]int `bson:"destinationCounts"` - TopDestinations []string `bson:"topDestinations,omitempty"` - TopAirlines []string `bson:"topAirlines,omitempty"` -} - -// ToMongoDateTime converts time.Time to primitive.DateTime for MongoDB. -func ToMongoDateTime(t time.Time) primitive.DateTime { - return primitive.NewDateTimeFromTime(t) -} diff --git a/internal/processor/service/processor.go b/internal/processor/service/processor.go index 2778a34..6045e26 100644 --- a/internal/processor/service/processor.go +++ b/internal/processor/service/processor.go @@ -6,9 +6,9 @@ import ( "fmt" "log/slog" - repo "github.com/ansoncht/flight-microservices/internal/processor/repository" msgQueue "github.com/ansoncht/flight-microservices/pkg/kafka" "github.com/ansoncht/flight-microservices/pkg/model" + repo "github.com/ansoncht/flight-microservices/pkg/repository" "github.com/twmb/franz-go/pkg/kgo" "golang.org/x/sync/errgroup" ) @@ -85,9 +85,9 @@ processingLoop: airport = string(msg.Value) slog.Info("Started processing stream for airport", "airport", airport) case "end_of_stream": - endAirport := string(msg.Value) + date := string(msg.Value) - summary, err := p.summarizer.SummarizeFlights(flights, endAirport, airport) + summary, err := p.summarizer.SummarizeFlights(flights, date, airport) if err != nil { return fmt.Errorf("failed to summarize flights: %w", err) } diff --git a/internal/processor/service/processor_test.go b/internal/processor/service/processor_test.go index 6106a3d..a3f4a9a 100644 --- a/internal/processor/service/processor_test.go +++ b/internal/processor/service/processor_test.go @@ -7,12 +7,11 @@ import ( "testing" "time" - "github.com/ansoncht/flight-microservices/internal/processor/model" - "github.com/ansoncht/flight-microservices/internal/processor/repository" "github.com/ansoncht/flight-microservices/internal/processor/service" "github.com/ansoncht/flight-microservices/internal/test/mock" - msgQueue "github.com/ansoncht/flight-microservices/pkg/kafka" - msg "github.com/ansoncht/flight-microservices/pkg/model" + "github.com/ansoncht/flight-microservices/pkg/kafka" + "github.com/ansoncht/flight-microservices/pkg/model" + "github.com/ansoncht/flight-microservices/pkg/repository" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/mock/gomock" @@ -40,8 +39,8 @@ func TestNewProcessor_NilDependencies_ShouldReturnError(t *testing.T) { tests := []struct { name string - writer msgQueue.MessageWriter - reader msgQueue.MessageReader + writer kafka.MessageWriter + reader kafka.MessageReader summarizer service.Summarizer repository repository.SummaryRepository expectedErr string @@ -105,11 +104,11 @@ func TestProcess_ValidMessage_ShouldSuccess(t *testing.T) { require.NoError(t, err) require.NotNil(t, processor) - flight1, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight1, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight1) - flight2, err := json.Marshal(&msg.FlightRecord{Airline: "AA", FlightNumber: "456", Destination: "SFO"}) + flight2, err := json.Marshal(&model.FlightRecord{Airline: "AA", FlightNumber: "456", Destination: "SFO"}) require.NoError(t, err) require.NotNil(t, flight2) @@ -164,7 +163,7 @@ func TestProcess_MalformedMessage_ShouldSuccess(t *testing.T) { require.NoError(t, err) require.NotNil(t, processor) - flight1, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight1, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight1) @@ -215,7 +214,7 @@ func TestProcess_ContextCanceledWhenRead_ShouldError(t *testing.T) { sum := mock.NewMockSummarizer(ctrl) repo := mock.NewMockSummaryRepository(ctrl) - flight, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight) @@ -254,7 +253,7 @@ func TestProcess_ContextCanceled_ShouldError(t *testing.T) { sum := mock.NewMockSummarizer(ctrl) repo := mock.NewMockSummaryRepository(ctrl) - flight, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight) @@ -320,7 +319,7 @@ func TestProcess_SummarizeFlightsError_ShouldError(t *testing.T) { require.NoError(t, err) require.NotNil(t, processor) - flight, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight) @@ -360,7 +359,7 @@ func TestProcessor_Process_RepositoryInsertError(t *testing.T) { require.NoError(t, err) require.NotNil(t, processor) - flight, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight) @@ -411,7 +410,7 @@ func TestProcessor_Process_WriteMessageError(t *testing.T) { require.NoError(t, err) require.NotNil(t, processor) - flight, err := json.Marshal(&msg.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) + flight, err := json.Marshal(&model.FlightRecord{Airline: "UA", FlightNumber: "123", Destination: "LAX"}) require.NoError(t, err) require.NotNil(t, flight) diff --git a/internal/processor/service/summarizer.go b/internal/processor/service/summarizer.go index aa38e28..744af6e 100644 --- a/internal/processor/service/summarizer.go +++ b/internal/processor/service/summarizer.go @@ -5,7 +5,6 @@ import ( "time" "github.com/ansoncht/flight-microservices/internal/processor/config" - "github.com/ansoncht/flight-microservices/internal/processor/model" msg "github.com/ansoncht/flight-microservices/pkg/model" ) @@ -16,7 +15,7 @@ const ( // Summarizer defines the interface for summarizing flight data. type Summarizer interface { // SummarizeFlights summarizes flight data for a given date and airport. - SummarizeFlights(records []msg.FlightRecord, date string, airport string) (*model.DailyFlightSummary, error) + SummarizeFlights(records []msg.FlightRecord, date string, airport string) (*msg.DailyFlightSummary, error) } // FlightSummarizer implements the Summarizer interface. @@ -40,7 +39,7 @@ func (f *FlightSummarizer) SummarizeFlights( records []msg.FlightRecord, date string, airport string, -) (*model.DailyFlightSummary, error) { +) (*msg.DailyFlightSummary, error) { dt, err := parseDate(date) if err != nil { return nil, fmt.Errorf("failed to parse date for transaction: %w", err) @@ -60,8 +59,8 @@ func (f *FlightSummarizer) SummarizeFlights( topDestinations := topNKeysByValue(destCounts, f.topN) topAirlines := topNKeysByValue(airlineCounts, f.topN) - return &model.DailyFlightSummary{ - Date: model.ToMongoDateTime(dt), + return &msg.DailyFlightSummary{ + Date: msg.ToMongoDateTime(dt), Airport: airport, TotalFlights: totalFlights, AirlineCounts: airlineCounts, diff --git a/internal/processor/service/summarizer_test.go b/internal/processor/service/summarizer_test.go index 71b3a48..1a27562 100644 --- a/internal/processor/service/summarizer_test.go +++ b/internal/processor/service/summarizer_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/ansoncht/flight-microservices/internal/processor/config" - "github.com/ansoncht/flight-microservices/internal/processor/model" "github.com/ansoncht/flight-microservices/internal/processor/service" msg "github.com/ansoncht/flight-microservices/pkg/model" "github.com/stretchr/testify/require" @@ -35,7 +34,7 @@ func TestSummarizeFlights_ValidAndEmptyData_ShouldSucceed(t *testing.T) { testCases := []struct { name string flights []msg.FlightRecord - expectedSummary *model.DailyFlightSummary + expectedSummary *msg.DailyFlightSummary }{ { name: "Valid Data", @@ -52,8 +51,8 @@ func TestSummarizeFlights_ValidAndEmptyData_ShouldSucceed(t *testing.T) { {Airline: "American", FlightNumber: "302", Origin: "LAX", Destination: "DFW"}, {Airline: "United", FlightNumber: "106", Origin: "SAN", Destination: "SFO"}, }, - expectedSummary: &model.DailyFlightSummary{ - Date: model.ToMongoDateTime(time.Date(2025, 5, 7, 0, 0, 0, 0, time.UTC)), + expectedSummary: &msg.DailyFlightSummary{ + Date: msg.ToMongoDateTime(time.Date(2025, 5, 7, 0, 0, 0, 0, time.UTC)), Airport: "SFO", TotalFlights: 11, AirlineCounts: map[string]int{"United": 6, "Delta": 2, "American": 2, "JetBlue": 1}, @@ -65,8 +64,8 @@ func TestSummarizeFlights_ValidAndEmptyData_ShouldSucceed(t *testing.T) { { name: "Empty Flights", flights: []msg.FlightRecord{}, - expectedSummary: &model.DailyFlightSummary{ - Date: model.ToMongoDateTime(time.Date(2025, 5, 7, 0, 0, 0, 0, time.UTC)), + expectedSummary: &msg.DailyFlightSummary{ + Date: msg.ToMongoDateTime(time.Date(2025, 5, 7, 0, 0, 0, 0, time.UTC)), Airport: "SFO", TotalFlights: 0, AirlineCounts: map[string]int{}, diff --git a/internal/reader/service/reader.go b/internal/reader/service/reader.go index ed3569d..dee4a2b 100644 --- a/internal/reader/service/reader.go +++ b/internal/reader/service/reader.go @@ -90,21 +90,25 @@ func (r *Reader) processFlights( airport string, ) error { // Get previous day in Unix timestamp - begin, end := getPreviousDayTime() + begin, end, date := getPreviousDayTime() flights, err := r.flightsClient.FetchFlights(ctx, airport, begin, end) if err != nil { return fmt.Errorf("failed to process flights: %w", err) } - if err := r.processRoute(ctx, flights); err != nil { + if err := r.processRoute(ctx, flights, airport, date); err != nil { return fmt.Errorf("failed to process routes: %w", err) } return nil } -func (r *Reader) processRoute(ctx context.Context, flights []model.Flight) error { +func (r *Reader) processRoute(ctx context.Context, flights []model.Flight, airport string, date string) error { + if err := r.sendStreamControlMessage(ctx, "start_of_stream", airport); err != nil { + return fmt.Errorf("failed to send start_of_stream message: %w", err) + } + // Use errgroup with shared context to process routes concurrently g, gCtx := errgroup.WithContext(ctx) @@ -149,6 +153,10 @@ func (r *Reader) processRoute(ctx context.Context, flights []model.Flight) error return fmt.Errorf("failed to process at least one route: %w", err) } + if err := r.sendStreamControlMessage(ctx, "end_of_stream", date); err != nil { + return fmt.Errorf("failed to send end_of_stream message: %w", err) + } + return nil } @@ -180,19 +188,36 @@ func (r *Reader) sendFlightAndRouteMessage( return nil } +func (r *Reader) sendStreamControlMessage(ctx context.Context, key, message string) error { + value, err := json.Marshal(map[string]string{"message": message}) + if err != nil { + return fmt.Errorf("failed to marshal %s message: %w", key, err) + } + + if err := r.messageWriter.WriteMessage(ctx, []byte(key), value); err != nil { + return fmt.Errorf("failed to write %s message to the message queue: %w", key, err) + } + + return nil +} + // getPreviousDayTime calculates the start and end Unix timestamps for the previous day. -func getPreviousDayTime() (string, string) { +func getPreviousDayTime() (string, string, string) { now := time.Now() + yesterday := now.AddDate(0, 0, -2) + // Mark the start of yesterday (12:00:00 AM) - startOfYesterday := time.Date(now.Year(), now.Month(), now.Day()-2, 0, 0, 0, 0, now.Location()) + startOfYesterday := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, now.Location()) // Mark the end of yesterday (11:59:59 PM) - endOfYesterday := time.Date(now.Year(), now.Month(), now.Day()-2, 23, 59, 59, 0, now.Location()) + endOfYesterday := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 23, 59, 59, 0, now.Location()) // Convert to Unix epoch timestamps startEpoch := startOfYesterday.Unix() endEpoch := endOfYesterday.Unix() - return fmt.Sprintf("%d", startEpoch), fmt.Sprintf("%d", endEpoch) + date := yesterday.Format("2006-01-02") + + return fmt.Sprintf("%d", startEpoch), fmt.Sprintf("%d", endEpoch), date } diff --git a/internal/reader/service/reader_test.go b/internal/reader/service/reader_test.go index 0886ed8..b784555 100644 --- a/internal/reader/service/reader_test.go +++ b/internal/reader/service/reader_test.go @@ -96,6 +96,8 @@ func TestHTTPHandler_WorkingComponents_ShouldSucceed(t *testing.T) { mFlights.EXPECT().FetchFlights(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(flights, nil) mRoutes.EXPECT().FetchRoute(gomock.Any(), gomock.Any()).Return(&model.Route{}, nil) mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) reader, err := service.NewReader(mFlights, mRoutes, mKafka) require.NoError(t, err) @@ -145,14 +147,17 @@ func TestHTTPHandler_EmptyCallSign_ShouldSucceed(t *testing.T) { mRoutes := mock.NewMockRoute(ctrl) mFlights := mock.NewMockFlight(ctrl) + mKafka := mock.NewMockMessageWriter(ctrl) flights := []model.Flight{ {Origin: "VHHH", Destination: "RJTT", Callsign: "", FirstSeen: 1, LastSeen: 2}, } mFlights.EXPECT().FetchFlights(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(flights, nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - reader, err := service.NewReader(mFlights, mRoutes, &kafka.Writer{}) + reader, err := service.NewReader(mFlights, mRoutes, mKafka) require.NoError(t, err) require.NotNil(t, reader) @@ -175,6 +180,7 @@ func TestHTTPHandler_RoutesClientError_ShouldSucceed(t *testing.T) { mRoutes := mock.NewMockRoute(ctrl) mFlights := mock.NewMockFlight(ctrl) + mKafka := mock.NewMockMessageWriter(ctrl) flights := []model.Flight{ {Origin: "VHHH", Destination: "RJTT", Callsign: "CRK452", FirstSeen: 1, LastSeen: 2}, @@ -182,8 +188,10 @@ func TestHTTPHandler_RoutesClientError_ShouldSucceed(t *testing.T) { mFlights.EXPECT().FetchFlights(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(flights, nil) mRoutes.EXPECT().FetchRoute(gomock.Any(), gomock.Any()).Return(nil, errors.New("error")) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - reader, err := service.NewReader(mFlights, mRoutes, &kafka.Writer{}) + reader, err := service.NewReader(mFlights, mRoutes, mKafka) require.NoError(t, err) require.NotNil(t, reader) @@ -214,7 +222,9 @@ func TestHTTPHandler_MessageWriterError_ShouldSucceed(t *testing.T) { mFlights.EXPECT().FetchFlights(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(flights, nil) mRoutes.EXPECT().FetchRoute(gomock.Any(), gomock.Any()).Return(&model.Route{}, nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("error")) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) reader, err := service.NewReader(mFlights, mRoutes, mKafka) require.NoError(t, err) @@ -247,6 +257,7 @@ func TestHTTPHandler_RouteProcessContextCancellation_ShouldError(t *testing.T) { mFlights.EXPECT().FetchFlights(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(flights, nil) mRoutes.EXPECT().FetchRoute(gomock.Any(), gomock.Any()).Return(nil, context.Canceled) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) reader, err := service.NewReader(mFlights, mRoutes, mKafka) require.NoError(t, err) @@ -279,6 +290,7 @@ func TestHTTPHandler_MessageProcessContextCancellation_ShouldError(t *testing.T) mFlights.EXPECT().FetchFlights(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(flights, nil) mRoutes.EXPECT().FetchRoute(gomock.Any(), gomock.Any()).Return(&model.Route{}, nil) + mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mKafka.EXPECT().WriteMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(context.Canceled) reader, err := service.NewReader(mFlights, mRoutes, mKafka) diff --git a/internal/test/mock/mock_flight_summary_repo.go b/internal/test/mock/mock_flight_summary_repo.go index 79ba1ad..9eaa359 100644 --- a/internal/test/mock/mock_flight_summary_repo.go +++ b/internal/test/mock/mock_flight_summary_repo.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: internal/processor/repository/flight_summary.go +// Source: pkg/repository/flight_summary.go // // Generated by this command: // -// mockgen -source internal/processor/repository/flight_summary.go -destination=internal/test/mock/mock_flight_summary_repo.go -package=mock +// mockgen -source pkg/repository/flight_summary.go -destination=internal/test/mock/mock_flight_summary_repo.go -package=mock // // Package mock is a generated GoMock package. @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - model "github.com/ansoncht/flight-microservices/internal/processor/model" + model "github.com/ansoncht/flight-microservices/pkg/model" gomock "go.uber.org/mock/gomock" ) @@ -41,6 +41,21 @@ func (m *MockSummaryRepository) EXPECT() *MockSummaryRepositoryMockRecorder { return m.recorder } +// Get mocks base method. +func (m *MockSummaryRepository) Get(ctx context.Context, id string) (*model.DailyFlightSummary, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, id) + ret0, _ := ret[0].(*model.DailyFlightSummary) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockSummaryRepositoryMockRecorder) Get(ctx, id any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockSummaryRepository)(nil).Get), ctx, id) +} + // Insert mocks base method. func (m *MockSummaryRepository) Insert(ctx context.Context, summary model.DailyFlightSummary) (string, error) { m.ctrl.T.Helper() diff --git a/internal/test/mock/mock_socials.go b/internal/test/mock/mock_socials.go new file mode 100644 index 0000000..1f59b7b --- /dev/null +++ b/internal/test/mock/mock_socials.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: internal/poster/client/socials.go +// +// Generated by this command: +// +// mockgen -source internal/poster/client/socials.go -destination=internal/test/mock/mock_socials.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockSocials is a mock of Socials interface. +type MockSocials struct { + ctrl *gomock.Controller + recorder *MockSocialsMockRecorder + isgomock struct{} +} + +// MockSocialsMockRecorder is the mock recorder for MockSocials. +type MockSocialsMockRecorder struct { + mock *MockSocials +} + +// NewMockSocials creates a new mock instance. +func NewMockSocials(ctrl *gomock.Controller) *MockSocials { + mock := &MockSocials{ctrl: ctrl} + mock.recorder = &MockSocialsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSocials) EXPECT() *MockSocialsMockRecorder { + return m.recorder +} + +// PublishPost mocks base method. +func (m *MockSocials) PublishPost(ctx context.Context, content string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PublishPost", ctx, content) + ret0, _ := ret[0].(error) + return ret0 +} + +// PublishPost indicates an expected call of PublishPost. +func (mr *MockSocialsMockRecorder) PublishPost(ctx, content any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishPost", reflect.TypeOf((*MockSocials)(nil).PublishPost), ctx, content) +} diff --git a/internal/test/mock/mock_summarizer.go b/internal/test/mock/mock_summarizer.go index b6d07d9..431e5d5 100644 --- a/internal/test/mock/mock_summarizer.go +++ b/internal/test/mock/mock_summarizer.go @@ -12,7 +12,6 @@ package mock import ( reflect "reflect" - model "github.com/ansoncht/flight-microservices/internal/processor/model" model0 "github.com/ansoncht/flight-microservices/pkg/model" gomock "go.uber.org/mock/gomock" ) @@ -42,10 +41,10 @@ func (m *MockSummarizer) EXPECT() *MockSummarizerMockRecorder { } // SummarizeFlights mocks base method. -func (m *MockSummarizer) SummarizeFlights(records []model0.FlightRecord, date, airport string) (*model.DailyFlightSummary, error) { +func (m *MockSummarizer) SummarizeFlights(records []model0.FlightRecord, date, airport string) (*model0.DailyFlightSummary, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SummarizeFlights", records, date, airport) - ret0, _ := ret[0].(*model.DailyFlightSummary) + ret0, _ := ret[0].(*model0.DailyFlightSummary) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/http/server_test.go b/pkg/http/server_test.go index 03c817f..9397611 100644 --- a/pkg/http/server_test.go +++ b/pkg/http/server_test.go @@ -107,13 +107,11 @@ func TestServe_ServerError_ShouldError(t *testing.T) { require.NoError(t, err) }() - cfg := server.ServerConfig{Port: fmt.Sprintf("%d", 123), Timeout: 5} + cfg := server.ServerConfig{Port: fmt.Sprintf("%d", 123), Timeout: 2} server, err := server.NewServer(cfg, http.HandlerFunc(testHandler)) require.NoError(t, err) err = server.Serve(context.Background()) - - require.Error(t, err) require.ErrorContains(t, err, "failed to start HTTP server") } diff --git a/pkg/model/summary.go b/pkg/model/summary.go new file mode 100644 index 0000000..49d91de --- /dev/null +++ b/pkg/model/summary.go @@ -0,0 +1,73 @@ +package model + +import ( + "fmt" + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +const ( + limit = 5 +) + +// DailyFlightSummary holds aggregated statistics for all flights departing from a specific airport on a given day. +type DailyFlightSummary struct { + ID primitive.ObjectID `bson:"_id,omitempty"` + Date primitive.DateTime `bson:"date"` + Airport string `bson:"airport"` + TotalFlights int `bson:"totalFlights"` + AirlineCounts map[string]int `bson:"airlineCounts"` + DestinationCounts map[string]int `bson:"destinationCounts"` + TopDestinations []string `bson:"topDestinations,omitempty"` + TopAirlines []string `bson:"topAirlines,omitempty"` +} + +// ToMongoDateTime converts time.Time to primitive.DateTime for MongoDB. +func ToMongoDateTime(t time.Time) primitive.DateTime { + return primitive.NewDateTimeFromTime(t) +} + +// FormatForSocialMedia formats the DailyFlightSummary for social media content. +func (s *DailyFlightSummary) FormatForSocialMedia() string { + // Convert MongoDB date to Go's time.Time + date := s.Date.Time() + + // Limit the top airlines and destinations to 5 + topAirlines := s.TopAirlines + if len(topAirlines) > limit { + topAirlines = topAirlines[:limit] + } + + topDestinations := s.TopDestinations + if len(topDestinations) > limit { + topDestinations = topDestinations[:limit] + } + + // Format the summary with emojis + return fmt.Sprintf( + "✈️ **Daily Flight Summary** ✈️\n"+ + "📍 **Airport**: %s\n"+ + "📅 **Date**: %s\n"+ + "🛫 **Total Flights**: %d\n\n"+ + "🏆 **Top 5 Airlines**:\n%s\n\n"+ + "🌍 **Top 5 Destinations**:\n%s\n", + s.Airport, + date.Format("2006-01-02"), // Format date as YYYY-MM-DD + s.TotalFlights, + formatListWithNumbers(topAirlines), + formatListWithNumbers(topDestinations), + ) +} + +// formatListWithNumbers formats a list of strings with numbers (e.g., 1️⃣, 2️⃣). +func formatListWithNumbers(items []string) string { + formatted := "" + emojis := []string{"1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣"} + for i, item := range items { + if i < len(emojis) { + formatted += fmt.Sprintf("%s %s\n", emojis[i], item) + } + } + return formatted +} diff --git a/internal/processor/repository/flight_summary.go b/pkg/repository/flight_summary.go similarity index 67% rename from internal/processor/repository/flight_summary.go rename to pkg/repository/flight_summary.go index 3e3d574..4f41e96 100644 --- a/internal/processor/repository/flight_summary.go +++ b/pkg/repository/flight_summary.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - "github.com/ansoncht/flight-microservices/internal/processor/model" + "github.com/ansoncht/flight-microservices/pkg/model" db "github.com/ansoncht/flight-microservices/pkg/mongo" - + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" ) @@ -17,6 +17,8 @@ const dailySummaryCollection = "daily_summaries" type SummaryRepository interface { // Insert inserts a new flight summary into the database. Insert(ctx context.Context, summary model.DailyFlightSummary) (string, error) + // Get gets a flight summary from the database. + Get(ctx context.Context, id string) (*model.DailyFlightSummary, error) } // MongoSummaryRepository holds the MongoDB collection for flight summaries. @@ -54,3 +56,23 @@ func (r *MongoSummaryRepository) Insert(ctx context.Context, summary model.Daily return oid.Hex(), nil } + +// Get gets a flight summary from the MongoDB collection. +func (r *MongoSummaryRepository) Get(ctx context.Context, id string) (*model.DailyFlightSummary, error) { + oid, err := primitive.ObjectIDFromHex(id) + if err != nil { + return nil, fmt.Errorf("failed to cast id to ObjectID") + } + + result := r.Collection.FindOne(ctx, bson.D{{Key: "_id", Value: oid}}) + + var doc bson.M + _ = result.Decode(&doc) + fmt.Printf("Document: %+v\n", doc) + + summary := &model.DailyFlightSummary{} + if err := result.Decode(summary); err != nil { + return nil, fmt.Errorf("failed to find document with ID %s: %w", id, err) + } + return summary, nil +} diff --git a/internal/processor/repository/flight_summary_test.go b/pkg/repository/flight_summary_test.go similarity index 85% rename from internal/processor/repository/flight_summary_test.go rename to pkg/repository/flight_summary_test.go index e18d040..205d836 100644 --- a/internal/processor/repository/flight_summary_test.go +++ b/pkg/repository/flight_summary_test.go @@ -4,9 +4,9 @@ import ( "context" "testing" - "github.com/ansoncht/flight-microservices/internal/processor/model" - "github.com/ansoncht/flight-microservices/internal/processor/repository" - db "github.com/ansoncht/flight-microservices/pkg/mongo" + "github.com/ansoncht/flight-microservices/pkg/model" + "github.com/ansoncht/flight-microservices/pkg/mongo" + "github.com/ansoncht/flight-microservices/pkg/repository" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/mongodb" @@ -30,14 +30,14 @@ func TestNewMongoSummaryRepository_Integration(t *testing.T) { uri, err := mongodbContainer.ConnectionString(ctx) require.NoError(t, err) - cfg := db.ClientConfig{ + cfg := mongo.ClientConfig{ URI: uri, DB: "testdb", PoolSize: 5, ConnectionTimeout: 10, SocketTimeout: 10, } - mongo, err := db.NewMongoClient(ctx, cfg) + mongo, err := mongo.NewMongoClient(ctx, cfg) defer func() { err = mongo.Client.Disconnect(ctx) require.NoError(t, err) @@ -51,7 +51,7 @@ func TestNewMongoSummaryRepository_Integration(t *testing.T) { } func TestNewMongoSummaryRepository_NilClient_ShouldError(t *testing.T) { - var mongo *db.Client + var mongo *mongo.Client repo, err := repository.NewMongoSummaryRepository(mongo) require.ErrorContains(t, err, "mongo client is nil") require.Nil(t, repo) @@ -75,7 +75,7 @@ func TestInsert_Integration(t *testing.T) { uri, err := mongodbContainer.ConnectionString(ctx) require.NoError(t, err) - cfg := db.ClientConfig{ + cfg := mongo.ClientConfig{ URI: uri, DB: "testdb", PoolSize: 5, @@ -83,7 +83,7 @@ func TestInsert_Integration(t *testing.T) { SocketTimeout: 10, } - mongo, err := db.NewMongoClient(ctx, cfg) + mongo, err := mongo.NewMongoClient(ctx, cfg) defer func() { err = mongo.Client.Disconnect(ctx) require.NoError(t, err) diff --git a/proto/poster.proto b/proto/poster.proto deleted file mode 100644 index 25acbdc..0000000 --- a/proto/poster.proto +++ /dev/null @@ -1,21 +0,0 @@ -syntax = "proto3"; -package flight; - -option go_package = "github.com/ansoncht/flight-microservices/proto/poster"; - -service Poster { - rpc SendSummary (SendSummaryRequest) returns (SendSummaryResponse) {} -} - -message FlightStat { - string destination = 1; - int64 frequency = 2; -} - -message SendSummaryRequest { - string date = 1; - string origin = 2; - repeated FlightStat flight_stats = 3; -} - -message SendSummaryResponse {} diff --git a/proto/src/poster/poster.pb.go b/proto/src/poster/poster.pb.go deleted file mode 100644 index b6ce0c0..0000000 --- a/proto/src/poster/poster.pb.go +++ /dev/null @@ -1,255 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.35.1 -// protoc v5.27.1 -// source: poster.proto - -package poster - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type FlightStat struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` - Frequency int64 `protobuf:"varint,2,opt,name=frequency,proto3" json:"frequency,omitempty"` -} - -func (x *FlightStat) Reset() { - *x = FlightStat{} - mi := &file_poster_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *FlightStat) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*FlightStat) ProtoMessage() {} - -func (x *FlightStat) ProtoReflect() protoreflect.Message { - mi := &file_poster_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use FlightStat.ProtoReflect.Descriptor instead. -func (*FlightStat) Descriptor() ([]byte, []int) { - return file_poster_proto_rawDescGZIP(), []int{0} -} - -func (x *FlightStat) GetDestination() string { - if x != nil { - return x.Destination - } - return "" -} - -func (x *FlightStat) GetFrequency() int64 { - if x != nil { - return x.Frequency - } - return 0 -} - -type SendSummaryRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Date string `protobuf:"bytes,1,opt,name=date,proto3" json:"date,omitempty"` - Origin string `protobuf:"bytes,2,opt,name=origin,proto3" json:"origin,omitempty"` - FlightStats []*FlightStat `protobuf:"bytes,3,rep,name=flight_stats,json=flightStats,proto3" json:"flight_stats,omitempty"` -} - -func (x *SendSummaryRequest) Reset() { - *x = SendSummaryRequest{} - mi := &file_poster_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *SendSummaryRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SendSummaryRequest) ProtoMessage() {} - -func (x *SendSummaryRequest) ProtoReflect() protoreflect.Message { - mi := &file_poster_proto_msgTypes[1] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SendSummaryRequest.ProtoReflect.Descriptor instead. -func (*SendSummaryRequest) Descriptor() ([]byte, []int) { - return file_poster_proto_rawDescGZIP(), []int{1} -} - -func (x *SendSummaryRequest) GetDate() string { - if x != nil { - return x.Date - } - return "" -} - -func (x *SendSummaryRequest) GetOrigin() string { - if x != nil { - return x.Origin - } - return "" -} - -func (x *SendSummaryRequest) GetFlightStats() []*FlightStat { - if x != nil { - return x.FlightStats - } - return nil -} - -type SendSummaryResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *SendSummaryResponse) Reset() { - *x = SendSummaryResponse{} - mi := &file_poster_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *SendSummaryResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SendSummaryResponse) ProtoMessage() {} - -func (x *SendSummaryResponse) ProtoReflect() protoreflect.Message { - mi := &file_poster_proto_msgTypes[2] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SendSummaryResponse.ProtoReflect.Descriptor instead. -func (*SendSummaryResponse) Descriptor() ([]byte, []int) { - return file_poster_proto_rawDescGZIP(), []int{2} -} - -var File_poster_proto protoreflect.FileDescriptor - -var file_poster_proto_rawDesc = []byte{ - 0x0a, 0x0c, 0x70, 0x6f, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, - 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x22, 0x4c, 0x0a, 0x0a, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, - 0x53, 0x74, 0x61, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, - 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x6e, 0x63, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x66, 0x72, 0x65, 0x71, 0x75, - 0x65, 0x6e, 0x63, 0x79, 0x22, 0x77, 0x0a, 0x12, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x75, 0x6d, 0x6d, - 0x61, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, - 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x12, 0x35, 0x0a, 0x0c, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, - 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x66, - 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x46, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x53, 0x74, 0x61, 0x74, - 0x52, 0x0b, 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x22, 0x15, 0x0a, - 0x13, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x52, 0x0a, 0x06, 0x50, 0x6f, 0x73, 0x74, 0x65, 0x72, 0x12, 0x48, - 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1a, 0x2e, - 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x75, 0x6d, 0x6d, 0x61, - 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x66, 0x6c, 0x69, 0x67, - 0x68, 0x74, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6e, 0x73, 0x6f, 0x6e, 0x63, 0x68, 0x74, 0x2f, - 0x66, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x2d, 0x6d, 0x69, 0x63, 0x72, 0x6f, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x6f, 0x73, 0x74, 0x65, - 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_poster_proto_rawDescOnce sync.Once - file_poster_proto_rawDescData = file_poster_proto_rawDesc -) - -func file_poster_proto_rawDescGZIP() []byte { - file_poster_proto_rawDescOnce.Do(func() { - file_poster_proto_rawDescData = protoimpl.X.CompressGZIP(file_poster_proto_rawDescData) - }) - return file_poster_proto_rawDescData -} - -var file_poster_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_poster_proto_goTypes = []any{ - (*FlightStat)(nil), // 0: flight.FlightStat - (*SendSummaryRequest)(nil), // 1: flight.SendSummaryRequest - (*SendSummaryResponse)(nil), // 2: flight.SendSummaryResponse -} -var file_poster_proto_depIdxs = []int32{ - 0, // 0: flight.SendSummaryRequest.flight_stats:type_name -> flight.FlightStat - 1, // 1: flight.Poster.SendSummary:input_type -> flight.SendSummaryRequest - 2, // 2: flight.Poster.SendSummary:output_type -> flight.SendSummaryResponse - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_poster_proto_init() } -func file_poster_proto_init() { - if File_poster_proto != nil { - return - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_poster_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_poster_proto_goTypes, - DependencyIndexes: file_poster_proto_depIdxs, - MessageInfos: file_poster_proto_msgTypes, - }.Build() - File_poster_proto = out.File - file_poster_proto_rawDesc = nil - file_poster_proto_goTypes = nil - file_poster_proto_depIdxs = nil -} diff --git a/proto/src/poster/poster_grpc.pb.go b/proto/src/poster/poster_grpc.pb.go deleted file mode 100644 index 22dc7d3..0000000 --- a/proto/src/poster/poster_grpc.pb.go +++ /dev/null @@ -1,121 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.5.1 -// - protoc v5.27.1 -// source: poster.proto - -package poster - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.64.0 or later. -const _ = grpc.SupportPackageIsVersion9 - -const ( - Poster_SendSummary_FullMethodName = "/flight.Poster/SendSummary" -) - -// PosterClient is the client API for Poster service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type PosterClient interface { - SendSummary(ctx context.Context, in *SendSummaryRequest, opts ...grpc.CallOption) (*SendSummaryResponse, error) -} - -type posterClient struct { - cc grpc.ClientConnInterface -} - -func NewPosterClient(cc grpc.ClientConnInterface) PosterClient { - return &posterClient{cc} -} - -func (c *posterClient) SendSummary(ctx context.Context, in *SendSummaryRequest, opts ...grpc.CallOption) (*SendSummaryResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(SendSummaryResponse) - err := c.cc.Invoke(ctx, Poster_SendSummary_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -// PosterServer is the server API for Poster service. -// All implementations must embed UnimplementedPosterServer -// for forward compatibility. -type PosterServer interface { - SendSummary(context.Context, *SendSummaryRequest) (*SendSummaryResponse, error) - mustEmbedUnimplementedPosterServer() -} - -// UnimplementedPosterServer must be embedded to have -// forward compatible implementations. -// -// NOTE: this should be embedded by value instead of pointer to avoid a nil -// pointer dereference when methods are called. -type UnimplementedPosterServer struct{} - -func (UnimplementedPosterServer) SendSummary(context.Context, *SendSummaryRequest) (*SendSummaryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SendSummary not implemented") -} -func (UnimplementedPosterServer) mustEmbedUnimplementedPosterServer() {} -func (UnimplementedPosterServer) testEmbeddedByValue() {} - -// UnsafePosterServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to PosterServer will -// result in compilation errors. -type UnsafePosterServer interface { - mustEmbedUnimplementedPosterServer() -} - -func RegisterPosterServer(s grpc.ServiceRegistrar, srv PosterServer) { - // If the following call pancis, it indicates UnimplementedPosterServer was - // embedded by pointer and is nil. This will cause panics if an - // unimplemented method is ever invoked, so we test this at initialization - // time to prevent it from happening at runtime later due to I/O. - if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { - t.testEmbeddedByValue() - } - s.RegisterService(&Poster_ServiceDesc, srv) -} - -func _Poster_SendSummary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SendSummaryRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(PosterServer).SendSummary(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Poster_SendSummary_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(PosterServer).SendSummary(ctx, req.(*SendSummaryRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// Poster_ServiceDesc is the grpc.ServiceDesc for Poster service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Poster_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "flight.Poster", - HandlerType: (*PosterServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "SendSummary", - Handler: _Poster_SendSummary_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "poster.proto", -}