From 2bd1aabd233cb6e00bf8cbe445792130baa8e265 Mon Sep 17 00:00:00 2001 From: ShyamSunder149 Date: Wed, 7 Jan 2026 12:58:36 +0530 Subject: [PATCH] feat: added slog component and writerfactory renamed --- logger/logger.go | 48 +++++++++++++++++++++++++++++++++++++++++ spooler/spooler.go | 30 +++++++++++++++----------- spooler/spooler_test.go | 2 ++ spooler/types.go | 3 +++ spooler/writer.go | 18 +++++++++------- 5 files changed, 81 insertions(+), 20 deletions(-) create mode 100644 logger/logger.go diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..0ac6e0c --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,48 @@ +package logger + +import ( + "context" + "log/slog" + "os" +) + +type slogWrapper struct { + log *slog.Logger +} + +type Logger interface { + Info(ctx context.Context, msg string, args map[string]any) + Error(ctx context.Context, err error, args map[string]any) +} + +var _ Logger = (*slogWrapper)(nil) + +func New() Logger { + slogLogger := slog.New( + slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{}), + ) + + return &slogWrapper{log: slogLogger} +} + +func (sw *slogWrapper) Info(ctx context.Context, msg string, attrs map[string]any) { + sw.log.LogAttrs(ctx, slog.LevelInfo, msg, sw.mapAttrs(attrs)...) +} + +func (sw *slogWrapper) Error(ctx context.Context, err error, attrs map[string]any) { + sw.log.LogAttrs(ctx, slog.LevelError, err.Error(), sw.mapAttrs(attrs)...) +} + +func (sw *slogWrapper) mapAttrs(attrs map[string]any) []slog.Attr { + logAttrs := make([]slog.Attr, 0, len(attrs)) + + if attrs == nil { + return logAttrs + } + + for k, v := range attrs { + logAttrs = append(logAttrs, slog.Attr{Key: k, Value: slog.AnyValue(v)}) + } + + return logAttrs +} diff --git a/spooler/spooler.go b/spooler/spooler.go index 2243f15..9337cc8 100755 --- a/spooler/spooler.go +++ b/spooler/spooler.go @@ -61,27 +61,28 @@ import ( // log.Fatal(err) // } type Spooler struct { - config SpoolerConfig - writer writerFactory - batcher *batcher - currentWriter *fileWriter + config SpoolerConfig + writer writer + batcher *batcher } // Create a new Spooler instance func NewSpooler(config SpoolerConfig) (*Spooler, error) { batcher, err := newBatcher(config.BatchConfig) if err != nil { + config.FileWriterConfig.logger.Error(context.Background(), err, nil) return nil, err } - writerFactory, err := newWriterFactory(config.FileWriterConfig) + writer, err := newWriter(config.FileWriterConfig) if err != nil { + config.FileWriterConfig.logger.Error(context.Background(), err, nil) return nil, err } return &Spooler{ config: config, - writer: writerFactory, + writer: writer, batcher: batcher, }, nil } @@ -93,18 +94,20 @@ func (s *Spooler) BatchSize() int { // Create a new file writer func (s *Spooler) NewWriter(ctx context.Context, fileName string) error { - if s.currentWriter != nil { + if s.writer.currentWriter != nil { if err := s.Commit(); err != nil { + s.config.FileWriterConfig.logger.Error(context.Background(), err, nil) return err } } writer, err := s.writer.NewFileWriter(s.batcher.CurrentDir(), fileName) if err != nil { + s.config.FileWriterConfig.logger.Error(context.Background(), err, nil) return err } - s.currentWriter = writer + s.writer.currentWriter = writer return nil } @@ -112,9 +115,10 @@ func (s *Spooler) NewWriter(ctx context.Context, fileName string) error { func (s *Spooler) WriteChunk(data []byte) error { var err error - if err = s.currentWriter.Write(data); err != nil { + if err = s.writer.currentWriter.Write(data); err != nil { if errors.Is(err, ErrWriteLimitReached) { - if err = s.currentWriter.Abort(); err != nil { + if err = s.writer.currentWriter.Abort(); err != nil { + s.config.FileWriterConfig.logger.Error(context.Background(), err, nil) return err } } @@ -126,14 +130,16 @@ func (s *Spooler) WriteChunk(data []byte) error { func (s *Spooler) Commit() error { var err error - totalWritten, err := s.currentWriter.Commit() + totalWritten, err := s.writer.currentWriter.Commit() if err != nil { + s.config.FileWriterConfig.logger.Error(context.Background(), err, nil) return err } - s.currentWriter = nil + s.writer.currentWriter = nil s.batcher.AddBytes(totalWritten) if err = s.batcher.Rotate(); err != nil { + s.config.FileWriterConfig.logger.Error(context.Background(), err, nil) return err } diff --git a/spooler/spooler_test.go b/spooler/spooler_test.go index c383eff..95fa6e4 100644 --- a/spooler/spooler_test.go +++ b/spooler/spooler_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/brianvoe/gofakeit/v7" + "github.com/ritvikos/synapse/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -65,6 +66,7 @@ func TestSpoolerSync(t *testing.T) { }, FileWriterConfig: FileWriterConfig{ MaxFileSize: 5 * 1024 * 1024, + logger: logger.New(), }, } diff --git a/spooler/types.go b/spooler/types.go index 480f5a1..fbff5c5 100644 --- a/spooler/types.go +++ b/spooler/types.go @@ -2,6 +2,8 @@ package spooler import ( "errors" + + "github.com/ritvikos/synapse/logger" ) type SpoolerConfig struct { @@ -89,6 +91,7 @@ type BatchHooks struct { type FileWriterConfig struct { MaxFileSize int + logger logger.Logger } func (c FileWriterConfig) Validate() error { diff --git a/spooler/writer.go b/spooler/writer.go index 2edcc8c..b1b8f98 100644 --- a/spooler/writer.go +++ b/spooler/writer.go @@ -10,18 +10,20 @@ import ( var ErrWriteLimitReached = errors.New("write limit reached") // Factory for creating file writers with specific configurations. -type writerFactory struct { - maxFileSize int +type writer struct { + maxFileSize int + currentWriter *fileWriter } -// Create a new writer factory with the given configuration. -func newWriterFactory(config FileWriterConfig) (writerFactory, error) { +// Create a new writer with the given configuration. +func newWriter(config FileWriterConfig) (writer, error) { if err := config.Validate(); err != nil { - return writerFactory{}, fmt.Errorf("writer factory: invalid config: %w", err) + return writer{}, fmt.Errorf("writer: invalid config: %w", err) } - return writerFactory{ - maxFileSize: config.MaxFileSize, + return writer{ + maxFileSize: config.MaxFileSize, + currentWriter: nil, }, nil } @@ -42,7 +44,7 @@ type fileWriter struct { // Create a new file writer that writes to a temporary file in `dir`. // Upon commit, the file will be renamed to the specified `commitPath`. -func (f *writerFactory) NewFileWriter(dir, fileName string) (*fileWriter, error) { +func (f *writer) NewFileWriter(dir, fileName string) (*fileWriter, error) { tmpFile, err := os.CreateTemp(dir, "*.tmp") if err != nil { return nil, err