// Loading (upsert) logic for gov_source / gov_opportunity.
// Deduplication uses the (source_code, external_id) unique key.
import { withConnection, oracledb } from '../db.js';
import { log } from '../logger.js';

/**
 * Builds an IN bind descriptor that sends `val` as a CLOB.
 * `undefined` is normalised to `null` so the bind value is always defined.
 * @param {string|null|undefined} val
 * @returns {{dir: number, type: *, val: (string|null)}}
 */
function clobBind(val) {
  return { dir: oracledb.BIND_IN, type: oracledb.DB_TYPE_CLOB, val: val ?? null };
}

/**
 * Truncates an optional string to at most `max` UTF-16 code units.
 * Falsy values (including the empty string) become `null`, i.e. SQL NULL.
 * @param {string|null|undefined} value
 * @param {number} max
 * @returns {string|null}
 */
function truncOrNull(value, max) {
  return value ? value.slice(0, max) : null;
}

/**
 * Upserts a source row keyed by `code`, commits, and reads back its id.
 *
 * Matched rows get name/base_url/type/config refreshed; new rows are inserted
 * with `active = 1`.
 *
 * @param {object} source
 * @param {string} source.code - Natural key used by the MERGE.
 * @param {string} source.name
 * @param {string} [source.baseUrl]
 * @param {string} source.type
 * @param {object} [source.config] - Stored as JSON text in a CLOB bind.
 * @returns {Promise<{id: string, active: boolean}>} `id` is RAWTOHEX(id);
 *   `active` is true when the column value is 1.
 */
export async function ensureSource({ code, name, baseUrl, type, config }) {
  return withConnection(async (conn) => {
    await conn.execute(
      `MERGE INTO gov_source t
       USING (SELECT :code AS code FROM dual) s
       ON (t.code = s.code)
       WHEN MATCHED THEN UPDATE SET
         name = :name, base_url = :baseUrl, type = :type, config = :config,
         updated_at = SYSTIMESTAMP
       WHEN NOT MATCHED THEN INSERT
         (id, code, name, base_url, type, config, active, created_at, updated_at)
       VALUES
         (SYS_GUID(), :code, :name, :baseUrl, :type, :config, 1, SYSTIMESTAMP, SYSTIMESTAMP)`,
      {
        code,
        name,
        baseUrl: baseUrl ?? null,
        type,
        config: clobBind(config ? JSON.stringify(config) : null),
      }
    );
    await conn.commit();

    const r = await conn.execute(
      `SELECT RAWTOHEX(id) AS id, active FROM gov_source WHERE code = :code`,
      { code }
    );
    const [id, active] = r.rows[0];
    return { id, active: active === 1 };
  });
}

/**
 * Returns all sources with `active = 1`, ordered by code.
 * @returns {Promise<Array<{id: string, code: string, name: string,
 *   baseUrl: (string|null), type: string, config: object}>>}
 *   `config` is the parsed JSON, or `{}` when the column is empty.
 */
export async function listActiveSources() {
  return withConnection(async (conn) => {
    const r = await conn.execute(
      `SELECT RAWTOHEX(id) AS id, code, name, base_url, type, config
       FROM gov_source
       WHERE active = 1
       ORDER BY code`,
      {},
      {
        outFormat: oracledb.OUT_FORMAT_OBJECT,
        // config is written as a CLOB (see ensureSource). Fetch it as a string
        // so JSON.parse never receives a Lob instance; this is harmless if
        // db.js already configures oracledb.fetchAsString for CLOBs.
        fetchInfo: { CONFIG: { type: oracledb.STRING } },
      }
    );
    return r.rows.map((row) => ({
      id: row.ID,
      code: row.CODE,
      name: row.NAME,
      baseUrl: row.BASE_URL,
      type: row.TYPE,
      config: row.CONFIG ? JSON.parse(row.CONFIG) : {},
    }));
  });
}

/**
 * Dedup-merges list-stage opportunities for one source.
 *
 * Rows are matched on (source_code, external_id). When an item carries a
 * non-blank `body` (e.g. API sources), the body is stored right away and the
 * row is marked DETAILED. Otherwise an existing row keeps its body_text,
 * status and detail_collected_at, and a new row starts as LISTED.
 *
 * Every item is validated before any SQL runs; all merges are committed once
 * at the end.
 *
 * @param {string} sourceIdHex - gov_source id in RAWTOHEX form.
 * @param {string} sourceCode
 * @param {Array<object>} items - Each item needs `externalId` and `title`.
 * @returns {Promise<{processed: number, inserted: number, updated: number}>}
 *   Insert/update classification accounts for rows that existed before the
 *   call and for earlier items of the same batch.
 * @throws {Error} When an item lacks `externalId` or `title`.
 */
