Skip to content
This repository was archived by the owner on Jun 14, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/Tapjoy/dynamiq/app/compressor"
"github.com/Tapjoy/dynamiq/app/stats"
"github.com/hashicorp/memberlist"
"github.com/tpjg/goriakpbc"
"math/rand"
"strconv"
Expand All @@ -16,8 +17,13 @@ import (

var (
ConfigurationOptionNotFound = errors.New("Configuration Value Not Found")
config = &Config{}
)

func GetConfig() *Config {
return config
}

const CONFIGURATION_BUCKET = "config"
const QUEUE_CONFIG_NAME = "queue_config"
const QUEUE_SET_NAME = "queues"
Expand All @@ -34,12 +40,13 @@ var SETTINGS = [...]string{VISIBILITY_TIMEOUT, PARTITION_COUNT, MIN_PARTITIONS,
var DEFAULT_SETTINGS = map[string]string{VISIBILITY_TIMEOUT: "30", PARTITION_COUNT: "5", MIN_PARTITIONS: "1", MAX_PARTITIONS: "10", MAX_PARTITION_AGE: "432000", COMPRESSED_MESSAGES: "false"}

type Config struct {
Core Core
Stats Stats
Compressor compressor.Compressor
Queues *Queues
RiakPool *riak.Client
Topics *Topics
Core Core
Stats Stats
Compressor compressor.Compressor
Queues *Queues
RiakPool *riak.Client
Topics *Topics
MemberNodes *memberlist.Memberlist
}

type Core struct {
Expand All @@ -64,59 +71,69 @@ type Stats struct {
Client stats.StatsClient
}

func initRiakPool(cfg *Config) *riak.Client {
func initRiakPool() {
rand.Seed(time.Now().UnixNano())
// TODO this should just be 1 HAProxy
hosts := []string{cfg.Core.RiakNodes}
hosts := []string{config.Core.RiakNodes}
host := hosts[rand.Intn(len(hosts))]
return riak.NewClientPool(host, cfg.Core.BackendConnectionPool)
config.RiakPool = riak.NewClientPool(host, config.Core.BackendConnectionPool)
return
}

func GetCoreConfig(config_file *string) (*Config, error) {
var cfg Config
err := gcfg.ReadFileInto(&cfg, *config_file)
err := gcfg.ReadFileInto(config, *config_file)
if err != nil {
logrus.Fatal(err)
}

if len(cfg.Core.SeedServer) == 0 {
if len(config.Core.SeedServer) == 0 {
logrus.Fatal("The list of seedservers was empty")
}

cfg.Core.SeedServers = strings.Split(cfg.Core.SeedServer, ",")
for i, x := range cfg.Core.SeedServers {
cfg.Core.SeedServers[i] = x + ":" + strconv.Itoa(cfg.Core.SeedPort)
config.Core.SeedServers = strings.Split(config.Core.SeedServer, ",")
for i, x := range config.Core.SeedServers {
config.Core.SeedServers[i] = x + ":" + strconv.Itoa(config.Core.SeedPort)
}

// This will join the node to the cluster
config.MemberNodes, _, err = InitMemberList(config.Core.Name, config.Core.Port, config.Core.SeedServers, config.Core.SeedPort)
if err != nil {
logrus.Error(err)
}

cfg.RiakPool = initRiakPool(&cfg)
cfg.Queues = loadQueuesConfig(&cfg)
switch cfg.Stats.Type {
// This will place a fully prepared riak connection pool onto the config object
initRiakPool()

// This will load all the queue config from Riak and place it onto the config object
loadQueuesConfig()

switch config.Stats.Type {
case "statsd":
cfg.Stats.Client = stats.NewStatsdClient(cfg.Stats.Address, cfg.Stats.Prefix, time.Second*time.Duration(cfg.Stats.FlushInterval))
config.Stats.Client = stats.NewStatsdClient(config.Stats.Address, config.Stats.Prefix, time.Second*time.Duration(config.Stats.FlushInterval))
default:
cfg.Stats.Client = stats.NewNOOPClient()
config.Stats.Client = stats.NewNOOPClient()
}

// Currently we only support zlib, but we may support others
// Here is where we'd detect and inject
cfg.Compressor = compressor.NewZlibCompressor()
config.Compressor = compressor.NewZlibCompressor()

cfg.Core.LogLevel, err = logrus.ParseLevel(cfg.Core.LogLevelString)
config.Core.LogLevel, err = logrus.ParseLevel(config.Core.LogLevelString)
if err != nil {
logrus.Fatal(err)
}

go cfg.Queues.syncConfig(&cfg)
return &cfg, err
go config.Queues.syncConfig()
return config, err
}

func loadQueuesConfig(cfg *Config) *Queues {
func loadQueuesConfig() {
// Create the Queues Config struct
queuesConfig := Queues{
QueueMap: make(map[string]*Queue),
}
// Get the queues
client := cfg.RiakConnection()
client := config.RiakConnection()
// TODO: We should be handling errors here
// Get the bucket holding the map of config data
configBucket, err := client.NewBucketType("maps", CONFIGURATION_BUCKET)
Expand All @@ -126,18 +143,18 @@ func loadQueuesConfig(cfg *Config) *Queues {
logrus.Errorf("Error trying to get maps bucket type: %s", err)
}
// Fetch the object for holding the set of queues
config, err := configBucket.FetchMap(QUEUE_CONFIG_NAME)
qconfigObj, err := configBucket.FetchMap(QUEUE_CONFIG_NAME)
if err != nil {
logrus.Errorf("Error trying to get queue config bucket: %s", err)
}
queuesConfig.Config = config
queuesConfig.Config = qconfigObj

// AddSet implicitly calls fetch set if the set already exists
queueSet := config.AddSet(QUEUE_SET_NAME)
queueSet := qconfigObj.AddSet(QUEUE_SET_NAME)
if queueSet == nil {
queueSet.Add([]byte("default_queue"))
config.Store()
config, _ = configBucket.FetchMap(QUEUE_CONFIG_NAME)
qconfigObj.Store()
qconfigObj, _ = configBucket.FetchMap(QUEUE_CONFIG_NAME)
}
// For each queue we have in the system
for _, elem := range queueSet.GetValue() {
Expand All @@ -149,14 +166,14 @@ func loadQueuesConfig(cfg *Config) *Queues {
queue := &Queue{
Name: name,
Config: configMap,
Parts: InitPartitions(cfg, name),
Parts: InitPartitions(name),
}
// TODO: We should be handling errors here
// Set the queue in the queue map
queuesConfig.QueueMap[name] = queue
}
// Return the completed Queue cache of settings
return &queuesConfig
// Set the completed Queue cache of settings
config.Queues = &queuesConfig
}

func (cfg *Config) InitializeQueue(queueName string) error {
Expand All @@ -171,7 +188,7 @@ func (cfg *Config) InitializeQueue(queueName string) error {
// Now, add the queue into our memory-cache of data
cfg.Queues.QueueMap[queueName] = &Queue{
Name: queueName,
Parts: InitPartitions(cfg, queueName),
Parts: InitPartitions(queueName),
Config: configMap,
}
return err
Expand Down
62 changes: 30 additions & 32 deletions app/httpinterface_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/go-martini/martini"
"github.com/hashicorp/memberlist"
"github.com/martini-contrib/binding"
"github.com/martini-contrib/render"
"net/http"
"strconv"
"time"
"strings"
"time"
)

// TODO Should this live in the config package?
Expand Down Expand Up @@ -50,12 +49,12 @@ func logrusLogger() martini.Handler {
}
}

func dynamiqMartini(cfg *Config) *martini.ClassicMartini {
func dynamiqMartini() *martini.ClassicMartini {
r := martini.NewRouter()
m := martini.New()

log := logrus.New()
log.Level = cfg.Core.LogLevel
log.Level = GetConfig().Core.LogLevel

m.Map(log)
m.Use(logrusLogger())
Expand All @@ -69,21 +68,21 @@ func dynamiqMartini(cfg *Config) *martini.ClassicMartini {
type HTTP_API_V1 struct {
}

func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
func (h HTTP_API_V1) InitWebserver() {
// tieing our Queue to HTTP interface == bad we should move this somewhere else
// Queues.Queues is dumb. Need a better name-chain
queues := cfg.Queues
topics := cfg.Topics
queues := GetConfig().Queues
topics := GetConfig().Topics

m := dynamiqMartini(cfg)
m := dynamiqMartini()
m.Use(render.Renderer())

// Group the routes underneath their version
m.Group("/v1", func(r martini.Router) {
// STATUS / STATISTICS API BLOCK
m.Get("/status/servers", func() string {
return_string := ""
for _, member := range list.Members() {
for _, member := range GetConfig().MemberNodes.Members() {
return_string += fmt.Sprintf("Member: %s %s\n", member.Name, member.Addr)
}
return return_string
Expand All @@ -103,12 +102,11 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
}
})


m.Delete("/queues/:queue", func(r render.Render, params martini.Params) {
var present bool
_, present = queues.QueueMap[params["queue"]]
_, present = queues.QueueMap[params["queue"]]
if present == true {
queues.DeleteQueue(params["queue"], cfg)
queues.DeleteQueue(params["queue"])
deleted := true
r.JSON(200, map[string]interface{}{"Deleted": deleted})
} else {
Expand All @@ -120,7 +118,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
var present bool
_, present = queues.QueueMap[params["queue"]]
if present != true {
cfg.InitializeQueue(params["queue"])
GetConfig().InitializeQueue(params["queue"])
r.JSON(201, "created")
} else {
r.JSON(422, map[string]interface{}{"error": "Queue already exists."})
Expand Down Expand Up @@ -148,7 +146,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
if present != true {
r.JSON(422, map[string]interface{}{"error": "Queue does not exist. Please create it first"})
} else {
topics.TopicMap[params["topic"]].AddQueue(cfg, params["queue"])
topics.TopicMap[params["topic"]].AddQueue(params["queue"])
r.JSON(200, map[string]interface{}{"Queues": topics.TopicMap[params["topic"]].ListQueues()})
}
}
Expand All @@ -161,7 +159,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
if present != true {
topics.InitTopic(params["topic"])
}
topics.TopicMap[params["topic"]].DeleteQueue(cfg, params["queue"])
topics.TopicMap[params["topic"]].DeleteQueue(params["queue"])
r.JSON(200, map[string]interface{}{"Queues": topics.TopicMap[params["topic"]].ListQueues()})
})

Expand All @@ -178,7 +176,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
// returns the first error it runs across. Would simplify the code here greatly.
var err error
if configRequest.VisibilityTimeout != nil {
err = cfg.SetVisibilityTimeout(params["queue"], *configRequest.VisibilityTimeout)
err = GetConfig().SetVisibilityTimeout(params["queue"], *configRequest.VisibilityTimeout)
// We really need a proper way to generalize error handling
// Writing this out every time is going to be silly
if err != nil {
Expand All @@ -189,7 +187,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
}

if configRequest.MinPartitions != nil {
err = cfg.SetMinPartitions(params["queue"], *configRequest.MinPartitions)
err = GetConfig().SetMinPartitions(params["queue"], *configRequest.MinPartitions)
if err != nil {
logrus.Println(err)
r.JSON(500, map[string]interface{}{"error": err.Error()})
Expand All @@ -198,15 +196,15 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
}

if configRequest.MaxPartitions != nil {
err = cfg.SetMaxPartitions(params["queue"], *configRequest.MaxPartitions)
err = GetConfig().SetMaxPartitions(params["queue"], *configRequest.MaxPartitions)
if err != nil {
logrus.Println(err)
r.JSON(500, map[string]interface{}{"error": err.Error()})
return
}
}
if configRequest.MaxPartitionAge != nil {
err = cfg.SetMaxPartitionAge(params["queue"], *configRequest.MaxPartitionAge)
err = GetConfig().SetMaxPartitionAge(params["queue"], *configRequest.MaxPartitionAge)
if err != nil {
logrus.Println(err)
r.JSON(500, map[string]interface{}{"error": err.Error()})
Expand All @@ -215,7 +213,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
}

if configRequest.CompressedMessages != nil {
err = cfg.SetCompressedMessages(params["queue"], *configRequest.CompressedMessages)
err = GetConfig().SetCompressedMessages(params["queue"], *configRequest.CompressedMessages)
if err != nil {
logrus.Println(err)
r.JSON(500, map[string]interface{}{"error": err.Error()})
Expand Down Expand Up @@ -257,7 +255,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
var buf bytes.Buffer
buf.ReadFrom(req.Body)

response := topics.TopicMap[params["topic"]].Broadcast(cfg, buf.String())
response := topics.TopicMap[params["topic"]].Broadcast(buf.String())
r.JSON(200, response)
})

Expand All @@ -275,11 +273,11 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
_, present = queues.QueueMap[params["queue"]]
if present == true {
queueReturn := make(map[string]interface{})
queueReturn["visibility_timeout"], _ = cfg.GetVisibilityTimeout(params["queue"])
queueReturn["min_partitions"], _ = cfg.GetMinPartitions(params["queue"])
queueReturn["max_partitions"], _ = cfg.GetMaxPartitions(params["queue"])
queueReturn["max_partition_age"], _ = cfg.GetMaxPartitionAge(params["queue"])
queueReturn["compressed_messages"], _ = cfg.GetCompressedMessages(params["queue"])
queueReturn["visibility_timeout"], _ = GetConfig().GetVisibilityTimeout(params["queue"])
queueReturn["min_partitions"], _ = GetConfig().GetMinPartitions(params["queue"])
queueReturn["max_partitions"], _ = GetConfig().GetMaxPartitions(params["queue"])
queueReturn["max_partition_age"], _ = GetConfig().GetMaxPartitionAge(params["queue"])
queueReturn["compressed_messages"], _ = GetConfig().GetCompressedMessages(params["queue"])
queueReturn["partitions"] = queues.QueueMap[params["queue"]].Parts.PartitionCount()
r.JSON(200, queueReturn)
} else {
Expand All @@ -290,7 +288,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
m.Get("/queues/:queue/message/:messageId", func(r render.Render, params martini.Params) {
queue := queues.QueueMap[params["queue"]]
if queue != nil {
messages := queue.RetrieveMessages(strings.Fields(params["messageId"]), cfg)
messages := queue.RetrieveMessages(strings.Fields(params["messageId"]))
if (len(messages)) > 0 {
r.JSON(200, map[string]interface{}{"messages": messages})
} else {
Expand All @@ -315,7 +313,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
if batchSize <= 0 {
r.JSON(422, fmt.Sprint("Batchsizes must be non-negative integers greater than 0"))
}
messages, err := queues.QueueMap[params["queue"]].Get(cfg, list, batchSize)
messages, err := queues.QueueMap[params["queue"]].Get(batchSize)

if err != nil && err.Error() != NOPARTITIONS {
// We're choosing to ignore nopartitions issues for now and treat them as normal 200s
Expand Down Expand Up @@ -352,7 +350,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
// TODO clean this up, full json api?
var buf bytes.Buffer
buf.ReadFrom(req.Body)
uuid := queues.QueueMap[params["queue"]].Put(cfg, buf.String())
uuid := queues.QueueMap[params["queue"]].Put(buf.String())

return uuid
} else {
Expand All @@ -365,12 +363,12 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
var present bool
_, present = queues.QueueMap[params["queue"]]
if present != true {
cfg.InitializeQueue(params["queue"])
GetConfig().InitializeQueue(params["queue"])
}

r.JSON(200, queues.QueueMap[params["queue"]].Delete(cfg, params["messageId"]))
r.JSON(200, queues.QueueMap[params["queue"]].Delete(params["messageId"]))
})
// DATA INTERACTION API BLOCK
})
logrus.Fatal(http.ListenAndServe(":"+strconv.Itoa(cfg.Core.HttpPort), m))
logrus.Fatal(http.ListenAndServe(":"+strconv.Itoa(GetConfig().Core.HttpPort), m))
}
Loading