Coletar registros do Proofpoint Secure Email Relay

Compatível com:

Este documento explica como ingerir registros do Proofpoint Secure Email Relay no Google Security Operations usando o Google Cloud Storage V2.

O Proofpoint Secure Email Relay (SER) é um serviço de segurança de e-mail de saída que oferece criptografia, prevenção contra perda de dados (DLP) e aplicação de compliance para mensagens enviadas da sua organização. O analisador extrai campos dos dados de rastreamento de mensagens da SER e os mapeia para o modelo de dados unificado (UDM), capturando metadados de e-mail, métricas de capacidade de processamento, status de entrega e atividade do usuário.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Uma instância do Google SecOps
  • Um projeto do GCP com a API Storage ativada
  • Permissões para criar e gerenciar intervalos do GCS
  • Permissões para gerenciar políticas do IAM em buckets do GCS
  • Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
  • Acesso privilegiado ao Proofpoint Secure Email Relay com acesso à chave de API

Criar um bucket do Google Cloud Storage

  1. Acesse o Console do Google Cloud.
  2. Selecione um projeto ou crie um novo.
  3. No menu de navegação, acesse Cloud Storage > Buckets.
  4. Clique em Criar bucket.
  5. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, proofpoint-ser-logs.
    Tipo de local Escolha de acordo com suas necessidades (região, birregional, multirregional)
    Local Selecione o local (por exemplo, us-central1).
    Classe de armazenamento Padrão (recomendado para registros acessados com frequência)
    Controle de acesso Uniforme (recomendado)
    Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção
  6. Clique em Criar.

Coletar credenciais da API SER do Proofpoint

Receber credenciais de API

  1. Faça login no portal de administração do Proofpoint Secure Email Relay com as credenciais de administrador.
  2. Acesse Configurações > Chaves de API.
  3. Clique em Gerar ou Criar chave de API.
  4. Copie e armazene com segurança as seguintes credenciais:

    • Chave de API: copie esse valor.
    • Chave secreta da API: copie esse valor.

Verifique as permissões

Para verificar se as credenciais da API têm as permissões necessárias:

  1. Faça login no portal de administrador do Proofpoint SER.
  2. Acesse Configurações > Chaves de API.
  3. Confirme se a chave de API está listada e tem o status Ativa.
  4. Verifique se a chave tem acesso aos endpoints de rastreamento e geração de relatórios de mensagens.

Testar o acesso à API

  • Teste suas credenciais antes de prosseguir com a integração:

    # Replace with your actual credentials
    API_KEY="<your-api-key>"
    API_SECRET="<your-api-secret>"
    
    # Test API access - retrieve recent messages
    curl -v -u "${API_KEY}:${API_SECRET}" \
      "https://ser-api.proofpoint.com/v1/messages?startDate=$(date -u -v-1H +%Y-%m-%dT%H:%M:%SZ)&endDate=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
    

Criar uma conta de serviço para a função do Cloud Run

A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.

Criar conta de serviço

  1. No Console do GCP, acesse IAM e administrador > Contas de serviço.
  2. Clique em Criar conta de serviço.
  3. Informe os seguintes detalhes de configuração:
    • Nome da conta de serviço: insira proofpoint-ser-collector-sa
    • Descrição da conta de serviço: digite Service account for Cloud Run function to collect Proofpoint Secure Email Relay logs
  4. Clique em Criar e continuar.
  5. Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
    1. Clique em Selecionar papel.
    2. Pesquise e selecione Administrador de objetos do Storage.
    3. Clique em + Adicionar outro papel.
    4. Pesquise e selecione Invocador do Cloud Run.
    5. Clique em + Adicionar outro papel.
    6. Pesquise e selecione Invocador do Cloud Functions.
  6. Clique em Continuar.
  7. Clique em Concluído.

Esses papéis são necessários para:

  • Administrador de objetos do Storage: grava registros no bucket do GCS e gerencia arquivos de estado.
  • Invocador do Cloud Run: permite que o Pub/Sub invoque a função.
  • Invocador do Cloud Functions: permite a invocação de funções

Conceder permissões do IAM no bucket do GCS

Conceda à conta de serviço permissões de gravação no bucket do GCS:

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket (por exemplo, proofpoint-ser-logs).
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar principais: insira o e-mail da conta de serviço (por exemplo, proofpoint-ser-collector-sa@PROJECT_ID.iam.gserviceaccount.com).
    • Atribuir papéis: selecione Administrador de objetos do Storage.
  6. Clique em Salvar.