export async function upsertOpportunities(sourceIdHex, sourceCode, items) {
  if (!items || items.length === 0) return { processed: 0, inserted: 0, updated: 0 };

  // Validate everything first so a bad item aborts before any statement runs.
  for (const it of items) {
    if (!it.externalId || !it.title) {
      throw new Error(
        `필수 필드 누락 (externalId/title): ${JSON.stringify(it).slice(0, 200)}`
      );
    }
  }

  return withConnection(async (conn) => {
    // Load the existing external_ids once to classify insert vs update,
    // instead of issuing one SELECT per item.
    const existing = new Set();
    const r = await conn.execute(
      `SELECT external_id FROM gov_opportunity WHERE source_code = :sc`,
      { sc: sourceCode }
    );
    for (const row of r.rows) existing.add(String(row[0]));

    let inserted = 0;
    let updated = 0;
    for (const it of items) {
      const externalId = String(it.externalId);
      const hasBody = it.body && it.body.trim() ? 1 : 0;

      // With a body (API-style sources) the detail is stored immediately
      // (status DETAILED). Without one, a matched row keeps its existing
      // body/status/detail timestamp.
      await conn.execute(
        `MERGE INTO gov_opportunity t
         USING (SELECT :sourceCode AS source_code, :externalId AS external_id FROM dual) s
         ON (t.source_code = s.source_code AND t.external_id = s.external_id)
         WHEN MATCHED THEN UPDATE SET
           title = :title, agency = :agency, category = :category, target = :target,
           apply_start = :applyStart, apply_end = :applyEnd, detail_url = :detailUrl,
           raw_json = :rawJson,
           body_text = CASE WHEN :hasBody = 1 THEN :body ELSE body_text END,
           status = CASE WHEN :hasBody = 1 THEN 'DETAILED' ELSE status END,
           detail_collected_at = CASE WHEN :hasBody = 1 THEN SYSTIMESTAMP ELSE detail_collected_at END,
           updated_at = SYSTIMESTAMP
         WHEN NOT MATCHED THEN INSERT
           (id, source_id, source_code, external_id, title, agency, category, target,
            apply_start, apply_end, detail_url, raw_json, body_text, status,
            list_collected_at, detail_collected_at, created_at, updated_at)
         VALUES
           (SYS_GUID(), HEXTORAW(:sourceId), :sourceCode, :externalId, :title, :agency,
            :category, :target, :applyStart, :applyEnd, :detailUrl, :rawJson, :body,
            CASE WHEN :hasBody = 1 THEN 'DETAILED' ELSE 'LISTED' END,
            SYSTIMESTAMP,
            CASE WHEN :hasBody = 1 THEN SYSTIMESTAMP ELSE NULL END,
            SYSTIMESTAMP, SYSTIMESTAMP)`,
        {
          sourceId: sourceIdHex,
          sourceCode,
          externalId,
          title: it.title.slice(0, 1000),
          agency: truncOrNull(it.agency, 300),
          category: truncOrNull(it.category, 200),
          target: truncOrNull(it.target, 1000),
          applyStart: it.applyStart ?? null,
          applyEnd: it.applyEnd ?? null,
          detailUrl: truncOrNull(it.detailUrl, 1000),
          rawJson: clobBind(it.raw ? JSON.stringify(it.raw) : null),
          body: clobBind(hasBody ? it.body : null),
          hasBody,
        }
      );

      if (existing.has(externalId)) {
        updated += 1;
      } else {
        inserted += 1;
        // A later duplicate in this same batch will hit WHEN MATCHED
        // (the row is visible within this transaction), so count it as an update.
        existing.add(externalId);
      }
    }

    await conn.commit();
    return { processed: items.length, inserted, updated };
  });
}

/**
 * Returns opportunities whose detail body has not been collected yet:
 * status LISTED with a non-null detail_url, oldest created_at first.
 * @param {string} sourceCode
 * @param {number} limit - Maximum number of rows (FETCH FIRST :lim ROWS ONLY).
 * @returns {Promise<Array<{id: string, externalId: string, detailUrl: string}>>}
 */
export async function findPendingDetail(sourceCode, limit) {
  return withConnection(async (conn) => {
    const r = await conn.execute(
      `SELECT RAWTOHEX(id) AS id, external_id, detail_url
       FROM gov_opportunity
       WHERE source_code = :sourceCode AND status = 'LISTED' AND detail_url IS NOT NULL
       ORDER BY created_at
       FETCH FIRST :lim ROWS ONLY`,
      { sourceCode, lim: limit },
      { outFormat: oracledb.OUT_FORMAT_OBJECT }
    );
    return r.rows.map((row) => ({
      id: row.ID,
      externalId: row.EXTERNAL_ID,
      detailUrl: row.DETAIL_URL,
    }));
  });
}

/**
 * Stores the detail body for one opportunity and marks it DETAILED.
 * @param {string} idHex - gov_opportunity id in RAWTOHEX form.
 * @param {string} bodyText - Bound as a CLOB.
 */
export async function saveDetail(idHex, bodyText) {
  return withConnection(async (conn) => {
    await conn.execute(
      `UPDATE gov_opportunity
       SET body_text = :body, status = 'DETAILED',
           detail_collected_at = SYSTIMESTAMP, updated_at = SYSTIMESTAMP
       WHERE id = HEXTORAW(:id)`,
      { body: clobBind(bodyText), id: idHex }
    );
    await conn.commit();
  });
}

/**
 * Marks detail collection as failed for one opportunity (status ERROR).
 * @param {string} idHex - gov_opportunity id in RAWTOHEX form.
 */
export async function markDetailError(idHex) {
  return withConnection(async (conn) => {
    await conn.execute(
      `UPDATE gov_opportunity
       SET status = 'ERROR', updated_at = SYSTIMESTAMP
       WHERE id = HEXTORAW(:id)`,
      { id: idHex }
    );
    await conn.commit();
  });
}

/**
 * Records that a source was just crawled (sets last_crawled_at to now).
 * @param {string} sourceIdHex - gov_source id in RAWTOHEX form.
 */
export async function markSourceCrawled(sourceIdHex) {
  return withConnection(async (conn) => {
    await conn.execute(
      `UPDATE gov_source
       SET last_crawled_at = SYSTIMESTAMP, updated_at = SYSTIMESTAMP
       WHERE id = HEXTORAW(:id)`,
      { id: sourceIdHex }
    );
    await conn.commit();
  });
}