diff --git a/app/config.go b/app/config.go index bc963b6..708825d 100644 --- a/app/config.go +++ b/app/config.go @@ -7,6 +7,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app/compressor" "github.com/Tapjoy/dynamiq/app/stats" + "github.com/hashicorp/memberlist" "github.com/tpjg/goriakpbc" "math/rand" "strconv" @@ -16,8 +17,13 @@ import ( var ( ConfigurationOptionNotFound = errors.New("Configuration Value Not Found") + config = &Config{} ) +func GetConfig() *Config { + return config +} + const CONFIGURATION_BUCKET = "config" const QUEUE_CONFIG_NAME = "queue_config" const QUEUE_SET_NAME = "queues" @@ -34,12 +40,13 @@ var SETTINGS = [...]string{VISIBILITY_TIMEOUT, PARTITION_COUNT, MIN_PARTITIONS, var DEFAULT_SETTINGS = map[string]string{VISIBILITY_TIMEOUT: "30", PARTITION_COUNT: "5", MIN_PARTITIONS: "1", MAX_PARTITIONS: "10", MAX_PARTITION_AGE: "432000", COMPRESSED_MESSAGES: "false"} type Config struct { - Core Core - Stats Stats - Compressor compressor.Compressor - Queues *Queues - RiakPool *riak.Client - Topics *Topics + Core Core + Stats Stats + Compressor compressor.Compressor + Queues *Queues + RiakPool *riak.Client + Topics *Topics + MemberNodes *memberlist.Memberlist } type Core struct { @@ -64,59 +71,69 @@ type Stats struct { Client stats.StatsClient } -func initRiakPool(cfg *Config) *riak.Client { +func initRiakPool() { rand.Seed(time.Now().UnixNano()) // TODO this should just be 1 HAProxy - hosts := []string{cfg.Core.RiakNodes} + hosts := []string{config.Core.RiakNodes} host := hosts[rand.Intn(len(hosts))] - return riak.NewClientPool(host, cfg.Core.BackendConnectionPool) + config.RiakPool = riak.NewClientPool(host, config.Core.BackendConnectionPool) + return } func GetCoreConfig(config_file *string) (*Config, error) { - var cfg Config - err := gcfg.ReadFileInto(&cfg, *config_file) + err := gcfg.ReadFileInto(config, *config_file) if err != nil { logrus.Fatal(err) } - if len(cfg.Core.SeedServer) == 0 { + if len(config.Core.SeedServer) == 0 { logrus.Fatal("The list of seedservers was empty") } - cfg.Core.SeedServers = strings.Split(cfg.Core.SeedServer, ",") - for i, x := range cfg.Core.SeedServers { - cfg.Core.SeedServers[i] = x + ":" + strconv.Itoa(cfg.Core.SeedPort) + config.Core.SeedServers = strings.Split(config.Core.SeedServer, ",") + for i, x := range config.Core.SeedServers { + config.Core.SeedServers[i] = x + ":" + strconv.Itoa(config.Core.SeedPort) + } + + // This will join the node to the cluster + config.MemberNodes, _, err = InitMemberList(config.Core.Name, config.Core.Port, config.Core.SeedServers, config.Core.SeedPort) + if err != nil { + logrus.Error(err) } - cfg.RiakPool = initRiakPool(&cfg) - cfg.Queues = loadQueuesConfig(&cfg) - switch cfg.Stats.Type { + // This will place a fully prepared riak connection pool onto the config object + initRiakPool() + + // This will load all the queue config from Riak and place it onto the config object + loadQueuesConfig() + + switch config.Stats.Type { case "statsd": - cfg.Stats.Client = stats.NewStatsdClient(cfg.Stats.Address, cfg.Stats.Prefix, time.Second*time.Duration(cfg.Stats.FlushInterval)) + config.Stats.Client = stats.NewStatsdClient(config.Stats.Address, config.Stats.Prefix, time.Second*time.Duration(config.Stats.FlushInterval)) default: - cfg.Stats.Client = stats.NewNOOPClient() + config.Stats.Client = stats.NewNOOPClient() } // Currently we only support zlib, but we may support others // Here is where we'd detect and inject - cfg.Compressor = compressor.NewZlibCompressor() + config.Compressor = compressor.NewZlibCompressor() - cfg.Core.LogLevel, err = logrus.ParseLevel(cfg.Core.LogLevelString) + config.Core.LogLevel, err = logrus.ParseLevel(config.Core.LogLevelString) if err != nil { logrus.Fatal(err) } - go cfg.Queues.syncConfig(&cfg) - return &cfg, err + go config.Queues.syncConfig() + return config, err } -func loadQueuesConfig(cfg *Config) *Queues { +func loadQueuesConfig() { // Create the Queues Config struct queuesConfig := Queues{ QueueMap: make(map[string]*Queue), } // Get the queues - client := cfg.RiakConnection() + client := config.RiakConnection() // TODO: We should be handling errors here // Get the bucket holding the map of config data configBucket, err := client.NewBucketType("maps", CONFIGURATION_BUCKET) @@ -126,18 +143,18 @@ func loadQueuesConfig(cfg *Config) *Queues { logrus.Errorf("Error trying to get maps bucket type: %s", err) } // Fetch the object for holding the set of queues - config, err := configBucket.FetchMap(QUEUE_CONFIG_NAME) + qconfigObj, err := configBucket.FetchMap(QUEUE_CONFIG_NAME) if err != nil { logrus.Errorf("Error trying to get queue config bucket: %s", err) } - queuesConfig.Config = config + queuesConfig.Config = qconfigObj // AddSet implicitly calls fetch set if the set already exists - queueSet := config.AddSet(QUEUE_SET_NAME) + queueSet := qconfigObj.AddSet(QUEUE_SET_NAME) if queueSet == nil { queueSet.Add([]byte("default_queue")) - config.Store() - config, _ = configBucket.FetchMap(QUEUE_CONFIG_NAME) + qconfigObj.Store() + qconfigObj, _ = configBucket.FetchMap(QUEUE_CONFIG_NAME) } // For each queue we have in the system for _, elem := range queueSet.GetValue() { @@ -149,14 +166,14 @@ func loadQueuesConfig(cfg *Config) *Queues { queue := &Queue{ Name: name, Config: configMap, - Parts: InitPartitions(cfg, name), + Parts: InitPartitions(name), } // TODO: We should be handling errors here // Set the queue in the queue map queuesConfig.QueueMap[name] = queue } - // Return the completed Queue cache of settings - return &queuesConfig + // Set the completed Queue cache of settings + config.Queues = &queuesConfig } func (cfg *Config) InitializeQueue(queueName string) error { @@ -171,7 +188,7 @@ func (cfg *Config) InitializeQueue(queueName string) error { // Now, add the queue into our memory-cache of data cfg.Queues.QueueMap[queueName] = &Queue{ Name: queueName, - Parts: InitPartitions(cfg, queueName), + Parts: InitPartitions(queueName), Config: configMap, } return err diff --git a/app/httpinterface_v1.go b/app/httpinterface_v1.go index 0770f97..74d248e 100644 --- a/app/httpinterface_v1.go +++ b/app/httpinterface_v1.go @@ -5,13 +5,12 @@ import ( "fmt" "github.com/Sirupsen/logrus" "github.com/go-martini/martini" - "github.com/hashicorp/memberlist" "github.com/martini-contrib/binding" "github.com/martini-contrib/render" "net/http" "strconv" - "time" "strings" + "time" ) // TODO Should this live in the config package? @@ -50,12 +49,12 @@ func logrusLogger() martini.Handler { } } -func dynamiqMartini(cfg *Config) *martini.ClassicMartini { +func dynamiqMartini() *martini.ClassicMartini { r := martini.NewRouter() m := martini.New() log := logrus.New() - log.Level = cfg.Core.LogLevel + log.Level = GetConfig().Core.LogLevel m.Map(log) m.Use(logrusLogger()) @@ -69,13 +68,13 @@ func dynamiqMartini(cfg *Config) *martini.ClassicMartini { type HTTP_API_V1 struct { } -func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { +func (h HTTP_API_V1) InitWebserver() { // tieing our Queue to HTTP interface == bad we should move this somewhere else // Queues.Queues is dumb. Need a better name-chain - queues := cfg.Queues - topics := cfg.Topics + queues := GetConfig().Queues + topics := GetConfig().Topics - m := dynamiqMartini(cfg) + m := dynamiqMartini() m.Use(render.Renderer()) // Group the routes underneath their version @@ -83,7 +82,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { // STATUS / STATISTICS API BLOCK m.Get("/status/servers", func() string { return_string := "" - for _, member := range list.Members() { + for _, member := range GetConfig().MemberNodes.Members() { return_string += fmt.Sprintf("Member: %s %s\n", member.Name, member.Addr) } return return_string @@ -103,12 +102,11 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { } }) - m.Delete("/queues/:queue", func(r render.Render, params martini.Params) { var present bool - _, present = queues.QueueMap[params["queue"]] + _, present = queues.QueueMap[params["queue"]] if present == true { - queues.DeleteQueue(params["queue"], cfg) + queues.DeleteQueue(params["queue"]) deleted := true r.JSON(200, map[string]interface{}{"Deleted": deleted}) } else { @@ -120,7 +118,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { var present bool _, present = queues.QueueMap[params["queue"]] if present != true { - cfg.InitializeQueue(params["queue"]) + GetConfig().InitializeQueue(params["queue"]) r.JSON(201, "created") } else { r.JSON(422, map[string]interface{}{"error": "Queue already exists."}) @@ -148,7 +146,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { if present != true { r.JSON(422, map[string]interface{}{"error": "Queue does not exist. Please create it first"}) } else { - topics.TopicMap[params["topic"]].AddQueue(cfg, params["queue"]) + topics.TopicMap[params["topic"]].AddQueue(params["queue"]) r.JSON(200, map[string]interface{}{"Queues": topics.TopicMap[params["topic"]].ListQueues()}) } } @@ -161,7 +159,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { if present != true { topics.InitTopic(params["topic"]) } - topics.TopicMap[params["topic"]].DeleteQueue(cfg, params["queue"]) + topics.TopicMap[params["topic"]].DeleteQueue(params["queue"]) r.JSON(200, map[string]interface{}{"Queues": topics.TopicMap[params["topic"]].ListQueues()}) }) @@ -178,7 +176,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { // returns the first error it runs across. Would simplify the code here greatly. var err error if configRequest.VisibilityTimeout != nil { - err = cfg.SetVisibilityTimeout(params["queue"], *configRequest.VisibilityTimeout) + err = GetConfig().SetVisibilityTimeout(params["queue"], *configRequest.VisibilityTimeout) // We really need a proper way to generalize error handling // Writing this out every time is going to be silly if err != nil { @@ -189,7 +187,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { } if configRequest.MinPartitions != nil { - err = cfg.SetMinPartitions(params["queue"], *configRequest.MinPartitions) + err = GetConfig().SetMinPartitions(params["queue"], *configRequest.MinPartitions) if err != nil { logrus.Println(err) r.JSON(500, map[string]interface{}{"error": err.Error()}) @@ -198,7 +196,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { } if configRequest.MaxPartitions != nil { - err = cfg.SetMaxPartitions(params["queue"], *configRequest.MaxPartitions) + err = GetConfig().SetMaxPartitions(params["queue"], *configRequest.MaxPartitions) if err != nil { logrus.Println(err) r.JSON(500, map[string]interface{}{"error": err.Error()}) @@ -206,7 +204,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { } } if configRequest.MaxPartitionAge != nil { - err = cfg.SetMaxPartitionAge(params["queue"], *configRequest.MaxPartitionAge) + err = GetConfig().SetMaxPartitionAge(params["queue"], *configRequest.MaxPartitionAge) if err != nil { logrus.Println(err) r.JSON(500, map[string]interface{}{"error": err.Error()}) @@ -215,7 +213,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { } if configRequest.CompressedMessages != nil { - err = cfg.SetCompressedMessages(params["queue"], *configRequest.CompressedMessages) + err = GetConfig().SetCompressedMessages(params["queue"], *configRequest.CompressedMessages) if err != nil { logrus.Println(err) r.JSON(500, map[string]interface{}{"error": err.Error()}) @@ -257,7 +255,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { var buf bytes.Buffer buf.ReadFrom(req.Body) - response := topics.TopicMap[params["topic"]].Broadcast(cfg, buf.String()) + response := topics.TopicMap[params["topic"]].Broadcast(buf.String()) r.JSON(200, response) }) @@ -275,11 +273,11 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { _, present = queues.QueueMap[params["queue"]] if present == true { queueReturn := make(map[string]interface{}) - queueReturn["visibility_timeout"], _ = cfg.GetVisibilityTimeout(params["queue"]) - queueReturn["min_partitions"], _ = cfg.GetMinPartitions(params["queue"]) - queueReturn["max_partitions"], _ = cfg.GetMaxPartitions(params["queue"]) - queueReturn["max_partition_age"], _ = cfg.GetMaxPartitionAge(params["queue"]) - queueReturn["compressed_messages"], _ = cfg.GetCompressedMessages(params["queue"]) + queueReturn["visibility_timeout"], _ = GetConfig().GetVisibilityTimeout(params["queue"]) + queueReturn["min_partitions"], _ = GetConfig().GetMinPartitions(params["queue"]) + queueReturn["max_partitions"], _ = GetConfig().GetMaxPartitions(params["queue"]) + queueReturn["max_partition_age"], _ = GetConfig().GetMaxPartitionAge(params["queue"]) + queueReturn["compressed_messages"], _ = GetConfig().GetCompressedMessages(params["queue"]) queueReturn["partitions"] = queues.QueueMap[params["queue"]].Parts.PartitionCount() r.JSON(200, queueReturn) } else { @@ -290,7 +288,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { m.Get("/queues/:queue/message/:messageId", func(r render.Render, params martini.Params) { queue := queues.QueueMap[params["queue"]] if queue != nil { - messages := queue.RetrieveMessages(strings.Fields(params["messageId"]), cfg) + messages := queue.RetrieveMessages(strings.Fields(params["messageId"])) if (len(messages)) > 0 { r.JSON(200, map[string]interface{}{"messages": messages}) } else { @@ -315,7 +313,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { if batchSize <= 0 { r.JSON(422, fmt.Sprint("Batchsizes must be non-negative integers greater than 0")) } - messages, err := queues.QueueMap[params["queue"]].Get(cfg, list, batchSize) + messages, err := queues.QueueMap[params["queue"]].Get(batchSize) if err != nil && err.Error() != NOPARTITIONS { // We're choosing to ignore nopartitions issues for now and treat them as normal 200s @@ -352,7 +350,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { // TODO clean this up, full json api? var buf bytes.Buffer buf.ReadFrom(req.Body) - uuid := queues.QueueMap[params["queue"]].Put(cfg, buf.String()) + uuid := queues.QueueMap[params["queue"]].Put(buf.String()) return uuid } else { @@ -365,12 +363,12 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { var present bool _, present = queues.QueueMap[params["queue"]] if present != true { - cfg.InitializeQueue(params["queue"]) + GetConfig().InitializeQueue(params["queue"]) } - r.JSON(200, queues.QueueMap[params["queue"]].Delete(cfg, params["messageId"])) + r.JSON(200, queues.QueueMap[params["queue"]].Delete(params["messageId"])) }) // DATA INTERACTION API BLOCK }) - logrus.Fatal(http.ListenAndServe(":"+strconv.Itoa(cfg.Core.HttpPort), m)) + logrus.Fatal(http.ListenAndServe(":"+strconv.Itoa(GetConfig().Core.HttpPort), m)) } diff --git a/app/partitions.go b/app/partitions.go index f8c85e2..81a2f1e 100644 --- a/app/partitions.go +++ b/app/partitions.go @@ -25,15 +25,15 @@ type Partition struct { LastUsed time.Time } -func InitPartitions(cfg *Config, queueName string) *Partitions { +func InitPartitions(queueName string) *Partitions { part := &Partitions{ partitions: lane.NewPQueue(lane.MINPQ), partitionCount: 0, } // We'll initially allocate the minimum amount - minPartitions, _ := cfg.GetMinPartitions(queueName) + minPartitions, _ := GetConfig().GetMinPartitions(queueName) part.Lock() - part.makePartitions(cfg, queueName, minPartitions) + part.makePartitions(queueName, minPartitions) part.Unlock() return part } @@ -41,15 +41,15 @@ func InitPartitions(cfg *Config, queueName string) *Partitions { func (part *Partitions) PartitionCount() int { return part.partitionCount } -func (part *Partitions) GetPartition(cfg *Config, queueName string, list *memberlist.Memberlist) (int, int, *Partition, error) { +func (part *Partitions) GetPartition(queueName string) (int, int, *Partition, error) { //get the node position and the node count - nodePosition, nodeCount := getNodePosition(list) + nodePosition, nodeCount := getNodePosition(GetConfig().MemberNodes) //calculate the range that our node is responsible for step := math.MaxInt64 / nodeCount nodeBottom := nodePosition * step nodeTop := (nodePosition + 1) * step - myPartition, partition, totalPartitions, err := part.getPartitionPosition(cfg, queueName) + myPartition, partition, totalPartitions, err := part.getPartitionPosition(queueName) if err != nil && err.Error() != NOPARTITIONS { logrus.Error(err) } @@ -80,7 +80,7 @@ func getNodePosition(list *memberlist.Memberlist) (int, int) { return nodePosition, nodeCount } -func (part *Partitions) getPartitionPosition(cfg *Config, queueName string) (int, *Partition, int, error) { +func (part *Partitions) getPartitionPosition(queueName string) (int, *Partition, int, error) { //iterate over the partitions and then increase or decrease the number of partitions //TODO move loging out of the sync operation for better throughput @@ -95,14 +95,14 @@ func (part *Partitions) getPartitionPosition(cfg *Config, queueName string) (int // this seems a little scary return myPartition, workingPartition, part.partitionCount, errors.New(NOPARTITIONS) } - visTimeout, _ := cfg.GetVisibilityTimeout(queueName) + visTimeout, _ := GetConfig().GetVisibilityTimeout(queueName) if time.Since(workingPartition.LastUsed).Seconds() > visTimeout { myPartition = workingPartition.Id } else { part.partitions.Push(workingPartition, workingPartition.LastUsed.UnixNano()) part.Lock() defer part.Unlock() - maxPartitions, _ := cfg.GetMaxPartitions(queueName) + maxPartitions, _ := GetConfig().GetMaxPartitions(queueName) if part.partitionCount < maxPartitions { workingPartition = new(Partition) workingPartition.Id = part.partitionCount @@ -114,21 +114,21 @@ func (part *Partitions) getPartitionPosition(cfg *Config, queueName string) (int } return myPartition, workingPartition, part.partitionCount, err } -func (part *Partitions) PushPartition(cfg *Config, queueName string, partition *Partition, lock bool) { +func (part *Partitions) PushPartition(queueName string, partition *Partition, lock bool) { if lock { partition.LastUsed = time.Now() part.partitions.Push(partition, partition.LastUsed.UnixNano()) } else { - visTimeout, _ := cfg.GetVisibilityTimeout(queueName) + visTimeout, _ := GetConfig().GetVisibilityTimeout(queueName) unlockTime := int(visTimeout) partition.LastUsed = time.Now().Add(-(time.Duration(unlockTime) * time.Second)) part.partitions.Push(partition, partition.LastUsed.UnixNano()) } } -func (part *Partitions) makePartitions(cfg *Config, queueName string, partitionsToMake int) { +func (part *Partitions) makePartitions(queueName string, partitionsToMake int) { var initialTime time.Time - maxPartitions, _ := cfg.GetMaxPartitions(queueName) + maxPartitions, _ := GetConfig().GetMaxPartitions(queueName) offset := part.partitionCount for partitionId := offset; partitionId < offset+partitionsToMake; partitionId++ { if maxPartitions > partitionId { @@ -140,12 +140,12 @@ func (part *Partitions) makePartitions(cfg *Config, queueName string, partitions } } } -func (part *Partitions) syncPartitions(cfg *Config, queueName string) { +func (part *Partitions) syncPartitions(queueName string) { part.Lock() - maxPartitions, _ := cfg.GetMaxPartitions(queueName) - minPartitions, _ := cfg.GetMinPartitions(queueName) - maxPartitionAge, _ := cfg.GetMaxPartitionAge(queueName) + maxPartitions, _ := GetConfig().GetMaxPartitions(queueName) + minPartitions, _ := GetConfig().GetMinPartitions(queueName) + maxPartitionAge, _ := GetConfig().GetMaxPartitionAge(queueName) var partsRemoved int for partsRemoved = 0; maxPartitions < part.partitionCount; partsRemoved++ { @@ -154,7 +154,7 @@ func (part *Partitions) syncPartitions(cfg *Config, queueName string) { part.partitionCount = part.partitionCount - partsRemoved if part.partitionCount < minPartitions { - part.makePartitions(cfg, queueName, minPartitions-part.partitionCount) + part.makePartitions(queueName, minPartitions-part.partitionCount) } // Partition Aging logic diff --git a/app/queue.go b/app/queue.go index 608aba2..3fda987 100644 --- a/app/queue.go +++ b/app/queue.go @@ -102,10 +102,10 @@ func (queue *Queue) setQueueDepthApr(c stats.StatsClient, list *memberlist.Membe } } -func (queues *Queues) Exists(cfg *Config, queueName string) bool { +func (queues *Queues) Exists(queueName string) bool { // For now, lets go right to Riak for this // Because of the config delay, we don't wanna check the memory values - client := cfg.RiakConnection() + client := GetConfig().RiakConnection() bucket, _ := client.NewBucketType("maps", CONFIGURATION_BUCKET) m, _ := bucket.FetchMap(QUEUE_CONFIG_NAME) @@ -120,8 +120,8 @@ func (queues *Queues) Exists(cfg *Config, queueName string) bool { return false } -func (queues Queues) DeleteQueue(name string, cfg *Config) bool { - client := cfg.RiakConnection() +func (queues Queues) DeleteQueue(name string) bool { + client := GetConfig().RiakConnection() bucket, _ := client.NewBucketType("maps", CONFIGURATION_BUCKET) config, _ := bucket.FetchMap(QUEUE_CONFIG_NAME) @@ -132,13 +132,13 @@ func (queues Queues) DeleteQueue(name string, cfg *Config) bool { bucketConfig.Destroy() //return true if queue doesn't exist anymore - return !queues.Exists(cfg, name) + return !queues.Exists(name) } // get a message from the queue -func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int64) ([]riak.RObject, error) { +func (queue *Queue) Get(batchsize int64) ([]riak.RObject, error) { // grab a riak client - client := cfg.RiakConnection() + client := GetConfig().RiakConnection() //set the bucket bucket, err := client.NewBucketType("messages", queue.Name) @@ -148,14 +148,14 @@ func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int6 } // get the top and bottom partitions - partBottom, partTop, partition, err := queue.Parts.GetPartition(cfg, queue.Name, list) + partBottom, partTop, partition, err := queue.Parts.GetPartition(queue.Name) if err != nil { return nil, err } //get a list of batchsize message ids messageIds, _, err := bucket.IndexQueryRangePage("id_int", strconv.Itoa(partBottom), strconv.Itoa(partTop), uint32(batchsize), "") - defer queue.setQueueDepthApr(cfg.Stats.Client, list, queue.Name, messageIds) + defer queue.setQueueDepthApr(GetConfig().Stats.Client, GetConfig().MemberNodes, queue.Name, messageIds) if err != nil { logrus.Error(err) @@ -165,28 +165,28 @@ func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int6 // return the partition to the parts heap, but only lock it when we have messages if messageCount > 0 { - defer queue.Parts.PushPartition(cfg, queue.Name, partition, true) + defer queue.Parts.PushPartition(queue.Name, partition, true) } else { - defer queue.Parts.PushPartition(cfg, queue.Name, partition, false) + defer queue.Parts.PushPartition(queue.Name, partition, false) } - defer incrementReceiveCount(cfg.Stats.Client, queue.Name, messageCount) - defer recordFillRatio(cfg.Stats.Client, queue.Name, batchsize, messageCount) + defer incrementReceiveCount(GetConfig().Stats.Client, queue.Name, messageCount) + defer recordFillRatio(GetConfig().Stats.Client, queue.Name, batchsize, messageCount) logrus.Debug("Message retrieved ", messageCount) - return queue.RetrieveMessages(messageIds, cfg), err + return queue.RetrieveMessages(messageIds), err } // Put a Message onto the queue -func (queue *Queue) Put(cfg *Config, message string) string { +func (queue *Queue) Put(message string) string { //Grab our bucket - client := cfg.RiakConnection() + client := GetConfig().RiakConnection() bucket, err := client.NewBucketType("messages", queue.Name) if err == nil { // Prepare the body and compress, if need be var body = []byte(message) - var shouldCompress, _ = cfg.GetCompressedMessages(queue.Name) + var shouldCompress, _ = GetConfig().GetCompressedMessages(queue.Name) if shouldCompress == true { var compressedBody []byte - compressedBody, err = cfg.Compressor.Compress(body) + compressedBody, err = GetConfig().Compressor.Compress(body) if err != nil { logrus.Error("Error compressing message body") logrus.Error(err) @@ -206,7 +206,7 @@ func (queue *Queue) Put(cfg *Config, message string) string { messageObj.Data = body messageObj.Store() - defer incrementMessageCount(cfg.Stats.Client, queue.Name, 1) + defer incrementMessageCount(GetConfig().Stats.Client, queue.Name, 1) return uuid } else { //Actually want to handle this in some other way @@ -215,13 +215,13 @@ func (queue *Queue) Put(cfg *Config, message string) string { } // Delete a Message from the queue -func (queue *Queue) Delete(cfg *Config, id string) bool { - client := cfg.RiakConnection() +func (queue *Queue) Delete(id string) bool { + client := GetConfig().RiakConnection() bucket, err := client.NewBucketType("messages", queue.Name) if err == nil { err = bucket.Delete(id) if err == nil { - defer decrementMessageCount(cfg.Stats.Client, queue.Name, 1) + defer decrementMessageCount(GetConfig().Stats.Client, queue.Name, 1) return true } else { logrus.Error(err) @@ -235,19 +235,19 @@ func (queue *Queue) Delete(cfg *Config, id string) bool { } // helpers -func (queue *Queue) RetrieveMessages(ids []string, cfg *Config) []riak.RObject { +func (queue *Queue) RetrieveMessages(ids []string) []riak.RObject { var rObjectArrayChan = make(chan riak.RObject, len(ids)) var rKeys = make(chan string, len(ids)) start := time.Now() // We might need to decompress the data - var decompressMessages, _ = cfg.GetCompressedMessages(queue.Name) + var decompressMessages, _ = GetConfig().GetCompressedMessages(queue.Name) // foreach message id we have for i := 0; i < len(ids); i++ { // Kick off a go routine go func() { var riakKey string - client := cfg.RiakConnection() + client := GetConfig().RiakConnection() bucket, _ := client.NewBucketType("messages", queue.Name) // Pop a key off the rKeys channel riakKey = <-rKeys @@ -261,7 +261,7 @@ func (queue *Queue) RetrieveMessages(ids []string, cfg *Config) []riak.RObject { // If we didn't get an error, push the riak object into the objectarray channel } if decompressMessages == true { - var data, _ = cfg.Compressor.Decompress(rObject.Data) + var data, _ = GetConfig().Compressor.Decompress(rObject.Data) rObject.Data = data } rObjectArrayChan <- *rObject @@ -286,7 +286,7 @@ func (queue *Queue) RetrieveMessages(ids []string, cfg *Config) []riak.RObject { if rObject.Conflict() { for _, sibling := range rObject.Siblings { if len(sibling.Data) > 0 { - queue.Put(cfg, string(sibling.Data)) + queue.Put(string(sibling.Data)) } else { logrus.Debugf("sibling had no data") } @@ -304,10 +304,10 @@ func (queue *Queue) RetrieveMessages(ids []string, cfg *Config) []riak.RObject { return returnVals } -func (queues *Queues) syncConfig(cfg *Config) { +func (queues *Queues) syncConfig() { for { logrus.Debug("syncing Queue config with Riak") - client := cfg.RiakConnection() + client := GetConfig().RiakConnection() bucket, err := client.NewBucketType("maps", CONFIGURATION_BUCKET) if err != nil { // This is likely caused by a network blip against the riak node, or the node being down @@ -315,8 +315,7 @@ func (queues *Queues) syncConfig(cfg *Config) { // skip this iteration of the config sync, and try again at the next interval logrus.Error("There was an error attempting to read the from the configuration bucket") logrus.Error(err) - //cfg.ResetRiakConnection() - time.Sleep(cfg.Core.SyncConfigInterval * time.Millisecond) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Millisecond) continue } @@ -331,8 +330,7 @@ func (queues *Queues) syncConfig(cfg *Config) { // skip this iteration of the config sync, and try again at the next interval logrus.Error("There was an error attempting to read from the queue configuration map in the configuration bucket") logrus.Error(err) - //cfg.ResetRiakConnection() - time.Sleep(cfg.Core.SyncConfigInterval * time.Millisecond) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Millisecond) continue } } @@ -344,14 +342,14 @@ func (queues *Queues) syncConfig(cfg *Config) { if queueSet == nil { //bail if there aren't any queues //but not before sleeping - time.Sleep(cfg.Core.SyncConfigInterval * time.Second) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Second) continue } queueSlice := queueSet.GetValue() if queueSlice == nil { //bail if there aren't any queues //but not before sleeping - time.Sleep(cfg.Core.SyncConfigInterval * time.Second) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Second) continue } @@ -363,13 +361,13 @@ func (queues *Queues) syncConfig(cfg *Config) { var present bool _, present = queues.QueueMap[queueName] if present != true { - initQueueFromRiak(cfg, queueName) + initQueueFromRiak(queueName) } queuesToKeep[queueName] = true } //iterate over the topics in topics.TopicMap and delete the ones no longer used - topics := cfg.Topics + topics := GetConfig().Topics for queue, _ := range queues.QueueMap { var present bool _, present = queuesToKeep[queue] @@ -378,7 +376,7 @@ func (queues *Queues) syncConfig(cfg *Config) { topicQueueList := topics.TopicMap[topic].ListQueues() for _, topicQueue := range topicQueueList { if topicQueue == string(queue) { - topics.TopicMap[topic].DeleteQueue(cfg, string(queue)) + topics.TopicMap[topic].DeleteQueue(string(queue)) } } } @@ -388,38 +386,38 @@ func (queues *Queues) syncConfig(cfg *Config) { //sync all topics with riak for _, queue := range queues.QueueMap { - queue.syncConfig(cfg) + queue.syncConfig() } //sleep for the configured interval - time.Sleep(cfg.Core.SyncConfigInterval * time.Millisecond) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Millisecond) } } -func initQueueFromRiak(cfg *Config, queueName string) { - client := cfg.RiakConnection() +func initQueueFromRiak(queueName string) { + client := GetConfig().RiakConnection() bucket, _ := client.NewBucketType("maps", CONFIGURATION_BUCKET) config, _ := bucket.FetchMap(queueConfigRecordName(queueName)) queue := Queue{ Name: queueName, - Parts: InitPartitions(cfg, queueName), + Parts: InitPartitions(queueName), Config: config, } // This is adding a new member to the collection, it shouldn't need a lock? // TODO Keep an eye on this for emergent issues - cfg.Queues.QueueMap[queueName] = &queue + GetConfig().Queues.QueueMap[queueName] = &queue } -func (queue *Queue) syncConfig(cfg *Config) { +func (queue *Queue) syncConfig() { //refresh the queue RDtMap - client := cfg.RiakConnection() + client := GetConfig().RiakConnection() bucket, _ := client.NewBucketType("maps", CONFIGURATION_BUCKET) rCfg, _ := bucket.FetchMap(queueConfigRecordName(queue.Name)) queue.updateQueueConfig(rCfg) - queue.Parts.syncPartitions(cfg, queue.Name) + queue.Parts.syncPartitions(queue.Name) } func (queue *Queue) updateQueueConfig(rCfg *riak.RDtMap) { diff --git a/app/topic.go b/app/topic.go index 6b68d8b..2969137 100644 --- a/app/topic.go +++ b/app/topic.go @@ -24,8 +24,8 @@ type Topics struct { queues *Queues } -func InitTopics(cfg *Config, queues *Queues) Topics { - client := cfg.RiakConnection() +func InitTopics() Topics { + client := GetConfig().RiakConnection() bucket, err := client.NewBucketType("maps", "config") if err != nil { logrus.Error(err) @@ -45,11 +45,11 @@ func InitTopics(cfg *Config, queues *Queues) Topics { } topics := Topics{ Config: config, - riakPool: cfg.RiakPool, - queues: queues, + riakPool: GetConfig().RiakPool, + queues: GetConfig().Queues, TopicMap: make(map[string]*Topic), } - go topics.syncConfig(cfg) + go topics.syncConfig() return topics } @@ -72,7 +72,7 @@ func (topics *Topics) InitTopic(name string) { } //Broadcast the message to all listening queues and return the acked writes -func (topic *Topic) Broadcast(cfg *Config, message string) map[string]string { +func (topic *Topic) Broadcast(message string) map[string]string { queueWrites := make(map[string]string) // If we haven't mapped any queues to this topic yet, this will be nil topicQueues := topic.Config.FetchSet("queues") @@ -82,7 +82,7 @@ func (topic *Topic) Broadcast(cfg *Config, message string) map[string]string { var present bool _, present = topic.queues.QueueMap[string(queue)] if present == true { - uuid := topic.queues.QueueMap[string(queue)].Put(cfg, message) + uuid := topic.queues.QueueMap[string(queue)].Put(message) queueWrites[string(queue)] = uuid } else { // Return something indicating no queue? @@ -93,8 +93,8 @@ func (topic *Topic) Broadcast(cfg *Config, message string) map[string]string { return queueWrites } -func (topic *Topic) AddQueue(cfg *Config, name string) { - client := cfg.RiakConnection() +func (topic *Topic) AddQueue(name string) { + client := GetConfig().RiakConnection() bucket, err := client.NewBucketType("maps", CONFIGURATION_BUCKET) recordName := topicConfigRecordName(topic.Name) @@ -109,8 +109,8 @@ func (topic *Topic) AddQueue(cfg *Config, name string) { } } -func (topic *Topic) DeleteQueue(cfg *Config, name string) { - client := cfg.RiakConnection() +func (topic *Topic) DeleteQueue(name string) { + client := GetConfig().RiakConnection() recordName := topicConfigRecordName(topic.Name) bucket, _ := client.NewBucketType("maps", CONFIGURATION_BUCKET) topic.Config, _ = bucket.FetchMap(recordName) @@ -162,7 +162,7 @@ func (topic *Topic) Delete() { //helpers //TODO move error handling for empty config in riak to initializer -func (topics *Topics) syncConfig(cfg *Config) { +func (topics *Topics) syncConfig() { for { logrus.Debug("syncing Topic config with Riak") //refresh the topic RDtMap @@ -175,7 +175,7 @@ func (topics *Topics) syncConfig(cfg *Config) { logrus.Error("There was an error attempting to read the from the configuration bucket") logrus.Error(err) //cfg.ResetRiakConnection() - time.Sleep(cfg.Core.SyncConfigInterval * time.Millisecond) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Millisecond) continue } //fetch the map ignore error for event that map doesn't exist @@ -189,7 +189,7 @@ func (topics *Topics) syncConfig(cfg *Config) { logrus.Error("There was an error attempting to read from the queue configuration map in the configuration bucket") logrus.Error(err) //cfg.ResetRiakConnection() - time.Sleep(cfg.Core.SyncConfigInterval * time.Millisecond) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Millisecond) continue } //iterate the map and add or remove topics that need to be destroyed @@ -197,7 +197,7 @@ func (topics *Topics) syncConfig(cfg *Config) { if topicSlice == nil { //bail if there aren't any topics //but not before sleeping - time.Sleep(cfg.Core.SyncConfigInterval * time.Second) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Second) continue } //Is there a better way to do this? @@ -228,7 +228,7 @@ func (topics *Topics) syncConfig(cfg *Config) { topic.syncConfig() } //sleep for the configured interval - time.Sleep(cfg.Core.SyncConfigInterval * time.Millisecond) + time.Sleep(GetConfig().Core.SyncConfigInterval * time.Millisecond) } } diff --git a/dynamiq.go b/dynamiq.go index bdad0aa..e44cede 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -19,7 +19,7 @@ func main() { //setup the config file cfg, err := app.GetCoreConfig(config_file) - topics := app.InitTopics(cfg, cfg.Queues) + topics := app.InitTopics() cfg.Topics = &topics if err != nil { @@ -27,8 +27,7 @@ func main() { } logrus.SetLevel(cfg.Core.LogLevel) - list, _, err := app.InitMemberList(cfg.Core.Name, cfg.Core.Port, cfg.Core.SeedServers, cfg.Core.SeedPort) http_api := app.HTTP_API_V1{} - http_api.InitWebserver(list, cfg) + http_api.InitWebserver() }