Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Registers the five activity types for the 'tnc' platform.
-- Value order per row: activityType, platform, isCodeContribution, isCollaboration, description, label.
-- Every row sets both isCodeContribution and isCollaboration to false.
INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
('enrolled-certification', 'tnc', false, false, 'Successful payment purchase of certification enrollment', 'Enrolled in certification'),
('enrolled-training', 'tnc', false, false, 'Successful payment purchase of training enrollment', 'Enrolled in training'),
('issued-certification', 'tnc', false, false, 'User is granted a certification', 'Issued certification'),
('attempted-course', 'tnc', false, false, 'Certification course is completed', 'Attempted course'),
('attempted-exam', 'tnc', false, false, 'Certification exam is completed', 'Attempted exam');
12 changes: 11 additions & 1 deletion services/apps/snowflake_connectors/src/core/transformerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@ export abstract class TransformerBase {
try {
return this.transformRow(row)
} catch (err) {
log.warn({ err, platform: this.platform }, 'Failed to transform row, skipping')
const message = err instanceof Error ? err.message : String(err)
const stack = err instanceof Error ? err.stack : undefined
log.warn(
{
errMessage: message,
errStack: stack,
platform: this.platform,
rowKeys: Object.keys(row),
},
'Failed to transform row, skipping',
)
return null
}
}
Expand Down
25 changes: 25 additions & 0 deletions services/apps/snowflake_connectors/src/integrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import { PlatformType } from '@crowd/types'

import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
import { CventTransformer } from './cvent/event-registrations/transformer'
import { buildSourceQuery as tncCertificatesBuildQuery } from './tnc/certificates/buildSourceQuery'
import { TncCertificatesTransformer } from './tnc/certificates/transformer'
import { buildSourceQuery as tncCoursesBuildQuery } from './tnc/courses/buildSourceQuery'
import { TncCoursesTransformer } from './tnc/courses/transformer'
import { buildSourceQuery as tncEnrollmentsBuildQuery } from './tnc/enrollments/buildSourceQuery'
import { TncEnrollmentsTransformer } from './tnc/enrollments/transformer'
import { DataSource, DataSourceName, PlatformDefinition } from './types'

export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
Expand All @@ -23,6 +29,25 @@ const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
},
],
},
[PlatformType.TNC]: {
sources: [
{
name: DataSourceName.TNC_ENROLLMENTS,
buildSourceQuery: tncEnrollmentsBuildQuery,
transformer: new TncEnrollmentsTransformer(),
},
{
name: DataSourceName.TNC_CERTIFICATES,
buildSourceQuery: tncCertificatesBuildQuery,
transformer: new TncCertificatesTransformer(),
},
{
name: DataSourceName.TNC_COURSES,
buildSourceQuery: tncCoursesBuildQuery,
transformer: new TncCoursesTransformer(),
},
],
},
}

const enabled = (process.env.CROWD_SNOWFLAKE_ENABLED_PLATFORMS || '')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { IS_PROD_ENV } from '@crowd/common'

// Main: analytics.silver_fact.certificates (certificate data)
// Joins:
// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
// - analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user (LFID)
// - analytics.silver_dim.users (LFID fallback)
// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)

// CTE fragment: distinct (SOURCE_ID, slug) pairs from the segments union, restricted to rows
// where PARENT_SLUG, GRANDPARENTS_SLUG and SOURCE_ID are all non-null.
// Meant to be spliced into a WITH clause; it carries no leading WITH or trailing comma.
const CDP_MATCHED_SEGMENTS = `
cdp_matched_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
WHERE s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)`

// CTE fragment: accounts that have a website, taken from both Salesforce account tables.
// The b2b table contributes NULL for LOGO_URL, INDUSTRY and N_EMPLOYEES so both halves of the
// UNION ALL share one column list.
// NOTE(review): UNION ALL keeps duplicates, so an account_id present in both tables would
// appear twice — presumably what the QUALIFY de-duplication in the query is for; confirm.
const ORG_ACCOUNTS = `
org_accounts AS (
SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
FROM analytics.bronze_fivetran_salesforce.accounts
WHERE website IS NOT NULL
UNION ALL
SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
FROM analytics.bronze_fivetran_salesforce_b2b.accounts
WHERE website IS NOT NULL
)`

