## Private environment that collects the helper functions defined below.
## It is attach()ed to the search path at the bottom of this file so the
## functions are callable by bare name without polluting the global env.
.env <- new.env()

.env$breakoutDetection <- function(query, broker) {
  ## Run Twitter's BreakoutDetection over the result of a Druid query and
  ## return the detected breakout locations serialized as a JSON-like string.
  ##
  ## query  - Druid query as a JSON string.
  ## broker - URL of the Druid broker that receives the query.
  ##
  ## Returns "[]" when no breakout is found or when any step fails.

  # Default result. The original returned `res` from `finally`, which raised
  # "object 'res' not found" whenever an error fired before `res` was
  # assigned; pre-initializing guarantees a well-defined return value.
  res <- "[]"

  tryCatch({
    options(RCurlOptions = list(verbose = TRUE, ssl.verifypeer = FALSE,
                                httpheader = c("Content-Type" = "application/json"),
                                writeheader = TRUE))

    suppressMessages(library(RJSONIO))
    suppressMessages(library(RDruid))
    suppressMessages(library(BreakoutDetection))

    # Parse the query once and reuse the parsed object below.
    jsonQuery <- fromJSON(query)

    data <- querytoDruid(query, broker)
    try(data <- RDruid:::druid.resulttodf(data))

    # Metric column: post-aggregation name when present, otherwise the
    # plain aggregation name.
    if (is.null(jsonQuery$postAggregations)) {
      selection <- as.character(jsonQuery$aggregations['name'])
    } else {
      selection <- as.character(jsonQuery$postAggregations['name'])
    }
    data <- data.frame(data[, 1], data[, selection])
    colnames(data) <- c("timestamp", "count")

    anom2 <- breakout(data, min.size = 24, method = 'multi', beta = .001,
                      degree = 1, plot = TRUE)

    if (length(anom2$loc) > 0) {
      dataSelected <- data.frame(anom2$loc, "yes")
      colnames(dataSelected) <- c("timestamp", "breakout")
      clnms <- colnames(dataSelected)
      name <- colnames(dataSelected)[2]

      # Serialize one column into 'key : value' JSON fragments.
      # NOTE(review): values other than "timestamp" are emitted unquoted,
      # and the closing '}}' below assumes a nested "result" object that is
      # only opened for an "events" column (absent here) — the consumer
      # apparently tolerates this; confirm before normalizing the output.
      name.value <- function(i) {
        quote <- ''
        if (i == 'timestamp') {
          quote <- '"'
        }
        if (i == 'timestamp') {
          paste('"', i, '" : ', quote, paste(toISO(as.numeric(dataSelected[, i]))), quote, sep = '')
        } else {
          if (i == 'events') {
            paste('"result":{"', name, '" : ', quote, dataSelected[, i], quote, sep = '')
          } else {
            paste('"', i, '" : ', quote, dataSelected[, i], quote, sep = '')
          }
        }
      }

      # NOTE(review): with exactly one breakout row sapply() returns a plain
      # vector and apply(..., 1, ...) errors; the error handler then yields
      # the "[]" default.
      objs <- apply(sapply(clnms, name.value), 1, function(x) { paste(x, collapse = ', ') })
      objs <- paste('{', objs, '}}')
      res <- paste('[', paste(objs, collapse = ', '), ']')
    }
  }, error = function(err) {
    # Fixed copy-paste: the original message said "outlier detection".
    cat("\n Error generating breakout detection \n")
  })

  res
}