Criar tópico Pub/Sub

Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.

  1. No Console do GCP, acesse Pub/Sub > Tópicos.
  2. Selecione Criar tópico.
  3. Informe os seguintes detalhes de configuração:
    • ID do tópico: insira proofpoint-ser-trigger
    • Não mude as outras configurações.
  4. Clique em Criar.

Criar uma função do Cloud Run para coletar registros

A função do Cloud Run será acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros de mensagens da API SER do Proofpoint e gravá-los no GCS.

  1. No console do GCP, acesse o Cloud Run.
  2. Clique em Criar serviço.
  3. Selecione Função (use um editor in-line para criar uma função).
  4. Na seção Configurar, forneça os seguintes detalhes de configuração:

    Configuração Valor
    Nome do serviço proofpoint-ser-collector
    Região Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).
    Ambiente de execução Selecione Python 3.12 ou uma versão mais recente.
  5. Na seção Acionador (opcional):

    1. Clique em + Adicionar gatilho.
    2. Selecione Cloud Pub/Sub.
    3. Em Selecionar um tópico do Cloud Pub/Sub, escolha o tópico do Pub/Sub (proofpoint-ser-trigger).
    4. Clique em Salvar.
  6. Na seção Autenticação:

    1. Selecione Exigir autenticação.
    2. Confira o Identity and Access Management (IAM).
  7. Role a tela para baixo e expanda Contêineres, rede, segurança.

  8. Acesse a guia Segurança:

    • Conta de serviço: selecione a conta de serviço (proofpoint-ser-collector-sa)
  9. Acesse a guia Contêineres:

    1. Clique em Variáveis e secrets.
    2. Clique em + Adicionar variável para cada variável de ambiente:
    Nome da variável Valor de exemplo Descrição
    GCS_BUCKET proofpoint-ser-logs Nome do bucket do GCS
    GCS_PREFIX ser-logs Prefixo para arquivos de registro
    STATE_KEY ser-logs/state.json Caminho do arquivo de estado
    API_KEY your-api-key Chave de API do Proofpoint SER
    API_SECRET your-api-secret Chave secreta da API SER do Proofpoint
    MAX_RECORDS 1000 Máximo de registros por execução
    PAGE_SIZE 100 Registros por página
    LOOKBACK_HOURS 1 Período de lookback inicial
  10. Na seção Variáveis e secrets, role a tela para baixo até Solicitações:

    • Tempo limite da solicitação: insira 600 segundos (10 minutos)
  11. Acesse a guia Configurações:

    • Na seção Recursos:
      • Memória: selecione 512 MiB ou mais.
      • CPU: selecione 1
  12. Na seção Escalonamento de revisão:

    • Número mínimo de instâncias: insira 0
    • Número máximo de instâncias: insira 100 ou ajuste com base na carga esperada.
  13. Clique em Criar.

  14. Aguarde a criação do serviço (1 a 2 minutos).

  15. Depois que o serviço for criado, o editor de código inline será aberto automaticamente.