// LFID expression: the merged-user name (alias `mu`) wins, falling back to the users
// dimension username (alias `u`). Both aliases must be joined by the query that embeds this.
const LFID_COALESCE = `COALESCE(mu.user_name, u.lf_username)`

/**
 * Builds the Snowflake query that selects certificate rows together with their
 * segment slug, organization columns and LFID.
 *
 * @param sinceTimestamp - When omitted (or empty), the query selects every matching
 *   certificate. When provided, the query is the UNION of (a) certificates with
 *   `issued_ts` at or after the timestamp and (b) all certificates belonging to
 *   segments whose `CREATED_TS` is at or after the timestamp.
 * @returns The SQL text with surrounding whitespace trimmed.
 */
export const buildSourceQuery = (sinceTimestamp?: string): string => {
  let select = `
SELECT
c.*,
cms.slug AS PROJECT_SLUG,
org.account_name AS ORGANIZATION_NAME,
org.website AS ORG_WEBSITE,
org.domain_aliases AS ORG_DOMAIN_ALIASES,
org.logo_url AS LOGO_URL,
org.industry AS ORGANIZATION_INDUSTRY,
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE,
${LFID_COALESCE} AS LFID
FROM analytics.silver_fact.certificates c
INNER JOIN cdp_matched_segments cms
ON cms.sourceId = c.project_id
LEFT JOIN analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user mu
ON c.user_id = mu.user_id
AND mu.user_name IS NOT NULL
LEFT JOIN analytics.silver_dim.users u
ON LOWER(c.user_email) = LOWER(u.email)
AND u.lf_username IS NOT NULL
LEFT JOIN org_accounts org
ON c.account_id = org.account_id
WHERE c.user_email IS NOT NULL`

  // Staging/testing only: outside production the export is limited to the single
  // 'cncf' project so that non-prod environments do not export every project's data.
  if (!IS_PROD_ENV) {
    select += ` AND cms.slug = 'cncf'`
  }

  // Keep exactly one row per certificate_id; the LEFT JOINs above can yield several
  // candidate rows for the same certificate.
  const dedup = `
QUALIFY ROW_NUMBER() OVER (PARTITION BY c.certificate_id ORDER BY org.website DESC) = 1`

  // Full export: no timestamp means there is nothing to filter on.
  if (!sinceTimestamp) {
    return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS}
${select}
${dedup}`.trim()
  }

  // The timestamp is embedded inside single-quoted SQL string literals below. Escape
  // backslashes and single quotes so the value can never terminate the literal early.
  // Well-formed timestamps contain neither character and pass through unchanged.
  const since = sinceTimestamp.replace(/\\/g, '\\\\').replace(/'/g, "''")

  // Incremental export. UNION (not UNION ALL) removes rows that both branches
  // return identically.
  return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS},
new_cdp_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
WHERE s.CREATED_TS >= '${since}'
AND s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)

-- New certificates since last export
${select}
AND c.issued_ts >= '${since}'
${dedup}

UNION

-- All certificates in newly created segments
${select}
AND EXISTS (
SELECT 1 FROM new_cdp_segments ncs
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
)
${dedup}`.trim()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { TNC_GRID, TncActivityType } from '@crowd/integrations'
import { getServiceChildLogger } from '@crowd/logging'
import {
IActivityData,
IMemberData,
MemberAttributeName,
MemberIdentityType,
PlatformType,
} from '@crowd/types'

import { TransformedActivity } from '../../../core/transformerBase'
import { TncTransformerBase } from '../tncTransformerBase'

const log = getServiceChildLogger('tncCertificatesTransformer')

/**
 * Turns rows produced by the TNC certificates source query into
 * `TncActivityType.ISSUED_CERTIFICATION` activities.
 */
export class TncCertificatesTransformer extends TncTransformerBase {
  /**
   * Converts a single certificates row into an activity plus the segment it belongs to.
   *
   * @param row - Raw result row, read through its upper-case column names
   *   (USER_EMAIL, CERTIFICATE_ID, PROJECT_SLUG, ...).
   * @returns The activity and its segment, or `null` when the row has no email,
   *   no certificate id, or no project slug / project id.
   */
  transformRow(row: Record<string, unknown>): TransformedActivity | null {
    const email = (row.USER_EMAIL as string | null)?.trim() || null
    if (!email) {
      log.debug({ certificateId: row.CERTIFICATE_ID }, 'Skipping row: missing email')
      return null
    }

    // The certificate id becomes activity.sourceId, which must be a real string.
    // A null/blank CERTIFICATE_ID would otherwise leak `undefined` downstream, so skip the row.
    const certificateId = (row.CERTIFICATE_ID as string | null)?.trim() || null
    if (!certificateId) {
      log.debug({ courseId: row.COURSE_ID }, 'Skipping row: missing certificate id')
      return null
    }

    const learnerName = (row.LEARNER_NAME as string | null)?.trim() || null
    const lfUsername = (row.LFID as string | null)?.trim() || null

    const identities: IMemberData['identities'] = []
    // Identity-level sourceId is the TNC user id; not to be confused with the activity sourceId.
    const sourceId = (row.USER_ID as string | null) || undefined

    if (lfUsername) {
      // With an LFID: email identity plus the username on both the TNC and LFID platforms.
      identities.push(
        {
          platform: PlatformType.TNC,
          value: email,
          type: MemberIdentityType.EMAIL,
          verified: true,
          sourceId,
        },
        {
          platform: PlatformType.TNC,
          value: lfUsername,
          type: MemberIdentityType.USERNAME,
          verified: true,
          sourceId,
        },
        {
          platform: PlatformType.LFID,
          value: lfUsername,
          type: MemberIdentityType.USERNAME,
          verified: true,
          sourceId,
        },
      )
    } else {
      // Without an LFID the email is registered as the TNC username.
      // NOTE(review): presumably deliberate so the member always has a username identity — confirm.
      identities.push({
        platform: PlatformType.TNC,
        value: email,
        type: MemberIdentityType.USERNAME,
        verified: true,
        sourceId,
      })
    }

    const activity: IActivityData = {
      type: TncActivityType.ISSUED_CERTIFICATION,
      platform: PlatformType.TNC,
      timestamp: (row.ISSUED_TS as string | null) || null,
      score: TNC_GRID[TncActivityType.ISSUED_CERTIFICATION].score,
      sourceId: certificateId,
      sourceParentId: (row.COURSE_ID as string | null) || undefined,
      member: {
        // Fall back to the local part of the email when no learner name is present.
        displayName: learnerName || email.split('@')[0],
        identities,
        organizations: this.buildOrganizations(row),
        attributes: {
          // Each attribute is only included when the column holds a truthy value.
          ...((row.JOB_TITLE as string | null) && {
            [MemberAttributeName.JOB_TITLE]: { [PlatformType.TNC]: row.JOB_TITLE as string },
          }),
          ...((row.USER_COUNTRY as string | null) && {
            [MemberAttributeName.COUNTRY]: { [PlatformType.TNC]: row.USER_COUNTRY as string },
          }),
        },
      },
      attributes: {
        productName: (row.COURSE_NAME as string | null) || null,
        technology: (row.TECHNOLOGIES_LIST as string | null) || null,
        didExpire: row.DID_EXPIRE as boolean | null,
        expirationDate: (row.EXPIRATION_DATE as string | null) || null,
      },
    }

    // A row that cannot be attributed to a segment is dropped.
    const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
    const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
    if (!segmentSlug || !segmentSourceId) {
      return null
    }

    return { activity, segment: { slug: segmentSlug, sourceId: segmentSourceId } }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { IS_PROD_ENV } from '@crowd/common'

// Main: analytics.bronze_census_ti.course_actions (course action data)
// Joins:
// - analytics.bronze_census_ti.users (user resolution via internal_ti_user_id)
// - analytics.bronze_census_ti.courses (course metadata)
// - analytics.silver_fact.enrollments (segment + org resolution via email + course_id)
// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)

// CTE fragment: distinct (SOURCE_ID, slug) pairs from the segments union, restricted to rows
// where PARENT_SLUG, GRANDPARENTS_SLUG and SOURCE_ID are all non-null.
// Meant to be spliced into a WITH clause; it carries no leading WITH or trailing comma.
const CDP_MATCHED_SEGMENTS = `
cdp_matched_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
WHERE s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)`

// CTE fragment: accounts that have a website, taken from both Salesforce account tables.
// The b2b table contributes NULL for LOGO_URL, INDUSTRY and N_EMPLOYEES so both halves of the
// UNION ALL share one column list.
// NOTE(review): UNION ALL keeps duplicates, so an account_id present in both tables would
// appear twice — presumably what the QUALIFY de-duplication in the query is for; confirm.
const ORG_ACCOUNTS = `
org_accounts AS (
SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
FROM analytics.bronze_fivetran_salesforce.accounts
WHERE website IS NOT NULL
UNION ALL
SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
FROM analytics.bronze_fivetran_salesforce_b2b.accounts
WHERE website IS NOT NULL
)`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL CTE constants duplicated across three files

Low Severity

The CDP_MATCHED_SEGMENTS and ORG_ACCOUNTS SQL CTE constants are identically copy-pasted across all three TNC buildSourceQuery files (certificates, courses, enrollments). LFID_COALESCE is also duplicated between certificates and enrollments. These could live in a shared TNC SQL fragments module to avoid divergence during future updates.

Additional Locations (2)

Fix in Cursor Fix in Web


/**
 * Builds the Snowflake query that selects 'course_started' status-change course
 * actions together with user, course, segment and organization columns.
 *
 * @param sinceTimestamp - When omitted (or empty), the query selects every matching
 *   course action. When provided, the query is the UNION of (a) course actions with
 *   `timestamp` at or after the value and (b) all course actions belonging to
 *   segments whose `CREATED_TS` is at or after the value.
 * @returns The SQL text with surrounding whitespace trimmed.
 */
export const buildSourceQuery = (sinceTimestamp?: string): string => {
  // NOTE(review): ca.* and co.* both expose course_id (they are joined on it) — confirm
  // the consumer of this query tolerates the repeated column name.
  let select = `
SELECT
ca.*,
co.*,
tu.user_email,
tu.lfid,
tu.learner_name,
tu.user_country,
tu.job_title,
e.project_slug AS PROJECT_SLUG,
e.project_id AS PROJECT_ID,
e.account_id,
org.account_name AS ORGANIZATION_NAME,
org.website AS ORG_WEBSITE,
org.domain_aliases AS ORG_DOMAIN_ALIASES,
org.logo_url AS LOGO_URL,
org.industry AS ORGANIZATION_INDUSTRY,
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE
FROM analytics.bronze_census_ti.course_actions ca
INNER JOIN analytics.bronze_census_ti.users tu
ON ca.internal_ti_user_id = tu.internal_ti_user_id
INNER JOIN analytics.bronze_census_ti.courses co
ON ca.course_id = co.course_id
INNER JOIN analytics.silver_fact.enrollments e
ON e.course_id = ca.course_id
AND LOWER(e.user_email) = LOWER(tu.user_email)
INNER JOIN cdp_matched_segments cms
ON cms.slug = e.project_slug
AND cms.sourceId = e.project_id
LEFT JOIN org_accounts org
ON e.account_id = org.account_id
WHERE ca.type = 'status_change'
AND ca.source = 'course_started'
AND co.is_test_or_archived = false
AND tu.user_email IS NOT NULL`

  // Staging/testing only: outside production the export is limited to the single
  // 'cncf' project so that non-prod environments do not export every project's data.
  if (!IS_PROD_ENV) {
    select += ` AND e.project_slug = 'cncf'`
  }

  // Keep exactly one row per course_action_id; the joins above can yield several
  // candidate rows for the same course action.
  const dedup = `
QUALIFY ROW_NUMBER() OVER (PARTITION BY ca.course_action_id ORDER BY org.website DESC) = 1`

  // Full export: no timestamp means there is nothing to filter on.
  if (!sinceTimestamp) {
    return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS}
${select}
${dedup}`.trim()
  }

  // The timestamp is embedded inside single-quoted SQL string literals below. Escape
  // backslashes and single quotes so the value can never terminate the literal early.
  // Well-formed timestamps contain neither character and pass through unchanged.
  const since = sinceTimestamp.replace(/\\/g, '\\\\').replace(/'/g, "''")

  // Incremental export. UNION (not UNION ALL) removes rows that both branches
  // return identically.
  return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS},
new_cdp_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
WHERE s.CREATED_TS >= '${since}'
AND s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)

-- New course actions since last export
${select}
AND ca.timestamp >= '${since}'
${dedup}

UNION

-- All course actions in newly created segments
${select}
AND EXISTS (
SELECT 1 FROM new_cdp_segments ncs
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
)
${dedup}`.trim()
}
Loading
Loading