Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions adsrefpipe/tests/unittests/test_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import io
import os
import sys
import unittest
from contextlib import redirect_stderr
from datetime import datetime
from unittest.mock import patch

project_home = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../'))
if project_home not in sys.path:
sys.path.insert(0, project_home)

import run


class TestRunResolveTimeDelay(unittest.TestCase):
    """Tests for the RESOLVE action's time-delay handling in ``run.main``."""

    def _invoke_resolve(self, filenames, extra_args=()):
        """
        Run ``run.main`` for RESOLVE with all external effects patched out.

        :param filenames: file paths returned as a single subdirectory group
        :param extra_args: extra CLI arguments appended after the base ones
        :return: tuple of (exit code, process_files mock, time.sleep mock)
        """
        argv = ['RESOLVE', '-p', '/tmp/input', '-e', '*.raw'] + list(extra_args)
        with patch.object(run, 'get_date', return_value=datetime(2024, 1, 1)), \
                patch.object(run, 'get_source_filenames', return_value=[filenames]), \
                patch.object(run, 'process_files') as mock_process_files, \
                patch.object(run.time, 'sleep') as mock_sleep, \
                patch.object(run.logger, 'info'), \
                patch.object(run.processed_log, 'info'):
            exit_code = run.main(argv)
        return exit_code, mock_process_files, mock_sleep

    def _assert_rejects_time_delay(self, bad_value):
        """Assert that ``-t bad_value`` makes argparse exit with code 2."""
        captured = io.StringIO()
        with redirect_stderr(captured), self.assertRaises(SystemExit) as ctx:
            run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', bad_value])
        self.assertEqual(ctx.exception.code, 2)
        self.assertIn('time_delay must be greater than 0.', captured.getvalue())

    def test_resolve_uses_config_default_time_delay(self):
        # With no -t flag, the delay divisor comes from the config default.
        filenames = ['/tmp/input/A/file1.raw', '/tmp/input/A/file2.raw']
        exit_code, mock_process_files, mock_sleep = self._invoke_resolve(filenames)

        self.assertEqual(exit_code, 0)
        mock_process_files.assert_called_once_with(filenames)
        expected_delay = len(filenames) / run.config['REFERENCE_PIPELINE_DEFAULT_TIME_DELAY']
        mock_sleep.assert_called_once_with(expected_delay)

    def test_resolve_explicit_time_delay_overrides_config_default(self):
        # An explicit -t value replaces the config default divisor:
        # 4 files / 2.0 -> sleep(2.0).
        filenames = [
            '/tmp/input/A/file1.raw',
            '/tmp/input/A/file2.raw',
            '/tmp/input/A/file3.raw',
            '/tmp/input/A/file4.raw',
        ]
        exit_code, mock_process_files, mock_sleep = self._invoke_resolve(
            filenames, extra_args=('-t', '2'))

        self.assertEqual(exit_code, 0)
        mock_process_files.assert_called_once_with(filenames)
        mock_sleep.assert_called_once_with(2.0)

    def test_resolve_rejects_zero_time_delay(self):
        self._assert_rejects_time_delay('0')

    def test_resolve_rejects_negative_time_delay(self):
        self._assert_rejects_time_delay('-5')


# Allow running this test module directly: `python test_run.py`
if __name__ == '__main__':
    unittest.main()
5 changes: 4 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
# checking queues every this many seconds
QUEUE_AUDIT_INTERVAL = 10

# default delay rate divisor used for RESOLVE batch pauses
REFERENCE_PIPELINE_DEFAULT_TIME_DELAY = 1000.0

# true if to compare the resolved records with classic
COMPARE_CLASSIC = True

Expand All @@ -39,4 +42,4 @@
MAX_QUEUE_RETRIES = 3

# indication that this is considered an incomplete reference
INCOMPLETE_REFERENCE = ' --- Incomplete'
INCOMPLETE_REFERENCE = ' --- Incomplete'
93 changes: 84 additions & 9 deletions run.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import os, fnmatch
from collections import defaultdict

from adsputils import setup_logging, load_config, get_date
from datetime import timedelta
Expand All @@ -16,6 +17,20 @@

app = tasks.app
logger = setup_logging('run.py')
processed_log = setup_logging('processed_subdirectories.py')


def positive_float(value: str) -> float:
    """
    argparse type for positive floating point values.

    :param value: CLI argument value to validate
    :return: validated float value
    :raises argparse.ArgumentTypeError: if value is not a number greater than 0
    """
    try:
        parsed_value = float(value)
    except ValueError:
        # surface a clear message instead of argparse's generic
        # "invalid positive_float value" for non-numeric input
        raise argparse.ArgumentTypeError('time_delay must be a number.')
    # use `not (> 0)` rather than `<= 0`: NaN fails every ordered comparison,
    # so `parsed_value <= 0` would silently accept 'nan' as a valid delay
    if not parsed_value > 0:
        raise argparse.ArgumentTypeError('time_delay must be greater than 0.')
    return parsed_value


