Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions example/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from gevent import monkey
monkey.patch_all()
import time
import sys
from tellapart.taba.client import taba_client

import gevent
from tellapart.taba import taba_client

taba_client.Initialize('test_client', 'http://localhost:8280/post')
taba_client.Initialize('test_client1', 'http://localhost:8280/post')
taba_client.RecordValue('test_name', (100, ))
taba_client.RecordValue('test_name1', (100, ))
taba_client.RecordValue('test_name2', (100, ))
taba_client.RecordValue('test_name', (1000, ))
taba_client.RecordValue('test_name1', (1000, ))
taba_client.RecordValue('test_name2', (1000, ))

time.sleep(1)
taba_client.Flush()
taba_client.Kill()

taba_client.Initialize('test_client2', 'http://localhost:8280/post')
taba_client.RecordValue('test_name3', (100, ))
taba_client.RecordValue('test_name4', (100, ))
taba_client.RecordValue('test_name5', (100, ))
taba_client.RecordValue('test_name3', (1000, ))
taba_client.RecordValue('test_name4', (1000, ))
taba_client.RecordValue('test_name5', (1000, ))

gevent.sleep(1)
time.sleep(1)
taba_client.Flush()
taba_client.Kill()

sys.exit()
43 changes: 43 additions & 0 deletions example/client_gevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2012 TellApart, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from gevent import monkey
monkey.patch_all()

import gevent
from tellapart.taba.client.gevent import taba_client

taba_client.Initialize('test_client1', 'http://localhost:8280/post')
taba_client.RecordValue('test_name', (100, ))
taba_client.RecordValue('test_name1', (100, ))
taba_client.RecordValue('test_name2', (100, ))
taba_client.RecordValue('test_name', (1000, ))
taba_client.RecordValue('test_name1', (1000, ))
taba_client.RecordValue('test_name2', (1000, ))

gevent.sleep(1)
taba_client.Flush()

taba_client.Initialize('test_client2', 'http://localhost:8280/post')
taba_client.RecordValue('test_name3', (100, ))
taba_client.RecordValue('test_name4', (100, ))
taba_client.RecordValue('test_name5', (100, ))
taba_client.RecordValue('test_name3', (1000, ))
taba_client.RecordValue('test_name4', (1000, ))
taba_client.RecordValue('test_name5', (1000, ))

gevent.sleep(1)
taba_client.Flush()


4 changes: 4 additions & 0 deletions example/run_client_gevent.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

PYTHONPATH=../py:../py/tellapart/third_party python client_gevent.py

2 changes: 2 additions & 0 deletions py/tellapart/storage/engine/memory_redis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from tellapart.storage.operation import Operation

import fnmatch

class MemoryRedisEngine(object):
def __init__(self, endpoints, num_vbuckets):
"""RedisEngine compatible signature.
Expand Down
1 change: 1 addition & 0 deletions py/tellapart/storage/engine/redis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import itertools
import random
import traceback
import fnmatch

import gevent
from gevent.queue import LifoQueue
Expand Down
3 changes: 3 additions & 0 deletions py/tellapart/taba/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# See: http://docs.python.org/library/pkgutil.html#pkgutil.extend_path
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
3 changes: 3 additions & 0 deletions py/tellapart/taba/client/gevent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# See: http://docs.python.org/library/pkgutil.html#pkgutil.extend_path
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
224 changes: 224 additions & 0 deletions py/tellapart/taba/client/taba_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# Copyright 2012 TellApart, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Taba Client class and singleton instance. Acts as an interface for recording
Taba Events from a client application to a Taba Agent.
"""

import time
import threading
import cjson

from tellapart.taba.taba_event import SerializeEvent
from tellapart.taba.taba_event import TabaEvent
from tellapart.taba.taba_type import TabaType
from tellapart.taba.util import misc_util

LOG = misc_util.MakeStreamLogger(__name__)

client = None

class TimedExecutorThread(threading.Thread):
"""Creates an executor thread that executes the given function with arguments
every 'period_seconds' seconds.

Args:
period_seconds - How long, in float seconds, to wait between function calls.
fn - A Python callable to invoke.
*args, **kwargs - The arguments to pass to 'fn'.

"""
def __init__(self, period_seconds, fn, *args, **kwargs):
self.period_seconds = period_seconds
self.func = fn
self.args = args
self.kwargs = kwargs
self.kill_recvd = False
threading.Thread.__init__(self)

