From 5255d3993cfb608c3ea5679bc5fb08ba70648cf3 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Mon, 5 Oct 2015 23:11:38 -0400 Subject: [PATCH 01/12] First steps towards refactor. HTTPV2 defined, topics/queues/riak access patterns defined --- core/config.go | 79 ++++++++++++ core/queue.go | 48 +++++++ core/queues.go | 131 ++++++++++++++++++++ core/riak_service.go | 130 +++++++++++++++++++ core/topic.go | 20 +++ core/topics.go | 90 ++++++++++++++ dynamiq.go | 18 +++ server/endpoints/http/v2/queue_handlers.go | 39 ++++++ server/endpoints/http/v2/status_handlers.go | 20 +++ server/endpoints/http/v2/topic_handlers.go | 31 +++++ server/endpoints/http/v2/v2.go | 71 +++++++++++ 11 files changed, 677 insertions(+) create mode 100644 core/config.go create mode 100644 core/queue.go create mode 100644 core/queues.go create mode 100644 core/riak_service.go create mode 100644 core/topic.go create mode 100644 core/topics.go create mode 100644 server/endpoints/http/v2/queue_handlers.go create mode 100644 server/endpoints/http/v2/status_handlers.go create mode 100644 server/endpoints/http/v2/topic_handlers.go create mode 100644 server/endpoints/http/v2/v2.go diff --git a/core/config.go b/core/config.go new file mode 100644 index 0000000..99054f3 --- /dev/null +++ b/core/config.go @@ -0,0 +1,79 @@ +package core + +import ( + "time" + + "github.com/Tapjoy/dynamiq/app/compressor" + "github.com/Tapjoy/dynamiq/app/stats" + "github.com/hashicorp/memberlist" +) + +// StatsConfig represents config info for sending data to any statsd like system +// and the client itself +type StatsConfig struct { + Address string + FlushInterval int + Prefix string + Client *stats.Client +} + +// HTTPConfig represents config info for the HTTP server +type HTTPConfig struct { + APIVersion string + Port uint16 +} + +// DiscoveryConfig represents config info for how Dynamiq nodes discovery eachother via Memberlist +type DiscoveryConfig struct { + Port int + Memberlist *memberlist.Memberlist +} + +// RiakConfig represents config info and the connection pool for Riak +type RiakConfig struct { + Addresses []string + Port uint16 + Service *RiakService + ConfigSyncInterval time.Duration +} + +// Config is the parent struct of all the individual configuration sections +type Config struct { + Riak *RiakConfig + Discovery *DiscoveryConfig + HTTP *HTTPConfig + Stats *StatsConfig + Compressor *compressor.Compressor + Queues *Queues +} + +// GetConfig Parses and returns a config object +func GetConfig() (*Config, error) { + // TODO settle on an actual config package + + discoveryConfig := &DiscoveryConfig{ + Port: 7000, + } + + httpConfig := &HTTPConfig{ + Port: 8081, + APIVersion: "2", + } + + riakConfig := &RiakConfig{ + Addresses: []string{"127.0.0.1"}, + Port: 8087, + ConfigSyncInterval: 30 * time.Second, + } + + rs, err := NewRiakService(riakConfig.Addresses, riakConfig.Port) + if err != nil { + return nil, err + } + riakConfig.Service = rs + return &Config{ + Riak: riakConfig, + Discovery: discoveryConfig, + HTTP: httpConfig, + }, nil +} diff --git a/core/queue.go b/core/queue.go new file mode 100644 index 0000000..69e248e --- /dev/null +++ b/core/queue.go @@ -0,0 +1,48 @@ +package core + +import ( + "math" + "math/big" + "sync" + + "github.com/StabbyCutyou/partition_ring" + "github.com/tpjg/goriakpbc" +) + +// Queue represents a bucket in Riak used to hold messages, and the behaviors that +// may be taken over such an object +type Queue struct { + // the definition of a queue + // name of the queue + Name string + // the PartitionRing for this queue + ring *partitionring.PartitionRing + + // Mutex for protecting rw access to the Config object + configLock sync.RWMutex + // Individual settings for the queue + Config *riak.RDtMap +} + +// Define statistics keys suffixes + +// QueueSentStatsSuffix is +const QueueSentStatsSuffix = "sent.count" + +// QueueReceivedStatsSuffix is +const QueueReceivedStatsSuffix = "received.count" + +// QueueDeletedStatsSuffix is +const QueueDeletedStatsSuffix = "deleted.count" + +// QueueDepthStatsSuffix is +const QueueDepthStatsSuffix = "depth.count" + +// QueueDepthAprStatsSuffix is +const QueueDepthAprStatsSuffix = "approximate_depth.count" + +// QueueFillDeltaStatsSuffix +const QueueFillDeltaStatsSuffix = "fill.count" + +// MaxIDSize is +var MaxIDSize = *big.NewInt(math.MaxInt64) diff --git a/core/queues.go b/core/queues.go new file mode 100644 index 0000000..ebb4349 --- /dev/null +++ b/core/queues.go @@ -0,0 +1,131 @@ +package core + +// Queues represents a collection of Queues, and the behaviors that may be taken +import ( + "errors" + "sync" + "time" + + "github.com/basho/riak-go-client" +) + +// VisibilityTimeout is the name of the config setting name for controlling how long a message is "inflight" +const VisibilityTimeout = "visibility_timeout" + +// PartitionCount is +const PartitionCount = "partition_count" + +// MinPartitions is the name of the config setting name for controlling the minimum number of partitions per queue +const MinPartitions = "min_partitions" + +// MaxPartitions is the name of the config setting name for controlling the maximum number of partitions per node +const MaxPartitions = "max_partitions" + +// MaxPartitionAge is the name of the config setting name for controlling how long an un-used partition should exist +const MaxPartitionAge = "max_partition_age" + +// CompressedMessages is the name of the config setting name for controlling if the queue is using compression or not +const CompressedMessages = "compressed_messages" + +var ( + // ErrQueueAlreadyExists is an error for when you try to create a queue that already exists + ErrQueueAlreadyExists = errors.New("Queue already exists") + // ErrConfigMapNotFound is + ErrConfigMapNotFound = errors.New("The Config map in Riak has not yet been created") + // Settings Arrays and maps cannot be made immutable in golang + Settings = [...]string{VisibilityTimeout, PartitionCount, MinPartitions, MaxPartitions, CompressedMessages} + + // DefaultSettings is + DefaultSettings = map[string]string{VisibilityTimeout: "30", PartitionCount: "5", MinPartitions: "1", MaxPartitions: "10", CompressedMessages: "false"} +) + +// Queues represents a collection of Queue objects, and the behaviors that may be +// taken over a collection of such objects +type Queues struct { + // a container for all queues + QueueMap map[string]*Queue + // Channels / Timer for syncing the config + syncScheduler *time.Ticker + syncKiller chan bool + // Reference to shared riak client + riakService *RiakService + // Mutex for protecting rw access to the Config object + configLock sync.RWMutex + // Settings for Queues in general, ie queue list + Config *riak.Map +} + +// LoadQueuesFromRiak is +func LoadQueuesFromRiak(cfg *Config) (*Queues, error) { + queues := &Queues{ + QueueMap: make(map[string]*Queue), + syncScheduler: time.NewTicker(cfg.Riak.ConfigSyncInterval), + syncKiller: make(chan bool, 0), + riakService: cfg.Riak.Service, + configLock: sync.RWMutex{}, + } + + m, err := queues.riakService.GetQueueConfigMap() + + if err == ErrConfigMapNotFound { + m, err = queues.riakService.CreateQueueConfigMap() + if err != nil { + return nil, err + } + } else if err != nil { + return nil, err + } + queues.Config = m + return queues, nil +} + +// Create will register a new queue with the default config +// This queue will be available to be used once all the nodes have had their config +// refreshed +func (queues *Queues) Create(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + + if ok, err := queues.Exists(queueName); ok || err != nil { + if err == nil { + return false, ErrQueueAlreadyExists + } + return false, err + } + + // build the operation to update the set + op := &riak.MapOperation{} + op.AddToSet("queues", []byte(queueName)) + _, err := queues.riakService.CreateOrUpdateMap("config", "queues_config", op) + if err != nil { + return false, err + } + return true, nil +} + +// Delete will remove a queue from the system, and remove it from any topics +// that it is subscribed to. This will not delete any un-acknowledged messages +// although in the future it will expand to cover this as well. +func (queues *Queues) Delete(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + return false, nil +} + +// Exists checks is the given queue name is already created or not +func (queues *Queues) Exists(queueName string) (bool, error) { + m, err := queues.riakService.GetQueueConfigMap() + if err != nil { + return false, err + } + + if set, ok := m.Sets["queues"]; ok { + for _, item := range set { + if queueName == string(item) { + return true, nil + } + } + } + + return false, nil +} diff --git a/core/riak_service.go b/core/riak_service.go new file mode 100644 index 0000000..7551207 --- /dev/null +++ b/core/riak_service.go @@ -0,0 +1,130 @@ +package core + +import ( + "fmt" + "time" + + "github.com/basho/riak-go-client" +) + +// RiakService is an abstraction over the client, to DRY up some common lookups +type RiakService struct { + Pool *riak.Client +} + +// NewRiakService creates a RiakService to use in the app +func NewRiakService(addresses []string, port uint16) (*RiakService, error) { + // Create a new Riak Client + clientOpts := &riak.NewClientOptions{ + RemoteAddresses: addresses, + Port: port, + } + + client, err := riak.NewClient(clientOpts) + + if err != nil { + return nil, err + } + // Slap it into our struct so we can add some common lookups to it + return &RiakService{ + Pool: client, + }, nil +} + +// Execute is lazy method to ferry to the underlying clients Execute +func (rs *RiakService) Execute(cmd riak.Command) error { + return rs.Pool.Execute(cmd) +} + +// GetQueueConfigMap loads the primary queue configuration data from Riak, as a native riak.Map +func (rs *RiakService) GetQueueConfigMap() (*riak.Map, error) { + return rs.GetMap("queues_config") +} + +// GetTopicConfigMap loads the primary topic configuration data from Riak, as a native riak.Map +func (rs *RiakService) GetTopicConfigMap() (*riak.Map, error) { + return rs.GetMap("topics_config") +} + +// CreateQueueConfigMap is +func (rs *RiakService) CreateQueueConfigMap() (*riak.Map, error) { + op := &riak.MapOperation{} + op.SetRegister("created", []byte(time.Now().String())) + + return rs.CreateOrUpdateMap("config", "queues_config", op) +} + +// CreateTopicConfigMap is +func (rs *RiakService) CreateTopicConfigMap() (*riak.Map, error) { + op := &riak.MapOperation{} + op.SetRegister("created", []byte(time.Now().String())) + + return rs.CreateOrUpdateMap("config", "topics_config", op) +} + +// GetMap loads a CRDT Map from Riak +func (rs *RiakService) GetMap(name string) (*riak.Map, error) { + // TODO solution for how to centralize this knowledge + // few things need to reach into config and they're confined to + // private methods in 2 packages, but still. Magic strings + not DRY + fmc, err := riak.NewFetchMapCommandBuilder(). + WithBucketType("maps"). + WithBucket("config"). + WithKey(name).Build() + + if err != nil { + return nil, err + } + + if err = rs.Execute(fmc); err != nil { + return nil, err + } + res := fmc.(*riak.FetchMapCommand) + if res.Error() != nil { + return nil, res.Error() + } + if res.Response.IsNotFound { + // There will be no error from res.Error() here, as it isn't an error + return nil, ErrConfigMapNotFound + } + return res.Response.Map, nil +} + +// CreateOrUpdateMap does exactly that because thats what the riak lib allows for +func (rs *RiakService) CreateOrUpdateMap(bucket string, key string, op *riak.MapOperation) (*riak.Map, error) { + cmd, err := riak.NewUpdateMapCommandBuilder(). + WithBucketType("maps"). + WithBucket(bucket). + WithMapOperation(op). + WithReturnBody(true). + WithKey(key).Build() + + if err != nil { + return nil, err + } + + if err = rs.Execute(cmd); err != nil { + return nil, err + } + + res := cmd.(*riak.UpdateMapCommand) + return res.Response.Map, res.Error() +} + +// CreateQueueConfig will create a configuration for a new queue +func (rs *RiakService) CreateQueueConfig(queueName string, values map[string]string) (*riak.Map, error) { + op := &riak.MapOperation{} + for key, value := range values { + op.SetRegister(key, []byte(value)) + } + + return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), op) +} + +func queueConfigRecordName(queueName string) string { + return fmt.Sprintf("queue_%s_config", queueName) +} + +func topicConfigRecordName(queueName string) string { + return fmt.Sprintf("topic_%s_config", queueName) +} diff --git a/core/topic.go b/core/topic.go new file mode 100644 index 0000000..4850878 --- /dev/null +++ b/core/topic.go @@ -0,0 +1,20 @@ +package core + +import ( + "sync" + + "github.com/basho/riak-go-client" +) + +// Topic represents a bucket in Riak used to hold messages, and the behaviors that +// may be taken over such an object +type Topic struct { + // the definition of a queue + // name of the queue + Name string + + // Mutex for protecting rw access to the Config object + configLock sync.RWMutex + // Individual settings for the queue + Config *riak.Map +} diff --git a/core/topics.go b/core/topics.go new file mode 100644 index 0000000..e36d80d --- /dev/null +++ b/core/topics.go @@ -0,0 +1,90 @@ +package core + +import ( + "sync" + "time" + + "github.com/basho/riak-go-client" +) + +// Topics is +type Topics struct { + riakService *RiakService + + syncScheduler *time.Ticker + syncKiller chan bool + + configLock sync.RWMutex + Config *riak.Map + TopicMap map[string]*Topic +} + +// LoadTopicsFromRiak is +func LoadTopicsFromRiak(cfg *Config) (*Topics, error) { + topics := &Topics{ + TopicMap: make(map[string]*Topic), + syncScheduler: time.NewTicker(cfg.Riak.ConfigSyncInterval), + syncKiller: make(chan bool, 0), + riakService: cfg.Riak.Service, + configLock: sync.RWMutex{}, + } + + m, err := topics.riakService.GetTopicConfigMap() + if err == ErrConfigMapNotFound { + m, err = topics.riakService.CreateTopicConfigMap() + if err != nil { + return nil, err + } + } else if err != nil { + return nil, err + } + topics.Config = m + return topics, nil +} + +// Create will register a new topic +// This queue will be available to be used once all the nodes have had their config +// refreshed +func (topics *Topics) Create(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + + if ok, err := topics.Exists(queueName); ok || err != nil { + if err == nil { + return false, ErrQueueAlreadyExists + } + return false, err + } + // build the operation to update the set + op := &riak.MapOperation{} + op.AddToSet("topics", []byte(queueName)) + _, err := topics.riakService.CreateOrUpdateMap("config", "topics_config", op) + if err != nil { + return false, err + } + return true, nil +} + +// Delete will remove a topic from the system. +// this will not delete any queues or messages. +func (topics *Topics) Delete(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + return false, nil +} + +// Exists checks is the given topic name is already created or not +func (topics *Topics) Exists(topicName string) (bool, error) { + m, err := topics.riakService.GetTopicConfigMap() + if err != nil { + return false, err + } + if set, ok := m.Sets["topics"]; ok { + for _, item := range set { + if topicName == string(item) { + return true, nil + } + } + } + return false, nil +} diff --git a/dynamiq.go b/dynamiq.go index e80f3cb..f67e7ab 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -2,12 +2,30 @@ package main import ( "flag" + "log" "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" + "github.com/Tapjoy/dynamiq/core" + "github.com/Tapjoy/dynamiq/server/endpoints/http/v2" ) +func main2() { + cfg, err := core.GetConfig() + if err != nil { + log.Fatal(err) + } + + httpServer, err := httpv2.New(cfg) + if err != nil { + log.Println(err) + } + httpServer.Listen() +} + func main() { + main2() + return //Get some Command line options configFile := flag.String("c", "./lib/config.gcfg", "location of config file") flag.Parse() diff --git a/server/endpoints/http/v2/queue_handlers.go b/server/endpoints/http/v2/queue_handlers.go new file mode 100644 index 0000000..c4c5103 --- /dev/null +++ b/server/endpoints/http/v2/queue_handlers.go @@ -0,0 +1,39 @@ +package httpv2 + +import "net/http" + +func (h *HTTPApi) queueList(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueDetails(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueConfigure(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueCreate(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueDelete(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueSubmitMessage(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueGetMessage(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queuePollMessage(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueDeleteMessage(w http.ResponseWriter, r *http.Request) { + +} diff --git a/server/endpoints/http/v2/status_handlers.go b/server/endpoints/http/v2/status_handlers.go new file mode 100644 index 0000000..fecdee2 --- /dev/null +++ b/server/endpoints/http/v2/status_handlers.go @@ -0,0 +1,20 @@ +package httpv2 + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func (h *HTTPApi) statusServers(w http.ResponseWriter, r *http.Request) { + response := make([]string, 0) + for _, member := range h.memberList.Members() { + response = append(response, fmt.Sprintf("Member: %s %s", member.Name, member.Addr)) + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} + +func (h *HTTPApi) statusPartitionRange(w http.ResponseWriter, r *http.Request) { + +} diff --git a/server/endpoints/http/v2/topic_handlers.go b/server/endpoints/http/v2/topic_handlers.go new file mode 100644 index 0000000..8cad23f --- /dev/null +++ b/server/endpoints/http/v2/topic_handlers.go @@ -0,0 +1,31 @@ +package httpv2 + +import "net/http" + +func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicDetails(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicCreate(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicDelete(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicSubmitMessage(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicSubscribe(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicUnsubscribe(w http.ResponseWriter, r *http.Request) { + +} diff --git a/server/endpoints/http/v2/v2.go b/server/endpoints/http/v2/v2.go new file mode 100644 index 0000000..d4ee3eb --- /dev/null +++ b/server/endpoints/http/v2/v2.go @@ -0,0 +1,71 @@ +package httpv2 + +import ( + "fmt" + "log" + "net/http" + + "github.com/Tapjoy/dynamiq/core" + "github.com/gorilla/mux" + "github.com/hashicorp/memberlist" +) + +// HTTPApi represents the object used to govern http calls into the system +type HTTPApi struct { + memberList *memberlist.Memberlist + topics *core.Topics + queues *core.Queues + port uint16 +} + +// New initializes a new +func New(cfg *core.Config) (*HTTPApi, error) { + t, err := core.LoadTopicsFromRiak(cfg) + if err != nil { + return nil, err + } + q, err := core.LoadQueuesFromRiak(cfg) + if err != nil { + return nil, err + } + h := &HTTPApi{ + memberList: cfg.Discovery.Memberlist, + topics: t, + queues: q, + port: cfg.HTTP.Port, + } + router := mux.NewRouter().PathPrefix("/v2").Subrouter() + + statusRoutes := router.PathPrefix("/status").Subrouter() + topicRoutes := router.PathPrefix("/topics").Subrouter() + queueRoutes := router.PathPrefix("/queues").Subrouter() + + statusRoutes.HandleFunc("/server", h.statusServers).Methods("GET") + statusRoutes.HandleFunc("/partitionrange", h.statusPartitionRange).Methods("GET") + + topicRoutes.HandleFunc("/", h.topicList).Methods("GET") + topicRoutes.HandleFunc("/{topic}", h.topicDetails).Methods("GET") + topicRoutes.HandleFunc("/{topic}", h.topicCreate).Methods("PUT") + topicRoutes.HandleFunc("/{topic}", h.topicDelete).Methods("DELETE") + topicRoutes.HandleFunc("/{topic}", h.topicSubmitMessage).Methods("POST") + topicRoutes.HandleFunc("/{topic}/queues/{queue}", h.topicSubscribe).Methods("PUT") + topicRoutes.HandleFunc("/{topic}/queues/{queue}", h.topicUnsubscribe).Methods("DELETE") + + queueRoutes.HandleFunc("/", h.queueList).Methods("GET") + queueRoutes.HandleFunc("/{queue}", h.queueDetails).Methods("GET") + queueRoutes.HandleFunc("/{queue}", h.queueConfigure).Methods("PATCH") + queueRoutes.HandleFunc("/{queue}", h.queueCreate).Methods("PUT") + queueRoutes.HandleFunc("/{queue}", h.queueDelete).Methods("DELETE") + queueRoutes.HandleFunc("/{queue}", h.queueSubmitMessage).Methods("POST") + queueRoutes.HandleFunc("/{queue}/{id}", h.queueGetMessage).Methods("GET") + queueRoutes.HandleFunc("/{queue}/{id}", h.queueDeleteMessage).Methods("DELETE") + queueRoutes.HandleFunc("/{queue}/poll/{num}", h.queuePollMessage).Methods("GET") + + http.Handle("/", router) + return h, nil +} + +// Listen is +func (api *HTTPApi) Listen() { + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", api.port), nil)) +} From 7925c2740dad8fffda6baa5312ecd4a78260c3ab Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Wed, 7 Oct 2015 21:27:27 -0400 Subject: [PATCH 02/12] Adding in Poll, Delete methods --- core/queue.go | 33 ++++++-- core/riak_service.go | 190 +++++++++++++++++++++++++++++++++++++++++++ dynamiq.go | 7 ++ 3 files changed, 225 insertions(+), 5 deletions(-) diff --git a/core/queue.go b/core/queue.go index 69e248e..57ce13f 100644 --- a/core/queue.go +++ b/core/queue.go @@ -1,8 +1,6 @@ package core import ( - "math" - "math/big" "sync" "github.com/StabbyCutyou/partition_ring" @@ -17,7 +15,8 @@ type Queue struct { Name string // the PartitionRing for this queue ring *partitionring.PartitionRing - + // the RiakService + riakService *RiakService // Mutex for protecting rw access to the Config object configLock sync.RWMutex // Individual settings for the queue @@ -44,5 +43,29 @@ const QueueDepthAprStatsSuffix = "approximate_depth.count" // QueueFillDeltaStatsSuffix const QueueFillDeltaStatsSuffix = "fill.count" -// MaxIDSize is -var MaxIDSize = *big.NewInt(math.MaxInt64) +// PollMessages does a range scan over the queue bucket and returns a map of message ids to bodies +func (queue *Queue) PollMessages(batchSize uint32) (map[string]string, error) { + lower, upper, err := queue.ring.ReserveNext() + if err != nil { + return nil, err + } + riakObjects, err := queue.riakService.RangeScanMessages(queue.Name, batchSize, lower, upper) + if err != nil { + return nil, err + } + results := make(map[string]string, len(riakObjects)) + for _, obj := range riakObjects { + results[obj.Key] = string(obj.Value) + } + return results, err +} + +// DeleteMessage is +func (queue *Queue) DeleteMessage(id string) (map[string]bool, error) { + return queue.DeleteMessages([]string{id}) +} + +// DeleteMessages is +func (queue *Queue) DeleteMessages(ids []string) (map[string]bool, error) { + return queue.riakService.DeleteMessages(queue.Name, ids) +} diff --git a/core/riak_service.go b/core/riak_service.go index 7551207..e0389f5 100644 --- a/core/riak_service.go +++ b/core/riak_service.go @@ -1,17 +1,29 @@ package core import ( + "crypto/rand" "fmt" + "math" + "math/big" + "sync" "time" "github.com/basho/riak-go-client" ) +// MaxMessageID is +var MaxMessageID = *big.NewInt(math.MaxInt64) + // RiakService is an abstraction over the client, to DRY up some common lookups type RiakService struct { Pool *riak.Client } +type deletedMessage struct { + key string + deleted bool +} + // NewRiakService creates a RiakService to use in the app func NewRiakService(addresses []string, port uint16) (*RiakService, error) { // Create a new Riak Client @@ -121,6 +133,184 @@ func (rs *RiakService) CreateQueueConfig(queueName string, values map[string]str return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), op) } +func (rs *RiakService) RangeScanMessages(queueName string, numMessages uint32, lowerBound int64, upperBound int64) ([]*riak.Object, error) { + cmd, err := riak.NewSecondaryIndexQueryCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithIntRange(lowerBound, upperBound). + WithMaxResults(numMessages). + WithIndexName("$bucket").Build() + + if err != nil { + return nil, err + } + + if err = rs.Execute(cmd); err != nil { + return nil, err + } + + res := cmd.(*riak.SecondaryIndexQueryCommand) + if res.Error() != nil { + return nil, res.Error() + } + return rs.lookupMessagesForRangeScanResults(queueName, res.Response.Results) +} + +func (rs *RiakService) lookupMessagesForRangeScanResults(queueName string, results []*riak.SecondaryIndexQueryResult) ([]*riak.Object, error) { + objChan := make(chan []*riak.Object, len(results)) + wg := &sync.WaitGroup{} + wg.Add(len(results)) + for _, item := range results { + key := string(item.ObjectKey) + // Go wide with IO requests, use a channel to communicate with a consumer, below + go func(riakService *RiakService, w *sync.WaitGroup, c chan []*riak.Object, messageKey string) { + defer wg.Done() + cmd, err := riak.NewFetchValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(messageKey).Build() + + if err != nil { + return // Can't do anything if there was an error. Probably good to log here? + } + + if err = riakService.Execute(cmd); err != nil { + return // Can't do anything? + } + + res := cmd.(*riak.FetchValueCommand) + if res.Error() != nil || res.Response.IsNotFound { + return // ? + } + + c <- res.Response.Values + return + }(rs, wg, objChan, key) + } + + // Kickoff the waitgroup to close the conn once they all report in + // Boy, I hope no weird goroutine scheduling stuff occurs or this could get...racey + go func(waitGroup *sync.WaitGroup, oChan chan []*riak.Object) { + waitGroup.Wait() + close(oChan) + }(wg, objChan) + + foundMessages := make([]*riak.Object, len(results)) + foundCount := 0 + for objs := range objChan { + if len(objs) > 1 { + for _, o := range objs[1:] { + // Rewrite the sibling back into the system + // Message siblings indicate unique id collisions, and should + // be re-published into the system for later delivery + _, err := rs.StoreMessage(queueName, string(o.Value)) + if err != nil { + // Couldn't save that Message + // That would mean it's lost + // need to incorporate a retry mechanic + } + } + } + + foundMessages[foundCount] = objs[0] + foundCount++ + } + + return foundMessages[:foundCount], nil +} + +func (rs *RiakService) StoreMessage(queueName string, message string) (string, error) { + randID, err := rand.Int(rand.Reader, &MaxMessageID) + if err != nil { + return "", err + } + + id := randID.String() + + obj := &riak.Object{ + ContentType: "application/json", + Charset: "utf-8", + ContentEncoding: "utf-8", + Value: []byte(message), + } + + cmd, err := riak.NewStoreValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(id). + WithContent(obj).Build() + + if err != nil { + return "", err + } + + if err = rs.Execute(cmd); err != nil { + return "", err + } + + res := cmd.(*riak.StoreValueCommand) + if res.Error() != nil { + return "", err + } + + return id, nil +} + +func (rs *RiakService) DeleteMessage(queueName string, key string) (bool, error) { + cmd, err := riak.NewDeleteValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(key).Build() + + if err != nil { + return false, err + } + + if err = rs.Execute(cmd); err != nil { + return false, err + } + + res := cmd.(*riak.DeleteValueCommand) + if res.Error() != nil { + return false, err + } + + return res.Response, nil + +} + +func (rs *RiakService) DeleteMessages(queueName string, keys []string) (map[string]bool, error) { + boolChan := make(chan *deletedMessage, len(keys)) + wg := &sync.WaitGroup{} + wg.Add(len(keys)) + + results := make(map[string]bool, len(keys)) + + for _, mKey := range keys { + go func(riakService *RiakService, w *sync.WaitGroup, c chan *deletedMessage, messageKey string) { + defer w.Done() + + deleted, err := riakService.DeleteMessage(queueName, messageKey) + if err != nil { + c <- &deletedMessage{key: messageKey, deleted: deleted} + } + return + }(rs, wg, boolChan, mKey) + } + + // Kickoff the waitgroup to close the conn once they all report in + // Boy, I hope no weird goroutine scheduling stuff occurs or this could get...racey + go func(waitGroup *sync.WaitGroup, c chan *deletedMessage) { + waitGroup.Wait() + close(c) + }(wg, boolChan) + + for obj := range boolChan { + results[obj.key] = obj.deleted + } + return results, nil +} + func queueConfigRecordName(queueName string) string { return fmt.Sprintf("queue_%s_config", queueName) } diff --git a/dynamiq.go b/dynamiq.go index f67e7ab..0407383 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -3,6 +3,7 @@ package main import ( "flag" "log" + "math" "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" @@ -16,6 +17,12 @@ func main2() { log.Fatal(err) } + messages, err := cfg.Riak.Service.RangeScanMessages("tq1", 20, 0, math.MaxInt64) + if err != nil { + log.Println(err) + } + log.Println("Stuff") + log.Println(messages) httpServer, err := httpv2.New(cfg) if err != nil { log.Println(err) From a8072df27b254a29efd1ca90a4f53b813a3f2ea5 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Tue, 13 Oct 2015 21:19:48 -0400 Subject: [PATCH 03/12] Prototype of config sync working, some initial http stuff but thatll change, more of the internal apis fleshed out --- core/config.go | 229 +++++++++++++++++++- core/queue.go | 4 +- core/queues.go | 71 +++--- core/riak_service.go | 85 ++++++-- core/topic.go | 2 +- core/topics.go | 38 +++- dynamiq.go | 10 + server/endpoints/http/v2/status_handlers.go | 2 +- server/endpoints/http/v2/topic_handlers.go | 17 +- server/endpoints/http/v2/v2.go | 33 ++- 10 files changed, 404 insertions(+), 87 deletions(-) diff --git a/core/config.go b/core/config.go index 99054f3..ede2b17 100644 --- a/core/config.go +++ b/core/config.go @@ -1,6 +1,8 @@ package core import ( + "log" + "sync" "time" "github.com/Tapjoy/dynamiq/app/compressor" @@ -45,6 +47,10 @@ type Config struct { Stats *StatsConfig Compressor *compressor.Compressor Queues *Queues + Topics *Topics + // Channels / Timer for syncing the config + syncScheduler *time.Ticker + syncKiller chan bool } // GetConfig Parses and returns a config object @@ -63,7 +69,7 @@ func GetConfig() (*Config, error) { riakConfig := &RiakConfig{ Addresses: []string{"127.0.0.1"}, Port: 8087, - ConfigSyncInterval: 30 * time.Second, + ConfigSyncInterval: 2 * time.Second, } rs, err := NewRiakService(riakConfig.Addresses, riakConfig.Port) @@ -71,9 +77,220 @@ func GetConfig() (*Config, error) { return nil, err } riakConfig.Service = rs - return &Config{ - Riak: riakConfig, - Discovery: discoveryConfig, - HTTP: httpConfig, - }, nil + + t, err := LoadTopicsFromRiak(riakConfig) + if err != nil { + return nil, err + } + q, err := LoadQueuesFromRiak(riakConfig) + if err != nil { + return nil, err + } + + cfg := &Config{ + Riak: riakConfig, + Discovery: discoveryConfig, + HTTP: httpConfig, + Queues: q, + Topics: t, + syncScheduler: time.NewTicker(riakConfig.ConfigSyncInterval), + syncKiller: make(chan bool), + } + cfg.beginSync() + return cfg, nil +} + +func (cfg *Config) beginSync() { + // Go routine to listen to either the scheduler or the killer + go func(config *Config) { + for { + select { + // Check to see if we have a tick + case <-config.syncScheduler.C: + config.syncConfig() + // Check to see if we've been stopped + case <-config.syncKiller: + config.syncScheduler.Stop() + return + } + } + }(cfg) + return +} + +func (cfg *Config) syncConfig() error { + log.Println("Syncing") + // First - Refresh the list of topics and their metadata + tcfg, err := cfg.Riak.Service.GetTopicsConfigMap() + if err != nil { + if err == ErrConfigMapNotFound { + tcfg, err = cfg.Riak.Service.CreateTopicsConfigMap() + if err != nil { + return err + } + } + return err + } + var oldSet [][]byte + var ok bool + cfg.Topics.configLock.Lock() + defer cfg.Topics.configLock.Unlock() + + if oldSet, ok = tcfg.Sets["topics"]; !ok { + // There were no known topics ? + // No known topics should not prevent the entire sync + // just the topic portion + log.Println(ErrNoKnownTopics) + return ErrNoKnownTopics + } + + if len(oldSet) == 0 { + log.Println(ErrNoKnownTopics) + + return ErrNoKnownTopics + } + + cfg.Topics.Config = tcfg + var newSet [][]byte + if newSet, ok = cfg.Topics.Config.Sets["topics"]; !ok { + log.Println(ErrNoKnownTopics) + + return ErrNoKnownTopics + } + + // If a topic exists in topicsToKeep... + // ... and does exist in Topics, update it's config Object + // ... and does not exist in Topics, initialize it from Riak + // Else + // ... It is not in toKeep and is in Topics, remove it + + topicsToKeep := make(map[string]bool) + for _, topic := range newSet { + tName := string(topic) + // Record this so we know who to evict + topicsToKeep[tName] = true + // Add or Update the topic to the known set + var t *Topic + if t, ok = cfg.Topics.KnownTopics[tName]; !ok { + // Didn't exist in memory, so create it + // TODO centralize this, don't just re-write initialization logic + t = &Topic{ + Name: tName, + configLock: sync.RWMutex{}, + } + } + // get the config and set it on the topic + topcfg, err := cfg.Riak.Service.GetTopicConfigMap(tName) + if err != nil { + // If the config object isn't found - no big deal + // Only config presently is a list of queues, the object will + // be created when the first queue is set + log.Println(err, tName) + } + t.Config = topcfg + cfg.Topics.KnownTopics[tName] = t + } + + // Go through the old list of topics + for _, topic := range oldSet { + tName := string(topic) + if _, ok := topicsToKeep[tName]; !ok { + // It wasn't in the old list, evict it + delete(cfg.Topics.KnownTopics, tName) + } + } + + // Now, do the above but for the queues. Added effect - any queue removed + // must also be removed from a topic + + qcfg, err := cfg.Riak.Service.GetQueuesConfigMap() + if err != nil { + if err != nil { + if err == ErrConfigMapNotFound { + qcfg, err = cfg.Riak.Service.CreateQueuesConfigMap() + if err != nil { + return err + } + } + return err + } + } + + cfg.Queues.configLock.Lock() + defer cfg.Queues.configLock.Unlock() + if oldSet, ok = qcfg.Sets["queues"]; !ok { + // There were no known topics ? + // No known topics should not prevent the entire sync + // just the topic portion + log.Println(ErrNoKnownQueues) + return ErrNoKnownQueues + } + + if len(oldSet) == 0 { + log.Println(ErrNoKnownQueues) + return ErrNoKnownQueues + } + + cfg.Queues.Config = qcfg + if newSet, ok = cfg.Queues.Config.Sets["queues"]; !ok { + log.Println(ErrNoKnownQueues) + return ErrNoKnownQueues + } + + queuesToRemove := make([]string, 0) + queuesToKeep := make(map[string]bool) + + for _, queue := range newSet { + qName := string(queue) + // Record this so we know who to evict + topicsToKeep[qName] = true + // Add or Update the topic to the known set + var q *Queue + if q, ok = cfg.Queues.KnownQueues[qName]; !ok { + // Didn't exist in memory, so create it + // TODO centralize this, don't just re-write initialization logic + q = &Queue{ + Name: qName, + configLock: sync.RWMutex{}, + } + } + // get the config and set it on the topic + queuecfg, err := cfg.Riak.Service.GetQueueConfigMap(qName) + if err != nil { + log.Println(err, qName) + return err + } + q.Config = queuecfg + cfg.Queues.KnownQueues[qName] = q + } + + // Go through the old list of queues + for _, queue := range oldSet { + qName := string(queue) + if _, ok := queuesToKeep[qName]; !ok { + // It wasn't in the old list, evict it + delete(cfg.Queues.KnownQueues, qName) + } else { + queuesToRemove = append(queuesToRemove, qName) + } + } + + // Go through any evicted queues, and remove them from topic subscriptions + // 3 loops... there has to be a better way to manage all of this + for _, queue := range queuesToRemove { + for tName, topic := range cfg.Topics.KnownTopics { + for _, q := range topic.Config.Sets["queues"] { + qName := string(q) + if qName == queue { + _, err := cfg.Riak.Service.UpdateTopicSubscription(tName, qName, false) + if err != nil { + log.Println(err) + return err + } + } + } + } + } + log.Println("Syncing Done") + return nil } diff --git a/core/queue.go b/core/queue.go index 57ce13f..9bcea5c 100644 --- a/core/queue.go +++ b/core/queue.go @@ -4,7 +4,7 @@ import ( "sync" "github.com/StabbyCutyou/partition_ring" - "github.com/tpjg/goriakpbc" + "github.com/basho/riak-go-client" ) // Queue represents a bucket in Riak used to hold messages, and the behaviors that @@ -20,7 +20,7 @@ type Queue struct { // Mutex for protecting rw access to the Config object configLock sync.RWMutex // Individual settings for the queue - Config *riak.RDtMap + Config *riak.Map } // Define statistics keys suffixes diff --git a/core/queues.go b/core/queues.go index ebb4349..a4e18a5 100644 --- a/core/queues.go +++ b/core/queues.go @@ -4,7 +4,6 @@ package core import ( "errors" "sync" - "time" "github.com/basho/riak-go-client" ) @@ -13,16 +12,7 @@ import ( const VisibilityTimeout = "visibility_timeout" // PartitionCount is -const PartitionCount = "partition_count" - -// MinPartitions is the name of the config setting name for controlling the minimum number of partitions per queue -const MinPartitions = "min_partitions" - -// MaxPartitions is the name of the config setting name for controlling the maximum number of partitions per node -const MaxPartitions = "max_partitions" - -// MaxPartitionAge is the name of the config setting name for controlling how long an un-used partition should exist -const MaxPartitionAge = "max_partition_age" +const PartitionStep = "partition_step" // CompressedMessages is the name of the config setting name for controlling if the queue is using compression or not const CompressedMessages = "compressed_messages" @@ -32,43 +22,39 @@ var ( ErrQueueAlreadyExists = errors.New("Queue already exists") // ErrConfigMapNotFound is ErrConfigMapNotFound = errors.New("The Config map in Riak has not yet been created") + //ErrNoKnownQueues + ErrNoKnownQueues = errors.New("There are no known queues in the system") // Settings Arrays and maps cannot be made immutable in golang - Settings = [...]string{VisibilityTimeout, PartitionCount, MinPartitions, MaxPartitions, CompressedMessages} - + Settings = [...]string{VisibilityTimeout, PartitionStep, CompressedMessages} // DefaultSettings is - DefaultSettings = map[string]string{VisibilityTimeout: "30", PartitionCount: "5", MinPartitions: "1", MaxPartitions: "10", CompressedMessages: "false"} + DefaultSettings = map[string]string{VisibilityTimeout: "30", PartitionStep: "5000000", CompressedMessages: "false"} ) // Queues represents a collection of Queue objects, and the behaviors that may be // taken over a collection of such objects type Queues struct { - // a container for all queues - QueueMap map[string]*Queue - // Channels / Timer for syncing the config - syncScheduler *time.Ticker - syncKiller chan bool // Reference to shared riak client riakService *RiakService // Mutex for protecting rw access to the Config object configLock sync.RWMutex // Settings for Queues in general, ie queue list Config *riak.Map + // a container for all queues + KnownQueues map[string]*Queue } // LoadQueuesFromRiak is -func LoadQueuesFromRiak(cfg *Config) (*Queues, error) { +func LoadQueuesFromRiak(cfg *RiakConfig) (*Queues, error) { queues := &Queues{ - QueueMap: make(map[string]*Queue), - syncScheduler: time.NewTicker(cfg.Riak.ConfigSyncInterval), - syncKiller: make(chan bool, 0), - riakService: cfg.Riak.Service, - configLock: sync.RWMutex{}, + KnownQueues: make(map[string]*Queue), + riakService: cfg.Service, + configLock: sync.RWMutex{}, } - m, err := queues.riakService.GetQueueConfigMap() + m, err := queues.riakService.GetQueuesConfigMap() if err == ErrConfigMapNotFound { - m, err = queues.riakService.CreateQueueConfigMap() + m, err = queues.riakService.CreateQueuesConfigMap() if err != nil { return nil, err } @@ -76,13 +62,18 @@ func LoadQueuesFromRiak(cfg *Config) (*Queues, error) { return nil, err } queues.Config = m + + // TODO + // Initialize a Queue object for each one in the set + // Store it by name in KnownQueus + return queues, nil } // Create will register a new queue with the default config // This queue will be available to be used once all the nodes have had their config // refreshed -func (queues *Queues) Create(queueName string) (bool, error) { +func (queues *Queues) Create(queueName string, options map[string]string) (bool, error) { // This function intentionally not optimized because it should // not be a high-throughput operation @@ -96,10 +87,28 @@ func (queues *Queues) Create(queueName string) (bool, error) { // build the operation to update the set op := &riak.MapOperation{} op.AddToSet("queues", []byte(queueName)) - _, err := queues.riakService.CreateOrUpdateMap("config", "queues_config", op) + _, err := queues.riakService.CreateOrUpdateMap("config", "queues_config", []*riak.MapOperation{op}) if err != nil { return false, err } + + cfgOps := make([]*riak.MapOperation, 0) + // Create the config + for name, defaultValue := range DefaultSettings { + cOp := &riak.MapOperation{} + if val, ok := options[name]; ok { + cOp.SetRegister(name, []byte(val)) + } else { + cOp.SetRegister(name, []byte(defaultValue)) + } + cfgOps = append(cfgOps, cOp) + } + + _, err = queues.riakService.CreateOrUpdateMap("config", queueConfigRecordName(queueName), cfgOps) + if err != nil { + return false, err + } + return true, nil } @@ -109,12 +118,14 @@ func (queues *Queues) Create(queueName string) (bool, error) { func (queues *Queues) Delete(queueName string) (bool, error) { // This function intentionally not optimized because it should // not be a high-throughput operation + + // Also, not implemented yet! return false, nil } // Exists checks is the given queue name is already created or not func (queues *Queues) Exists(queueName string) (bool, error) { - m, err := queues.riakService.GetQueueConfigMap() + m, err := queues.riakService.GetQueuesConfigMap() if err != nil { return false, err } diff --git a/core/riak_service.go b/core/riak_service.go index e0389f5..84f6c05 100644 --- a/core/riak_service.go +++ b/core/riak_service.go @@ -49,29 +49,29 @@ func (rs *RiakService) Execute(cmd riak.Command) error { } // GetQueueConfigMap loads the primary queue configuration data from Riak, as a native riak.Map -func (rs *RiakService) GetQueueConfigMap() (*riak.Map, error) { +func (rs *RiakService) GetQueuesConfigMap() (*riak.Map, error) { return rs.GetMap("queues_config") } // GetTopicConfigMap loads the primary topic configuration data from Riak, as a native riak.Map -func (rs *RiakService) GetTopicConfigMap() (*riak.Map, error) { +func (rs *RiakService) GetTopicsConfigMap() (*riak.Map, error) { return rs.GetMap("topics_config") } // CreateQueueConfigMap is -func (rs *RiakService) CreateQueueConfigMap() (*riak.Map, error) { +func (rs *RiakService) CreateQueuesConfigMap() (*riak.Map, error) { op := &riak.MapOperation{} op.SetRegister("created", []byte(time.Now().String())) - return rs.CreateOrUpdateMap("config", "queues_config", op) + return rs.CreateOrUpdateMap("config", "queues_config", []*riak.MapOperation{op}) } // CreateTopicConfigMap is -func (rs *RiakService) CreateTopicConfigMap() (*riak.Map, error) { +func (rs *RiakService) CreateTopicsConfigMap() (*riak.Map, error) { op := &riak.MapOperation{} op.SetRegister("created", []byte(time.Now().String())) - return rs.CreateOrUpdateMap("config", "topics_config", op) + return rs.CreateOrUpdateMap("config", "topics_config", []*riak.MapOperation{op}) } // GetMap loads a CRDT Map from Riak @@ -103,13 +103,16 @@ func (rs *RiakService) GetMap(name string) (*riak.Map, error) { } // CreateOrUpdateMap does exactly that because thats what the riak lib allows for -func (rs *RiakService) CreateOrUpdateMap(bucket string, key string, op *riak.MapOperation) (*riak.Map, error) { - cmd, err := riak.NewUpdateMapCommandBuilder(). +func (rs *RiakService) CreateOrUpdateMap(bucket string, key string, ops []*riak.MapOperation) (*riak.Map, error) { + cBuilder := riak.NewUpdateMapCommandBuilder(). WithBucketType("maps"). WithBucket(bucket). - WithMapOperation(op). WithReturnBody(true). - WithKey(key).Build() + WithKey(key) + for _, op := range ops { + cBuilder = cBuilder.WithMapOperation(op) + } + cmd, err := cBuilder.Build() if err != nil { return nil, err @@ -130,9 +133,10 @@ func (rs *RiakService) CreateQueueConfig(queueName string, values map[string]str op.SetRegister(key, []byte(value)) } - return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), op) + return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), []*riak.MapOperation{op}) } +// RangeScanMessages is func (rs *RiakService) RangeScanMessages(queueName string, numMessages uint32, lowerBound int64, upperBound int64) ([]*riak.Object, error) { cmd, err := riak.NewSecondaryIndexQueryCommandBuilder(). WithBucketType("messages"). @@ -157,8 +161,11 @@ func (rs *RiakService) RangeScanMessages(queueName string, numMessages uint32, l } func (rs *RiakService) lookupMessagesForRangeScanResults(queueName string, results []*riak.SecondaryIndexQueryResult) ([]*riak.Object, error) { + // Channel for holding the results of the io calls objChan := make(chan []*riak.Object, len(results)) + // Waitgroup to gate the function completing wg := &sync.WaitGroup{} + // Seed it with the expected number of ops wg.Add(len(results)) for _, item := range results { key := string(item.ObjectKey) @@ -195,7 +202,9 @@ func (rs *RiakService) lookupMessagesForRangeScanResults(queueName string, resul close(oChan) }(wg, objChan) + // Allocate it all up front to save some time foundMessages := make([]*riak.Object, len(results)) + // Keep track of how many actually returned, for later foundCount := 0 for objs := range objChan { if len(objs) > 1 { @@ -215,7 +224,7 @@ func (rs *RiakService) lookupMessagesForRangeScanResults(queueName string, resul foundMessages[foundCount] = objs[0] foundCount++ } - + // Return only the slice of messages found return foundMessages[:foundCount], nil } @@ -280,18 +289,23 @@ func (rs *RiakService) DeleteMessage(queueName string, key string) (bool, error) } func (rs *RiakService) DeleteMessages(queueName string, keys []string) (map[string]bool, error) { + // Channel for holding the results of the io calls boolChan := make(chan *deletedMessage, len(keys)) + // Waitgroup to gate the function completing wg := &sync.WaitGroup{} + // Seed it with the expected number of ops wg.Add(len(keys)) results := make(map[string]bool, len(keys)) for _, mKey := range keys { + // Kick off a go routine to delete the message go func(riakService *RiakService, w *sync.WaitGroup, c chan *deletedMessage, messageKey string) { defer w.Done() deleted, err := riakService.DeleteMessage(queueName, messageKey) if err != nil { + // Pop the results onto the channel c <- &deletedMessage{key: messageKey, deleted: deleted} } return @@ -305,16 +319,59 @@ func (rs *RiakService) DeleteMessages(queueName string, keys []string) (map[stri close(c) }(wg, boolChan) + // Harvest until the channel closes for obj := range boolChan { results[obj.key] = obj.deleted } return results, nil } +func (rs *RiakService) UpdateTopicSubscription(topicName string, queueName string, addQueue bool) (bool, error) { + // Probably less awkward to just have the calling code build the cmd but... + // I'm trying to keep all the riak stuff in one place + + // Also - this needs to verify both exist before attempting to map them + // It doesn't, currently + op := &riak.MapOperation{} + if addQueue { + op.AddToSet("queues", []byte(queueName)) + } else { + op.RemoveFromSet("queues", []byte(queueName)) + } + cmd, err := riak.NewUpdateMapCommandBuilder(). + WithBucketType("maps"). + WithBucket("config"). + WithKey(topicConfigRecordName(topicName)). + WithMapOperation(op).Build() + + if err != nil { + return false, err + } + + if err = rs.Execute(cmd); err != nil { + return false, err + } + res := cmd.(*riak.UpdateMapCommand) + if res.Error() != nil { + return false, res.Error() + } + + // If the op is a success, its a success + return res.Success(), nil +} + +func (rs *RiakService) GetQueueConfigMap(queueName string) (*riak.Map, error) { + return rs.GetMap(queueConfigRecordName(queueName)) +} + +func (rs *RiakService) GetTopicConfigMap(topicName string) (*riak.Map, error) { + return rs.GetMap(topicConfigRecordName(topicName)) +} + func queueConfigRecordName(queueName string) string { return fmt.Sprintf("queue_%s_config", queueName) } -func topicConfigRecordName(queueName string) string { - return fmt.Sprintf("topic_%s_config", queueName) +func topicConfigRecordName(topicName string) string { + return fmt.Sprintf("topic_%s_config", topicName) } diff --git a/core/topic.go b/core/topic.go index 4850878..292cefa 100644 --- a/core/topic.go +++ b/core/topic.go @@ -12,7 +12,7 @@ type Topic struct { // the definition of a queue // name of the queue Name string - + // RiakService for interacting with Riak // Mutex for protecting rw access to the Config object configLock sync.RWMutex // Individual settings for the queue diff --git a/core/topics.go b/core/topics.go index e36d80d..1c9f765 100644 --- a/core/topics.go +++ b/core/topics.go @@ -1,6 +1,7 @@ package core import ( + "errors" "sync" "time" @@ -14,24 +15,29 @@ type Topics struct { syncScheduler *time.Ticker syncKiller chan bool - configLock sync.RWMutex - Config *riak.Map - TopicMap map[string]*Topic + configLock sync.RWMutex + Config *riak.Map + KnownTopics map[string]*Topic } +var ( + //ErrNoKnownTopics + ErrNoKnownTopics = errors.New("There are no known topics in the system") +) + // LoadTopicsFromRiak is -func LoadTopicsFromRiak(cfg *Config) (*Topics, error) { +func LoadTopicsFromRiak(cfg *RiakConfig) (*Topics, error) { topics := &Topics{ - TopicMap: make(map[string]*Topic), - syncScheduler: time.NewTicker(cfg.Riak.ConfigSyncInterval), + KnownTopics: make(map[string]*Topic), + syncScheduler: time.NewTicker(cfg.ConfigSyncInterval), syncKiller: make(chan bool, 0), - riakService: cfg.Riak.Service, + riakService: cfg.Service, configLock: sync.RWMutex{}, } - m, err := topics.riakService.GetTopicConfigMap() + m, err := topics.riakService.GetTopicsConfigMap() if err == ErrConfigMapNotFound { - m, err = topics.riakService.CreateTopicConfigMap() + m, err = topics.riakService.CreateTopicsConfigMap() if err != nil { return nil, err } @@ -58,7 +64,7 @@ func (topics *Topics) Create(queueName string) (bool, error) { // build the operation to update the set op := &riak.MapOperation{} op.AddToSet("topics", []byte(queueName)) - _, err := topics.riakService.CreateOrUpdateMap("config", "topics_config", op) + _, err := topics.riakService.CreateOrUpdateMap("config", "topics_config", []*riak.MapOperation{op}) if err != nil { return false, err } @@ -75,7 +81,7 @@ func (topics *Topics) Delete(queueName string) (bool, error) { // Exists checks is the given topic name is already created or not func (topics *Topics) Exists(topicName string) (bool, error) { - m, err := topics.riakService.GetTopicConfigMap() + m, err := topics.riakService.GetTopicsConfigMap() if err != nil { return false, err } @@ -88,3 +94,13 @@ func (topics *Topics) Exists(topicName string) (bool, error) { } return false, nil } + +func (topics *Topics) SubscribeQueue(topicName string, queueName string) (bool, error) { + // Add the queue to the topic + return topics.riakService.UpdateTopicSubscription(topicName, queueName, true) +} + +func (topics *Topics) UnsubscribeQueue(topicName string, queueName string) (bool, error) { + // Remove the queue from the topic + return topics.riakService.UpdateTopicSubscription(topicName, queueName, false) +} diff --git a/dynamiq.go b/dynamiq.go index 0407383..fdcddda 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -17,6 +17,16 @@ func main2() { log.Fatal(err) } + ok, err := cfg.Queues.Create("tq1", make(map[string]string)) + if !ok || err != nil { + log.Fatal(err) + } + + ok, err = cfg.Topics.Create("tt1") + if !ok || err != nil { + log.Fatal(err) + } + messages, err := cfg.Riak.Service.RangeScanMessages("tq1", 20, 0, math.MaxInt64) if err != nil { log.Println(err) diff --git a/server/endpoints/http/v2/status_handlers.go b/server/endpoints/http/v2/status_handlers.go index fecdee2..e7107cb 100644 --- a/server/endpoints/http/v2/status_handlers.go +++ b/server/endpoints/http/v2/status_handlers.go @@ -8,7 +8,7 @@ import ( func (h *HTTPApi) statusServers(w http.ResponseWriter, r *http.Request) { response := make([]string, 0) - for _, member := range h.memberList.Members() { + for _, member := range h.context.Discovery.Memberlist.Members() { response = append(response, fmt.Sprintf("Member: %s %s", member.Name, member.Addr)) } w.WriteHeader(http.StatusOK) diff --git a/server/endpoints/http/v2/topic_handlers.go b/server/endpoints/http/v2/topic_handlers.go index 8cad23f..66569f2 100644 --- a/server/endpoints/http/v2/topic_handlers.go +++ b/server/endpoints/http/v2/topic_handlers.go @@ -1,9 +1,14 @@ package httpv2 -import "net/http" +import ( + "net/http" -func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { + "github.com/gorilla/mux" +) +func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { + // TODO need to move topics/queues into a config manager to lock their access + // due to the goroutines that update and need to lock them } func (h *HTTPApi) topicDetails(w http.ResponseWriter, r *http.Request) { @@ -11,7 +16,13 @@ func (h *HTTPApi) topicDetails(w http.ResponseWriter, r *http.Request) { } func (h *HTTPApi) topicCreate(w http.ResponseWriter, r *http.Request) { - + topicName := mux.Vars(r)["topic"] + res, err := h.context.Topics.Create(topicName) + if err != nil { + // return 500 for now - should return contextually correct errors otherwise + errorResponse(w, err) + } + response(w, map[string]interface{}{"topic": topicName, "created": res}) } func (h *HTTPApi) topicDelete(w http.ResponseWriter, r *http.Request) { diff --git a/server/endpoints/http/v2/v2.go b/server/endpoints/http/v2/v2.go index d4ee3eb..bbe25f4 100644 --- a/server/endpoints/http/v2/v2.go +++ b/server/endpoints/http/v2/v2.go @@ -1,38 +1,24 @@ package httpv2 import ( + "encoding/json" "fmt" "log" "net/http" "github.com/Tapjoy/dynamiq/core" "github.com/gorilla/mux" - "github.com/hashicorp/memberlist" ) // HTTPApi represents the object used to govern http calls into the system type HTTPApi struct { - memberList *memberlist.Memberlist - topics *core.Topics - queues *core.Queues - port uint16 + context *core.Config } // New initializes a new func New(cfg *core.Config) (*HTTPApi, error) { - t, err := core.LoadTopicsFromRiak(cfg) - if err != nil { - return nil, err - } - q, err := core.LoadQueuesFromRiak(cfg) - if err != nil { - return nil, err - } h := &HTTPApi{ - memberList: cfg.Discovery.Memberlist, - topics: t, - queues: q, - port: cfg.HTTP.Port, + context: cfg, } router := mux.NewRouter().PathPrefix("/v2").Subrouter() @@ -66,6 +52,15 @@ func New(cfg *core.Config) (*HTTPApi, error) { } // Listen is -func (api *HTTPApi) Listen() { - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", api.port), nil)) +func (h *HTTPApi) Listen() { + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", h.context.HTTP.Port), nil)) +} + +func response(w http.ResponseWriter, responsePayload map[string]interface{}) { + json.NewEncoder(w).Encode(responsePayload) +} + +func errorResponse(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]error{"error": err}) } From 4becb85cb6ca2460da9387779d37f89a5677ac0a Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Mon, 2 Nov 2015 20:15:58 -0500 Subject: [PATCH 04/12] Filling in topic http routes, topic broadcast logic --- core/config.go | 29 ++++++++ core/topic.go | 11 +++ core/topics.go | 16 ++++ dynamiq.go | 2 +- server/endpoints/http/v2/topic_handlers.go | 42 ----------- .../{endpoints => }/http/v2/queue_handlers.go | 0 .../http/v2/status_handlers.go | 0 server/http/v2/topic_handlers.go | 73 +++++++++++++++++++ server/{endpoints => }/http/v2/v2.go | 3 +- 9 files changed, 132 insertions(+), 44 deletions(-) delete mode 100644 server/endpoints/http/v2/topic_handlers.go rename server/{endpoints => }/http/v2/queue_handlers.go (100%) rename server/{endpoints => }/http/v2/status_handlers.go (100%) create mode 100644 server/http/v2/topic_handlers.go rename server/{endpoints => }/http/v2/v2.go (95%) diff --git a/core/config.go b/core/config.go index ede2b17..0d91013 100644 --- a/core/config.go +++ b/core/config.go @@ -1,6 +1,7 @@ package core import ( + "errors" "log" "sync" "time" @@ -53,6 +54,11 @@ type Config struct { syncKiller chan bool } +var ( + //ErrUnknownTopic is + ErrUnknownTopic = errors.New("There is no known topic by that name") +) + // GetConfig Parses and returns a config object func GetConfig() (*Config, error) { // TODO settle on an actual config package @@ -100,6 +106,29 @@ func GetConfig() (*Config, error) { return cfg, nil } +// TopicNames is +func (cfg *Config) TopicNames() []string { + cfg.Topics.configLock.RLock() + list := make([]string, 0) + + for name := range cfg.Topics.KnownTopics { + list = append(list, name) + } + cfg.Topics.configLock.RUnlock() + return list +} + +// GetTopic is +func (cfg *Config) GetTopic(name string) (*Topic, error) { + cfg.Topics.configLock.RLock() + t, ok := cfg.Topics.KnownTopics[name] + cfg.Topics.configLock.RUnlock() + if ok { + return t, nil + } + return nil, ErrUnknownTopic +} + func (cfg *Config) beginSync() { // Go routine to listen to either the scheduler or the killer go func(config *Config) { diff --git a/core/topic.go b/core/topic.go index 292cefa..2510ae2 100644 --- a/core/topic.go +++ b/core/topic.go @@ -18,3 +18,14 @@ type Topic struct { // Individual settings for the queue Config *riak.Map } + +// GetQueueNames is +func (t *Topic) GetQueueNames() []string { + t.configLock.RLock() + names := make([]string, 0) + for _, q := range t.Config.Sets["queues"] { + names = append(names, string(q)) + } + t.configLock.RUnlock() + return names +} diff --git a/core/topics.go b/core/topics.go index 1c9f765..efe21a3 100644 --- a/core/topics.go +++ b/core/topics.go @@ -104,3 +104,19 @@ func (topics *Topics) UnsubscribeQueue(topicName string, queueName string) (bool // Remove the queue from the topic return topics.riakService.UpdateTopicSubscription(topicName, queueName, false) } + +func (t *Topics) BroadcastMessage(topicName string, data string) []map[string]interface{} { + t.configLock.RLock() + results := make([]map[string]interface{}, len(t.Config.Sets["queues"])) + for _, q := range t.Config.Sets["queues"] { + queueName := string(q) + id, err := t.riakService.StoreMessage(queueName, data) + result := map[string]interface{}{"id": id} + if err != nil { + result[queueName] = err.Error() + } + results = append(results, result) + } + t.configLock.RUnlock() + return results +} diff --git a/dynamiq.go b/dynamiq.go index fdcddda..3759df1 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -8,7 +8,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" "github.com/Tapjoy/dynamiq/core" - "github.com/Tapjoy/dynamiq/server/endpoints/http/v2" + "github.com/Tapjoy/dynamiq/server/http/v2" ) func main2() { diff --git a/server/endpoints/http/v2/topic_handlers.go b/server/endpoints/http/v2/topic_handlers.go deleted file mode 100644 index 66569f2..0000000 --- a/server/endpoints/http/v2/topic_handlers.go +++ /dev/null @@ -1,42 +0,0 @@ -package httpv2 - -import ( - "net/http" - - "github.com/gorilla/mux" -) - -func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { - // TODO need to move topics/queues into a config manager to lock their access - // due to the goroutines that update and need to lock them -} - -func (h *HTTPApi) topicDetails(w http.ResponseWriter, r *http.Request) { - -} - -func (h *HTTPApi) topicCreate(w http.ResponseWriter, r *http.Request) { - topicName := mux.Vars(r)["topic"] - res, err := h.context.Topics.Create(topicName) - if err != nil { - // return 500 for now - should return contextually correct errors otherwise - errorResponse(w, err) - } - response(w, map[string]interface{}{"topic": topicName, "created": res}) -} - -func (h *HTTPApi) topicDelete(w http.ResponseWriter, r *http.Request) { - -} - -func (h *HTTPApi) topicSubmitMessage(w http.ResponseWriter, r *http.Request) { - -} - -func (h *HTTPApi) topicSubscribe(w http.ResponseWriter, r *http.Request) { - -} - -func (h *HTTPApi) topicUnsubscribe(w http.ResponseWriter, r *http.Request) { - -} diff --git a/server/endpoints/http/v2/queue_handlers.go b/server/http/v2/queue_handlers.go similarity index 100% rename from server/endpoints/http/v2/queue_handlers.go rename to server/http/v2/queue_handlers.go diff --git a/server/endpoints/http/v2/status_handlers.go b/server/http/v2/status_handlers.go similarity index 100% rename from server/endpoints/http/v2/status_handlers.go rename to server/http/v2/status_handlers.go diff --git a/server/http/v2/topic_handlers.go b/server/http/v2/topic_handlers.go new file mode 100644 index 0000000..d8f0e69 --- /dev/null +++ b/server/http/v2/topic_handlers.go @@ -0,0 +1,73 @@ +package httpv2 + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" +) + +func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { + // TODO need to move topics/queues into a config manager to lock their access + // due to the goroutines that update and need to lock them + response(w, http.StatusOK, map[string]interface{}{"topics": h.context.TopicNames()}) +} + +func (h *HTTPApi) topicDetails(w http.ResponseWriter, r *http.Request) { + topicName := mux.Vars(r)["topic"] + t, err := h.context.GetTopic(topicName) + if err != nil { + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"topic": topicName, "queues": t.GetQueueNames()}) +} + +func (h *HTTPApi) topicCreate(w http.ResponseWriter, r *http.Request) { + topicName := mux.Vars(r)["topic"] + res, err := h.context.Topics.Create(topicName) + if err != nil { + // return 500 for now - should return contextually correct errors otherwise + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"topic": topicName, "created": res}) +} + +func (h *HTTPApi) topicDelete(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicSubmitMessage(w http.ResponseWriter, r *http.Request) { + topicName := mux.Vars(r)["topic"] + msgData := "" // TODO read from body + + results := h.context.Topics.BroadcastMessage(topicName, msgData) + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(results) +} + +func (h *HTTPApi) topicSubscribe(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + topicName := vars["topic"] + queueName := vars["queue"] + + ok, err := h.context.Topics.SubscribeQueue(topicName, queueName) + resp := map[string]interface{}{"topic": topicName, "queue": queueName, "subscribed": ok} + if err != nil { + resp["error"] = err.Error() + } + response(w, http.StatusOK, resp) +} + +func (h *HTTPApi) topicUnsubscribe(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + topicName := vars["topic"] + queueName := vars["queue"] + + ok, err := h.context.Topics.UnsubscribeQueue(topicName, queueName) + resp := map[string]interface{}{"topic": topicName, "queue": queueName, "unsubscribed": ok} + if err != nil { + resp["error"] = err.Error() + } + response(w, http.StatusOK, resp) +} diff --git a/server/endpoints/http/v2/v2.go b/server/http/v2/v2.go similarity index 95% rename from server/endpoints/http/v2/v2.go rename to server/http/v2/v2.go index bbe25f4..8a3f4b7 100644 --- a/server/endpoints/http/v2/v2.go +++ b/server/http/v2/v2.go @@ -56,7 +56,8 @@ func (h *HTTPApi) Listen() { log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", h.context.HTTP.Port), nil)) } -func response(w http.ResponseWriter, responsePayload map[string]interface{}) { +func response(w http.ResponseWriter, status int, responsePayload map[string]interface{}) { + w.WriteHeader(status) json.NewEncoder(w).Encode(responsePayload) } From bcaa0fc8db3202bb5e541505bf263ef9a29d72fc Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Mon, 2 Nov 2015 21:15:59 -0500 Subject: [PATCH 05/12] Filling in some of the queue http endpoints --- core/config.go | 26 ++++++++++++++++++++ core/queue.go | 10 -------- core/queues.go | 17 +++++++++++++ server/http/v2/queue_handlers.go | 41 ++++++++++++++++++++++++++++---- server/http/v2/topic_handlers.go | 2 -- 5 files changed, 80 insertions(+), 16 deletions(-) diff --git a/core/config.go b/core/config.go index 0d91013..3a0b8ec 100644 --- a/core/config.go +++ b/core/config.go @@ -129,6 +129,32 @@ func (cfg *Config) GetTopic(name string) (*Topic, error) { return nil, ErrUnknownTopic } +// TopicNames is +func (cfg *Config) QueueNames() []string { + cfg.Queues.configLock.RLock() + list := make([]string, 0) + + for name := range cfg.Queues.KnownQueues { + list = append(list, name) + } + cfg.Queues.configLock.RUnlock() + return list +} + +func (cfg *Config) GetQueueConfig(name string) (map[string]string, error) { + cfg.Queues.configLock.RLock() + m, err := cfg.Riak.Service.GetQueueConfigMap(name) + if err != nil { + return nil, err + } + results := make(map[string]string, len(m.Registers)) + for k, v := range m.Registers { + results[k] = string(v) + } + cfg.Queues.configLock.RUnlock() + return results, nil +} + func (cfg *Config) beginSync() { // Go routine to listen to either the scheduler or the killer go func(config *Config) { diff --git a/core/queue.go b/core/queue.go index 9bcea5c..38e55a5 100644 --- a/core/queue.go +++ b/core/queue.go @@ -59,13 +59,3 @@ func (queue *Queue) PollMessages(batchSize uint32) (map[string]string, error) { } return results, err } - -// DeleteMessage is -func (queue *Queue) DeleteMessage(id string) (map[string]bool, error) { - return queue.DeleteMessages([]string{id}) -} - -// DeleteMessages is -func (queue *Queue) DeleteMessages(ids []string) (map[string]bool, error) { - return queue.riakService.DeleteMessages(queue.Name, ids) -} diff --git a/core/queues.go b/core/queues.go index a4e18a5..aa0a370 100644 --- a/core/queues.go +++ b/core/queues.go @@ -140,3 +140,20 @@ func (queues *Queues) Exists(queueName string) (bool, error) { return false, nil } + +func (q *Queues) SaveMessage(queueName string, data string) (string, error) { + q.configLock.RLock() + id, err := q.riakService.StoreMessage(queueName, data) + q.configLock.RUnlock() + return id, err +} + +// DeleteMessage is +func (queues *Queues) DeleteMessage(name string, id string) (map[string]bool, error) { + return queues.DeleteMessages(name, []string{id}) +} + +// DeleteMessages is +func (queues *Queues) DeleteMessages(name string, ids []string) (map[string]bool, error) { + return queues.riakService.DeleteMessages(name, ids) +} diff --git a/server/http/v2/queue_handlers.go b/server/http/v2/queue_handlers.go index c4c5103..31ab66a 100644 --- a/server/http/v2/queue_handlers.go +++ b/server/http/v2/queue_handlers.go @@ -1,13 +1,23 @@ package httpv2 -import "net/http" +import ( + "net/http" -func (h *HTTPApi) queueList(w http.ResponseWriter, r *http.Request) { + "github.com/Tapjoy/dynamiq/core" + "github.com/gorilla/mux" +) +func (h *HTTPApi) queueList(w http.ResponseWriter, r *http.Request) { + response(w, http.StatusOK, map[string]interface{}{"queues": h.context.QueueNames()}) } func (h *HTTPApi) queueDetails(w http.ResponseWriter, r *http.Request) { - + queueName := mux.Vars(r)["queue"] + conf, err := h.context.GetQueueConfig(queueName) + if err != nil { + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "config": conf}) } func (h *HTTPApi) queueConfigure(w http.ResponseWriter, r *http.Request) { @@ -15,6 +25,13 @@ func (h *HTTPApi) queueConfigure(w http.ResponseWriter, r *http.Request) { } func (h *HTTPApi) queueCreate(w http.ResponseWriter, r *http.Request) { + queueName := mux.Vars(r)["queue"] + ok, err := h.context.Queues.Create(queueName, core.DefaultSettings) + if err != nil { + // return 500 for now - should return contextually correct errors otherwise + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "created": ok}) } @@ -23,7 +40,14 @@ func (h *HTTPApi) queueDelete(w http.ResponseWriter, r *http.Request) { } func (h *HTTPApi) queueSubmitMessage(w http.ResponseWriter, r *http.Request) { - + queueName := mux.Vars(r)["queue"] + msgData := "" // TODO read from body + + id, err := h.context.Queues.SaveMessage(queueName, msgData) + if err != nil { + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "id": id}) } func (h *HTTPApi) queueGetMessage(w http.ResponseWriter, r *http.Request) { @@ -35,5 +59,14 @@ func (h *HTTPApi) queuePollMessage(w http.ResponseWriter, r *http.Request) { } func (h *HTTPApi) queueDeleteMessage(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue"] + id := vars["id"] + + ok, err := h.context.Queues.DeleteMessage(queueName, id) + if err != nil { + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "deleted": ok}) } diff --git a/server/http/v2/topic_handlers.go b/server/http/v2/topic_handlers.go index d8f0e69..16c395c 100644 --- a/server/http/v2/topic_handlers.go +++ b/server/http/v2/topic_handlers.go @@ -8,8 +8,6 @@ import ( ) func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { - // TODO need to move topics/queues into a config manager to lock their access - // due to the goroutines that update and need to lock them response(w, http.StatusOK, map[string]interface{}{"topics": h.context.TopicNames()}) } From 88151eff98e7d22c027230a5727a17b62300e3a5 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Mon, 2 Nov 2015 21:52:36 -0500 Subject: [PATCH 06/12] More work on queues http endpoint --- core/queues.go | 9 +++++---- core/riak_service.go | 32 ++++++++++++++++++++++++++++++++ server/http/v2/queue_handlers.go | 11 ++++++++++- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/core/queues.go b/core/queues.go index aa0a370..493e271 100644 --- a/core/queues.go +++ b/core/queues.go @@ -141,11 +141,12 @@ func (queues *Queues) Exists(queueName string) (bool, error) { return false, nil } +func (q *Queues) GetMessage(queueName string, id string) (string, error) { + return q.riakService.GetMessage(queueName, id) +} + func (q *Queues) SaveMessage(queueName string, data string) (string, error) { - q.configLock.RLock() - id, err := q.riakService.StoreMessage(queueName, data) - q.configLock.RUnlock() - return id, err + return q.riakService.StoreMessage(queueName, data) } // DeleteMessage is diff --git a/core/riak_service.go b/core/riak_service.go index 84f6c05..65d7393 100644 --- a/core/riak_service.go +++ b/core/riak_service.go @@ -160,6 +160,38 @@ func (rs *RiakService) RangeScanMessages(queueName string, numMessages uint32, l return rs.lookupMessagesForRangeScanResults(queueName, res.Response.Results) } +func (rs *RiakService) GetMessage(queueName string, messageKey string) (string, error) { + cmd, err := riak.NewFetchValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(messageKey).Build() + + if err != nil { + return "", err + } + if err = rs.Execute(cmd); err != nil { + return "", err + } + + res := cmd.(*riak.FetchValueCommand) + if res.Error() != nil || res.Response.IsNotFound { + return "", res.Error() + } + + if len(res.Response.Values) > 1 { + for _, obj := range res.Response.Values[1:len(res.Response.Values)] { + _, err := rs.StoreMessage(queueName, string(obj.Value)) + if err != nil { + // Couldn't save that Message + // That would mean it's lost + // need to incorporate a retry mechanic + } + } + } + + return string(res.Response.Values[0].Value), nil +} + func (rs *RiakService) lookupMessagesForRangeScanResults(queueName string, results []*riak.SecondaryIndexQueryResult) ([]*riak.Object, error) { // Channel for holding the results of the io calls objChan := make(chan []*riak.Object, len(results)) diff --git a/server/http/v2/queue_handlers.go b/server/http/v2/queue_handlers.go index 31ab66a..539988e 100644 --- a/server/http/v2/queue_handlers.go +++ b/server/http/v2/queue_handlers.go @@ -1,6 +1,7 @@ package httpv2 import ( + "encoding/json" "net/http" "github.com/Tapjoy/dynamiq/core" @@ -51,7 +52,15 @@ func (h *HTTPApi) queueSubmitMessage(w http.ResponseWriter, r *http.Request) { } func (h *HTTPApi) queueGetMessage(w http.ResponseWriter, r *http.Request) { - + vars := mux.Vars(r) + queueName := vars["queue"] + id := vars["id"] + msg, err := h.context.Queues.GetMessage(queueName, id) + if err != nil { + errorResponse(w, err) + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(msg) } func (h *HTTPApi) queuePollMessage(w http.ResponseWriter, r *http.Request) { From fdd8c7bcac58014ff034c0dd025b16f7a7abd1e6 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Wed, 4 Nov 2015 09:35:10 -0800 Subject: [PATCH 07/12] Fixing error being used for topics, fixing how I used the queue map configuration object --- core/config.go | 2 +- core/queues.go | 13 ++++++++----- core/riak_service.go | 17 +++++++---------- core/topics.go | 10 +++++++--- dynamiq.go | 12 +++++++++--- 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/core/config.go b/core/config.go index 3a0b8ec..fac287d 100644 --- a/core/config.go +++ b/core/config.go @@ -240,7 +240,7 @@ func (cfg *Config) syncConfig() error { // If the config object isn't found - no big deal // Only config presently is a list of queues, the object will // be created when the first queue is set - log.Println(err, tName) + // This is a NOOP, but left in for documentation purposes } t.Config = topcfg cfg.Topics.KnownTopics[tName] = t diff --git a/core/queues.go b/core/queues.go index 493e271..45b42f8 100644 --- a/core/queues.go +++ b/core/queues.go @@ -3,6 +3,7 @@ package core // Queues represents a collection of Queues, and the behaviors that may be taken import ( "errors" + "log" "sync" "github.com/basho/riak-go-client" @@ -87,24 +88,26 @@ func (queues *Queues) Create(queueName string, options map[string]string) (bool, // build the operation to update the set op := &riak.MapOperation{} op.AddToSet("queues", []byte(queueName)) - _, err := queues.riakService.CreateOrUpdateMap("config", "queues_config", []*riak.MapOperation{op}) + _, err := queues.riakService.CreateOrUpdateMap("config", "queues_config", op) if err != nil { return false, err } - cfgOps := make([]*riak.MapOperation, 0) + //cfgOps := make([]*riak.MapOperation, 0) // Create the config + cOp := &riak.MapOperation{} for name, defaultValue := range DefaultSettings { - cOp := &riak.MapOperation{} if val, ok := options[name]; ok { + log.Println("Found a setting for this", name, " ", val) cOp.SetRegister(name, []byte(val)) } else { + log.Println("Found a default setting for this", name, " ", val) cOp.SetRegister(name, []byte(defaultValue)) } - cfgOps = append(cfgOps, cOp) + //cfgOps = append(cfgOps, cOp) } - _, err = queues.riakService.CreateOrUpdateMap("config", queueConfigRecordName(queueName), cfgOps) + _, err = queues.riakService.CreateOrUpdateMap("config", queueConfigRecordName(queueName), cOp) if err != nil { return false, err } diff --git a/core/riak_service.go b/core/riak_service.go index 65d7393..50e2e38 100644 --- a/core/riak_service.go +++ b/core/riak_service.go @@ -63,7 +63,7 @@ func (rs *RiakService) CreateQueuesConfigMap() (*riak.Map, error) { op := &riak.MapOperation{} op.SetRegister("created", []byte(time.Now().String())) - return rs.CreateOrUpdateMap("config", "queues_config", []*riak.MapOperation{op}) + return rs.CreateOrUpdateMap("config", "queues_config", op) } // CreateTopicConfigMap is @@ -71,7 +71,7 @@ func (rs *RiakService) CreateTopicsConfigMap() (*riak.Map, error) { op := &riak.MapOperation{} op.SetRegister("created", []byte(time.Now().String())) - return rs.CreateOrUpdateMap("config", "topics_config", []*riak.MapOperation{op}) + return rs.CreateOrUpdateMap("config", "topics_config", op) } // GetMap loads a CRDT Map from Riak @@ -103,16 +103,13 @@ func (rs *RiakService) GetMap(name string) (*riak.Map, error) { } // CreateOrUpdateMap does exactly that because thats what the riak lib allows for -func (rs *RiakService) CreateOrUpdateMap(bucket string, key string, ops []*riak.MapOperation) (*riak.Map, error) { - cBuilder := riak.NewUpdateMapCommandBuilder(). +func (rs *RiakService) CreateOrUpdateMap(bucket string, key string, op *riak.MapOperation) (*riak.Map, error) { + cmd, err := riak.NewUpdateMapCommandBuilder(). WithBucketType("maps"). WithBucket(bucket). WithReturnBody(true). - WithKey(key) - for _, op := range ops { - cBuilder = cBuilder.WithMapOperation(op) - } - cmd, err := cBuilder.Build() + WithKey(key). + WithMapOperation(op).Build() if err != nil { return nil, err @@ -133,7 +130,7 @@ func (rs *RiakService) CreateQueueConfig(queueName string, values map[string]str op.SetRegister(key, []byte(value)) } - return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), []*riak.MapOperation{op}) + return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), op) } // RangeScanMessages is diff --git a/core/topics.go b/core/topics.go index efe21a3..e3e47d1 100644 --- a/core/topics.go +++ b/core/topics.go @@ -2,6 +2,7 @@ package core import ( "errors" + "log" "sync" "time" @@ -21,8 +22,10 @@ type Topics struct { } var ( - //ErrNoKnownTopics + //ErrNoKnownTopics is ErrNoKnownTopics = errors.New("There are no known topics in the system") + // ErrTopicAlreadyExists is + ErrTopicAlreadyExists = errors.New("Topic already exists") ) // LoadTopicsFromRiak is @@ -57,15 +60,16 @@ func (topics *Topics) Create(queueName string) (bool, error) { if ok, err := topics.Exists(queueName); ok || err != nil { if err == nil { - return false, ErrQueueAlreadyExists + return false, ErrTopicAlreadyExists } return false, err } // build the operation to update the set op := &riak.MapOperation{} op.AddToSet("topics", []byte(queueName)) - _, err := topics.riakService.CreateOrUpdateMap("config", "topics_config", []*riak.MapOperation{op}) + _, err := topics.riakService.CreateOrUpdateMap("config", "topics_config", op) if err != nil { + log.Println(err) return false, err } return true, nil diff --git a/dynamiq.go b/dynamiq.go index 3759df1..fc05d1a 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -17,20 +17,26 @@ func main2() { log.Fatal(err) } - ok, err := cfg.Queues.Create("tq1", make(map[string]string)) + ok, err := cfg.Topics.Create("tt1") if !ok || err != nil { log.Fatal(err) } - ok, err = cfg.Topics.Create("tt1") + ok, err = cfg.Queues.Create("tq1", make(map[string]string)) + if !ok || err != nil { + log.Fatal(err) + } + + ok, err = cfg.Topics.SubscribeQueue("tt1", "tq1") if !ok || err != nil { log.Fatal(err) } messages, err := cfg.Riak.Service.RangeScanMessages("tq1", 20, 0, math.MaxInt64) if err != nil { - log.Println(err) + log.Fatal(err) } + log.Println("Stuff") log.Println(messages) httpServer, err := httpv2.New(cfg) From b7b8c53918440705ceeaa6513212a4f92106e5ef Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Wed, 4 Nov 2015 11:09:48 -0800 Subject: [PATCH 08/12] Broadcasting and retrieving messages in the simple case --- core/topics.go | 15 ++++++++++----- server/http/v2/topic_handlers.go | 14 ++++++++++++-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/core/topics.go b/core/topics.go index e3e47d1..d5a2eff 100644 --- a/core/topics.go +++ b/core/topics.go @@ -109,18 +109,23 @@ func (topics *Topics) UnsubscribeQueue(topicName string, queueName string) (bool return topics.riakService.UpdateTopicSubscription(topicName, queueName, false) } -func (t *Topics) BroadcastMessage(topicName string, data string) []map[string]interface{} { +func (t *Topics) BroadcastMessage(topicName string, data string) ([]map[string]interface{}, error) { t.configLock.RLock() - results := make([]map[string]interface{}, len(t.Config.Sets["queues"])) - for _, q := range t.Config.Sets["queues"] { + topic, ok := t.KnownTopics[topicName] + if !ok { + return nil, ErrUnknownTopic + } + results := make([]map[string]interface{}, 0) + for _, q := range topic.Config.Sets["queues"] { queueName := string(q) + log.Println("Queue is", queueName) id, err := t.riakService.StoreMessage(queueName, data) - result := map[string]interface{}{"id": id} + result := map[string]interface{}{"id": id, "queue": queueName} if err != nil { result[queueName] = err.Error() } results = append(results, result) } t.configLock.RUnlock() - return results + return results, nil } diff --git a/server/http/v2/topic_handlers.go b/server/http/v2/topic_handlers.go index 16c395c..3e8aba5 100644 --- a/server/http/v2/topic_handlers.go +++ b/server/http/v2/topic_handlers.go @@ -2,6 +2,8 @@ package httpv2 import ( "encoding/json" + "io/ioutil" + "log" "net/http" "github.com/gorilla/mux" @@ -36,10 +38,18 @@ func (h *HTTPApi) topicDelete(w http.ResponseWriter, r *http.Request) { func (h *HTTPApi) topicSubmitMessage(w http.ResponseWriter, r *http.Request) { topicName := mux.Vars(r)["topic"] - msgData := "" // TODO read from body + msgData, err := ioutil.ReadAll(r.Body) + r.Body.Close() - results := h.context.Topics.BroadcastMessage(topicName, msgData) + if err != nil { + errorResponse(w, err) + } + results, err := h.context.Topics.BroadcastMessage(topicName, string(msgData)) + if err != nil { + errorResponse(w, err) + } + log.Println(results) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(results) } From 25ddb05b9dfd2115c6e8e4e9d72d861adde6de3c Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Sat, 7 Nov 2015 22:27:09 -0500 Subject: [PATCH 09/12] Initial work on polling API, needs work to finish the way loading / syncing queue objects from riak works --- core/config.go | 7 +++++-- core/queue.go | 17 ----------------- core/queues.go | 32 +++++++++++++++++++++++++++++++- server/http/v2/queue_handlers.go | 26 ++++++++++++++++++++++++++ server/http/v2/v2.go | 2 +- 5 files changed, 63 insertions(+), 21 deletions(-) diff --git a/core/config.go b/core/config.go index fac287d..61fd316 100644 --- a/core/config.go +++ b/core/config.go @@ -57,6 +57,9 @@ type Config struct { var ( //ErrUnknownTopic is ErrUnknownTopic = errors.New("There is no known topic by that name") + + //ErrUnknownTopic is + ErrUnknownQueue = errors.New("There is no known queue by that name") ) // GetConfig Parses and returns a config object @@ -302,8 +305,8 @@ func (cfg *Config) syncConfig() error { // Add or Update the topic to the known set var q *Queue if q, ok = cfg.Queues.KnownQueues[qName]; !ok { - // Didn't exist in memory, so create it - // TODO centralize this, don't just re-write initialization logic + // TODO see node in LoadQueuesFromRiak about the need For + // and independent LoadFromRiak method q = &Queue{ Name: qName, configLock: sync.RWMutex{}, diff --git a/core/queue.go b/core/queue.go index 38e55a5..620b207 100644 --- a/core/queue.go +++ b/core/queue.go @@ -42,20 +42,3 @@ const QueueDepthAprStatsSuffix = "approximate_depth.count" // QueueFillDeltaStatsSuffix const QueueFillDeltaStatsSuffix = "fill.count" - -// PollMessages does a range scan over the queue bucket and returns a map of message ids to bodies -func (queue *Queue) PollMessages(batchSize uint32) (map[string]string, error) { - lower, upper, err := queue.ring.ReserveNext() - if err != nil { - return nil, err - } - riakObjects, err := queue.riakService.RangeScanMessages(queue.Name, batchSize, lower, upper) - if err != nil { - return nil, err - } - results := make(map[string]string, len(riakObjects)) - for _, obj := range riakObjects { - results[obj.Key] = string(obj.Value) - } - return results, err -} diff --git a/core/queues.go b/core/queues.go index 45b42f8..cbdcd1b 100644 --- a/core/queues.go +++ b/core/queues.go @@ -66,7 +66,9 @@ func LoadQueuesFromRiak(cfg *RiakConfig) (*Queues, error) { // TODO // Initialize a Queue object for each one in the set - // Store it by name in KnownQueus + // Store it by name in KnownQueues + + // Need to use a general LoadFromRiak method return queues, nil } @@ -161,3 +163,31 @@ func (queues *Queues) DeleteMessage(name string, id string) (map[string]bool, er func (queues *Queues) DeleteMessages(name string, ids []string) (map[string]bool, error) { return queues.riakService.DeleteMessages(name, ids) } + +// PollMessages does a range scan over the queue bucket and returns a map of message ids to bodies +func (queues *Queues) PollMessages(name string, batchSize uint32) (map[string]string, error) { + queues.configLock.RLock() + defer queues.configLock.RUnlock() + log.Println(queues.KnownQueues) + log.Println(queues.Config) + queue, ok := queues.KnownQueues[name] + if !ok { + log.Println("not found") + return nil, ErrUnknownQueue + } + lower, upper, err := queue.ring.ReserveNext() + if err != nil { + log.Println(err) + return nil, err + } + riakObjects, err := queue.riakService.RangeScanMessages(queue.Name, batchSize, lower, upper) + if err != nil { + log.Println(err) + return nil, err + } + results := make(map[string]string, len(riakObjects)) + for _, obj := range riakObjects { + results[obj.Key] = string(obj.Value) + } + return results, nil +} diff --git a/server/http/v2/queue_handlers.go b/server/http/v2/queue_handlers.go index 539988e..624e1fc 100644 --- a/server/http/v2/queue_handlers.go +++ b/server/http/v2/queue_handlers.go @@ -2,7 +2,9 @@ package httpv2 import ( "encoding/json" + "log" "net/http" + "strconv" "github.com/Tapjoy/dynamiq/core" "github.com/gorilla/mux" @@ -17,6 +19,7 @@ func (h *HTTPApi) queueDetails(w http.ResponseWriter, r *http.Request) { conf, err := h.context.GetQueueConfig(queueName) if err != nil { errorResponse(w, err) + return } response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "config": conf}) } @@ -31,6 +34,7 @@ func (h *HTTPApi) queueCreate(w http.ResponseWriter, r *http.Request) { if err != nil { // return 500 for now - should return contextually correct errors otherwise errorResponse(w, err) + return } response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "created": ok}) @@ -47,6 +51,7 @@ func (h *HTTPApi) queueSubmitMessage(w http.ResponseWriter, r *http.Request) { id, err := h.context.Queues.SaveMessage(queueName, msgData) if err != nil { errorResponse(w, err) + return } response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "id": id}) } @@ -58,13 +63,33 @@ func (h *HTTPApi) queueGetMessage(w http.ResponseWriter, r *http.Request) { msg, err := h.context.Queues.GetMessage(queueName, id) if err != nil { errorResponse(w, err) + return } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(msg) } func (h *HTTPApi) queuePollMessage(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue"] + log.Println(queueName) + count := vars["count"] + + uCount, err := strconv.ParseUint(count, 10, 32) + if err != nil { + errorResponse(w, err) + return + } + msgs, err := h.context.Queues.PollMessages(queueName, uint32(uCount)) + + if err != nil { + errorResponse(w, err) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(msgs) } func (h *HTTPApi) queueDeleteMessage(w http.ResponseWriter, r *http.Request) { @@ -75,6 +100,7 @@ func (h *HTTPApi) queueDeleteMessage(w http.ResponseWriter, r *http.Request) { ok, err := h.context.Queues.DeleteMessage(queueName, id) if err != nil { errorResponse(w, err) + return } response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "deleted": ok}) diff --git a/server/http/v2/v2.go b/server/http/v2/v2.go index 8a3f4b7..783c1d9 100644 --- a/server/http/v2/v2.go +++ b/server/http/v2/v2.go @@ -45,7 +45,7 @@ func New(cfg *core.Config) (*HTTPApi, error) { queueRoutes.HandleFunc("/{queue}", h.queueSubmitMessage).Methods("POST") queueRoutes.HandleFunc("/{queue}/{id}", h.queueGetMessage).Methods("GET") queueRoutes.HandleFunc("/{queue}/{id}", h.queueDeleteMessage).Methods("DELETE") - queueRoutes.HandleFunc("/{queue}/poll/{num}", h.queuePollMessage).Methods("GET") + queueRoutes.HandleFunc("/{queue}/poll/{count}", h.queuePollMessage).Methods("GET") http.Handle("/", router) return h, nil From d8c6dc06c2f87aea3a46a42b4039d5ec7e0e9639 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Sun, 8 Nov 2015 23:10:31 -0500 Subject: [PATCH 10/12] Polling from the rest api works --- core/config.go | 17 ++++------------- core/queues.go | 46 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/core/config.go b/core/config.go index 61fd316..467fcc1 100644 --- a/core/config.go +++ b/core/config.go @@ -301,24 +301,17 @@ func (cfg *Config) syncConfig() error { for _, queue := range newSet { qName := string(queue) // Record this so we know who to evict - topicsToKeep[qName] = true + queuesToKeep[qName] = true // Add or Update the topic to the known set var q *Queue if q, ok = cfg.Queues.KnownQueues[qName]; !ok { // TODO see node in LoadQueuesFromRiak about the need For // and independent LoadFromRiak method - q = &Queue{ - Name: qName, - configLock: sync.RWMutex{}, + q, err = LoadQueueFromRiak(cfg.Riak.Service, qName) + if err != nil { + return err } } - // get the config and set it on the topic - queuecfg, err := cfg.Riak.Service.GetQueueConfigMap(qName) - if err != nil { - log.Println(err, qName) - return err - } - q.Config = queuecfg cfg.Queues.KnownQueues[qName] = q } @@ -328,7 +321,6 @@ func (cfg *Config) syncConfig() error { if _, ok := queuesToKeep[qName]; !ok { // It wasn't in the old list, evict it delete(cfg.Queues.KnownQueues, qName) - } else { queuesToRemove = append(queuesToRemove, qName) } } @@ -342,7 +334,6 @@ func (cfg *Config) syncConfig() error { if qName == queue { _, err := cfg.Riak.Service.UpdateTopicSubscription(tName, qName, false) if err != nil { - log.Println(err) return err } } diff --git a/core/queues.go b/core/queues.go index cbdcd1b..7a10c5e 100644 --- a/core/queues.go +++ b/core/queues.go @@ -4,8 +4,11 @@ package core import ( "errors" "log" + "strconv" "sync" + "time" + "github.com/StabbyCutyou/partition_ring" "github.com/basho/riak-go-client" ) @@ -28,7 +31,7 @@ var ( // Settings Arrays and maps cannot be made immutable in golang Settings = [...]string{VisibilityTimeout, PartitionStep, CompressedMessages} // DefaultSettings is - DefaultSettings = map[string]string{VisibilityTimeout: "30", PartitionStep: "5000000", CompressedMessages: "false"} + DefaultSettings = map[string]string{VisibilityTimeout: "5s", PartitionStep: "5000000", CompressedMessages: "false"} ) // Queues represents a collection of Queue objects, and the behaviors that may be @@ -64,15 +67,48 @@ func LoadQueuesFromRiak(cfg *RiakConfig) (*Queues, error) { } queues.Config = m - // TODO - // Initialize a Queue object for each one in the set - // Store it by name in KnownQueues - + for _, q := range queues.Config.Sets["queues"] { + queueName := string(q) + rQueue, err := LoadQueueFromRiak(queues.riakService, queueName) + if err != nil { + return nil, err + } + queues.KnownQueues[queueName] = rQueue + } // Need to use a general LoadFromRiak method return queues, nil } +func LoadQueueFromRiak(rs *RiakService, queueName string) (*Queue, error) { + cfg, err := rs.GetQueueConfigMap(queueName) + if err != nil { + return nil, err + } + + step := cfg.Registers[PartitionStep] + intStep, err := strconv.ParseInt(string(step), 10, 64) + if err != nil { + return nil, err + } + visTimeout := cfg.Registers[VisibilityTimeout] + timeDuration, err := time.ParseDuration(string(visTimeout)) + + if err != nil { + return nil, err + } + + q := &Queue{ + Name: queueName, + Config: cfg, + configLock: sync.RWMutex{}, + riakService: rs, + ring: partitionring.New(0, partitionring.MaxPartitionUpperBound, intStep, timeDuration), + } + + return q, nil +} + // Create will register a new queue with the default config // This queue will be available to be used once all the nodes have had their config // refreshed From 9ed5f361feed611b1810f3675400b56652087da5 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Sun, 8 Nov 2015 23:51:31 -0500 Subject: [PATCH 11/12] Working polling - now with more working --- core/queues.go | 5 +---- core/riak_service.go | 4 +++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/queues.go b/core/queues.go index 7a10c5e..9986b74 100644 --- a/core/queues.go +++ b/core/queues.go @@ -31,7 +31,7 @@ var ( // Settings Arrays and maps cannot be made immutable in golang Settings = [...]string{VisibilityTimeout, PartitionStep, CompressedMessages} // DefaultSettings is - DefaultSettings = map[string]string{VisibilityTimeout: "5s", PartitionStep: "5000000", CompressedMessages: "false"} + DefaultSettings = map[string]string{VisibilityTimeout: "5s", PartitionStep: "922337203685477580", CompressedMessages: "false"} ) // Queues represents a collection of Queue objects, and the behaviors that may be @@ -204,11 +204,8 @@ func (queues *Queues) DeleteMessages(name string, ids []string) (map[string]bool func (queues *Queues) PollMessages(name string, batchSize uint32) (map[string]string, error) { queues.configLock.RLock() defer queues.configLock.RUnlock() - log.Println(queues.KnownQueues) - log.Println(queues.Config) queue, ok := queues.KnownQueues[name] if !ok { - log.Println("not found") return nil, ErrUnknownQueue } lower, upper, err := queue.ring.ReserveNext() diff --git a/core/riak_service.go b/core/riak_service.go index 50e2e38..04cb59f 100644 --- a/core/riak_service.go +++ b/core/riak_service.go @@ -140,7 +140,7 @@ func (rs *RiakService) RangeScanMessages(queueName string, numMessages uint32, l WithBucket(queueName). WithIntRange(lowerBound, upperBound). WithMaxResults(numMessages). - WithIndexName("$bucket").Build() + WithIndexName("id_int").Build() if err != nil { return nil, err @@ -272,6 +272,8 @@ func (rs *RiakService) StoreMessage(queueName string, message string) (string, e Value: []byte(message), } + obj.AddToIntIndex("id_int", int(randID.Int64())) + cmd, err := riak.NewStoreValueCommandBuilder(). WithBucketType("messages"). WithBucket(queueName). From d411ed8f19806d12627e6816d772f1becd8ff05d Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Mon, 9 Nov 2015 12:14:43 -0500 Subject: [PATCH 12/12] Deleting now properly reports the ids deleted, removed superfluous logging --- core/queues.go | 2 -- core/riak_service.go | 4 ++-- core/topics.go | 1 - dynamiq.go | 23 ----------------------- server/http/v2/queue_handlers.go | 2 -- server/http/v2/topic_handlers.go | 2 -- 6 files changed, 2 insertions(+), 32 deletions(-) diff --git a/core/queues.go b/core/queues.go index 9986b74..9a7c730 100644 --- a/core/queues.go +++ b/core/queues.go @@ -136,10 +136,8 @@ func (queues *Queues) Create(queueName string, options map[string]string) (bool, cOp := &riak.MapOperation{} for name, defaultValue := range DefaultSettings { if val, ok := options[name]; ok { - log.Println("Found a setting for this", name, " ", val) cOp.SetRegister(name, []byte(val)) } else { - log.Println("Found a default setting for this", name, " ", val) cOp.SetRegister(name, []byte(defaultValue)) } //cfgOps = append(cfgOps, cOp) diff --git a/core/riak_service.go b/core/riak_service.go index 04cb59f..fb985dc 100644 --- a/core/riak_service.go +++ b/core/riak_service.go @@ -327,7 +327,7 @@ func (rs *RiakService) DeleteMessages(queueName string, keys []string) (map[stri // Seed it with the expected number of ops wg.Add(len(keys)) - results := make(map[string]bool, len(keys)) + results := make(map[string]bool) for _, mKey := range keys { // Kick off a go routine to delete the message @@ -335,7 +335,7 @@ func (rs *RiakService) DeleteMessages(queueName string, keys []string) (map[stri defer w.Done() deleted, err := riakService.DeleteMessage(queueName, messageKey) - if err != nil { + if err == nil { // Pop the results onto the channel c <- &deletedMessage{key: messageKey, deleted: deleted} } diff --git a/core/topics.go b/core/topics.go index d5a2eff..826f608 100644 --- a/core/topics.go +++ b/core/topics.go @@ -118,7 +118,6 @@ func (t *Topics) BroadcastMessage(topicName string, data string) ([]map[string]i results := make([]map[string]interface{}, 0) for _, q := range topic.Config.Sets["queues"] { queueName := string(q) - log.Println("Queue is", queueName) id, err := t.riakService.StoreMessage(queueName, data) result := map[string]interface{}{"id": id, "queue": queueName} if err != nil { diff --git a/dynamiq.go b/dynamiq.go index fc05d1a..7e78b4b 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -3,7 +3,6 @@ package main import ( "flag" "log" - "math" "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" @@ -17,28 +16,6 @@ func main2() { log.Fatal(err) } - ok, err := cfg.Topics.Create("tt1") - if !ok || err != nil { - log.Fatal(err) - } - - ok, err = cfg.Queues.Create("tq1", make(map[string]string)) - if !ok || err != nil { - log.Fatal(err) - } - - ok, err = cfg.Topics.SubscribeQueue("tt1", "tq1") - if !ok || err != nil { - log.Fatal(err) - } - - messages, err := cfg.Riak.Service.RangeScanMessages("tq1", 20, 0, math.MaxInt64) - if err != nil { - log.Fatal(err) - } - - log.Println("Stuff") - log.Println(messages) httpServer, err := httpv2.New(cfg) if err != nil { log.Println(err) diff --git a/server/http/v2/queue_handlers.go b/server/http/v2/queue_handlers.go index 624e1fc..9ccbfbd 100644 --- a/server/http/v2/queue_handlers.go +++ b/server/http/v2/queue_handlers.go @@ -2,7 +2,6 @@ package httpv2 import ( "encoding/json" - "log" "net/http" "strconv" @@ -72,7 +71,6 @@ func (h *HTTPApi) queueGetMessage(w http.ResponseWriter, r *http.Request) { func (h *HTTPApi) queuePollMessage(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) queueName := vars["queue"] - log.Println(queueName) count := vars["count"] uCount, err := strconv.ParseUint(count, 10, 32) diff --git a/server/http/v2/topic_handlers.go b/server/http/v2/topic_handlers.go index 3e8aba5..3a86b71 100644 --- a/server/http/v2/topic_handlers.go +++ b/server/http/v2/topic_handlers.go @@ -3,7 +3,6 @@ package httpv2 import ( "encoding/json" "io/ioutil" - "log" "net/http" "github.com/gorilla/mux" @@ -49,7 +48,6 @@ func (h *HTTPApi) topicSubmitMessage(w http.ResponseWriter, r *http.Request) { if err != nil { errorResponse(w, err) } - log.Println(results) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(results) }