.env$outlierDetection <- function(query, broker) {
  ## Detect anomalies in a Druid time series with Twitter's AnomalyDetection.
  ## Queries three extra weeks of history before t0 so the detector has a
  ## baseline, then keeps only anomalies inside the requested interval and
  ## serializes them (timestamp + expected value) as a JSON-like string.
  ##
  ## query  - Druid query as a JSON string.
  ## broker - URL of the Druid broker that receives the query.
  ##
  ## Returns "[]" when the detection fails at any step.

  # Default result. The original returned `res` from `finally`, which raised
  # "object 'res' not found" whenever an error fired before `res` was
  # assigned; pre-initializing guarantees a well-defined return value.
  res <- "[]"

  tryCatch({
    options(RCurlOptions = list(verbose = TRUE, ssl.verifypeer = FALSE,
                                httpheader = c("Content-Type" = "application/json"),
                                writeheader = TRUE))

    suppressMessages(library(RJSONIO))
    suppressMessages(library(RDruid))
    suppressMessages(library(AnomalyDetection))

    # Parse the query once and reuse the parsed object below.
    jsonQuery <- fromJSON(query)
    intervals <- jsonQuery$intervals
    t0 <- strsplit(intervals, "/")[[1]][1]
    tf <- strsplit(intervals, "/")[[1]][2]

    # Extend the query interval 3 weeks (1814400 s) into the past so the
    # anomaly detector sees enough seasonal history.
    newt0 <- toISO(as.integer(fromISO(t0)) - 1814400)
    query2 <- gsub(t0, newt0, query, fixed = TRUE)

    data <- querytoDruid(query2, broker)
    try(data <- RDruid:::druid.resulttodf(data))

    # Metric column: post-aggregation name when present, otherwise the
    # plain aggregation name.
    if (is.null(jsonQuery$postAggregations)) {
      selection <- as.character(jsonQuery$aggregations['name'])
    } else {
      selection <- as.character(jsonQuery$postAggregations['name'])
    }

    data <- data.frame(data[, 1], data[, selection])

    # (A previous boxplot-stats implementation lived here; superseded by
    # AnomalyDetectionTs and removed.)
    anom2 <- AnomalyDetectionTs(data, max_anoms = 0.02, direction = 'both',
                                plot = FALSE, e_value = TRUE)

    # Keep timestamp + expected value, clamp negative expectations to zero,
    # and drop anomalies that fall before the originally requested t0.
    dataSelected <- data.frame(anom2$anoms[, 1], anom2$anoms[, 3])
    selectedtoZero <- which(dataSelected[, 2] < 0)
    dataSelected[selectedtoZero, 2] <- 0
    selected <- which(as.integer(dataSelected[, 1]) > as.integer(fromISO(t0)))
    dataSelected <- dataSelected[selected, ]
    colnames(dataSelected) <- c("timestamp", "expected")
    clnms <- colnames(dataSelected)
    name <- colnames(dataSelected)[2]

    # Serialize one column into 'key : value' JSON fragments.
    # NOTE(review): non-timestamp values are emitted unquoted, and the
    # closing '}}' below assumes an "events" column (absent here) opened a
    # nested object — confirm the consumer before normalizing the output.
    name.value <- function(i) {
      quote <- ''
      if (i == 'timestamp') {
        quote <- '"'
      }
      if (i == 'timestamp') {
        paste('"', i, '" : ', quote, paste(toISO(as.numeric(dataSelected[, i]))), quote, sep = '')
      } else {
        if (i == 'events') {
          paste('"result":{"', name, '" : ', quote, dataSelected[, i], quote, sep = '')
        } else {
          paste('"', i, '" : ', quote, dataSelected[, i], quote, sep = '')
        }
      }
    }

    objs <- apply(sapply(clnms, name.value), 1, function(x) { paste(x, collapse = ', ') })
    objs <- paste('{', objs, '}}')
    res <- paste('[', paste(objs, collapse = ', '), ']')
  }, error = function(err) {
    cat("\n Error generating outlier detection \n")
  })

  res
}

.env$querytoDruid <- function (jsonstr, url, verbose = FALSE, benchmark = FALSE, ...)
{
    ## Send a query to a Druid broker over HTTP (httr).
    ##
    ## jsonstr   - query body as a JSON string; NULL issues a plain GET.
    ## url       - broker endpoint.
    ## verbose   - echo the query body and the HTTP exchange.
    ## benchmark - when TRUE, discard the response body and return list().
    ##
    ## Returns the parsed JSON response, or raises an http error condition
    ## (with Druid's own error message when the broker returned JSON).
    if (is.null(jsonstr)) {
        res <- GET(url = url, encoding = "gzip", .encoding = "UTF-8")
    }
    else {
        if (verbose) {
            message(jsonstr)
        }
        res <- POST(url, content_type_json(), body = jsonstr,
            encoding = "gzip", .encoding = "UTF-8", verbose = verbose)
    }
    # The original read `res$header`, which only worked through `$` partial
    # matching against the response's `headers` element; spelled out here.
    if (status_code(res) >= 300 && !is.na(pmatch("application/json",
        res$headers$`content-type`))) {
        # Druid reports failures as JSON; surface its message to the caller.
        err <- content(res, type = "application/json")
        stop(http_condition(res, "error", message = err$error,
            call = sys.call(-1)))
    }
    else {
        stop_for_status(res)
    }
    if (benchmark) {
        list()
    }
    else {
        content(res, type = "application/json")
    }
}


