Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
217 changes: 191 additions & 26 deletions Sources/Skyflow/collect/CollectAPICallback.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,120 @@ internal class CollectAPICallback: Callback {
self.options = options
self.contextOptions = contextOptions
}

internal func onSuccess(_ responseBody: Any) {
guard let url = URL(string: self.apiClient.vaultURL + self.apiClient.vaultID) else {
self.callback.onFailure(ErrorCodes.INVALID_URL().getErrorObject(contextOptions: self.contextOptions))
let insertRecords = records["records"] as? [[String: Any]]
let hasInsert = insertRecords?.isEmpty == false
let updateRecords = records["update"] as? [String: Any]
let hasUpdate = updateRecords?.isEmpty == false
let group = DispatchGroup()
var insertResponse: [String: Any]? = nil
var updateResponses: [[String: Any]] = []
var requestError: [Any]? = []
var requestUpdateError: [Any]? = []

let callbackQueue = DispatchQueue.main

if !hasInsert && !hasUpdate {
self.callback.onSuccess([:])
return
}

do {
let (request, session) = try self.getRequestSession(url: url)


let task = session.dataTask(with: request) { data, response, error in

if hasInsert {
group.enter()
guard let url = URL(string: self.apiClient.vaultURL + self.apiClient.vaultID) else {
self.callback.onFailure(ErrorCodes.INVALID_URL().getErrorObject(contextOptions: self.contextOptions))
return
}
do {
let (request, session) = try self.getRequestSession(url: url)
let task = session.dataTask(with: request) { data, response, error in
defer {
group.leave()
}
do {
let response = try self.processResponse(data: data, response: response, error: error)
if response["error"] != nil {
requestError?.append(response)
} else {
insertResponse = response
}
} catch {
requestError?.append(error)
}
}
task.resume()
} catch let error {
requestError?.append(error)
}
}
if hasUpdate, let updateArray = records["update"] as? [String: [String: Any]] {
for (_, updateRecord) in updateArray {
group.enter()
guard let table = updateRecord["table"] as? String, let skyflowID = updateRecord["skyflowID"] as? String else {
group.leave()
continue
}
let urlString = self.apiClient.vaultURL + self.apiClient.vaultID + "/" + table + "/" + skyflowID
guard let url = URL(string: urlString) else {
group.leave()
requestError?.append(ErrorCodes.INVALID_URL().getErrorObject(contextOptions: self.contextOptions))
continue
}
do {
let response = try self.processResponse(data: data, response: response, error: error)
self.callback.onSuccess(response)
} catch {
self.callback.onFailure(error)
var singleUpdateRecords: [String: Any] = [:]
singleUpdateRecords["fields"] = updateRecord["fields"]
singleUpdateRecords["table"] = table
singleUpdateRecords["skyflowID"] = skyflowID
let (request, session) = try self.getRequestSessionForUpdate(url: url, updateRecords: singleUpdateRecords)
let task = session.dataTask(with: request) { data, response, error in
defer {
group.leave() }
do {
let response = try self.processUpdateResponse(data: data, response: response, error: error, table: table)
if response["error"] != nil {
requestUpdateError?.append(response)
} else {
updateResponses.append(response)
}
} catch {
requestUpdateError?.append(error)
}
}
task.resume()
} catch let error {
group.leave()
requestUpdateError?.append(error)
continue
}
}
task.resume()
} catch let error {
self.callback.onFailure(error)
return
}

group.notify(queue: callbackQueue) {
var mergedRecords: [Any] = []
var mergedErrors: [Any] = []
if let insert = insertResponse?["records"] as? [Any] {
mergedRecords.append(contentsOf: insert)
}
for updateResp in updateResponses {
if let update = updateResp["records"] as? [Any] {
mergedRecords.append(contentsOf: update)
}
}
if requestUpdateError != nil {
mergedErrors.append(contentsOf: requestUpdateError!)
}
if requestError != nil {
mergedErrors.append(contentsOf: requestError!)
}
if mergedRecords.isEmpty {
self.callback.onFailure(["errors": mergedErrors])
} else if requestError?.isEmpty == true {
self.callback.onSuccess(["records": mergedRecords])
}
if !mergedErrors.isEmpty && !mergedRecords.isEmpty {
self.callback.onFailure(["records": mergedRecords, "errors": mergedErrors])
}
}
}

Expand Down Expand Up @@ -86,31 +178,104 @@ internal class CollectAPICallback: Callback {

}

// Helper for single update request
private func getRequestSessionForUpdate(url: URL, updateRecords: [String: Any]) throws -> (URLRequest, URLSession) {
var jsonString = ""
do {
let deviceDetails = FetchMetrices().getMetrices()
let jsonData = try JSONSerialization.data(withJSONObject: deviceDetails, options: [])
jsonString = String(data: jsonData, encoding: .utf8) ?? ""
} catch {
jsonString = ""
}
var request = URLRequest(url: url)
request.httpMethod = "PUT"
do {
let data = try JSONSerialization.data(withJSONObject: self.apiClient.constructUpdateRequestBody(records: updateRecords, options: options))
request.httpBody = data
}
request.setValue(("Bearer " + self.apiClient.token), forHTTPHeaderField: "Authorization")
request.setValue(jsonString, forHTTPHeaderField: "sky-metadata")
return (request, URLSession(configuration: .default))
}

func processUpdateResponse(data: Data?, response: URLResponse?, error: Error?, table: String) throws -> [String: Any] {
if error != nil || response == nil {
return ["error": ["message": (error)?.localizedDescription ?? "Unknown error"]]
}
if let httpResponse = response as? HTTPURLResponse {
let range = 400...599
if range ~= httpResponse.statusCode {
var description = "Update call failed with the following status code" + String(httpResponse.statusCode)
if let safeData = data {
do {
let desc = try JSONSerialization.jsonObject(with: safeData, options: .allowFragments) as! [String: Any]
let error = desc["error"] as? [String: Any]
if let error = error, let message = error["message"] as? String {
description = message
}
if let requestId = httpResponse.allHeaderFields["x-request-id"] {
description += " - request-id: \(requestId)"
}
} catch {
return ["error": ["message": String(data: safeData, encoding: .utf8) ?? "Unknown error", "code": httpResponse.statusCode]]
}
}
return ["error": ["message": description, "code": httpResponse.statusCode]]
}
}
guard let safeData = data else {
return ["records": []]
}
let jsonData = try JSONSerialization.jsonObject(with: safeData, options: .allowFragments) as! [String: Any]
var record: [String: Any] = [:]
var id = ""
if let skyflowId = jsonData["skyflow_id"] as? String{
id = skyflowId
} else {
id = String(describing: jsonData["skyflow_id"] ?? "")
}

if self.options.tokens {
let fieldsDict = jsonData["tokens"] as? [String: Any]
if fieldsDict != nil {
let fieldsData = try JSONSerialization.data(withJSONObject: fieldsDict!)
let fieldsObj = try JSONSerialization.jsonObject(with: fieldsData, options: .allowFragments)
var fieldsSkyflowId: [String: Any] = self.buildFieldsDict(dict: fieldsObj as? [String: Any] ?? [:])
fieldsSkyflowId["skyflow_id"] = id
record["fields"] = fieldsSkyflowId
}
} else {
record["skyflow_id"] = id
}
record["table"] = table
return ["records": [record]]
}

func processResponse(data: Data?, response: URLResponse?, error: Error?) throws -> [String: Any] {
if error != nil || response == nil {
throw error!
return ["error": ["message": (error)?.localizedDescription ?? "Unknown error"]]
}

if let httpResponse = response as? HTTPURLResponse {
let range = 400...599
if range ~= httpResponse.statusCode {
var description = "Insert call failed with the following status code" + String(httpResponse.statusCode)
var errorObject: Error = ErrorCodes.APIError(code: httpResponse.statusCode, message: description).getErrorObject(contextOptions: self.contextOptions)

var description = "Insert call failed with the following status code " + String(httpResponse.statusCode)
if let safeData = data {
do {
let desc = try JSONSerialization.jsonObject(with: safeData, options: .allowFragments) as! [String: Any]
let error = desc["error"] as! [String: Any]
description = error["message"] as! String
let errorResponse = try JSONSerialization.jsonObject(with: safeData, options: .allowFragments) as! [String: Any]
if let errorDetails = errorResponse["error"] as? [String: Any],
let message = errorDetails["message"] as? String {
description = message
}
if let requestId = httpResponse.allHeaderFields["x-request-id"] {
description += " - request-id: \(requestId)"
}
errorObject = ErrorCodes.APIError(code: httpResponse.statusCode, message: description).getErrorObject(contextOptions: self.contextOptions)
} catch {
errorObject = ErrorCodes.APIError(code: httpResponse.statusCode, message: String(data: safeData, encoding: .utf8)!).getErrorObject(contextOptions: self.contextOptions)
return ["error": ["message": String(data: safeData, encoding: .utf8) ?? "Unknown error", "code": httpResponse.statusCode]]
}
}
throw errorObject
return ["error": ["message": description, "code": httpResponse.statusCode]]
}
}

Expand Down
Loading
Loading