diff --git a/example/client.py b/example/client.py index 026e730..794e9b1 100644 --- a/example/client.py +++ b/example/client.py @@ -12,16 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. -from gevent import monkey -monkey.patch_all() +import time +import sys +from tellapart.taba.client import taba_client -import gevent -from tellapart.taba import taba_client - -taba_client.Initialize('test_client', 'http://localhost:8280/post') +taba_client.Initialize('test_client1', 'http://localhost:8280/post') taba_client.RecordValue('test_name', (100, )) +taba_client.RecordValue('test_name1', (100, )) +taba_client.RecordValue('test_name2', (100, )) taba_client.RecordValue('test_name', (1000, )) +taba_client.RecordValue('test_name1', (1000, )) +taba_client.RecordValue('test_name2', (1000, )) + +time.sleep(1) +taba_client.Flush() +taba_client.Kill() + +taba_client.Initialize('test_client2', 'http://localhost:8280/post') +taba_client.RecordValue('test_name3', (100, )) +taba_client.RecordValue('test_name4', (100, )) +taba_client.RecordValue('test_name5', (100, )) +taba_client.RecordValue('test_name3', (1000, )) +taba_client.RecordValue('test_name4', (1000, )) +taba_client.RecordValue('test_name5', (1000, )) -gevent.sleep(1) +time.sleep(1) taba_client.Flush() +taba_client.Kill() +sys.exit() diff --git a/example/client_gevent.py b/example/client_gevent.py new file mode 100644 index 0000000..d7b4e81 --- /dev/null +++ b/example/client_gevent.py @@ -0,0 +1,43 @@ +# Copyright 2012 TellApart, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from gevent import monkey +monkey.patch_all() + +import gevent +from tellapart.taba.client.gevent import taba_client + +taba_client.Initialize('test_client1', 'http://localhost:8280/post') +taba_client.RecordValue('test_name', (100, )) +taba_client.RecordValue('test_name1', (100, )) +taba_client.RecordValue('test_name2', (100, )) +taba_client.RecordValue('test_name', (1000, )) +taba_client.RecordValue('test_name1', (1000, )) +taba_client.RecordValue('test_name2', (1000, )) + +gevent.sleep(1) +taba_client.Flush() + +taba_client.Initialize('test_client2', 'http://localhost:8280/post') +taba_client.RecordValue('test_name3', (100, )) +taba_client.RecordValue('test_name4', (100, )) +taba_client.RecordValue('test_name5', (100, )) +taba_client.RecordValue('test_name3', (1000, )) +taba_client.RecordValue('test_name4', (1000, )) +taba_client.RecordValue('test_name5', (1000, )) + +gevent.sleep(1) +taba_client.Flush() + + diff --git a/example/run_client_gevent.sh b/example/run_client_gevent.sh new file mode 100644 index 0000000..de90921 --- /dev/null +++ b/example/run_client_gevent.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +PYTHONPATH=../py:../py/tellapart/third_party python client_gevent.py + diff --git a/py/tellapart/storage/engine/memory_redis_engine.py b/py/tellapart/storage/engine/memory_redis_engine.py index 8f8ae1b..05c30cc 100644 --- a/py/tellapart/storage/engine/memory_redis_engine.py +++ b/py/tellapart/storage/engine/memory_redis_engine.py @@ -19,6 +19,8 @@ from tellapart.storage.operation import Operation +import fnmatch + class MemoryRedisEngine(object): def 
__init__(self, endpoints, num_vbuckets): """RedisEngine compatible signature. diff --git a/py/tellapart/storage/engine/redis_engine.py b/py/tellapart/storage/engine/redis_engine.py index 85ca00d..5fc9cf9 100644 --- a/py/tellapart/storage/engine/redis_engine.py +++ b/py/tellapart/storage/engine/redis_engine.py @@ -21,6 +21,7 @@ import itertools import random import traceback +import fnmatch import gevent from gevent.queue import LifoQueue diff --git a/py/tellapart/taba/client/__init__.py b/py/tellapart/taba/client/__init__.py new file mode 100644 index 0000000..c768b96 --- /dev/null +++ b/py/tellapart/taba/client/__init__.py @@ -0,0 +1,3 @@ +# See: http://docs.python.org/library/pkgutil.html#pkgutil.extend_path +from pkgutil import extend_path +__path__ = extend_path(__path__, __name__) diff --git a/py/tellapart/taba/client/gevent/__init__.py b/py/tellapart/taba/client/gevent/__init__.py new file mode 100644 index 0000000..c768b96 --- /dev/null +++ b/py/tellapart/taba/client/gevent/__init__.py @@ -0,0 +1,3 @@ +# See: http://docs.python.org/library/pkgutil.html#pkgutil.extend_path +from pkgutil import extend_path +__path__ = extend_path(__path__, __name__) diff --git a/py/tellapart/taba/taba_client.py b/py/tellapart/taba/client/gevent/taba_client.py similarity index 100% rename from py/tellapart/taba/taba_client.py rename to py/tellapart/taba/client/gevent/taba_client.py diff --git a/py/tellapart/taba/client/taba_client.py b/py/tellapart/taba/client/taba_client.py new file mode 100644 index 0000000..657228b --- /dev/null +++ b/py/tellapart/taba/client/taba_client.py @@ -0,0 +1,224 @@ +# Copyright 2012 TellApart, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Taba Client class and singleton instance. Acts as an interface for recording +Taba Events from a client application to a Taba Agent. +""" + +import time +import threading +import cjson + +from tellapart.taba.taba_event import SerializeEvent +from tellapart.taba.taba_event import TabaEvent +from tellapart.taba.taba_type import TabaType +from tellapart.taba.util import misc_util + +LOG = misc_util.MakeStreamLogger(__name__) + +client = None + +class TimedExecutorThread(threading.Thread): + """Creates an executor thread that executes the given function with arguments + every 'period_seconds' seconds. + + Args: + period_seconds - How long, in float seconds, to wait between function calls. + fn - A Python callable to invoke. + *args, **kwargs - The arguments to pass to 'fn'. + + """ + def __init__(self, period_seconds, fn, *args, **kwargs): + self.period_seconds = period_seconds + self.func = fn + self.args = args + self.kwargs = kwargs + self.kill_recvd = False + threading.Thread.__init__(self) + + def run(self): + while not self.kill_recvd: + time.sleep(self.period_seconds) + self.func(*self.args, **self.kwargs) + + +class TabaClient(object): + """Class for maintaining and flushing a buffer of Taba Events. This buffer + is designed to be as small and simple as possible, relying on the Taba Agent + to handle sophisticated buffering and connections. + """ + def __init__(self, client_id, flush_period, event_post_url, dummy_mode=False): + """ + Args: + client_id - The Client ID string for this Taba Client. 
This value should + be unique within the Taba deployment, and should be durable between + restarts of the same logical client. + """ + self.flush_period = flush_period + self.event_post_url = event_post_url + self.dummy_mode = dummy_mode + self.lock = threading.Lock() + + self.buffer = [] + self.client_id = client_id + self.failures = 0 + + def _Flush(self): + """Serialize and flush the internal buffer of Taba Events to the Taba Agent. + """ + + with self.lock: + num_events = len(self.buffer) + if num_events == 0: + return + + body_lines = [] + body_lines.append(self.client_id) + body_lines.extend([cjson.encode(SerializeEvent(e)) for e in self.buffer]) + + self.buffer = [] + + if self.dummy_mode: + return + + response = misc_util.GenericFetchFromUrlToString( + self.event_post_url, + post_data='\n'.join(body_lines) + '\n') + + if response.status_code != 200: + LOG.error('Failed to flush %d Taba buffer to Agent' % num_events) + self.failures += 1 + + def Initialize(self): + """Start periodically flushing the Taba Event buffer. + """ + self.flush_thread = TimedExecutorThread( + self.flush_period, + self._Flush) + self.flush_thread.start() + + def RecordEvent(self, name, type, value): + """Create and buffer a Taba Event. + + Args: + name - Taba Name to post the event to. + type - Taba Type of the Taba Name. + value - The value being recorded. + """ + now = time.time() + self.buffer.append(TabaEvent(name, type, value, now)) + + def State(self): + """Return a JSON encoded dictionary of the Taba Client state. The returned + value has the fields: + failures - The number of flush events that failed. + buffer_size - The number of Taba Events currently in the buffer. + """ + state = { + 'failures' : self.failures, + 'buffer_size' : len(self.buffer)} + return cjson.encode(state) + + def Kill(self): + """Kills the TimedExecutor Thread. ReInitialization required after this. 
+ """ + self.flush_thread.kill_recvd = True + + +class Taba(object): + """Proxy object for posting events to a specific Taba Name (Tab)""" + + def __init__(self, name, type=TabaType.COUNTER_GROUP): + """ + Args: + name - Taba Name string to post Events to. + type - TabaType of this Tab to record Events to. + """ + self.name = name + self.type = type + + def RecordValue(self, *args): + """Post an Event to this Tab. + + Args: + *args - List of arguments to pass to this event. + """ + RecordValue(self.name, args, self.type) + +#---------------------------------------------------------- +# Proxy functions to the global Taba Client +#---------------------------------------------------------- + +def RecordValue(name, value_tuple=(1,), type=TabaType.COUNTER_GROUP): + """Record an Event to a Taba Name. + + Args: + name - The Taba Name to post the Event to. + value_tuple - The tuple of values being recorded. + type - TabaType of the Tab. + """ + global client + if not client: + return + else: + client.RecordEvent(name, type, value_tuple) + +def Initialize(client_id, event_post_url, flush_period=1, dummy_mode=False): + """Initialize the singleton Taba Client. + + Args: + client_id - The Client ID string for this Taba Client. This value should be + unique in the Taba deployment, and should be durable between restarts + of the same logical client. + """ + global client + client = TabaClient(client_id, flush_period, event_post_url, dummy_mode) + client.Initialize() + +def Flush(): + """Force the Taba Client to flush it's buffer to the Agent. + """ + global client + if not client: + return + else: + client._Flush() + +def GetStatus(): + """Return a JSON encoded dictionary of the Taba Client state. 
The returned + value has the fields: + failures - The number of flush events that failed + buffer_size - The number of Taba Events currently in the buffer + """ + global client + if not client: + status = "Uninitialized" + else: + status = client.State() + + return status + +def Kill(): + """ Kills the thread that flushes the buffer and nulls the global client + Would require re-initialization after this. + """ + global client + if not client: + return + else: + client.Kill() + client = None + return + diff --git a/py/tellapart/taba/server/taba_server.py b/py/tellapart/taba/server/taba_server.py index ead5e4d..9382204 100644 --- a/py/tellapart/taba/server/taba_server.py +++ b/py/tellapart/taba/server/taba_server.py @@ -22,6 +22,7 @@ import random import time import traceback +import fnmatch import cjson import gevent @@ -147,16 +148,16 @@ def ReceiveEvents(self, client_id, events): self._UpdateAppliedLatency(client_id, events) - def GetStates(self, client_id=None, names=None, name_blocks=None): + def GetStates(self, clients=None, names=None, name_blocks=None): """Retrieve the raw State object(s). If the client_id or name arguments are None or empty then the State(s) are retrieved for all clients or names respectively. Args: - client_id - Client ID string to retrieve state objects for. If None or - empty, then all clients are retrieved. - names - List of Taba Names to retrieve state objects for. If None or - empty, then all Taba Names will be retrieved. + clients - Client IDs string to retrieve state objects for. If None + then all clients are retrieved. + names - List of Taba Names to retrieve state objects for. If None + then all Taba Names will be retrieved. name_block - Taba Name Block to retrieve state objects for. If None or empty, then all blocks will be retrieved. @@ -164,24 +165,26 @@ def GetStates(self, client_id=None, names=None, name_blocks=None): A list of ((client_id, name), state) tuples. """ # Specific State object - retrieve directly. 
- if names and len(names) == 1 and client_id: - op = self.dao.StateGet(client_id, names[0]) + + # Add check to see if name is a glob + if names and len(names) == 1 and clients and len(clients) == 1: + op = self.dao.StateGet(clients[0], names[0]) if op.success and op.response_value[1] is not None: states = [op.response_value] else: states = [] # Specific Taba Names and Client ID - don't bother distributing. - elif names and client_id: - states = self.dao.StateIteratorForNameList(names, client_id=client_id) + elif names and clients: + states = self.dao.StateIteratorForNameList(names, clients=clients) # Specific Taba Names - don't bother distributing. elif names: states = self.dao.StateIteratorForNameList(names) # Specific Client ID - don't bother distributing. - elif client_id: - states = self.dao.StateIteratorForClient(client_id) + elif clients: + states = self.dao.StateIteratorForClients(clients) # Specific Taba Name Blocks - retrieve all States in the Block. elif name_blocks is not None: @@ -193,14 +196,14 @@ def GetStates(self, client_id=None, names=None, name_blocks=None): return states - def GetProjections(self, client_id=None, names=None, name_blocks=None, + def GetProjections(self, clients=None, names=None, name_blocks=None, handlers=None): """Retrieve the raw Projection object(s). If the client_id or name arguments are None or empty then the Projection(s) are retrieved for all clients or names respectively. Args: - client_id - Client ID string to retrieve projection objects for. If None + clients - Client IDs string to retrieve projection objects for. If None or empty, then all clients are retrieved. names - List of Taba Name to retrieve projection objects for. if None or empty, then all Taba Names will be retrieved. @@ -213,14 +216,14 @@ def GetProjections(self, client_id=None, names=None, name_blocks=None, Returns: A list of ((client_id, name), projection) tuples. 
""" - if not (names or client_id) and name_blocks is None and self.redistribute: + if not (names or clients) and name_blocks is None and self.redistribute: # Non-specific request - redistribute the request. projections = self._DistributeRequestByTabaNameBlock('projection') for projection in projections.iteritems(): yield projection else: - states = self.GetStates(client_id, names, name_blocks) + states = self.GetStates(clients, names, name_blocks) # Get the required Handlers. if not handlers: @@ -249,8 +252,8 @@ def GetAggregates(self, names=None, blocks=None, handlers=None): all Taba Names. Args: - names - List of Taba Name to retrieve Aggregate Projections for. If None - or blank, retrieve for all Taba Names. + names - List of Taba Name to retrieve Aggregate Projections for. If None, + retrieve for all Taba Names. blocks - Taba Name Block to retrieve state objects for. If None or empty, then all blocks will be retrieved. handlers - Map of {Taba Name: Handler} of pre-fetched handlers. If set, @@ -322,12 +325,12 @@ def GetAggregates(self, names=None, blocks=None, handlers=None): if aggregate: yield (last_name, aggregate) - def GetRendered(self, client_id=None, names=None, name_blocks=None): + def GetRendered(self, clients=None, names=None, name_blocks=None): """Retrieve Rendered Tabas. If client_id is specified, render Projections, otherwise, render Aggregates. Args: - client_id - Client ID string to product Rendered Tabas for. If specified + clients - Client IDs string to product Rendered Tabas for. If specified Projections will be rendered, otherwise Aggregates across all Clients will be rendered. names - List of Taba Name strings to render. 
If not specified, then all @@ -342,8 +345,8 @@ def GetRendered(self, client_id=None, names=None, name_blocks=None): all_names = self.GetNames() handlers = self._GetHandlers(all_names) - if client_id: - to_renders = self.GetProjections(client_id, names, name_blocks, handlers) + if clients: + to_renders = self.GetProjections(clients, names, name_blocks, handlers) to_renders = [(nm, proj) for (_, nm), proj in to_renders] else: @@ -421,6 +424,7 @@ def GetTabaTypes(self, names=None): if not op.success: raise Exception(op) names = op.response_value + op = self.dao.TabaTypeGetBatch(names) if not op.success: diff --git a/py/tellapart/taba/server/taba_server_handlers.py b/py/tellapart/taba/server/taba_server_handlers.py index cfe9285..364d2c5 100644 --- a/py/tellapart/taba/server/taba_server_handlers.py +++ b/py/tellapart/taba/server/taba_server_handlers.py @@ -23,6 +23,7 @@ import cjson import gevent +import fnmatch from tellapart.taba.util import misc_util from tellapart.taba.taba_event import TABA_EVENT_IDX_VALUE @@ -32,6 +33,14 @@ global_taba_server = None +class Param: + """ Encapsulates certain metadata of a parameter, such as if its a glob + """ + def __init__(self, value, isGlob): + self.value = value + self.isGlob = isGlob + + def TimedLoggedRequest(request_name): """Decorator for Juno request handlers that logs their parameters and execution time. @@ -165,7 +174,9 @@ def HandleGetRaw(request): Get Parameters: client - Client ID to get State objects for, or blank for all Clients IDs. + client_glob - Client ID to get State objects for. taba - Taba Name to get State objects for, or blank for all Taba Names. + taba_glob - Taba Glob to get State objects for. block - Taba Name Block ID to get State objects for, or blank for all. 
Response: @@ -180,15 +191,34 @@ def HandleGetRaw(request): name = request.input('taba') block = request.input('block') - if name and block: + # add glob params for taba and client + name_glob = request.input('taba_glob') + client_glob = request.input('client_glob') + + if (name or name_glob) and block: juno.status(400) juno.append('Cannot specify both "taba" and "block"') return + + # add rule.. only one of taba or taba_glob + if name and name_glob: + juno.status(400) + juno.append('Cannot specify both "taba_glob" and "taba"') + return + + # add rule.. only one of client_id or client_glob + if client_id and client_glob: + juno.status(400) + juno.append('Cannot specify both "client_glob" and "client"') + return blocks = block.split(',') if block else None names = [name] if name else None + names_param = Param(name_glob, True) if name_glob else Param(names, False) + clients_param = Param(client_glob, True) if client_glob \ + else Param(client_id, False) - return _RawResponse(client_id, names, blocks, _GetAccept(request)) + return _RawResponse(clients_param, names_param, blocks, _GetAccept(request)) @TimedLoggedRequest("Get Raw Batch") def HandleGetRawBatch(request): @@ -196,7 +226,7 @@ def HandleGetRawBatch(request): Post Body - a JSON dictionary with the following fields: client - Client ID to get State objects or, or blank for all. - taba - List of Taba Names to get State objects for, or blank for all. + taba - Taba Name to get State objects for, or blank for all Taba Names. block - List of Taba Name Block IDs to get State objects, or blank for all. 
Response: @@ -219,19 +249,29 @@ def HandleGetRawBatch(request): juno.append('Cannot specify both "names" and "block"') return - return _RawResponse(client_id, names, blocks, _GetAccept(request)) + return _RawResponse(Param(client_id, False), + Param(names, False), blocks, _GetAccept(request)) -def _RawResponse(client_id, names, blocks, accept): +def _RawResponse(clients_param, names_param, blocks, accept): """Retrieve raw State results, and add them to the response. Args: - client_id - Client ID string to retrieve State objects for. - names - List of Taba Names to retrieve State objects for. + clients_param - Param object containing either client_id or client glob + to retrieve State objects for. + names_param - Param object containing either List of Taba Names, + or a names_glob to retrieve State objects for. blocks - List of Taba Name Block IDs to retrieve State objects for. accept - MIME type in which to format the output. """ - # Get the requested State objects. - states = global_taba_server.GetStates(client_id, names, blocks) + + clients, names = _GetClientsAndNames(clients_param, names_param) + + if (clients != None and len(clients) == 0) \ + or (names != None and len(names) == 0): + states = [] + else: + # Get the requested State objects. + states = global_taba_server.GetStates(clients, names, blocks) # Format the response in accordance with the Accept header. if accept == 'text/json': @@ -249,7 +289,9 @@ def HandleGetProjection(request): Get Parameters: client - Client ID to get Projection(s) for, or blank for all Clients IDs. + client_glob - Client ID to get Projection(s) for. taba - Taba Name to get Projection(s) for, or blank for all Taba Names. + taba_glob - Taba Glob to get Projection(s) for. block - Taba Name Block to get Projections for, or blank for all. 
Response: @@ -264,15 +306,36 @@ def HandleGetProjection(request): name = request.input('taba') block = request.input('block') - if name and block: + + # add glob params for taba and client + name_glob = request.input('taba_glob') + client_glob = request.input('client_glob') + + if (name or name_glob) and block: juno.status(400) juno.append('Cannot specify both "taba" and "block"') return + + # add rule.. only one of taba or taba_glob + if name and name_glob: + juno.status(400) + juno.append('Cannot specify both "taba_glob" and "taba"') + return + + # add rule.. only one of client_id or client_glob + if client_id and client_glob: + juno.status(400) + juno.append('Cannot specify both "client_glob" and "client"') + return blocks = block.split(',') if block else None names = [name] if name else None + names_param = Param(name_glob, True) if name_glob else Param(names, False) + clients_param = Param(client_glob, True) if client_glob \ + else Param(client_id, False) - return _ProjectionResponse(client_id, names, blocks, _GetAccept(request)) + return _ProjectionResponse(clients_param, names_param, + blocks, _GetAccept(request)) @TimedLoggedRequest("Get Projection Batch") def HandleGetProjectionBatch(request): @@ -303,19 +366,29 @@ def HandleGetProjectionBatch(request): juno.append('Cannot specify both "taba" and "block"') return - return _ProjectionResponse(client_id, names, blocks, _GetAccept(request)) + return _ProjectionResponse(Param(client_id, False), + Param(names, False), blocks, _GetAccept(request)) -def _ProjectionResponse(client_id, names, blocks, accept): +def _ProjectionResponse(clients_param, names_param, blocks, accept): """Retrieve Projection results, and add them to the response. Args: - client_id - Client ID string to retrieve Projections for. - names - List of Taba Names to retrieve Projections for. + clients_param - Param object containing either client_id or client glob + to retrieve Projections for. 
+ names_param - Param object containing either List of Taba Names, + or a names_glob to retrieve Projections for. blocks - List of Taba Name Block IDs to retrieve Projections for. accept - MIME type in which to format the output. """ - # Retrieve the requested Projections. - projections = global_taba_server.GetProjections(client_id, names, blocks) + clients, names = _GetClientsAndNames(clients_param, names_param) + + if (clients != None and len(clients) == 0) \ + or (names != None and len(names) == 0): + projections = [] + + else: + # Retrieve the requested Projections. + projections = global_taba_server.GetProjections(clients, names, blocks) # Render the Projection objects according to the requested format. if accept == 'text/json': @@ -345,16 +418,25 @@ def HandleGetAggregate(request): # Parse and validate query parameters. name = request.input('taba') block = request.input('block') + + name_glob = request.input('taba_glob') if name and block: juno.status(400) juno.append('Cannot specify both "taba" and "block"') return + # add rule.. only one of taba or taba_glob + if name and name_glob: + juno.status(400) + juno.append('Cannot specify both "taba_glob" and "taba"') + return + names = [name] if name else None blocks = block.split(',') if block else None - return _AggregateResponse(names, blocks, _GetAccept(request)) + names_param = Param(name_glob, True) if name_glob else Param(names, False) + return _AggregateResponse(names_param, blocks, _GetAccept(request)) @TimedLoggedRequest("Get Aggregate Batch") def HandleGetAggretateBatch(request): @@ -383,7 +465,7 @@ def HandleGetAggretateBatch(request): juno.append('Cannot specify both "taba" and "block"') return - return _AggregateResponse(names, blocks, _GetAccept(request)) + return _AggregateResponse(Param(names, False), blocks, _GetAccept(request)) def _AggregateResponse(names, blocks, accept): """Retrieve Aggregate results, and add them to the response. 
@@ -394,8 +476,17 @@ def _AggregateResponse(names, blocks, accept): accept - MIME type in which to format the output. """ - # Retrieve the requested Aggregates. - aggregates = global_taba_server.GetAggregates(names, blocks) + # 'names' arrives as a Param wrapper; unwrap it before use. + names_param = names + names = names_param.value + if names_param.isGlob: + names = fnmatch.filter(global_taba_server.GetNames(), names_param.value) + + if names is not None and len(names) == 0: + aggregates = [] + + else: # Retrieve the requested Aggregates. + aggregates = global_taba_server.GetAggregates(names, blocks) # Format the response in accordance to the Accept header. if accept == 'text/json': @@ -424,22 +515,49 @@ def HandleGetTaba(request): text/json - JSON serialized. """ # Parse and validate query parameters. + client_id = request.input('client') name = request.input('taba') block = request.input('block') - if name and block: + # add glob params for taba and client + name_glob = request.input('taba_glob') + client_glob = request.input('client_glob') + + if (name or name_glob) and block: juno.status(400) juno.append('Cannot specify both "taba" and "block"') return + + # add rule.. only one of taba or taba_glob + if name and name_glob: + juno.status(400) + juno.append('Cannot specify both "taba_glob" and "taba"') + return - LOG.info("Starting Get Taba (%s, %s, %s)" % (client_id, name, block)) - start = time.time() + # add rule..
only one of client_id or client_glob + if client_id and client_glob: + juno.status(400) + juno.append('Cannot specify both "client_glob" and "client"') + return blocks = block.split(',') if block else None names = [name] if name else None + names_param = Param(name_glob, True) if name_glob else Param(names, False) + clients_param = Param(client_glob, True) if client_glob \ + else Param(client_id, False) + + LOG.info("Starting Get Taba (%s, %s, %s)" % (client_id, name, block)) + start = time.time() - renders = global_taba_server.GetRendered(client_id, names, blocks) + clients, names = _GetClientsAndNames(clients_param, names_param) + + if (clients != None and len(clients) == 0) \ + or (names != None and len(names) == 0): + renders = [] + + else: + renders = global_taba_server.GetRendered(clients, names, blocks) # Render the Projection objects according to the requested format. accept = request.raw.get('HTTP_ACCEPT') or 'text/plain' @@ -455,6 +573,41 @@ def HandleGetTaba(request): LOG.info("Finished Get Taba (%s, %s, %s) (%.2f)" % \ (client_id, name, block, time.time() - start)) + +def _GetClientsAndNames(clients_param, names_param): + """Helper function to process the client and names Param objects + and returns a list of clients and names + + Args: + clients_param - Param object wrapping client request parameter. + names_param - Param object wrapping taba name request parameter. 
+ + """ + clients = clients_param.value + names = names_param.value + + # Ensure clients is either a list or None + if clients_param.isGlob: + all_clients = global_taba_server.GetClients() + clients = fnmatch.filter(all_clients, clients_param.value) + + else: + if clients_param.value: + clients = [clients_param.value] + + if names_param.isGlob: + all_names = [] + if clients: + for x in clients: + all_names.extend(global_taba_server.GetNamesForClient(x)) + names = fnmatch.filter(all_names, names_param.value) + + else: + names = fnmatch.filter(global_taba_server.GetNames(), names_param.value) + + return clients, names + + def HandleGetClients(request): """Juno request handler to retrieve all the Client IDs. @@ -513,7 +666,8 @@ def HandleGetType(request): """Juno request handler to retrieve the Taba Type for a Taba Name. Get Parameters: - taba - (Required) The Taba Name to retrieve the Taba Type for. + taba - The Taba Name to retrieve the Taba Type for. + taba_glob - Glob for the Taba Name (One of the above is required) Response: Taba Type, serialized depending on the Accept MIME type. @@ -523,8 +677,17 @@ def HandleGetType(request): text/json - JSON serialized. 
""" name = request.input('taba') + name_glob = request.input('taba_glob') + + if name and name_glob: + juno.status(400) + juno.append('Cannot specify both "taba_glob" and "taba"') + return + if name: names = [name] + elif name_glob: + names = fnmatch.filter(global_taba_server.GetNames(), name_glob) else: names = None diff --git a/py/tellapart/taba/server/taba_server_storage_manager.py b/py/tellapart/taba/server/taba_server_storage_manager.py index b44af00..0b6b308 100644 --- a/py/tellapart/taba/server/taba_server_storage_manager.py +++ b/py/tellapart/taba/server/taba_server_storage_manager.py @@ -254,19 +254,23 @@ def StateDeleteBatch(self, id_tuples): return op - def StateIteratorForClient(self, client_id): + def StateIteratorForClients(self, clients): """Generator which iterates over all State objects for a Client ID. Args: - client_id - Client ID string to retrieve State objects for. + clients - Client IDs string to retrieve State objects for. Returns: Generator that yields values of the form ((client_id, name), state) """ - op = self.TabaNamesForClientGet(client_id) - if not op.success: - raise Exception('Error retrieving list of Taba Names\n%s' % op) - names = op.response_value + + names = [] + + for client_id in clients: + op = self.TabaNamesForClientGet(client_id) + if not op.success: + raise Exception('Error retrieving list of Taba Names\n%s' % op) + names.extend(op.response_value) state_ids = self._IdsIterator([client_id], names) @@ -291,12 +295,12 @@ def StateIteratorForName(self, name): return self._StateIteratorForIdTuples(state_ids) - def StateIteratorForNameList(self, names, client_id=None, by_name=False): + def StateIteratorForNameList(self, names, clients=None, by_name=False): """Generator which iterates over all state objects for a list of Taba Name. Args: names - List of Taba Name string to retrieve State objects for. - client_id - Client ID string to retrieve State objects for. + clients - Client IDs string to retrieve State objects for. 
by_name - Group generated States by Taba Name (instead of Client ID, which id the default) @@ -304,10 +308,8 @@ def StateIteratorForNameList(self, names, client_id=None, by_name=False): Generator that yields values of the form ((client_id, name), state) """ # Get the set of clients to retrieve. - if client_id: - clients = [client_id] - else: + if not clients: op = self.ClientIdsGet() if not op.success: raise Exception('Error retrieving list of Client IDs\n%s' % op) @@ -529,6 +531,7 @@ def TabaNamesForClientGet(self, client_id): field contains the set of Taba Names for the Client ID. """ key = self._MakeKey(KEY_NAMES, client_id) + op = self._CheckedOp("retrieving Taba Names for Client %s" % client_id, self.engine.SetMembers, key) diff --git a/py/tellapart/taba/util/misc_util.py b/py/tellapart/taba/util/misc_util.py index b6999df..703c818 100644 --- a/py/tellapart/taba/util/misc_util.py +++ b/py/tellapart/taba/util/misc_util.py @@ -29,6 +29,7 @@ import cjson + class Bunch(object): """A simple container for named fields/values that acts like a class. """ @@ -102,6 +103,19 @@ def MakeStreamLogger( return logger + +def isGlob(word): + """Check whether 'word' is a glob expression""" + return containsAny(word, '*?[]!') + +def containsAny(word, charSet): + """Check whether 'word' contains ANY of the chars in 'charSet'""" + return any(c in word for c in charSet) + +def containsAll(word, charSet): + """Check whether 'word' contains ALL of the chars in 'charSet'""" + return all(c in word for c in charSet) + class HandlerSpec(object): """A HTTP request handler specification.
diff --git a/pytest/tellapart/taba/taba_server_storage_manager_test.py b/pytest/tellapart/taba/taba_server_storage_manager_test.py index c4a4346..073f904 100644 --- a/pytest/tellapart/taba/taba_server_storage_manager_test.py +++ b/pytest/tellapart/taba/taba_server_storage_manager_test.py @@ -290,7 +290,7 @@ def testStateIterators(self): for tuple in tuples_put: self.assertTrue(tuple in vals) - vals = [v for v in self.vssm.StateIteratorForClient('c1')] + vals = [v for v in self.vssm.StateIteratorForClients(['c1'])] self.assertEqual(len(vals), 2) for tuple in tuples_put[:2]: self.assertTrue(tuple in vals)