As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Lidando com dados que chegam tardiamente
Você pode ter cenários em que os dados chegam com um atraso significativo, por exemplo, o horário em que os dados foram ingeridos no Timestream LiveAnalytics está significativamente atrasado em comparação com o timestamp associado às linhas que são ingeridas. Nos exemplos anteriores, você viu como usar os intervalos de tempo definidos pelo parâmetro @scheduled_runtime para contabilizar alguns dados que chegam tarde. No entanto, se você tiver casos de uso em que os dados podem ser atrasados em horas ou dias, talvez seja necessário um padrão diferente para garantir que seus pré-cálculos na tabela derivada sejam atualizados adequadamente para refletir esses dados que chegam tardiamente. Para obter informações gerais sobre dados de chegada tardia, consulte. Gravando dados (inserções e acréscimos)
A seguir, você verá duas maneiras diferentes de lidar com esses dados que chegam tardiamente.
-
Se você tiver atrasos previsíveis na chegada dos dados, poderá usar outra computação programada “atualizada” para atualizar seus agregados para dados que chegam tarde.
-
Se você tiver atrasos imprevisíveis ou dados ocasionais de chegada tardia, poderá usar execuções manuais para atualizar as tabelas derivadas.
Esta discussão aborda cenários de chegada tardia de dados. No entanto, os mesmos princípios se aplicam às correções de dados, nas quais você modificou os dados na tabela de origem e deseja atualizar os agregados nas tabelas derivadas.
Tópicos
Consultas de atualização programadas
Consulta agregando dados que chegaram a tempo
Abaixo está um padrão: você verá como usar uma forma automatizada de atualizar seus agregados se houver atrasos previsíveis na chegada dos dados. Considere um dos exemplos anteriores de um cálculo programado em dados em tempo real abaixo. Esse cálculo programado atualiza a tabela derivada uma vez a cada 30 minutos e já contabiliza dados com até uma hora de atraso.
{ "Name": "MultiPT30mPerHrPerTimeseriesDPCount", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0/30 * * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
Consulta de atualização atualizando os agregados para dados que chegam tardiamente
Agora, se você considerar o caso, seus dados podem ser atrasados em cerca de 12 horas. Abaixo está uma variante da mesma consulta. No entanto, a diferença é que ele calcula os agregados em dados que estão atrasados em até 12 horas em comparação com quando a computação programada está sendo acionada. Por exemplo, você vê a consulta no exemplo abaixo. O intervalo de tempo que essa consulta tem como alvo é entre 2h e 14h antes de ser acionada. Além disso, se você observar a expressão de cronograma cron (0 0,12 * *? *), ele acionará o cálculo às 00:00 UTC e 12:00 UTC todos os dias. Portanto, quando a consulta é acionada em 01/12/2021 00:00:00, a consulta atualiza os agregados no intervalo de tempo 2021-11-30 10:00:00 a 2021-11-30 22:00:00. As consultas agendadas usam uma semântica ascendente semelhante à Timestream para LiveAnalytics gravações, em que essa consulta de recuperação atualizará os valores agregados com valores mais novos se houver dados atrasados na janela ou se novos agregados forem encontrados (por exemplo, um novo agrupamento aparecerá nesse agregado que não estava presente quando a computação programada original foi acionada) e, em seguida, o novo agregado será inserido na tabela derivada. Da mesma forma, quando a próxima instância for acionada em 01/12/2021 12:00:00, essa instância atualizará os agregados no intervalo 2021-11-30 22:00:00 a 2021-12-01 10:00:00.
{ "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0 0,12 * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
O exemplo anterior é uma ilustração, supondo que sua chegada tardia esteja limitada a 12 horas e que não há problema em atualizar a tabela derivada uma vez a cada 12 horas para que os dados cheguem depois da janela em tempo real. Você pode adaptar esse padrão para atualizar sua tabela derivada uma vez a cada hora, para que ela reflita os dados que chegam tardiamente mais cedo. Da mesma forma, você pode adaptar o intervalo de tempo para ser superior a 12 horas, por exemplo, um dia ou até uma semana ou mais, para lidar com dados previsíveis que chegam tarde.
Execuções manuais para dados imprevisíveis que chegam tardiamente
Pode haver casos em que você tenha dados imprevisíveis chegando atrasados ou tenha feito alterações nos dados de origem e atualizado alguns valores após o fato. Em todos esses casos, você pode acionar manualmente consultas agendadas para atualizar a tabela derivada. Abaixo está um exemplo de como você pode conseguir isso.
Suponha que você tenha o caso de uso em que a computação foi gravada na tabela derivada dp_per_timeseries_per_hr. Seus dados básicos na tabela devops foram atualizados no intervalo de tempo 2021-11-30 23:00:00 - 2021-12-01 00:00:00. Há duas consultas agendadas diferentes que podem ser usadas para atualizar essa tabela derivada: Multi PT3 0 mPerHr PerTimeseries DPCount e Multi. PT12 HPer HrPerTimeseries DPCount CatchUp Cada cálculo agendado para o qual você cria no Timestream LiveAnalytics tem um ARN exclusivo que você obtém ao criar o cálculo ou ao realizar uma operação de lista. Você pode usar o ARN para o cálculo e um valor para o parâmetro @scheduled_runtime obtido pela consulta para realizar essa operação.
Suponha que o cálculo para Multi PT3 0 mPerHr PerTimeseries DPCount tenha um ARN arn_1 e você queira usar esse cálculo para atualizar a tabela derivada. Como o cálculo agendado anterior atualiza os agregados 1 hora antes e 1 hora depois do valor @scheduled_runtime, você pode cobrir o intervalo de tempo da atualização (2021-11-30 23:00:00 - 2021-12-01 00:00:00) usando um valor de 2021-12-01 00:00:00 para o parâmetro @scheduled_runtime. Você pode usar a ExecuteScheduledQuery API para passar o ARN desse cálculo e o valor do parâmetro de tempo em segundos de época (em UTC) para fazer isso. Abaixo está um exemplo usando a AWS CLI e você pode seguir o mesmo padrão usando qualquer um dos SDKs suportados pelo Timestream for. LiveAnalytics
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1
No exemplo anterior, profile é o AWS perfil que tem os privilégios apropriados para fazer essa chamada de API e 1638316800 corresponde à segunda época de 2021-12-01 00:00:00. Esse gatilho manual se comporta quase como o gatilho automatizado, supondo que o sistema tenha acionado essa invocação no período de tempo desejado.
Se você teve uma atualização em um período mais longo, digamos que os dados básicos foram atualizados para 2021-11-30 23:00:00 - 2021-12-01 11:00:00, então você pode acionar as consultas anteriores várias vezes para cobrir todo esse intervalo de tempo. Por exemplo, você pode fazer seis execuções diferentes da seguinte maneira.
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1
Os seis comandos anteriores correspondem à computação programada invocada em 2021-12-01 00:00:00, 2021-12-01 02:00:00, 2021-12-01 04:0:00, 2021-12-01 06:00:00, 2021-12-01 08:00:00e 2021-12-01 10:00:
Como alternativa, você pode usar o cálculo Multi PT12 HPer HrPerTimeseries DPCount CatchUp acionado às 13:00:00 de 01/12/2021 para uma execução para atualizar os agregados em todo o intervalo de tempo de 12 horas. Por exemplo, se arn_2 for o ARN desse cálculo, você poderá executar o seguinte comando na CLI.
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1
É importante notar que, para um gatilho manual, você pode usar um carimbo de data/hora para o parâmetro de tempo de invocação que não precisa estar alinhado com os carimbos de data/hora do gatilho automatizado. Por exemplo, no exemplo anterior, você acionou o cálculo no horário 2021-12-01 13:00:00 embora a programação automatizada só seja acionada nos timestamps 2021-12-01 10:00:00, 2021-12-01 12:00:00e 2021-12-02 00:00:00. O Timestream for LiveAnalytics fornece a flexibilidade de acioná-lo com valores apropriados, conforme necessário para suas operações manuais.
Veja a seguir algumas considerações importantes ao usar a ExecuteScheduledQuery API.
-
Se você estiver acionando várias dessas invocações, precisará garantir que essas invocações não gerem resultados em intervalos de tempo sobrepostos. Por exemplo, nos exemplos anteriores, havia seis invocações. Cada invocação abrange um intervalo de tempo de 2 horas e, portanto, os timestamps de invocação foram distribuídos em duas horas cada para evitar qualquer sobreposição nas atualizações. Isso garante que os dados na tabela derivada terminem em um estado que corresponda aos agregados da tabela de origem. Se você não puder garantir intervalos de tempo não sobrepostos, certifique-se de que essas execuções sejam acionadas sequencialmente uma após a outra. Se você acionar várias execuções simultaneamente que se sobrepõem em seus intervalos de tempo, poderá ver falhas de gatilho nas quais poderá ver conflitos de versão nos relatórios de erros dessas execuções. Os resultados gerados por uma invocação de consulta agendada recebem uma versão com base em quando a invocação foi acionada. Portanto, as linhas geradas por invocações mais recentes têm versões superiores. Um registro de versão superior pode sobrescrever um registro de versão inferior. Para consultas agendadas acionadas automaticamente, o Timestream for gerencia LiveAnalytics automaticamente os agendamentos para que você não veja esses problemas, mesmo que as invocações subsequentes tenham intervalos de tempo sobrepostos.
-
observado anteriormente, você pode acionar as invocações com qualquer valor de timestamp para @scheduled_runtime. Portanto, é sua responsabilidade definir adequadamente os valores para que os intervalos de tempo apropriados sejam atualizados na tabela derivada correspondente aos intervalos em que os dados foram atualizados na tabela de origem.
-
Você também pode usar esses gatilhos manuais para consultas agendadas que estão no estado DESATIVADO. Isso permite definir consultas especiais que não são executadas em um agendamento automatizado, pois estão no estado DESATIVADO. Em vez disso, você pode usar os acionadores manuais neles para gerenciar correções de dados ou casos de uso de chegada tardia.