Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions queue/jec_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package queue

// JECMessage represents a message fetched from the JEC API.
type JECMessage struct {
MessageId string `json:"messageId"`
Body string `json:"body"`
ChannelId string `json:"channelId"`
}
33 changes: 3 additions & 30 deletions queue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package queue

import (
"github.com/atlassian/jec/runbook"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"sync"
Expand All @@ -17,24 +16,20 @@ const (
)

type job struct {
queueProvider SQSProvider
messageHandler MessageHandler

message sqs.Message
ownerId string
message JECMessage
apiKey string
baseUrl string

state int32
executeMutex *sync.Mutex
}

func newJob(queueProvider SQSProvider, messageHandler MessageHandler, message sqs.Message, apiKey, baseUrl, ownerId string) *job {
func newJob(messageHandler MessageHandler, message JECMessage, apiKey, baseUrl string) *job {
return &job{
queueProvider: queueProvider,
messageHandler: messageHandler,
message: message,
ownerId: ownerId,
apiKey: apiKey,
baseUrl: baseUrl,
state: jobInitial,
Expand All @@ -43,11 +38,7 @@ func newJob(queueProvider SQSProvider, messageHandler MessageHandler, message sq
}

func (j *job) Id() string {
return *j.message.MessageId
}

func (j *job) sqsMessage() sqs.Message {
return j.message
return j.message.MessageId
}

func (j *job) Execute() error {
Expand All @@ -60,26 +51,8 @@ func (j *job) Execute() error {
}
j.state = jobExecuting

region := j.queueProvider.Properties().Region()
messageId := j.Id()

err := j.queueProvider.DeleteMessage(&j.message)
if err != nil {
j.state = jobError
return errors.Errorf("Message[%s] could not be deleted from the queue[%s]: %s", messageId, region, err)
}

logrus.Debugf("Message[%s] is deleted from the queue[%s].", messageId, region)

messageAttr := j.sqsMessage().MessageAttributes

if messageAttr == nil ||
*messageAttr[ownerId].StringValue != j.ownerId &&
*messageAttr[channelId].StringValue != j.ownerId {
j.state = jobError
return errors.Errorf("Message[%s] is invalid, will not be processed.", messageId)
}

result, err := j.messageHandler.Handle(j.message)

if result != nil {
Expand Down
94 changes: 24 additions & 70 deletions queue/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package queue
import (
"encoding/json"
"github.com/atlassian/jec/runbook"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"io/ioutil"
Expand All @@ -20,27 +19,22 @@ var mockActionResultPayload = &runbook.ActionResultPayload{

func newJobTest() *job {
mockMessageHandler := &MockMessageHandler{}
mockMessageHandler.HandleFunc = func(message sqs.Message) (payload *runbook.ActionResultPayload, e error) {
mockMessageHandler.HandleFunc = func(message JECMessage) (payload *runbook.ActionResultPayload, e error) {
return mockActionResultPayload, nil
}

body := "mockBody"
messageAttr := map[string]*sqs.MessageAttributeValue{ownerId: {StringValue: &mockOwnerId}}

message := sqs.Message{
MessageId: &mockMessageId,
Body: &body,
MessageAttributes: messageAttr,
message := JECMessage{
MessageId: mockMessageId,
Body: "mockBody",
ChannelId: mockChannelId,
}

return &job{
queueProvider: NewMockQueueProvider(),
messageHandler: mockMessageHandler,
message: message,
executeMutex: &sync.Mutex{},
apiKey: mockApiKey,
baseUrl: mockBaseUrl,
ownerId: mockOwnerId,
state: jobInitial,
}
}
Expand All @@ -61,17 +55,17 @@ func TestExecute(t *testing.T) {
}))
defer testServer.Close()

sqsJob := newJobTest()
sqsJob.baseUrl = testServer.URL
jecJob := newJobTest()
jecJob.baseUrl = testServer.URL

wg.Add(1)
err := sqsJob.Execute()
err := jecJob.Execute()

wg.Wait()
assert.Nil(t, err)

expectedState := int32(jobFinished)
actualState := sqsJob.state
actualState := jecJob.state

assert.Equal(t, expectedState, actualState)
}
Expand All @@ -85,39 +79,39 @@ func TestMultipleExecute(t *testing.T) {
}))
defer testServer.Close()

sqsJob := newJobTest()
sqsJob.baseUrl = testServer.URL
jecJob := newJobTest()
jecJob.baseUrl = testServer.URL

errorResults := make(chan error, 25)

wg.Add(26) // 25 execute try + 1 successful execute send result to testServer
for i := 0; i < 25; i++ {
go func() {
defer wg.Done()
err := sqsJob.Execute()
err := jecJob.Execute()
if err != nil {
errorResults <- sqsJob.Execute()
errorResults <- jecJob.Execute()
}
}()
}

wg.Wait()
expectedState := int32(jobFinished)
actualState := sqsJob.state
actualState := jecJob.state

assert.Equal(t, expectedState, actualState) // only one execute finished
assert.Equal(t, 24, len(errorResults)) // other executes will fail
}

func TestExecuteInNotInitialState(t *testing.T) {

sqsJob := newJobTest()
sqsJob.state = jobExecuting
jecJob := newJobTest()
jecJob.state = jobExecuting

err := sqsJob.Execute()
err := jecJob.Execute()
assert.NotNil(t, err)

expectedErr := errors.Errorf("Job[%s] is already executing or finished.", sqsJob.Id())
expectedErr := errors.Errorf("Job[%s] is already executing or finished.", jecJob.Id())
assert.EqualError(t, err, expectedErr.Error())
}

Expand All @@ -141,64 +135,24 @@ func TestExecuteWithProcessError(t *testing.T) {
}))
defer testServer.Close()

sqsJob := newJobTest()
sqsJob.baseUrl = testServer.URL
jecJob := newJobTest()
jecJob.baseUrl = testServer.URL

sqsJob.messageHandler.(*MockMessageHandler).HandleFunc = func(message sqs.Message) (payload *runbook.ActionResultPayload, e error) {
jecJob.messageHandler.(*MockMessageHandler).HandleFunc = func(message JECMessage) (payload *runbook.ActionResultPayload, e error) {
return errPayload, errors.New("Process Error")
}

wg.Add(1)
err := sqsJob.Execute()
err := jecJob.Execute()

wg.Wait()
assert.NotNil(t, err)

expectedErr := errors.Errorf("Message[%s] could not be processed: %s", sqsJob.Id(), "Process Error")
assert.EqualError(t, err, expectedErr.Error())

expectedState := int32(jobError)
actualState := sqsJob.state

assert.Equal(t, expectedState, actualState)
}

func TestExecuteWithDeleteError(t *testing.T) {

sqsJob := newJobTest()

sqsJob.queueProvider.(*MockSQSProvider).DeleteMessageFunc = func(message *sqs.Message) error {
return errors.New("Delete Error")
}

err := sqsJob.Execute()
assert.NotNil(t, err)

expectedErr := errors.Errorf("Message[%s] could not be deleted from the queue[%s]: %s", sqsJob.Id(), sqsJob.queueProvider.Properties().Region(), "Delete Error")
assert.EqualError(t, err, expectedErr.Error())

expectedState := int32(jobError)
actualState := sqsJob.state

assert.Equal(t, expectedState, actualState)
}

func TestExecuteWithInvalidQueueMessage(t *testing.T) {

sqsJob := newJobTest()

falseIntegrationId := "falseIntegrationId"
messageAttr := map[string]*sqs.MessageAttributeValue{ownerId: {StringValue: &falseIntegrationId}, channelId: {StringValue: &falseIntegrationId}}
sqsJob.message = sqs.Message{MessageAttributes: messageAttr, MessageId: &mockMessageId}

err := sqsJob.Execute()
assert.NotNil(t, err)

expectedErr := errors.Errorf("Message[%s] is invalid, will not be processed.", sqsJob.Id())
expectedErr := errors.Errorf("Message[%s] could not be processed: %s", jecJob.Id(), "Process Error")
assert.EqualError(t, err, expectedErr.Error())

expectedState := int32(jobError)
actualState := sqsJob.state
actualState := jecJob.state

assert.Equal(t, expectedState, actualState)
}
25 changes: 12 additions & 13 deletions queue/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"github.com/atlassian/jec/conf"
"github.com/atlassian/jec/git"
"github.com/atlassian/jec/runbook"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"time"
)

type MessageHandler interface {
Handle(message sqs.Message) (*runbook.ActionResultPayload, error)
Handle(message JECMessage) (*runbook.ActionResultPayload, error)
}

type messageHandler struct {
Expand All @@ -32,9 +31,9 @@ func NewMessageHandler(repositories git.Repositories, actionSpecs conf.ActionSpe
}
}

func (mh *messageHandler) Handle(message sqs.Message) (*runbook.ActionResultPayload, error) {
func (mh *messageHandler) Handle(message JECMessage) (*runbook.ActionResultPayload, error) {
queuePayload := payload{}
err := json.Unmarshal([]byte(*message.Body), &queuePayload)
err := json.Unmarshal([]byte(message.Body), &queuePayload)
if err != nil {
return nil, err
}
Expand All @@ -45,7 +44,7 @@ func (mh *messageHandler) Handle(message sqs.Message) (*runbook.ActionResultPayl
action = queuePayload.Action
}
if action == "" {
return nil, errors.Errorf("SQS message does not contain action property.")
return nil, errors.Errorf("Message does not contain action property.")
}

result := &runbook.ActionResultPayload{
Expand Down Expand Up @@ -73,7 +72,7 @@ func (mh *messageHandler) Handle(message sqs.Message) (*runbook.ActionResultPayl
case *runbook.ExecError:
result.IsSuccessful = false
result.FailureMessage = fmt.Sprintf("Err: %s, Stderr: %s", err.Error(), err.Stderr)
logrus.Debugf("Action[%s] execution of message[%s] failed: %s Stderr: %s", action, *message.MessageId, err.Error(), err.Stderr)
logrus.Debugf("Action[%s] execution of message[%s] failed: %s Stderr: %s", action, message.MessageId, err.Error(), err.Stderr)
case nil:
result.IsSuccessful = true
if !queuePayload.DiscardScriptResponse && queuePayload.ActionType == HttpActionType {
Expand All @@ -82,13 +81,13 @@ func (mh *messageHandler) Handle(message sqs.Message) (*runbook.ActionResultPayl
if err != nil {
result.IsSuccessful = false
logrus.Debugf("Http Action[%s] execution of message[%s] failed, could not parse http response fields: %s, error: %s",
action, *message.MessageId, executionResult, err.Error())
action, message.MessageId, executionResult, err.Error())
result.FailureMessage = "Could not parse http response fields: " + executionResult
} else {
result.HttpResponse = httpResult
}
}
logrus.Debugf("Action[%s] execution of message[%s] has been completed and it took %f seconds.", action, *message.MessageId, took.Seconds())
logrus.Debugf("Action[%s] execution of message[%s] has been completed and it took %f seconds.", action, message.MessageId, took.Seconds())

default:
return nil, err
Expand All @@ -103,19 +102,19 @@ func (mh *messageHandler) resolveMappedAction(action string, actionType string)
if !ok {
failureMessage := fmt.Sprintf("No mapped action is configured for requested action[%s]. "+
"The request will be ignored.", action)
return nil, errors.Errorf(failureMessage)
return nil, errors.New(failureMessage)
}

if mappedAction.Type != actionType {
failureMessage := fmt.Sprintf("The type[%s] of the mapped action[%s] is not compatible with requested type[%s]. "+
"The request will be ignored.", mappedAction.Type, action, actionType)
return nil, errors.Errorf(failureMessage)
return nil, errors.New(failureMessage)
}

return &mappedAction, nil
}

func (mh *messageHandler) execute(mappedAction *conf.MappedAction, message *sqs.Message) (string, string, error) {
func (mh *messageHandler) execute(mappedAction *conf.MappedAction, message *JECMessage) (string, string, error) {

sourceType := mappedAction.SourceType
switch sourceType {
Expand All @@ -135,7 +134,7 @@ func (mh *messageHandler) execute(mappedAction *conf.MappedAction, message *sqs.

case conf.LocalSourceType:
args := append(mh.actionSpecs.GlobalFlags.Args(), mappedAction.Flags.Args()...)
args = append(args, []string{"-payload", *message.Body}...)
args = append(args, []string{"-payload", message.Body}...)
args = append(args, mh.actionSpecs.GlobalArgs...)
args = append(args, mappedAction.Args...)
env := append(mh.actionSpecs.GlobalEnv, mappedAction.Env...)
Expand All @@ -151,7 +150,7 @@ func (mh *messageHandler) execute(mappedAction *conf.MappedAction, message *sqs.
}
stderr := mh.actionLoggers[mappedAction.Stderr]

callbackContext, err := runbook.ExecuteFunc(*message.MessageId, mappedAction.Filepath, args, env, stdout, stderr)
callbackContext, err := runbook.ExecuteFunc(message.MessageId, mappedAction.Filepath, args, env, stdout, stderr)
return stdoutBuff.String(), callbackContext, err
default:
return "", "", errors.Errorf("Unknown action sourceType[%s].", sourceType)
Expand Down
Loading