-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathConcurrentTaskProcessingActor.scala
More file actions
127 lines (101 loc) · 3.89 KB
/
ConcurrentTaskProcessingActor.scala
File metadata and controls
127 lines (101 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.kifi.juggle
import akka.actor.Actor
import scala.concurrent.{ Future }
import scala.util.{ Try, Failure, Success }
object ConcurrentTaskProcessingActor {
trait TaskProcessingActorMessage
case object Close extends TaskProcessingActorMessage
case object IfYouCouldJustGoAhead extends TaskProcessingActorMessage
case class UnknownTaskStatusException[T](task: T) extends Exception(s"Unknown task status: $task")
case class UnsupportedActorMessageException(any: Any) extends IllegalStateException(if (any != null) any.toString else "Message is NULL")
}
trait ConcurrentTaskProcessingActor[T] { _: Actor =>
protected val minConcurrentTasks: Int
protected val maxConcurrentTasks: Int
protected def pullTasks(limit: Int): Future[Seq[T]]
protected def processTasks(tasks: Seq[T]): Map[T, Future[Unit]]
protected val immediately = new scala.concurrent.ExecutionContext {
def execute(runnable: Runnable): Unit = { runnable.run() }
def reportFailure(t: Throwable): Unit = {}
override def prepare(): scala.concurrent.ExecutionContext = this
}
import ConcurrentTaskProcessingActor._
private[this] case class Pulled(result: Try[Seq[T]], limit: Int) extends TaskProcessingActorMessage
private[this] case class Processed(task: T, result: Try[Unit]) extends TaskProcessingActorMessage
private[this] var closing = false
private[this] var pulling = 0
private[this] var processing = Set.empty[T]
private def concurrentFetchTasks = pulling + processing.size
def receive: akka.actor.Actor.Receive = {
case taskProcessingMessage: TaskProcessingActorMessage => {
taskProcessingMessage match {
case IfYouCouldJustGoAhead => startPulling()
case Pulled(result, limit) => endPulling(result, limit)
case Processed(task, result) => endProcessing(task)
case Close => close()
}
}
case m => throw new UnsupportedActorMessageException(m)
}
private def startPulling(): Unit = if (!closing) {
val limit = maxConcurrentTasks - concurrentFetchTasks
if (limit > 0) {
pulling += limit
val pulledTasks = try {
pullTasks(limit)
} catch {
case error: Exception => Future.failed(error)
}
pulledTasks.onComplete { result =>
self ! Pulled(result, limit)
}(immediately)
}
}
private def endPulling(pulled: Try[Seq[T]], limit: Int): Unit = {
pulling -= limit
pulled match {
case Success(tasks) => {
startProcessing(tasks)
}
case Failure(error) => {
}
}
}
private def startProcessing(tasks: Seq[T]): Unit = {
if (!closing && tasks.nonEmpty) {
processing ++= tasks
val processedTasks = try {
processTasks(tasks)
} catch {
case error: Exception => Map.empty[T, Future[Unit]] withDefaultValue Future.failed(error)
}
tasks.foreach { task =>
val processedTask = processedTasks.getOrElse(task, Future.failed(UnknownTaskStatusException(task)))
processedTask.onComplete { result =>
self ! Processed(task, result)
}(immediately)
}
}
}
private def endProcessing(task: T): Unit = {
processing -= task
if (concurrentFetchTasks < minConcurrentTasks) {
startPulling()
}
}
private def close(): Unit = {
closing = true
}
}
trait BatchProcessingActor[T] extends ConcurrentTaskProcessingActor[Seq[T]] { _: Actor =>
final protected val minConcurrentTasks: Int = 1
final protected val maxConcurrentTasks: Int = 1
final protected def pullTasks(limit: Int): Future[Seq[Seq[T]]] = {
if (limit == 1) nextBatch.map(Seq(_).filter(_.nonEmpty))(immediately) else Future.successful(Seq.empty)
}
final protected def processTasks(tasks: Seq[Seq[T]]): Map[Seq[T], Future[Unit]] = {
tasks.map { batch => batch -> processBatch(batch) }.toMap
}
protected def nextBatch: Future[Seq[T]]
protected def processBatch(batch: Seq[T]): Future[Unit]
}