Adicionar código da função

  1. Insira main no campo Ponto de entrada.
  2. No editor de código em linha, crie dois arquivos:

    • Primeiro arquivo:main.py:

      import functions_framework
      from google.cloud import storage
      import json
      import os
      import urllib3
      from datetime import datetime, timezone, timedelta
      import time
      import base64
      
      # Initialize HTTP client with timeouts
      http = urllib3.PoolManager(
        timeout=urllib3.Timeout(connect=5.0, read=30.0),
        retries=False,
      )
      
      # Initialize Storage client
      storage_client = storage.Client()
      
      # Environment variables
      GCS_BUCKET = os.environ.get('GCS_BUCKET')
      GCS_PREFIX = os.environ.get('GCS_PREFIX', 'ser-logs')
      STATE_KEY = os.environ.get('STATE_KEY', 'ser-logs/state.json')
      API_KEY = os.environ.get('API_KEY')
      API_SECRET = os.environ.get('API_SECRET')
      MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '1000'))
      PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100'))
      LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '1'))
      
      API_BASE = "https://ser-api.proofpoint.com/v1"
      
      def parse_datetime(value: str) -> datetime:
        """Parse ISO datetime string to datetime object."""
        if value.endswith("Z"):
          value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)
      
      @functions_framework.cloud_event
      def main(cloud_event):
        """
        Cloud Run function triggered by Pub/Sub to fetch Proofpoint SER
        message logs and write to GCS.
      
        Args:
          cloud_event: CloudEvent object containing Pub/Sub message
        """
      
        if not all([GCS_BUCKET, API_KEY, API_SECRET]):
          print('Error: Missing required environment variables')
          return
      
        try:
          # Get GCS bucket
          bucket = storage_client.bucket(GCS_BUCKET)
      
          # Load state
          state = load_state(bucket, STATE_KEY)
      
          # Determine time window
          now = datetime.now(timezone.utc)
          last_time = None
      
          if isinstance(state, dict) and state.get("last_event_time"):
            try:
              last_time = parse_datetime(state["last_event_time"])
              last_time = last_time - timedelta(minutes=2)
            except Exception as e:
              print(f"Warning: Could not parse last_event_time: {e}")
      
          if last_time is None:
            last_time = now - timedelta(hours=LOOKBACK_HOURS)
      
          print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}")
      
          # Build auth header (Basic auth with API key and secret)
          auth_string = f"{API_KEY}:{API_SECRET}"
          auth_bytes = auth_string.encode('utf-8')
          auth_b64 = base64.b64encode(auth_bytes).decode('utf-8')
      
          # Fetch messages
          records, newest_event_time = fetch_messages(
            auth_b64=auth_b64,
            start_time=last_time,
            end_time=now,
            page_size=PAGE_SIZE,
            max_records=MAX_RECORDS,
          )
      
          if not records:
            print("No new log records found.")
            save_state(bucket, STATE_KEY, now.isoformat())
            return
      
          # Write to GCS as NDJSON
          timestamp = now.strftime('%Y%m%d_%H%M%S')
          object_key = f"{GCS_PREFIX}/logs_{timestamp}.ndjson"
          blob = bucket.blob(object_key)
      
          ndjson = '\n'.join([json.dumps(record, ensure_ascii=False) for record in records]) + '\n'
          blob.upload_from_string(ndjson, content_type='application/x-ndjson')
      
          print(f"Wrote {len(records)} records to gs://{GCS_BUCKET}/{object_key}")
      
          # Update state with newest event time
          if newest_event_time:
            save_state(bucket, STATE_KEY, newest_event_time)
          else:
            save_state(bucket, STATE_KEY, now.isoformat())
      
          print(f"Successfully processed {len(records)} records")
      
        except Exception as e:
          print(f'Error processing logs: {str(e)}')
          raise
      
      def load_state(bucket, key):
        """Load state from GCS."""
        try:
          blob = bucket.blob(key)
          if blob.exists():
            state_data = blob.download_as_text()
            return json.loads(state_data)
        except Exception as e:
          print(f"Warning: Could not load state: {e}")
      
        return {}
      
      def save_state(bucket, key, last_event_time_iso: str):
        """Save the last event timestamp to GCS state file."""
        try:
          state = {'last_event_time': last_event_time_iso}
          blob = bucket.blob(key)
          blob.upload_from_string(
            json.dumps(state, indent=2),
            content_type='application/json'
          )
          print(f"Saved state: last_event_time={last_event_time_iso}")
        except Exception as e:
          print(f"Warning: Could not save state: {e}")
      
      def fetch_messages(auth_b64: str, start_time: datetime, end_time: datetime, page_size: int, max_records: int):
        """
        Fetch message logs from Proofpoint SER API with pagination.
      
        Args:
          auth_b64: Base64-encoded API key:secret for Basic auth
          start_time: Start time for log query
          end_time: End time for log query
          page_size: Number of records per page
          max_records: Maximum total records to fetch
      
        Returns:
          Tuple of (records list, newest_event_time ISO string)
        """
        headers = {
          'Authorization': f'Basic {auth_b64}',
          'Accept': 'application/json',
          'Content-Type': 'application/json',
          'User-Agent': 'GoogleSecOps-ProofpointSERCollector/1.0',
        }
      
        records = []
        newest_time = None
        page_num = 0
        backoff = 1.0
        offset = 0
      
        start_date = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
        end_date = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
      
        while True:
          page_num += 1
      
          if len(records) >= max_records:
            print(f"Reached max_records limit ({max_records})")
            break
      
          current_limit = min(page_size, max_records - len(records))
          url = f"{API_BASE}/messages?startDate={start_date}&endDate={end_date}&offset={offset}&limit={current_limit}"
      
          try:
            response = http.request('GET', url, headers=headers)
      
            # Handle rate limiting with exponential backoff
            if response.status == 429:
              retry_after = int(response.headers.get('Retry-After', str(int(backoff))))
              print(f"Rate limited (429). Retrying after {retry_after}s...")
              time.sleep(retry_after)
              backoff = min(backoff * 2, 30.0)
              continue
      
            backoff = 1.0
      
            if response.status != 200:
              print(f"HTTP Error: {response.status}")
              response_text = response.data.decode('utf-8')
              print(f"Response body: {response_text}")
              return [], None
      
            data = json.loads(response.data.decode('utf-8'))
      
            if isinstance(data, list):
              page_results = data
            else:
              page_results = data.get('messages', data.get('results', data.get('data', [])))
      
            if not page_results:
              print(f"No more results (empty page)")
              break
      
            print(f"Page {page_num}: Retrieved {len(page_results)} messages")
            records.extend(page_results)
      
            # Track newest event time
            for event in page_results:
              try:
                event_time = event.get('date') or event.get('timestamp') or event.get('sentDate')
                if event_time:
                  if newest_time is None or parse_datetime(event_time) > parse_datetime(newest_time):
                    newest_time = event_time
              except Exception as e:
                print(f"Warning: Could not parse event time: {e}")
      
            # Check for more results
            if len(page_results) < current_limit:
              print(f"Reached last page (size={len(page_results)} < limit={current_limit})")
              break
      
            offset += len(page_results)
      
          except Exception as e:
            print(f"Error fetching messages: {e}")
            return [], None
      
        print(f"Retrieved {len(records)} total messages from {page_num} pages")
        return records, newest_time
      
    • Segundo arquivo:requirements.txt:

      functions-framework==3.*
      google-cloud-storage==2.*
      urllib3>=2.0.0
      
  3. Clique em Implantar para salvar e implantar a função.

  4. Aguarde a conclusão da implantação (2 a 3 minutos).

