Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package logger

import (
"context"
"log/slog"
"os"
)

type slogWrapper struct {
log *slog.Logger
}

type Logger interface {
Info(ctx context.Context, msg string, args map[string]any)
Error(ctx context.Context, err error, args map[string]any)
}

var _ Logger = (*slogWrapper)(nil)

func New() Logger {
slogLogger := slog.New(
slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{}),
)

return &slogWrapper{log: slogLogger}
}

func (sw *slogWrapper) Info(ctx context.Context, msg string, attrs map[string]any) {
sw.log.LogAttrs(ctx, slog.LevelInfo, msg, sw.mapAttrs(attrs)...)
}

func (sw *slogWrapper) Error(ctx context.Context, err error, attrs map[string]any) {
sw.log.LogAttrs(ctx, slog.LevelError, err.Error(), sw.mapAttrs(attrs)...)
}

func (sw *slogWrapper) mapAttrs(attrs map[string]any) []slog.Attr {
logAttrs := make([]slog.Attr, 0, len(attrs))

if attrs == nil {
return logAttrs
}

for k, v := range attrs {
logAttrs = append(logAttrs, slog.Attr{Key: k, Value: slog.AnyValue(v)})
}

return logAttrs
}
30 changes: 18 additions & 12 deletions spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,28 @@ import (
// log.Fatal(err)
// }
type Spooler struct {
config SpoolerConfig
writer writerFactory
batcher *batcher
currentWriter *fileWriter
config SpoolerConfig
writer writer
batcher *batcher
}

// Create a new Spooler instance
func NewSpooler(config SpoolerConfig) (*Spooler, error) {
batcher, err := newBatcher(config.BatchConfig)
if err != nil {
config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return nil, err
}

writerFactory, err := newWriterFactory(config.FileWriterConfig)
writer, err := newWriter(config.FileWriterConfig)
if err != nil {
config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return nil, err
}

return &Spooler{
config: config,
writer: writerFactory,
writer: writer,
batcher: batcher,
}, nil
}
Expand All @@ -93,28 +94,31 @@ func (s *Spooler) BatchSize() int {

// Create a new file writer
func (s *Spooler) NewWriter(ctx context.Context, fileName string) error {
if s.currentWriter != nil {
if s.writer.currentWriter != nil {
if err := s.Commit(); err != nil {
s.config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return err
}
}

writer, err := s.writer.NewFileWriter(s.batcher.CurrentDir(), fileName)
if err != nil {
s.config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return err
}

s.currentWriter = writer
s.writer.currentWriter = writer
return nil
}

// Write a chunk of data to the current file writer
func (s *Spooler) WriteChunk(data []byte) error {
var err error

if err = s.currentWriter.Write(data); err != nil {
if err = s.writer.currentWriter.Write(data); err != nil {
if errors.Is(err, ErrWriteLimitReached) {
if err = s.currentWriter.Abort(); err != nil {
if err = s.writer.currentWriter.Abort(); err != nil {
s.config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return err
}
}
Expand All @@ -126,14 +130,16 @@ func (s *Spooler) WriteChunk(data []byte) error {
func (s *Spooler) Commit() error {
var err error

totalWritten, err := s.currentWriter.Commit()
totalWritten, err := s.writer.currentWriter.Commit()
if err != nil {
s.config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return err
}
s.currentWriter = nil
s.writer.currentWriter = nil

s.batcher.AddBytes(totalWritten)
if err = s.batcher.Rotate(); err != nil {
s.config.FileWriterConfig.logger.Error(context.Background(), err, nil)
return err
}

Expand Down
2 changes: 2 additions & 0 deletions spooler/spooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/brianvoe/gofakeit/v7"
"github.com/ritvikos/synapse/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -65,6 +66,7 @@ func TestSpoolerSync(t *testing.T) {
},
FileWriterConfig: FileWriterConfig{
MaxFileSize: 5 * 1024 * 1024,
logger: logger.New(),
},
}

Expand Down
3 changes: 3 additions & 0 deletions spooler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package spooler

import (
"errors"

"github.com/ritvikos/synapse/logger"
)

type SpoolerConfig struct {
Expand Down Expand Up @@ -89,6 +91,7 @@ type BatchHooks struct {

type FileWriterConfig struct {
MaxFileSize int
logger logger.Logger
}

func (c FileWriterConfig) Validate() error {
Expand Down
18 changes: 10 additions & 8 deletions spooler/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
var ErrWriteLimitReached = errors.New("write limit reached")

// Factory for creating file writers with specific configurations.
type writerFactory struct {
maxFileSize int
type writer struct {
maxFileSize int
currentWriter *fileWriter
}

// Create a new writer factory with the given configuration.
func newWriterFactory(config FileWriterConfig) (writerFactory, error) {
// Create a new writer with the given configuration.
func newWriter(config FileWriterConfig) (writer, error) {
if err := config.Validate(); err != nil {
return writerFactory{}, fmt.Errorf("writer factory: invalid config: %w", err)
return writer{}, fmt.Errorf("writer: invalid config: %w", err)
}

return writerFactory{
maxFileSize: config.MaxFileSize,
return writer{
maxFileSize: config.MaxFileSize,
currentWriter: nil,
}, nil
}

Expand All @@ -42,7 +44,7 @@ type fileWriter struct {

// Create a new file writer that writes to a temporary file in `dir`.
// Upon commit, the file will be renamed to the specified `commitPath`.
func (f *writerFactory) NewFileWriter(dir, fileName string) (*fileWriter, error) {
func (f *writer) NewFileWriter(dir, fileName string) (*fileWriter, error) {
tmpFile, err := os.CreateTemp(dir, "*.tmp")
if err != nil {
return nil, err
Expand Down