def run_diagnostics(bibcodes: list, source_filenames: list) -> None:
Expand All @@ -40,21 +55,38 @@ def run_diagnostics(bibcodes: list, source_filenames: list) -> None:

def get_source_filenames(source_file_path: str, file_extension: str, date_cutoff: time.struct_time) -> list:
    """
    Collect matching files modified on/after the cutoff, grouped by the
    first-level subdirectory under `source_file_path`.  Files living directly
    in `source_file_path` form a single group of their own.

    :param source_file_path: directory to walk for candidate files
    :param file_extension: fnmatch pattern a file's basename must satisfy
    :param date_cutoff: only files modified on/after this date are included
    :return: list of sorted lists of file paths (root group first if present,
             then subdirectory groups in sorted key order); [] if none matched
    """
    root_key = "__ROOT__"
    grouped = defaultdict(list)

    for current_dir, _, basenames in os.walk(source_file_path):
        # group key = first path component relative to the search root,
        # or the sentinel for files directly inside the search root
        rel_dir = os.path.relpath(current_dir, source_file_path)
        group = root_key if rel_dir in (".", "") else rel_dir.split(os.sep, 1)[0]
        for basename in basenames:
            if not fnmatch.fnmatch(basename, file_extension):
                continue
            full_path = os.path.join(current_dir, basename)
            # drop files last modified before the cutoff
            if get_date_modified_struct_time(full_path) >= date_cutoff:
                grouped[group].append(full_path)

    if not grouped:
        return []

    # Stable ordering: root-level files first, then subdirectories sorted by name.
    ordered = [sorted(grouped[root_key])] if root_key in grouped else []
    ordered.extend(sorted(grouped[key]) for key in sorted(k for k in grouped if k != root_key))
    return ordered



def queue_references(references: list, source_filename: str, source_bibcode: str, parsername: str) -> None:
Expand Down Expand Up @@ -199,7 +231,7 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib
logger.error("Unable to process %s. Skipped!" % toREFs.filename)


if __name__ == '__main__':
def main(argv=None) -> int:

parser = argparse.ArgumentParser(description='Process user input.')

Expand Down Expand Up @@ -276,6 +308,20 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib
dest='fail',
action='store_true',
help='Reprocess records that failed to get resolved')
resolve.add_argument('-t',
'--time_delay',
dest='time_delay',
action='store',
type=positive_float,
default=config['REFERENCE_PIPELINE_DEFAULT_TIME_DELAY'],
help='Add time delay between processing subdirectories for large batches. The delay time is batch size divided by input value in seconds. Defaults to REFERENCE_PIPELINE_DEFAULT_TIME_DELAY from config.')
resolve.add_argument('-sp',
'--skip_processed_directories',
dest='skip_processed',
action='store',
default=None,
help='Skip directories that have been previously processed')


stats = subparsers.add_parser('STATS', help='Print out statistics of the reference source file')
stats.add_argument('-b',
Expand Down Expand Up @@ -315,7 +361,8 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib
action='store_true',
help='Return all resolved bibcode')

args = parser.parse_args()
args = parser.parse_args(argv)
#import pdb;pdb.set_trace()

if args.action == 'DIAGNOSTICS':
if args.parse_filename:
Expand Down Expand Up @@ -345,8 +392,32 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib
else:
date_cutoff = get_date('1972')
source_filenames = get_source_filenames(args.path, args.extension, date_cutoff.timetuple())
delay_rate = args.time_delay
skip_files = []
if len(source_filenames) > 0:
process_files(source_filenames)
for subdir in source_filenames:
subdir_name = subdir[0].split('/')
subdir_name = "/".join(subdir_name[:-1])
delay_time = float(len(subdir)) / delay_rate
if args.skip_processed:
skip_file = args.skip_processed
try:
with open(skip_file,'r') as file:
skip_files = file.read().splitlines()
print(f'Skipping {len(skip_files)} subdirectories')
except:
skip_files = []
print('No files to skip')
if subdir_name not in skip_files:
process_files(subdir)
processed_log.info(f"{subdir_name}")
logger.info(f"Processed subdirectoy: {subdir_name}")
print(f"Processed subdirectoy: {subdir_name}")
logger.info(f"Pause for {delay_time} seconds to process")
print(f"Pause for {delay_time} seconds to process")
time.sleep(delay_time)
else:
print(f'Skipping {subdir_name}')
elif args.confidence:
date_cutoff = get_date() - timedelta(days=int(args.days)) if args.days else None
reprocess_references(ReprocessQueryType.score, score_cutoff=float(args.confidence), date_cutoff=date_cutoff)
Expand Down Expand Up @@ -391,4 +462,8 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib
# if args.all:
# else:

sys.exit(0)
return 0


if __name__ == '__main__':
sys.exit(main())
Loading