Criar o job do Cloud Scheduler

O Cloud Scheduler vai publicar mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.

  1. No Console do GCP, acesse o Cloud Scheduler.
  2. Clique em Criar job.
  3. Informe os seguintes detalhes de configuração:

    Configuração Valor
    Nome proofpoint-ser-collector-hourly
    Região Selecione a mesma região da função do Cloud Run
    Frequência 0 * * * * (a cada hora, na hora)
    Fuso horário Selecione o fuso horário (UTC recomendado)
    Tipo de destino Pub/Sub
    Tópico Selecione o tópico do Pub/Sub (proofpoint-ser-trigger).
    Corpo da mensagem {} (objeto JSON vazio)
  4. Clique em Criar.

Opções de frequência de programação

Escolha a frequência com base no volume de registros e nos requisitos de latência:

Frequência Expressão Cron Caso de uso
A cada 5 minutos */5 * * * * Alto volume e baixa latência
A cada 15 minutos */15 * * * * Volume médio
A cada hora 0 * * * * Padrão (recomendado)
A cada 6 horas 0 */6 * * * Baixo volume, processamento em lote
Diariamente 0 0 * * * Coleta de dados históricos

Testar a integração

  1. No console do Cloud Scheduler, encontre seu job.
  2. Clique em Forçar execução para acionar o job manualmente.
  3. Aguarde alguns segundos.
  4. Acesse Cloud Run > Serviços.
  5. Clique no nome da função (proofpoint-ser-collector).
  6. Clique na guia Registros.
  7. Verifique se a função foi executada com sucesso. Procure:

    Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00
    Page 1: Retrieved X messages
    Wrote X records to gs://proofpoint-ser-logs/ser-logs/logs_YYYYMMDD_HHMMSS.ndjson
    Successfully processed X records
    
  8. Acesse Cloud Storage > Buckets.

  9. Clique no nome do bucket (proofpoint-ser-logs).

  10. Navegue até a pasta de prefixo (ser-logs/).

  11. Verifique se um novo arquivo .ndjson foi criado com o carimbo de data/hora atual.

Se você encontrar erros nos registros:

  • HTTP 401: verifique a chave de API e o secret de API nas variáveis de ambiente.
  • HTTP 403: verifique se a chave de API tem acesso ao endpoint de rastreamento de mensagens.
  • HTTP 429: limitação de taxa. A função vai tentar de novo automaticamente com espera.
  • Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas.

Recuperar a conta de serviço do Google SecOps

O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.

