Coletar registros do IBM Security Identity Manager
Este documento explica como ingerir registros do IBM Security Identity Manager no Google Security Operations usando o Google Cloud Storage V2.
O IBM Security Identity Manager (ISIM) é uma solução automatizada e baseada em políticas que gerencia o acesso dos usuários em ambientes de TI, ajudando a impulsionar o gerenciamento e a governança de identidades eficazes em toda a empresa. Ao usar papéis, contas e permissões de acesso, ele automatiza a criação, a modificação, a recertificação e o encerramento de identidades de usuários durante todo o ciclo de vida do usuário. O produto também é conhecido como IBM Security Verify Governance - Identity Manager em versões mais recentes.
Antes de começar
Verifique se você atende os seguintes pré-requisitos:
- Uma instância do Google SecOps
- Um projeto do GCP com a API Storage ativada
- Permissões para criar e gerenciar intervalos do GCS
- Permissões para gerenciar políticas do IAM em buckets do GCS
- Permissões para criar serviços do Cloud Run, tópicos do Pub/Sub e jobs do Cloud Scheduler
- Acesso privilegiado à API REST do IBM Security Identity Manager.
- IBM Security Identity Manager versão 7.0.2 ou mais recente ou IBM Security Verify Governance 10.0.1 ou mais recente
- Uma conta de usuário do ISIM com privilégios administrativos para acessar os endpoints da API REST (
/itim/rest/activitiese/itim/rest/requests). - Conectividade de rede entre o projeto do GCP e o dispositivo virtual do ISIM (as regras de firewall precisam permitir o tráfego HTTPS na porta do ISIM)
Configurar o acesso à API do IBM Security Identity Manager
Para permitir que o Google SecOps recupere atividades e solicite dados de auditoria, configure uma conta de serviço dedicada no ISIM com as permissões necessárias para acesso à API REST.
Criar uma conta do ISIM dedicada para acesso à API
- Faça login no console de administração do IBM Security Identity Manager.
- Acesse Gerenciar usuários > Criar usuário.
- Crie uma nova conta de usuário para a integração (por exemplo,
secops-api-user). - Atribua a função de Administrador do sistema ou uma função personalizada que inclua permissões para visualizar atividades e solicitações pela API REST.
- Salve as credenciais da conta com segurança:
- Nome de usuário: o nome de usuário de login do ISIM (por exemplo,
secops-api-user) - Senha: a senha de login do ISIM
- Nome de usuário: o nome de usuário de login do ISIM (por exemplo,
Identificar o URL de base da API ISIM
O URL base da API ISIM segue este formato:
https://<isim-hostname>:<port>Por exemplo, se o nome do host do seu dispositivo virtual ISIM for
isim.example.come ele usar a porta HTTPS padrão:https://isim.example.com:9082
Testar o acesso à API
Teste suas credenciais antes de prosseguir com a integração:
# Replace with your actual credentials and ISIM URL ISIM_BASE_URL="https://isim.example.com:9082" ISIM_USER="chronicle-api-user" ISIM_PASS="your-password" # Authenticate and obtain session cookie curl -v -k -c cookies.txt \ -d "j_username=${ISIM_USER}&j_password=${ISIM_PASS}" \ "${ISIM_BASE_URL}/itim/j_security_check" # Retrieve CSRF token from systemusers/me endpoint curl -v -k -b cookies.txt \ -H "Accept: application/json" \ "${ISIM_BASE_URL}/itim/rest/systemusers/me" # Test activities endpoint curl -v -k -b cookies.txt \ -H "Accept: application/json" \ "${ISIM_BASE_URL}/itim/rest/activities"
Uma resposta bem-sucedida retorna uma matriz JSON de atividades recentes do ISIM.
Criar um bucket do Google Cloud Storage
- Acesse o Console do Google Cloud.
- Selecione um projeto ou crie um novo.
- No menu de navegação, acesse Cloud Storage > Buckets.
- Clique em Criar bucket.
Informe os seguintes detalhes de configuração:
Configuração Valor Nomeie seu bucket Insira um nome exclusivo globalmente, por exemplo, ibm-sim-logs-gcs.Tipo de local Escolha de acordo com suas necessidades (região, birregional, multirregional) Local Selecione o local (por exemplo, us-central1).Classe de armazenamento Padrão (recomendado para registros acessados com frequência) Controle de acesso Uniforme (recomendado) Ferramentas de proteção Opcional: ativar o controle de versões de objetos ou a política de retenção Clique em Criar.
Criar uma conta de serviço para a função do Cloud Run
A função do Cloud Run precisa de uma conta de serviço com permissões para gravar no bucket do GCS e ser invocada pelo Pub/Sub.
Criar conta de serviço
- No Console do GCP, acesse IAM e administrador > Contas de serviço.
- Clique em Criar conta de serviço.
- Informe os seguintes detalhes de configuração:
- Nome da conta de serviço: insira
ibm-sim-collector-sa - Descrição da conta de serviço: digite
Service account for Cloud Run function to collect IBM Security Identity Manager logs
- Nome da conta de serviço: insira
- Clique em Criar e continuar.
- Na seção Conceder acesso a essa conta de serviço ao projeto, adicione os seguintes papéis:
- Clique em Selecionar papel.
- Pesquise e selecione Administrador de objetos do Storage.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Run.
- Clique em + Adicionar outro papel.
- Pesquise e selecione Invocador do Cloud Functions.
- Clique em Continuar.
- Clique em Concluído.
Esses papéis são necessários para:
- Administrador de objetos do Storage: grava registros no bucket do GCS e gerencia arquivos de estado.
- Invocador do Cloud Run: permite que o Pub/Sub invoque a função.
- Invocador do Cloud Functions: permite a invocação de funções
Conceder permissões do IAM no bucket do GCS
Conceda permissões de gravação à conta de serviço no bucket do GCS:
- Acesse Cloud Storage > Buckets.
- Clique no nome do bucket (
ibm-sim-logs-gcs). - Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: insira o e-mail da conta de serviço (
ibm-sim-collector-sa@PROJECT_ID.iam.gserviceaccount.com). - Atribuir papéis: selecione Administrador de objetos do Storage.
- Adicionar principais: insira o e-mail da conta de serviço (
- Clique em Salvar.
Criar tópico Pub/Sub
Crie um tópico do Pub/Sub em que o Cloud Scheduler vai publicar e a função do Cloud Run vai se inscrever.
- No Console do GCP, acesse Pub/Sub > Tópicos.
- Selecione Criar tópico.
- Informe os seguintes detalhes de configuração:
- ID do tópico: insira
ibm-sim-logs-trigger - Não mude as outras configurações.
- ID do tópico: insira
- Clique em Criar.
Criar uma função do Cloud Run para coletar registros
A função do Cloud Run será acionada por mensagens do Pub/Sub do Cloud Scheduler para buscar registros da API REST do IBM Security Identity Manager e gravá-los no GCS.
Para criar uma função do Cloud Run que colete registros, faça o seguinte:
- No console do GCP, acesse o Cloud Run.
- Clique em Criar serviço.
- Selecione Função (use um editor in-line para criar uma função).
Na seção Configurar, forneça os seguintes detalhes de configuração:
Configuração Valor Nome do serviço ibm-sim-collectorRegião Selecione a região que corresponde ao seu bucket do GCS (por exemplo, us-central1).Ambiente de execução Selecione Python 3.12 ou uma versão mais recente. Na seção Acionador (opcional):
- Clique em + Adicionar gatilho.
- Selecione Cloud Pub/Sub.
- Em Selecionar um tópico do Cloud Pub/Sub, escolha
ibm-sim-logs-trigger. - Clique em Salvar.
Na seção Autenticação:
- Selecione Exigir autenticação.
- Confira o Identity and Access Management (IAM).
Role a tela para baixo e expanda Contêineres, rede, segurança.
Acesse a guia Segurança:
- Conta de serviço: selecione
ibm-sim-collector-sa
- Conta de serviço: selecione
Acesse a guia Contêineres:
- Clique em Variáveis e secrets.
Clique em + Adicionar variável para cada variável de ambiente:
Nome da variável Valor de exemplo Descrição GCS_BUCKETibm-sim-logs-gcsNome do bucket do GCS GCS_PREFIXibm-simPrefixo para arquivos de registro STATE_KEYibm-sim/state.jsonCaminho do arquivo de estado ISIM_BASE_URLhttps://isim.example.com:9082URL base do servidor ISIM ISIM_USERNAMEsecops-api-userNome de usuário da API ISIM ISIM_PASSWORDyour-passwordSenha da API ISIM LOOKBACK_HOURS24Período de lookback inicial PAGE_SIZE100Página "Registros por API" MAX_RECORDS5000Máximo de registros por execução
Na seção Variáveis e secrets, role a tela para baixo até Solicitações:
- Tempo limite da solicitação: insira
600segundos (10 minutos)
- Tempo limite da solicitação: insira
Acesse a guia Configurações:
- Na seção Recursos:
- Memória: selecione 512 MiB ou mais.
- CPU: selecione 1
- Na seção Recursos:
Na seção Escalonamento de revisão:
- Número mínimo de instâncias: insira
0 - Número máximo de instâncias: insira
100
- Número mínimo de instâncias: insira
Clique em Criar.
Aguarde a criação do serviço (1 a 2 minutos).
Depois que o serviço for criado, o editor de código inline será aberto automaticamente.
Adicionar código da função
- Insira main no campo Ponto de entrada.
No editor de código em linha, crie dois arquivos:
main.py:
import functions_framework from google.cloud import storage import json import os import urllib3 import urllib.parse from datetime import datetime, timezone, timedelta # Disable SSL warnings for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) http = urllib3.PoolManager( timeout=urllib3.Timeout(connect=10.0, read=60.0), retries=False, cert_reqs='CERT_NONE', ) storage_client = storage.Client() GCS_BUCKET = os.environ.get('GCS_BUCKET') GCS_PREFIX = os.environ.get('GCS_PREFIX', 'ibm-sim') STATE_KEY = os.environ.get('STATE_KEY', 'ibm-sim/state.json') ISIM_BASE_URL = os.environ.get('ISIM_BASE_URL', '').rstrip('/') ISIM_USERNAME = os.environ.get('ISIM_USERNAME') ISIM_PASSWORD = os.environ.get('ISIM_PASSWORD') LOOKBACK_HOURS = int(os.environ.get('LOOKBACK_HOURS', '24')) PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '100')) MAX_RECORDS = int(os.environ.get('MAX_RECORDS', '5000')) @functions_framework.cloud_event def main(cloud_event): if not all([GCS_BUCKET, ISIM_BASE_URL, ISIM_USERNAME, ISIM_PASSWORD]): print('Error: Missing required environment variables') return try: bucket = storage_client.bucket(GCS_BUCKET) state = load_state(bucket) now = datetime.now(timezone.utc) if isinstance(state, dict) and state.get('last_event_time'): try: last_val = state['last_event_time'] if last_val.endswith('Z'): last_val = last_val[:-1] + '+00:00' last_time = datetime.fromisoformat(last_val) last_time = last_time - timedelta(minutes=2) except Exception as e: print(f"Warning: Could not parse last_event_time: {e}") last_time = now - timedelta(hours=LOOKBACK_HOURS) else: last_time = now - timedelta(hours=LOOKBACK_HOURS) print(f"Fetching logs from {last_time.isoformat()} to {now.isoformat()}") session_cookies = authenticate() csrf_token = get_csrf_token(session_cookies) activities = fetch_activities(session_cookies, csrf_token, last_time, now) requests_data = fetch_requests(session_cookies, csrf_token, last_time, now) all_records = [] for a in activities: a['_isim_record_type'] = 'activity' all_records.append(a) for r in requests_data: r['_isim_record_type'] = 'request' all_records.append(r) if not all_records: print("No new records found.") save_state(bucket, now.isoformat()) return timestamp = now.strftime('%Y%m%d_%H%M%S') object_key = f"{GCS_PREFIX}/ibm_sim_audit_{timestamp}.ndjson" blob = bucket.blob(object_key) ndjson = '\n'.join( [json.dumps(r, ensure_ascii=False, default=str) for r in all_records] ) + '\n' blob.upload_from_string(ndjson, content_type='application/x-ndjson') print(f"Wrote {len(all_records)} records to gs://{GCS_BUCKET}/{object_key}") newest = find_newest_time(all_records) save_state(bucket, newest if newest else now.isoformat()) print(f"Successfully processed {len(all_records)} records " f"(activities: {len(activities)}, requests: {len(requests_data)})") except Exception as e: print(f'Error processing logs: {str(e)}') raise def authenticate(): url = f"{ISIM_BASE_URL}/itim/j_security_check" encoded_body = urllib.parse.urlencode({ 'j_username': ISIM_USERNAME, 'j_password': ISIM_PASSWORD }).encode('utf-8') response = http.request( 'POST', url, body=encoded_body, headers={'Content-Type': 'application/x-www-form-urlencoded'}, redirect=False ) cookies = {} set_cookie_headers = response.headers.getlist('Set-Cookie') if hasattr(response.headers, 'getlist') else [] if not set_cookie_headers: raw = response.headers.get('Set-Cookie', '') if raw: set_cookie_headers = [raw] for cookie_header in set_cookie_headers: parts = cookie_header.split(';')[0] if '=' in parts: name, value = parts.split('=', 1) cookies[name.strip()] = value.strip() if not cookies: raise Exception( f"Authentication failed: no session cookies returned " f"(HTTP {response.status})" ) print("Successfully authenticated to ISIM") return cookies def get_csrf_token(cookies): url = f"{ISIM_BASE_URL}/itim/rest/systemusers/me" cookie_str = '; '.join([f"{k}={v}" for k, v in cookies.items()]) response = http.request( 'GET', url, headers={ 'Accept': 'application/json', 'Cookie': cookie_str } ) if response.status != 200: raise Exception( f"Failed to get CSRF token: {response.status} - " f"{response.data.decode('utf-8')}" ) data = json.loads(response.data.decode('utf-8')) csrf_token = None if isinstance(data, dict): csrf_token = data.get('_csrf', data.get('CSRFToken')) if not csrf_token: csrf_header = response.headers.get('CSRFToken', '') if csrf_header: csrf_token = csrf_header print("Successfully retrieved CSRF token") return csrf_token def fetch_activities(cookies, csrf_token, start_time, end_time): url = f"{ISIM_BASE_URL}/itim/rest/activities" cookie_str = '; '.join([f"{k}={v}" for k, v in cookies.items()]) headers = { 'Accept': 'application/json', 'Cookie': cookie_str } if csrf_token: headers['CSRFToken'] = csrf_token all_records = [] start_index = 0 while len(all_records) < MAX_RECORDS: params = { '_start': str(start_index), '_count': str(min(PAGE_SIZE, MAX_RECORDS - len(all_records))) } query_string = urllib.parse.urlencode(params) request_url = f"{url}?{query_string}" response = http.request('GET', request_url, headers=headers) if response.status != 200: print(f"Activities query failed: {response.status} - " f"{response.data.decode('utf-8')}") break page_results = json.loads(response.data.decode('utf-8')) if isinstance(page_results, dict): page_results = page_results.get('activities', page_results.get('items', [])) if not page_results: break all_records.extend(page_results) print(f"Activities: Retrieved {len(page_results)} records " f"(total: {len(all_records)})") if len(page_results) < PAGE_SIZE: break start_index += len(page_results) print(f"Total activities fetched: {len(all_records)}") return all_records def fetch_requests(cookies, csrf_token, start_time, end_time): url = f"{ISIM_BASE_URL}/itim/rest/requests" cookie_str = '; '.join([f"{k}={v}" for k, v in cookies.items()]) headers = { 'Accept': 'application/json', 'Cookie': cookie_str } if csrf_token: headers['CSRFToken'] = csrf_token all_records = [] start_index = 0 while len(all_records) < MAX_RECORDS: params = { '_start': str(start_index), '_count': str(min(PAGE_SIZE, MAX_RECORDS - len(all_records))) } query_string = urllib.parse.urlencode(params) request_url = f"{url}?{query_string}" response = http.request('GET', request_url, headers=headers) if response.status != 200: print(f"Requests query failed: {response.status} - " f"{response.data.decode('utf-8')}") break page_results = json.loads(response.data.decode('utf-8')) if isinstance(page_results, dict): page_results = page_results.get('requests', page_results.get('items', [])) if not page_results: break all_records.extend(page_results) print(f"Requests: Retrieved {len(page_results)} records " f"(total: {len(all_records)})") if len(page_results) < PAGE_SIZE: break start_index += len(page_results) print(f"Total requests fetched: {len(all_records)}") return all_records def find_newest_time(records): newest = None for r in records: for field in ['completedTime', 'scheduledTime', 'startedTime', 'lastModified', 'requestDate', 'timestamp']: t = r.get(field) if t: try: if isinstance(t, (int, float)): dt = datetime.fromtimestamp(t / 1000, tz=timezone.utc) t_iso = dt.isoformat() else: t_iso = str(t) if newest is None or t_iso > newest: newest = t_iso except Exception: continue return newest def load_state(bucket): try: blob = bucket.blob(STATE_KEY) if blob.exists(): return json.loads(blob.download_as_text()) except Exception as e: print(f"Warning: Could not load state: {e}") return {} def save_state(bucket, last_event_time_iso): try: state = { 'last_event_time': last_event_time_iso, 'last_run': datetime.now(timezone.utc).isoformat() } blob = bucket.blob(STATE_KEY) blob.upload_from_string( json.dumps(state, indent=2), content_type='application/json' ) print(f"Saved state: last_event_time={last_event_time_iso}") except Exception as e: print(f"Warning: Could not save state: {e}")requirements.txt:
functions-framework==3.* google-cloud-storage==2.* urllib3>=2.0.0
Clique em Implantar para salvar e implantar a função.
Aguarde a conclusão da implantação (2 a 3 minutos).
Criar o job do Cloud Scheduler
O Cloud Scheduler vai publicar mensagens no tópico do Pub/Sub em intervalos regulares, acionando a função do Cloud Run.
- No Console do GCP, acesse o Cloud Scheduler.
- Clique em Criar job.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome ibm-sim-collector-hourlyRegião Selecione a mesma região da função do Cloud Run Frequência 0 * * * *(a cada hora, na hora)Fuso horário Selecione o fuso horário (UTC recomendado) Tipo de destino Pub/Sub Tópico Selecionar ibm-sim-logs-triggerCorpo da mensagem {}(objeto JSON vazio)Clique em Criar.
Opções de frequência de programação
Escolha a frequência com base no volume de registros e nos requisitos de latência:
| Frequência | Expressão Cron | Caso de uso |
|---|---|---|
| A cada 5 minutos | */5 * * * * |
Alto volume e baixa latência |
| A cada 15 minutos | */15 * * * * |
Volume médio |
| A cada hora | 0 * * * * |
Padrão (recomendado) |
| A cada 6 horas | 0 */6 * * * |
Baixo volume, processamento em lote |
| Diariamente | 0 0 * * * |
Coleta de dados históricos |
Testar a integração
- No console do Cloud Scheduler, encontre seu job (
ibm-sim-collector-hourly). - Clique em Forçar execução para acionar o job manualmente.
- Aguarde alguns segundos.
- Acesse Cloud Run > Serviços.
- Clique em
ibm-sim-collector. - Clique na guia Registros.
Verifique se a função foi executada com sucesso. Procure:
Fetching logs from YYYY-MM-DDTHH:MM:SS+00:00 to YYYY-MM-DDTHH:MM:SS+00:00 Successfully authenticated to ISIM Successfully retrieved CSRF token Activities: Retrieved X records (total: X) Requests: Retrieved X records (total: X) Wrote X records to gs://ibm-sim-logs-gcs/ibm-sim/ibm_sim_audit_YYYYMMDD_HHMMSS.ndjson Successfully processed X records (activities: X, requests: X)Acesse Cloud Storage > Buckets.
Clique em
ibm-sim-logs-gcs.Navegue até a pasta
ibm-sim/.Verifique se um novo arquivo
.ndjsonfoi criado com o carimbo de data/hora atual.
Se você encontrar erros nos registros:
- HTTP 401/302: verifique se as variáveis de ambiente
ISIM_USERNAMEeISIM_PASSWORDestão corretas e se a conta de usuário não está bloqueada. - HTTP 403: verifique se a conta de usuário do ISIM tem privilégios de administrador para acesso à API REST.
- HTTP 404: verifique se o
ISIM_BASE_URLestá correto e inclui o número da porta adequado. - Erros de conexão: verifique a conectividade de rede entre a função do Cloud Run e o dispositivo virtual do ISIM.
- Variáveis de ambiente ausentes: verifique se todas as variáveis necessárias estão definidas na configuração da função do Cloud Run.
Recuperar a conta de serviço do Google SecOps
O Google SecOps usa uma conta de serviço exclusiva para ler dados do seu bucket do GCS. Você precisa conceder a essa conta de serviço acesso ao seu bucket.
Receber o e-mail da conta de serviço
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- Clique em Configurar um único feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
IBM SIM Logs GCS). - Selecione Google Cloud Storage V2 como o Tipo de origem.
- Selecione IBM Security Identity Manager como o Tipo de registro.
Clique em Receber conta de serviço. Um e-mail exclusivo da conta de serviço será exibido, por exemplo:
secops-12345678@secops-gcp-prod.iam.gserviceaccount.comCopie esse endereço de e-mail para usar na próxima etapa.
Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
URL do bucket de armazenamento: insira o URI do bucket do GCS com o caminho do prefixo:
gs://ibm-sim-logs-gcs/ibm-sim/
Opção de exclusão da fonte: selecione a opção de exclusão de acordo com sua preferência:
- Nunca: nunca exclui arquivos após as transferências (recomendado para testes).
- Excluir arquivos transferidos: exclui os arquivos após a transferência bem-sucedida.
Excluir arquivos transferidos e diretórios vazios: exclui arquivos e diretórios vazios após a transferência bem-sucedida.
Idade máxima do arquivo: inclui arquivos modificados nos últimos dias. O padrão é 180 dias.
Namespace do recurso: o namespace do recurso
Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed
Clique em Próxima.
Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Conceder permissões do IAM à conta de serviço do Google SecOps
A conta de serviço do Google SecOps precisa do papel Leitor de objetos do Storage no seu bucket do GCS.
Para conceder permissões do IAM à conta de serviço do Google SecOps, faça o seguinte:
- Acesse Cloud Storage > Buckets.
- Clique em
ibm-sim-logs-gcs. - Acesse a guia Permissões.
- Clique em Conceder acesso.
- Informe os seguintes detalhes de configuração:
- Adicionar principais: cole o e-mail da conta de serviço do Google SecOps.
- Atribuir papéis: selecione Leitor de objetos do Storage.
- Clique em Salvar.
Tabela de mapeamento do UDM
| Campo de registro | Mapeamento do UDM | Lógica |
|---|---|---|
| descrição | metadata.description | A descrição do evento |
| receivedTime | metadata.event_timestamp | Carimbo de data/hora em que o evento ocorreu |
| metadata.event_type | metadata.event_type | Tipo de evento |
| version | metadata.product_version | Versão do produto |
| connectionId | network.session_id | Identificador de sessão |
| tls | network.tls.version | Versão TLS |
| ip | principal.asset.ip | Endereço IP do recurso |
| nome do host | principal.hostname | Nome do host |
| ip | principal.ip | Endereço IP |
| porta | principal.port | Número da porta |
| kv_data | principal.user.attribute.labels | Rótulos de atributos do usuário |
| ação | security_result.action_details | Detalhes da ação realizada |
| metadata.product_name | metadata.product_name | Nome do produto |
| metadata.vendor_name | metadata.vendor_name | Nome do fornecedor |
Registro de alterações
Ver o registro de alterações deste analisador
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.