.env$forecasting <- function(query, period, t0toPredict, tftoPredict,
                             start_time, end_time, percentage, broker, intervals) {
  ## Forecast a Druid time series with STL + ARIMA (forecast::stlf) and
  ## return the predictions for [t0toPredict, tftoPredict) serialized as a
  ## JSON-like string.
  ##
  ## query       - Druid query (JSON string) for the historical data.
  ## period      - bucket granularity ("pt1m", "pt1h", "p1m", ...).
  ## t0toPredict, tftoPredict - epoch bounds of the horizon to predict.
  ## start_time, end_time     - unused here; kept for caller compatibility.
  ## percentage  - non-zero means the metric is a 0-100 percentage; a logit
  ##               transform bounds the forecast inside that range.
  ## broker      - Druid broker URL.
  ## intervals   - non-zero adds 95% upper/lower bands to the output.
  ##
  ## Returns 0L when there is no data or not enough history.

  # period -> c(seasonal frequency, bucket size in seconds).
  # Replaces a long if-chain that silently left freq/step undefined for an
  # unknown period, producing a cryptic "object 'freq' not found" later.
  period.table <- list(
    "pt1m"   = c(1440, 60),
    "pt2m"   = c(720, 120),
    "pt5m"   = c(288, 300),
    "pt15m"  = c(96, 900),
    "pt30m"  = c(48, 1800),
    "pt1h"   = c(24, 3600),
    "pt60m"  = c(24, 3600),
    "pt2h"   = c(12, 7200),
    "pt8h"   = c(3, 28800),
    "pt24h"  = c(7, 86400),
    "pt120h" = c(6, 432000),
    "pt168h" = c(4, 604800),
    "pt720h" = c(3, 2592000),
    "p1m"    = c(3, 2592000)
  )
  if (is.null(period.table[[period]])) {
    stop("forecasting: unsupported period '", period, "'", call. = FALSE)
  }
  freq <- period.table[[period]][1]
  step <- period.table[[period]][2]

  suppressMessages(library(RDruid))
  suppressMessages(library(RJSONIO))
  suppressMessages(library(forecast))
  options(RCurlOptions = list(verbose = FALSE, ssl.verifypeer = FALSE,
                              httpheader = c("Content-Type" = "application/json"),
                              writeheader = TRUE))

  data <- querytoDruid(query, broker)
  try(data <- RDruid:::druid.resulttodf(data))

  if (dim(data)[1] == 0) {
    forecast <- as.integer(0)
    cat(paste("#### R: No available data for this interval"))
    cat(paste("Query:", query, "\n"))
    # The original fell off the end here and returned NULL (the value of
    # cat()) despite assigning `forecast`; return 0L like the other failure
    # branch below.
    return(forecast)
  }

  # Parse the query once and reuse the parsed object below.
  jsonQuery <- fromJSON(query)
  dataSource <- jsonQuery$dataSource

  # Metric column: post-aggregation name when present, otherwise the plain
  # aggregation name.
  if (is.null(jsonQuery$postAggregations)) {
    selection <- as.character(jsonQuery$aggregations[[1]]['name'])
  } else {
    selection <- as.character(jsonQuery$postAggregations[[1]]['name'])
  }

  # Build a regular time grid and place each observed value on it. The two
  # original branches were identical except that "rb_monitor" also forward-
  # fills short gaps (up to 5 missing buckets) with the last seen value.
  dataaux <- data[, 1]
  timearray <- seq(as.numeric(dataaux[1]),
                   as.numeric(dataaux[length(dataaux)]) - step, by = step)
  timearray <- data.frame(timearray, 0)

  for (i in seq_along(dataaux)) {
    timearray[which(timearray[, 1] == as.numeric(dataaux[i])), 2] <- data[i, selection]

    if (dataSource == "rb_monitor" && i < length(dataaux) &&
        (as.numeric(dataaux[i + 1]) - as.numeric(dataaux[i])) > step) {
      j <- 0
      while (j < 5 && j < floor((as.numeric(dataaux[i + 1]) - as.numeric(dataaux[i])) / step)) {
        timearray[which(timearray[, 1] == as.numeric(dataaux[i])) + j, 2] <- data[i, selection]
        j <- j + 1
      }
    }
  }

  a <- ts(timearray[, 2], frequency = freq)
  # Small offset keeps the log/logit transforms below finite at zero.
  b <- a + 0.00001

  h0toReturn <- 0
  hftoReturn <- (as.numeric(tftoPredict) - as.numeric(t0toPredict)) / step

  if (length(b) < 2 * freq) {
    forecast <- as.integer(0)
    cat("\n\n#### R: Data available is less than 2 times the interval that you want to predict.\n\n")
    cat(paste("Query:", query, "\n"))
    return(forecast)
  }

  if (percentage == 0) {
    fore <- stlf(b, method = "arima", h = hftoReturn, lambda = 0, robust = FALSE)
  } else {
    # Percentage metric: logit-transform so forecasts stay inside [0, 100].
    bottomLimit <- 0
    upperLimit <- 100
    b[b >= 100] <- 99.99999
    y <- log((b - bottomLimit) / (upperLimit - b))

    # (The original branched on the fraction of zeros but both branches
    # were identical; collapsed.)
    fore <- stlf(y, method = "arima", h = hftoReturn, robust = FALSE)

    # Back-transform forecasts from logit space.
    fore$mean <- (upperLimit - bottomLimit) * exp(fore$mean) / (1 + exp(fore$mean)) + bottomLimit
    fore$lower <- (upperLimit - bottomLimit) * exp(fore$lower) / (1 + exp(fore$lower)) + bottomLimit
    fore$upper <- (upperLimit - bottomLimit) * exp(fore$upper) / (1 + exp(fore$upper)) + bottomLimit
    fore$x <- b
  }

  if (intervals == 0) {
    forec <- data.frame(seq(as.numeric(t0toPredict), as.numeric(tftoPredict) - step, by = step),
                        fore$mean[(h0toReturn + 1):hftoReturn])
    colnames(forec) <- (c("timestamp", "events"))
  } else {
    # 0 in h0toReturn:hftoReturn is dropped by R indexing, so this selects
    # the same 1..hftoReturn elements as the branch above.
    forec <- data.frame(seq(as.numeric(t0toPredict), as.numeric(tftoPredict) - step, by = step),
                        fore$mean[h0toReturn:hftoReturn],
                        fore$upper[h0toReturn:hftoReturn, 2],
                        fore$lower[h0toReturn:hftoReturn, 2])
    colnames(forec) <- (c("timestamp", "events", "topInterval", "bottomInterval"))
  }

  clnms <- colnames(forec)
  name <- colnames(forec)[2]

  # Serialize one column into 'key : value' JSON fragments; the "events"
  # column opens a nested "result" object that the '}}' below closes.
  name.value <- function(i) {
    quote <- ''
    if (i == 'timestamp') {
      quote <- '"'
    }
    if (i == 'timestamp') {
      paste('"', i, '" : ', quote, paste(toISO(as.numeric(forec[, i]))), quote, sep = '')
    } else {
      if (i == 'events') {
        paste('"result":{"', name, '" : ', quote, forec[, i], quote, sep = '')
      } else {
        paste('"', i, '" : ', quote, forec[, i], quote, sep = '')
      }
    }
  }

  objs <- apply(sapply(clnms, name.value), 1, function(x) { paste(x, collapse = ', ') })
  objs <- paste('{', objs, '}}')
  res <- paste('[', paste(objs, collapse = ', '), ']')

  return(res)
}







## Attach all the variables above
## NOTE(review): attach() is generally discouraged, but here it is the
## deliberate mechanism that exposes the .env helpers (breakoutDetection,
## outlierDetection, querytoDruid, forecasting) by bare name to the session.
attach(.env)
 
## .First() run at the start of every R session. 
## Use to load commonly used packages? 
.First <- function() {
  # Session-start hook: announce that this profile was loaded and when.
  loaded.at <- date()
  cat("\nSuccessfully loaded new .Rprofile at", loaded.at, "\n")
}