Receber o e-mail da conta de serviço

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. Clique em Configurar um único feed.
  4. No campo Nome do feed, insira um nome para o feed (por exemplo, Proofpoint SER Logs).
  5. Selecione Google Cloud Storage V2 como o Tipo de origem.
  6. Selecione ProofPoint Secure Email Relay como o Tipo de registro.
  7. Clique em Receber conta de serviço.
  8. Um e-mail exclusivo da conta de serviço será exibido, por exemplo:

    chronicle-12345678@chronicle-gcp-prod.iam.gserviceaccount.com
    
  9. Copie esse endereço de e-mail para usar na próxima etapa.

  10. Clique em Próxima.

  11. Especifique valores para os seguintes parâmetros de entrada:

    • URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:

      gs://proofpoint-ser-logs/ser-logs/
      
      • Substitua:
        • proofpoint-ser-logs: o nome do bucket do GCS.
        • ser-logs: prefixo/caminho da pasta opcional onde os registros são armazenados (deixe em branco para a raiz).
    • Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:

      • Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
      • Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
      • Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.

    • Idade máxima do arquivo: inclui arquivos modificados nos últimos dias. O padrão é 180 dias.

    • Namespace do recurso: o namespace do recurso

    • Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed

  12. Clique em Próxima.

  13. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Conceder permissões do IAM à conta de serviço do Google SecOps

3A conta de serviço do Google SecOps precisa do papel Leitor de objetos do Storage no bucket do GCS.

  1. Acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket (por exemplo, proofpoint-ser-logs).
  3. Acesse a guia Permissões.
  4. Clique em Conceder acesso.
  5. Informe os seguintes detalhes de configuração:
    • Adicionar principais: cole o e-mail da conta de serviço do Google SecOps.
    • Atribuir papéis: selecione Leitor de objetos do Storage.
  6. Clique em Salvar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
status, details, data.throughputLimit, data.throughput, data.totalThroughput, log_metadata.totalThroughput, data.averageDailyThroughput, data.throughputForecast, data.remainingThroughput, data.acceptedThroughput, data.licenseStartDate, data.licenseEndDate, data.average7DayThroughput, data.average30DayThroughput, data.requestedMessages, data.acceptedMessages, data.sentMessages, data.deliveredMessages, data.avgAcceptedMessageSize, data.blockedMessages, data.quarantinedMessages, data.rejectedMessages, data.requestedThroughput, data.totalMessages, data.undeliveredMessages additional.fields Rótulos mesclados do mapa de status como valores de string, detalhes do mapa aninhado como chaves simplificadas com valores de string e vários campos de dados como valores de string ou número.
desc, data.name metadata.description Valor de "desc" se não estiver vazio, caso contrário, "data.name"
event_type metadata.event_type Definido como EMAIL_TRANSACTION se user_present for verdadeiro, caso contrário, GENERIC_EVENT
metadata.product_name Defina como "PROOFPOINT SER"
metadata.vendor_name Defina como "PROOFPOINT"
fromEnvelope network.email.bounce_address Valor de fromEnvelope se corresponder ao padrão de e-mail
fromHeader network.email.from Valor de fromHeader se corresponder ao padrão de e-mail
applicationName principal.administrative_domain Valor copiado diretamente
principal_host principal.asset.hostname Valor copiado diretamente
principal_host principal.hostname Valor copiado diretamente
principal_port principal.port Valor de principal_port convertido em número inteiro
userId, data.relayUserId principal.user.product_object_id Valor de userId se não estiver vazio, caso contrário, data.relayUserId
applicationUserName principal.user.user_display_name Valor copiado diretamente
senderName target.administrative_domain Valor copiado diretamente
senderId target.user.product_object_id Valor copiado diretamente
data.throughput" and "data.totalThroughput additional.fields Mapeado do registro de mudanças
data.throughputLimit", "data.averageDailyThroughput", "data.throughputForecast", "data.remainingThroughput", "data.acceptedThroughput", "data.licenseStartDate", "data.licenseEndDate", "data.average7DayThroughput", "data.average30DayThroughput", "data.requestedMessages", "data.acceptedMessages", "data.sentMessages", "data.deliveredMessages", "data.avgAcceptedMessageSize", "data.blockedMessages", "data.quarantinedMessages", "data.rejectedMessages", "data.requestedThroughput", "data.totalMessages", and "data.undeliveredMessages additional.fields Mapeado do registro de mudanças

Registro de alterações

Ver o registro de alterações deste analisador

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.