def run(self):
while not self.kill_recvd:
time.sleep(self.period_seconds)
self.func(*self.args, **self.kwargs)


class TabaClient(object):
"""Class for maintaining and flushing a buffer of Taba Events. This buffer
is designed to be as small and simple as possible, relying on the Taba Agent
to handle sophisticated buffering and connections.
"""
def __init__(self, client_id, flush_period, event_post_url, dummy_mode=False):
"""
Args:
client_id - The Client ID string for this Taba Client. This value should
be unique within the Taba deployment, and should be durable between
restarts of the same logical client.
"""
self.flush_period = flush_period
self.event_post_url = event_post_url
self.dummy_mode = dummy_mode
self.lock = threading.Lock()

self.buffer = []
self.client_id = client_id
self.failures = 0

def _Flush(self):
"""Serialize and flush the internal buffer of Taba Events to the Taba Agent.
"""

with self.lock:
num_events = len(self.buffer)
if num_events == 0:
return

body_lines = []
body_lines.append(self.client_id)
body_lines.extend([cjson.encode(SerializeEvent(e)) for e in self.buffer])

self.buffer = []

if self.dummy_mode:
return

response = misc_util.GenericFetchFromUrlToString(
self.event_post_url,
post_data='\n'.join(body_lines) + '\n')

if response.status_code != 200:
LOG.error('Failed to flush %d Taba buffer to Agent' % num_events)
self.failures += 1

def Initialize(self):
"""Start periodically flushing the Taba Event buffer.
"""
self.flush_thread = TimedExecutorThread(
self.flush_period,
self._Flush)
self.flush_thread.start()

def RecordEvent(self, name, type, value):
"""Create and buffer a Taba Event.

Args:
name - Taba Name to post the event to.
type - Taba Type of the Taba Name.
value - The value being recorded.
"""
now = time.time()
self.buffer.append(TabaEvent(name, type, value, now))

def State(self):
"""Return a JSON encoded dictionary of the Taba Client state. The returned
value has the fields:
failures - The number of flush events that failed.
buffer_size - The number of Taba Events currently in the buffer.
"""
state = {
'failures' : self.failures,
'buffer_size' : len(self.buffer)}
return cjson.encode(state)

def Kill(self):
"""Kills the TimedExecutor Thread. ReInitialization required after this.
"""
self.flush_thread.kill_recvd = True


class Taba(object):
"""Proxy object for posting events to a specific Taba Name (Tab)"""

def __init__(self, name, type=TabaType.COUNTER_GROUP):
"""
Args:
name - Taba Name string to post Events to.
type - TabaType of this Tab to record Events to.
"""
self.name = name
self.type = type

def RecordValue(self, *args):
"""Post an Event to this Tab.

Args:
*args - List of arguments to pass to this event.
"""
RecordValue(self.name, args, self.type)

#----------------------------------------------------------
# Proxy functions to the global Taba Client
#----------------------------------------------------------

def RecordValue(name, value_tuple=(1,), type=TabaType.COUNTER_GROUP):
"""Record an Event to a Taba Name.

Args:
name - The Taba Name to post the Event to.
value_tuple - The tuple of values being recorded.
type - TabaType of the Tab.
"""
global client
if not client:
return
else:
client.RecordEvent(name, type, value_tuple)

def Initialize(client_id, event_post_url, flush_period=1, dummy_mode=False):
"""Initialize the singleton Taba Client.

Args:
client_id - The Client ID string for this Taba Client. This value should be
unique in the Taba deployment, and should be durable between restarts
of the same logical client.
"""
global client
client = TabaClient(client_id, flush_period, event_post_url, dummy_mode)
client.Initialize()

def Flush():
"""Force the Taba Client to flush it's buffer to the Agent.
"""
global client
if not client:
return
else:
client._Flush()

def GetStatus():
"""Return a JSON encoded dictionary of the Taba Client state. The returned
value has the fields:
failures - The number of flush events that failed
buffer_size - The number of Taba Events currently in the buffer
"""
global client
if not client:
status = "Uninitialized"
else:
status = client.State()

return status

def Kill():
""" Kills the thread that flushes the buffer and nulls the global client
Would require re-initialization after this.
"""
global client
if not client:
return
else:
client.Kill()
client = None
return

Loading