blob_data_sas_url = "<YOUR_BLOB_SAS_URL_FOR_ZIP_FILE>"
start_time = datetime(2021, 1, 1, 0, 0, 0)
end_time = datetime(2021, 1, 2, 12, 0, 0)
detection_req = DetectionRequest(source = blob_data_sas_url, start_time = start_time, end_time = end_time)
response_header = anomaly_detector_client.detect_anomaly(multi_variate_trained_model_id,
detection_req,
cls = lambda *args: [args[i] for i in range(len(args))]
)[-1]
result_id = response_header['Location'].split("/")[-1]
results = anomaly_detector_client.get_detection_result(result_id)
while results.summary.status != DetectionStatus.READY and results.summary.status != DetectionStatus.FAILED:
results = anomaly_detector_client.get_detection_result(result_id)
time.sleep(1)
if results.summary.status == DetectionStatus.FAILED:
if results.summary.errors:
for error in results.summary.errors:
print("The Error code: {}. ".format(error.code))
print("The Error Message: {}".format(error.message))
anomaly_detect_results = []
for res in results.results:
anomaly = {}
if res.value is not None:
value = res.value
is_anomaly = value.is_anomaly
if is_anomaly is True:
anomaly["timestamp"] = res.timestamp.strftime("%m/%d/%Y, %H:%M:%S")
anomaly["variable_contributing"] = []
anomaly["severity"] = value.severity
for contributing_factor in value.contributors:
anomaly["variable_contributing"].append(contributing_factor.variable)
anomaly_detect_results.append(anomaly)
print("Total number of anomalies identified: ", len(anomaly_detect_results))
print("Anomaly Identified:")
for anomaly in anomaly_detect_results:
print(anomaly)