Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions demo/chat/Database.hs
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
{-# LANGUAGE NoImplicitPrelude #-}

module Database
( databaseToUIUpdater
( chatroomUuid
, databaseToUIUpdater
, loadAllMessages
, messagePoster
, newMessage
) where

import RON.Prelude

import Control.Concurrent.STM (TChan, atomically, readTChan,
writeTChan)
import Control.Monad (forever, when)
import Control.Monad.IO.Class (liftIO)
import Data.List (sortOn)
import Data.Maybe (catMaybes)
import Data.Traversable (for)
import Control.Monad (forever)
import RON.Data.ORSet.Experimental (ORSet)
import qualified RON.Data.ORSet.Experimental as ORSet
import RON.Error (MonadE)
import RON.Event (ReplicaClock)
import RON.Store (MonadStore, newObject, openNamedObject, readObject)
import RON.Store (MonadStore, newObject, readObject)
import RON.Store.FS (runStore)
import qualified RON.Store.FS as Store
import RON.Types (Atom (AString), ObjectRef (ObjectRef))
import RON.Types (Atom (AString, AUuid), UUID)
import RON.Types.Experimental (Ref (..))
import qualified RON.UUID as UUID
import System.IO (putStrLn)

import Types (MessageContent (MessageContent), MessageView, postTime)
import qualified Types
import Types (Env (..), MessageContent (..), MessageView, postTime)

loadAllMessages :: Store.Handle -> IO [MessageView]
loadAllMessages db =
runStore db $ do
gMessages <- openMessages
mMessageSet <- readObject gMessages
mMessageSet <- readObject gMessageSetRef
case mMessageSet of
Nothing -> do
liftIO $ putStrLn "!!! messages collection doesn't exist !!!"
Expand All @@ -37,36 +39,37 @@ loadAllMessages db =
messageRefs <- ORSet.toList messageSet
sortOn postTime . catMaybes <$> for messageRefs readObject

openMessages ::
(MonadE m, MonadStore m, ReplicaClock m) =>
m (ObjectRef (ORSet (ObjectRef MessageView)))
openMessages = openNamedObject "messages"

newMessage ::
(MonadE m, MonadStore m, ReplicaClock m) =>
MessageContent -> m (ObjectRef MessageView)
MessageContent -> m (Ref MessageView)
newMessage MessageContent{username, text} = do
gMessages <- openMessages
msgRef <- newObject @MessageView
ORSet.add_ msgRef ("username", [AString username])
ORSet.add_ msgRef ("text", [AString text ])
ORSet.add_ gMessages msgRef
ORSet.add_ gMessageSetRef msgRef
pure msgRef

messagePoster :: TChan MessageContent -> Store.Handle -> IO ()
messagePoster onMessagePosted db =
messagePoster :: TChan MessageContent -> Store.Handle -> Env -> IO ()
messagePoster onMessagePosted db Env{putLog} =
forever $ do
message <- atomically $ readTChan onMessagePosted
putLog $ "Saving message " <> show message
runStore db $ newMessage message

databaseToUIUpdater :: Store.Handle -> TChan [MessageView] -> IO ()
databaseToUIUpdater db onMessageListUpdated = do
ObjectRef messageSetId <- runStore db openMessages
Store.subcribeToObject db messageSetId
Store.subcribeToObject db chatroomUuid
onObjectChanged <- Store.fetchUpdates db
forever $ do
objectId <- atomically $ readTChan onObjectChanged
when (objectId == messageSetId) $ do
when (objectId == chatroomUuid) $ do
messages <- loadAllMessages db
atomically $ writeTChan onMessageListUpdated messages
-- ignore other changes

chatroomUuid :: UUID
chatroomUuid = $(UUID.liftName "chatroom")

gMessageSetRef :: Ref (ORSet (Ref MessageView))
gMessageSetRef = Ref chatroomUuid [AUuid $(UUID.liftName "message")]
9 changes: 9 additions & 0 deletions demo/chat/NetNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import System.Exit (ExitCode (ExitFailure))
import System.IO (hPutStrLn, stderr)
import System.Posix.Process (exitImmediately)

import Database (chatroomUuid)
import Fork (fork)

startWorkers ::
Expand All @@ -34,6 +35,14 @@ dialog db conn = do
-- send object update requests
-- fork $
do
ops <- fmap fold $ Store.runStore db $ Store.loadObjectLog chatroomUuid mempty
let netMessage = ObjectOps chatroomUuid ops
if null ops then do
putLog "No ops for chatroom"
else do
putLog $ "Log for chatroom " <> show netMessage
WS.sendBinaryData conn $ Aeson.encode netMessage

objectSubscriptions <- Store.readObjectSubscriptions db
for_ objectSubscriptions $ \object ->
WS.sendBinaryData conn =<< encodeNetMessage RequestChanges{object}
Expand Down
18 changes: 18 additions & 0 deletions demo/chat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# RON demo chat

## PoC. One god room

# Room

There must be a god object with id `chatroom` of type ORSet (`:set`).

If it does not exist, it must be created on first run.

Room references a message with op with payload starting with `message` keyword
followed by the message object id:

message {object_id : UUID}

# Messages

Each message is an object of type ORSet (`:set`).
1 change: 1 addition & 0 deletions demo/demo.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ executable demo-chat
OverloadedStrings
RecordWildCards
ScopedTypeVariables
TemplateHaskell
TypeApplications
TypeFamilies
ghc-options: -dynamic -rtsopts -with-rtsopts=-N
Expand Down
10 changes: 10 additions & 0 deletions ron-rdt/lib/RON/Data/Experimental.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
Expand All @@ -18,6 +19,7 @@ import RON.Prelude
import RON.Error (MonadE, throwErrorText)
import RON.Types (Atom (AString, AUuid), ObjectRef (..), OpenFrame,
UUID)
import RON.Types.Experimental (Ref (..))

class Replicated a where

Expand Down Expand Up @@ -91,3 +93,11 @@ instance (AsAtom head, AsAtoms tail) => AsAtoms (head, tail) where
fromAtoms = \case
[] -> throwErrorText "Expected some atoms, got none"
head : tail -> (,) <$> fromAtom head <*> fromAtoms tail

instance AsAtoms (Ref a) where
toAtoms Ref{object, path} = AUuid object : path

fromAtoms = \case
[] -> throwErrorText "Expected some atoms, got none"
AUuid object : path -> pure Ref{object, path}
a : _ -> throwErrorText $ "Expected UUID, got " <> show a
21 changes: 12 additions & 9 deletions ron-rdt/lib/RON/Data/ORSet/Experimental.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import RON.Data.Experimental (AsAtom, AsAtoms, Rep, Replicated,
import RON.Data.ORSet (setType)
import RON.Error (MonadE, liftMaybe)
import RON.Event (ReplicaClock, advanceToUuid, getEventUuid)
import RON.Store.Class (MonadStore, appendPatch)
import RON.Store.Class (MonadStore, appendPatchFromOneOrigin)
import RON.Text.Serialize (serializeAtom)
import RON.Types (ObjectRef (..), Op (..), Payload, UUID)
import RON.Types (Op (..), Payload, UUID)
import RON.Types.Experimental (Ref (..))

-- | Observed-Remove Set.
-- Implementation: a map from the itemId to the original op.
Expand Down Expand Up @@ -65,23 +66,25 @@ instance ReplicatedObject (ORSet a) where
-- | Add value to the set. Return the reference to the set item.
add ::
(Rep container ~ ORSet item, AsAtoms item, MonadStore m, ReplicaClock m) =>
ObjectRef container -> item -> m UUID
add (ObjectRef object) value = do
Ref container -> item -> m UUID
add (Ref object path) value = do
advanceToUuid object
opId <- getEventUuid
appendPatch object [Op{opId, refId = object, payload = toAtoms value}]
appendPatchFromOneOrigin
object
[Op{opId, refId = object, payload = path ++ toAtoms value}]
pure opId

{- |
Add value to the set or map.

@add_ :: ObjectRef (ORSet a) -> a -> m ()@
@add_ :: ObjectRef (ORMap k v) -> (k, v) -> m ()@
@add_ :: Ref (ORSet a) -> a -> m ()@
@add_ :: Ref (ORMap k v) -> (k, v) -> m ()@
-}
add_ ::
(Rep container ~ ORSet item, AsAtoms item, MonadStore m, ReplicaClock m) =>
ObjectRef container -> item -> m ()
add_ objectRef payload = void $ add objectRef payload
Ref container -> item -> m ()
add_ ref payload = void $ add ref payload

toList :: (AsAtoms a, MonadE m) => ORSet a -> m [a]
toList (ORSet rep) =
Expand Down
86 changes: 33 additions & 53 deletions ron-rdt/lib/RON/Store.hs
Original file line number Diff line number Diff line change
@@ -1,85 +1,65 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}

module RON.Store (
MonadStore (..),
newObject,
openNamedObject,
readGlobalSet,
readObject,
) where

import RON.Prelude

import RON.Data.Experimental (AsAtoms, Rep, ReplicatedObject,
replicatedTypeId, stateFromFrame, view)
import RON.Data.ORSet (setType)
import RON.Data.ORSet.Experimental (ORMap)
import qualified RON.Data.ORSet.Experimental as ORMap
import Data.List (stripPrefix)
import RON.Data.Experimental (Rep, ReplicatedObject, replicatedTypeId,
stateFromFrame, view)
import RON.Error (MonadE, errorContext)
import RON.Event (ReplicaClock, getEventUuid)
import RON.Store.Class (MonadStore (..))
import RON.Types (Atom, ObjectRef (..), Op (..), UUID)
import RON.Types (Op (..), UUID)
import RON.Types.Experimental (Ref (..))
import qualified RON.UUID as UUID

newObject ::
forall a m.
(MonadStore m, ReplicatedObject a, ReplicaClock m) => m (ObjectRef a)
forall a m. (MonadStore m, ReplicatedObject a, ReplicaClock m) => m (Ref a)
newObject = do
objectId <- getEventUuid
let typeId = replicatedTypeId @(Rep a)
let initOp = Op{opId = objectId, refId = typeId, payload = []}
appendPatch objectId [initOp]
pure $ ObjectRef objectId
appendPatchFromOneOrigin objectId [initOp]
pure $ Ref objectId []

-- | Nothing if object doesn't exist in the replica.
readObject ::
(MonadE m, MonadStore m, ReplicatedObject a, Typeable a) =>
ObjectRef a -> m (Maybe a)
readObject object@(ObjectRef objectId) =
Ref a -> m (Maybe a)
readObject object@(Ref objectId path) =
errorContext ("readObject " <> show object) $ do
ops <- fold <$> loadObjectLog objectId mempty
case ops of
[] -> pure Nothing
_ -> Just <$> view objectId (stateFromFrame objectId $ sortOn opId ops)
_ ->
fmap Just $
view objectId $
stateFromFrame objectId $
sortOn
opId
[ op{payload = payload'}
| op@Op{payload} <- ops
, Just payload' <- [stripPrefix path payload]
]

-- | Read global variable identified by atom and return result as set.
readGlobalSet ::
(MonadE m, MonadStore m, AsAtoms a, Typeable a) => Atom -> m [a]
readGlobalSet name =
errorContext ("readGlobalSet " <> show name) $ do
mGlobals <- readObject globalsRef
globals <- case mGlobals of
Just globals -> pure globals
Nothing -> do
createGlobals
pure ORMap.empty
ORMap.lookupSet name globals
where
createGlobals =
appendPatch
globalsId
[Op{opId = globalsId, refId = setType, payload = []}]

globalsId :: UUID
globalsId = $(UUID.liftName "globals")

globalsRef :: ObjectRef (ORMap Atom a)
globalsRef = ObjectRef globalsId

openNamedObject ::
(MonadE m, MonadStore m, ReplicaClock m, ReplicatedObject a, Typeable a) =>
Atom -> m (ObjectRef a)
openNamedObject name = do
set <- readGlobalSet name
case set of
[obj] -> pure obj
[] -> do
obj <- newObject
ORMap.add_ globalsRef (name, obj)
pure obj
_ -> error "TODO: merge objects"
-- TODO: Check when this was introduced, and if is needed
-- | Append an arbitrary sequence of operations to an object. No preconditions.
-- appendPatches :: MonadStore m => UUID -> [Op] -> m ()
-- appendPatches object ops =
-- for_ patches $ appendPatchFromOneOrigin object
-- where
-- patches =
-- Map.fromListWith
-- (++)
-- [ (uuidOrigin, [op])
-- | op@Op{opId = UUID.split -> UuidFields{uuidOrigin}} <- ops
-- ]
8 changes: 4 additions & 4 deletions ron-rdt/lib/RON/Store/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ class Monad m => MonadStore m where
listObjects :: m [UUID]

{- |
Append a sequence of operations to an existing object.
Must have the same origin.
-}
appendPatch :: UUID -> [Op] -> m ()
Append a sequence of operations to an object.
Precondition: all operations must have the same origin.
-}
appendPatchFromOneOrigin :: UUID -> [Op] -> m ()

-- | Get all object logs split by replicas. Replicas order is not guaranteed.
loadObjectLog ::
Expand Down
Loading