sealed abstract class AbstractOrchestrator[R] extends PersistentActor with AtLeastOnceDelivery with ActorLogging with IdImplicits
An Orchestrator executes a set of, possibly dependent, Tasks
.
A task corresponds to sending a message to an actor, handling its response and possibly
mutate the internal state of the Orchestrator.
The Orchestrator together with the Task is able to:
- Delivering messages with at-least-once delivery guarantee. The
DistinctIdsOrchestrator
ensures each destination will see an independent strictly monotonically increasing sequence number without gaps. - Handling Status messages, that is, if some actor is interested in querying the Orchestrator for its current status, the Orchestrator will respond with the status of each task.
- When all the dependencies of a task finish that task will be started and the Orchestrator will be prepared to handle the messages that the task destination sends.
- If the Orchestrator crashes, the state of each task will be correctly restored.
NOTE: the responses that are received must be Serializable.
In order for the Orchestrator and the Tasks to be able to achieve all of this they have to access and modify each others state directly. This means they are very tightly coupled with each other. To make this relation more obvious and to enforce it, you will only be able to create tasks if you have a reference to an orchestrator (which is passed implicitly to a task).
If you have the need to refactor the creation of tasks so that you can use them in multiple orchestrators you can leverage self type annotations.
- R
the type of result this orchestrator returns when it finishes.
- Source
- Orchestrator.scala
- Alphabetic
- By Inheritance
- AbstractOrchestrator
- IdImplicits
- ActorLogging
- AtLeastOnceDelivery
- AtLeastOnceDeliveryLike
- PersistentActor
- Eventsourced
- PersistenceRecovery
- PersistenceIdentity
- PersistenceStash
- StashFactory
- Stash
- RequiresMessageQueue
- UnrestrictedStash
- StashSupport
- Snapshotter
- Actor
- AnyRef
- Any
- by any2stringadd
- by StringFormat
- by Ensuring
- by ArrowAssoc
- Hide All
- Show All
- Public
- All
Type Members
Abstract Value Members
-
abstract
def
computeID(destination: ActorPath, deliveryId: DeliveryId): ID
Computes ID from the deliveryId of akka-persistence.
-
abstract
def
deliveryIdOf(destination: ActorPath, id: ID): DeliveryId
Converts ID to the deliveryId needed for the confirmDelivery method of akka-persistence.
-
abstract
def
matchId(task: Task[_], id: Long): Boolean
Ensures the received message was in fact destined to be received by
task
. -
abstract
def
persistenceId: String
- Definition Classes
- PersistenceIdentity
Concrete Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
def
+(other: String): String
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to any2stringadd[AbstractOrchestrator[R]] performed by method any2stringadd in scala.Predef.
- Definition Classes
- any2stringadd
-
def
->[B](y: B): (AbstractOrchestrator[R], B)
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to ArrowAssoc[AbstractOrchestrator[R]] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
- Annotations
- @inline()
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
var
_waitingTasks: HashMap[Int, Task[_]]
We use a HashMap to ensure remove/insert operations are very fast O(eC).
We use a HashMap to ensure remove/insert operations are very fast O(eC). The keys are the task indexes.
- Attributes
- protected[this]
- final def alwaysAvailableCommands: akka.actor.Actor.Receive
-
def
aroundPostRestart(reason: Throwable): Unit
- Attributes
- protected[akka]
- Definition Classes
- Eventsourced → Actor
-
def
aroundPostStop(): Unit
- Attributes
- protected[akka]
- Definition Classes
- AtLeastOnceDeliveryLike → Eventsourced → Actor
-
def
aroundPreRestart(reason: Throwable, message: Option[Any]): Unit
- Attributes
- protected[akka]
- Definition Classes
- AtLeastOnceDeliveryLike → Eventsourced → Actor
-
def
aroundPreStart(): Unit
- Attributes
- protected[akka]
- Definition Classes
- Eventsourced → Actor
-
def
aroundReceive(receive: Receive, message: Any): Unit
- Attributes
- protected[akka]
- Definition Classes
- AtLeastOnceDeliveryLike → Eventsourced → Actor
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @native() @throws( ... )
- final def computeCurrentBehavior(): Receive
-
def
confirmDelivery(deliveryId: Long): Boolean
- Definition Classes
- AtLeastOnceDeliveryLike
-
implicit
val
context: ActorContext
- Definition Classes
- Actor
-
def
defer[A](event: A)(handler: (A) ⇒ Unit): Unit
- Definition Classes
- PersistentActor
-
def
deferAsync[A](event: A)(handler: (A) ⇒ Unit): Unit
- Definition Classes
- PersistentActor
-
def
deleteMessages(toSequenceNr: Long): Unit
- Definition Classes
- Eventsourced
-
def
deleteSnapshot(sequenceNr: Long): Unit
- Definition Classes
- Snapshotter
-
def
deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit
- Definition Classes
- Snapshotter
-
def
deliver(destination: ActorSelection)(deliveryIdToMessage: (Long) ⇒ Any): Unit
- Definition Classes
- AtLeastOnceDelivery
-
def
deliver(destination: ActorPath)(deliveryIdToMessage: (Long) ⇒ Any): Unit
- Definition Classes
- AtLeastOnceDelivery
-
def
ensuring(cond: (AbstractOrchestrator[R]) ⇒ Boolean, msg: ⇒ Any): AbstractOrchestrator[R]
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to Ensuring[AbstractOrchestrator[R]] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
def
ensuring(cond: (AbstractOrchestrator[R]) ⇒ Boolean): AbstractOrchestrator[R]
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to Ensuring[AbstractOrchestrator[R]] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
def
ensuring(cond: Boolean, msg: ⇒ Any): AbstractOrchestrator[R]
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to Ensuring[AbstractOrchestrator[R]] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
def
ensuring(cond: Boolean): AbstractOrchestrator[R]
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to Ensuring[AbstractOrchestrator[R]] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
extraCommands: akka.actor.Actor.Receive
Override this method to add extra commands that are always handled by this orchestrator (except when recovering).
-
def
finalize(): Unit
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
final
def
finishedTasks: Int
How many tasks of this orchestrator have successfully finished.
How many tasks of this orchestrator have successfully finished. Aborted tasks do not count as a finished task.
-
def
formatted(fmtstr: String): String
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to StringFormat[AbstractOrchestrator[R]] performed by method StringFormat in scala.Predef.
- Definition Classes
- StringFormat
- Annotations
- @inline()
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
getDeliverySnapshot: AtLeastOnceDeliverySnapshot
- Definition Classes
- AtLeastOnceDeliveryLike
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
internalStashOverflowStrategy: StashOverflowStrategy
- Definition Classes
- PersistenceStash
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
def
journalPluginId: String
- Definition Classes
- PersistenceIdentity
-
def
lastSequenceNr: Long
- Definition Classes
- Eventsourced
-
def
loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit
- Definition Classes
- Snapshotter
-
def
log: LoggingAdapter
- Definition Classes
- ActorLogging
-
def
maxUnconfirmedMessages: Int
- Definition Classes
- AtLeastOnceDeliveryLike
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
def
numberOfUnconfirmed: Int
- Definition Classes
- AtLeastOnceDeliveryLike
-
def
onAbort(failure: Failure): Unit
User overridable callback.
User overridable callback. Its called when the orchestrator is aborted. By default an orchestrator aborts as soon as a task aborts. However this functionality can be changed by overriding
onTaskAbort
.By default logs that the orchestrator has aborted, sends a message to its parent explaining why the orchestrator aborted then stops it.
You can use this to implement your termination strategy.
-
def
onFinish(): Unit
User overridable callback.
User overridable callback. Its called after every task finishes. If a task aborts then it will prevent this method from being invoked.
By default logs that the Orchestrator has finished then stops it.
You can use this to implement your termination strategy.
If a orchestrator starts without tasks it will finish right away.
-
def
onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit
- Attributes
- protected
- Definition Classes
- Eventsourced
-
def
onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit
- Attributes
- protected
- Definition Classes
- Eventsourced
-
def
onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit
- Attributes
- protected
- Definition Classes
- Eventsourced
-
def
onStart(startId: Long): Unit
User overridable callback.
User overridable callback. Its called after the orchestrator starts but before any of the tasks start.
By default logs that the Orchestrator has started.
-
def
onTaskAbort(task: FullTask[_, _], cause: Throwable): Unit
User overridable callback.
User overridable callback. Its called every time a task aborts.
You can use this to implement very refined termination strategies.
By default aborts the orchestrator via
onAbort
with aTaskAborted
failure.Note: if you invoke become/unbecome inside this method, the contract that states "Waiting tasks or tasks which do not have this task as a dependency will remain untouched" will no longer be guaranteed. If you wish to still have this guarantee you can do
context.become(computeCurrentBehavior() orElse yourBehavior)
{ @see onTaskStart} for a callback when a task starts. { @see onTaskFinish } for a callback when a task finishes.
- task
the task that aborted.
-
def
onTaskFinish(task: FullTask[_, _]): Unit
User overridable callback.
User overridable callback. Its called every time a task finishes.
You can use this to implement very refined termination strategies.
By default just logs the
task
has finished.{ @see onTaskStart} for a callback when a task starts. { @see onTaskAbort} for a callback when a task aborts.
-
def
onTaskStart(task: FullTask[_, _], innerTask: Task[_]): Unit
User overridable callback.
User overridable callback. Its called every time a task starts.
By default just logs the
task
as started.{ @see onTaskFinish} for a callback when a task finishes. { @see onTaskAbort} for a callback when a task aborts.
-
implicit final
val
orchestrator: AbstractOrchestrator[_]
This exists to make the creation of FullTasks easier.
-
def
persist[A](event: A)(handler: (A) ⇒ Unit): Unit
- Definition Classes
- PersistentActor
-
def
persistAll[A](events: Seq[A])(handler: (A) ⇒ Unit): Unit
- Definition Classes
- PersistentActor
-
def
persistAllAsync[A](events: Seq[A])(handler: (A) ⇒ Unit): Unit
- Definition Classes
- PersistentActor
-
def
persistAsync[A](event: A)(handler: (A) ⇒ Unit): Unit
- Definition Classes
- PersistentActor
-
def
postRestart(reason: Throwable): Unit
- Definition Classes
- Actor
- Annotations
- @throws( classOf[java.lang.Exception] )
-
def
postStop(): Unit
- Definition Classes
- UnrestrictedStash → Actor
-
def
preRestart(reason: Throwable, message: Option[Any]): Unit
- Definition Classes
- UnrestrictedStash → Actor
-
def
preStart(): Unit
- Definition Classes
- Actor
- Annotations
- @throws( classOf[java.lang.Exception] )
-
def
receive: Receive
- Definition Classes
- PersistentActor → Actor
-
final
def
receiveCommand: akka.actor.Actor.Receive
- Definition Classes
- AbstractOrchestrator → Eventsourced
-
def
receiveRecover: akka.actor.Actor.Receive
- Definition Classes
- AbstractOrchestrator → Eventsourced
-
def
recovery: Recovery
- Definition Classes
- PersistenceRecovery
- final def recoveryAwarePersist(event: Any)(handler: ⇒ Unit): Unit
-
def
recoveryFinished: Boolean
- Definition Classes
- Eventsourced
-
def
recoveryRunning: Boolean
- Definition Classes
- Eventsourced
-
def
redeliverInterval: FiniteDuration
- Definition Classes
- AtLeastOnceDeliveryLike
-
def
redeliveryBurstLimit: Int
- Definition Classes
- AtLeastOnceDeliveryLike
-
def
saveSnapshot(snapshot: Any): Unit
- Definition Classes
- Snapshotter
-
implicit final
val
self: ActorRef
- Definition Classes
- Actor
-
final
def
sender(): ActorRef
- Definition Classes
- Actor
-
def
setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit
- Definition Classes
- AtLeastOnceDeliveryLike
- val settings: Settings
-
def
snapshotPluginId: String
- Definition Classes
- PersistenceIdentity
-
def
snapshotSequenceNr: Long
- Definition Classes
- Eventsourced → Snapshotter
-
def
snapshotterId: String
- Definition Classes
- Eventsourced → Snapshotter
- final def startId: Long
-
def
stash(): Unit
- Definition Classes
- Eventsourced → StashSupport
-
def
supervisorStrategy: SupervisorStrategy
- Definition Classes
- Actor
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
- final def tasks: Seq[FullTask[_, _]]
-
implicit
def
toCorrelationId(l: Long): CorrelationId
- Definition Classes
- IdImplicits
-
implicit
def
toDeliveryId(l: Long): DeliveryId
- Definition Classes
- IdImplicits
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
def
unhandled(message: Any): Unit
- Definition Classes
- Eventsourced → Actor
- final def unstarted: akka.actor.Actor.Receive
-
def
unstashAll(): Unit
- Definition Classes
- Eventsourced → StashSupport
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @throws( ... )
- final def waitingTasks: HashMap[Int, Task[_]]
-
def
warnAfterNumberOfUnconfirmedAttempts: Int
- Definition Classes
- AtLeastOnceDeliveryLike
- def withLogPrefix(message: ⇒ String): String
-
def
→[B](y: B): (AbstractOrchestrator[R], B)
- Implicit
- This member is added by an implicit conversion from AbstractOrchestrator[R] to ArrowAssoc[AbstractOrchestrator[R]] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc