-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdaemon.go
More file actions
569 lines (516 loc) · 17.5 KB
/
daemon.go
File metadata and controls
569 lines (516 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"runtime/debug"
"sync"
"time"
dbs "github.com/CHESSComputing/DataBookkeeping/dbs"
srvConfig "github.com/CHESSComputing/golib/config"
"github.com/CHESSComputing/golib/services"
"github.com/CHESSComputing/golib/utils"
)
// SyncResources represents sync record resources like URLs, dids, etc.
// It is built by getResources from a SyncRequest and consumed by the
// fetch/push helpers below.
type SyncResources struct {
	SourceUrl   string   // base URL of the source FOXDEN instance
	SourceToken string   // access token for the source instance
	SourceDids  []string // dids found on the source instance
	TargetUrl   string   // base URL of the target FOXDEN instance
	TargetToken string   // access token for the target instance
	TargetDids  []string // dids found on the target instance
	Dids        []string // dids present on the source but missing on the target
}
// FailedRecord represents a record which failed to sync.
type FailedRecord struct {
	Did   string // dataset identifier of the failed record (may be empty)
	Error error  // error returned while pushing the record
}
// syncDaemon provides the async functionality of the FOXDEN sync process:
// it periodically polls the sync MongoDB collection for sync requests and
// hands each one to syncWorker. A negative sleep value runs a single pass
// and returns; otherwise the daemon sleeps for the given number of seconds
// between passes.
func syncDaemon(sleep int) {
	// ensure we dump stack of this goroutine when it suddenly dies
	defer func() {
		log.Printf("syncDaemon goroutine exited\n%s", debug.Stack())
	}()
	dbname := srvConfig.Config.Sync.MongoDB.DBName
	dbcoll := srvConfig.Config.Sync.MongoDB.DBColl
	idx := 0
	limit := 0
	spec := make(map[string]any)
	// initially we sleep for a couple of seconds to allow other services to wake up
	time.Sleep(2 * time.Second)
	for {
		records := metaDB.Get(dbname, dbcoll, spec, idx, limit)
		log.Printf("sync daemon loop found %d records to sync", len(records))
		for _, rec := range records {
			if Verbose > 1 {
				log.Printf("processing %+v", rec)
			}
			// round-trip through JSON to convert the generic map into a SyncRequest
			data, err := json.Marshal(rec)
			if err != nil {
				log.Println("unable to marshal sync record", err)
				continue
			}
			var syncRecord SyncRequest
			if err := json.Unmarshal(data, &syncRecord); err == nil {
				e := syncWorker(syncRecord)
				log.Println("sync job is finished with error:", e)
			} else {
				log.Println("sync record failure, error:", err)
			}
		}
		// if sleep interval is negative we quit the daemon cycle
		if sleep < 0 {
			break
		}
		// otherwise we'll sleep for provided duration
		time.Sleep(time.Duration(sleep) * time.Second)
	}
}
// syncWorker provides the async functionality of the FOXDEN sync process,
// which is based on the following logic:
//   - obtain all FOXDEN metadata records from source FOXDEN instance
//   - send PUT request to target FOXDEN instance with all metadata records
//   - update sync record with SyncMetadata status code
//   - obtain all FOXDEN provenance records from source FOXDEN instance
//   - send PUT request to target FOXDEN instance with all provenance records
//   - update sync record with SyncProvenance status code
//   - perform verification step
//   - update sync status code with Completed or Aborted
func syncWorker(syncRecord SyncRequest) error {
	// extract sync record uuid from sync record
	suuid := syncRecord.UUID
	// a completed record is re-processed only when marked as continuous
	if syncRecord.StatusCode == Completed {
		if !syncRecord.Continuous {
			log.Printf("INFO: sync process is completed and not required to be continued for sync record UUID=%s", suuid)
			return nil
		}
		if Verbose > 0 {
			log.Printf("INFO: sync process will be continued for sync record UUID=%s", suuid)
		}
	}
	// get sync resources (dids and tokens for both instances)
	syncResources, err := getResources(syncRecord)
	if err != nil {
		msg := fmt.Sprintf("Fail to sync all records, error %s", err)
		if e := updateSyncRecordStatus(suuid, msg, Failed); e != nil {
			return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", e)
		}
		return fmt.Errorf("[SyncService.main.syncWorker] getResources error: %w", err)
	}
	// update metadata records
	if err := updateSyncRecordStatus(suuid, "in progress", InProgress); err != nil {
		return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
	}
	failedRecords := updateMetadataRecords(syncResources)
	if len(failedRecords) != 0 {
		msg := fmt.Sprintf("Failed to sync all metadata records, # of failed records %d", len(failedRecords))
		log.Printf("WARNING: %s, failedRecords=%+v", msg, failedRecords)
		if err := updateSyncRecordStatus(suuid, msg, Failed); err != nil {
			return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
		}
		return errors.New(msg)
	}
	if err := updateSyncRecordStatus(suuid, "metadata records are synched", SyncMetadata); err != nil {
		return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
	}
	// update provenance records
	if err := updateSyncRecordStatus(suuid, "in progress", InProgress); err != nil {
		return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
	}
	failedRecords = updateProvenanceRecords(syncResources)
	if len(failedRecords) != 0 {
		msg := fmt.Sprintf("Failed to sync all provenance records, # of failed records %d", len(failedRecords))
		log.Printf("WARNING: %s, failedRecords=%+v", msg, failedRecords)
		if err := updateSyncRecordStatus(suuid, msg, Failed); err != nil {
			return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
		}
		return errors.New(msg)
	}
	if err := updateSyncRecordStatus(suuid, "provenance records are synched", SyncProvenance); err != nil {
		return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
	}
	// final update
	if err := updateSyncRecordStatus(suuid, "sync is completed", Completed); err != nil {
		return fmt.Errorf("[SyncService.main.syncWorker] updateSyncRecordStatus error: %w", err)
	}
	return nil
}
// updateSyncRecordStatus updates the status, status_code and updated_at
// fields of the sync record identified by suuid in the sync MongoDB
// collection, then logs the freshly stored status.
func updateSyncRecordStatus(suuid, status string, statusCode int) error {
	dbname := srvConfig.Config.Sync.MongoDB.DBName
	dbcoll := srvConfig.Config.Sync.MongoDB.DBColl
	spec := map[string]any{"uuid": suuid}
	// build a $set update document in one literal
	update := map[string]any{
		"$set": map[string]any{
			"status_code": statusCode,
			"status":      status,
			"updated_at":  time.Now().Format(time.RFC3339),
		},
	}
	if err := metaDB.Update(dbname, dbcoll, spec, update); err != nil {
		return fmt.Errorf("[SyncService.main.updateSyncRecordStatus] metaDB.Update error: %w", err)
	}
	printSyncRecordStatus(suuid)
	return nil
}
// printSyncRecordStatus logs the current status of the sync record
// identified by suuid, if such a record exists in the sync collection.
func printSyncRecordStatus(suuid string) {
	dbname := srvConfig.Config.Sync.MongoDB.DBName
	dbcoll := srvConfig.Config.Sync.MongoDB.DBColl
	spec := map[string]any{"uuid": suuid}
	// at most one record matches a uuid, hence limit=1
	for _, rec := range metaDB.Get(dbname, dbcoll, spec, 0, 1) {
		status, ok := rec["status"]
		if ok {
			log.Printf("INFO: sync record %s status %s", suuid, status)
		}
	}
}
// updateProvenanceRecords syncs provenance records from the source to the
// target FOXDEN instance and logs every record that failed to sync.
func updateProvenanceRecords(syncResources SyncResources) []FailedRecord {
	failed := updateRecords("provenance", syncResources)
	for _, fr := range failed {
		log.Printf("ERROR: failed provenance record %+v", fr)
	}
	return failed
}
// updateMetadataRecords syncs metadata records from the source to the
// target FOXDEN instance and logs every record that failed to sync.
func updateMetadataRecords(syncResources SyncResources) []FailedRecord {
	failed := updateRecords("metadata", syncResources)
	for _, fr := range failed {
		log.Printf("ERROR: failed metadata record %+v", fr)
	}
	return failed
}
// getDIDs fetches the list of dataset identifiers (dids) from the given
// FOXDEN /dids endpoint using the provided access token. It returns the
// dids extracted from the "did" attribute of each record, or an error if
// the records cannot be fetched.
func getDIDs(rurl, token string) ([]string, error) {
	var dids []string
	didRecords, err := getRecords(rurl, token)
	if err != nil {
		// NOTE: do not log the access token itself — it is a credential and
		// must not leak into the service logs
		log.Printf("ERROR: unable to get records from url=%s error=%v", rurl, err)
		return dids, fmt.Errorf("[SyncService.main.getDIDs] getRecords error: %w", err)
	}
	for _, rec := range didRecords {
		did := fmt.Sprintf("%s", rec["did"])
		dids = append(dids, did)
	}
	return dids, nil
}
// getResources collects all resources needed to process a sync request:
// source/target URLs and tokens, the did lists present on both FOXDEN
// instances, and the list of dids which exist on the source but are
// missing on the target (i.e. the ones to sync).
func getResources(syncRecord SyncRequest) (SyncResources, error) {
	srcToken := syncRecord.SourceToken
	tgtToken := syncRecord.TargetToken
	// obtain all FOXDEN dids from the source instance
	srcDids, err := getDIDs(fmt.Sprintf("%s/dids", syncRecord.SourceURL), srcToken)
	if err != nil {
		return SyncResources{}, fmt.Errorf("[SyncService.main.getResources] getDIDs error: %w", err)
	}
	// obtain all FOXDEN dids from the target instance
	tgtDids, err := getDIDs(fmt.Sprintf("%s/dids", syncRecord.TargetURL), tgtToken)
	if err != nil {
		return SyncResources{}, fmt.Errorf("[SyncService.main.getResources] getDIDs error: %w", err)
	}
	// dids to sync: present on the source but absent on the target
	var missing []string
	for _, did := range srcDids {
		if !utils.InList(did, tgtDids) {
			missing = append(missing, did)
		}
	}
	if Verbose > 0 {
		log.Printf("Source dids=%d, target dids=%d, sync dids=%d", len(srcDids), len(tgtDids), len(missing))
	}
	return SyncResources{
		SourceUrl:   syncRecord.SourceURL,
		SourceToken: srcToken,
		SourceDids:  srcDids,
		TargetUrl:   syncRecord.TargetURL,
		TargetToken: tgtToken,
		TargetDids:  tgtDids,
		Dids:        missing,
	}, nil
}
// updateRecords fetches either metadata or provenance records (selected by
// srv) from the source FOXDEN instance and pushes them to the target one.
// It returns the records which failed to sync. When Sync.NWorkers is
// non-zero both the fetch and the push stages run concurrently with that
// many workers; otherwise they run sequentially.
func updateRecords(srv string, syncResources SyncResources) []FailedRecord {
	nWorkers := srvConfig.Config.Sync.NWorkers
	// fetch stage: metadata or provenance records from the source instance
	var records []map[string]any
	if nWorkers == 0 {
		records = getDidRecords(srv, syncResources)
	} else {
		records = getDidRecordsConcurrent(srv, syncResources, nWorkers)
	}
	if Verbose > 0 {
		log.Printf("Fetched %d records from source FOXDEN instance %s", len(records), syncResources.SourceUrl)
	}
	// push stage: deliver fetched records to the target instance
	if nWorkers == 0 {
		return pushRecords(srv, syncResources, records)
	}
	return pushRecordsConcurrent(srv, syncResources, records, nWorkers)
}
// getFoxdenUrl builds the proper FOXDEN record URL for the given service
// (srv), server base URL (surl) and dataset identifier (did).
func getFoxdenUrl(srv, surl, did string) string {
	// provenance records live behind a dedicated end-point
	if srv == "provenance" {
		return fmt.Sprintf("%s/provenance?did=%s", surl, did)
	}
	return fmt.Sprintf("%s/record?did=%s", surl, did)
}
// getDidRecords sequentially fetches srv ("metadata" or "provenance")
// records from the source FOXDEN instance for the dids which are missing
// on the target. Records without a non-empty "did" attribute are dropped.
//
// NOTE: this iterates over syncResources.Dids (source minus target) to be
// consistent with getDidRecordsConcurrent; previously it walked all
// SourceDids, which only produced redundant fetches since pushRecord
// skips dids already present on the target.
func getDidRecords(srv string, syncResources SyncResources) []map[string]any {
	var records []map[string]any
	// fetch records sequentially
	for _, did := range syncResources.Dids {
		rurl := getFoxdenUrl(srv, syncResources.SourceUrl, did)
		recs, err := getRecords(rurl, syncResources.SourceToken)
		if err != nil {
			log.Println("ERROR: unable to get records for url", rurl, err)
			continue
		}
		// keep only records carrying a non-empty did
		for _, r := range recs {
			if val, ok := r["did"]; ok && fmt.Sprintf("%s", val) != "" {
				records = append(records, r)
			}
		}
	}
	return records
}
// getDidRecordsConcurrent is the concurrent version of getDidRecords: it
// fetches srv records for the dids to sync using a worker pool bounded to
// maxWorkers simultaneous requests.
func getDidRecordsConcurrent(srv string, syncResources SyncResources, maxWorkers int) []map[string]any {
	var out []map[string]any
	var outMutex sync.Mutex // guards out across fetch goroutines
	var group sync.WaitGroup
	slots := make(chan struct{}, maxWorkers) // bounded-concurrency semaphore
	for _, did := range syncResources.Dids {
		fetchUrl := getFoxdenUrl(srv, syncResources.SourceUrl, did)
		group.Add(1)
		// block until a worker slot becomes free
		slots <- struct{}{}
		go func(u string) {
			defer group.Done()
			defer func() { <-slots }() // release the slot
			recs, err := getRecords(u, syncResources.SourceToken)
			if err != nil {
				log.Println("ERROR: unable to get records for url", u, err)
				return
			}
			// safely append results
			outMutex.Lock()
			out = append(out, recs...)
			outMutex.Unlock()
		}(fetchUrl)
	}
	// wait for all in-flight fetches
	group.Wait()
	return out
}
// getRecords retrieves a list of generic records from the given FOXDEN URL
// using the provided access token. The payload may either be a bare JSON
// list of records or a wrapped services.ServiceResponse; both forms are
// handled.
func getRecords(rurl, token string) ([]map[string]any, error) {
	log.Printf("INFO: get records for %s", rurl)
	var out []map[string]any
	_httpReadRequest.SetToken(token)
	resp, err := _httpReadRequest.Get(rurl)
	if err != nil {
		return out, fmt.Errorf("[SyncService.main.getRecords] _httpReadRequest.Get error: %w", err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return out, fmt.Errorf("[SyncService.main.getRecords] io.ReadAll error: %w", err)
	}
	if json.Unmarshal(body, &out) == nil {
		return out, nil
	}
	// not a bare list; try the wrapped service response form
	var srec services.ServiceResponse
	serr := json.Unmarshal(body, &srec)
	if serr == nil {
		return srec.Results.Records, nil
	}
	msg := "unable to fetch records from upstream server"
	log.Printf("ERROR: %s, error=%v\n", msg, serr)
	return out, errors.New(msg)
}
// MetaRecord defines structure of metadata record as expected by the
// target FOXDEN /record end-point.
type MetaRecord struct {
	Schema string         // schema name taken from the record's "schema" attribute
	Record map[string]any // the metadata record payload itself
}
// pushRecord pushes a single record to the target FOXDEN instance at rurl.
// Provenance records are submitted as-is (unless empty, in which case they
// are skipped); metadata records are wrapped into a MetaRecord envelope and
// must carry a "schema" attribute. Records whose did already exists on the
// target instance are skipped. A non-200 response is returned as an error.
func pushRecord(syncResources SyncResources, srv, rurl string, rec map[string]any) error {
	token := syncResources.TargetToken
	turl := syncResources.TargetUrl
	dids := syncResources.TargetDids
	var did string
	if val, ok := rec["did"]; ok {
		did = fmt.Sprintf("%s", val)
		log.Printf("INFO: push record did=%s to %s", did, rurl)
	} else {
		log.Printf("INFO: push record to %s", rurl)
	}
	// first, check if this did is already present on the target instance
	if utils.InList(did, dids) {
		// NOTE: message fixed — turl is the target instance, not the source
		log.Printf("INFO: record with did=%s already exist in target FOXDEN instance %s", did, turl)
		return nil
	}
	var data []byte
	var err error
	if srv == "provenance" {
		// check if provenance record is empty, if it is the case we simply return
		if provenanceEmpty(rec) {
			log.Printf("WARNING: provenance record %+v is empty", rec)
			return nil
		}
		// we submit provenance record as-is
		data, err = json.Marshal(rec)
		log.Printf("### provenance record=%+v", rec)
	} else {
		// we submit MetaRecord; a metadata record must declare its schema
		mrec := MetaRecord{Record: rec}
		if val, ok := rec["schema"]; ok {
			mrec.Schema = fmt.Sprintf("%s", val)
		} else {
			return fmt.Errorf("non compliant FOXDEN metadata record %+v", rec)
		}
		data, err = json.Marshal(mrec)
	}
	// check marshaling outcome before doing anything with data
	if err != nil {
		return fmt.Errorf("[SyncService.main.pushRecord] json.Marshal error: %w", err)
	}
	if Verbose > 1 {
		log.Printf("push record %+v to %s\n", rec, rurl)
	}
	_httpWriteRequest.SetToken(token)
	resp, err := _httpWriteRequest.Post(rurl, "application/json", bytes.NewBuffer(data))
	if err != nil {
		log.Printf("ERROR: unable to push record to target FOXDEN URL %s, error %v", rurl, err)
		return fmt.Errorf("[SyncService.main.pushRecord] _httpWriteRequest.Post error: %w", err)
	}
	defer resp.Body.Close()
	// drain the body so the HTTP transport can reuse the connection
	if _, err := io.ReadAll(resp.Body); err != nil {
		return fmt.Errorf("[SyncService.main.pushRecord] io.ReadAll error: %w", err)
	}
	if resp.StatusCode != 200 {
		if Verbose > 1 {
			log.Printf("ERROR: response from target FOXDEN %+v", resp.Status)
		}
		return errors.New(resp.Status)
	}
	return nil
}
// pushRecords sequentially pushes the given records to the target FOXDEN
// instance and returns the subset which failed to sync.
func pushRecords(srv string, syncResources SyncResources, records []map[string]any) []FailedRecord {
	// choose the frontend injection end-point for the requested service:
	// /record for metadata, /provenance for provenance
	endpoint := fmt.Sprintf("%s/record", syncResources.TargetUrl)
	if srv == "provenance" {
		endpoint = fmt.Sprintf("%s/provenance", syncResources.TargetUrl)
	}
	if Verbose > 0 {
		log.Printf("Push %d records to target FOXDEN instance %s", len(records), endpoint)
	}
	var failed []FailedRecord
	for _, rec := range records {
		err := pushRecord(syncResources, srv, endpoint, rec)
		if err == nil {
			continue
		}
		// collect the failure together with the record's did (if any)
		var did string
		if val, ok := rec["did"]; ok {
			did = fmt.Sprintf("%s", val)
		}
		failed = append(failed, FailedRecord{Did: did, Error: err})
	}
	return failed
}
// pushRecordsConcurrent pushes records concurrently using a worker pool.
// nWorkers controls concurrency (e.g., 4). It returns the records which
// failed to sync.
//
// Fixes a data race: failedRecords was previously appended from multiple
// worker goroutines without synchronization; it is now guarded by a mutex.
// Failed records lacking a "did" attribute are also recorded (with an
// empty Did) instead of being silently dropped, matching the sequential
// pushRecords behavior.
func pushRecordsConcurrent(srv string, syncResources SyncResources, records []map[string]any, nWorkers int) []FailedRecord {
	var failedRecords []FailedRecord
	var mu sync.Mutex // guards failedRecords across workers
	rurl := fmt.Sprintf("%s/record", syncResources.TargetUrl)
	if srv == "provenance" {
		rurl = fmt.Sprintf("%s/provenance", syncResources.TargetUrl)
	}
	if Verbose > 0 {
		log.Printf("Push %d records to target FOXDEN instance %s with %d workers",
			len(records), rurl, nWorkers)
	}
	// job channel feeding the worker pool
	jobs := make(chan map[string]any)
	var wg sync.WaitGroup
	// start worker pool
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for rec := range jobs {
				err := pushRecord(syncResources, srv, rurl, rec)
				if err == nil {
					continue
				}
				log.Printf("WARNING: unable to push record to %s, error=%v", rurl, err)
				var did string
				if val, ok := rec["did"]; ok {
					did = fmt.Sprintf("%s", val)
				} else {
					log.Printf("WARNING: record %+v does not have did", rec)
				}
				mu.Lock()
				failedRecords = append(failedRecords, FailedRecord{Did: did, Error: err})
				mu.Unlock()
			}
		}()
	}
	// send jobs; closing the channel lets workers terminate
	go func() {
		for _, rec := range records {
			jobs <- rec
		}
		close(jobs)
	}()
	// wait for all workers to finish
	wg.Wait()
	return failedRecords
}
// provenanceEmpty reports whether the given provenance record map carries
// no data. If the map cannot be converted to a provenance record, the
// error is logged and false is returned.
func provenanceEmpty(rec map[string]any) bool {
	provRecord, err := mapToProvenance(rec)
	if err != nil {
		log.Println("ERROR: unable to convert map to provenance record, error", err)
		return false
	}
	return provRecord.IsEmpty()
}
// mapToProvenance converts a generic map into a dbs.ProvenanceRecord by
// round-tripping it through JSON: the map is marshaled and the result is
// decoded into the struct.
func mapToProvenance(rec map[string]any) (dbs.ProvenanceRecord, error) {
	var pr dbs.ProvenanceRecord
	payload, err := json.Marshal(rec)
	if err != nil {
		return pr, fmt.Errorf("marshal error: %w", err)
	}
	if err := json.Unmarshal(payload, &pr); err != nil {
		return pr, fmt.Errorf("unmarshal error: %w", err)
	}
	return pr, nil
}